import numpy as np
import scipy.interpolate as interp
from datetime import datetime, timedelta
from typing import Dict
from resistics.time.data import TimeData
from resistics.common.math import intdiv
from resistics.common.print import errorPrint
[docs]def interpolateToSecond(timeData: TimeData, inplace: bool = True) -> TimeData:
"""Interpolate data to be on the second
Some formats of time data (e.g. SPAM) do not start on the second with their sampling. This method interpolates so that sampling starts on the second and improves interoperability with other recording formats.
Parameters
----------
timeData : TimeData
Time data to interpolate onto the second
inplace : bool, optional
Whether to do the interpolation inplace or not. Default is True.
Returns
-------
TimeData
Time data interpolated to start on the second
"""
startTimeInterp, numSamplesInterp, dataInterp = interpolateToSecondData(
timeData.data, timeData.sampleFreq, timeData.startTime
)
if not inplace:
timeData = timeData.copy()
timeData.numSamples = numSamplesInterp
timeData.startTime = startTimeInterp
# calculate end timeEnd
timeData.stopTime = timeData.startTime + timedelta(
seconds=(1.0 / timeData.sampleFreq) * (timeData.numSamples - 1)
)
timeData.data = dataInterp
timeData.addComment(
"Time data interpolated to nearest second. New start time {}, new end time {}, new number of samples {} ".format(
timeData.startTime, timeData.stopTime, timeData.numSamples
)
)
return timeData
[docs]def interpolateToSecondData(
data: Dict[str, np.ndarray], sampleFreq: float, startTime: datetime
) -> Dict[str, np.ndarray]:
"""Interpolate data to be on the second
Interpolates the sampling so that it coincides with full seconds. The function also shifts the start point to the next full second
WARNING: Do not use this method on data recording with a sampling frequency of less than 1Hz
Parameters
----------
data : Dict
Dictionary with channel as keys and data as values
sampleFreq : float
Sampling frequency of the data
startTime : datetime
Time of first sample
Returns
-------
data : Dict
Dictionary with channel as keys and data as values
Notes
-----
This function will truncate the data to the next second.
todo:
This function needs to be more robust for low (< 1Hz) sample frequencies as the use of microseconds and seconds makes no sense for this
"""
# data properties
chans = list(data.keys())
samplePeriod = 1.0 / sampleFreq
# set initial vals
numSamples = data[chans[0]].size
# now caluclate the interpolation
microseconds = startTime.time().microsecond
# check if the dataset already begins on a second
if microseconds == 0:
return startTime, numSamples, data # do nothing, already on the second
# now turn microseconds into a decimal
microseconds = microseconds / 1000000.0
# now calculate the number of complete samples till the next second
eps = 0.000000001
test = microseconds
samplesToDrop = 0
# this loop will always either calculate till the full second or the next sample passed the full second
while test < 1.0 - eps:
test += samplePeriod
samplesToDrop += 1
# if this is exact, i.e. integer number of samples to next second, just need to drop samples
multiple = (1.0 - microseconds) / samplePeriod
if np.absolute(multiple - samplesToDrop) < eps: # floating point arithmetic
dataInterp = {} # create a new dictionary for data
for chan in chans:
dataInterp[chan] = data[chan][samplesToDrop:]
# update the other data
numSamplesInterp = numSamples - samplesToDrop
startTimeInterp = startTime + timedelta(
seconds=1.0 * samplesToDrop / sampleFreq
)
return startTimeInterp, numSamplesInterp, dataInterp
# if here, then we have calculated one extra for samplesToDrop
samplesToDrop -= 1
# now the number of samples to the next full second is not an integer
# interpolation will have to be performed
shift = (multiple - samplesToDrop) * samplePeriod
sampleShift = shift / samplePeriod
x = np.arange(0, numSamples)
xInterp = np.arange(samplesToDrop, numSamples - 1) + sampleShift
# calculate return vars
numSamplesInterp = xInterp.size
startTimeInterp = (
startTime
+ timedelta(seconds=1.0 * samplesToDrop / sampleFreq)
+ timedelta(seconds=shift)
)
# do the interpolation
dataInterp = {}
for chan in chans:
# interpFunc = interp.InterpolatedUnivariateSpline(x, data[chan])
# dataInterp[chan] = interpFunc(xInterp)
tck = interp.splrep(x, data[chan], s=0)
dataInterp[chan] = interp.splev(xInterp, tck, der=0)
# need to calculate how much the
return startTimeInterp, numSamplesInterp, dataInterp
[docs]def fillGap(timeData1, timeData2):
"""Fill gap between time series
Fill gaps between two different recordings. The intent is to fill the gap when recording has been interrupted and there are two data files. Both times series must have the same sampling frequency.
Parameters
----------
timeDat1 : TimeData
Time series data
timeData2 : TimeData
Time series data
Returns
-------
TimeData
Time series data with gap filled
"""
if timeData1.sampleFreq != timeData2.sampleFreq:
errorPrint(
"fillGap",
"fillGap requires both timeData objects to have the same sample rate",
quitRun=True,
)
return False
sampleFreq = timeData1.sampleFreq
sampleRate = 1.0 / sampleFreq
timeDataFirst = timeData1
timeDataSecond = timeData2
if timeData1.startTime > timeData2.stopTime:
timeDataFirst = timeData2
timeDataSecond = timeData1
# now want to do a simple interpolation between timeDataFirst and timeDataSecond
# recall, these times are inclusive, so want to do the samples in between
# this is mostly for clarity of programming
gapStart = timeDataFirst.stopTime + timedelta(seconds=sampleRate)
gapEnd = timeDataSecond.startTime - timedelta(seconds=sampleRate)
# calculate number of samples in the gap
numSamplesGap = (
int(round((gapEnd - gapStart).total_seconds() * sampleFreq)) + 1
) # add 1 because inclusive
# now want to interpolate
newData = {}
for chan in timeDataFirst.chans:
startVal = timeDataFirst.data[chan][-1]
endVal = timeDataSecond.data[chan][0]
increment = 1.0 * (endVal - startVal) / (numSamplesGap + 2)
fillData = np.zeros(shape=(numSamplesGap), dtype=timeDataFirst.data[chan].dtype)
for i in range(0, numSamplesGap):
fillData[i] = startVal + (i + 1) * increment
newData[chan] = np.concatenate(
[timeDataFirst.data[chan], fillData, timeDataSecond.data[chan]]
)
# return a new time data object
# deal with the comment
comment = (
["-----------------------------", "TimeData1 comments"]
+ timeDataFirst.comments
+ ["-----------------------------", "TimeData2 comments"]
+ timeDataSecond.comments
)
comment += ["-----------------------------"] + [
"Gap filled from {} to {}".format(gapStart, gapEnd)
]
return TimeData(
sampleFreq=sampleFreq,
startTime=timeDataFirst.startTime,
stopTime=timeDataSecond.stopTime,
data=newData,
comments=comment,
)