Source code for ptp.pktselection

import logging

import numpy as np
from scipy import stats

import ptp.filters

logger = logging.getLogger(__name__)

SAMPLE_MODE_BIN_0 = 10  # starting value


[docs]class PktSelection(): """Packet Selection Args: N : Observation window length data : Array of objects with simulation data """ def __init__(self, N, data): self.data = data # this pointer could be changed self._original_data = data # this should be immutable # Define window length and associated paramters self._set_window_len(N) # Initialize state state self._reset_state() def _reset_state(self): """Reset state""" self.i_batch = 0 self.data = self._original_data # Sample-mode params self._sample_mode_bin = SAMPLE_MODE_BIN_0 self._sample_mode_bin_fw = SAMPLE_MODE_BIN_0 self._sample_mode_bin_bw = SAMPLE_MODE_BIN_0 self._mode_stall_cnt = 0 def _set_window_len(self, N): """Set window length and associated parameters""" self.N = N def _window(self, v, N, shift=1, copy=False): """Split numpy vector into windows with configurable overlapping Args: v : Numpy vector to split. N : Target window length. shift : Controls the shift between consecutive windows or, equivalently, the overlap. For instance, if shift=1, each window overlaps with N-1 samples of the previous window. copy : Set True to write to the array that is returned by this function, as otherwise the returned array is a memory-sharing view of the same numpy array (like the result of a numpy.reshape). Returns: Matrix containing the overlapping windows. Each line will correspond to a window with N columns. """ sh = (v.size - N + 1, N) st = v.strides * 2 view = np.lib.stride_tricks.as_strided(v, strides=st, shape=sh)[0::shift] if (copy): return view.copy() else: return view def _sample_avg_normal(self, x_obs): """Calculate the average of a given time offset vector Args: x_obs : Vector of time offset observations Returns: The average of the time offset vector """ return np.mean(x_obs) def _sample_avg_normal_vec(self, x_obs): """Calculate the sample-average of time offset observation windows Args: x_obs : Matrix with time offset observations windows Returns: Vector with the average of each individual time offset observation window """ return np.mean(x_obs, axis=1) def _sample_median(self, t2_minus_t1_w, t4_minus_t3_w): """Calculate the sample-median estimate of a given observation window Args: t2_minus_t1_w : Vector of (t2 - t1) differences t4_minus_t3_w : Vector of (t4 - t3) differences Returns: The time offset estimate based on sample-median """ t2_minus_t1 = np.median(t2_minus_t1_w) t4_minus_t3 = np.median(t4_minus_t3_w) x_est = (t2_minus_t1 - t4_minus_t3) / 2 return x_est def _sample_median_vec(self, t2_minus_t1_mtx, t4_minus_t3_mtx): """Calculate the sample-median estimates of given observation windows Args: t2_minus_t1_mtx : Matrix with windows (as lines) containing (t2 - t1) differences t4_minus_t3_mtx : Matrix with windows (as lines) containing (t4 - t3) differences Returns: The individual time offset estimates of each observation window based on sample-median """ t2_minus_t1 = np.median(t2_minus_t1_mtx, axis=1) t4_minus_t3 = np.median(t4_minus_t3_mtx, axis=1) x_est = (t2_minus_t1 - t4_minus_t3) / 2 return x_est def _eapf(self, t2_minus_t1_w, t4_minus_t3_w): """Compute the time offset based on earliest arrivals Apply an earliest arrival packet filter (EAPF) on both master-to-slave Sync messages and slave-to-master Delay_Req messages to compute time offset. Within a window of N message exchanges, find the earliest-arriving Sync and the earliest-arriving Delay_Req, that is, the ones that were subject to minimum delay. To find the Sync, observe the index where the difference (t2 - t1) was the minimum. To find the Delay_Req, look for the minimum of the difference (t4 - t3). In the end, use these minimum differences to compute the time offset estimate. Note that not necessarily the index of minimum (t2 - t1) must be the same as the index of minimum (t4 - t3), as long as the time offset remains sufficiently constant within the window. Args: t2_minus_t1_w : Vector of (t2 - t1) differences t4_minus_t3_w : Vector of (t4 - t3) differences Returns: The time offset estimate """ t2_minus_t1 = np.amin(t2_minus_t1_w) t4_minus_t3 = np.amin(t4_minus_t3_w) x_est = (t2_minus_t1 - t4_minus_t3) / 2 return x_est def _eapf_vec(self, t2_minus_t1_mtx, t4_minus_t3_mtx): """Compute the time offset based on earliest arrivals Vectorized version of the above (`_eapf`). Instead of processing a single observation window, this function processes a matrix with many (or all) observation windows. Args: t2_minus_t1_mtx : Matrix of (t2 - t1) differences t4_minus_t3_mtx : Matrix of (t4 - t3) differences Returns: Vector with the time offset estimates corresponding to each observation window """ t2_minus_t1 = np.amin(t2_minus_t1_mtx, axis=1) t4_minus_t3 = np.amin(t4_minus_t3_mtx, axis=1) x_est = (t2_minus_t1 - t4_minus_t3) / 2 return x_est def _sample_maximum(self, t2_minus_t1_w, t4_minus_t3_w): """Compute the time offset based on latest arrivals Implement the same logic of EAPF, but with the difference that the maximum delay is pursued rather than the minimum. Args: t2_minus_t1_w : Vector of (t2 - t1) differences t4_minus_t3_w : Vector of (t4 - t3) differences Returns: The time offset estimate """ t2_minus_t1 = np.amax(t2_minus_t1_w) t4_minus_t3 = np.amax(t4_minus_t3_w) x_est = (t2_minus_t1 - t4_minus_t3) / 2 return x_est def _sample_maximum_vec(self, t2_minus_t1_mtx, t4_minus_t3_mtx): """Compute the time offset based on latest arrivals Vectorized version of the above (`_sample_maximum`). Instead of processing a single observation window, this function processes a matrix with many (or all) observation windows. Args: t2_minus_t1_mtx : Matrix of (t2 - t1) differences t4_minus_t3_mtx : Matrix of (t4 - t3) differences Returns: Vector with the time offset estimates corresponding to each observation window """ t2_minus_t1 = np.amax(t2_minus_t1_mtx, axis=1) t4_minus_t3 = np.amax(t4_minus_t3_mtx, axis=1) x_est = (t2_minus_t1 - t4_minus_t3) / 2 return x_est def _sample_mode(self, t2_minus_t1_w, t4_minus_t3_w, cnt_threshold=3, stall_patience=10): """Compute the time offset based on sample-mode Regarding the bin adjustment algorithm, note that a higher `cnt_threshold` will lead to more frequent adjustments (enlargements) of the bin width. A lower patience will also enlarge the bin width more frequently. Moreover, note that if the bin becomes too wide, the estimation resolution reduces. Hence, loosely speaking the `cnt_threshold` should be relatively small and `stall_patience` moderately higher. Args: t2_minus_t1_w : Vector of (t2 - t1) differences t4_minus_t3_w : Vector of (t4 - t3) differences cnt_threshold : Minimum number of ocurrences on the mode bin stall_patience : Number of consecutive iterations of stalled mode bin ocurrence count before the bin width is adjusted Returns: The time offset estimate """ bin_width = self._sample_mode_bin half_bin_width = 0.5 * bin_width # Quantize timestamp difference vectors t2_minus_t1_q = np.round(t2_minus_t1_w / bin_width) t4_minus_t3_q = np.round(t4_minus_t3_w / bin_width) # Find the mode for (t2 - t1) (_, idx, counts) = np.unique(t2_minus_t1_q, return_index=True, return_counts=True) mode_cnt_fw = np.amax(counts) mode_idx_fw = idx[np.argmax(counts)] t2_minus_t1 = t2_minus_t1_q[mode_idx_fw] * bin_width + half_bin_width # Find the mode for (t4 - t3) (_, idx, counts) = np.unique(t4_minus_t3_q, return_index=True, return_counts=True) mode_cnt_bw = np.amax(counts) mode_idx_bw = idx[np.argmax(counts)] t4_minus_t3 = t4_minus_t3_q[mode_idx_bw] * bin_width + half_bin_width # Detect when we can't find a mode # # In case the occurrence count of the mode bin is below a threshold for # some consecutive number of iterations (denominated patience), we # infer that the mode detection is stalled and increase the bin width. if (mode_cnt_fw < cnt_threshold and mode_cnt_bw < cnt_threshold): self._mode_stall_cnt += 1 else: self._mode_stall_cnt = 0 if (self._mode_stall_cnt > stall_patience): self._sample_mode_bin += 10 self._mode_stall_cnt = 0 # Final estimate x_est = (t2_minus_t1 - t4_minus_t3) / 2 return x_est def _sample_mode_vec(self, t2_minus_t1_mtx, t4_minus_t3_mtx, cnt_threshold=3, n_tuning=100): """Compute the time offset based on sample-mode Vectorized sample-mode implementation. Its result differs from the non-vectorized sample-mode implementation due to how the bin adjustment is implemented. Args: t2_minus_t1_mtx : Matrix of (t2 - t1) differences t4_minus_t3_mtx : Matrix of (t4 - t3) differences cnt_threshold : Minimum number of ocurrences on the mode bin n_tuning : Number of realizations used for tuning Returns: Vector with the time offset estimates corresponding to each observation window """ # Bin widths for t2-t1 and t4-t3 are independent in this implementation bin_width_fw = self._sample_mode_bin_fw bin_width_bw = self._sample_mode_bin_bw # Tune bin width # # If processing in batch mode, tune bin only for the first # batch. Otherwise, sample-mode would be quite slow. if (self.i_batch == 0): # Tune the bin width based on the first 100 observation windows done = False while (not done): # Quantize timestamp difference matrices t2_minus_t1_q = np.round(t2_minus_t1_mtx[:n_tuning, :] / bin_width_fw) t4_minus_t3_q = np.round(t4_minus_t3_mtx[:n_tuning, :] / bin_width_bw) # Find the mode t2_minus_t1_mode, mode_cnt_fw = stats.mode(t2_minus_t1_q, axis=1) t4_minus_t3_mode, mode_cnt_bw = stats.mode(t4_minus_t3_q, axis=1) # Adjust the bin such that less than 10% of the realizations # (of the windows) have a mode count below threshold. That is, # we want the mode to be significant, rather than close to a # tie. done = True if ((mode_cnt_fw < cnt_threshold).sum() > int(0.1 * n_tuning)): bin_width_fw += 10 done = False if ((mode_cnt_bw < cnt_threshold).sum() > int(0.1 * n_tuning)): bin_width_bw += 10 done = False # Save tuned width self._sample_mode_bin_fw = bin_width_fw self._sample_mode_bin_bw = bin_width_bw logger.info("t2-t1 bin was adjusted to %d" % (bin_width_fw)) logger.info("t4-t3 bin was adjusted to %d" % (bin_width_bw)) half_bin_width_fw = 0.5 * bin_width_fw half_bin_width_bw = 0.5 * bin_width_bw # Quantize timestamp difference matrices t2_minus_t1_q = np.around(t2_minus_t1_mtx / bin_width_fw).astype( np.int64) t4_minus_t3_q = np.around(t4_minus_t3_mtx / bin_width_fw).astype( np.int64) # Find the mode t2_minus_t1_mode, mode_cnt_fw = stats.mode(t2_minus_t1_q, axis=1) t4_minus_t3_mode, mode_cnt_bw = stats.mode(t4_minus_t3_q, axis=1) # Descale to revert quantization t2_minus_t1 = t2_minus_t1_mode * bin_width_fw + half_bin_width_fw t4_minus_t3 = t4_minus_t3_mode * bin_width_bw + half_bin_width_bw # Final estimates x_est = (t2_minus_t1 - t4_minus_t3) / 2 return x_est.reshape(x_est.size) def _tdiff_ops_recursive(self, drift_comp, op): """Recursive filters operating on timestamp differences (t21/t43) Supports recursive sample-minimum and sample-maximum implementations. Args: drift_comp : Whether to apply drift compensation op : Operation (min or max) """ assert (op in ['min', 'max', 'mode']) # Drift compensation array if (drift_comp): drift_corr = np.array([r["cum_drift"] for r in self.data]) else: drift_corr = np.zeros(len(self.data)) # Drift-compensated timetamp difference arrays t21 = np.array([float(r["t2"] - r["t1"]) for r in self.data]) - drift_corr t43 = np.array([float(r["t4"] - r["t3"]) for r in self.data]) + drift_corr # Recursive moving-minimum/maximum of t21 and t43 filter_map = { 'min': ptp.filters.moving_minimum, 'max': ptp.filters.moving_maximum, 'mode': ptp.filters.moving_mode } filter_op = filter_map[op] filtered_t21 = filter_op(self.N, t21) filtered_t43 = filter_op(self.N, t43) # Time offset estimates x_est = (filtered_t21 - filtered_t43) / 2 # Re-add cumulative drift and save on global data records after the # transitory of (N-1) samples: i = self.N - 1 for val in x_est: self.data[i][f"x_pkts_{op}"] = val + drift_corr[i] i += 1 def _toffset_ops_recursive(self, drift_comp, strategy): """Recursive filters operating on time offset measurements Supports recursive sample-average and EWMA implementations. Args: drift_comp : Whether to apply drift compensation strategy : Selection strategy of interest (recursive moving average or exponentially-weighted moving average). """ assert (strategy in ['avg', 'ewma']) # Drift compensation array if (drift_comp): drift_corr = np.array([r["cum_drift"] for r in self.data]) else: drift_corr = np.zeros(len(self.data)) # Drift-compensated time offset measurement array x_obs = np.array([r["x_est"] for r in self.data]) - drift_corr # Filter the measurements filter_op = ptp.filters.moving_average if ( strategy == "avg") else ptp.filters.ewma x_est = filter_op(self.N, x_obs) # Re-add cumulative drift and save on global data records after the # transitory of (N-1) samples: key = strategy.replace('-', '_') i = self.N - 1 for val in x_est: self.data[i][f"x_pkts_{key}"] = val + drift_corr[i] i += 1 def _sample_by_sample(self, strategy, drift_comp): """Sample-by-sample processing Args: strategy : Selection strategy of interest (recursive moving average or exponentially-weighted moving average). drift_comp : Whether to compensate drift of timestamp differences or time offset measurements prior to computing packet selection operators. """ # Assume that all elements of self.data contain "x_est". If drift # compensation is enabled, assume all elements contain "cum_drift". assert (all([("x_est" in r) for r in self.data])) if (drift_comp): assert (all([("cum_drift" in r) for r in self.data])) tdiff_ops = ['min', 'max', 'mode'] toffset_ops = ['avg', 'ewma'] if (strategy in tdiff_ops): self._tdiff_ops_recursive(drift_comp, strategy) elif (strategy in toffset_ops): self._toffset_ops_recursive(drift_comp, strategy) else: raise ValueError( "Strategy choice %{} unsupported".format(strategy)) def _window_by_window(self, strategy, drift_comp): """Window-by-window processing Args: strategy : Window-based packet selection strategy of interest. drift_comp : Whether to compensate drift of timestamp differences or time offset measurements prior to computing packet selection operators. """ for i in range(0, (len(self.data) - self.N) + 1): # Window start and end indexes i_s = i i_e = i + self.N # Drift correction based on cumulative time offset drift estimates if (drift_comp): cum_drift_w = np.array( [r["cum_drift"] for r in self.data[i_s:i_e]]) # Operator that processes time offset measurement windows if (strategy == 'avg'): # Observation window x_obs_w = np.array( [res["x_est"] for res in self.data[i_s:i_e]]) # Remove drift from observed time offset if (drift_comp): x_obs_w = x_obs_w - cum_drift_w x_est = self._sample_avg_normal(x_obs_w) # Operators that process windows of timestamp differences else: t2_minus_t1_w = np.array( [float(r["t2"] - r["t1"]) for r in self.data[i_s:i_e]]) t4_minus_t3_w = np.array( [float(r["t4"] - r["t3"]) for r in self.data[i_s:i_e]]) # Remove drift from observed timestamp differences if (drift_comp): t2_minus_t1_w = t2_minus_t1_w - cum_drift_w t4_minus_t3_w = t4_minus_t3_w + cum_drift_w if (strategy == 'median'): x_est = self._sample_median(t2_minus_t1_w, t4_minus_t3_w) elif (strategy == 'min'): x_est = self._eapf(t2_minus_t1_w, t4_minus_t3_w) elif (strategy == 'max'): x_est = self._sample_maximum(t2_minus_t1_w, t4_minus_t3_w) elif (strategy == 'mode'): x_est = self._sample_mode(t2_minus_t1_w, t4_minus_t3_w) else: raise ValueError("Strategy choice %s unknown" % (strategy)) # Re-add the drift that was accumulated during this observation # window to the estimate produced by the selection operator if (drift_comp): x_est = x_est + cum_drift_w[-1] # Save on global data records key = strategy.replace('-', '_') self.data[i_e - 1][f"x_pkts_{key}"] = x_est if (strategy == 'mode' and (self._sample_mode_bin != SAMPLE_MODE_BIN_0)): logger.info("Sample-mode bin was increased to %d" % (self._sample_mode_bin)) def _matrix_by_matrix(self, strategy, drift_comp, batch, batch_size): """Matrix-by-matrix processing This only processes window-based algorithms when vectorization is enabled. Each matrix consists in stacked windows. That is, each row corresponds to a distinct observation window and the number of columns correponds to the length of observation windows. Args: strategy : Window-based packet selection strategy of interest. drift_comp : Whether to compensate drift of timestamp differences or time offset measurements before computing packet selection operators. batch : Whether to split dataset into batches. batch_size : Number of observation windows on each batch. """ win_overlap = self.N - 1 # samples repeated from past window new_per_win = self.N - win_overlap # new samples per window # NOTE: assume each window of size N has N-1 entries from the previous # window (i.e. fully overlapping windows) # Corresponding number of windows and batches n_windows = int((len(self.data) - win_overlap) / new_per_win) batch_size = batch_size if batch else n_windows for i_w_s in range(0, n_windows, batch_size): i_w_e = i_w_s + batch_size # last window of this batch # Batch index self.i_batch = int(i_w_s / batch_size) logger.debug("Compute batch %d" % (self.i_batch)) # Sample range covered by the windows i_s = i_w_s * new_per_win # 1st of the 1st window i_e = i_s + (batch_size - 1) * new_per_win + self.N # last of the last # Calculate the drift correction windows if (drift_comp): cum_drift_est = np.array( [r["cum_drift"] for r in self.data[i_s:i_e]]) cum_drift_est_w = self._window(cum_drift_est, self.N, shift=new_per_win) else: cum_drift_est_w = [] # NOTE: it is OK to let _window use copy = False here. We don't # mutate these values (they are only read, but not written). if (strategy == 'avg'): # Time offset measurements: x_obs = np.array([res["x_est"] for res in self.data[i_s:i_e]], dtype='float64') # Stacked windows with time offset measurements x_obs_w = self._window(x_obs, self.N, shift=new_per_win) x_est = self._vectorized(strategy=strategy, drift_comp=drift_comp, cum_drift_est=cum_drift_est_w, x_obs=x_obs_w) else: # Timestamps differences t2_minus_t1 = np.array( [float(r["t2"] - r["t1"]) for r in self.data[i_s:i_e]]) t4_minus_t3 = np.array( [float(r["t4"] - r["t3"]) for r in self.data[i_s:i_e]]) # Form observation windows with timestamps differences t2_minus_t1_w = self._window(t2_minus_t1, self.N, shift=new_per_win) t4_minus_t3_w = self._window(t4_minus_t3, self.N, shift=new_per_win) # If batch processing is enabled, process each batch # separately. Otherwise, process all observation windows at # once. x_est = self._vectorized(strategy=strategy, drift_comp=drift_comp, cum_drift_est=cum_drift_est_w, t2_minus_t1=t2_minus_t1_w, t4_minus_t3=t4_minus_t3_w) # There is one estimate per window of this batch, except if we # don't have windows to fill the entire batch assert (len(x_est) == batch_size or i_w_e > n_windows) # Save results on global data records key = strategy.replace('-', '_') for i in range(0, len(x_est)): first_idx_in_window = i_s + i * new_per_win last_idx_in_window = first_idx_in_window + self.N - 1 self.data[last_idx_in_window][f"x_pkts_{key}"] = x_est[i] def _vectorized(self, strategy, drift_comp, cum_drift_est=None, x_obs=None, t2_minus_t1=None, t4_minus_t3=None): """Vectorized processing All observation windows are stacked as lines of a matrix Args: strategy : Select the strategy of interest. drift_comp : Whether to compensate drift of timestamp differences or time offset measurements before computing packet selection operators. cum_drift_est : Matrix with the estimates of the cumulative time offset drift due to frequency offset. x_obs : Matrix with time offset observations windows. t2_minus_t1 : Matrix of (t2 - t1) differences. t4_minus_t3 : Matrix of (t4 - t3) differences. Returns: Vector with the time offset estimates corresponding to each observation window. """ # Operator that processes time offset measurement windows if (strategy == 'avg'): # Time offset measurement assert (x_obs.any()), "Time offset measurement not available" # Remove cumulative drift if (drift_comp): x_obs -= cum_drift_est drift_offsets = cum_drift_est[:, -1] # Sample-average operator x_est = self._sample_avg_normal_vec(x_obs) # Operators that process timestamp difference windows else: # Timestamp differences assert (t2_minus_t1.any()), "Timestamp differences not available" assert (t4_minus_t3.any()), "Timestamp differences not available" # Remove drift if (drift_comp): t2_minus_t1 -= cum_drift_est t4_minus_t3 += cum_drift_est drift_offsets = cum_drift_est[:, -1] # Apply Selection operator if (strategy == 'median'): x_est = self._sample_median_vec(t2_minus_t1, t4_minus_t3) elif (strategy == 'min'): x_est = self._eapf_vec(t2_minus_t1, t4_minus_t3) elif (strategy == 'max'): x_est = self._sample_maximum_vec(t2_minus_t1, t4_minus_t3) elif (strategy == 'mode'): # FIXME: Sample-mode need to be adjusted for batch processing. x_est = self._sample_mode_vec(t2_minus_t1, t4_minus_t3) else: raise ValueError("Strategy choice %s unknown" % (strategy)) # Re-add "quasi-deterministic" cumulative drift to estimates if (drift_comp): x_est += drift_offsets return x_est
[docs] def set_window_len(self, N): """Change the window length Args: N : new window length N """ self._set_window_len(N)
[docs] def process(self, strategy, drift_comp=True, vectorize=True, batch=True, batch_size=4096, calc_drift=True, recursive=True): """Process the observations Using the raw time offset measurements, estimate the time offset using sample-average, EWMA, sample-median, sample-minimum, sample-maximum, or sample-mode over sliding windows of observations. Note: Drift compensation: If drift compensation is enabled, use vector of drifts to compensate the differences of timestamps along the observation window. Note that: .. math:: x[n] = t2[n] - (t1[n] + d_{ms}[n]) .. math:: x[n] = (t3[n] + d_{sm}[n]) - t4[n] so that .. math:: t2[n] - t1[n] = +x[n] + d_{ms}[n], .. math:: t4[n] - t3[n] = -x[n] + d_{sm}[n] However, x[n] can be modeled as: .. math:: x[n] = x_0 + drift[n], where the drift[n] term is mostly due to frequency offset, which sometimes can be accurately estimated (especially for high quality oscillators). Using this model, and if the drift is compensated, it can be stated that: .. math:: t2[n] - t1[n] - drift[n] = +x_0 + d_{ms}[n], .. math:: t4[n] - t3[n] + drift[n] = -x_0 + d_{sm}[n] In the end, this means that drift-compensated timestamp differences will lead to an approximately constant time offset corrupted by variable delay realizations. This is the ideal input to packet selection operators. Once the operators process the drift-compensated timestamp differences, ideally its result would approach: .. code-block:: x_est -> x_0 To complete the work, the total (cumulative) drift over the observation window should be re-added to the estimate, so that it approaches the true time offset by the end of the observation window (while `x_0` is the offset in the beginning): .. code-block:: x_est += drift[N-1] For the operators that process time offset observations x_est[n] directly, the drift can be similarly removed prior to the selection operator and the sum of the drift re-added to the final result. Note: Vectorization and batch processing: There are two types of packet-selection processing in this function: sample-by-sample and window-by-window. All window-based processing algorithms also have corresponding vectorized implementations. The difference on the vectorized implementations is that, instead of processing one window at a time, they process several windows at once, i.e. process a matrix with windows stacked on top of each other. This is referred to as matrix-by-matrix processing. Depending on the dataset size, it may become infeasible to stack all available windows and process the resulting matrix at once. This could consume too much memory. To overcome this, this function relies also on batch processing. In this case, the vectorized implementation is still used, but the number of windows that are processed at once is limited. They are capped to the batch size. In contrast, the two methods (moving average methods) that are sample-by-sample cannot benefit from vectorization. The reason is that they are recursive, and hence must be computed sample by sample. Args: strategy : Select the strategy of interest: "avg", "ewma", "median", "min", "max", or "mode" drift_comp : Whether to compensate drift of timestamp differences or time offset measurements prior to computing packet selection operators. vectorize : Whether to use vectorized implementation of selection operators. When enabled, all observation windows will be stacked as lines of a matrix and the full matrix will be processed at once. Otherwise, each observation window is processed independently. batch : Whether to use batch processing with vectorized implementation. When enabled, divided the matrix with all the observation windows into batches. Use this option in order to decrease the amount of RAM required for processing all windows. batch_size : Define the size of each batch, that is, the number of observation windows that will be processed at once. calc_drift : Compute the cumulative time offset drift estimates and save them on self.data before running the algorithms. In the end, remove the results from self.data. On multiprocessing worker objects, set this to false given that the parent object will already calculate the cumulative drifts and make them available to all workers through the shared self.data. recursive : Prefer a recursive implementation when available. """ if (drift_comp): drift_msg = " and drift compensation" else: drift_msg = "" logger.info("Processing sample-%s with N=%d" % (strategy, self.N) + drift_msg) # Reset state self._reset_state() # Remove previous entries of this metric key = "x_pkts_{}".format(strategy.replace('-', '_')) for r in self.data: r.pop(key, None) # If drift compensation is enabled, find where drift estimates start # and restrict the dataset to be processed by packet selection such # that all entries contain cumulative time offset drift estimates. if (drift_comp and calc_drift): for i, r in enumerate(self._original_data): if ("drift" in r): i_drift_start = i break self.data = self._original_data[i_drift_start:] assert (all([("drift" in r) for r in self.data])) # Place cumulative time offset drifts within the dataset. These # values are accessed later by the packet-selection algorithms. cum_drift = 0 for r in self.data: cum_drift += r["drift"] r["cum_drift"] = cum_drift # If there is a recursive implementation available for the chosen # strategy, use it, as it is expected to be faster. recursive_strategies = ['avg', 'min', 'max', 'ewma', 'mode'] if (strategy in recursive_strategies and recursive): # Sample-by-sample processing (recursive implementations) self._sample_by_sample(strategy, drift_comp) elif (vectorize): # Matrix-by-matrix processing self._matrix_by_matrix(strategy, drift_comp, batch, batch_size) else: # Window-by-window processing self._window_by_window(strategy, drift_comp) # Remove cumulative time offset drifts from the dataset. They are not # used elsewhere. if (drift_comp and calc_drift): for r in self.data: r.pop("cum_drift", None)