"""Kalman Filter
"""
import logging
import numpy as np
import ptp.cache
logger = logging.getLogger(__name__)
[docs]class KalmanFilter():
"""Kalman Filter for Time/Frequency Offset Processing
The Kalman state vector is:
.. code-block::
s[n] = [ x[n] ]
[ y[n] ]
that is, composed by the time offset (:math:`x[n]`) and frequency offset
(:math:`y[n]`).
The recursive model for the time offset is:
.. math::
x[n] = x[n-1] + y[n-1]*T + w_x[n],
where :math:`w_x[n]` is the time offset state noise.
The recursive model for the frequency offset is:
.. math::
y[n] = y[n-1] + w_y[n],
where :math:`w_y[n]` is the frequency offset state noise.
Thus, the state transition matrix becomes:
.. code-block::
A = [ 1 T ],
[ 0 1 ]
such that:
.. math::
s[n+1] = A*s[n] + w[n],
where :math:`w[n] \\sim N(0, Q)` is the state noise vector and Q is the
so-called state noise covariance matrix.
The state noise covariance matrix takes the expected variability of the
true state into account. The true frequency offset changes over time due to
several oscillator noise sources. Similarly, the time offset uncertainty
comes from phase noise and other effects. Both of these uncertainties are
small in magnitude, so that the transition covariance matrix is expected to
have small values.
The model in [1] considers random-walk in both time and frequency. These
are given in terms of normalized variances, which must be scaled by the
observation period (Sync message period). Here, in contrast, we consider
non-normalized variances, "var_y" and "var_x".
.. code-block::
Q = [ var_x, 0, ]
[ 0, var_y ]
In addition to the state model, the Kalman filter also uses a measurement
model.
The measurement model can rely on scalar observations, as presented in [2],
where only the time offset is observed, or vector observations, as
presented in [1], where both time and frequency offsets are observed.
In the scalar-observation model, the observation is given by:
.. math::
z[n] = x[n] + v_x[n],
where :math:`z[n]`, :math:`x[n]` (time offset), and :math:`v_x[n]` are all
scalars.
In the vector-observation model, it is given by:
.. code-block::
z[n] = [ x[n] ] + [ v_x[n] ]
[ y[n] ] [ v_y[n] ],
where :math:`z[n]` is (2x1).
The observed values come directly from the raw time and frequency offset
measurements that are taken after each delay request-response exchange.
That is, :math:`z[n] = [\\tilde{x}[n], \\tilde{y}[n]]^T`.
The observation covariance matrix reflects the confidence on the
observations. Both time and frequency offset observations are expected to
be very noisy. Thus, it is important to define sufficiently high
variances/covariances in this covariance matrix.
When ignoring the timestamping noise (e.g., due to the timestamp
granularity) and assuming that the PDV is the predominant noise, the
observation noise covariance matrix can be shown to be given by:
.. code-block::
R = [ (Var{d_ms} + Var{d_sm})/4 ]
when the scalar-observation model is used, or:
.. code-block::
R = [ (Var{d_ms} + Var{d_sm})/4 , Var{d_ms} / (2*N*T) ]
[ Var{d_ms} / (2*N*T) , 2*Var{d_ms} / (N*T)**2 ]
when the vector-observation model is used.
The former (scalar-observation) is easier to compute in practice because it
can be computed by "Var{d_est}", namely the variance of the delay estimates
taken from PTP two-way exchanges. In contrast, the latter
(vector-observation) requires the knowledge of the individual m-to-s and
s-to-m variances, which a practical slave does not know. This formulation
also assumes that m-to-s and s-to-m delays are independent (distinct) WSS
and white discrete-time random processes.
References:
1. G. Giorgi and C. Narduzzi, "Performance Analysis of Kalman-Filter-Based
Clock Synchronization in IEEE 1588 Networks," in IEEE Transactions on
Instrumentation and Measurement, vol. 60, no. 8, pp. 2902-2909, Aug.
2011.
2. G. Giorgi, "An Event-Based Kalman Filter for Clock Synchronization," in
IEEE Transactions on Instrumentation and Measurement, vol. 64, no. 2,
pp. 449-457, Feb. 2015.
3. Brown, R. and Hwang, P., Introduction To Random Signals And Applied
Kalman Filtering With Matlab Exercises, 4Th Edition. John Wiley & Sons,
2012.
Args:
data : Array of objects with simulation data
T : Nominal Sync period in sec
N : Interval used for frequency offset estimations
s_0 : Initial state
P_0 : Initial state's covariance matrix
Q : Process noise covariance matrix
R : Measurement covariance matrix
"""
def __init__(self,
data,
T,
N=1,
obs_model='scalar',
s_0=None,
P_0=None,
R=None,
Q=None):
self.data = data # this pointer could be changed
self._original_data = data # this pointer should be immutable
self.T = T
self.N = N
self.obs_model = obs_model
assert(obs_model in ['scalar', 'vector']), \
f"Unknow observation model {obs_model}"
# System measurements (scalar or vector)
if (self.obs_model == 'scalar'):
self._set_scalar_observation()
elif (self.obs_model == 'vector'):
self._set_vector_observation()
# Default system state
self.s_0 = self._set_initial_state() if s_0 is None else s_0
self.P_0 = np.diag([self.R[0, 0], 1e10]) if P_0 is None else P_0
# Note: In order to improve the initial Kalman transitory, initialize
# the covariance matrix with the time offset variance R and a big
# arbitrary number for the frequency offset variance. The objective is
# to initialize the filter with a high gain. In other words, starts the
# filter with low confidence in the state, favoring the observed
# measurements.
self.A = np.array([
[1., self.T], # State transition matrix
[0., 1.]
])
self.Q = np.array([
[1e-13, 0], # State noise cov matrix
[0, 1e-18]
])
# Validate args
assert (s_0 is None or isinstance(s_0, np.ndarray))
assert (P_0 is None or isinstance(P_0, np.ndarray))
assert (Q is None or isinstance(Q, np.ndarray))
assert (R is None or isinstance(R, np.ndarray))
# Override some parameters based on optional constructor arguments
self.Q = self.Q if Q is None else Q # State noise cov matrix
self.R = self.R if R is None else R # Obs noise cov matrix
# Define matrix dimensions
self.dim_state = self.s_0.shape[0]
self.dim_obs = 1 if self.obs_model == 'scalar' else self.z.shape[1]
# Gain and residual error
self.K = np.zeros((self.dim_state, self.dim_obs)) # Kalman gain
self.y = np.zeros((self.dim_obs, 1)) # Residual error
self.S = np.zeros((self.dim_obs, self.dim_obs)) # System uncertainty
# Identity matrix
self.I = np.eye(self.dim_state) # noqa: E741
def _set_initial_state(self):
"""Define the initial state vector
Normally the initial state vector is random. However, here we choose to
use the first measurements as the initial state. The goal is to reduce
the convergence time.
"""
# Estimate frequency offset based on the first two exchanges
t1 = np.array([float(r["t1"]) for r in self.data[:2]])
t2 = np.array([float(r["t2"]) for r in self.data[:2]])
delta_master = float(np.diff(t1))
delta_slave = float(np.diff(t2))
y_est = (delta_slave - delta_master) / delta_master
# First time offset raw estimate
x_est = self.data[0]["x_est"]
return np.array([x_est, 1e9 * y_est])
def _reset_state(self):
"""Set the initial Kalman filter state
The Kalman filter depends on the initial state vector s[-1] and its
covariance matrix. If everything else is static (including
measurements), as it is in this implementation, the filtering results
derive from this initial state. Hence, here, reset the state vector and
the state estimate covariance matrix.
"""
self.s_post = self.s_0 # Initial a posteriori state vector
self.P = self.P_0 # State estimate covariance matrix
def _set_scalar_observation(self):
"""Set scalar observation model
The scalar-observation model processes the time offset measurements.
"""
# Vector containing all scalar measurements (time offset measurements)
# to be processed throughout the filtering:
self.z = np.array([r["x_est"] for r in self.data])
assert (len(self.z) > 0)
# Observation matrix: maps the state variables into the measurements
#
# In the scalar-observation model, the measurement is only composed by
# the time offset, so the measurement transition matrix becomes the row
# vector [1., 0], which neglects the frequency offset state and maps
# only the time offset state to the time offset measurement.
self.H = np.array([[1., 0.]])
# Observation noise covariance matrix
#
# In the scalar observation model, the measurements noise covariance
# matrix reduces to a scalar value, which corresponds to the variance
# of the estimated delay experienced by the PTP messages.
# Interestingly, this variance can be computed based on two-way delay
# measurements that the PTP slave has in practice, as follows:
d_est = np.array([r['d_est'] for r in self.data])
var_d = np.var(d_est)
self.R = np.array([[var_d]])
# Check dimensions
assert (self.H.shape == (1, 2))
assert (self.R.shape == (1, 1))
def _set_vector_observation(self):
"""Set vector observation model
The vector-observation model processes both time and frequency offsets.
"""
# Matrix containing all vector measurements to be processed throughout
# the filtering. Each row is one vector measurement z[n] = [x_est[n],
# y_est[n]], i.e., containing time and frequency offsets:
x_ns = np.array([r["x_est"] for r in self.data if "y_est" in r])
y = np.array([r["y_est"] * 1e9 for r in self.data if "y_est" in r])
self.z = np.vstack((x_ns, y)).T
assert (len(self.z) > 0)
# NOTE: in the recursive time offset model (repeated below), x[n] is in
# ns, while T is in seconds. Hence, y[n] must be in ppb (ns/sec), so
# that y[n]*T yields ns.
# Find where the frequency estimation starts
for i, r in enumerate(self.data):
if ("y_est" in r):
i_obs_start = i
break
self.data = self._original_data[i_obs_start:]
assert (all([("y_est" in r) for r in self.data]))
# Observation matrix
self.H = np.eye(2)
# Observation noise covariance matrix
#
# FIXME: Use the estimated delay estimation here instead
d_ms = np.array([r['d'] for r in self.data])
d_sm = np.array([r['d_bw'] for r in self.data])
var_x = (np.var(d_ms) + np.var(d_sm)) / 4
var_y = (2 * np.var(d_ms)) / ((self.N * self.T)**2)
cov_x_y = np.var(d_ms) / (2 * self.N * self.T)
cov_y_x = cov_x_y
self.R = np.array([[var_x, cov_x_y], [cov_y_x, var_y]])
# Check dimensions
assert (self.H.shape == (2, 2))
assert (self.R.shape == (2, 2))
def _eval_error(self,
q_vec,
error_metric,
early_stopping,
n_samples,
patience=15):
"""Evaluate error for a given process noise covariance matrix
Args:
q_vec : Vector with covariance matrix to evaluate
error_metric : Chosen error metric: 'max-te' or 'mse'
early_stopping : Whether to stop search when min{erro} stalls
n_samples : Number of samples to consider for the analysis,
so that the acceptable transient phase is
neglected on the error assessment.
patience : Number of consecutive iterations without
improvement to wait before signaling an early stop
Returns:
Q_best : Best evaluated covariance matrix
error : Vector with the error computed for all given variances
"""
# Control variables
last_print = 0
min_err = np.inf
patience_count = 0
error = np.zeros(len(q_vec)**2)
Q_matrix = np.array(
[np.diag([var_x, var_y]) for var_x in q_vec for var_y in q_vec])
for i, Q in enumerate(Q_matrix):
# Track progress
progress = (i / len(Q_matrix))
if (progress - last_print > 0.1):
logger.info("Optimization progress {:5.2f} %".format(progress *
100))
last_print = progress
# Run Kalman
self.Q = Q
self.process()
# Get time offset estimation errors
#
# NOTE: use only the last n_samples, given that this is the subset
# of samples that is targeted for analysis (after the acceptable
# transient).
x_err = np.array([
r["x_kf"] - r["x"] for r in self.data if "x_kf" in r
])[-n_samples:]
# Compute error based on different metrics. Note that the entire
# "x_err" time series are used to compute the final error.
if (error_metric == "max-te"):
error[i] = np.amax(np.abs(x_err))
elif (error_metric == "mse"):
error[i] = np.square(x_err).mean()
else:
raise ValueError("Error metric %s is invalid" % (error_metric))
# Keep track of the minimum error with "early stopping"
#
# Stop search if the Q matrix with minimum error remains the same
# for a number of consecutive matrices.
#
# NOTE: patience count tracks the number of iterations with no
# error reduction (or improvement).
# Update min{error}
if (error[i] < min_err):
min_err = error[i] # min error so far
Q_best = Q # best Q matrix so far
patience_count = 0
else:
patience_count += 1
if (early_stopping and patience_count > patience):
break
return Q_best, error
[docs] def process(self, save_aux=False):
"""Process the observations"""
logger.info("Processing Kalman Filter")
# Remove previous estimates in case they already exist
for r in self.data:
r.pop("x_kf", None)
r.pop("y_kf", None)
r.pop("kf", None)
logger.debug("s[-1] = {} \n"
"P[-1] = {} \n"
"H = {} \n"
"Q = {} \n"
"R = {} \n".format(self.s_0, self.P_0, self.H, self.Q,
self.R))
# Reset the initial state
self._reset_state()
# Iterate over observations
for i, z in enumerate(self.z):
# ----- Prediction -----
# Predict the next state of the system (a priori)
# A priori prediction of the state at the next time step
self.s_prior = np.dot(self.A, self.s_post)
# A priori state estimate's covariance matrix
self.P = np.dot(np.dot(self.A, self.P), self.A.T) + self.Q
# ----- Update -----
# Update the next state estimate based on a new measurement
# Residual error between the new measurement and the a priori
# prediction
self.y = z - np.dot(self.H, self.s_prior)
# Kalman gain
PHT = np.dot(self.P, self.H.T)
self.S = np.dot(self.H, PHT) + self.R
SI = np.linalg.pinv(self.S)
self.K = np.dot(PHT, SI)
# Update the state prediction (x) considering the new measurement:
self.s_post = self.s_prior + np.dot(self.K, self.y)
# Update the state prediction's covariance matrix
#
# The formulation usually seen in the literature is a "short form":
#
# P = (I - KH)P
#
# However, here we implement a "long form", also named as Joseph
# update equation, that as showed in [3] is more numerically stable
# and works for non-optimal K:
#
# P = (I-KH)P(I-KH)' + KRK'
#
I_KH = self.I - np.dot(self.K, self.H)
KRK = np.dot(np.dot(self.K, self.R), self.K.T)
self.P = np.dot(np.dot(I_KH, self.P), I_KH.T) + KRK
# Save time and frequency offset estimates on global data records
self.data[i]['x_kf'] = self.s_post[0]
self.data[i]['y_kf'] = self.s_post[1] * 1e-9
# Additional information useful for analysis
if (save_aux):
self.data[i]['kf'] = {}
self.data[i]['x_kf_prior'] = self.s_prior[0]
self.data[i]['y_kf_prior'] = self.s_prior[1] * 1e-9
self.data[i]['kf']['P'] = self.P
self.data[i]['kf']['K'] = self.K
self.data[i]['kf']['S'] = self.S
[docs] def optimize(self,
cache=None,
error_metric='mse',
early_stopping=True,
force=False,
skip=0.2):
"""Optimize the process noise covariance matrix
Tries the filtering with several state noise covariance matrices and
sets the internal matrix (Q) to the best evaluated value.
Args:
cache : Cache handler used to save the optimized
configuration in a json file
error_metric : Chosen error metric: 'max-te' or 'mse'
early_stopping : Whether to stop the search when min{erro} stalls
patience : Number of consecutive iterations without
improvement to wait before signaling an early stop
skip : Fraction of the dataset to skip during the error
assessment. Should be set to an acceptable
transient for analysis.
Returns:
"""
if (cache is not None):
assert (isinstance(cache, ptp.cache.Cache)), "Invalid cache object"
cached_cfg = cache.load('kf')
if (cached_cfg and not force):
self.Q = np.array(cached_cfg['Q'])
return
var_state_vec = np.array(
[1e-18, 1e-16, 1e-14, 1e-12, 1e-10, 1e-8, 1e-6, 1e-4, 1e-2, 1e-1])
# Number of samples to consider when evaluating the error of each KF
# configuration
n_samples = int((1 - skip) * len(self.data))
Q_best, _ = self._eval_error(var_state_vec,
error_metric=error_metric,
early_stopping=early_stopping,
n_samples=n_samples)
logger.info("Optimal process noise covariance mtx: {}".format(Q_best))
if (cache is not None):
cache.save({'Q': Q_best.tolist()}, identifier='kf')
self.Q = Q_best