Source code for ptp.reader

import logging

import numpy as np

import ptp.compression
from ptp.timestamping import Timestamp
from ptp.mechanisms import DelayReqResp

logger = logging.getLogger(__name__)


[docs]class Reader(): """Reader of data acquired through the testbed Reads a log file containing timestamps acquired via serial communication with the testbed. Post-processes the timestamps following the same sequence that is adopted in PTP simulation and similarly saves reults into a list of dictionaries containing the same keys as produced by the simulation. """ def __init__(self, ds_file=None, infer_secs=False, no_pps=False, reverse_ms=True): """Constructor Args: ds_file : JSON dataset file to read from infer_secs : Ignore acquired seconds and infer their values instead no_pps : Logs to be processed do not contain the reference timestamps acquired from the PPS RTC. reverse_ms : Reverse the master-to-slave direction for offset computations. This is used when the PDelay request-response that is being processed is originated at the slave, so that t1 and t4 are slave timestamps. """ self.running = True self.data = list() self.metadata = None self.ds_file = ds_file self.infer_secs = infer_secs self.no_pps = no_pps self.reverse_ms = reverse_ms if (infer_secs): logger.warning("Inferring seconds") # Prepare to infer seconds, if so desired self.last_t1 = 0 self.last_t2 = 0 self.last_t3 = 0 self.last_t4 = 0 self.last_t1_ns = 0 self.last_t2_ns = 0 self.last_t3_ns = 0 self.last_t4_ns = 0 self.t1_sec = 0 self.t2_sec = 0 self.t3_sec = 0 self.t4_sec = 0 self.last_t1_pps_ns = 0 self.last_t4_pps_ns = 0 self.t1_pps_sec = 0 self.t4_pps_sec = 0 self.idx = 0 # Progress self.last_progress_print = 0
[docs] def process(self, data, pr_level=logging.DEBUG): """Process a set of timestamps Apply the timestamps into a delay request-response mechanism object and use the latter to obtain PTP metrics. Args: data : dicitionary containing timestamp data pr_level : logging level to be used when logging timestamps Returns: results dictionary containing PTP sync metrics """ self.idx = idx = data["idx"] # Print header periodically if (self.idx % 20 == 0): DelayReqResp.log_header(level=pr_level) if (self.infer_secs): # Ns values t1_ns = data["t1"] t2_ns = data["t2"] t3_ns = data["t3"] t4_ns = data["t4"] # Has ns wrapped around? if (t1_ns < self.last_t1_ns): self.t1_sec += 1 if (t2_ns < self.last_t2_ns): self.t2_sec += 1 if (t3_ns < self.last_t3_ns): self.t3_sec += 1 if (t4_ns < self.last_t4_ns): self.t4_sec += 1 # Corresponding timestamp instances t1 = Timestamp(self.t1_sec, t1_ns) t2 = Timestamp(self.t2_sec, t2_ns) t3 = Timestamp(self.t3_sec, t3_ns) t4 = Timestamp(self.t4_sec, t4_ns) # Save ns values for next iteration (for wrapping detection) self.last_t1_ns = t1_ns self.last_t2_ns = t2_ns self.last_t3_ns = t3_ns self.last_t4_ns = t4_ns # PPS timestamps if (not self.no_pps): t1_pps_ns = data["t1_pps"] t4_pps_ns = data["t4_pps"] if (t1_pps_ns < self.last_t1_pps_ns): self.t1_pps_sec += 1 if (t4_pps_ns < self.last_t4_pps_ns): self.t4_pps_sec += 1 t1_pps = Timestamp(self.t1_pps_sec, t1_pps_ns) t4_pps = Timestamp(self.t4_pps_sec, t4_pps_ns) self.last_t1_pps_ns = t1_pps_ns self.last_t4_pps_ns = t4_pps_ns else: t1 = Timestamp(data["t1_sec"], data["t1"]) t2 = Timestamp(data["t2_sec"], data["t2"]) t3 = Timestamp(data["t3_sec"], data["t3"]) t4 = Timestamp(data["t4_sec"], data["t4"]) if (not self.no_pps): t1_pps = Timestamp(data["t1_pps_sec"], data["t1_pps"]) t4_pps = Timestamp(data["t4_pps_sec"], data["t4_pps"]) # Are all the timestamps progressing monotonically? assert (float(t1 - self.last_t1) > 0) assert (float(t2 - self.last_t2) > 0) assert (float(t3 - self.last_t3) > 0) assert (float(t4 - self.last_t4) > 0) self.last_t1 = t1 self.last_t2 = t2 self.last_t3 = t3 self.last_t4 = t4 # Add timestamps to delay req-resp if (self.reverse_ms): dreqresp = DelayReqResp(idx, t3) dreqresp.set_t2(idx, t4) dreqresp.set_t3(idx, t1) dreqresp.set_t4(idx, t2) else: dreqresp = DelayReqResp(idx, t1) dreqresp.set_t2(idx, t2) dreqresp.set_t3(idx, t3) dreqresp.set_t4(idx, t4) # Set ground truth based on PPS timestamps if (not self.no_pps): if (self.reverse_ms): forward_delay = float(t4_pps - t3) backward_delay = float(t2 - t1_pps) else: forward_delay = float(t2 - t1_pps) backward_delay = float(t4_pps - t3) dreqresp.set_forward_delay(idx, forward_delay) dreqresp.set_backward_delay(idx, backward_delay) dreqresp.set_true_toffset(t4_pps, t4) # Process results = dreqresp.process() dreqresp.log(results, level=pr_level) return results
[docs] def check_progress(self, i_iter, n_iter): """Check/print simulation progress Args: i_iter : Iteration index n_iter : Numger of iterations """ progress = i_iter / n_iter if (progress > self.last_progress_print + 0.1): logger.info("Reader progress: %6.2f %%" % (progress * 100)) self.last_progress_print = progress
[docs] def run(self, max_len=0): """Loads timestamps and post-processes to generate PTP data Load a list containing sets of timestamps (t1, t2, t3 and t4) from a JSON and save each set of PTP metrics into self.data just like the PTP simulation would. Args: max_len : Maximum number of entries to process """ # Load and decompress dataset codec = ptp.compression.Codec(filename=self.ds_file) ds = codec.decompress() # Check metadata for compatibility with old captures if ('metadata' in ds): self.metadata = ds['metadata'] data = ds['data'] else: data = ds # Debug print header DelayReqResp.log_header() # Restrict number of iterations, if so desired if (max_len > 0): n_data = max_len else: n_data = len(data) optional_metrics = [ "temp", "rru_occ", "rru2_occ", "bbu_occ", "pps_err", "pps_err2", "seq_id", "y_pps", "y_pps2" ] # Put info in dictionary and append to self.data for i in range(0, n_data): results = self.process(data[i]) # Append optional metrics to results if they are present for key in optional_metrics: if (key in data[i]): results[key] = data[i][key] self.data.append(results) self.check_progress(i, n_data)
[docs] def trim(self, interval): """Restrict dataset to given interval Args: interval : Desired dataset interval given as start:end in hours """ start = float(interval.split(":")[0]) end = float(interval.split(":")[1]) assert (end > start), "Interval must be positive" ns_per_hour = 1e9 * 60 * 60 t_start = self.data[0]["t1"] t_h = np.array([float(r["t1"] - t_start) for r in self.data]) / ns_per_hour i_s = np.where(t_h > start)[0][0] i_e = np.where(t_h <= end)[0][-1] self.data = self.data[i_s:i_e]