import numpy as np
import scipy.signal as signal
from datetime import timedelta
from typing import List
from resistics.common.base import ResisticsBase
from resistics.common.math import intdiv
from resistics.common.print import (
generalPrint,
warningPrint,
blockPrint,
arrayToStringInt,
)
from resistics.config.io import loadConfig
from resistics.decimate.parameters import DecimationParameters
from resistics.time.data import TimeData
from resistics.time.filter import downsampleData
class Decimator(ResisticsBase):
    """Decimate time data

    Decimates time data by factors until the minimum number of required samples is reached. When a downsample factor is too large, downsampling is performed in multiple steps to maintain accuracy of result.

    Attributes
    ----------
    timeData : TimeData
        timeData object to decimate
    sampleFreq : float
        Sampling frequency of time data in Hz
    chans : List[str]
        Channels in time data
    numSamples : int
        Number of samples in timeData
    decParams : DecimationParameters
        A DecimationParameters object holding decimation information
    minSamples : int
        Minimum required samples to decimate
    level : int
        Current decimation level
    maxDownsampleFactor : int
        Max allowable downsampling in one go. Downsampling becomes less accurate at large downsample factors

    Methods
    -------
    __init__(timeData, decParams)
        Initialise Decimator with a TimeData object and DecimationParameters object
    incrementLevel()
        Downsample the timeData to the next decimation level
    downsample(downsampleFactor)
        Do the downsampling
    printList()
        Class status returned as list of strings
    """

    def __init__(self, timeData: TimeData, decParams: DecimationParameters) -> None:
        """Initialise with timeData and decimation parameters

        Parameters
        ----------
        timeData : TimeData
            The time data to decimate
        decParams : DecimationParameters
            Decimation parameters for performing the decimation
        """
        self.timeData: TimeData = timeData
        self.sampleFreq: float = timeData.sampleFreq * 1.0
        self.chans: List[str] = timeData.chans
        self.numSamples: int = timeData.numSamples
        self.decParams: DecimationParameters = decParams
        # the minimum number of samples is read from the configuration
        config = loadConfig()
        self.minSamples: int = config["Decimation"]["minsamples"]
        # -1 means no decimation has been performed yet, 0 is the first level
        self.level: int = -1
        self.maxDownsampleFactor: int = 8

    def incrementLevel(self) -> bool:
        """Downsample to the next decimation level

        Returns
        -------
        out : bool
            True if downsampling completed successfully. False otherwise

        Notes
        -----
        When the downsampling factor is greater than maxDownsampleFactor, downsampling is performed in multiple steps, each with a factor no greater than maxDownsampleFactor where such a split exists. The overall factor is validated before the first step so that a failed check does not leave the time data partially decimated.
        """
        # increment level, 0 is the first level
        self.level = self.level + 1
        downsampleFactor = self.decParams.getIncrementalFactor(self.level)
        # if downsample factor is greater than maxDownsampleFactor, downsample in multiple steps
        downsampleList = self._splitDownsampleFactor(downsampleFactor)
        if len(downsampleList) > 1:
            # print info
            self.printText(
                "Downsample factor of {:d} greater than max decimation factor {:d}.".format(
                    downsampleFactor, self.maxDownsampleFactor
                )
            )
            self.printText(
                "Downsampling in multiple decimations given by factors: {}".format(
                    arrayToStringInt(downsampleList)
                )
            )
            # check the overall factor before doing any of the steps
            if not self._checkDownsample(downsampleFactor):
                return False
        # iterate over the factors themselves rather than an independent count
        for stepFactor in downsampleList:
            if not self.downsample(stepFactor):  # check outcome of decimation
                return False
        # otherwise, everything ok, update class vars and return True
        self.sampleFreq = self.timeData.sampleFreq
        self.numSamples = self.timeData.numSamples
        return True

    def downsample(self, downsampleFactor: int) -> bool:
        """Downsample time data

        Parameters
        ----------
        downsampleFactor : int
            Downsampling factor

        Returns
        -------
        out : bool
            True if downsampling completed successfully. False otherwise

        Notes
        -----
        When the downsampling causes number of samples to fall below minSamples, downsampling is not performed. The function returns False in this situation. The check uses the current number of samples in timeData, so it remains valid when called repeatedly within one decimation level.
        """
        if not self._checkDownsample(downsampleFactor):
            return False
        # if downsample factor is 1, nothing to do
        if downsampleFactor == 1:
            return True
        # remember the frequency before this step for the comment below
        previousFreq = self.timeData.sampleFreq * 1.0
        # do the resampling
        self.timeData.data = downsampleData(self.timeData.data, downsampleFactor)
        # update the rest of the timeData object
        self.timeData.sampleFreq = previousFreq / downsampleFactor
        self.timeData.numSamples = self.timeData.data[self.chans[0]].size
        # start time stays the same, but end time needs to change
        self.timeData.stopTime = self.timeData.startTime + timedelta(
            seconds=(1.0 / self.timeData.sampleFreq) * (self.timeData.numSamples - 1)
        )
        self.timeData.addComment(
            "Time data decimated from {} Hz to {} Hz, new start time {}, new end time {}".format(
                previousFreq,
                self.timeData.sampleFreq,
                self.timeData.startTime,
                self.timeData.stopTime,
            )
        )
        return True

    def _checkDownsample(self, downsampleFactor: int) -> bool:
        """Check whether downsampling by a factor is allowed

        Parameters
        ----------
        downsampleFactor : int
            Downsampling factor

        Returns
        -------
        out : bool
            False if the decimation level is out of range or the downsampled data would have fewer than minSamples samples. True otherwise
        """
        # check to see not at max level
        if self.level >= self.decParams.numLevels:
            self.printWarning(
                "Error, number of decimation levels exceeded, returning no data"
            )
            return False
        # a factor of 1 leaves the data untouched, so there is nothing to check
        if downsampleFactor == 1:
            return True
        # downsampling reduces the number of samples by downsample factor
        # if new number of samples is too small, return False
        if self.timeData.numSamples / downsampleFactor < self.minSamples:
            self.printWarning(
                "Next decimation level has less than {} samples. Decimation is exiting.\nSet minimum of samples required using the minsamples option in the Decimation section of the configuration.".format(
                    self.minSamples
                )
            )
            return False
        return True

    def _splitDownsampleFactor(self, downsampleFactor: int) -> List[int]:
        """Split a downsample factor into steps no greater than maxDownsampleFactor

        Parameters
        ----------
        downsampleFactor : int
            The overall downsampling factor

        Returns
        -------
        out : List[int]
            Factors whose product is downsampleFactor, largest steps first. If the remaining factor has no divisor between 2 and maxDownsampleFactor, it is returned as a single, larger step
        """
        steps = []
        remaining = downsampleFactor
        while remaining > self.maxDownsampleFactor:
            # largest divisor of what is left that is allowed in a single step
            divisor = next(
                (
                    candidate
                    for candidate in range(self.maxDownsampleFactor, 1, -1)
                    if remaining % candidate == 0
                ),
                None,
            )
            if divisor is None:
                self.printWarning(
                    "Downsample factor of {:d} cannot be split into factors less than or equal to {:d}. Downsampling by {:d} in a single step.".format(
                        remaining, self.maxDownsampleFactor, remaining
                    )
                )
                break
            steps.append(divisor)
            remaining = remaining // divisor
        # keep the remainder unless it is a trailing factor of 1 after a split
        if remaining > 1 or not steps:
            steps.append(remaining)
        return steps

    def printList(self) -> List[str]:
        """Class information as a list of strings

        Returns
        -------
        out : list
            List of strings with information
        """
        textLst = []
        textLst.append("Current level = {:d}".format(self.level))
        if self.level == -1:
            textLst.append("This is the initial level - no decimation has occured")
        textLst.append("Current sample freq. [Hz] = {:.6f}".format(self.sampleFreq))
        textLst.append("Current sample rate [s] = {:.6f}".format(1.0 / self.sampleFreq))
        textLst.append("Current number of samples = {:d}".format(self.numSamples))
        return textLst