Source code for dcase_framework.metadata
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Meta data
=========
Utility classes for meta data storage and handling.
Usage examples:
.. code-block:: python
:linenos:
# load meta data
metadata_container = MetaDataContainer(filename='event_list.txt').load()
# Print content
print(metadata_container)
# Log content
metadata_container.log()
# Filter based on event label
print(metadata_container.filter(event_label='drawer'))
# Filter based on scene label
print(metadata_container.filter(scene_label='office'))
# Filter time segment
print(metadata_container.filter_time_segment(onset=10.0, offset=30.0))
# Add time offset
metadata_container.add_time_offset(100)
# Process events
print(metadata_container.process_events(minimum_event_length=0.5, minimum_event_gap=0.5))
# Combine content
metadata_container2 = MetaDataContainer(filename='event_list.txt').load()
metadata_container += metadata_container2
# Unique file list
metadata_container.file_list
MetaDataItem
^^^^^^^^^^^^
Dict based class for storing meta data item (i.e. one row in meta data file).
.. autosummary::
:toctree: generated/
MetaDataItem
MetaDataItem.id
MetaDataItem.file
MetaDataItem.scene_label
MetaDataItem.event_label
MetaDataItem.event_onset
MetaDataItem.event_offset
MetaDataItem.identifier
MetaDataItem.source_label
MetaDataContainer
^^^^^^^^^^^^^^^^^
List of MetaDataItems for storing meta data file in one container.
Reads meta data from CSV-text files. Preferred delimiter is tab, however, other delimiters are supported automatically (they are sniffed automatically).
Supported input formats:
- [file(string)]
- [event_onset (float)][tab][event_offset (float)]
- [file(string)][scene_label(string)]
- [file(string)][scene_label(string)][identifier(string)]
- [event_onset (float)][tab][event_offset (float)][tab][event_label (string)]
- [file(string)][event_onset (float)][tab][event_offset (float)][tab][event_label (string)]
- [file(string)][event_onset (float)][tab][event_offset (float)][tab][event_label (string)][tab][identifier(string)]
- [file(string)][tab][scene_label(string)][tab][event_onset (float)][tab][event_offset (float)][tab][event_label (string)]
- [file(string)][tab][scene_label(string)][tab][event_onset (float)][tab][event_offset (float)][tab][event_label (string)][tab][source_label(string)]
.. autosummary::
:toctree: generated/
MetaDataContainer
MetaDataContainer.log
MetaDataContainer.show
MetaDataContainer.get_string
MetaDataContainer.update
MetaDataContainer.filter
MetaDataContainer.filter_time_segment
MetaDataContainer.process_events
MetaDataContainer.remove_field
MetaDataContainer.slice_field
MetaDataContainer.filter_time_segment
MetaDataContainer.add_time_offset
MetaDataContainer.file_list
MetaDataContainer.event_count
MetaDataContainer.scene_label_count
MetaDataContainer.event_label_count
MetaDataContainer.unique_scene_labels
MetaDataContainer.unique_event_labels
MetaDataContainer.max_offset
MetaDataContainer.load
MetaDataContainer.save
MetaDataContainer.event_stat_counts
MetaDataContainer.event_roll
EventRoll
^^^^^^^^^
Class to convert MetaDataContainer to a binary matrix indicating event activity within time segments defined by time_resolution.
.. autosummary::
:toctree: generated/
EventRoll
EventRoll.roll
EventRoll.pad
EventRoll.plot
ProbabilityItem
^^^^^^^^^^^^^^^
Dict based class for storing meta data item along with probability.
.. autosummary::
:toctree: generated/
ProbabilityItem
ProbabilityItem.id
ProbabilityItem.file
ProbabilityItem.label
ProbabilityItem.timestamp
ProbabilityItem.probability
ProbabilityItem.get_list
ProbabilityContainer
^^^^^^^^^^^^^^^^^^^^
List of ProbabilityItem for storing meta data along with probabilities in one container.
.. autosummary::
:toctree: generated/
ProbabilityContainer
ProbabilityContainer.log
ProbabilityContainer.show
ProbabilityContainer.update
ProbabilityContainer.file_list
ProbabilityContainer.unique_labels
ProbabilityContainer.filter
ProbabilityContainer.get_string
ProbabilityContainer.load
ProbabilityContainer.save
"""
from __future__ import print_function, absolute_import
from .files import ListFile
from .utils import posix_path, get_parameter_hash
import os
import numpy
import csv
import math
import logging
import copy
import re
class MetaMixin(object):
@property
def _delimiter(self):
"""Use csv.sniffer to guess delimeter for CSV file
Returns
-------
"""
sniffer = csv.Sniffer()
valid_delimiters = ['\t', ',', ';', ' ']
delimiter = '\t'
with open(self.filename, 'rt') as f1:
try:
example_content = f1.read(1024)
dialect = sniffer.sniff(example_content)
if hasattr(dialect, '_delimiter'):
if dialect._delimiter in valid_delimiters:
delimiter = dialect._delimiter
elif hasattr(dialect, 'delimiter'):
if dialect.delimiter in valid_delimiters:
delimiter = dialect.delimiter
else:
# Fall back to default
delimiter = '\t'
except:
# Fall back to default
delimiter = '\t'
return delimiter
def update(self, data):
"""Replace content with given list
Parameters
----------
data : list
New content
Returns
-------
self
"""
list.__init__(self, data)
return self
def log(self, level='info'):
"""Log container content
Parameters
----------
level : str
Logging level, possible values [info, debug, warn, warning, error, critical]
Default value "info"
Returns
-------
Nothing
"""
lines = str(self).split('\n')
logger = logging.getLogger(__name__)
for line in lines:
if level.lower() == 'debug':
logger.debug(line)
elif level.lower() == 'info':
logger.info(line)
elif level.lower() == 'warn' or level.lower() == 'warning':
logger.warn(line)
elif level.lower() == 'error':
logger.error(line)
elif level.lower() == 'critical':
logger.critical(line)
def show(self, **kwargs):
"""Print container content
Returns
-------
Nothing
"""
print(self.get_string(**kwargs))
[docs]class MetaDataItem(dict):
[docs] def __init__(self, *args, **kwargs):
"""Constructor
Parameters
----------
dict
"""
dict.__init__(self, *args)
# Process fields
if 'file' in self:
# Keep file paths in unix format even under Windows
self['file'] = posix_path(self['file'])
if 'event_onset' in self:
self['onset'] = self['event_onset']
if 'event_offset' in self:
self['offset'] = self['event_offset']
if 'onset' in self:
self['onset'] = float(self['onset'])
self['event_onset'] = self['onset']
if 'offset' in self:
self['offset'] = float(self['offset'])
self['event_offset'] = self['offset']
if 'event_label' in self and self.event_label:
self['event_label'] = self['event_label'].strip()
if self['event_label'].lower() == 'none':
self['event_label'] = None
if 'scene_label' in self and self.scene_label:
self['scene_label'] = self['scene_label'].strip()
if self['scene_label'].lower() == 'none':
self['scene_label'] = None
if 'tags' in self and self.tags:
if isinstance(self['tags'], str):
self['tags'] = self['tags'].strip()
if self['tags'].lower() == 'none':
self['tags'] = None
if self['tags'] and '#' in self['tags']:
self['tags'] = [x.strip() for x in self['tags'].split('#')]
elif self['tags'] and ',' in self['tags']:
self['tags'] = [x.strip() for x in self['tags'].split(',')]
elif self['tags'] and ';' in self['tags']:
self['tags'] = [x.strip() for x in self['tags'].split(';')]
elif self['tags'] and ':' in self['tags']:
self['tags'] = [x.strip() for x in self['tags'].split(':')]
# Remove empty tags
self['tags'] = list(filter(None, self['tags']))
# Sort tags
self['tags'].sort()
def __str__(self):
if len(self.file) > 30:
file_string = '...'+self.file[-27:]
else:
file_string = self.file
string_data = ' {0:<30s} |'.format(
file_string if file_string is not None else '---'
)
if self.onset is not None:
string_data += ' {:6.2f} |'.format(self.onset)
else:
string_data += ' {:>6s} |'.format('---')
if self.offset is not None:
string_data += ' {:6.2f} |'.format(self.offset)
else:
string_data += ' {:>6s} |'.format('---')
string_data += ' {:<18s} |'.format(self.scene_label if self.scene_label is not None else '---')
string_data += ' {:<20s} |'.format(self.event_label if self.event_label is not None else '---')
string_data += ' {:<20s} |'.format(','.join(self.tags) if self.tags is not None else '---')
string_data += ' {:<8s} |'.format(self.identifier if self.identifier is not None else '---')
string_data += ' {:<8s} |'.format(self.source_label if self.source_label is not None else '---')
return string_data
def __setitem__(self, key, value):
if key == 'event_onset':
super(MetaDataItem, self).__setitem__('event_onset', value)
return super(MetaDataItem, self).__setitem__('onset', value)
elif key == 'event_offset':
super(MetaDataItem, self).__setitem__('event_offset', value)
return super(MetaDataItem, self).__setitem__('offset', value)
else:
return super(MetaDataItem, self).__setitem__(key, value)
@property
def id(self):
    """Unique item identifier

    The identifier is the value returned by ``get_parameter_hash`` for the
    concatenation of file, scene label, event label, comma-joined tags,
    onset and offset (the latter two formatted as '{:8.4f}'). Fields that
    are missing or falsy are left out of the concatenation.

    Returns
    -------
    id : str
        Unique item id

    """

    parts = []
    for text_value in (self.file, self.scene_label, self.event_label):
        if text_value:
            parts.append(text_value)

    if self.tags:
        parts.append(','.join(self.tags))

    for timestamp in (self.onset, self.offset):
        # NOTE(review): a timestamp of 0.0 is falsy and therefore not part
        # of the hashed string - confirm this is intended.
        if timestamp:
            parts.append('{:8.4f}'.format(timestamp))

    return get_parameter_hash(''.join(parts))
@staticmethod
def get_header():
string_data = ' {0:<30s} | {1:<6s} | {2:<6s} | {3:<18s} | {4:<20s} | {5:<20s} | {6:<8s} | {7:<8s} |\n'.format(
'File',
'Onset',
'Offset',
'Scene label',
'Event label',
'Tags',
'Loc.ID',
'Source'
)
string_data += ' {0:<30s} + {1:<6s} + {2:<6s} + {3:<18s} + {4:<20s} + {5:<20s} + {6:<8s} + {7:<8s} +\n'.format(
'-'*30,
'-'*6,
'-'*6,
'-' * 18,
'-'*20,
'-'*20,
'-'*8,
'-'*8
)
return string_data
def get_list(self):
"""Return item values in a list with specified order.
Returns
-------
list
"""
fields = list(self.keys())
# Select only valid fields
valid_fields = ['event_label', 'file', 'offset', 'onset', 'scene_label', 'identifier', 'source_label', 'tags']
fields = list(set(fields).intersection(valid_fields))
fields.sort()
if fields == ['file']:
return [self.file]
elif fields == ['event_label', 'file', 'offset', 'onset', 'scene_label']:
return [self.file, self.scene_label, self.onset, self.offset, self.event_label]
elif fields == ['offset', 'onset']:
return [self.onset, self.offset]
elif fields == ['event_label', 'offset', 'onset']:
return [self.onset, self.offset, self.event_label]
elif fields == ['file', 'scene_label']:
return [self.file, self.scene_label]
elif fields == ['file', 'identifier', 'scene_label']:
return [self.file, self.scene_label, self.identifier]
elif fields == ['event_label', 'file', 'offset', 'onset']:
return [self.file, self.onset, self.offset, self.event_label]
elif fields == ['event_label', 'file', 'offset', 'onset', 'identifier', 'scene_label']:
return [self.file, self.scene_label, self.onset, self.offset, self.event_label, self.identifier]
elif fields == ['event_label', 'file', 'offset', 'onset', 'scene_label', 'source_label']:
return [self.file, self.scene_label, self.onset, self.offset, self.event_label, self.source_label]
elif fields == ['event_label', 'file', 'offset', 'onset', 'identifier', 'scene_label', 'source_label']:
return [self.file, self.scene_label, self.onset, self.offset, self.event_label,
self.source_label, self.identifier]
elif fields == ['file', 'tags']:
return [self.file, ";".join(self.tags)+";"]
elif fields == ['file', 'scene_label', 'tags']:
return [self.file, self.scene_label, ";".join(self.tags)+";"]
elif fields == ['file', 'offset', 'onset', 'scene_label', 'tags']:
return [self.file, self.scene_label, self.onset, self.offset, ";".join(self.tags)+";"]
else:
message = '{name}: Invalid meta data format [{format}]'.format(
name=self.__class__.__name__,
format=str(fields)
)
raise ValueError(message)
@property
def file(self):
"""Filename
Returns
-------
str or None
filename
"""
if 'file' in self:
return self['file']
else:
return None
@file.setter
def file(self, value):
# Keep file paths in unix format even under Windows
self['file'] = posix_path(value)
@property
def scene_label(self):
"""Scene label
Returns
-------
str or None
scene label
"""
if 'scene_label' in self:
return self['scene_label']
else:
return None
@scene_label.setter
def scene_label(self, value):
self['scene_label'] = value
@property
def event_label(self):
"""Event label
Returns
-------
str or None
event label
"""
if 'event_label' in self:
return self['event_label']
else:
return None
@event_label.setter
def event_label(self, value):
self['event_label'] = value
@property
def onset(self):
"""Onset
Returns
-------
float or None
onset
"""
if 'onset' in self:
return self['onset']
else:
return None
@onset.setter
def onset(self, value):
self['onset'] = float(value)
@property
def offset(self):
"""Offset
Returns
-------
float or None
offset
"""
if 'offset' in self:
return self['offset']
else:
return None
@offset.setter
def offset(self, value):
self['offset'] = float(value)
@property
def event_onset(self):
"""Event onset
Returns
-------
float or None
event onset
"""
if 'onset' in self:
return self['onset']
else:
return None
@event_onset.setter
def event_onset(self, value):
self['onset'] = float(value)
self['event_onset'] = self['onset']
@property
def event_offset(self):
"""Event offset
Returns
-------
float or None
event offset
"""
if 'offset' in self:
return self['offset']
else:
return None
@event_offset.setter
def event_offset(self, value):
self['offset'] = float(value)
self['event_offset'] = self['offset']
@property
def identifier(self):
"""Identifier
Returns
-------
str or None
location identifier
"""
if 'identifier' in self:
return self['identifier']
else:
return None
@identifier.setter
def identifier(self, value):
self['identifier'] = value
@property
def source_label(self):
"""Source label
Returns
-------
str or None
source label
"""
if 'source_label' in self:
return self['source_label']
else:
return None
@source_label.setter
def source_label(self, value):
self['source_label'] = value
@property
def tags(self):
"""Tags
Returns
-------
list or None
tags
"""
if 'tags' in self:
return self['tags']
else:
return None
@tags.setter
def tags(self, value):
if isinstance(value, str):
value = value.strip()
if value.lower() == 'none':
value = None
if value and '#' in value:
value = [x.strip() for x in value.split('#')]
elif value and ',' in value:
value = [x.strip() for x in value.split(',')]
elif value and ':' in value:
value = [x.strip() for x in value.split(':')]
elif value and ';' in value:
value = [x.strip() for x in value.split(';')]
self['tags'] = value
# Remove empty tags
self['tags'] = list(filter(None, self['tags']))
# Sort tags
self['tags'].sort()
[docs]class MetaDataContainer(ListFile, MetaMixin):
valid_formats = ['csv', 'txt', 'ann']
def __init__(self, *args, **kwargs):
    """Constructor

    Arguments are forwarded unchanged to the parent class constructor.
    Afterwards every stored element that is not already a ``MetaDataItem``
    is wrapped into one.

    """

    super(MetaDataContainer, self).__init__(*args, **kwargs)
    self.item_class = MetaDataItem

    # Convert all items in the list to MetaDataItems (in-place replacement,
    # the list length does not change while iterating)
    for position, entry in enumerate(self):
        if isinstance(entry, self.item_class):
            continue
        self[position] = self.item_class(entry)
def __str__(self):
    """Container content as produced by ``get_string`` with default arguments"""

    content = self.get_string()
    return content
def __add__(self, other):
    """Concatenate ``other`` to this container

    NOTE(review): the concatenation result is fed to ``self.update``, which
    (as defined in MetaMixin, unless ListFile overrides it) replaces the
    content of ``self`` and returns ``self`` - so ``a + b`` appears to
    modify ``a`` in place. Confirm this is intended.

    """

    combined = super(MetaDataContainer, self).__add__(other)
    return self.update(combined)
def log(self, level='info'):
    """Log container content, one log record per line of ``str(self)``

    Parameters
    ----------
    level : str
        Logging level, possible values [info, debug, warn, warning, error, critical]
        (case-insensitive). Any other value logs nothing.
        Default value "info"

    Returns
    -------
    Nothing

    """

    level_name = level.lower()
    if level_name == 'warn':
        # Logger.warn is a deprecated alias (removed in Python 3.13)
        level_name = 'warning'

    if level_name not in ('debug', 'info', 'warning', 'error', 'critical'):
        # Unknown levels have always been ignored silently
        return

    emit = getattr(logging.getLogger(__name__), level_name)
    for line in str(self).split('\n'):
        emit(line)
def show(self, show_stats=True):
    """Print container content

    Parameters
    ----------
    show_stats : bool
        Forwarded to ``get_string``
        Default value "True"

    Returns
    -------
    Nothing

    """

    content = self.get_string(show_stats=show_stats)
    print(content)
def get_string(self, show_stats=True):
    """Get content in string format

    The output starts with the item table (header plus one row per item).
    When ``show_stats`` is set, scene, event and tag statistics tables are
    appended for every category that has at least one label.

    Parameters
    ----------
    show_stats : bool
        Include scene and event statistics
        Default value "True"

    Returns
    -------
    str
        Multi-line string

    """

    parts = [self.item_class().get_header()]
    for i in self:
        parts.append(str(self.item_class(i)) + '\n')

    if not show_stats:
        # Statistics are only computed when they are going to be shown
        return ''.join(parts)

    stats = self._stats()

    scenes = stats.get('scenes', {})
    if scenes.get('scene_label_list'):
        parts.append('\n === Scene statistics ===\n')
        parts.append(' {0:<40s} | {1:<5s} |\n'.format('Scene label', 'Count'))
        parts.append(' {0:<40s} + {1:<5s} +\n'.format('-' * 40, '-' * 5))
        for scene_id, scene_label in enumerate(scenes['scene_label_list']):
            parts.append(' {0:<40s} | {1:5d} |\n'.format(scene_label, int(scenes['count'][scene_id])))

    events = stats.get('events', {})
    if events.get('event_label_list'):
        parts.append('\n === Event statistics ===\n')
        parts.append(' {0:<40s} | {1:<5s} | {2:<10s} | {3:<10s} |\n'.format(
            'Event label',
            'Count',
            'Total Len',
            'Avg Len'
        ))
        parts.append(' {0:<40s} + {1:<5s} + {2:10s} + {3:10s} +\n'.format(
            '-' * 40,
            '-' * 5,
            '-' * 10,
            '-' * 10
        ))
        for event_id, event_label in enumerate(events['event_label_list']):
            parts.append(' {0:<40s} | {1:5d} | {2:10.2f} | {3:10.2f} |\n'.format(
                # Long labels are cut to fit the 40 character column
                (event_label[:38] + '..') if len(event_label) > 38 else event_label,
                int(events['count'][event_id]),
                events['length'][event_id],
                events['avg_length'][event_id]
            ))

    tags = stats.get('tags', {})
    if tags.get('tag_list'):
        parts.append('\n === Tag statistics ===\n')
        parts.append(' {0:<40s} | {1:<5s} |\n'.format('Tag', 'Count'))
        parts.append(' {0:<40s} + {1:<5s} +\n'.format('-' * 40, '-' * 5))
        for tag_id, tag in enumerate(tags['tag_list']):
            parts.append(' {0:<40s} | {1:5d} |\n'.format(tag, int(tags['count'][tag_id])))

    return ''.join(parts)
def filter(self, filename=None, file_list=None, scene_label=None, event_label=None, tag=None):
    """Filter content

    An item is selected when it matches AT LEAST ONE of the given criteria
    (the criteria are OR-ed, not AND-ed). Criteria left as None (or
    otherwise falsy) are ignored. Selected items are deep-copied.

    Parameters
    ----------
    filename : str, optional
        Filename to be matched
        Default value "None"
    file_list : list, optional
        List of filenames to be matched
        Default value "None"
    scene_label : str, optional
        Scene label to be matched
        Default value "None"
    event_label : str, optional
        Event label to be matched
        Default value "None"
    tag : str, optional
        Tag to be matched
        Default value "None"

    Returns
    -------
    MetaDataContainer

    """

    def is_selected(item):
        return bool(
            (filename and item.file == filename) or
            (file_list and item.file in file_list) or
            (scene_label and item.scene_label == scene_label) or
            (event_label and item.event_label == event_label) or
            (tag and item.tags and tag in item.tags)
        )

    return MetaDataContainer([copy.deepcopy(item) for item in self if is_selected(item)])
def filter_time_segment(self, onset=None, offset=None):
    """Filter time segment

    Selects the items lying inside the segment: item onset at or after
    ``onset`` (when given) and item offset at or before ``offset`` (when
    given). A bound of 0.0 is a real bound. When neither bound is given,
    an empty container is returned. Selected items are not copied.

    Parameters
    ----------
    onset : float >= 0.0
        Segment start
        Default value "None"
    offset : float >= 0.0
        Segment end
        Default value "None"

    Returns
    -------
    MetaDataContainer

    """

    if onset is None and offset is None:
        # No segment defined: nothing is selected (unchanged behaviour)
        return MetaDataContainer([])

    data = []
    for item in self:
        # Bounds are compared against None explicitly so that 0.0 is honoured
        after_start = onset is None or item.onset >= onset
        before_end = offset is None or item.offset <= offset
        if after_start and before_end:
            data.append(item)

    return MetaDataContainer(data)
def process_events(self, minimum_event_length=None, minimum_event_gap=None):
    """Process event content

    Makes sure that minimum event length and minimum event gap conditions are met per event label class.
    Events are handled separately for every (file, event label) pair. Input items are not modified, the
    output contains copies.

    Parameters
    ----------
    minimum_event_length : float > 0.0
        Minimum event length in seconds, events shorter than this are filtered out from the output.
        (Default value=None)
    minimum_event_gap : float > 0.0
        Minimum allowed gap between events in seconds from same event label class. Events separated by
        a gap of at most this length are merged into one event.
        (Default value=None)

    Returns
    -------
    MetaDataContainer

    """

    processed_events = []
    for filename in self.file_list:
        for event_label in self.unique_event_labels:
            # Select events matching BOTH the file and the label. self.filter() cannot be used for
            # this: it selects items matching ANY of its criteria, which would mix other labels and
            # other files into the group.
            current_events = sorted(
                (
                    copy.deepcopy(item) for item in self
                    if item.file == filename and item.event_label == event_label
                ),
                key=lambda k: k.event_onset
            )

            # 1. remove short events
            if minimum_event_length is not None:
                current_events = [
                    event for event in current_events
                    if event.offset - event.onset >= minimum_event_length
                ]

            if not current_events or minimum_event_gap is None:
                processed_events += current_events
                continue

            # 2. remove small gaps between events
            merged_events = []

            def store(template, onset, offset):
                # The event opening a merged group provides the metadata of the merged event.
                # event_onset/event_offset setters keep both key spellings in step.
                merged = copy.deepcopy(template)
                merged.event_onset = onset
                merged.event_offset = offset
                merged_events.append(merged)

            # Load first event into event buffer
            buffered_event = current_events[0]
            buffered_event_onset = buffered_event.onset
            buffered_event_offset = buffered_event.offset

            for event in current_events[1:]:
                if event.onset - buffered_event_offset > minimum_event_gap:
                    # The gap between current event and the buffered is bigger than minimum event gap,
                    # store the buffered event, and replace it with the current one
                    store(buffered_event, buffered_event_onset, buffered_event_offset)
                    buffered_event = event
                    buffered_event_onset = event.onset
                    buffered_event_offset = event.offset
                else:
                    # The gap is small enough: extend the buffered event. max() keeps the buffer from
                    # shrinking when the current event ends before the buffered one does.
                    buffered_event_offset = max(buffered_event_offset, event.offset)

            # Store last event from buffer
            store(buffered_event, buffered_event_onset, buffered_event_offset)

            processed_events += merged_events

    return MetaDataContainer(processed_events)
def add_time_offset(self, offset):
    """Add time offset to event onset and offset timestamps

    Items are modified in place. Timestamps that are missing (None) are
    left untouched; a timestamp of 0.0 is shifted like any other.

    Parameters
    ----------
    offset : float > 0.0
        Offset to be added to the onset and offsets

    Returns
    -------
    self

    """

    for item in self:
        # Compare against None: an onset of exactly 0.0 is a valid
        # timestamp and must be shifted as well.
        if item.onset is not None:
            item.onset += offset

        if item.offset is not None:
            item.offset += offset

    return self
def remove_field(self, field_name):
    """Remove field from meta items

    Items are modified in place; items that do not have the field are
    left as they are.

    Parameters
    ----------
    field_name : str
        Field name

    Returns
    -------
    self

    """

    for item in self:
        if field_name not in item:
            continue
        del item[field_name]

    return self
def slice_field(self, field_name):
    """Slice field values into list

    Parameters
    ----------
    field_name : str
        Field name

    Returns
    -------
    list
        One entry per item, in container order; None for items that do not
        have the field

    """

    return [item[field_name] if field_name in item else None for item in self]
@property
def file_list(self):
    """List of unique files in the container

    Returns
    -------
    list
        Unique ``file`` values of the items, sorted

    """

    unique_files = dict((item.file, item.file) for item in self)
    return sorted(unique_files.values())
@property
def event_count(self):
    """Get number of events

    Returns
    -------
    event_count: integer >= 0
        Number of items in the container

    """

    item_total = len(self)
    return item_total
@property
def scene_label_count(self):
    """Get number of unique scene labels

    Returns
    -------
    scene_label_count: integer >= 0
        Length of ``unique_scene_labels``

    """

    scene_labels = self.unique_scene_labels
    return len(scene_labels)
@property
def event_label_count(self):
    """Get number of unique event labels

    Returns
    -------
    event_label_count: integer >= 0
        Length of ``unique_event_labels``

    """

    event_labels = self.unique_event_labels
    return len(event_labels)
@property
def unique_event_labels(self):
    """Get unique event labels

    Only items having an 'event_label' key contribute.

    Returns
    -------
    labels: list, shape=(n,)
        Unique labels in sorted order

    """

    seen = []
    for item in self:
        if 'event_label' not in item:
            continue

        candidate = item['event_label']
        if candidate not in seen:
            seen.append(candidate)

    return sorted(seen)
@property
def unique_scene_labels(self):
    """Get unique scene labels

    Only items having a 'scene_label' key contribute.

    Returns
    -------
    labels: list, shape=(n,)
        Unique labels in sorted order

    """

    seen = []
    for item in self:
        if 'scene_label' not in item:
            continue

        candidate = item['scene_label']
        if candidate not in seen:
            seen.append(candidate)

    return sorted(seen)
@property
def unique_tags(self):
    """Get unique tags

    Items without tags, or whose tags value is None or empty, contribute
    nothing.

    Returns
    -------
    tags: list, shape=(n,)
        Unique tags in sorted order

    """

    tags = []
    for item in self:
        # 'tags' may be present but None; treat that like "no tags"
        # instead of failing on iteration.
        for tag in (item.get('tags') or []):
            if tag not in tags:
                tags.append(tag)

    tags.sort()
    return tags
@property
def max_offset(self):
    """Find the offset (end-time) of last event

    Returns
    -------
    max_offset: float >= 0
        Largest ``offset`` among items having an 'offset' key; 0 when no
        item offset exceeds 0

    """

    latest = 0
    for item in self:
        if 'offset' not in item:
            continue

        if item.offset > latest:
            latest = item.offset

    return latest
def _stats(self, event_label_list=None, scene_label_list=None, tag_list=None):
    """Statistics of the container content

    Parameters
    ----------
    event_label_list : list of str
        List of event labels to be included in the statistics. If none given, all unique labels used
        Default value "None"
    scene_label_list : list of str
        List of scene labels to be included in the statistics. If none given, all unique labels used
        Default value "None"
    tag_list : list of str
        List of tags to be included in the statistics. If none given, all unique tags used
        Default value "None"

    Returns
    -------
    dict
        'scenes' (count, scene_label_list), 'events' (length, count, avg_length, event_label_list) and
        'tags' (count, tag_list). Counts and lengths are numpy arrays aligned with the label lists;
        avg_length is NaN for labels without any counted event.

    """

    if event_label_list is None:
        event_label_list = self.unique_event_labels

    if scene_label_list is None:
        scene_label_list = self.unique_scene_labels

    if tag_list is None:
        tag_list = self.unique_tags

    scene_counts = numpy.zeros(len(scene_label_list))
    for scene_id, scene_label in enumerate(scene_label_list):
        for item in self:
            if item.scene_label and item.scene_label == scene_label:
                scene_counts[scene_id] += 1

    event_lengths = numpy.zeros(len(event_label_list))
    event_counts = numpy.zeros(len(event_label_list))
    for event_id, event_label in enumerate(event_label_list):
        for item in self:
            # Only items carrying both timestamps count as events
            if item.onset is not None and item.offset is not None and item.event_label == event_label:
                event_lengths[event_id] += item.offset - item.onset
                event_counts[event_id] += 1

    tag_counts = numpy.zeros(len(tag_list))
    for tag_id, tag in enumerate(tag_list):
        for item in self:
            if item.tags and tag in item.tags:
                tag_counts[tag_id] += 1

    # Labels without counted events give 0/0 = NaN; that value is kept, but the RuntimeWarning
    # numpy would emit for it is suppressed.
    with numpy.errstate(divide='ignore', invalid='ignore'):
        average_lengths = event_lengths / event_counts

    return {
        'scenes': {
            'count': scene_counts,
            'scene_label_list': scene_label_list,
        },
        'events': {
            'length': event_lengths,
            'count': event_counts,
            'avg_length': average_lengths,
            'event_label_list': event_label_list
        },
        'tags': {
            'count': tag_counts,
            'tag_list': tag_list,
        }
    }
def load(self, filename=None):
    """Load event list from delimited text file (csv-formated)

    Preferred delimiter is tab, however, other delimiters are supported automatically (they are sniffed automatically).
    The layout of every non-empty row is detected from the types of its fields (see FieldValidator).

    Supported input formats:

    - [file(string)]
    - [file(string)][scene_label(string)]
    - [file(string)][scene_label(string)][identifier(string)]
    - [event_onset (float)][tab][event_offset (float)]
    - [event_onset (float)][tab][event_offset (float)][tab][event_label (string)]
    - [file(string)][tab][event_onset (float)][tab][event_offset (float)][tab][event_label (string)]
    - [file(string)][tab][event_label (string)][tab][event_onset (float)][tab][event_offset (float)]
    - [file(string)][tab][event_onset (float)][tab][event_offset (float)][tab][event_label (string)][tab][identifier(string)]
    - [file(string)][tab][scene_label(string)][tab][event_onset (float)][tab][event_offset (float)][tab][event_label (string)]
    - [file(string)][tab][scene_label(string)][tab][event_onset (float)][tab][event_offset (float)][tab][event_label (string)][tab][source(string)]
    - [file(string)][tab][scene_label(string)][tab][event_onset (float)][tab][event_offset (float)][tab][event_label (string)][tab][source(string)][tab][identifier(string)]
    - [file(string)][tab][tags (list of strings, delimited with ;)]
    - [file(string)][tab][scene_label(string)][tab][tags (list of strings, delimited with ;)]
    - [file(string)][tab][scene_label(string)][tab][event_onset (float)][tab][event_offset (float)][tab][tags (list of strings, delimited with ;)]

    Parameters
    ----------
    filename : str
        Path to the event list in text format (csv). If none given, one given for class constructor is used.
        Default value "None"

    Returns
    -------
    self
        Container whose content has been replaced with the loaded items

    Raises
    ------
    IOError
        File does not exist, or a row does not match any supported format

    """

    import sys

    if filename:
        self.filename = filename
        self.format = self.detect_file_format(self.filename)

    if not os.path.isfile(self.filename):
        raise IOError('{0}: File not found [{1}]'.format(self.__class__.__name__, self.filename))

    # Detected row layout -> item field names, column by column
    layouts = {
        ('audiofile',): ('file',),
        ('number', 'number'): ('onset', 'offset'),
        ('audiofile', 'string'): ('file', 'scene_label'),
        ('audiofile', 'string', 'string'): ('file', 'scene_label', 'identifier'),
        ('number', 'number', 'string'): ('onset', 'offset', 'event_label'),
        ('audiofile', 'number', 'number', 'string'): ('file', 'onset', 'offset', 'event_label'),
        # This layout used to be keyed on a field type ('file') the validator never produces,
        # which made it unreachable.
        ('audiofile', 'string', 'number', 'number'): ('file', 'event_label', 'onset', 'offset'),
        ('audiofile', 'number', 'number', 'string', 'string'):
            ('file', 'onset', 'offset', 'event_label', 'identifier'),
        ('audiofile', 'string', 'number', 'number', 'string'):
            ('file', 'scene_label', 'onset', 'offset', 'event_label'),
        ('audiofile', 'string', 'number', 'number', 'string', 'string'):
            ('file', 'scene_label', 'onset', 'offset', 'event_label', 'source_label'),
        ('audiofile', 'string', 'number', 'number', 'string', 'string', 'string'):
            ('file', 'scene_label', 'onset', 'offset', 'event_label', 'source_label', 'identifier'),
        ('audiofile', 'string', 'list'): ('file', 'scene_label', 'tags'),
        ('audiofile', 'list'): ('file', 'tags'),
        ('audiofile', 'string', 'number', 'number', 'list'):
            ('file', 'scene_label', 'onset', 'offset', 'tags'),
    }
    numeric_fields = ('onset', 'offset')
    # One and two letter fields are ordinary strings as far as the layouts are concerned
    short_string_types = ('alpha1', 'alpha2')

    # The 'U' open-mode flag was removed in Python 3.11; universal newlines is the Python 3 default.
    mode = 'rtU' if sys.version_info[0] < 3 else 'rt'

    data = []
    field_validator = FieldValidator()
    with open(self.filename, mode) as f:
        for row in csv.reader(f, delimiter=self._delimiter):
            if not row:
                continue

            row_format = tuple(
                'string' if field_type in short_string_types else field_type
                for field_type in (field_validator.process(item) for item in row)
            )

            field_names = layouts.get(row_format)
            if field_names is None:
                message = '{0}: Unknown row format [{1}]'.format(self.__class__.__name__, row)
                # No exception is being handled here, so error() rather than exception()
                logging.getLogger(self.__class__.__name__,).error(message)
                raise IOError(message)

            item_data = {}
            for field_name, value in zip(field_names, row):
                item_data[field_name] = float(value) if field_name in numeric_fields else value

            data.append(self.item_class(item_data))

    list.__init__(self, data)
    return self
def save(self, filename=None, delimiter='\t'):
    """Save content to csv file

    Every item is written as one row, using the field order given by the
    item's ``get_list``.

    Parameters
    ----------
    filename : str
        Filename. If none given, one given for class constructor is used.
        Default value "None"
    delimiter : str
        Delimiter to be used
        Default value "\t"

    Returns
    -------
    self

    """

    if filename:
        self.filename = filename

    # The context manager closes the file even when writing a row fails
    with open(self.filename, 'wt') as f:
        writer = csv.writer(f, delimiter=delimiter)
        for item in self:
            writer.writerow(item.get_list())

    return self
def event_stat_counts(self):
    """Event count statistics

    Returns
    -------
    dict
        Maps every unique event label to the number of items returned by
        ``filter(event_label=label)``

    """

    return dict(
        (event_label, len(self.filter(event_label=event_label)))
        for event_label in self.unique_event_labels
    )
def event_roll(self, label_list=None, time_resolution=0.01, label='event_label'):
    """Event roll

    Event roll is binary matrix indicating event activity within time segment defined by time_resolution.

    Parameters
    ----------
    label_list : list
        List of labels in correct order. If none given, unique event labels of the container are used.
    time_resolution : float > 0.0
        Time resolution used when converting event into event roll.
        Default value "0.01"
    label : str
        Meta data field used to create event roll
        Default value "event_label"

    Returns
    -------
    numpy.ndarray [shape=(math.ceil(data_length * 1 / time_resolution), amount of classes)]

    """

    if label_list is None:
        label_list = self.unique_event_labels

    # Initialize event roll: one row per time frame, one column per label
    frame_count = int(math.ceil(self.max_offset * 1.0 / time_resolution))
    roll = numpy.zeros((frame_count, len(label_list)))

    # Fill-in event roll
    for item in self:
        column = label_list.index(item[label])
        first_frame = int(math.floor(item.onset * 1.0 / time_resolution))
        end_frame = int(math.ceil(item.offset * 1.0 / time_resolution))
        roll[first_frame:end_frame, column] = 1

    return roll
class EventRoll(object):
    """Binary event-activity matrix built from a meta data container.

    Rows are consecutive time frames of ``time_resolution`` length, columns follow
    the order of ``label_list``. A cell is 1 when the corresponding label is active
    in that frame, 0 otherwise.
    """

    def __init__(self, metadata_container, label_list=None, time_resolution=0.01, label='event_label', length=None):
        """Event roll

        Event roll is binary matrix indicating event activity within time segment defined by time_resolution.

        Parameters
        ----------
        metadata_container : MetaDataContainer
            Meta data

        label_list : list
            List of labels in correct order. If none given, ``metadata_container.unique_event_labels`` is used.

        time_resolution : float > 0.0
            Time resolution used when converting event into event roll.
            Default value "0.01"

        label : str
            Meta data field used to create event roll
            Default value "event_label"

        length : int, optional
            Length of event roll, if none given max offset of the meta data is used.
            The value is divided by ``time_resolution`` to get the number of frames.
            Default value "None"

        Raises
        ------
        ValueError
            If an item carries a label which is not present in the label list.

        """

        self.metadata_container = metadata_container

        if label_list is None:
            self.label_list = metadata_container.unique_event_labels
        else:
            self.label_list = label_list

        self.time_resolution = time_resolution
        self.label = label

        if length is None:
            self.max_offset_value = metadata_container.max_offset
        else:
            self.max_offset_value = length

        # Initialize event roll
        frame_count = int(math.ceil(self.max_offset_value * 1.0 / self.time_resolution))
        self.event_roll = numpy.zeros((frame_count, len(self.label_list)))

        # Fill-in event_roll
        for item in self.metadata_container:
            if item.onset is None or item.offset is None:
                # Items without timing information cannot be placed on the roll
                continue

            if not item[self.label]:
                continue

            pos = self.label_list.index(item[self.label])
            onset = int(numpy.floor(item.onset * 1.0 / self.time_resolution))
            offset = int(numpy.ceil(item.offset * 1.0 / self.time_resolution))

            # Clamp to the roll. A negative onset must not be used directly as a slice
            # start, since negative indices count from the end of the array.
            onset = max(onset, 0)
            # Events continuing beyond max_offset_value are cut at the end of the roll.
            offset = min(offset, frame_count)

            if onset < offset:
                # We have event inside roll
                self.event_roll[onset:offset, pos] = 1

    @property
    def roll(self):
        """Event roll

        Returns
        -------
        event_roll: np.ndarray, shape=(m,k)
            Event roll

        """

        return self.event_roll

    def pad(self, length):
        """Pad or truncate event roll to given length

        Rows of zeros are appended when the roll is shorter than ``length``; rows are
        dropped from the end when it is longer. The stored roll is replaced.

        Parameters
        ----------
        length : int
            Target number of rows

        Returns
        -------
        event_roll: np.ndarray, shape=(m,k)
            Padded event roll

        """

        current_length = self.event_roll.shape[0]

        if length > current_length:
            padding = numpy.zeros((length - current_length, self.event_roll.shape[1]))
            self.event_roll = numpy.vstack((self.event_roll, padding))

        elif length < current_length:
            self.event_roll = self.event_roll[0:length, :]

        return self.event_roll

    def plot(self):
        """Plot Event roll

        Shows the transposed roll (labels on the vertical axis) with matplotlib.

        Returns
        -------
        None

        """

        # Imported lazily so matplotlib is only needed when plotting
        import matplotlib.pyplot as plt
        plt.matshow(self.event_roll.T, cmap=plt.cm.gray, interpolation='nearest', aspect='auto')
        plt.show()
class FieldValidator(object):
    """Classify a single text field read from a delimited meta data file.

    ``process`` returns one of the type names 'audiofile', 'number', 'list',
    'alpha1', 'alpha2' or 'string'; the first matching test wins.
    """

    # Extensions (without the leading dot) recognized as audio files
    audio_file_extensions = ['wav', 'flac', 'mp3', 'raw']

    def process(self, field):
        """Return the type name of the field

        Tests are applied in fixed order: audio file, number, list, one-letter
        alpha, two-letter alpha. Anything else is reported as 'string'.

        Parameters
        ----------
        field : str

        Returns
        -------
        str

        """

        if self.is_audiofile(field):
            return 'audiofile'

        if self.is_number(field):
            return 'number'

        if self.is_list(field):
            return 'list'

        if self.is_alpha(field, length=1):
            return 'alpha1'

        if self.is_alpha(field, length=2):
            return 'alpha2'

        return 'string'

    def is_number(self, field):
        """Test for number field

        A field is a number when it can be converted with ``float`` or ``complex``.

        Parameters
        ----------
        field

        Returns
        -------
        bool

        """

        for converter in (float, complex):  # float covers int and float notations
            try:
                converter(field)
            except (TypeError, ValueError):
                # TypeError is raised for non-convertible objects such as None
                continue
            return True

        return False

    def is_audiofile(self, field):
        """Test for audio file field

        The field must end with a dot followed by one of ``audio_file_extensions``;
        the comparison is case-insensitive. Requiring the dot prevents ordinary
        words which merely end with the extension letters (e.g. "straw") from
        being classified as audio files.

        Parameters
        ----------
        field : str

        Returns
        -------
        bool

        """

        suffixes = tuple('.' + extension.lower() for extension in self.audio_file_extensions)
        return field.lower().endswith(suffixes)

    def is_list(self, field):
        """Test for list field

        The field is a list when splitting it produces more than one part. The
        pattern is a character class, so besides the delimiters ``;``, ``:`` and
        ``#`` it also splits on ``|`` and ``"``.

        Parameters
        ----------
        field : str

        Returns
        -------
        bool

        """

        return len(re.split(r'[;|:|#"]+', field)) > 1

    def is_alpha(self, field, length=1):
        """Test for alphabetic field of exact length

        Parameters
        ----------
        field : str

        length : int
            Required number of characters
            Default value "1"

        Returns
        -------
        bool

        """

        return len(field) == length and field.isalpha()
class ProbabilityItem(dict):
    """Dict based item holding one probability row.

    Recognized keys are 'file', 'timestamp', 'label' and 'probability'; each is
    also reachable through a property of the same name which returns None when
    the key is absent.
    """

    def __init__(self, *args, **kwargs):
        """Constructor

        Accepts the same arguments as ``dict``. Recognized fields are normalized
        after construction.

        Parameters
        ----------
        dict

        """

        # Keyword arguments are forwarded too, so ProbabilityItem(label='x') stores the field
        dict.__init__(self, *args, **kwargs)

        # Process fields
        if 'file' in self:
            # Keep file paths in unix format even under Windows
            self['file'] = posix_path(self['file'])

        if 'timestamp' in self:
            self['timestamp'] = float(self['timestamp'])

        if 'label' in self and self.label:
            self['label'] = self['label'].strip()
            # Textual "none" (any letter case) stands for a missing label
            if self['label'].lower() == 'none':
                self['label'] = None

        if 'probability' in self:
            self['probability'] = float(self['probability'])

    def __str__(self):
        """Format the item as one fixed-width table row matching ``get_header``."""

        file_string = self.file
        if file_string is None:
            # Missing fields are shown as '---'
            file_string = '---'
        elif len(file_string) > 40:
            # Keep the tail of long paths so the column width stays at 40
            file_string = '...' + file_string[-37:]

        string_data = ' {0:<40s} |'.format(file_string)

        if self.timestamp is not None:
            string_data += ' {:10.8f} |'.format(self.timestamp)
        else:
            string_data += ' {:>10s} |'.format('---')

        string_data += ' {:<22s} |'.format(self.label if self.label is not None else '---')

        if self.probability is not None:
            string_data += ' {:18.8f} |'.format(self.probability)
        else:
            string_data += ' {:>18s} |'.format('---')

        return string_data

    @staticmethod
    def get_header():
        """Return the two-line table header (titles and separator) used with ``__str__``

        Returns
        -------
        str

        """

        string_data = ' {0:<40s} | {1:<10s} | {2:<22s} | {3:<18s} |\n'.format(
            'File',
            'Timestamp',
            'Label',
            'Probability'
        )

        string_data += ' {0:<40s} + {1:<10s} + {2:<22s} + {3:<18s} +\n'.format(
            '-' * 40,
            '-' * 10,
            '-' * 22,
            '-' * 18
        )

        return string_data

    @property
    def file(self):
        """Filename

        Returns
        -------
        str or None
            filename

        """

        if 'file' in self:
            return self['file']
        else:
            return None

    @file.setter
    def file(self, value):
        # Keep file paths in unix format even under Windows
        self['file'] = posix_path(value)

    @property
    def label(self):
        """Label

        Returns
        -------
        str or None
            label

        """

        if 'label' in self:
            return self['label']
        else:
            return None

    @label.setter
    def label(self, value):
        self['label'] = value

    @property
    def timestamp(self):
        """timestamp

        Returns
        -------
        float or None
            timestamp

        """

        if 'timestamp' in self:
            return self['timestamp']
        else:
            return None

    @timestamp.setter
    def timestamp(self, value):
        self['timestamp'] = float(value)

    @property
    def probability(self):
        """probability

        Returns
        -------
        float or None
            probability

        """

        if 'probability' in self:
            return self['probability']
        else:
            return None

    @probability.setter
    def probability(self, value):
        self['probability'] = float(value)

    @property
    def id(self):
        """Unique item identifier

        ID is formed by passing the concatenated item data to ``get_parameter_hash``.
        Fields with falsy values (missing, empty, or 0.0) do not contribute.

        Returns
        -------
        id : str
            Unique item id

        """

        string = ''
        if self.file:
            string += self.file
        if self.timestamp:
            string += '{:8.4f}'.format(self.timestamp)
        if self.label:
            string += self.label
        if self.probability:
            string += '{:8.4f}'.format(self.probability)

        return get_parameter_hash(string)

    def get_list(self):
        """Return item values in a list with specified order.

        Supported field combinations are [file, label, probability] and
        [file, timestamp, label, probability]; other keys in the item are ignored.

        Returns
        -------
        list

        Raises
        ------
        ValueError
            If the item's fields do not form one of the supported combinations.

        """

        # Select only valid fields
        valid_fields = ['file', 'label', 'probability', 'timestamp']
        fields = sorted(set(self.keys()).intersection(valid_fields))

        if fields == ['file', 'label', 'probability']:
            return [self.file, self.label, self.probability]

        elif fields == ['file', 'label', 'probability', 'timestamp']:
            return [self.file, self.timestamp, self.label, self.probability]

        else:
            message = '{name}: Invalid meta data format [{format}]'.format(
                name=self.__class__.__name__,
                format=str(fields)
            )
            raise ValueError(message)
class ProbabilityContainer(ListFile, MetaMixin):
    """List of ProbabilityItems, loadable from and savable to delimited text files."""

    valid_formats = ['csv', 'txt']

    def __init__(self, *args, **kwargs):
        """Constructor

        Arguments are passed on to the parent class; afterwards every stored
        element which is not yet a ProbabilityItem is converted into one.

        """

        super(ProbabilityContainer, self).__init__(*args, **kwargs)
        self.item_class = ProbabilityItem

        # Convert all items in the list to ProbabilityItem
        for item_id, item in enumerate(self):
            if not isinstance(item, self.item_class):
                self[item_id] = self.item_class(item)

    def __add__(self, other):
        return self.update(super(ProbabilityContainer, self).__add__(other))

    @property
    def file_list(self):
        """List of unique files in the container

        Returns
        -------
        list
            Unique filenames in sorted order

        """

        return sorted(set(item.file for item in self))

    @property
    def unique_labels(self):
        """Get unique labels

        Only items which have the 'label' key are considered.

        Returns
        -------
        labels: list, shape=(n,)
            Unique labels in alphabetical order

        """

        return sorted(set(item.label for item in self if 'label' in item))

    def filter(self, filename=None, file_list=None, label=None):
        """Filter content

        An item is selected when it matches at least one of the given criteria.
        When no criterion is given, the result is empty. Selected items are deep copied.

        Parameters
        ----------
        filename : str, optional
            Filename to be matched
            Default value "None"

        file_list : list, optional
            List of filenames to be matched
            Default value "None"

        label : str, optional
            Label to be matched
            Default value "None"

        Returns
        -------
        ProbabilityContainer

        """

        data = []
        for item in self:
            matched = (
                (filename and item.file == filename) or
                (file_list and item.file in file_list) or
                (label and item.label == label)
            )

            if matched:
                data.append(copy.deepcopy(item))

        return ProbabilityContainer(data)

    def get_string(self):
        """Get content in string format

        The table header is followed by the rows grouped per file; each group
        ends with an empty line.

        Returns
        -------
        str
            Multi-line string

        """

        string_data = ''
        string_data += self.item_class().get_header()
        for filename in self.file_list:
            for i in self.filter(filename=filename):
                string_data += str(self.item_class(i)) + '\n'
            string_data += '\n'

        return string_data

    def load(self, filename=None):
        """Load probability list from delimited text file (csv-formated)

        Preferred delimiter is tab, however, other delimiters are supported automatically (they are sniffed automatically).

        Supported input formats:

        - [file(string)][label(string)][probability(float)]

        Empty rows are skipped. Current content of the container is replaced.

        Parameters
        ----------
        filename : str
            Path to the probability list in text format (csv). If none given, one given for class constructor is used.
            Default value "None"

        Raises
        ------
        IOError
            If the file does not exist, or a row is not in the supported format.

        Returns
        -------
        data : list of probability item dicts
            List containing probability item dicts

        """

        if filename:
            self.filename = filename
            self.format = self.detect_file_format(self.filename)

        if not os.path.isfile(self.filename):
            raise IOError('{0}: File not found [{1}]'.format(self.__class__.__name__, self.filename))

        data = []
        field_validator = FieldValidator()

        with open(self.filename, 'rt') as f:
            for row in csv.reader(f, delimiter=self._delimiter):
                if not row:
                    continue

                row_format = [field_validator.process(field) for field in row]

                # The label column is free text: the validator may report it as
                # 'string', 'alpha1', 'alpha2', 'number' etc., so only the file and
                # probability columns are checked.
                if len(row_format) == 3 and row_format[0] == 'audiofile' and row_format[2] == 'number':
                    # Format: [file label probability]
                    data.append(
                        self.item_class({
                            'file': row[0],
                            'label': row[1],
                            'probability': row[2],
                        })
                    )

                else:
                    message = '{0}: Unknown row format [{1}]'.format(self.__class__.__name__, row)
                    # No exception is being handled here, hence error() rather than exception()
                    logging.getLogger(self.__class__.__name__).error(message)
                    raise IOError(message)

        list.__init__(self, data)
        return self

    def save(self, filename=None, delimiter='\t'):
        """Save content to csv file

        One row is written per item, using the field order given by the item's ``get_list``.

        Parameters
        ----------
        filename : str
            Filename. If none given, one given for class constructor is used.
            Default value "None"

        delimiter : str
            Delimiter to be used
            Default value "\\t"

        Returns
        -------
        self

        """

        if filename:
            self.filename = filename

        with open(self.filename, 'wt') as f:
            writer = csv.writer(f, delimiter=delimiter)
            for item in self:
                writer.writerow(item.get_list())

        return self