Source code for dcase_framework.parameters

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Parameters
==========
Class for parameter handling. ParameterContainer is based on dict and supports reading from and writing to YAML files.

Example YAML file:

.. code-block:: yaml

    active_set: SET1

    sets:
        - set_id: SET1
          processor1:
            method: special_method_2
        - set_id: SET2
          processor1:
            method: special_method_3

    defaults:
        flow:
            task1: true
            task2: true
            task3: true

        processor1:
            method: special_method_1
            field1: 44100
            field2: 22050

        processor1_method_parameters:
            special_method_1:
                field1: 'test1'

            special_method_2:
                field1: 'test2'

            special_method_3:
                field1: 'test3'

        processor2:
            recipe: special_method_1;special_method_2;special_method_3

        processor2_method_parameters:
            special_method_1:
                field1: 'test1'

            special_method_2:
                field1: 'test2'

            special_method_3:
                field1: 'test3'

Once :func:`ParameterContainer.process` is called:

1. The ``active_set`` field is used to select the parameter set whose values override parameters in the ``defaults`` block. After this, the parameter container contains only the parameters from the ``defaults`` block with the overrides applied.
2. Each main level section (``flow``, ``processor1``, ``processor1_method_parameters``, and so on in the example above) is processed one by one:

    - If the section contains a ``method`` field, parameters are copied from ``[SECTION_NAME]_method_parameters`` under the ``parameters`` field.
    - If the section contains a ``recipe`` field, the recipe is first parsed and parameters are copied from ``[SECTION_NAME]_method_parameters`` under the ``parameters`` field.

Parameters after processing:

.. code-block:: yaml

    flow:
        task1: true
        task2: true
        task3: true

    processor1:
        _hash: 1d511b716b3cd075fbc752750b0c5932
        method: special_method_2
        field1: 44100
        field2: 22050
        parameters:
            field1: 'test2'

    processor2:
        _hash: f17897bd2a133d1c1d1c853e491d2a3a
        recipe:
            - method: special_method_1
            - method: special_method_2
            - method: special_method_3

        parameters:
            special_method_1:
                field1: 'test1'

            special_method_2:
                field1: 'test2'

            special_method_3:
                field1: 'test3'

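After processing, nested values can be read with dotted paths via :func:`ParameterContainer.get_path`; with the example above, for instance:

.. code-block:: python

    params.get_path('processor1.parameters.field1')                    # 'test2'
    params.get_path('processor2.parameters.special_method_1.field1')   # 'test1'
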
Recipe
^^^^^^

The special ``recipe`` field can be used to select multiple methods at once. It is especially useful for constructing a feature matrix from multiple sources.
Method blocks in the string are delimited with ``;`` (e.g. ``method1;method2;method1``).

Individual items in this list can be formatted in the following ways (see the parsing sketch after the list):

- [method_name (string)]                                                       => full vector
- [method_name (string)]=[start index (int)]-[end index (int)]                 => default channel 0 and vector [start:end]
- [method_name (string)]=[channel (int)]:[start index (int)]-[end index (int)] => specified channel and vector [start:end]
- [method_name (string)]=1,2,3,4,5                                             => vector [1,2,3,4,5]
- [method_name (string)]=0                                                     => specified channel and full vector

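For illustration, here is roughly what :func:`ParameterContainer._parse_recipe` (shown in the source below) produces for a recipe string combining the formats above; note that the end index is stored exclusive:

.. code-block:: python

    recipe = ParameterContainer._parse_recipe('mel;mfcc=1:1-20')
    # recipe is now approximately:
    # [
    #     {'method': 'mel'},
    #     {'method': 'mfcc',
    #      'vector-index': {'channel': 1, 'start': 1, 'end': 21,
    #                       'full': False, 'selection': False}}
    # ]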

Paths and parameter hash
^^^^^^^^^^^^^^^^^^^^^^^^

Parameters under each section are used to form a section-wise parameter hash. ParameterContainer's property
:py:attr:`dcase_framework.parameters.ParameterContainer.path_structure` defines how these section-wise parameter hashes
are used to form storage paths for each section. The main idea is that when parameters change the path changes as well,
and when the parameters stay the same the path stays the same, allowing already stored data (processed with the correct
parameters) to be reused.

**Path structure**

Example definition of a path structure:

.. code-block:: python

    self.path_structure = {
        'feature_extractor': [
            'feature_extractor.parameters.*'
        ],
        'feature_normalizer': [
            'feature_extractor.parameters.*'
        ],
        'learner': [
            'feature_extractor',
            'feature_normalizer',
            'feature_aggregator',
            'learner'
        ],
        'recognizer': [
            'feature_extractor',
            'feature_normalizer',
            'feature_aggregator',
            'learner',
            'recognizer'
        ],
        'evaluator': [
        ]
    }

One can use a wildcard for lists (e.g. ``feature_extractor.parameters.*``); in this case each item in the list produces
an individual path. This can be used, for example, to create a separate path for each feature extractor.


This will lead to the following paths:

.. code-block:: txt

    feature_extractor/feature_extractor_68a40f5e3b77df9564aaa68c92e95be9/
    feature_extractor/feature_extractor_74c5e3ce692f5973c5071c1cf0a89ee0/
    feature_extractor/feature_extractor_661304966061610bc09744166b10f76e/

    feature_normalizer/feature_extractor_68a40f5e3b77df9564aaa68c92e95be9/
    feature_normalizer/feature_extractor_74c5e3ce692f5973c5071c1cf0a89ee0/
    feature_normalizer/feature_extractor_661304966061610bc09744166b10f76e/

    learner/feature_extractor_5ca1f32c65b3eea59e1bb27b09b747ea/feature_normalizer_67b9b20ff555e8eaee22f5e50695df8b/feature_aggregator_baaf606d9ac1eaca43a6a24b599998a9/learner_624a422b47a32e20b90ad6e6151057f8

    recognizer/feature_extractor_5ca1f32c65b3eea59e1bb27b09b747ea/feature_normalizer_67b9b20ff555e8eaee22f5e50695df8b/feature_aggregator_baaf606d9ac1eaca43a6a24b599998a9/learner_624a422b47a32e20b90ad6e6151057f8/recognizer_08c503973f61ef4c4c5f7c56709d801c

The parameter section used to form the hash is saved in each sub folder (``parameters.yaml``) to make it easier to handle the files manually if needed.
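
On most platforms the folder name is simply the section name prefixed to the section hash; on Windows only a truncated
hash is used, to stay within the 260 character path length limit. A minimal sketch of this naming scheme (mirroring the
``_get_directory_name`` helper in the source below):

.. code-block:: python

    def directory_name(prefix, param_hash, windows=False):
        # Windows has a 260 character path length limit, use truncated hash only
        if windows:
            return param_hash[0:20]
        return prefix + '_' + param_hash

    directory_name('feature_extractor', '68a40f5e3b77df9564aaa68c92e95be9')
    # 'feature_extractor_68a40f5e3b77df9564aaa68c92e95be9'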

**Hash**

The parameter hash value is an MD5 hash of the stringified parameter dict of the section, with a few clean-ups that help
keep the hash compatible when the parameter selection in the section is extended later. The following rules are used:

- If the section contains a field ``enable`` with value ``False``, all other fields inside this section are excluded from the parameter hash calculation. This keeps the hash stable when the section is not used but its unused parameters are changed.
- If the section contains fields with value ``False``, these fields are excluded from the parameter hash calculation. This makes it possible to add new flag parameters without changing the hash: define the new flag so that the previous behaviour applies when the flag is set to ``False``.
- If the section contains any of the ``non_hashable_fields`` fields, those are excluded from the parameter hash calculation. These fields are set when :class:`ParameterContainer` is constructed, and they are usually fields used to print various values to the console. Such fields do not change the system output saved onto disk, and hence they are excluded from the hash.

Use :py:attr:`dcase_framework.parameters.ParameterContainer.non_hashable_fields` to exclude fields from the hash. Use
:py:attr:`dcase_framework.parameters.ParameterContainer.control_sections` to omit hash calculation for parameter
sections which do not need it.
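
A minimal sketch of the hash calculation under the rules above (illustrative only, not the actual implementation of
:func:`ParameterContainer.get_hash`):

.. code-block:: python

    import copy
    import hashlib
    import json

    def hash_sketch(section, non_hashable_fields=('_hash', 'verbose')):
        # Illustrative only: MD5 over the stringified section dict after clean-ups
        section = copy.deepcopy(section)

        if section.get('enable') is False:
            # Rule 1: a disabled section hashes as if it contained nothing else
            section = {'enable': False}
        else:
            for key in list(section.keys()):
                # Rules 2-3: drop False-valued flags and non-hashable fields
                if section[key] is False or key in non_hashable_fields:
                    del section[key]

        return hashlib.md5(json.dumps(section, sort_keys=True).encode('utf-8')).hexdigest()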

ParameterContainer
^^^^^^^^^^^^^^^^^^

Usage examples:

.. code-block:: python
    :linenos:

    # Load parameters
    params = ParameterContainer().load(filename='parameters.yaml')
    # Process parameters
    params.process()
    # Print parameters
    print(params)
    # Get parameters
    value = params.get_path('section1.parameter1')

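The container can also be overridden before processing; for example (overriding the ``flow`` section from the YAML example at the top):

.. code-block:: python

    # Override parameters with a dict (a file path or JSON string also works)
    params = ParameterContainer().load(filename='parameters.yaml')
    params.override({'flow': {'task2': False}})
    params.process()
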
.. autosummary::
    :toctree: generated/

    ParameterContainer
    ParameterContainer.load
    ParameterContainer.save
    ParameterContainer.exists
    ParameterContainer.get_path
    ParameterContainer.show
    ParameterContainer.log
    ParameterContainer.override
    ParameterContainer.process
    ParameterContainer.process_method_parameters
    ParameterContainer.get_hash

"""

from __future__ import print_function, absolute_import
from six import iteritems

import os
import hashlib
import json
import copy
import numpy
import itertools
import platform

from .files import ParameterFile
from .containers import ContainerMixin, DottedDict


class ParameterContainer(ParameterFile, ContainerMixin):
    def __init__(self, *args, **kwargs):
        """Constructor

        Parameters
        ----------
        project_base : str
            Absolute path to the project root
        section_process_order : list, optional
            Parameter section processing order. Given list is used to override the internal default list.
            Default value "None"
        path_structure : dict of lists, optional
            Defines how paths are created, section hash is used to create unique folder names.
            Given dict is used to override the internal default dict.
            Default value "None"
        method_dependencies : dict of dicts, optional
            Given dict is used to override the internal default dict.
            Default value "None"
        magic_field : dict, optional
            Dict of field names for specific tasks. Given dict is used to override the internal default dict.
            Default value "None"
        non_hashable_fields : list, optional
            List of fields to be skipped when the parameter hash for the section is calculated.
            Given list is used to override the internal default list.
            Default value "None"
        control_sections : list, optional
            List of top level sections used for framework control; for these sections no hash is calculated.
            Given list is used to override the internal default list.
            Default value "None"

        """

        super(ParameterContainer, self).__init__(*args, **kwargs)

        # Mark container non-processed
        self.processed = False

        # Project base path
        if kwargs.get('project_base'):
            self.project_base = kwargs.get('project_base')
        else:
            self.project_base = os.path.dirname(os.path.realpath(__file__))
            if os.path.split(self.project_base)[1] == 'src':
                # If we are in 'src' folder, remove one level
                self.project_base = os.path.join(os.path.split(self.project_base)[0])

        # Define section processing order
        self.section_process_order = [
            'flow',
            'general',
            'logging',
            'path',
            'dataset',
            'dataset_method_parameters',
            'feature_extractor',
            'feature_extractor_method_parameters',
            'feature_stacker',
            'feature_stacker_method_parameters',
            'feature_normalizer',
            'feature_normalizer_parameters',
            'feature_aggregator',
            'feature_aggregator_parameters',
            'learner',
            'recognizer',
            'learner_method_parameters',
            'recognizer_method_parameters',
            'evaluator',
        ]

        if kwargs.get('section_process_order'):
            self.section_process_order = kwargs.get('section_process_order')

        # Define how paths are constructed from section hashes
        self.path_structure = {
            'feature_extractor': [
                'feature_extractor.parameters.*'
            ],
            'feature_normalizer': [
                'feature_extractor.parameters.*'
            ],
            'learner': [
                'feature_extractor',
                'feature_stacker',
                'feature_normalizer',
                'feature_aggregator',
                'learner'
            ],
            'recognizer': [
                'feature_extractor',
                'feature_stacker',
                'feature_normalizer',
                'feature_aggregator',
                'learner',
                'recognizer'
            ],
            'evaluator': [
            ]
        }

        if kwargs.get('path_structure'):
            self.path_structure.update(kwargs.get('path_structure'))

        # Method dependencies map
        self.method_dependencies = {
            'feature_extractor': {
                'mel': None,
                'mfcc': None,
                'mfcc_delta': 'feature_extractor.mfcc',
                'mfcc_acceleration': 'feature_extractor.mfcc',
            },
        }

        if kwargs.get('method_dependencies'):
            self.method_dependencies.update(kwargs.get('method_dependencies'))

        # Map for magic field names
        self.magic_field = {
            'default-parameters': 'defaults',
            'set-list': 'sets',
            'set-id': 'set_id',
            'active-set': 'active_set',
            'parameters': 'parameters',
            'method': 'method',
            'recipe': 'recipe',
            'path': 'path',
            'flow': 'flow',
            'logging': 'logging',
            'general': 'general',
            'evaluator': 'evaluator',
        }

        if kwargs.get('magic_field'):
            self.magic_field.update(kwargs.get('magic_field'))

        # Fields to be skipped when parameter hash is calculated
        self.non_hashable_fields = [
            '_hash',
            'verbose',
            'print_system_progress',
            'log_system_parameters',
            'log_system_progress',
            'log_learner_status',
            'show_model_information',
            'use_ascii_progress_bar',
            'label',
            'active_scenes',
            'active_events',
            'plotting_rate',
            'focus_span',
            'output_format',
        ]

        if kwargs.get('non_hashable_fields'):
            self.non_hashable_fields = kwargs.get('non_hashable_fields')

        # Parameter sections which will not be included in the master parameter hash
        self.control_sections = [
            'flow',
            'path',
            'logging',
            'general',
            'evaluator',
        ]

        if kwargs.get('control_sections'):
            self.control_sections = kwargs.get('control_sections')
    def override(self, override):
        """Override container content recursively.

        Parameters
        ----------
        override : dict, str
            Depending on the type, the following is done:

            - If a dict is given, it is used directly to override parameters in the container.
            - If a str is given which is a filename of an existing file on disk, the parameter file is loaded and used to override container parameters.
            - If a str is given which contains JSON formatted parameters, the content is used to override container parameters.

        Raises
        ------
        ImportError:
            JSON import failed
        ValueError:
            Not JSON formatted string given

        Returns
        -------
        self

        """

        if isinstance(override, dict):
            self.merge(override=override)

        elif isinstance(override, str) and os.path.isfile(override):
            self.merge(override=ParameterFile(filename=override).load())

        elif isinstance(override, str):
            try:
                try:
                    import ujson as json
                except ImportError:
                    try:
                        import json
                    except ImportError:
                        message = '{name}: Unable to import json module'.format(
                            name=self.__class__.__name__
                        )
                        self.logger.exception(message)
                        raise ImportError(message)

                self.merge(override=json.loads(override))

            except ValueError:
                message = '{name}: Not JSON formatted string given'.format(
                    name=self.__class__.__name__
                )
                self.logger.exception(message)
                raise ValueError(message)

        return self
    def process(self, create_directories=True, create_parameter_hints=True):
        """Process parameters

        Parameters
        ----------
        create_directories : bool
            Create directories
            Default value "True"
        create_parameter_hints : bool
            Create parameter files in all data folders
            Default value "True"

        Raises
        ------
        ValueError:
            No valid active set given

        Returns
        -------
        self

        """

        if len(self) == 0:
            message = '{name}: Parameter container empty'.format(
                name=self.__class__.__name__
            )
            self.logger.exception(message)
            raise IOError(message)

        if not self.processed:
            # Translate control section names through the magic field map
            for section_id, section in enumerate(self.control_sections):
                if section in self.magic_field:
                    self.control_sections[section_id] = self.magic_field[section]

            if (self.magic_field['default-parameters'] in self and
               self.magic_field['set-list'] in self and
               self.magic_field['active-set'] in self):

                default_params = copy.deepcopy(self[self.magic_field['default-parameters']])
                active_set_id = self[self.magic_field['active-set']]
                override_params = copy.deepcopy(
                    self._search_list_of_dictionaries(
                        key=self.magic_field['set-id'],
                        value=active_set_id,
                        list_of_dictionaries=self[self.magic_field['set-list']]
                    )
                )

                if not override_params:
                    message = '{name}: No valid active set given [{set_name}]'.format(
                        name=self.__class__.__name__,
                        set_name=active_set_id
                    )
                    self.logger.exception(message)
                    raise ValueError(message)

                dict.clear(self)                    # Empty current content
                dict.update(self, default_params)   # Insert default parameters
                self['active_set'] = active_set_id

                # Merge override parameters into default parameters
                self.merge(override=override_params)

            elif self.magic_field['default-parameters'] in self:
                default_params = copy.deepcopy(self[self.magic_field['default-parameters']])
                dict.clear(self)
                dict.update(self, default_params)

            # Get processing order for sections
            section_list = []
            for section in self.section_process_order + list(set(list(self.keys())) - set(self.section_process_order)):
                if section in self:
                    section_list.append(section)

            # Parameter processing starts
            self._convert_main_level_to_dotted()
            self._preprocess_paths()

            # 1. Process parameters
            for section in section_list:
                field_process_func = getattr(self, '_process_{}'.format(section), None)
                if field_process_func is not None:
                    field_process_func()

                if (self[section] and
                   (self.magic_field['method'] in self[section] or self.magic_field['recipe'] in self[section]) and
                   section + '_method_parameters' in self):

                    field_process_parameters_func = getattr(
                        self,
                        '_process_{}_method_parameters'.format(section),
                        None
                    )
                    if field_process_parameters_func is not None:
                        field_process_parameters_func()

            self._add_hash_to_method_parameters()

            # 2. Methods and recipes
            for section in section_list:
                self.process_method_parameters(section=section)

            # 3. Inject dependencies
            for section in section_list:
                if isinstance(self[section], dict) and self[section] and self.magic_field['parameters'] in self[section]:
                    for key, item in iteritems(self[section][self.magic_field['parameters']]):
                        if (section in self.method_dependencies and
                           key in self.method_dependencies[section] and
                           self.method_dependencies[section][key]):

                            fields = self.method_dependencies[section][key].split('.')
                            if len(fields) == 1:
                                item['dependency_parameters'] = copy.deepcopy(
                                    self[section + '_method_parameters'][self.method_dependencies[section][key]]
                                )
                                item['dependency_method'] = self.method_dependencies[section][key]

                            elif len(fields) == 2:
                                item['dependency_parameters'] = copy.deepcopy(
                                    self[fields[0] + '_method_parameters'][fields[1]]
                                )
                                item['dependency_method'] = fields[1]

            # 4. Add hash
            self._add_hash_to_main_parameters()
            self._add_main_hash()

            # 5. Post process paths
            self._postprocess_paths(
                create_directories=create_directories,
                create_parameter_hints=create_parameter_hints
            )

            self.processed = True

            # 6. Clean up
            # self._clean_unused_parameters()

        return self
    def process_method_parameters(self, section):
        """Process methods and recipes in the section

        Processing rules for fields:

        - "method" => search for parameters from [section]_method_parameters -section
        - "recipe" => parse recipe and search for parameters from [section]_method_parameters -section
        - "\*recipe" => parse recipe

        Parameters
        ----------
        section : str
            Section name

        Raises
        ------
        ValueError:
            Invalid method for parameter field

        Returns
        -------
        self

        """

        if self[section]:
            # Inject method parameters
            if self.magic_field['method'] in self[section]:
                if (section + '_method_parameters' in self and
                   self[section][self.magic_field['method']] in self[section + '_method_parameters']):

                    self[section]['parameters'] = copy.deepcopy(
                        self[section + '_method_parameters'][self[section][self.magic_field['method']]]
                    )

                else:
                    message = '{name}: Invalid method for parameter field, {field}->method={method}'.format(
                        name=self.__class__.__name__,
                        field=section,
                        method=self[section][self.magic_field['method']]
                    )
                    self.logger.exception(message)
                    raise ValueError(message)

            # Parse recipes
            for field in self[section]:
                if field.endswith(self.magic_field['recipe']):
                    self[section][field] = self._parse_recipe(recipe=self[section][field])

            # Inject recipe parameters
            if self.magic_field['recipe'] in self[section]:
                self[section][self.magic_field['parameters']] = {}
                for item in self[section][self.magic_field['recipe']]:
                    if (section + '_method_parameters' in self and
                       item[self.magic_field['method']] in self[section + '_method_parameters']):

                        self[section]['parameters'][item[self.magic_field['method']]] = copy.deepcopy(
                            self[section + '_method_parameters'][item[self.magic_field['method']]]
                        )

                    else:
                        message = '{name}: Cannot find any parameters for the method in the recipe field, {field}->recipe={method}'.format(
                            name=self.__class__.__name__,
                            field=section,
                            method=item[self.magic_field['method']]
                        )
                        self.logger.exception(message)
                        raise ValueError(message)

        return self
    @staticmethod
    def _check_paths(paths, create=True):
        def make_path(path):
            if isinstance(path, str) and not os.path.isdir(path):
                try:
                    os.makedirs(path)
                except OSError:
                    # Path was created in the meantime, nothing to do
                    pass

        if create:
            if isinstance(paths, str):
                make_path(paths)
            elif isinstance(paths, dict):
                for key, value in iteritems(paths):
                    make_path(value)
            elif isinstance(paths, list):
                for value in paths:
                    make_path(value)

    def _preprocess_paths(self):
        # Translate separators if in Windows
        if platform.system() == 'Windows':
            for path_field in self['path']:
                self['path'][path_field] = self['path'][path_field].replace('/', os.path.sep)

        # If given path is relative, make it absolute
        if not os.path.isabs(self.get_path('path.data')):
            self['path']['data'] = os.path.join(self.project_base, self.get_path('path.data'))

        if not os.path.isabs(self.get_path('path.system_base')):
            self['path']['system_base'] = os.path.join(self.project_base, self.get_path('path.system_base'))

        if not os.path.isabs(self.get_path('path.recognizer_challenge_output')):
            self['path']['recognizer_challenge_output'] = os.path.join(
                self.project_base,
                self.get_path('path.recognizer_challenge_output')
            )

        if not os.path.isabs(self.get_path('path.logs')):
            self['path']['logs'] = os.path.join(self.project_base, self.get_path('path.logs'))

    def _postprocess_paths(self, create_directories=True, create_parameter_hints=True):
        # Make sure extended paths exist before saving parameters in them

        # Check main paths
        if create_directories:
            if 'data' in self['path']:
                self._check_paths(paths=self['path']['data'])
            if 'system_base' in self['path']:
                self._check_paths(paths=self['path']['system_base'])
            if 'logs' in self['path']:
                self._check_paths(paths=self['path']['logs'])
            if 'recognizer_challenge_output' in self['path']:
                self._check_paths(paths=self['path']['recognizer_challenge_output'])

        # Check path_structure
        for field, structure in iteritems(self.path_structure):
            path = self._get_extended_path(path_label=field, structure=structure)
            if create_directories:
                self._check_paths(paths=path)
            if create_parameter_hints:
                self._save_path_parameters(
                    base=[os.path.join(self['path']['system_base'], self['path'][field])],
                    structure=structure
                )
            self['path'][field] = path

    @staticmethod
    def _join_paths(path_parts):
        if len(path_parts) > 1:
            for i, value in enumerate(path_parts):
                if isinstance(value, str):
                    path_parts[i] = [value]

            # Cartesian product over all part lists
            path_parts = list(itertools.product(*path_parts))

        out_path = []
        for part in path_parts:
            if isinstance(part, str):
                out_path.append(part)
            else:
                out_path.append(os.path.join(*part))
        return out_path

    def _get_extended_path(self, path_label, structure):
        path_parts = [os.path.join(self['path']['system_base'], self['path'][path_label])]

        if structure:
            keys = []
            wild_card_found = False
            for part in structure:
                if '*' in part:
                    wild_card_found = True
                    path_ = self.get_path(data=self, dotted_path=part[:part.find('*') - 1])
                    if path_:
                        keys = path_.keys()

                param_hash = self.get_path(data=self, dotted_path=part + '._hash')
                if param_hash is not None:
                    if isinstance(param_hash, list):
                        directory_name = []
                        for h in param_hash:
                            directory_name.append(part.split('.')[0] + '_' + h)
                    else:
                        directory_name = self._get_directory_name(prefix=part.split('.')[0], param_hash=param_hash)

                    path_parts.append(directory_name)

            paths = self._join_paths(path_parts)

            if not wild_card_found and len(paths) == 1:
                return paths[0]
            else:
                return dict(zip(keys, paths))

        else:
            return os.path.join(self['path']['system_base'], self['path'][path_label])

    def _get_directory_name(self, prefix, param_hash):
        if platform.system() == 'Windows':
            # Use short directory names and truncated hash for Windows, as it has path length limit (260)
            return param_hash[0:20]
        else:
            return prefix + '_' + param_hash

    def _save_path_parameters(self, base, structure, parameter_filename='parameters.yaml'):
        path_parts = [os.path.join(base[0])]
        for part in structure:
            param_hash = self.get_path(data=self, dotted_path=part + '._hash')
            if param_hash is not None:
                if isinstance(param_hash, list):
                    directory_name = []
                    for h in param_hash:
                        directory_name.append(part.split('.')[0] + '_' + h)
                else:
                    directory_name = self._get_directory_name(prefix=part.split('.')[0], param_hash=param_hash)

                parameters = self.get_path(data=self, dotted_path=part)
                path_parts.append(directory_name)
                current_path = self._join_paths(path_parts)

                if isinstance(parameters, dict):
                    ParameterContainer(parameters).save(
                        filename=os.path.join(current_path[0], parameter_filename)
                    )
                else:
                    for path_id, path in enumerate(current_path):
                        if parameters[path_id]:
                            ParameterContainer(parameters[path_id]).save(
                                filename=os.path.join(path, parameter_filename)
                            )

    def _save_path_parameters_all(self):
        for path_label, structure in iteritems(self.path_structure):
            path_parts = [os.path.join(self['path']['system_base'], self['path'][path_label])]
            for part in structure:
                param_hash = self.get_path(data=self, dotted_path=part + '._hash')
                parameters = self.get_path(data=self, dotted_path=part)
                path_parts.append(param_hash)
                current_path = self._join_paths(path_parts)
                if len(current_path) == 1:
                    ParameterContainer(parameters).save(
                        filename=os.path.join(current_path[0], 'parameters.yaml')
                    )
                else:
                    for path_id, path in enumerate(current_path):
                        if parameters[path_id]:
                            ParameterContainer(parameters[path_id]).save(
                                filename=os.path.join(path, 'parameters.yaml')
                            )

    def _add_hash_to_main_parameters(self):
        for field, params in iteritems(self):
            if isinstance(params, dict):
                if field not in self.control_sections and self[field]:
                    self[field]['_hash'] = self.get_hash(data=self[field])

    def _add_hash_to_method_parameters(self):
        for field in self:
            if field.endswith('_method_parameters'):
                for key, params in iteritems(self[field]):
                    if params:
                        params['_hash'] = self.get_hash(data=params)

    def _add_main_hash(self):
        data = {}
        for field, params in iteritems(self):
            if isinstance(params, dict):
                if field not in self.control_sections and self[field]:
                    data[field] = self.get_hash(data=self[field])
        self['_hash'] = self.get_hash(data=data)

    @staticmethod
    def _parse_recipe(recipe):
        """Parse feature vector recipe

        Overall format: [block #1];[block #2];[block #3];...

        Block formats:

        - [extractor (string)]                                                        => full vector
        - [extractor (string)]=[start index (int)]-[end index (int)]                  => default channel 0 and vector [start:end]
        - [extractor (string)]=[channel (int)]:[start index (int)]-[end index (int)]  => specified channel and vector [start:end]
        - [extractor (string)]=1,2,3,4,5                                              => vector [1,2,3,4,5]
        - [extractor (string)]=0                                                      => specified channel and full vector

        Parameters
        ----------
        recipe : str
            Feature recipe

        Returns
        -------
        data : list of dict
            Feature recipe structure

        """

        # Define delimiters
        delimiters = {
            'block': ';',
            'detail': '=',
            'dimension': ':',
            'segment': '-',
            'vector': ','
        }

        data = []
        labels = recipe.split(delimiters['block'])
        for label in labels:
            label = label.strip()
            if label:
                detail_parts = label.split(delimiters['detail'])
                method = detail_parts[0].strip()

                # Default values, used when only the extractor is defined,
                # e.g. [extractor (string)]; [extractor (string)]
                vector_index_structure = {
                    'channel': 0,
                    'selection': False,
                    'full': True,
                }

                # Inspect recipe further
                if len(detail_parts) == 2:
                    main_index_parts = detail_parts[1].split(delimiters['dimension'])
                    vector_indexing_string = detail_parts[1]

                    if len(main_index_parts) > 1:
                        # Channel has been defined,
                        # e.g. [extractor (string)]=[channel (int)]:[start index (int)]-[end index (int)]
                        vector_index_structure['channel'] = int(main_index_parts[0])
                        vector_indexing_string = main_index_parts[1]

                    vector_indexing = vector_indexing_string.split(delimiters['segment'])
                    if len(vector_indexing) > 1:
                        vector_index_structure['start'] = int(vector_indexing[0].strip())
                        vector_index_structure['end'] = int(vector_indexing[1].strip()) + 1
                        vector_index_structure['full'] = False
                        vector_index_structure['selection'] = False
                    else:
                        vector_indexing = vector_indexing_string.split(delimiters['vector'])
                        if len(vector_indexing) > 1:
                            vector_index_structure['full'] = False
                            vector_index_structure['selection'] = True
                            vector_index_structure['vector'] = list(map(int, vector_indexing))
                        else:
                            vector_index_structure['channel'] = int(vector_indexing[0])
                            vector_index_structure['full'] = True
                            vector_index_structure['selection'] = False

                    current_data = {
                        'method': method,
                        'vector-index': vector_index_structure,
                        # 'parameter-path': 'feature.params.' + extractor
                    }

                else:
                    current_data = {
                        'method': method,
                    }

                data.append(current_data)

        return data

    def _after_load(self, to_return=None):
        self.processed = False

    def _clean_unused_parameters(self):
        for field in list(self.keys()):
            if field.endswith('_method_parameters'):
                del self[field]

    def _convert_main_level_to_dotted(self):
        for key, item in iteritems(self):
            if isinstance(item, dict) and self.magic_field['parameters'] in item:
                item[self.magic_field['parameters']] = DottedDict(item[self.magic_field['parameters']])
            if isinstance(item, dict):
                self[key] = DottedDict(item)

    def _process_logging(self):
        for handler_name, handler_data in iteritems(self['logging']['parameters']['handlers']):
            if 'filename' in handler_data:
                handler_data['filename'] = os.path.join(self['path']['logs'], handler_data['filename'])

    def _process_feature_extractor(self):
        if ('recipe' not in self['feature_extractor'] and
           'feature_stacker' in self and
           'stacking_recipe' in self['feature_stacker']):

            self['feature_extractor']['recipe'] = self.get_path('feature_stacker.stacking_recipe')

        if 'win_length_seconds' in self['feature_extractor'] and 'fs' in self['feature_extractor']:
            self['feature_extractor']['win_length_samples'] = int(
                self.get_path('feature_extractor.win_length_seconds') * self.get_path('feature_extractor.fs')
            )

        if 'hop_length_seconds' in self['feature_extractor'] and 'fs' in self['feature_extractor']:
            self['feature_extractor']['hop_length_samples'] = int(
                self.get_path('feature_extractor.hop_length_seconds') * self.get_path('feature_extractor.fs')
            )

    def _process_feature_normalizer(self):
        if self.get_path('general.scene_handling'):
            self['feature_normalizer']['scene_handling'] = self.get_path('general.scene_handling')

        if self.get_path('general.active_scenes'):
            self['feature_normalizer']['active_scenes'] = self.get_path('general.active_scenes')

        if self.get_path('general.event_handling'):
            self['feature_normalizer']['event_handling'] = self.get_path('general.event_handling')

        if self.get_path('general.active_events'):
            self['feature_normalizer']['active_events'] = self.get_path('general.active_events')

    def _process_feature_extractor_method_parameters(self):
        # Change None feature parameter sections into empty dicts
        for method in list(self['feature_extractor_method_parameters'].keys()):
            if self['feature_extractor_method_parameters'][method] is None:
                self['feature_extractor_method_parameters'][method] = {}

        for method, data in iteritems(self['feature_extractor_method_parameters']):
            data['method'] = method

            # Copy general parameters
            if 'fs' in self['feature_extractor']:
                data['fs'] = self['feature_extractor']['fs']

            if 'win_length_seconds' in self['feature_extractor']:
                data['win_length_seconds'] = self.get_path('feature_extractor.win_length_seconds')

            if 'win_length_samples' in self['feature_extractor']:
                data['win_length_samples'] = self.get_path('feature_extractor.win_length_samples')

            if 'hop_length_seconds' in self['feature_extractor']:
                data['hop_length_seconds'] = self.get_path('feature_extractor.hop_length_seconds')

            if 'hop_length_samples' in self['feature_extractor']:
                data['hop_length_samples'] = self.get_path('feature_extractor.hop_length_samples')

    def _process_feature_aggregator(self):
        if 'win_length_seconds' in self['feature_aggregator'] and 'win_length_seconds' in self['feature_extractor']:
            self['feature_aggregator']['win_length_frames'] = int(numpy.ceil(
                self.get_path('feature_aggregator.win_length_seconds') /
                float(self.get_path('feature_extractor.hop_length_seconds'))
            ))

        if 'hop_length_seconds' in self['feature_aggregator'] and 'win_length_seconds' in self['feature_extractor']:
            self['feature_aggregator']['hop_length_frames'] = int(numpy.ceil(
                self.get_path('feature_aggregator.hop_length_seconds') /
                float(self.get_path('feature_extractor.hop_length_seconds'))
            ))

    def _process_learner(self):
        win_length_seconds = self.get_path('feature_extractor.win_length_seconds')
        hop_length_seconds = self.get_path('feature_extractor.hop_length_seconds')

        if self.get_path('feature_aggregator.enable'):
            win_length_seconds = self.get_path('feature_aggregator.win_length_seconds')
            hop_length_seconds = self.get_path('feature_aggregator.hop_length_seconds')

        self['learner']['win_length_seconds'] = float(win_length_seconds)
        self['learner']['hop_length_seconds'] = float(hop_length_seconds)

        if self.get_path('general.scene_handling'):
            self['learner']['scene_handling'] = self.get_path('general.scene_handling')

        if self.get_path('general.active_scenes'):
            self['learner']['active_scenes'] = self.get_path('general.active_scenes')

        if self.get_path('general.event_handling'):
            self['learner']['event_handling'] = self.get_path('general.event_handling')

        if self.get_path('general.active_events'):
            self['learner']['active_events'] = self.get_path('general.active_events')

    def _process_learner_method_parameters(self):
        for method, data in iteritems(self['learner_method_parameters']):
            data = DottedDict(data)
            if (data.get_path('training.epoch_processing.enable') and
               not data.get_path('training.epoch_processing.recognizer')):

                data['training']['epoch_processing']['recognizer'] = self.get_path('recognizer')

    def _process_recognizer(self):
        if self.get_path('general.scene_handling'):
            self['recognizer']['scene_handling'] = self.get_path('general.scene_handling')

        if self.get_path('general.active_scenes'):
            self['recognizer']['active_scenes'] = self.get_path('general.active_scenes')

        if self.get_path('general.event_handling'):
            self['recognizer']['event_handling'] = self.get_path('general.event_handling')

        if self.get_path('general.active_events'):
            self['recognizer']['active_events'] = self.get_path('general.active_events')

        if (self.get_path('recognizer.frame_accumulation.enable') and
           self.get_path('recognizer.frame_accumulation.window_length_seconds')):

            self['recognizer']['frame_accumulation']['window_length_frames'] = int(
                self.get_path('recognizer.frame_accumulation.window_length_seconds') /
                float(self.get_path('learner.hop_length_seconds'))
            )

        if (self.get_path('recognizer.event_activity_processing.enable') and
           self.get_path('recognizer.event_activity_processing.window_length_seconds')):

            self['recognizer']['event_activity_processing']['window_length_frames'] = int(
                self.get_path('recognizer.event_activity_processing.window_length_seconds') /
                float(self.get_path('learner.hop_length_seconds'))
            )

    def _process_evaluator(self):
        if self.get_path('general.scene_handling'):
            self['evaluator']['scene_handling'] = self.get_path('general.scene_handling')

        if self.get_path('general.active_scenes'):
            self['evaluator']['active_scenes'] = self.get_path('general.active_scenes')

        if self.get_path('general.event_handling'):
            self['evaluator']['event_handling'] = self.get_path('general.event_handling')

        if self.get_path('general.active_events'):
            self['evaluator']['active_events'] = self.get_path('general.active_events')