Source code for ptp.compression

"""Compress/decompress PTP datasets"""
import bz2
import gzip
import json
import logging
import lzma
import os
import pickle
import time


[docs]class Codec(): """Dataset compression coder/decoder (codec) Args: ds : (dictionary) dataset filename : (string) dataset file name compressed : (bool) whether the supplied dataset is compressed """ def __init__(self, ds={}, filename="", compressed=False): assert (isinstance(ds, dict)) assert (isinstance(filename, str)) # When the dataset is not provided to the constructor, it means we need # to load it from the file if (ds == {}): assert(filename != ""), \ "Please provide dataset file name when the dataset itself is \ not provided" ext = os.path.splitext(filename)[1] assert(ext in [".json", ".pickle", ".gz", ".pbz2", ".xz"]), \ "File extension {} not supported".format(ext) self.compressed = "-comp" in filename self._load(filename) else: self.ds = ds # mutable and a shallow copy self.compressed = compressed if (filename != ""): # When the filename is provided, but also the dataset (ds) is # provided, assume that the file doesn't exist. The name is just # there so that we can set "out_name" (output file name). # Otherwise, when the ds argument is empty, assume there is a file # from which we will load the data. self.orig_size = None if (ds != {}) else os.path.getsize(filename) no_ext_name = os.path.splitext(filename)[0] self.name = no_ext_name self.out_name = no_ext_name + "-comp" else: self.orig_size = self.name = self.out_name = None def _load(self, filename): """Load dataset from file""" logger = logging.getLogger("codec-load") tic = time.time() if (filename.endswith('.json')): with open(filename, 'r') as fd: self.ds = json.load(fd) elif (filename.endswith('.pickle')): with open(filename, 'rb') as fd: self.ds = pickle.load(fd) elif (filename.endswith('.gz')): with gzip.open(filename, "rb") as fd: self.ds = pickle.load(fd) elif (filename.endswith('.pbz2')): with bz2.BZ2File(filename, 'rb') as fd: self.ds = pickle.load(fd) elif (filename.endswith('.xz')): with lzma.open(filename, "rb") as fd: self.ds = pickle.load(fd) else: raise ValueError("Unsupported extension") toc = time.time() logger.debug("Deserialization took {:5.2f} secs".format(toc - tic))
[docs] def compress(self): """Reorganize dataset more efficiently for storing into files The data['data'] member of the dataset holds a list of dictionaries, each containing several metrics. This is is very inefficient for storage, since the keys (strings) are repeated on every dictionary. Some of the metrics in the dataset are present on all dictionaries. Hence, they can be stored in lists directly. Other metrics are not present in all dictionaries, in which case they should be stored in a pair of lists, one containing the actual time-series, the other containing the indexes where the elements are present in the dataset. Args: data : Dataset dictionary formatted as {'metadata': x, 'data': y}, i.e., as a dictionary containing the metadata and data keys. Returns: (dict) The compressed dataset """ logger = logging.getLogger("codec-compress") logger.info("Compress dataset") if (self.compressed): logger.warning("Dataset is already compressed") return self.ds self.ds['indexed'] = {} self.ds['idx'] = {} self.ds['non-indexed'] = {} # Find all possible dictionary keys (no need to loop over the entire # dataset, they should be present on the first entries) keys = set() for x in self.ds['data']: keys.update(x.keys()) # Find the keys that are always present candidate_keys = self.ds['data'][0].keys() always_present_keys = list() for key in candidate_keys: always_present = all([key in x for x in self.ds['data']]) if always_present: always_present_keys.append(key) # For keys that are always present, move to non-indexed lists and # remove from the original dictionaries in self.ds['data'] for key in sorted(always_present_keys): logger.debug("Non-indexed key {}".format(key)) ts = [x[key] for x in self.ds['data']] self.ds['non-indexed'][key] = ts for x in self.ds['data']: x.pop(key, None) # For keys that are not always present, move both the time-series and # the indexes into the indexed lists sporadic_keys = keys.difference(set(always_present_keys)) for key in sorted(sporadic_keys): logger.debug("Indexed key {}".format(key)) ts = [x[key] for x in self.ds['data'] if key in x] idx = [i for i, x in enumerate(self.ds['data']) if key in x] # Save time-series self.ds['indexed'][key] = ts # Save indexes # # First, check if another time-series has the same indexes. In the # positive case, save the index entry as a string (the key of the # other time-series with equal indexes). This avoids saving two # equal vectors of indexes unnecessarily. idx_vec_exists = False for idx_key in self.ds['idx']: if (idx == self.ds['idx'][idx_key]): self.ds['idx'][key] = idx_key idx_vec_exists = True break if (not idx_vec_exists): self.ds['idx'][key] = idx for x in self.ds['data']: x.pop(key, None) # We should have removed all elements from all dictionaries assert (not any([x for x in self.ds['data']])) self.ds.pop('data') # Maybe some index vectors were repeated and further savings were # achieved logger.debug("Unique index vectors:") for key in self.ds['idx']: if (isinstance(self.ds['idx'][key], list)): logger.debug("{}".format(key)) logger.debug("Repeated index vectors:") for key in self.ds['idx']: if (isinstance(self.ds['idx'][key], str)): logger.debug("{} -> {}".format(key, self.ds['idx'][key])) self.compressed = True return self.ds
[docs] def decompress(self): """Revert the compression Returns: (dict) The decompressed dataset """ logger = logging.getLogger("codec-decompress") logger.info("Decompress dataset") if (not self.compressed): logger.warning("Dataset is already decompressed") return self.ds assert ('data' not in self.ds) # Non-indexed time-series for key in self.ds['non-indexed']: logger.debug("Non-indexed key {}".format(key)) # Initialize the list of dictionaries if ('data' not in self.ds): ds_len = len(self.ds['non-indexed'][key]) self.ds['data'] = [{} for _ in range(ds_len)] # Add values to each dictionary for i, x in enumerate(self.ds['non-indexed'][key]): self.ds['data'][i][key] = x self.ds.pop('non-indexed') # Indexed time-series assert ('data' in self.ds) for key in self.ds['indexed']: logger.debug("Indexed key {}".format(key)) ts = self.ds['indexed'][key] if (isinstance(self.ds['idx'][key], str)): idx = self.ds['idx'][self.ds['idx'][key]] else: idx = self.ds['idx'][key] # Add values to each dictionary for i, x in zip(idx, ts): self.ds['data'][i][key] = x self.ds.pop('indexed') self.ds.pop('idx') assert (len(self.ds.keys()) == 2) assert ("metadata" in self.ds and "data" in self.ds) self.compressed = False return self.ds
[docs] def dump(self, ext="xz"): """Dump dataset to file Args: ext : Output file extension, which also determines the binary compression scheme to be adopted. Choose from "json", "pickle", "gz", "pbz2" or "xz" """ logger = logging.getLogger("codec-dump") assert (ext in ["json", "pickle", "gz", "pbz2", "xz"]) assert (self.compressed), "Dataset has not been compressed yet" assert (self.out_name is not None), "Original dataset name not provided" ext = ext.lower() outfile = "{}.{}".format(self.out_name, ext) logger.info("Dump compressed dataset into {}".format(outfile)) tic = time.time() if ext == "json": with open(outfile, 'w') as fd: json.dump(self.ds, fd) elif ext == "pickle": with open(outfile, 'wb') as fd: pickle.dump(self.ds, fd) elif ext == "gz": with gzip.open(outfile, "wb") as fd: pickle.dump(self.ds, fd) elif ext == "pbz2": with bz2.BZ2File(outfile, 'wb') as fd: pickle.dump(self.ds, fd) elif ext == "xz": with lzma.open(outfile, "wb") as fd: pickle.dump(self.ds, fd) else: raise ValueError("Unsupported extension {}".format(ext)) toc = time.time() duration = (toc - tic) new_size = os.path.getsize(outfile) new_size_mb = new_size / (2**20) if (self.orig_size is not None): ratio = self.orig_size / new_size logger.info("Compression: format: {:6s} - size: {:5.2f} MB - " "ratio - {:5.2f} - duration: {:5.2f} secs".format( ext, new_size_mb, ratio, duration)) else: logger.info("Compression: format: {:6s} - size: {:5.2f} MB - " "duration: {:5.2f} secs".format( ext, new_size_mb, duration))