# -*- coding: utf-8 -*-
# Copyright (c) 2019 - 2020 Simon Kern
# Copyright (c) 2015 - 2020 Holger Nahrstaedt
# Copyright (c) 2011, 2015, Chris Lee-Messer
# Copyright (c) 2016-2017 The pyedflib Developers
# <https://github.com/holgern/pyedflib>
# See LICENSE for license details.
"""
Created on Tue Jan 7 12:13:47 2020
This file contains high-level functions to work with pyedflib.
Includes
- Reading and writing EDFs
- Anonymizing EDFs
- Comparing EDFs
- Renaming Channels from EDF files
- Dropping Channels from EDF files
@author: skjerns
"""
import os
import numpy as np
import warnings
import pyedflib
from copy import deepcopy
from datetime import datetime
# from . import EdfWriter
# from . import EdfReader
def _get_sample_frequency(signal_header):
# Temporary conditional assignment while we deprecate 'sample_rate' as a channel attribute
# in favor of 'sample_frequency', supporting the use of either to give
# users time to switch to the new interface.
return (signal_header['sample_rate']
if signal_header.get('sample_frequency') is None
else signal_header['sample_frequency'])
def tqdm(iteratable, *args, **kwargs):
"""
These is an optional dependecies that shows a progress bar for some
of the functions, e.g. loading.
install this dependency with `pip install tqdm`
if not installed this is just a pass through iterator.
"""
try:
from tqdm import tqdm as iterator
return iterator(iteratable, *args, **kwargs)
except:
return iteratable
def _parse_date(string):
"""
A simple dateparser that detects common date formats
Parameters
----------
string : str
a date string in format as denoted below.
Returns
-------
datetime.datetime
datetime object of a time.
"""
# some common formats.
formats = ['%Y-%m-%d', '%d-%m-%Y', '%d.%m.%Y', '%Y.%m.%d', '%d %b %Y',
'%Y/%m/%d', '%d/%m/%Y']
for f in formats:
try:
return datetime.strptime(string, f)
except:
pass
try:
import dateparser
return dateparser.parse(string)
except:
print('dateparser is not installed. to convert strings to dates'\
'install via `pip install dateparser`.')
raise ValueError('birthdate must be datetime object or of format'\
' `%d-%m-%Y`, eg. `24-01-2020`')
[docs]def dig2phys(signal, dmin, dmax, pmin, pmax):
"""
converts digital edf values to physical values
Parameters
----------
signal : np.ndarray or int
A numpy array with int values (digital values) or an int.
dmin : int
digital minimum value of the edf file (eg -2048).
dmax : int
digital maximum value of the edf file (eg 2048).
pmin : float
physical maximum value of the edf file (eg -200.0).
pmax : float
physical maximum value of the edf file (eg 200.0).
Returns
-------
physical : np.ndarray or float
converted physical values
"""
m = (pmax-pmin) / (dmax-dmin)
b = pmax / m - dmax
physical = m * (signal + b)
return physical
[docs]def phys2dig(signal, dmin, dmax, pmin, pmax):
"""
converts physical values to digital values
Parameters
----------
signal : np.ndarray or int
A numpy array with int values (digital values) or an int.
dmin : int
digital minimum value of the edf file (eg -2048).
dmax : int
digital maximum value of the edf file (eg 2048).
pmin : float
physical maximum value of the edf file (eg -200.0).
pmax : float
physical maximum value of the edf file (eg 200.0).
Returns
-------
digital : np.ndarray or int
converted digital values
"""
m = (pmax-pmin) / (dmax-dmin)
b = pmax / m - dmax
digital = signal/m - b
return digital
[docs]def read_edf(edf_file, ch_nrs=None, ch_names=None, digital=False, verbose=False):
"""
Convenience function for reading EDF+/BDF data with pyedflib.
Will load the edf and return the signals, the headers of the signals
and the header of the EDF. If all signals have the same sample frequency
will return a numpy array, else a list with the individual signals
Parameters
----------
edf_file : str
link to an edf file.
ch_nrs : list of int, optional
The indices of the channels to read. The default is None.
ch_names : list of str, optional
The names of channels to read. The default is None.
digital : bool, optional
will return the signals as digital values (ADC). The default is False.
verbose : bool, optional
Print progress bar while loading or not. The default is False.
Returns
-------
signals : np.ndarray or list
the signals of the chosen channels contained in the EDF.
signal_headers : list
one signal header for each channel in the EDF.
header : dict
the main header of the EDF file containing meta information.
"""
assert (ch_nrs is None) or (ch_names is None), \
'names xor numbers should be supplied'
if ch_nrs is not None and not isinstance(ch_nrs, list): ch_nrs = [ch_nrs]
if ch_names is not None and \
not isinstance(ch_names, list): ch_names = [ch_names]
with pyedflib.EdfReader(edf_file) as f:
# see which channels we want to load
available_chs = [ch.upper() for ch in f.getSignalLabels()]
n_chrs = f.signals_in_file
# find out which number corresponds to which channel
if ch_names is not None:
ch_nrs = []
for ch in ch_names:
if not ch.upper() in available_chs:
warnings.warn('{} is not in source file (contains {})'\
.format(ch, available_chs))
print('will be ignored.')
else:
ch_nrs.append(available_chs.index(ch.upper()))
# if there ch_nrs is not given, load all channels
if ch_nrs is None: # no numbers means we load all
ch_nrs = range(n_chrs)
# convert negative numbers into positives
ch_nrs = [n_chrs+ch if ch<0 else ch for ch in ch_nrs]
# load headers, signal information and
header = f.getHeader()
signal_headers = [f.getSignalHeaders()[c] for c in ch_nrs]
# add annotations to header
annotations = f.readAnnotations()
annotations = [[s, d, a] for s,d,a in zip(*annotations)]
header['annotations'] = annotations
signals = []
for i,c in enumerate(tqdm(ch_nrs, desc='Reading Channels',
disable=not verbose)):
signal = f.readSignal(c, digital=digital)
signals.append(signal)
# we can only return a np.array if all signals have the same samplefreq
sfreqs = [_get_sample_frequency(shead) for shead in signal_headers]
all_sfreq_same = sfreqs[1:]==sfreqs[:-1]
if all_sfreq_same:
dtype = np.int32 if digital else float
signals = np.array(signals, dtype=dtype)
assert len(signals)==len(signal_headers), 'Something went wrong, lengths'\
' of headers is not length of signals'
del f
return signals, signal_headers, header
[docs]def write_edf(edf_file, signals, signal_headers, header=None, digital=False,
file_type=-1, block_size=1):
"""
Write signals to an edf_file. Header can be generated on the fly with
generic values. EDF+/BDF+ is selected based on the filename extension,
but can be overwritten by setting file_type to pyedflib.FILETYPE_XXX
Parameters
----------
edf_file : np.ndarray or list
where to save the EDF file
signals : list
The signals as a list of arrays or a ndarray.
signal_headers : list of dict
a list with one signal header(dict) for each signal.
See pyedflib.EdfWriter.setSignalHeader..
header : dict
a main header (dict) for the EDF file, see
pyedflib.EdfWriter.setHeader for details.
If no header present, will create an empty header
digital : bool, optional
whether the signals are in digital format (ADC). The default is False.
file_type: int, optional
choose file_type for saving.
EDF = 0, EDF+ = 1, BDF = 2, BDF+ = 3, automatic from extension = -1
block_size : int
set the block size for writing. Should be divisor of signal length
in seconds. Higher values mean faster writing speed, but if it
is not a divisor of the signal duration, it will append zeros.
Can be any value between 1=><=60, -1 will auto-infer the fastest value.
Returns
-------
bool
True if successful, False if failed.
"""
assert header is None or isinstance(header, dict), \
'header must be dictioniary or None'
assert isinstance(signal_headers, list), \
'signal headers must be list'
assert len(signal_headers)==len(signals), \
'signals and signal_headers must be same length'
assert file_type in [-1, 0, 1, 2, 3], \
'file_type must be in range -1, 3'
assert block_size<=60 and block_size>=-1 and block_size!=0, \
'blocksize must be smaller or equal to 60'
# copy objects to prevent accidential changes to mutable objects
header = deepcopy(header)
signal_headers = deepcopy(signal_headers)
if file_type==-1:
ext = os.path.splitext(edf_file)[-1]
if ext.lower() == '.edf':
file_type = pyedflib.FILETYPE_EDFPLUS
elif ext.lower() == '.bdf':
file_type = pyedflib.FILETYPE_BDFPLUS
else:
raise ValueError('Unknown extension {}'.format(ext))
n_channels = len(signals)
# if there is no header, we create one with dummy values
if header is None:
header = {}
default_header = make_header()
default_header.update(header)
header = default_header
# block_size sets the size of each writing block and should be a divisor
# of the length of the signal. If it is not, the remainder of the file
# will be filled with zeros.
signal_duration = len(signals[0]) // _get_sample_frequency(signal_headers[0])
if block_size == -1:
block_size = max([d for d in range(1, 61) if signal_duration % d == 0])
elif signal_duration % block_size != 0:
warnings.warn('Signal length is not dividable by block_size. '+
'The file will have a zeros appended.')
# check dmin, dmax and pmin, pmax dont exceed signal min/max
for sig, shead in zip(signals, signal_headers):
dmin, dmax = shead['digital_min'], shead['digital_max']
pmin, pmax = shead['physical_min'], shead['physical_max']
label = shead['label']
if digital: # exception as it will lead to clipping
assert dmin<=sig.min(), \
'digital_min is {}, but signal_min is {}' \
'for channel {}'.format(dmin, sig.min(), label)
assert dmax>=sig.max(), \
'digital_min is {}, but signal_min is {}' \
'for channel {}'.format(dmax, sig.max(), label)
assert pmin != pmax, \
'physical_min {} should be different from physical_max {}'.format(pmin,pmax)
else: # only warning, as this will not lead to clipping
assert pmin<=sig.min(), \
'phys_min is {}, but signal_min is {} ' \
'for channel {}'.format(pmin, sig.min(), label)
assert pmax>=sig.max(), \
'phys_max is {}, but signal_max is {} ' \
'for channel {}'.format(pmax, sig.max(), label)
frequency_key = 'sample_rate' if shead.get('sample_frequency') is None else 'sample_frequency'
shead[frequency_key] *= block_size
# get annotations, in format [[timepoint, duration, description], [...]]
annotations = header.get('annotations', [])
with pyedflib.EdfWriter(edf_file, n_channels=n_channels, file_type=file_type) as f:
f.setDatarecordDuration(int(100000 * block_size))
f.setSignalHeaders(signal_headers)
f.setHeader(header)
f.writeSamples(signals, digital=digital)
for annotation in annotations:
f.writeAnnotation(*annotation)
del f
return os.path.isfile(edf_file)
[docs]def write_edf_quick(edf_file, signals, sfreq, digital=False):
"""
wrapper for write_pyedf without creating headers.
Use this if you don't care about headers or channel names and just
want to dump some signals with the same sampling freq. to an edf
Parameters
----------
edf_file : str
where to store the data/edf.
signals : np.ndarray
The signals you want to store as numpy array.
sfreq : int
the sampling frequency of the signals.
digital : bool, optional
if the data is present digitally (int) or as mV/uV.The default is False.
Returns
-------
bool
True if successful, else False or raise Error.
"""
signals = np.atleast_2d(signals)
header = make_header(technician='pyedflib-quickwrite')
labels = ['CH_{}'.format(i) for i in range(len(signals))]
pmin, pmax = signals.min(), signals.max()
signal_headers = make_signal_headers(labels, sample_frequency=sfreq,
physical_min=pmin, physical_max=pmax)
return write_edf(edf_file, signals, signal_headers, header, digital=digital)
[docs]def compare_edf(edf_file1, edf_file2, verbose=False):
"""
Loads two edf files and checks whether the values contained in
them are the same. Does not check the header or annotations data.
Mainly to verify that other options (eg anonymization) produce the
same EDF file.
Parameters
----------
edf_file1 : str
edf file 1 to compare.
edf_file2 : str
edf file 2 to compare.
verbose : bool, optional
print progress or not. The default is False.
Returns
-------
bool
True if signals are equal, else raises error.
"""
signals1, shead1, _ = read_edf(edf_file1, digital=True, verbose=verbose)
signals2, shead2, _ = read_edf(edf_file2, digital=True, verbose=verbose)
for i, sigs in enumerate(zip(signals1, signals2)):
s1, s2 = sigs
if np.array_equal(s1, s2): continue # early stopping
s1 = np.abs(s1)
s2 = np.abs(s2)
if np.array_equal(s1, s2): continue # early stopping
close = np.mean(np.isclose(s1, s2))
assert close>0.99, 'Error, digital values of {}'\
' and {} for ch {}: {} are not the same: {:.3f}'.format(
edf_file1, edf_file2, shead1[i]['label'],
shead2[i]['label'], close)
dmin1, dmax1 = shead1[i]['digital_min'], shead1[i]['digital_max']
pmin1, pmax1 = shead1[i]['physical_min'], shead1[i]['physical_max']
dmin2, dmax2 = shead2[i]['digital_min'], shead2[i]['digital_max']
pmin2, pmax2 = shead2[i]['physical_min'], shead2[i]['physical_max']
for i, sigs in enumerate(zip(signals1, signals2)):
s1, s2 = sigs
# convert to physical values, no need to load all data again
s1 = dig2phys(s1, dmin1, dmax1, pmin1, pmax1)
s2 = dig2phys(s2, dmin2, dmax2, pmin2, pmax2)
# compare absolutes in case of inverted signals
if np.array_equal(s1, s2): continue # early stopping
s1 = np.abs(s1)
s2 = np.abs(s2)
if np.array_equal(s1, s2): continue # early stopping
min_dist = np.abs(dig2phys(1, dmin1, dmax1, pmin1, pmax1))
close = np.mean(np.isclose(s1, s2, atol=min_dist))
assert close>0.99, 'Error, physical values of {}'\
' and {} for ch {}: {} are not the same: {:.3f}'.format(
edf_file1, edf_file2, shead1[i]['label'],
shead2[i]['label'], close)
return True
[docs]def drop_channels(edf_source, edf_target=None, to_keep=None, to_drop=None,
verbose=False):
"""
Remove channels from an edf file. Save the file.
For safety reasons, no source files can be overwritten.
Parameters
----------
edf_source : str
The source edf file from which to drop channels.
edf_target : str, optional
Where to save the file.If None, will be edf_source+'dropped.edf'.
The default is None.
to_keep : list, optional
A list of channel names or indices that will be kept.
Strings will always be interpreted as channel names.
'to_keep' will overwrite any droppings proposed by to_drop.
The default is None.
to_drop : list, optional
A list of channel names/indices that should be dropped.
Strings will be interpreted as channel names. The default is None.
verbose : bool, optional
print progress or not. The default is False.
Returns
-------
edf_target : str
the target filename with the dropped channels.
"""
# convert to list if necessary
if isinstance(to_keep, (int, str)): to_keep = [to_keep]
if isinstance(to_drop, (int, str)): to_drop = [to_drop]
# check all parameters are good
assert to_keep is None or to_drop is None,'Supply only to_keep xor to_drop'
if to_keep is not None:
assert all([isinstance(ch, (str, int)) for ch in to_keep]),\
'channels must be int or string'
if to_drop is not None:
assert all([isinstance(ch, (str, int)) for ch in to_drop]),\
'channels must be int or string'
assert os.path.exists(edf_source), \
'source file {} does not exist'.format(edf_source)
assert edf_source!=edf_target, 'For safet, target must not be source file.'
if edf_target is None:
edf_target = os.path.splitext(edf_source)[0] + '_dropped.edf'
if os.path.exists(edf_target):
warnings.warn('Target file will be overwritten')
ch_names = read_edf_header(edf_source)['channels']
# convert to all lowercase for compatibility
ch_names = [ch.lower() for ch in ch_names]
ch_nrs = list(range(len(ch_names)))
if to_keep is not None:
for i,ch in enumerate(to_keep):
if isinstance(ch,str):
ch_idx = ch_names.index(ch.lower())
to_keep[i] = ch_idx
load_channels = list(to_keep) # copy list compatible with py2.7
elif to_drop is not None:
for i,ch in enumerate(to_drop):
if isinstance(ch,str):
ch_idx = ch_names.index(ch.lower())
to_drop[i] = ch_idx
to_drop = [len(ch_nrs)+ch if ch<0 else ch for ch in to_drop]
[ch_nrs.remove(ch) for ch in to_drop]
load_channels = list(ch_nrs)
else:
raise ValueError
signals, signal_headers, header = read_edf(edf_source,
ch_nrs=load_channels,
digital=True, verbose=verbose)
write_edf(edf_target, signals, signal_headers, header, digital=True)
return edf_target
[docs]def anonymize_edf(edf_file, new_file=None,
to_remove=['patientname', 'birthdate'],
new_values=['xxx', ''], verify=False, verbose=False):
"""Anonymize an EDF file by replacing values of header fields.
This function can be used to overwrite all header information that is
patient specific, for example birthdate and patientname. All header fields
can be overwritten this way (i.e., all header.keys() given
_, _, header = read_edf(edf_file, digital=True)).
Parameters
----------
edf_file : str
Filename of an EDF/BDF.
new_file : str | None
The filename of the anonymized file. If None, the input filename
appended with '_anonymized' is used. Defaults to None.
to_remove : list of str
List of attributes to overwrite in the `edf_file`. Defaults to
['patientname', 'birthdate'].
new_values : list of str
List of values used for overwriting the attributes specified in
`to_remove`. Each item in `to_remove` must have a corresponding item
in `new_values`. Defaults to ['xxx', ''].
verify : bool
Compare `edf_file` and `new_file` for equality (i.e., double check that
values are same). Defaults to False
verbose : bool, optional
print progress or not. The default is False.
Returns
-------
bool
True if successful, or if `verify` is False. Raises an error otherwise.
"""
if not len(to_remove) == len(new_values):
raise AssertionError('Each `to_remove` must have one `new_value`')
if new_file is None:
file, ext = os.path.splitext(edf_file)
new_file = file + '_anonymized' + ext
signals, signal_headers, header = read_edf(edf_file, digital=True,
verbose=verbose)
for new_val, attr in zip(new_values, to_remove):
header[attr] = new_val
write_edf(new_file, signals, signal_headers, header, digital=True)
if verify:
compare_edf(edf_file, new_file, verbose=verbose)
return True
[docs]def rename_channels(edf_file, mapping, new_file=None, verbose=False):
"""
A convenience function to rename channels in an EDF file.
Parameters
----------
edf_file : str
an string pointing to an edf file.
mapping : dict
a dictionary with channel mappings as key:value.
eg: {'M1-O2':'A1-O2'}
new_file : str, optional
the new filename. If None will be edf_file + '_renamed'
The default is None.
verbose : bool, optional
print progress or not. The default is False.
Returns
-------
bool
True if successful, False if failed.
"""
header = read_edf_header(edf_file)
channels = header['channels']
if new_file is None:
file, ext = os.path.splitext(edf_file)
new_file = file + '_renamed' + ext
signal_headers = []
signals = []
for ch_nr in tqdm(range(len(channels)), disable=not verbose):
signal, signal_header, _ = read_edf(edf_file, digital=True,
ch_nrs=ch_nr, verbose=verbose)
ch = signal_header[0]['label']
if ch in mapping :
if verbose: print('{} to {}'.format(ch, mapping[ch]))
ch = mapping[ch]
signal_header[0]['label']=ch
else:
if verbose: print('no mapping for {}, leave as it is'.format(ch))
signal_headers.append(signal_header[0])
signals.append(signal.squeeze())
return write_edf(new_file, signals, signal_headers, header, digital=True)
[docs]def change_polarity(edf_file, channels, new_file=None, verify=True,
verbose=False):
"""
Change polarity of certain channels
Parameters
----------
edf_file : str
from which file to change polarity.
channels : list of int
the indices of the channels.
new_file : str, optional
where to save the edf with inverted channels. The default is None.
verify : bool, optional
whether to verify the two edfs for similarity. The default is True.
verbose : str, optional
print progress or not. The default is True.
Returns
-------
bool
True if success.
"""
if new_file is None:
new_file = os.path.splitext(edf_file)[0] + '.edf'
if isinstance(channels, str): channels=[channels]
channels = [c.lower() for c in channels]
signals, signal_headers, header = read_edf(edf_file, digital=True,
verbose=verbose)
for i,sig in enumerate(signals):
label = signal_headers[i]['label'].lower()
if label in channels:
if verbose: print('inverting {}'.format(label))
signals[i] = -sig
write_edf(new_file, signals, signal_headers, header,
digital=True, correct=False, verbose=verbose)
if verify: compare_edf(edf_file, new_file)
return True