#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Features
==================
Classes for feature handling

FeatureContainer
^^^^^^^^^^^^^^^^

Container class to store features along with statistics and meta data. The class is based on dict through
inheritance of the FeatureFile class.

Usage examples:

.. code-block:: python
    :linenos:

    # Example 1
    feature_container = FeatureContainer(filename='features.cpickle')
    feature_container.show()
    feature_container.log()
    print('Feature shape={shape}'.format(shape=feature_container.shape))
    print('Feature channels={channels}'.format(channels=feature_container.channels))
    print('Feature frames={frames}'.format(frames=feature_container.frames))
    print('Feature vector length={vector_length}'.format(vector_length=feature_container.vector_length))
    print(feature_container.feat)
    print(feature_container.stat)
    print(feature_container.meta)
    # Example 2
    feature_container = FeatureContainer().load(filename='features.cpickle')
    # Example 3
    feature_repository = FeatureContainer().load(filename_dict={'mel': 'mel_features.cpickle', 'mfcc': 'mfcc_features.cpickle'})
    # Example 4
    feature_container = FeatureContainer(features=[numpy.ones((100,10)),numpy.ones((100,10))])

.. autosummary::
    :toctree: generated/

    FeatureContainer
    FeatureContainer.show
    FeatureContainer.log
    FeatureContainer.get_path
    FeatureContainer.shape
    FeatureContainer.channels
    FeatureContainer.frames
    FeatureContainer.vector_length
    FeatureContainer.feat
    FeatureContainer.stat
    FeatureContainer.meta
    FeatureContainer.load


FeatureRepository
^^^^^^^^^^^^^^^^^

Feature repository class, where feature containers for each feature type are stored in a dict. The type name is
used as the key.
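
Usage example (a minimal sketch; the feature file paths are hypothetical):

.. code-block:: python
    :linenos:

    # Load a repository directly from per-method feature files
    feature_repository = FeatureRepository(
        filename_dict={
            'mel': 'mel_features.cpickle',
            'mfcc': 'mfcc_features.cpickle'
        }
    )
    # Feature containers are accessed with the feature type name as key
    feature_repository['mfcc'].show()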

.. autosummary::
    :toctree: generated/

    FeatureRepository
    FeatureRepository.show
    FeatureRepository.log
    FeatureRepository.get_path
    FeatureRepository.load

FeatureExtractor
^^^^^^^^^^^^^^^^

Feature extractor class.

Usage examples:

.. code-block:: python
    :linenos:

    # Example 1, to get features only, without storing them
    feature_repository = FeatureExtractor().extract(audio_file='debug/test.wav',
                                                    extractor_name='mfcc',
                                                    extractor_params={
                                                        'mfcc': {
                                                            'n_mfcc': 10
                                                        }
                                                    }
                                                    )
    feature_repository['mfcc'].show()

    # Example 2, to store features during the extraction
    feature_repository = FeatureExtractor(store=True).extract(
        audio_file='debug/test.wav',
        extractor_name='mfcc',
        extractor_params={
            'mfcc': {
                'n_mfcc': 10
            }
        },
        storage_paths={
            'mfcc': 'debug/test.mfcc.cpickle'
        }
    )

    # Example 3
    print(FeatureExtractor().get_default_parameters())


.. autosummary::
    :toctree: generated/

    FeatureExtractor
    FeatureExtractor.extract
    FeatureExtractor.get_default_parameters

FeatureNormalizer
^^^^^^^^^^^^^^^^^

Feature normalizer class.

Usage examples:

.. code-block:: python
    :linenos:

    # Example 1
    normalizer = FeatureNormalizer()
    for feature_matrix in training_items:
        normalizer.accumulate(feature_matrix)
    normalizer.finalize()

    for feature_matrix in test_items:
        feature_matrix_normalized = normalizer.normalize(feature_matrix)
        # use the features

    # Example 2
    with FeatureNormalizer() as norm:
        norm.accumulate(feature_repository['mfcc'])

    for feature_matrix in test_items:
        feature_matrix_normalized = norm.normalize(feature_matrix)
        # use the features
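
Internally the normalizer accumulates, per channel, the frame count ``N`` and the running sums
``S1`` and ``S2`` of the feature values and their squares; finalization turns these into the mean
``S1/N`` and an unbiased standard deviation estimate. A minimal illustrative sketch with plain
numpy (placeholder data, not part of the API):

.. code-block:: python
    :linenos:

    import numpy

    feature_matrix = numpy.random.rand(1000, 20)  # placeholder data, [frames, features]

    N = feature_matrix.shape[0]                   # accumulated frame count
    S1 = numpy.sum(feature_matrix, axis=0)        # accumulated sum
    S2 = numpy.sum(feature_matrix ** 2, axis=0)   # accumulated sum of squares

    mean = S1 / N
    std = numpy.sqrt((N * S2 - S1 ** 2) / (N * (N - 1)))  # unbiased estimate

    feature_matrix_normalized = (feature_matrix - mean) / std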

.. autosummary::
    :toctree: generated/

    FeatureNormalizer
    FeatureNormalizer.accumulate
    FeatureNormalizer.finalize
    FeatureNormalizer.normalize
    FeatureNormalizer.process

FeatureStacker
^^^^^^^^^^^^^^

Feature stacking class. The class takes a feature vector recipe and a FeatureRepository, and creates the appropriate feature matrix.


**Feature vector recipe**

With a recipe one can select the full matrix, only a part of it with start and end indices, or individual rows from it.

Example recipe:

.. code-block:: python
    :linenos:

    [
     {
        'method': 'mfcc',
     },
     {
        'method': 'mfcc_delta',
        'vector-index': {
            'channel': 0,
            'start': 1,
            'end': 17,
            'full': False,
            'selection': False,
        }
      },
     {
        'method': 'mfcc_acceleration',
        'vector-index': {
            'channel': 0,
            'full': False,
            'selection': True,
            'vector': [2, 4, 6]
        }
     }
    ]

See :py:meth:`dcase_framework.ParameterContainer._parse_recipe` for how a text recipe can be conveniently used to
generate the above structure.
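
For instance, a recipe selecting the full matrices can be handed directly to the stacker; a minimal
sketch (assuming the listed feature types are already present in the repository):

.. code-block:: python
    :linenos:

    feature_stacker = FeatureStacker(recipe=[{'method': 'mfcc'},
                                             {'method': 'mfcc_delta'},
                                             {'method': 'mfcc_acceleration'}])
    feature_container = feature_stacker.feature_vector(feature_repository=feature_repository)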

.. autosummary::
    :toctree: generated/

    FeatureStacker
    FeatureStacker.normalizer
    FeatureStacker.feature_vector
    FeatureStacker.process


FeatureAggregator
^^^^^^^^^^^^^^^^^

Feature aggregator can be used to process a feature matrix in processing windows.
This processing stage can be used to collapse features within a certain window length by
calculating their mean and std, or to flatten the matrix into a single feature vector.

Supported processing methods:

- ``flatten``
- ``mean``
- ``std``
- ``cov``
- ``kurtosis``
- ``skew``

The processing methods can be combined.

Usage examples:

.. code-block:: python
    :linenos:

    feature_aggregator = FeatureAggregator(
        recipe=['mean', 'std'],
        win_length_frames=10,
        hop_length_frames=1,
    )

    feature_stacker = FeatureStacker(recipe=[{'method': 'mfcc'}])
    feature_repository = FeatureContainer().load(filename_dict={'mfcc': 'mfcc.cpickle'})
    feature_matrix = feature_stacker.feature_vector(feature_repository=feature_repository)
    feature_matrix = feature_aggregator.process(feature_container=feature_matrix)
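
With the recipe ``['mean', 'std']`` above, each aggregated frame concatenates the window mean and
standard deviation, so the resulting feature vector is twice as long as the stacked input vector.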

.. autosummary::
    :toctree: generated/

    FeatureAggregator
    FeatureAggregator.process

FeatureMasker
^^^^^^^^^^^^^

Feature masker can be used to mask segments of the feature matrix out. For example, erroneous segments of the signal
can be excluded from the matrix.

Usage examples:

.. code-block:: python
    :linenos:

    feature_masker = FeatureMasker(hop_length_seconds=0.01)
    mask_events = MetaDataContainer([
        {
            'event_onset': 1.0,
            'event_offset': 1.5,
        },
        {
            'event_onset': 2.0,
            'event_offset': 2.5,
        },
    ])

    masked_features = feature_masker.process(feature_repository=feature_repository, mask_events=mask_events)

.. autosummary::
    :toctree: generated/

    FeatureMasker
    FeatureMasker.process

"""

from __future__ import print_function, absolute_import
from six import iteritems
import os
import logging
import numpy
import librosa
import scipy.signal
import scipy.stats
import collections
import copy
from time import gmtime, strftime
from .files import FeatureFile, AudioFile, DataFile, RepositoryFile
from .containers import ContainerMixin, DottedDict
from .parameters import ParameterContainer
from .utils import filelist_exists
from .metadata import MetaDataContainer


class FeatureContainer(FeatureFile, ContainerMixin):
    """Feature container inherited from dict

    Container has the following internal structure:

    - feat, list of feature matrices, [channel][frames, feature_vector]
    - stat, list of feature statistics
    - meta, dict with feature meta data

    """
    __version__ = '0.0.1'

    def __init__(self, *args, **kwargs):
        """Constructor

        Parameters
        ----------
        filename: str, optional
            If filename is given, container is loaded in the initialization stage.
            Default value "None"
        features: list, optional

        """

        if kwargs.get('filename', None):
            super(FeatureContainer, self).__init__({
                'feat': [],
                'stat': None,
                'meta': {},
            })
            self.load(filename=kwargs.get('filename'))
        else:
            if kwargs.get('features', []):
                super(FeatureContainer, self).__init__({
                    'feat': kwargs.get('features', []),
                    'stat': None,
                    'meta': kwargs.get('meta', {}),
                })
            else:
                super(FeatureContainer, self).__init__(*args, **kwargs)

    @property
    def shape(self):
        """Shape of feature matrix

        Returns
        -------
        tuple

        """

        if 'feat' in self:
            return self.feat[0].shape
        else:
            return None

    @property
    def channels(self):
        """Number of feature channels

        Returns
        -------
        int

        """

        if 'feat' in self:
            return len(self.feat)
        else:
            return None

    @property
    def frames(self):
        """Number of feature frames

        Returns
        -------
        int

        """

        if 'feat' in self:
            return self.feat[0].shape[0]
        else:
            return None

    @property
    def vector_length(self):
        """Feature vector length

        Returns
        -------
        int

        """

        if 'feat' in self:
            return self.feat[0].shape[1]
        else:
            return None

    @property
    def feat(self):
        """Feature data

        Returns
        -------
        list of numpy.ndarray

        """

        if 'feat' in self:
            return self['feat']
        else:
            return None

    @feat.setter
    def feat(self, value):
        self['feat'] = value

    @property
    def stat(self):
        """Statistics of feature data

        Returns
        -------
        list of dicts

        """

        if self.feat:
            if 'stat' not in self or not self['stat']:
                stat_container = []
                for channel_data in self.feat:
                    stat_container.append({
                        'mean': numpy.mean(channel_data, axis=0),
                        'std': numpy.std(channel_data, axis=0),
                        'N': channel_data.shape[0],
                        'S1': numpy.sum(channel_data, axis=0),
                        'S2': numpy.sum(channel_data ** 2, axis=0),
                    })
                self['stat'] = stat_container
            return self['stat']
        else:
            return None

    @property
    def meta(self):
        """Meta data

        Returns
        -------
        dict

        """

        if 'meta' in self:
            return self['meta']
        else:
            return None

    @meta.setter
    def meta(self, value):
        self['meta'] = value

    def load(self, filename=None, filename_dict=None):
        """Load data into container

        If filename is given, container is loaded from disk.
        If filename_dict is given, a FeatureRepository is created and returned.

        Parameters
        ----------
        filename : str, optional
        filename_dict : dict, optional

        Returns
        -------
        FeatureContainer or FeatureRepository

        """

        if filename:
            return super(FeatureContainer, self).load(filename=filename)

        if filename_dict:
            repository = FeatureRepository({})
            for method, filename in iteritems(filename_dict):
                repository[method] = FeatureContainer().load(filename=filename)
            return repository
class FeatureRepository(RepositoryFile, ContainerMixin):
    """Feature repository

    Feature containers for each type of features are stored in a dict. Type name is used as key.

    """
    __version__ = '0.0.1'

    def __init__(self, *args, **kwargs):
        """Constructor

        Parameters
        ----------
        filename_dict: dict
            Dict of file paths, feature extraction method label as key, and filename as value.
            If given, features are loaded in the initialization stage.
            Default value "None"

        """

        super(FeatureRepository, self).__init__(*args, **kwargs)
        self.logger = kwargs.get('logger', logging.getLogger(__name__))

        if kwargs.get('filename_dict', None):
            self.filename_dict = kwargs.get('filename_dict', None)
            self.load()

    def load(self, filename_dict=None):
        """Load file list

        Parameters
        ----------
        filename_dict : dict
            Dict of file paths, feature extraction method label as key, and filename as value.

        Returns
        -------
        self

        """

        if filename_dict:
            self.filename_dict = filename_dict

        if self.filename_dict and filelist_exists(self.filename_dict):
            dict.clear(self)
            # Load methods in sorted order for deterministic repository content
            for method, filename in sorted(iteritems(self.filename_dict)):
                if not method.startswith('_'):
                    # Skip methods starting with '_', those are just for extra info
                    self[method] = FeatureContainer().load(filename=filename)
            return self
        else:
            message = '{name}: Feature repository cannot be loaded [{filename_dict}]'.format(
                name=self.__class__.__name__,
                filename_dict=self.filename_dict
            )
            self.logger.exception(message)
            raise IOError(message)
class FeatureExtractor(object):
    """Feature extractor"""
    __version__ = '0.0.1'

    def __init__(self, *args, **kwargs):
        """Constructor

        Parameters
        ----------
        store : bool
            Store features to disk
            Default value "False"
        overwrite : bool
            If set True, features are overwritten on disk
            Default value "False"

        """

        self.eps = numpy.spacing(1)
        self.overwrite = kwargs.get('overwrite', False)
        self.store = kwargs.get('store', False)
        self.logger = kwargs.get('logger', logging.getLogger(__name__))

        self.valid_extractors = [
            'mfcc',
            'mfcc_delta',
            'mfcc_acceleration',
            'mel',
        ]
        self.valid_extractors += kwargs.get('valid_extractors', [])

        self.default_general_parameters = {
            'fs': 44100,
            'win_length_samples': int(0.04 * 44100),
            'hop_length_samples': int(0.02 * 44100),
        }
        self.default_general_parameters.update(kwargs.get('default_general_parameters', {}))

        self.default_parameters = {
            'mfcc': {
                'mono': True,                       # [True, False]
                'window': 'hamming_asymmetric',     # [hann_asymmetric, hamming_asymmetric]
                'spectrogram_type': 'magnitude',    # [magnitude, power]
                'n_mfcc': 20,                       # Number of MFCC coefficients
                'n_mels': 40,                       # Number of MEL bands used
                'n_fft': 2048,                      # FFT length
                'fmin': 0,                          # Minimum frequency when constructing MEL bands
                'fmax': 22050,                      # Maximum frequency when constructing MEL bands
                'htk': False,                       # Switch for HTK-styled MEL-frequency equation
            },
            'mfcc_delta': {
                'width': 9,
                'dependency_method': 'mfcc',
            },
            'mfcc_acceleration': {
                'width': 9,
                'dependency_method': 'mfcc',
            },
            'mel': {
                'mono': True,                       # [True, False]
                'window': 'hamming_asymmetric',     # [hann_asymmetric, hamming_asymmetric]
                'spectrogram_type': 'magnitude',    # [magnitude, power]
                'n_mels': 40,                       # Number of MEL bands used
                'normalize_mel_bands': False,       # [True, False]
                'n_fft': 2048,                      # FFT length
                'fmin': 0,                          # Minimum frequency when constructing MEL bands
                'fmax': 22050,                      # Maximum frequency when constructing MEL bands
                'htk': True,                        # Switch for HTK-styled MEL-frequency equation
                'log': True,                        # Logarithmic
            },
        }
        self.default_parameters.update(kwargs.get('default_parameters', {}))

        # Update general parameters and expand dependencies
        for method, data in iteritems(self.default_parameters):
            data.update(self.default_general_parameters)
            if ('dependency_method' in data and
               data['dependency_method'] in self.valid_extractors and
               data['dependency_method'] in self.default_parameters):
                data['dependency_parameters'] = self.default_parameters[data['dependency_method']]

    def __getstate__(self):
        # Return only needed data for pickle
        return {
            'eps': self.eps,
            'overwrite': self.overwrite,
            'store': self.store,
            'valid_extractors': self.valid_extractors,
            'default_general_parameters': self.default_general_parameters,
            'default_parameters': self.default_parameters,
        }

    def __setstate__(self, d):
        self.eps = d['eps']
        self.overwrite = d['overwrite']
        self.store = d['store']
        self.valid_extractors = d['valid_extractors']
        self.default_general_parameters = d['default_general_parameters']
        self.default_parameters = d['default_parameters']
        self.logger = logging.getLogger(__name__)

    def extract(self, audio_file, extractor_params=None, storage_paths=None, extractor_name=None):
        """Extract features for audio file

        Parameters
        ----------
        audio_file : str
            Filename of audio file.
        extractor_params : dict of dicts
            Keys at first level correspond to feature extraction methods, and second level is
            parameters given to the extractor method. If none given, default parameters are used.
        storage_paths : dict of strings
            Keys at first level correspond to feature extraction methods, second level is
            path to store feature containers.
        extractor_name : str
            Feature extractor method name, if none given, extractor_params is used.
            Use this to select specific extractor method.
            Default value "None"

        Raises
        ------
        ValueError:
            Unknown extractor method

        Returns
        -------
        FeatureRepository
            Repository, a dict of FeatureContainers

        """

        if extractor_params is None:
            extractor_params = {}
        if storage_paths is None:
            storage_paths = {}

        # Get extractor list
        if extractor_name is None:
            extractor_list = list(extractor_params.keys())
        else:
            extractor_list = [extractor_name]
            if extractor_name in extractor_params:
                extractor_params = {
                    extractor_name: extractor_params[extractor_name]
                }

        # Update (recursively) internal default parameters with given parameters
        extractor_params = self._update(self.default_parameters, extractor_params)

        # Update general parameters and expand dependencies
        for method, data in iteritems(extractor_params):
            if ('dependency_method' in data and
               data['dependency_method'] in self.valid_extractors and
               data['dependency_method'] in extractor_params):
                data['dependency_parameters'] = extractor_params[data['dependency_method']]

        feature_repository = FeatureRepository({})
        for extractor_name in extractor_list:
            if extractor_name not in self.valid_extractors:
                message = '{name}: Invalid extractor method [{method}]'.format(
                    name=self.__class__.__name__,
                    method=extractor_name
                )
                self.logger.exception(message)
                raise ValueError(message)

            current_extractor_params = extractor_params[extractor_name]
            extract = True

            # Check do we need to extract anything
            if not self.overwrite and extractor_name in storage_paths and os.path.isfile(storage_paths[extractor_name]):
                # Load from disk
                feature_repository[extractor_name] = FeatureContainer(filename=storage_paths[extractor_name])

                # Check the parameters
                hash1 = ParameterContainer().get_hash(current_extractor_params)
                hash2 = ParameterContainer().get_hash(feature_repository[extractor_name]['meta']['parameters'])
                if hash1 == hash2:
                    # The loaded data contains features with same parameters, no need to extract them anymore
                    extract = False

            # Feature extraction stage
            if extract:
                # Load audio
                y, fs = self._load_audio(audio_file=audio_file, params=current_extractor_params)

                # Check for dependency to other features
                if 'dependency_method' in current_extractor_params and current_extractor_params['dependency_method']:
                    # Current extractor is depending on other extractor
                    if current_extractor_params['dependency_method'] not in self.valid_extractors:
                        message = '{name}: Invalid dependency extractor method [{method1}] for method [{method2}]'.format(
                            name=self.__class__.__name__,
                            method1=current_extractor_params['dependency_method'],
                            method2=extractor_name
                        )
                        self.logger.exception(message)
                        raise ValueError(message)

                    if (current_extractor_params['dependency_method'] in storage_paths and
                       os.path.isfile(storage_paths[current_extractor_params['dependency_method']])):
                        # Load features from disk
                        data = FeatureContainer(
                            filename=storage_paths[current_extractor_params['dependency_method']]
                        ).feat
                    else:
                        # Extract features
                        dependency_func = getattr(self, '_{}'.format(current_extractor_params['dependency_method']), None)
                        if dependency_func is not None:
                            data = dependency_func(data=y, params=current_extractor_params['dependency_parameters'])
                        else:
                            message = '{name}: No extraction method for dependency extractor [{method}]'.format(
                                name=self.__class__.__name__,
                                method=current_extractor_params['dependency_method']
                            )
                            self.logger.exception(message)
                            raise ValueError(message)
                else:
                    # Bypass
                    data = y

                # Extract features
                extractor_func = getattr(self, '_{}'.format(extractor_name), None)
                if extractor_func is not None:
                    data = extractor_func(data=data, params=current_extractor_params)

                    # Feature extraction meta information
                    meta = {
                        'parameters': current_extractor_params,
                        'datetime': strftime("%Y-%m-%d %H:%M:%S", gmtime()),
                        'audio_file': audio_file,
                        'extractor_version': self.__version__,
                    }

                    # Create feature container
                    feature_container = FeatureContainer(features=data, meta=meta)
                    if self.store and extractor_name in storage_paths:
                        feature_container.save(filename=storage_paths[extractor_name])
                    feature_repository[extractor_name] = feature_container
                else:
                    message = '{name}: No extraction method for extractor [{method}]'.format(
                        name=self.__class__.__name__,
                        method=extractor_name
                    )
                    self.logger.exception(message)
                    raise ValueError(message)

        return FeatureRepository(feature_repository)

    def get_default_parameters(self):
        """Get default parameters as dict

        Returns
        -------
        DottedDict

        """

        return DottedDict(self.default_parameters)

    def _mel(self, data, params):
        """Mel-band energies

        Parameters
        ----------
        data : numpy.ndarray
            Audio data.
        params : dict
            Parameters.

        Returns
        -------
        list of numpy.ndarray
            List of feature matrices, feature matrix per audio channel.

        """

        window = self._window_function(
            N=params.get('win_length_samples'),
            window_type=params.get('window')
        )
        mel_basis = librosa.filters.mel(
            sr=params.get('fs'),
            n_fft=params.get('n_fft'),
            n_mels=params.get('n_mels'),
            fmin=params.get('fmin'),
            fmax=params.get('fmax'),
            htk=params.get('htk')
        )

        if params.get('normalize_mel_bands'):
            mel_basis /= numpy.max(mel_basis, axis=-1)[:, None]

        feature_matrix = []
        for channel in range(0, data.shape[0]):
            spectrogram_ = self._spectrogram(
                y=data[channel, :],
                n_fft=params.get('n_fft'),
                win_length_samples=params.get('win_length_samples'),
                hop_length_samples=params.get('hop_length_samples'),
                spectrogram_type=params.get('spectrogram_type') if 'spectrogram_type' in params else 'magnitude',
                center=True,
                window=window
            )

            mel_spectrum = numpy.dot(mel_basis, spectrogram_)
            if params.get('log'):
                mel_spectrum = numpy.log(mel_spectrum + self.eps)

            mel_spectrum = mel_spectrum.T
            feature_matrix.append(mel_spectrum)

        return feature_matrix

    def _mfcc(self, data, params):
        """Static MFCC

        Parameters
        ----------
        data : numpy.ndarray
            Audio data
        params : dict
            Parameters

        Returns
        -------
        list of numpy.ndarray
            List of feature matrices, feature matrix per audio channel

        """

        window = self._window_function(
            N=params.get('win_length_samples'),
            window_type=params.get('window')
        )
        mel_basis = librosa.filters.mel(
            sr=params.get('fs'),
            n_fft=params.get('n_fft'),
            n_mels=params.get('n_mels'),
            fmin=params.get('fmin'),
            fmax=params.get('fmax'),
            htk=params.get('htk')
        )

        if params.get('normalize_mel_bands'):
            mel_basis /= numpy.max(mel_basis, axis=-1)[:, None]

        feature_matrix = []
        for channel in range(0, data.shape[0]):
            # Calculate static coefficients
            spectrogram_ = self._spectrogram(
                y=data[channel, :],
                n_fft=params.get('n_fft'),
                win_length_samples=params.get('win_length_samples'),
                hop_length_samples=params.get('hop_length_samples'),
                spectrogram_type=params.get('spectrogram_type') if 'spectrogram_type' in params else 'magnitude',
                center=True,
                window=window
            )

            mel_spectrum = numpy.dot(mel_basis, spectrogram_)
            mfcc = librosa.feature.mfcc(S=librosa.logamplitude(mel_spectrum),
                                        n_mfcc=params.get('n_mfcc'))
            feature_matrix.append(mfcc.T)

        return feature_matrix

    def _mfcc_delta(self, data, params):
        """Delta MFCC

        Parameters
        ----------
        data : numpy.ndarray
            Audio data
        params : dict
            Parameters

        Returns
        -------
        list of numpy.ndarray
            List of feature matrices, feature matrix per audio channel

        """

        feature_matrix = []
        for channel in range(0, len(data)):
            # Delta coefficients
            delta = librosa.feature.delta(
                data[channel].T,
                width=params.get('width')
            )
            feature_matrix.append(delta.T)

        return feature_matrix

    def _mfcc_acceleration(self, data, params):
        """Acceleration MFCC

        Parameters
        ----------
        data : numpy.ndarray
            Audio data
        params : dict
            Parameters

        Returns
        -------
        list of numpy.ndarray
            List of feature matrices, feature matrix per audio channel

        """

        feature_matrix = []
        for channel in range(0, len(data)):
            # Acceleration coefficients (aka delta-delta)
            acceleration = librosa.feature.delta(
                data[channel].T,
                order=2,
                width=params.get('width')
            )
            feature_matrix.append(acceleration.T)

        return feature_matrix

    def _load_audio(self, audio_file, params):
        """Load audio using AudioFile class

        Parameters
        ----------
        audio_file : str
        params : dict

        Returns
        -------
        numpy.ndarray
            Audio data
        fs : int
            Sampling frequency

        """

        # Collect parameters
        mono = False
        if 'mono' in params:
            mono = params.get('mono')
        elif 'dependency_parameters' in params and 'mono' in params['dependency_parameters']:
            mono = params['dependency_parameters']['mono']

        fs = None
        if 'fs' in params:
            fs = params.get('fs')
        elif 'dependency_parameters' in params and 'fs' in params['dependency_parameters']:
            fs = params['dependency_parameters']['fs']

        normalize_audio = False
        if 'normalize_audio' in params:
            normalize_audio = params.get('normalize_audio')
        elif 'dependency_parameters' in params and 'normalize_audio' in params['dependency_parameters']:
            normalize_audio = params['dependency_parameters']['normalize_audio']

        # Load audio with correct parameters
        y, fs = AudioFile().load(filename=audio_file, mono=mono, fs=fs)
        if mono:
            # Make sure mono audio has correct shape
            y = numpy.reshape(y, [1, -1])

        # Normalize audio
        if normalize_audio:
            for channel in range(0, y.shape[0]):
                y[channel] = self._normalize_audio(y[channel])

        return y, fs

    @staticmethod
    def _normalize_audio(y, head_room=0.005):
        """Normalize audio

        Parameters
        ----------
        y : numpy.ndarray
            Audio data
        head_room : float
            Head room

        Returns
        -------
        numpy.ndarray
            Audio data

        """

        mean_value = numpy.mean(y)
        y -= mean_value

        max_value = max(abs(y)) + head_room
        return y / max_value

    def _window_function(self, N, window_type='hamming_asymmetric'):
        """Window function

        Parameters
        ----------
        N : int
            Window length
        window_type : str
            Window type (Default value='hamming_asymmetric')

        Raises
        ------
        ValueError:
            Unknown window type

        Returns
        -------
        window function : array

        """

        # Windowing function
        if window_type == 'hamming_asymmetric':
            return scipy.signal.hamming(N, sym=False)
        elif window_type == 'hamming_symmetric':
            return scipy.signal.hamming(N, sym=True)
        elif window_type == 'hann_asymmetric':
            return scipy.signal.hann(N, sym=False)
        elif window_type == 'hann_symmetric':
            return scipy.signal.hann(N, sym=True)
        else:
            message = '{name}: Unknown window type [{window_type}]'.format(
                name=self.__class__.__name__,
                window_type=window_type
            )
            self.logger.exception(message)
            raise ValueError(message)

    def _spectrogram(self, y, n_fft=1024, win_length_samples=0.04, hop_length_samples=0.02,
                     window=scipy.signal.hamming(1024, sym=False), center=True, spectrogram_type='magnitude'):
        """Spectrogram

        Parameters
        ----------
        y : numpy.ndarray
            Audio data
        n_fft : int
            FFT size
            Default value "1024"
        win_length_samples : int
            Window length in samples
            Default value "0.04"
        hop_length_samples : int
            Hop length in samples
            Default value "0.02"
        window : array
            Window function
            Default value "scipy.signal.hamming(1024, sym=False)"
        center : bool
            If true, input signal is padded so that frames are centered at hop length
            Default value "True"
        spectrogram_type : str
            Type of spectrogram "magnitude" or "power"
            Default value "magnitude"

        Returns
        -------
        np.ndarray [shape=(1 + n_fft/2, t), dtype=dtype]
            STFT matrix

        """

        if spectrogram_type == 'magnitude':
            return numpy.abs(librosa.stft(y + self.eps,
                                          n_fft=n_fft,
                                          win_length=win_length_samples,
                                          hop_length=hop_length_samples,
                                          center=center,
                                          window=window))
        elif spectrogram_type == 'power':
            return numpy.abs(librosa.stft(y + self.eps,
                                          n_fft=n_fft,
                                          win_length=win_length_samples,
                                          hop_length=hop_length_samples,
                                          center=center,
                                          window=window)) ** 2
        else:
            message = '{name}: Unknown spectrum type [{spectrogram_type}]'.format(
                name=self.__class__.__name__,
                spectrogram_type=spectrogram_type
            )
            self.logger.exception(message)
            raise ValueError(message)

    def _update(self, d, u):
        """Recursive dict update"""
        for k, v in iteritems(u):
            if isinstance(v, collections.Mapping):
                r = self._update(d.get(k, {}), v)
                d[k] = r
            else:
                d[k] = u[k]
        return d
class FeatureProcessingUnitMixin(object):
    """Feature processing chain unit mixin"""

    def process(self, feature_data):
        pass
class FeatureStacker(FeatureProcessingUnitMixin):
    """Feature stacker"""
    __version__ = '0.0.1'

    def __init__(self, recipe, feature_hop=1, **kwargs):
        """Constructor

        Parameters
        ----------
        recipe : dict
            Stacking recipe
        feature_hop : int, optional
            Feature hopping
            Default value 1

        """

        self.recipe = recipe
        self.feature_hop = feature_hop
        self.logger = kwargs.get('logger', logging.getLogger(__name__))

    def __getstate__(self):
        """Return only needed data for pickle"""
        return {
            'recipe': self.recipe,
            'feature_hop': self.feature_hop,
        }

    def __setstate__(self, d):
        self.recipe = d['recipe']
        self.feature_hop = d['feature_hop']
        self.logger = logging.getLogger(__name__)

    def normalizer(self, normalizer_list):
        """Stack normalization factors based on stack map

        Parameters
        ----------
        normalizer_list : dict
            Dict of Normalizer classes

        Returns
        -------
        dict
            Stacked normalizer variables in a dict

        """

        # Check that all normalizers have seen the same number of frames
        frame_count = -1
        for feature in self.recipe:
            method = feature['method']
            if frame_count == -1:
                frame_count = normalizer_list[method]['N']
            elif frame_count != normalizer_list[method]['N']:
                message = '{name}: Normalizers should have seen same number of frames {count1} != {count2} [{method}]'.format(
                    name=self.__class__.__name__,
                    count1=frame_count,
                    count2=normalizer_list[method]['N'],
                    method=method
                )
                self.logger.exception(message)
                raise AssertionError(message)

        stacked_mean = []
        stacked_std = []
        for feature in self.recipe:
            method = feature['method']

            # Default values
            channel = 0
            if 'vector-index' in feature:
                channel = feature['vector-index']['channel']

            if ('vector-index' not in feature or
               ('vector-index' in feature and 'full' in feature['vector-index'] and feature['vector-index']['full'])):
                # We have full matrix
                stacked_mean.append(normalizer_list[method]['mean'][channel])
                stacked_std.append(normalizer_list[method]['std'][channel])
            elif ('vector-index' in feature and
                  'vector' in feature['vector-index'] and
                  'selection' in feature['vector-index'] and feature['vector-index']['selection']):
                # We have selector vector
                stacked_mean.append(normalizer_list[method]['mean'][channel][:, feature['vector-index']['vector']])
                stacked_std.append(normalizer_list[method]['std'][channel][:, feature['vector-index']['vector']])
            elif ('vector-index' in feature and
                  'start' in feature['vector-index'] and
                  'end' in feature['vector-index']):
                # We have start and end index
                stacked_mean.append(normalizer_list[method]['mean'][channel][:, feature['vector-index']['start']:feature['vector-index']['end']])
                stacked_std.append(normalizer_list[method]['std'][channel][:, feature['vector-index']['start']:feature['vector-index']['end']])

        normalizer = {
            'mean': [numpy.hstack(stacked_mean)],
            'std': [numpy.hstack(stacked_std)],
            'N': [frame_count],
        }
        return normalizer

    def feature_vector(self, feature_repository):
        """Feature vector creation

        Parameters
        ----------
        feature_repository : FeatureRepository, dict
            Feature repository with needed features

        Returns
        -------
        FeatureContainer

        """

        # Check that all feature matrices have same number of frames
        frame_count = -1
        for feature in self.recipe:
            method = feature['method']
            channel = 0  # Default value
            if 'vector-index' in feature:
                channel = feature['vector-index']['channel']

            if frame_count == -1:
                frame_count = feature_repository[method].feat[channel].shape[0]
            elif frame_count != feature_repository[method].feat[channel].shape[0]:
                message = '{name}: Feature matrices should have same number of frames {count1} != {count2} [{method}]'.format(
                    name=self.__class__.__name__,
                    count1=frame_count,
                    count2=feature_repository[method].feat[channel].shape[0],
                    method=method
                )
                self.logger.exception(message)
                raise AssertionError(message)

        # Stack features
        feature_matrix = []
        for feature in self.recipe:
            method = feature['method']

            # Default values
            channel = 0
            if 'vector-index' in feature:
                channel = feature['vector-index']['channel']

            if ('vector-index' not in feature or
               ('vector-index' in feature and 'full' in feature['vector-index'] and feature['vector-index']['full'])):
                # We have full matrix
                feature_matrix.append(feature_repository[method].feat[channel][::self.feature_hop, :])
            elif ('vector-index' in feature and
                  'vector' in feature['vector-index'] and
                  'selection' in feature['vector-index'] and feature['vector-index']['selection']):
                # We have selector vector
                index = numpy.array(feature['vector-index']['vector'])
                feature_matrix.append(feature_repository[method].feat[channel][::self.feature_hop, index])
            elif ('vector-index' in feature and
                  'start' in feature['vector-index'] and
                  'end' in feature['vector-index']):
                # We have start and end index
                feature_matrix.append(feature_repository[method].feat[channel][::self.feature_hop, feature['vector-index']['start']:feature['vector-index']['end']])

        # Meta is taken from the last feature in the recipe
        meta = {
            'parameters': {
                'fs': feature_repository[method].meta['parameters']['fs'],
                'win_length_seconds': feature_repository[method].meta['parameters'].get('win_length_seconds'),
                'win_length_samples': feature_repository[method].meta['parameters'].get('win_length_samples'),
                'hop_length_seconds': feature_repository[method].meta['parameters'].get('hop_length_seconds'),
                'hop_length_samples': feature_repository[method].meta['parameters'].get('hop_length_samples'),
            },
            'datetime': strftime("%Y-%m-%d %H:%M:%S", gmtime()),
            'audio_file': feature_repository[method].meta['audio_file'],
            'extractor_version': None,
        }
        return FeatureContainer(features=[numpy.hstack(feature_matrix)], meta=meta)

    def process(self, feature_data):
        """Feature vector creation

        Parameters
        ----------
        feature_data : FeatureRepository
            Feature repository with needed features

        Returns
        -------
        FeatureContainer

        """

        return self.feature_vector(feature_repository=feature_data)
class FeatureNormalizer(DataFile, ContainerMixin, FeatureProcessingUnitMixin):
    """Feature normalizer

    Accumulates feature statistics

    Examples
    --------

    >>> normalizer = FeatureNormalizer()
    >>> for feature_matrix in training_items:
    >>>     normalizer.accumulate(feature_matrix)
    >>> normalizer.finalize()

    >>> for feature_matrix in test_items:
    >>>     feature_matrix_normalized = normalizer.normalize(feature_matrix)
    >>>     # use the features

    """
    __version__ = '0.0.1'

    def __init__(self, stat=None, feature_matrix=None):
        """__init__ method.

        Parameters
        ----------
        stat : dict or None
            Pre-calculated statistics in dict to initialize internal state
        feature_matrix : numpy.ndarray [shape=(frames, number of feature values)] or None
            Feature matrix to be used in the initialization

        """

        if stat:
            defaults = {
                'N': [],
                'S1': [],
                'S2': [],
                'mean': [],
                'std': [],
            }
            defaults.update(stat)
            super(DataFile, self).__init__(defaults)
        elif feature_matrix is not None and stat is None:
            super(DataFile, self).__init__(
                {
                    'N': [feature_matrix.shape[0]],
                    'S1': [numpy.sum(feature_matrix, axis=0)],
                    'S2': [numpy.sum(feature_matrix ** 2, axis=0)],
                    'mean': [numpy.mean(feature_matrix, axis=0)],
                    'std': [numpy.std(feature_matrix, axis=0)],
                }
            )
            self.finalize()
        else:
            super(DataFile, self).__init__(
                {
                    'N': [],
                    'S1': [],
                    'S2': [],
                    'mean': [],
                    'std': [],
                }
            )

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Finalize accumulated calculation
        self.finalize()

    def __getstate__(self):
        # Return only needed data for pickle
        return {
            'N': self['N'],
            'S1': self['S1'],
            'S2': self['S2'],
            'mean': self['mean'],
            'std': self['std'],
        }

    def __setstate__(self, d):
        # Restore state into dict items, since the rest of the class accesses them with self[key]
        self['N'] = d['N']
        self['S1'] = d['S1']
        self['S2'] = d['S2']
        self['mean'] = d['mean']
        self['std'] = d['std']

    def accumulate(self, feature_container):
        """Accumulate statistics

        Parameters
        ----------
        feature_container : FeatureContainer

        Returns
        -------
        self

        """

        stat = feature_container.stat
        for channel in range(0, len(stat)):
            if len(self['N']) <= channel:
                self['N'].insert(channel, 0)
            self['N'][channel] += stat[channel]['N']

            if len(self['mean']) <= channel:
                self['mean'].insert(channel, 0)
            self['mean'][channel] += stat[channel]['mean']

            if len(self['S1']) <= channel:
                self['S1'].insert(channel, 0)
            self['S1'][channel] += stat[channel]['S1']

            if len(self['S2']) <= channel:
                self['S2'].insert(channel, 0)
            self['S2'][channel] += stat[channel]['S2']

        return self

    def finalize(self):
        """Finalize statistics calculation

        Accumulated values are used to get mean and std for the seen feature data.

        Returns
        -------
        self

        """

        for channel in range(0, len(self['N'])):
            # Finalize statistics
            self['mean'][channel] = self['S1'][channel] / self['N'][channel]

            if len(self['std']) <= channel:
                self['std'].insert(channel, 0)
            self['std'][channel] = numpy.sqrt(
                (self['N'][channel] * self['S2'][channel] - (self['S1'][channel] * self['S1'][channel])) /
                (self['N'][channel] * (self['N'][channel] - 1))
            )

            # In case of degenerate data std becomes NaN, replace it with 0.0
            self['std'][channel] = numpy.nan_to_num(self['std'][channel])

            self['mean'][channel] = numpy.reshape(self['mean'][channel], [1, -1])
            self['std'][channel] = numpy.reshape(self['std'][channel], [1, -1])

        return self

    def normalize(self, feature_container, channel=0):
        """Normalize feature matrix with internal statistics of the class

        Parameters
        ----------
        feature_container : FeatureContainer or numpy.ndarray [shape=(frames, number of feature values)]
            Feature matrix to be normalized
        channel : int
            Feature channel
            Default value "0"

        Returns
        -------
        feature_matrix : numpy.ndarray [shape=(frames, number of feature values)]
            Normalized feature matrix

        """

        if isinstance(feature_container, FeatureContainer):
            feature_container.feat[channel] = (feature_container.feat[channel] - self['mean'][channel]) / self['std'][channel]
            return feature_container
        elif isinstance(feature_container, numpy.ndarray):
            return (feature_container - self['mean'][channel]) / self['std'][channel]

    def process(self, feature_data):
        """Normalize feature matrix with internal statistics of the class

        Parameters
        ----------
        feature_data : FeatureContainer or numpy.ndarray [shape=(frames, number of feature values)]
            Feature matrix to be normalized

        Returns
        -------
        feature_matrix : numpy.ndarray [shape=(frames, number of feature values)]
            Normalized feature matrix

        """

        return self.normalize(feature_container=feature_data)
class FeatureAggregator(FeatureProcessingUnitMixin):
    """Feature aggregator"""
    __version__ = '0.0.1'

    valid_method = ['mean', 'std', 'cov', 'kurtosis', 'skew', 'flatten']

    def __init__(self, *args, **kwargs):
        """Constructor

        Parameters
        ----------
        recipe : list of dict or list of str
            Aggregation recipe, supported methods [mean, std, cov, kurtosis, skew, flatten].
        win_length_frames : int
            Window length in feature frames
        hop_length_frames : int
            Hop length in feature frames

        """

        recipe = kwargs.get('recipe')
        if isinstance(recipe, dict):
            # Single processing method given as dict
            self.recipe = [recipe['method']]
        elif isinstance(recipe, list):
            if isinstance(recipe[0], dict):
                self.recipe = [d['method'] for d in recipe]
            else:
                self.recipe = recipe

        self.win_length_frames = kwargs.get('win_length_frames')
        self.hop_length_frames = kwargs.get('hop_length_frames')

    def __getstate__(self):
        # Return only needed data for pickle
        return {
            'recipe': self.recipe,
            'win_length_frames': self.win_length_frames,
            'hop_length_frames': self.hop_length_frames,
        }

    def __setstate__(self, d):
        self.recipe = d['recipe']
        self.win_length_frames = d['win_length_frames']
        self.hop_length_frames = d['hop_length_frames']

    def process(self, feature_data):
        """Process features

        Parameters
        ----------
        feature_data : FeatureContainer
            Features to be aggregated

        Returns
        -------
        FeatureContainer

        """

        # Not the most efficient way as numpy stride_tricks would produce
        # faster code, however, opted for cleaner presentation this time.
        feature_data_per_channel = []
        for channel in range(0, feature_data.channels):
            aggregated_features = []
            for frame in range(0, feature_data.feat[channel].shape[0], self.hop_length_frames):
                # Get start and end of the window, keep frame at the middle (approximately)
                start_frame = int(frame - numpy.floor(self.win_length_frames / 2.0))
                end_frame = int(frame + numpy.ceil(self.win_length_frames / 2.0))

                frame_id = numpy.array(range(start_frame, end_frame))

                # If start of feature matrix, pad with first frame
                frame_id[frame_id < 0] = 0

                # If end of the feature matrix, pad with last frame
                frame_id[frame_id > feature_data.feat[channel].shape[0] - 1] = feature_data.feat[channel].shape[0] - 1

                current_frame = feature_data.feat[channel][frame_id, :]

                aggregated_frame = []
                if 'mean' in self.recipe:
                    aggregated_frame.append(current_frame.mean(axis=0))
                if 'std' in self.recipe:
                    aggregated_frame.append(current_frame.std(axis=0))
                if 'cov' in self.recipe:
                    aggregated_frame.append(numpy.cov(current_frame).flatten())
                if 'kurtosis' in self.recipe:
                    aggregated_frame.append(scipy.stats.kurtosis(current_frame))
                if 'skew' in self.recipe:
                    aggregated_frame.append(scipy.stats.skew(current_frame))
                if 'flatten' in self.recipe:
                    aggregated_frame.append(current_frame.flatten())

                if aggregated_frame:
                    aggregated_features.append(numpy.concatenate(aggregated_frame))

            feature_data_per_channel.append(numpy.vstack(aggregated_features))

        meta = {
            'parameters': {
                'recipe': self.recipe,
                'win_length_frames': self.win_length_frames,
                'hop_length_frames': self.hop_length_frames,
            },
            'datetime': strftime("%Y-%m-%d %H:%M:%S", gmtime()),
        }
        if 'audio_file' in feature_data.meta:
            meta['audio_file'] = feature_data.meta['audio_file']

        return FeatureContainer(features=feature_data_per_channel, meta=meta)
class FeatureMasker(object):
    """Feature masker"""
    __version__ = '0.0.1'

    def __init__(self, *args, **kwargs):
        """Constructor

        Parameters
        ----------
        hop_length_seconds : float
            Hop length in seconds

        """

        self.hop_length_seconds = kwargs.get('hop_length_seconds')

        # Initialize mask events
        self.mask_events = MetaDataContainer()

    def __getstate__(self):
        # Return only needed data for pickle
        return {
            'hop_length_seconds': self.hop_length_seconds,
        }

    def __setstate__(self, d):
        self.hop_length_seconds = d['hop_length_seconds']
        self.mask_events = MetaDataContainer()

    def set_mask(self, mask_events):
        """Set masking events

        Parameters
        ----------
        mask_events : list of MetaItems or MetaDataContainer
            Event list used for masking

        Returns
        -------
        self

        """

        self.mask_events = mask_events
        return self

    def masking(self, feature_data, mask_events):
        """Mask feature repository with given events

        Parameters
        ----------
        feature_data : FeatureRepository
        mask_events : list of MetaItems or MetaDataContainer
            Event list used for masking

        Returns
        -------
        FeatureRepository

        """

        for method in list(feature_data.keys()):
            removal_mask = numpy.ones((feature_data[method].shape[0]), dtype=bool)
            for mask_event in mask_events:
                onset_frame = int(numpy.floor(mask_event.event_onset / self.hop_length_seconds))
                offset_frame = int(numpy.ceil(mask_event.event_offset / self.hop_length_seconds))
                if offset_frame > feature_data[method].shape[0]:
                    offset_frame = feature_data[method].shape[0]
                removal_mask[onset_frame:offset_frame] = False

            for channel in range(0, feature_data[method].channels):
                feature_data[method].feat[channel] = feature_data[method].feat[channel][removal_mask, :]

        return feature_data

    def process(self, feature_data):
        """Process feature repository

        Parameters
        ----------
        feature_data : FeatureRepository

        Returns
        -------
        FeatureRepository

        """

        if self.mask_events:
            return self.masking(feature_data=feature_data, mask_events=self.mask_events)
        else:
            return feature_data