Source code for ptp.frequency

"""Frequency Offset Estimators
"""
import logging

import numpy as np

import ptp.cache
import ptp.filters

logger = logging.getLogger(__name__)


[docs]class Estimator(): """Frequency offset estimator Args: data : Array of objects with simulation data delta : (int > 0) Observation interval in samples. When set to 1, estimates the frequency offsets based on consecutive data entries. When set to 2, estimates frequency offset i based on timestamps from the i-th iteration and from iteration 'i-2', and so on. pkts : Packet selection/filtering strategy to apply before computing unbiased frequency offet estimates (sample-min or sample-max). When set to None, disables the pre-filtering stage. N_pkts : Packet selection window length. """ def __init__(self, data, delta=1, pkts=None, N_pkts=128): assert (delta > 0) assert (isinstance(delta, int)) assert (pkts in ["sample-min", "sample-max", None]) self.data = data self.delta = delta # If using packet selection self.pkts = pkts self.N_pkts = 128 def _is_cached_cfg_valid(self, cfg, target_cfg): """Check if the cached configuration is valid Compare the cached configuration to a target configuration Args: cfg : Cached configuration. target_cfg : Target configuration. Returns: (bool) True when cached configuration is valid. """ for k in target_cfg: # All keys from the target configuration must be present on the # cached configuration if (k not in cfg): return False # All values must match if (cfg[k] != target_cfg[k]): return False return True def _eval_drift_err(self, loss, criterion, n_samples=0, N=8192): """Evaluate the drift estimation error relative to true drift Assess the quality of the drift estimates by comparing them to the true time offset drifts. Do so considering that the true time offset drifts contain an uncertainty. Assuming that each true time offset is actually "x + u", where "u" is the label uncertainty, consider a maximum uncertainty on true drifts of "2u". Often the true drift between consecutive samples is significantly lower than the uncertainty "2u". Hence, it is often undesirable to consider the instantaneous drift estimation error, as it will be masked by the uncertainty. To overcome this problem, accumulate drift estimates and compare the cumulative values. Accumulate a window of drift estimates long enough such that the cumulative values exceeds the uncertainty significantly. For example, if a frequency offset as low as 10 ppb is expected, and "2*u = 16" ns, accumulate at least 16 seconds so that the cumulative drift values become 10x higher than the uncertainty. Another important requirement is that the drift estimates must be smooth. They are ultimately used for drift compensation within packet selection algorithms. On these algorithms, the drift is first subtracted, then re-added in the end. This means that the accuracy of the cumulative drifts relative to the true cumulative drift since the beginning of the dataset is not important. What matters the most is the accuracy of drift estimates within the observation window. Try to optimize the drifts such that they do not deviate too much from the true drifts over short-term windows. This function uses the drift estimates and true time offset values that are available internally on self.data. Args: loss : Loss function (mse or max-error) criterion : Error criterion used for tuning: cumulative time offset drift error or instantaneous time offset drift error. n_samples : Number of drift samples to consider (0, the default, considers all samples) N : Number of drift estimates to accumulate in a window. Returns: Scalar value representing the MSE (mean square error) or max|error| (maximum absolute error) of the current drift estimates. """ assert (criterion in ['cumulative', 'instantaneous']) assert (loss in ["mse", "max-error"]) # Instantaneous true and estimated drifts true_drift = np.array([(r["x"] - self.data[i - 1]["x"]) for i, r in enumerate(self.data) if "drift" in r]) drift_est = np.array([r["drift"] for r in self.data if "drift" in r]) if (criterion == 'instantaneous'): drift_err = (drift_est - true_drift)[-n_samples:] if (loss == "mse"): return np.square(drift_err).mean() elif (loss == "max-error"): return np.amax(np.abs(drift_err)) # Running-sum of drifts over a rolling window of N samples. If there is # less than N samples in the dataset, use a cumulative sum. if (len(self.data) <= N): true_cum_drift = true_drift.cumsum() cum_drift_est = drift_est.cumsum() else: h = np.ones(N) true_cum_drift = np.convolve(true_drift, h, mode="valid") cum_drift_est = np.convolve(drift_est, h, mode="valid") # Normalized cumulative drift estimation errors # # Normalize by the absolute of the true drifts such that different # frequency offsets (with varying levels of drifts) are equally # weighted. For example, with 128 sample per second, a 10 ppb offset # leads to drifts in the order of 10 ns when accumulated over N=128, # whereas a 100 ppb freq. offset yields cumulative drifts in the order # of 100 ns. If we estimate cumulative drifts of 9 ns and 90 ns, # respectively, the absolute errors are 1 ns and 10 ns, respectively, # which are unfair to compare (e.g., the max-error loss will focus on # high frequency offsets). In contrast, the normalized errors are 0.1 # in both cases, which can be compared fairly. # # To avoid Inf values, return zero for the normalized error everywhere # the true cumulative drift (the normalization factor) presents zero # values. Sometimes the cumulative error window has a net cumulative # drift of zero nanoseconds, which leads to this scenario. cum_drift_err = (cum_drift_est - true_cum_drift)[-n_samples:] norm_factor = np.abs(true_cum_drift[-n_samples:]) norm_cum_drift_err = np.divide(cum_drift_err, norm_factor, out=np.zeros_like(cum_drift_err), where=(norm_factor != 0)) if (loss == "mse"): return np.square(norm_cum_drift_err).mean() elif (loss == "max-error"): return np.amax(np.abs(norm_cum_drift_err)) def _get_window_range(self, max_window_span): """Compute the range of window lengths for frequency offset estimates Compute window ranges both for the unbiased frequency offset estimates and for the packet filtering layer preceding it, if used. Use a relatively short window range for the packet filtering layer, given that, unlike the processing used for time offset estimates, there is no drift correction prior to this packet filtering (for frequency offset estimates). After all, we're still trying to estimate the drifts. Args: max_window_span : Maximum fraction of the dataset to be occupied by the observation window. This parameter controls the maximum tolerable latency to obtain the first frequency offset estimate (by the end of the first observation window). Returns: Tuple containing the arrays (np.ndarray) of window lengths to be used for frequency offset estimation and by the preceding packet filtering layer when active. """ # Short packet filtering window range if (self.pkts is not None): pkts_window_len = 2**np.arange(2, 10) # 2**2 to 2**9 else: pkts_window_len = [0] # Window limit for the actual frequency offset estimation window_len_lim = max_window_span * len(self.data) # If using packet filtering, consider its transient on the window limit if (self.pkts is not None): window_len_lim -= np.max(pkts_window_len) log_max_window = int(np.floor(np.log2(window_len_lim))) window_len = 2**np.arange(1, log_max_window + 1) return window_len, pkts_window_len def _pkts(self, strategy): """Estimate the frequency offsets using packet selection pre-processing Based on timestamp differences: t21[n] = t2[n] - t1[n] = +x[n] + dms[n], t43[n] = t4[n] - t3[n] = -x[n] + dsm[n], process a window of t21[n] and/or t43[n] and estimate the frequency offset using the filtered processed timestamps. Filter with a moving-minimum or moving-maximum filter, so that the delays are more consistently canceled out in the noise term that affects the unbiased frequency offset estimates. Args: strategy : Select between one-way, one-way-reversed, and two-way. The one-way strategy uses the m-to-s timestamps (t1 and t2) only. The two-way strategy relies on the two-way time offset measurements (x_est), namely on all timestamps (t1, t2, t3, and t4). The reversed one-way strategy uses the s-to-m timestamps (t3 and t4) only. """ assert (self.pkts is not None) assert (strategy in ["one-way", "one-way-reversed", "two-way"]) # Packet filtering window and operator N = self.N_pkts op = ptp.filters.moving_minimum if self.pkts == "sample-min" else \ ptp.filters.moving_maximum if (strategy == "one-way"): t1 = np.array([float(r["t1"]) for r in self.data]) t21 = np.array([float(r["t2"] - r["t1"]) for r in self.data]) t21_min = op(N, t21) delta_t1 = t1[(self.delta + N - 1):] - t1[(N - 1):-self.delta] delta_t21 = t21_min[self.delta:] - t21_min[:-self.delta] y_est = delta_t21 / delta_t1 elif (strategy == "one-way-reversed"): t4 = np.array([float(r["t4"]) for r in self.data]) t43 = np.array([float(r["t4"] - r["t3"]) for r in self.data]) t43_min = op(N, t43) delta_t4 = t4[(self.delta + N - 1):] - t4[(N - 1):-self.delta] delta_t43 = t43_min[self.delta:] - t43_min[:-self.delta] y_est = -delta_t43 / delta_t4 elif (strategy == "two-way"): t1 = np.array([float(r["t1"]) for r in self.data]) t21 = np.array([float(r["t2"] - r["t1"]) for r in self.data]) t43 = np.array([float(r["t4"] - r["t3"]) for r in self.data]) t21_min = op(N, t21) t43_min = op(N, t43) delta_t1 = t1[(self.delta + N - 1):] - t1[(N - 1):-self.delta] delta_t21 = t21_min[self.delta:] - t21_min[:-self.delta] delta_t43 = t43_min[self.delta:] - t43_min[:-self.delta] y_est = 0.5 * (delta_t21 - delta_t43) / delta_t1 for i, r in enumerate(self.data[(self.delta + N - 1):]): r["y_est"] = y_est[i]
[docs] def process(self, strategy="two-way"): """Process the data Estimate the frequency offset relative to the reference over all the data. Do so by comparing interval measurements of the slave and the master. This is equivalent to differentiating the time offset measurements. Compute one of the following: .. code-block:: y_est_1[n] = (t21[n] - t21[n-N]) / (t1[n] - t1[n-N]), or .. code-block:: y_est_2[n] = -(t43[n] - t43[n-N]) / (t4[n] - t4[n-N]), or .. code-block:: y_est_3[n] = (y_est_1[n] + y_est_2[n]) / 2. These are the one-way, reversed one-way, and two-way implementations, respectively. Args: strategy : Select between one-way, one-way-reversed, and two-way. The one-way strategy uses the m-to-s timestamps (t1 and t2) only. The two-way strategy relies on the two-way time offset measurements (x_est), namely on all timestamps (t1, t2, t3, and t4). The reversed one-way strategy uses the s-to-m timestamps (t3 and t4) only. """ assert (strategy in ["one-way", "one-way-reversed", "two-way"]) logger.info("Processing with N=%d" % (self.delta)) # Remove previous estimates in case they already exist for r in self.data: r.pop("y_est", None) if (self.pkts is not None): self._pkts(strategy) return if (strategy == "one-way"): t1 = np.array([float(r["t1"]) for r in self.data]) t2 = np.array([float(r["t2"]) for r in self.data]) delta_slave = t2[self.delta:] - t2[:-self.delta] delta_master = t1[self.delta:] - t1[:-self.delta] y_est = (delta_slave - delta_master) / delta_master elif (strategy == "one-way-reversed"): t3 = np.array([float(r["t3"]) for r in self.data]) t4 = np.array([float(r["t4"]) for r in self.data]) delta_slave = t3[self.delta:] - t3[:-self.delta] delta_master = t4[self.delta:] - t4[:-self.delta] y_est = (delta_slave - delta_master) / delta_master elif (strategy == "two-way"): t1 = np.array([float(r["t1"]) for r in self.data]) x_est = np.array([float(r["x_est"]) for r in self.data]) delta_x = x_est[self.delta:] - x_est[:-self.delta] delta_master = t1[self.delta:] - t1[:-self.delta] y_est = delta_x / delta_master for i, r in enumerate(self.data[self.delta:]): r["y_est"] = y_est[i]
[docs] def optimize_to_y(self, strategy, loss="mse", max_window_span=0.2): """Optimize the observation interval used for freq. offset estimation Optimizes the observation interval used for unbiased frequency offset estimations in order to minimize the error between the estimates and the true frequency offset. This minimization can be either in terms of the mean square error (MSE) or the maximum absolute error (max|error|). The problem with this approach is that the truth in "y" (the true frequency offset) is questionable. Since the true y values are also computed based on windows, the window length used for the truth computation affects results and may render the optimization below less effective for drift compensation. Args: strategy : Unbiased frequency offset estimation strategy. loss : Loss function used to optimize the window length (mse or max-error). max_window_span : Maximum fraction of the dataset to be occupied by the observation window. This parameter controls the maximum tolerable latency to obtain the first frequency offset estimate (by the end of the first observation window). """ assert (strategy in ["one-way", "one-way-reversed", "two-way"]) assert (loss in ["mse", "max-error"]) # Window length ranges window_len, pkts_window_len = self._get_window_range(max_window_span) # Number of samples guaranteed to be available for all window lengths n_samples = int(np.floor((1 - max_window_span) * len(self.data))) assert (n_samples > 0) logger.info("Optimize observation window") logger.info("Try from N = {} to N = {}".format(min(window_len), max(window_len))) if (self.pkts is not None): logger.info("Try {} from N = {} to N = {}".format( self.pkts, min(pkts_window_len), max(pkts_window_len))) min_error = np.inf N_opt = 0 N_pkts_opt = 0 for N_pkts in pkts_window_len: for N in window_len: # Re-estimate using new window lengths self.delta = N self.N_pkts = N_pkts self.process(strategy) # Estimation error y_err = np.array([ 1e9 * (r["y_est"] - r["rtc_y"]) for r in self.data[self.delta:] if ("y_est" in r and "rtc_y" in r) ]) if (loss == "mse"): error = np.square(y_err[:n_samples]).mean() # Only use `n_samples` out of y_err. This way, all window # lengths are compared based on the same number of samples. elif (loss == "max-error"): error = np.amax(np.abs(y_err[:n_samples])) if (error < min_error): N_opt = N N_pkts_opt = N_pkts min_error = error loss_label = "MSE" if loss == "mse" else "Max|Error|" logger.info("Minimum {}: {} ppb".format(loss_label, error)) logger.info("Optimum N: {}".format(N_opt)) logger.info("Optimum N_pkts: {}".format(N_pkts_opt)) self.delta = N_opt self.N_pkts = N_pkts_opt
[docs] def optimize_to_drift(self, strategy, loss="mse", criterion='cumulative', max_window_span=0.2, cache=None, cache_id='drift_estimator', force=False): """Optimize the observation interval used for freq. offset estimation Optimize based on the time offset drift estimation errors. This optimizer can lead to better performance because, in the end, what we really care about is predicting drifts accurately. Args: strategy : Unbiased frequency offset estimation strategy. loss : Loss function used to optimize the window length (mse or max-error). criterion : Whether the loss function is applied to the cumulative time offset drift or the instantaneous time offset drift error. max_window_span : Maximum fraction of the dataset to be occupied by the observation window. This parameter controls the maximum tolerable latency to obtain the first frequency offset and time offset drift estimates (by the end of the first observation window). cache : Cache handler used to save the optimal configuration on a JSON file. cache_id : Cache object identifier. force : Force processing even if a configuration file with the optimal parameters already exists in cache. Note: The cumulative criterion typically leads to better optimization performance. The absolute criterion considers estimation errors that are often masked by the uncertainty on dataset labels. For example, if the truth labels have an uncertainty of +-8 ns and the instantaneos drift error is < 1 ns, then the instataneous drift error becomes negligible relative to the uncertainty. In contrast, the cumulative criterion accumulates error such that the cumulative values are significantly greater than the intrinsic uncertainty on dataset labels. For example, if the instantaneous drift is in the order of 1 ns and is accumulated over 200 samples, the cumulative result (around 200 ns) becomes significantly greater than the truth uncertainty (e.g., 8 ns). Consequently, the optimization based on cumulative error considers the actual drift estimation errors. """ assert (strategy in ["one-way", "one-way-reversed", "two-way"]) assert (criterion in ['cumulative', 'instantaneous']) assert (loss in ["mse", "max-error"]) # Check if a cached configuration file exists and is valid if (cache is not None): assert (isinstance(cache, ptp.cache.Cache)), "Invalid cache object" cached_cfg = cache.load(cache_id) if (cached_cfg and not force): target_cfg = { 'data_len': len(self.data), 'max_transient': max_window_span, 'strategy': strategy, 'loss': loss, 'criterion': criterion, 'pkts': self.pkts } if (self._is_cached_cfg_valid(cached_cfg, target_cfg)): self.delta = cached_cfg['N'] self.N_pkts = cached_cfg['N_pkts'] return else: logger.info("Unable to find cached configuration file") # Window length ranges window_len, pkts_window_len = self._get_window_range(max_window_span) # Number of samples guaranteed to be available for all window lengths n_samples = int(np.floor((1 - max_window_span) * len(self.data))) assert (n_samples > 0) logger.info("Optimize observation window") logger.info("Try from N = {} to N = {}".format(min(window_len), max(window_len))) if (self.pkts is not None): logger.info("Try {} from N = {} to N = {}".format( self.pkts, min(pkts_window_len), max(pkts_window_len))) m_error = np.inf N_opt = 0 N_pkts_opt = 0 for N_pkts in pkts_window_len: for N in window_len: # Re-estimate using new window lengths self.delta = N self.N_pkts = N_pkts self.process(strategy) self.estimate_drift() # Estimation error error = self._eval_drift_err(loss, criterion, n_samples) if (error < m_error): m_error = error N_opt = N N_pkts_opt = N_pkts loss_label = "MSE" if loss == "mse" else "Max|Error|" logger.info("Minimum {}: {} ppb".format(loss_label, m_error)) logger.info("Optimum N: {}".format(N_opt)) logger.info("Optimum N_pkts: {}".format(N_pkts_opt)) self.delta = N_opt self.N_pkts = N_pkts_opt # Save optimal configuration and metadata to cache if (cache is not None): cache.save( { 'data_len': len(self.data), 'max_transient': max_window_span, 'criterion': criterion, 'loss': loss, 'strategy': strategy, 'pkts': self.pkts, 'N': int(N_opt), 'N_pkts': int(N_pkts_opt) }, identifier=cache_id)
[docs] def set_truth(self, delta=None): """Set "true" frequency offset based on "true" time offset measurements Args: delta : (int > 0) Observation interval in samples. When set to 1, estimates the frequency offsets based on consecutive data entries. When set to 2, estimates frequency offset i based on timestamps from the i-th iteration and from iteration 'i-2', and so on. When set to None, use the delta value set in self.delta (default: None) """ for r in self.data: r.pop("rtc_y", None) delta = delta or self.delta t1 = np.array([float(r["t1"]) for r in self.data]) x = np.array([r["x"] for r in self.data]) dx = x[delta:] - x[:-delta] dt1 = t1[delta:] - t1[:-delta] y = dx / dt1 for i, r in enumerate(self.data[delta:]): r["rtc_y"] = y[i]
[docs] def estimate_drift(self): """Estimate the incremental drifts due to frequency offset On each iteration, the true time offset changes due to the instantaneous frequency offset. On iteration n, with freq. offset y[n], it will roughly change w.r.t. the previous iteration by: drift[n] = y[n] * (t1[n] - t1[n-1]) Estimate these incremental changes and save on the dataset. """ # Clean previous estimates for r in self.data: r.pop("drift", None) # Compute the drift within the observation window for i, r in enumerate(self.data[1:]): if ("y_est" in r): delta = float(r["t1"] - self.data[i]["t1"]) # NOTE: index i is the enumerated index, not the data entry # index. Since we get self.data[1:] (i.e. starting from index # 1), "i" lags the actual data index by 1. r["drift"] = r["y_est"] * delta
def _calc_loop_constants(self, damping, loopbw): """Compute the proportional and integral gains Refer to "Rice, Michael. Digital Communications: A Discrete-Time Approach. Appendix C." """ theta_n = loopbw / (damping + (1.0 / (4 * damping))) denomin = (1 + 2 * damping * theta_n + (theta_n**2)) Kp_K0_K1 = (4 * damping * theta_n) / denomin Kp_K0_K2 = (4 * (theta_n**2)) / denomin # And since Kp and K0 are assumed unitary Kp = Kp_K0_K1 # proportional gain Ki = Kp_K0_K2 # integrator gain return Kp, Ki
[docs] def loop(self, damping=1.0, loopbw=0.001, settling=0.2): """Estimate time offset drifts using PI loop The PI loop tries to minimize the error between the input time offset estimate and the time offset that is expected based on not only the previous estimate, but also the loop's notion of frequency offset. In the long term, the loop should converge to learning the average time offset drift (or frequency offset) such that the error is minimized. The time offset drift estimates that are produced by the loop are only saved in the main data list after the loop settles. Other blocks will use the presence of drift estimates to infer when drift estimates are locked. In other words, if the "drift" key is not in a dict of the data list, the loop is not locked yet at this point. Note that the convergence of the loop is not explicitly tested here. Instead, we simply define the portion of the dataset that can be used for settling. All estimates produced past this portion of the dataset are saved, the other values are discarded. The rationale is that the optimizer also considers the same settling time margin. Hence, if the damping and loopbw parameters came from the optimizer, the values past the chosen settling time are already optimal and very likely effectively settled (converged). Args: damping : Damping factor loopbw : Loop bandwidth settling : Fraction of the dataset over which the loop can settle. Results are only saved after this portion of the dataset. """ logger.debug("Run PI loop with damping {:f} and loop bw {:f}".format( damping, loopbw)) Kp, Ki = self._calc_loop_constants(damping, loopbw) # Clean previous estimates for r in self.data: r.pop("drift", None) r.pop("x_loop", None) # Index after which the loop is assumed to be settled i_settling = int(np.floor(settling * len(self.data))) # Run loop f_int = 0 dds = self.data[0]["x_est"] for i, r in enumerate(self.data): x_est = r["x_est"] err = x_est - dds f_prop = Kp * err f_int += Ki * err f_err = f_prop + f_int if (i >= i_settling): r["drift"] = f_err r["x_loop"] = dds dds += f_err
[docs] def optimize_loop(self, criterion='cumulative', loss="mse", cache=None, cache_id='loop', force=False, max_transient=0.2): """Find loop parameters that minimize the drift estimation error Tries some pre-defined damping factor and loop bandwidth values. By default the damping factor and loop bandwidth are tuned using the cumulative drift instead of the absolute. This is because the latter leads to a time offset drift estimation that is very close to the 'optimize_to_y', while the former yield the best estimation performance. Args: criterion : Error criterion used for tuning: cumulative or instantaneous time offset drift error. loss : Loss function (mse or max-error). cache : Cache handler used to save the optimal configuration on a JSON file. cache_id : Cache object identifier force : Force processing even if a configuration file with the optimal parameters already exists in cache. max_transient : Maximum fraction of the dataset to be occupied by the transient phase of the loop. This parameter controls the maximum tolerable latency to obtain the first drift estimates. """ assert (criterion in ['cumulative', 'instantaneous']) assert (loss in ["mse", "max-error"]) # Check if a cached configuration file exists and is valid if (cache is not None): assert (isinstance(cache, ptp.cache.Cache)), "Invalid cache object" cached_cfg = cache.load(cache_id) if (cached_cfg and not force): target_cfg = { 'data_len': len(self.data), 'max_transient': max_transient, 'criterion': criterion, 'loss': loss } if (self._is_cached_cfg_valid(cached_cfg, target_cfg)): best_damping = cached_cfg['damping'] best_loopbw = cached_cfg['loopbw'] return best_damping, best_loopbw else: logger.info("Unable to find cached configuration file") damping_vec = [0.5, 0.707, 1.0, 1.2, 1.5, 1.8, 2.0] loopbw_vec = np.concatenate( (np.arange(0.1, 1.0, 0.1), np.arange(0.01, 0.1, 0.01), np.arange(0.001, 0.01, 0.001), np.arange(0.0001, 0.001, 0.0001), np.arange(0.00001, 0.0001, 0.00001))) m_error = np.inf best_damping = None best_loopbw = None for damping in damping_vec: for loopbw in loopbw_vec: # Run loop with the given configurations self.loop(damping=damping, loopbw=loopbw, settling=max_transient) # Evaluate the drift estimation error error = self._eval_drift_err(loss, criterion) # NOTE: there is no need to restrict the range of samples to be # used in this error evaluation. The loop only saves the # estimates that are past the desired transient. Hence, all # samples considered within self._eval_drift_err() are already # within the desired portion of the dataset used for analysis. if (error < m_error): m_error = error best_damping = damping best_loopbw = loopbw logger.info("PI loop optimization") logger.info("Damping factor: {:f}".format(best_damping)) logger.info("Loop bandwidth: {:f}".format(best_loopbw)) # Save optimal configuration and metadata to cache if (cache is not None): cache.save( { 'data_len': len(self.data), 'max_transient': max_transient, 'criterion': criterion, 'loss': loss, 'damping': best_damping, 'loopbw': best_loopbw }, identifier=cache_id) return best_damping, best_loopbw