Source code for

#!/usr/bin/env python
# -*- coding: utf-8 -*-
Data utils

Utility classes related to data handling.


Data sequencer class to process a data matrix into sequences (images). Sequences can overlap, and the sequencing
grid can be altered between calls.

.. autosummary::
    :toctree: generated/



Data processor class to process raw features into data suitable for machine learning algorithms. The feature
processing chain and the data processing chain are defined when the class is constructed, and these processing
chains are then applied to the input data.

.. autosummary::
    :toctree: generated/



Data buffering class, which can be used to store data and the meta data associated with each item. Item data is
accessed through the item key. When the internal buffer is full, the oldest item is replaced.

.. autosummary::
    :toctree: generated/



Data processing chain class, inherited from list.

.. autosummary::
    :toctree: generated/



from __future__ import print_function, absolute_import
from six import iteritems
import logging
import numpy
import copy
import collections
from .features import FeatureRepository

class DataBuffer(object):
    """Data buffer (FIFO)

    Buffer can store data and meta data associated to it. Items are addressed by key. Once ``size`` items are
    stored, inserting a new key discards the oldest item.

    """

    def __init__(self, *args, **kwargs):
        """__init__ method.

        Parameters
        ----------
        size : int
            Number of item to store in the buffer
            Default value 10

        """

        self.size = kwargs.get('size', 10)

        # Three parallel deques sharing the same maxlen: position i in each of them belongs to the same item,
        # and appending to a full deque drops its oldest entry, so all three evict in lockstep.
        self.index = collections.deque(maxlen=self.size)
        self.data_buffer = collections.deque(maxlen=self.size)
        self.meta_buffer = collections.deque(maxlen=self.size)

    def count(self):
        """Buffer usage

        Returns
        -------
        buffer length: int

        """

        return len(self.index)

    def full(self):
        """Buffer full

        Returns
        -------
        bool

        """

        return self.count() == self.size

    def key_exists(self, key):
        """Check that key exists in the buffer

        Parameters
        ----------
        key : str or number
            Key value

        Returns
        -------
        bool

        """

        return key in self.index

    def set(self, key, data=None, meta=None):
        """Insert item to the buffer

        An item is stored only when its key is not in the buffer yet; a key that is already present keeps its
        current data and meta.

        Parameters
        ----------
        key : str or number
            Key value
        data :
            Item data
        meta :
            Item meta

        Returns
        -------
        DataBuffer object

        """

        if not self.key_exists(key):
            self.index.append(key)
            self.data_buffer.append(data)
            self.meta_buffer.append(meta)

        return self

    def get(self, key):
        """Get item based on key

        Parameters
        ----------
        key : str or number
            Key value

        Returns
        -------
        data : (data, meta)
            ``(None, None)`` when the key is not in the buffer

        """

        # Single lookup: the position search itself tells us whether the key is stored.
        # list() is used because deque.index is not available on Python 2.
        try:
            position = list(self.index).index(key)
        except ValueError:
            return None, None

        return self.data_buffer[position], self.meta_buffer[position]

    def clear(self):
        """Empty the buffer
        """

        self.index.clear()
        self.data_buffer.clear()
        self.meta_buffer.clear()
class DataProcessingUnitMixin(object):
    """Data processing chain unit mixin"""

    def process(self, data):
        """Placeholder processing step, does nothing and returns None."""
        pass


class DataSequencer(DataProcessingUnitMixin):
    """Data sequencer

    Cuts a data matrix into fixed-length sequences along its first axis. Sequence start positions are placed
    every ``hop_size`` frames, and the whole grid can be shifted between calls with ``increase_shifting``.

    """

    __version__ = '0.0.1'

    def __init__(self, *args, **kwargs):
        """__init__ method.

        Parameters
        ----------
        frames : int
            Sequence length
            Default value 10
        hop : int
            Hop value of when forming the sequence
            Default value = frames
        padding: bool
            Replicate data when sequence is not full
            Default value False
        shift_step : int
            Sequence start temporal shifting amount, is added once method increase_shifting is called
            Default value 0
        shift_border : string, {'roll', 'shift'}
            Sequence border handling when doing temporal shifting.
            Default value 'roll'
        shift_max : int
            Maximum value for temporal shift
            Default value None
        logger : logging.Logger
            Logger to use
            Default value logging.getLogger(__name__)

        Raises
        ------
        IOError
            Unknown ``shift_border`` value

        """

        self.logger = kwargs.get('logger', logging.getLogger(__name__))
        self.frames = kwargs.get('frames', 10)

        self.hop_size = kwargs.get('hop')
        if self.hop_size is None:
            self.hop_size = self.frames

        self.padding = kwargs.get('padding', False)

        # Current shift always starts from zero, it is only changed through increase_shifting()
        self.shift = 0
        self.shift_step = kwargs.get('shift_step', 0)

        self.shift_border = kwargs.get('shift_border')
        if self.shift_border is None:
            self.shift_border = 'roll'

        if self.shift_border not in ['roll', 'shift']:
            message = '{name}: Unknown temporal shifting border handling [{border_mode}]'.format(
                name=self.__class__.__name__,
                border_mode=self.shift_border
            )

            # No exception is being handled here, so log with error() rather than exception()
            self.logger.error(message)
            raise IOError(message)

        self.shift_max = kwargs.get('shift_max')

    def __getstate__(self):
        # Return only needed data for pickle, logger is left out and recreated in __setstate__
        return {
            'frames': self.frames,
            'hop_size': self.hop_size,
            'padding': self.padding,
            'shift': self.shift,
            'shift_step': self.shift_step,
            'shift_border': self.shift_border,
            'shift_max': self.shift_max,
        }

    def __setstate__(self, d):
        self.frames = d['frames']
        self.hop_size = d['hop_size']
        self.padding = d['padding']
        self.shift = d['shift']
        self.shift_step = d['shift_step']
        self.shift_border = d['shift_border']
        self.shift_max = d['shift_max']
        self.logger = logging.getLogger(__name__)

    def process(self, data):
        """Process

        Parameters
        ----------
        data : numpy.ndarray
            Data, indexed as ``data[frame_ids, :]`` so at least two dimensions are needed

        Returns
        -------
        numpy.ndarray
            Sequences stacked along a new first axis

        Raises
        ------
        IOError
            No sequence could be formed from the data

        """

        # Make copy of the data to prevent modifications to the original data
        data = copy.deepcopy(data)

        # Not the most efficient way as numpy stride_tricks would produce
        # faster code, however, opted for cleaner presentation this time.
        data_length = data.shape[0]

        if self.shift_border == 'shift':
            # Move the sequencing grid itself, data stays in place
            segment_indexes = numpy.arange(self.shift, data_length, self.hop_size)

        elif self.shift_border == 'roll':
            # Keep the grid at zero and rotate the data under it instead
            segment_indexes = numpy.arange(0, data_length, self.hop_size)

            if self.shift:
                data = numpy.roll(
                    data,
                    shift=-self.shift,
                    axis=0
                )

        if self.padding:
            if len(segment_indexes) == 0:
                # Have at least one segment
                segment_indexes = numpy.array([0])
        else:
            # Remove segments which are not full
            segment_indexes = segment_indexes[(segment_indexes + self.frames - 1) < data_length]

        X = []
        for segment_start_frame in segment_indexes:
            frame_ids = numpy.arange(segment_start_frame, segment_start_frame + self.frames)

            if self.padding:
                # If start of matrix, pad with first frame
                frame_ids[frame_ids < 0] = 0

                # If end of the matrix, pad with last frame
                frame_ids[frame_ids > data_length - 1] = data_length - 1

            X.append(numpy.expand_dims(data[frame_ids, :], axis=0))

        if not X:
            message = (
                '{name}: Cannot create valid segment, adjust segment length and hop size, or use '
                'padding flag.'
            ).format(name=self.__class__.__name__)

            # No exception is being handled here, so log with error() rather than exception()
            self.logger.error(message)
            raise IOError(message)

        return numpy.concatenate(X)

    def increase_shifting(self, shift_step=None):
        """Increase temporal shifting

        The shift is reset to zero once it grows past ``shift_max``. No reset is done when ``shift_max`` is
        None or zero.

        Parameters
        ----------
        shift_step : int
            Optional value, if none given shift_step parameter given for init is used.
            Default value None

        """

        if shift_step is None:
            shift_step = self.shift_step

        self.shift += shift_step

        if self.shift_max and self.shift > self.shift_max:
            self.shift = 0
class DataProcessor(object):
    """Data processors with feature and data processing chains

    Feature processing chain comprehend all processing done to get feature matrix synchronized with meta data.
    Data processing chain is applied to the feature matrix and meta data to reshape data for machine learning.

    """

    __version__ = '0.0.1'

    def __init__(self, *args, **kwargs):
        """Constructor

        Parameters
        ----------
        feature_processing_chain : ProcessingChain
            List of processing functions
            Default value empty ProcessingChain
        data_processing_chain : ProcessingChain
            List of data processing functions
            Default value empty ProcessingChain
        logger : logging.Logger
            Logger to use
            Default value logging.getLogger(__name__)

        """

        # A missing chain and an explicit None both fall back to an empty chain, so the chain methods can
        # always be called later on.
        self.feature_processing_chain = kwargs.get('feature_processing_chain')
        if self.feature_processing_chain is None:
            self.feature_processing_chain = ProcessingChain()

        self.data_processing_chain = kwargs.get('data_processing_chain')
        if self.data_processing_chain is None:
            self.data_processing_chain = ProcessingChain()

        self.logger = kwargs.get('logger', logging.getLogger(__name__))

    def __getstate__(self):
        # Return only needed data for pickle, logger is left out and recreated in __setstate__
        return {
            'feature_processing_chain': self.feature_processing_chain,
            'data_processing_chain': self.data_processing_chain,
        }

    def __setstate__(self, d):
        self.feature_processing_chain = d['feature_processing_chain']
        self.data_processing_chain = d['data_processing_chain']
        self.logger = logging.getLogger(__name__)

    def load(self, feature_filename_dict, process_features=True, process_data=True):
        """Load feature item

        Parameters
        ----------
        feature_filename_dict : dict of filenames
            Dict with feature extraction methods as keys and value corresponding feature file
        process_features : bool
            Apply feature processing chain.
            Default value True
        process_data : bool
            Apply data processing chain.
            Default value True

        Returns
        -------
        Processed feature data, same return value as process()

        """

        # Load item
        feature_data = FeatureRepository(filename_dict=feature_filename_dict)

        # Process item
        return self.process(
            feature_data=feature_data,
            process_features=process_features,
            process_data=process_data
        )

    def process(self, feature_data, process_features=True, process_data=True):
        """Process feature data

        Parameters
        ----------
        feature_data : FeatureContainer
            Feature data. An object without ``feat`` attribute is used as the feature matrix itself.
        process_features : bool
            Apply feature processing chain.
            Default value True
        process_data : bool
            Apply data processing chain.
            Default value True

        Returns
        -------
        feature_data : ndarray
            Processed feature data
        feature_vector_count : int
            Number of feature vectors before data processing

        """

        # Go through the feature processing chain
        if process_features:
            feature_data = self.process_features(feature_data=feature_data)

        # Save feature matrix length before doing data processing chain
        feature_vector_count = feature_data.shape[0]

        # process_features() may already have unwrapped the container into a plain matrix, so only reach
        # for .feat when it is there.
        if hasattr(feature_data, 'feat'):
            feature_matrix = feature_data.feat[0]
        else:
            feature_matrix = feature_data

        if self.data_processing_chain and process_data:
            return self.process_data(data=feature_matrix), feature_vector_count

        return feature_matrix, feature_vector_count

    def process_features(self, feature_data):
        """Process feature data

        Parameters
        ----------
        feature_data : FeatureContainer
            Feature data.

        Returns
        -------
        feature_data : ndarray
            Processed feature data

        """

        if hasattr(feature_data, 'feat'):
            feature_data = feature_data.feat[0]

        # Do processing
        feature_data = self.feature_processing_chain.process(feature_data)

        return feature_data

    def process_activity_data(self, activity_data):
        """Process activity data

        Parameters
        ----------
        activity_data : ndarray
            Activity data, usually binary matrix

        Returns
        -------
        activity_data : ndarray
            Processed activity data

        """

        return self.process_data(data=activity_data, metadata=True)

    def process_data(self, data, metadata=False):
        """Process data

        Parameters
        ----------
        data : ndarray
            Data
        metadata : bool
            Processing metadata, extra dimension added non-metadata
            Default value False

        Returns
        -------
        data : ndarray
            Processed data

        """

        if hasattr(data, 'feat'):
            data = data.feat[0]

        # Go through the data processing chain
        if self.data_processing_chain:
            data = self.data_processing_chain.process(data)

            # NOTE(review): the extra axis is added only when a data processing chain was applied, which
            # agrees with process() returning the plain matrix for an empty chain. Confirm the nesting
            # against the original module.
            if not metadata:
                data = numpy.expand_dims(data, axis=1)

        return data

    def call_method(self, method_name, parameters=None):
        """Call class method in the processing chain items

        Processing chain (feature and data) is gone through and given method is called to processing items
        having such method.

        Parameters
        ----------
        method_name : str
            Method name to call
        parameters : dict
            Parameters for the method
            Default value None

        """

        self.feature_processing_chain.call_method(
            method_name=method_name,
            parameters=parameters
        )

        self.data_processing_chain.call_method(
            method_name=method_name,
            parameters=parameters
        )
class ProcessingChain(list):
    """Processing chain, a list of processing units applied one after another."""

    def process(self, data):
        """Process the data with processing chain

        Chain items which are None, or which do not have a ``process`` method, are skipped.

        Parameters
        ----------
        data : FeatureContainer or numpy.ndarray
            Data

        Returns
        -------
        data : numpy.ndarray
            Processed data

        """

        result = data
        for unit in self:
            if unit is None or not hasattr(unit, 'process'):
                continue

            # Output of one unit is the input of the next one
            result = unit.process(result)

        return result

    def call_method(self, method_name, parameters=None):
        """Call class method in the processing chain items

        Processing chain is gone through and given method is called to processing items having such method.

        Parameters
        ----------
        method_name : str
            Method name to call
        parameters : dict
            Parameters for the method, passed as keyword arguments
            Default value None, method is called without arguments

        """

        keyword_arguments = parameters if parameters else {}

        for unit in self:
            if not hasattr(unit, method_name):
                continue

            method = getattr(unit, method_name)
            method(**keyword_arguments)