import numpy as np
import scipy.signal as signal
from datetime import datetime, timedelta
from typing import Dict
from resistics.time.data import TimeData
from resistics.common.print import generalPrint, arrayToStringInt
from resistics.common.math import intdiv
[docs]def normalise(timeData: TimeData, inplace: bool = True) -> TimeData:
"""Normalise time data
Parameters
----------
timeData : TimeData
timeData to normalise
inplace : bool, optional
Whether to manipulate the data inplace
Returns
-------
TimeData
Normalised time data
"""
if not inplace:
timeData = timeData.copy()
timeData.data = normaliseData(timeData.data)
timeData.addComment("Data normalised")
return timeData
[docs]def normaliseData(data: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
"""Normalise array data
Normalisation is done by dividing by the result of numpy.norm of the data
Parameters
----------
data : Dict
Dictionary with channel as keys and data as values
Returns
-------
Dict
Dictionary with channel as keys and normalised data as values
"""
for c in data:
data[c] = data[c] / np.linalg.norm(data[c])
return data
[docs]def lowPass(timeData: TimeData, cutoff: float, inplace: bool = True) -> TimeData:
"""Lowpass butterworth filter for time data
Parameters
----------
timeData : TimeData
timeData to filter
cutoff : float
Cutoff frequency in Hz
inplace : bool, optional
Whether to manipulate the data inplace
Returns
-------
TimeData
Filtered time data
"""
if not inplace:
timeData = timeData.copy()
timeData.data = lowPassData(timeData.data, timeData.sampleFreq, cutoff)
timeData.addComment("Low pass filter applied with cutoff {} Hz".format(cutoff))
return timeData
[docs]def lowPassData(
data: Dict[str, np.ndarray], sampleFreq: float, cutoff: float, order: int = 5
) -> Dict[str, np.ndarray]:
"""Lowpass butterworth filter for array data
Parameters
----------
data : Dict
Dictionary with channel as keys and data as values
cutoff : float
Cutoff frequency in Hz
Returns
-------
data : Dict
Dictionary with channel as keys and data as values
"""
# create the filter
normalisedCutoff = 2.0 * cutoff / sampleFreq
b, a = signal.butter(order, normalisedCutoff, btype="lowpass", analog=False)
# filter each channel
return filterData(data, b, a)
[docs]def highPass(timeData: TimeData, cutoff: float, inplace: bool = True) -> TimeData:
"""Highpass butterworth filter for time data
Parameters
----------
timeData : TimeData
timeData to filter
cutoff : float
Cutoff frequency in Hz
inplace : bool, optional
Whether to manipulate the data inplace
Returns
-------
TimeData
Filtered time data
"""
if not inplace:
timeData = timeData.copy()
timeData.data = highPassData(timeData.data, timeData.sampleFreq, cutoff)
timeData.addComment("High pass filter applied with cutoff {} Hz".format(cutoff))
return timeData
[docs]def highPassData(
data: Dict[str, np.ndarray], sampleFreq: float, cutoff: float, order: int = 5
):
"""Highpass butterworth filter for array data
Parameters
----------
data : Dict
Dictionary with channel as keys and data as values
cutoff : float
Cutoff frequency in Hz
Returns
-------
data : Dict
Dictionary with channel as keys and data as values
"""
# create the filter
normalisedCutoff = 2.0 * cutoff / sampleFreq
b, a = signal.butter(order, normalisedCutoff, btype="highpass", analog=False)
return filterData(data, b, a)
[docs]def bandPass(
timeData: TimeData, cutoffLow: float, cutoffHigh: float, inplace: bool = True
) -> TimeData:
"""Bandpass butterworth filter for time data
Parameters
----------
timeData : TimeData
timeData to filter
cutoff : float
Cutoff frequency in Hz
inplace : bool, optional
Whether to manipulate the data inplace
Returns
-------
TimeData
Filtered time data
"""
if not inplace:
timeData = timeData.copy()
timeData.data = bandPassData(
timeData.data, timeData.sampleFreq, cutoffLow, cutoffHigh
)
timeData.addComment(
"Band pass filter applied with cutoffs {} Hz and {} Hz".format(
cutoffLow, cutoffHigh
)
)
return timeData
[docs]def bandPassData(
data: Dict[str, np.ndarray],
sampleFreq: float,
cutoffLow: float,
cutoffHigh: float,
order: int = 5,
):
"""Bandpass butterworth filter for array data
Parameters
----------
data : Dict
Dictionary with channel as keys and data as values
cutoff : float
Cutoff frequency in Hz
Returns
-------
data : Dict
Dictionary with channel as keys and data as values
"""
# create the filter
normalisedCutoffLow = 2.0 * cutoffLow / sampleFreq
normalisedCutoffHigh = 2.0 * cutoffHigh / sampleFreq
b, a = signal.butter(
order,
[normalisedCutoffLow, normalisedCutoffHigh],
btype="bandpass",
analog=False,
)
return filterData(data, b, a)
[docs]def filterData(data: Dict[str, np.ndarray], b, a) -> Dict[str, np.ndarray]:
"""Butterworth filter for array data
Parameters
----------
data : Dict
Dictionary with channel as keys and data as values
b : array_like
The numerator coefficient vector of the filter
a : array_like
The denominator coefficient vector of the filter. If a[0] is not 1, then both a and b are normalized by a[0]
Returns
-------
data : Dict
Dictionary with channel as keys and data as values
"""
filteredData = {}
for c in data:
filteredData[c] = signal.filtfilt(b, a, data[c], method="gust", irlen=500)
return filteredData
[docs]def notchFilter(timeData: TimeData, notch: float, inplace: bool = True) -> TimeData:
"""Bandpass butterworth filter for time data
Parameters
----------
timeData : TimeData
timeData to filter
notch : float
Frequency to notch filter in Hz
inplace : bool, optional
Whether to manipulate the data inplace
Returns
-------
TimeData
Filtered time data
"""
if not inplace:
timeData = timeData.copy()
timeData.data = notchFilterData(
timeData.data, timeData.sampleFreq, notch, notch / 5.0
)
timeData.addComment("Notch filter applied at {} Hz".format(notch))
return timeData
[docs]def notchFilterData(
data: Dict[str, np.ndarray], sampleFreq: float, notch: float, band: float
) -> Dict[str, np.ndarray]:
"""Notch filter array data
Parameters
----------
data : Dict
Dictionary with channel as keys and data as values
sampleFreq : float
Sampling frequency in Hz
notch : float
Frequency to notch in Hz
band : float
The bandwidth around the centerline freqency that you wish to filter
Returns
-------
data : Dict
Dictionary with channel as keys and data as values
"""
# set parameters
nyq = sampleFreq / 2.0
low = notch - band / 2.0
high = notch + band / 2.0
low = low / nyq
high = high / nyq
# filter
order = 2
filter_type = "bessel"
filteredData = {}
for c in data:
b, a = signal.iirfilter(
order, [low, high], btype="bandstop", analog=False, ftype=filter_type
)
filteredData[c] = signal.lfilter(b, a, data[c])
return filteredData
[docs]def resample(timeData: TimeData, resampFreq: float, inplace: bool = True) -> TimeData:
"""Resample time data
Parameters
----------
timeData : TimeData
timeData to filter
resampFreq : float
The frequency to resample to
inplace : bool, optional
Whether to manipulate the data inplace
Returns
-------
TimeData
Filtered time data
"""
origFreq = timeData.sampleFreq
if not inplace:
timeData = timeData.copy()
timeData.data = resampleData(timeData.data, timeData.sampleFreq, resampFreq)
# update the time info
timeData.sampleFreq = resampFreq
timeData.numSamples = timeData.data[timeData.chans[0]].size
timeData.stopTime = timeData.startTime + timedelta(
seconds=(1.0 / timeData.sampleFreq) * (timeData.numSamples - 1)
)
timeData.addComment(
"Time data resampled from {:.6f} Hz to {:.6f} Hz".format(origFreq, resampFreq)
)
return timeData
[docs]def resampleData(
data: Dict[str, np.ndarray], sampleFreq: float, sampleFreqNew: float
) -> Dict[str, np.ndarray]:
"""Resample array data
Resample the data using the polyphase method which does not assume periodicity
Calculate the upsample and then the downsampling rate and using polyphase filtering, the final sample rate is:
(up / down) * original sample rate
Therefore, to get a sampling frequency of sampleFreqNew, want:
(sampleFreqNew / sampleFreq) * sampleFreq
Use the fractions library to get up and down as integers which they are required to be.
Parameters
----------
data : Dict
Dictionary with channel as keys and data as values
sampleFreq : float
The current sampling frequency in Hz
sampleFreqNew : float
The sampling frequency in Hz to resample to
Returns
-------
data : Dict
Dictionary with channel as keys and data as values
"""
from fractions import Fraction
frac = Fraction(
1.0 / sampleFreq
).limit_denominator() # because this is most probably a float
frac = Fraction(frac * int(sampleFreqNew))
frac.limit_denominator()
# otherwise, normal polyphase filtering
resampleData = {}
for c in data:
resampleData[c] = signal.resample_poly(
data[c], frac.numerator, frac.denominator
)
return resampleData
[docs]def downsampleData(
data: Dict[str, np.ndarray], downsampleFactor: int
) -> Dict[str, np.ndarray]:
"""Decimate array data
Parameters
----------
data : Dict
Dictionary with channel as keys and data as values
downsampleFactor : int
The factor to downsample the data by
Returns
-------
data : Dict
Dictionary with channel as keys and data as values
"""
# if downsampleFactor is 1, nothing to do
if downsampleFactor == 1:
return data
# a downsample factor should not be greater than 13
# hence factorise downsampleFactors that are greater than this
if downsampleFactor > 13:
downsamples = factorise(downsampleFactor)
generalPrint(
"Decimation",
"Downsample factor {} greater than 13. Downsampling will be performed in multiple steps of {}".format(
downsampleFactor, arrayToStringInt(downsamples)
),
)
else:
downsamples = [downsampleFactor]
# downsample for each factor in downsamples
for factor in downsamples:
for c in data:
data[c] = signal.decimate(data[c], factor, zero_phase=True)
return data
[docs]def factorise(number: int):
"""Factorise a number to avoid too large a downsample factor
Parameters
----------
number : int
The number to factorise
Returns
-------
List[int]
The downsampling factors to use
Notes
-----
There's a few pathological cases here that are being ignored. For example, what if the downsample factor is the product of two primes greater than 13.
"""
import primefac
factors = list(primefac.primefac(number))
downsamples = []
val = 1
for f in factors:
test = val * f
if test > 13:
downsamples.append(val)
val = 1
val = val * f
# logic: on the last value of f, val*f is tested
# if this is greater than 13, the previous val is added, which leaves one factor leftover
# if not greater than 13, then this is not added either
# so append the last value. the only situation in which this fails is the last factor itself is over 13.
downsamples.append(val)
return downsamples