Source code for ptp.ls

"""Least-squares Estimator
"""
import logging

import numpy as np

logger = logging.getLogger(__name__)


[docs]class Ls(): """Least-squares Time Offset Estimator Args: N : observation window length (number of measurements per window) data : Array of objects with simulation data T_ns : nominal time offset measurement period in nanoseconds, used for **debugging only**. It is used to obtain the fractional frequency offset y (drift in sec/sec) when using the efficient LS implementation, since the latter only estimates y*T_ns (drif in nanoseconds/measurement). In the end, this is used for plotting the frequency offset. """ def __init__(self, N, data, T_ns=float('inf')): self.N = N self.data = data self.i_batch = 0 # Learn the measurement period in case it is not provided self.T_ns = T_ns if (np.isinf(self.T_ns)): t1 = np.array([res["t1"] for res in data]) self.T_ns = float(np.mean(np.diff(t1))) logger.warning("Automatically setting T_ns to %f ns", self.T_ns) # Matrix used by the efficient implementation self.P = (2 / (N * (N + 1))) * np.array([[(2 * N - 1), -3], [-3, 6 / (N - 1)]]) def _ls(self, x_obs, t, N, n_windows, new_per_win): """Conventional (non-optimized) LS implementation For each observation window, compute the LS solution given by inv(H'*H)*H'*x. Then, use the resulting LS estimates (initial time offset and frequency offset) to find the LS-fitted time offset at the end of the observation window. Take that value (the last fitted time offset) as the time offset estimate corresponding to the window. This implementation is expensive mainly because it has to re-create the observation matrix H on every iteration, given that this matrix H contains the instant corresponding to each observation. The ideal Sync timestamps to be used in the observation matrix H (see implementation below) would be the true values of timestamps "t2", according to the reference time, not the slave time. However, the problem with using timestamps "t2" directly is that they are subject to slave impairments. When observing a long window, the last timestamp "t2" in the window may have drifted substantially with respect to the true "t2". In contrast, timestamps "t1" are taken at the master side, so they are directly from the reference time. However, the disadvantage of using "t1" is that they do not reflect the actual Sync arrival time after the message's PDV. Returns: Tuple containing the arrays of time and frequency offset estimates corresponding to each observation window. """ # Preallocate results (one for each window) x_f = np.zeros(n_windows) y = np.zeros(n_windows) # Iterate over sliding windows of observations for i in range(0, n_windows): # Window start and end indexes i_s = i * new_per_win i_e = i_s + N # Observation window x_obs_w = x_obs[i_s:i_e] # Timestamps over observation window t_w = t[i_s:i_e] tau = np.asarray([float(tt - t_w[0]) for tt in t_w]) # Observation matrix H = np.hstack((np.ones((N, 1)), tau.reshape(N, 1))) # NOTE: the observation matrix has to be assembled every time # for this approach. The "efficient" approach does not need to # re-compute H (it doesn't even use H). # LS estimation x0, y[i] = np.linalg.lstsq(H, x_obs_w, rcond=None)[0] # LS-fitted final time offset within the observat window T_obs = float(t_w[-1] - t_w[0]) x_f[i] = x0 + (y[i] * T_obs) return x_f, y def _ls_eff(self, x_obs, N, n_windows, new_per_win): """Efficient/optimized LS implementation Finds the LS solution recursively for each observation window with less computational cost. This implementation assumes that the time offset observations are periodic and, with that, simplifies the LS observation matrix H. This choice ignores the fact that the time offset measurements come at irregular instants due to the PDV. On the other hand, it allows for significant complexity reduction. Instead of a matrix H that changes on every iteration (as in the conventional LS implementation), this approach relies on a constant matrix H that can be defined once in the beginning. Using this simplification, the solution becomes based on two accumulators. The first is a running-sum of the time offset observations. The second accumulator is a weighted sum of the observations. Both of them can be efficiently computed recursively. Refer to further information and the derivation within Igor Freire's thesis, Section 3.6 Returns: Tuple containing the arrays of time and frequency offset estimates corresponding to each observation window. """ assert(new_per_win == 1),\ "The efficient LS implementation requires overlapping windows" # Preallocate results (one for each window) x_f = np.zeros(n_windows) y = np.zeros(n_windows) # Preallocate accumulator Q = np.zeros(2) # Starting estimates (starting point for the recursive implementation) Q[0] = np.sum(x_obs[:N]) Q[1] = np.sum(np.multiply(np.arange(N), x_obs[:N])) Theta = np.dot(self.P, Q.T) x_f[0] = Theta[0] + (Theta[1] * (N - 1)) y[0] = Theta[1] / self.T_ns # Iterate over sliding windows of observations for i in range(1, n_windows): # Window start and end indexes i_s = i * new_per_win i_e = i_s + N # Update the accumulators Q[0] -= x_obs[i_s - 1] Q[0] += x_obs[i_e - 1] Q[1] -= Q[0] Q[1] += N * x_obs[i_e - 1] # LS Estimation Theta = np.dot(self.P, Q.T) x0 = Theta[0] # initial time offset within window y_times_T_ns = Theta[1] # drift in nanoseconds/measurement # Fit the final time offset within the current window x_f[i] = x0 + (y_times_T_ns * (N - 1)) # Fractional frequency offset y[i] = y_times_T_ns / self.T_ns return x_f, y def _ls_eff_vec(self, x_obs, N, n_windows, new_per_win): """Vectorized version of the efficient/optimized LS implementation Uses the same formulation as in the above efficient LS implementation. The difference here is that all observation windows are processed at once. The windows are stacked as the columns of a (N x n_windows) matrix and this matrix is processed at once. This implementation is expected to be faster than the sequential (non-vectorized) efficient implementation. However, it uses more memory because it creates a matrix with all (overlapping) windows. Returns: Tuple containing the arrays of time and frequency offset estimates corresponding to each observation window. """ assert(new_per_win == 1),\ "The efficient LS implementation requires overlapping windows" # Stack overlapping windows into columns of a matrix X_obs = np.zeros(shape=(N, n_windows)) for i in range(N): X_obs[i, :] = x_obs[i:(i + n_windows):new_per_win] # Q1 and Q2 accumulator values for each window Q = np.zeros(shape=(2, n_windows)) Q[0, :] = np.sum(X_obs, axis=0) Q[1, :] = np.dot(np.arange(N), X_obs) # LS estimations Theta = np.dot(self.P, Q) X0 = Theta[0, :] Y_times_T_ns = Theta[1, :] Xf = X0 + (Y_times_T_ns * (N - 1)) Y = Y_times_T_ns / self.T_ns return Xf, Y def _compute(self, x_obs, impl, t=None, shift=1): """Compute least squares based on the observation vector Args: x_obs : Vector of time offset observations. impl : Least-squares implementation: "eff-vec", "eff", "t2", or "t1". t : Vector of Sync departure or arrival timestamps. Departure when impl=="t1" and arrival when impl=="t2". Used by the "t1" and "t2" implementations only. shift : Determines the shift (in samples) between consecutive windows or, equivalently, the window overlap. A shift of 1 means that a window of N samples contains N-1 samples from the previous window (i.e., it is fully overlapping). """ if (impl == "t1" or impl == "t2"): assert (t is not None) n_data = len(x_obs) N = self.N win_overlap = N - shift # Window overlap new_per_win = shift # new samples per window n_windows = int((n_data - win_overlap) / new_per_win) logger.debug("Compute batch %d with %d windows" % (self.i_batch, n_windows)) self.i_batch += 1 # Call the LS implementation of choice if (impl == "eff-vec"): return self._ls_eff_vec(x_obs, N, n_windows, new_per_win) elif (impl == "eff"): return self._ls_eff(x_obs, N, n_windows, new_per_win) else: return self._ls(x_obs, t, N, n_windows, new_per_win)
[docs] def process(self, impl="eff", batch_mode=True, batch_size=4096): """Process the observations Using the raw time offset offset measurements and potentially also the Sync arrival/departure timestamps, estimate the time and frequency offset corresponding to each window of samples. Args: impl : Least-squares implementation. Choose between the following: "t2" : Uses timestamp "t2" when forming the observation matrix H. "t1" : Uses timestamp "t1" when forming the observation matrix H. "eff" (default) : Computationally-efficient implementation. "eff-vec" : Computationally-efficient and vectorized implementation. batch_mode : Whether to process observation windows in batches, rather than trying to process all windows at once. Especially important for the vectorized efficient (eff-vec) implementation. batch_size : Number of observation windows that compose a batch. """ assert(impl in ["t1", "t2", "eff", "eff-vec"]), \ "Unsupported LS implementation" logger.info("Processing with N=%d" % (self.N)) n_data = len(self.data) win_overlap = self.N - 1 # samples repeated from past window new_per_win = self.N - win_overlap # new samples per window # NOTE: assume each window of size N has N-1 entries from the previous # window (i.e. fully overlapping windows) # Corresponding number of windows and batches n_windows = int((n_data - win_overlap) / new_per_win) batch_size = batch_size if batch_mode else n_windows # Iterate over batches for i_w_s in range(0, n_windows, batch_size): i_w_e = i_w_s + batch_size # last window of this batch # Sample range covered by the windows i_s = i_w_s * new_per_win # 1st of the 1st window i_e = i_s + (batch_size - 1) * new_per_win + self.N # last of the last # Vector of noisy time offset observations x_obs = np.array([res["x_est"] for res in self.data[i_s:i_e]]) # For impl="t1" or impl="t2", vector of timestamps: if (impl == "t1"): t = np.array([res["t1"] for res in self.data[i_s:i_e]]) elif (impl == "t2"): t = np.array([res["t2"] for res in self.data[i_s:i_e]]) else: t = None # Compute LS for each window of this batch: Xf, Y = self._compute(x_obs, impl, t) assert (len(Xf) == batch_size or i_w_e > n_windows) assert (len(Xf) == len(Y)) for i in range(0, len(Xf)): first_idx_in_window = i_s + i * new_per_win last_idx_in_window = first_idx_in_window + self.N - 1 self.data[last_idx_in_window]["x_ls_" + impl] = Xf[i] self.data[last_idx_in_window]["y_ls_" + impl] = Y[i]