# Source code for resistics.time.clean

import numpy as np
import math
from typing import Dict

def removeZeros(data: Dict):
    """Clean stretches of zeros from every channel of a dataset

    Each channel array is passed through removeZerosSingle and the result is
    stored back under the same key, so the input dictionary itself is updated.

    Parameters
    ----------
    data : Dict
        Dictionary of data with channel as key and a np.ndarray as value

    Returns
    -------
    Dict
        The same dictionary, with every channel array processed by removeZerosSingle
    """
    # only values are reassigned (no keys added or removed), so iterating
    # over items while updating is safe
    for chan, chanData in data.items():
        data[chan] = removeZerosSingle(chanData)
    return data

def removeZerosSingle(data: np.ndarray) -> np.ndarray:
    """Remove long stretches of zeros in a data array

    Samples whose absolute value is below a small tolerance are treated as
    zeros. Runs of at least 20 consecutive zero samples are replaced by values
    linearly interpolated (np.interp) from the remaining samples; shorter runs
    are left untouched. Runs touching the start or end of the array take the
    nearest retained value, because np.interp does not extrapolate.

    The array is modified in place and also returned. It is returned unchanged
    when there are no zeros, when no zero run is long enough, or when no
    samples would remain to interpolate from.

    Parameters
    ----------
    data : np.ndarray
        Array of data (the index handling assumes a 1-D array)

    Returns
    -------
    np.ndarray
        Array of data with long zero stretches filled by interpolation
    """
    eps = 0.000000001  # use this because of floating point precision
    minRunLength = 20  # only zero runs at least this long are interpolated

    # find zero locations - np.where returns a tuple, take the first index
    zeroLocs = np.where(np.absolute(data) < eps)[0]
    if zeroLocs.size == 0:
        return data  # no zeros to remove

    # split the zero indices wherever neighbouring indices are not adjacent,
    # giving one array per run of consecutive zeros
    runs = np.split(zeroLocs, np.where(np.diff(zeroLocs) != 1)[0] + 1)
    longRuns = [run for run in runs if run.size >= minRunLength]
    if not longRuns:
        # zeros exist but none of the runs is long enough to fix
        return data

    # runs are already in ascending order, so the concatenation is sorted
    indicesToFix = np.concatenate(longRuns)
    # mask is True for the samples that are kept and used for interpolation
    mask = np.ones(data.size, dtype=bool)
    mask[indicesToFix] = False
    if not mask.any():
        # every sample would be replaced, nothing to interpolate from
        return data

    x = np.arange(data.size)
    data[indicesToFix] = np.interp(indicesToFix, x[mask], data[mask])
    return data

def groupConsecutive(vals: np.ndarray, stepsize: int = 1):
    """Split an array of values into runs where neighbours differ by stepsize

    A new section starts wherever the difference between two neighbouring
    values is not equal to stepsize. In general, the stepsize is 1.

    Parameters
    ----------
    vals : np.ndarray
        A set of values to split into consecutive sections
    stepsize : int
        The stepsize between values that means they are consecutive

    Returns
    -------
    list
        The sub-arrays produced by np.split, one per consecutive section

    Examples
    --------
    An array of [1,2,3,5,6,7,10,12,13] would be split into consecutive sections [1,2,3], [5,6,7], [10], [12,13]
    """
    # True wherever the step to the next value breaks the expected spacing
    isBreak = np.diff(vals) != stepsize
    # a break after position i means the next section starts at i + 1
    sectionStarts = np.nonzero(isBreak)[0] + 1
    return np.split(vals, sectionStarts)

def removeNans(data: Dict):
    """Clean NaN values from every channel of a dataset

    Each channel array is passed through removeNansSingle and the result is
    stored back under the same key, so the input dictionary itself is updated.

    Parameters
    ----------
    data : Dict
        Dictionary of data with channel as key and a np.ndarray as value

    Returns
    -------
    Dict
        The same dictionary, with every channel array processed by removeNansSingle
    """
    # only values are reassigned (no keys added or removed), so iterating
    # over items while updating is safe
    for chan, chanData in data.items():
        data[chan] = removeNansSingle(chanData)
    return data

def removeNansSingle(data: np.ndarray) -> np.ndarray:
    """Remove NaNs in a data array

    Every NaN sample is replaced by a value linearly interpolated (np.interp)
    from the non-NaN samples. NaNs at the start or end of the array take the
    nearest non-NaN value, because np.interp does not extrapolate.

    The array is modified in place and also returned. It is returned unchanged
    when it holds no NaNs or when every sample is NaN (nothing to interpolate
    from).

    Parameters
    ----------
    data : np.ndarray
        Array of data (the index handling assumes a 1-D array)

    Returns
    -------
    np.ndarray
        Array of data with NaNs filled by interpolation
    """
    # bool array with True in locations with nan values
    nanLocs = np.isnan(data)
    if not np.any(nanLocs):
        return data  # no nans to remove

    # the valid samples are simply the non-NaN ones
    mask = ~nanLocs
    if not np.any(mask):
        # every sample is NaN, nothing to interpolate from
        return data

    # no need to group, want to remove every nan
    x = np.arange(data.size)
    data[nanLocs] = np.interp(x[nanLocs], x[mask], data[mask])
    return data