import os
import random
import numpy as np
import scipy.interpolate as interp
from typing import List, Dict, Tuple
from resistics.common.base import ResisticsBase
from resistics.window.selector import WindowSelector
from resistics.transfunc.data import TransferFunctionData
from resistics.transfunc.io import TransferFunctionWriter
from resistics.common.io import checkAndMakeDir, fileFormatSampleFreq
from resistics.common.smooth import smooth1d
from resistics.regression.compute import spectralMatrices
from resistics.regression.robust import (
sampleMAD0,
hermitianTranspose,
olsModel,
chatterjeeMachler,
mmestimateModel,
)
[docs]class LocalRegressor(ResisticsBase):
"""Performs single site (or intersite) transfer function calculations
By default, the LocalRegression is setup to calculate the impedance tensor using Hx, Hy as input channels and Ex, Ey as output channels. To calculate the Tipper, the appropriate input and output channels have to be set.
Attributes
----------
winSelector : WindowSelector
A window selector object which defines which windows to use in the linear model
decParams : DecimationParameters
DecimationParameters object with information about the decimation scheme
winParams : WindowParameters
WindowParameters object with information about the windowing
outpath : str
Location to put the calculated transfer functions (Edi files)
inSite : str
The site to use for the input channels
inChannels: List[str] (["Hx", "Hy"])
List of hannels to use as input channels for the linear system
inSize : int
Number of input channels
outSite : str
The site to use for the output channels
outChannels : List[str] (["Ex", "Ey"])
List of channels to use as output channels for the linear system
outSize : int
Number of output channels
allChannels : List[str]
inChannels and outChannels combined into a single list
crossChannels : List[str]
The channels to calculate the cross spectra out for
intercept : bool (default False)
Flag for including an intercept (static) term in the linear system
method : str (options, "ols", "cm")
String for describing what solution method to use
win : str (default hanning)
Window function to use in robust solution
winSmooth : int (default -1)
The size of the window smoother. If -1, this will be autocalculated based on data size
postpend : str (default "")
String to postpend to the output filename to help file management
evalFreq : List[float] or np.ndarray
The evaluation frequencies
impedances : List
variances : List
Methods
-------
__init__(proj, winSelector, outpath)
Initialise with a Project instance and MaskData instance
setInput(inSite, inChannels)
Set the input site and channels
setOutput(outSite, outChannels)
Set the output site and channels
process()
Process the spectra to calculate the transfer function
getWindowSmooth()
Get the window smooth length
checkForBadValues(numWindows, data)
Check the spectral data for bad values that might cause an error
prepareLinearEqn(data)
Prepare regressors and observations for regression from cross-power data
robustProcess(numWindows, obs, reg)
Robust regression processing
olsProcess(numWindows, obs, reg)
Ordinary least squares processing
stackedProcess(data)
Stacked processing
printList()
Class status returned as list of strings
"""
def __init__(self, winSelector: WindowSelector, outpath: str):
"""Intialise the processor
Parameters
----------
winSelector : WindowSelector
A window selector instance
outpath : str
The path to write the transfer function data to
"""
self.winSelector = winSelector
self.decParams = winSelector.decParams
self.winParams = winSelector.winParams
self.outpath: str = outpath
# default parameters for user options
self.inSite: str = "dummy"
self.inChannels: List[str] = ["Hx", "Hy"]
self.inSize: int = len(self.inChannels)
self.outSite: str = "dummy"
self.outChannels: List[str] = ["Ex", "Ey"]
self.outSize: int = len(self.outChannels)
self.allChannels: List[str] = self.inChannels + self.outChannels
self.crossChannels: List[str] = self.allChannels
# solution options
self.intercept: bool = False
self.method: str = "cm"
# smoothing options
self.win: str = "hanning"
self.winSmooth: int = 9
# output filename
self.postpend: str = ""
# evaluation frequency data
self.evalFreq = []
self.impedances = []
self.variances = []
[docs] def setOutput(self, outSite: str, outChannels: List[str]) -> None:
"""Set information about output site and channels
Parameters
----------
inSite : str
Site to use for output channel data
inChannels : List[str]
Channels to use as the output in the linear system
"""
self.outSite = outSite
self.outChannels = outChannels
self.outSize = len(outChannels)
# set all and cross channels
self.allChannels = self.inChannels + self.outChannels
self.crossChannels = self.allChannels
[docs] def process(self, ncores: int = 0) -> None:
"""Process spectra data
The processing sequence for each decimation level is as below:
1. Get shared (unmasked) windows for all relevant sites (inSite and outSite)
2. For shared unmasked windows
- Calculate out the cross-power spectra.
- Interpolate calculated cross-power data to the evaluation frequencies for the decimation level.
3. For each evaluation frequency
- Do the robust processing to calculate the transfer function at that evaluation frequency.
The spectral power data is smoothed as this tends to improve results. The smoothing can be changed by setting the smoothing parameters. This method is still subject to change in the future as it is an area of active work
"""
numLevels: int = self.decParams.numLevels
for declevel in range(0, numLevels):
self.printText("Processing decimation level {}".format(declevel))
numWindows = self.winSelector.getNumSharedWindows(declevel)
unmaskedWindows = self.winSelector.getUnmaskedWindowsLevel(declevel)
numUnmasked = len(unmaskedWindows)
self.printText(
"Total shared windows for decimation level = {}".format(numWindows)
)
self.printText(
"Total unmasked windows for decimation level = {}".format(numUnmasked)
)
if numUnmasked == 0:
self.printText(
"No unmasked windows found at this decimation level ({:d}), continuing to next level".format(
declevel
)
)
continue
self.printText("{} windows will be processed".format(numUnmasked))
# set variables
evalFreq = self.decParams.getEvalFrequenciesForLevel(declevel)
totalSize: int = self.inSize + self.outSize
numEvalFreq: int = len(evalFreq)
dataSize: int = self.winSelector.getDataSize(declevel)
smoothLen: int = self.getWindowSmooth(datasize=dataSize)
# data array for each evaluation frequency and keep spectral power information for all windows
evalFreqData: np.ndarray = np.empty(
shape=(numEvalFreq, numWindows, totalSize, totalSize), dtype="complex"
)
# global to local map to help choose windows for each evaluation frequency
localWin: int = 0
global2local: Dict = {}
# process spectral batches
spectraBatches = self.winSelector.getSpecReaderBatches(declevel)
numBatches = len(spectraBatches)
for batchIdx, batch in enumerate(spectraBatches):
# find the unmasked batched windows and add the data
batchedWindows = unmaskedWindows.intersection(
set(range(batch["globalrange"][0], batch["globalrange"][1] + 1))
)
self.printText(
"Processing batch {:d} of {:d}: Global window range {:d} to {:d}, {:.3%} of data".format(
batchIdx + 1,
numBatches,
batch["globalrange"][0],
batch["globalrange"][1],
len(batchedWindows) / numUnmasked,
)
)
# collect spectrum data and compute spectral matrices
# set batchedWindows from return to ensure proper order
inReader = batch[self.inSite]
inData, batchedWindows = inReader.readBinaryBatchGlobal(
globalIndices=batchedWindows
)
if self.outSite != self.inSite:
outReader = batch[self.outSite]
outData, _gIndicesOut = outReader.readBinaryBatchGlobal(
globalIndices=batchedWindows
)
else:
outData = inData
out = spectralMatrices(
ncores,
inData,
outData,
self.inChannels,
self.outChannels,
smoothLen,
self.win,
evalFreq,
)
for batchWin, globalWin in enumerate(batchedWindows):
evalFreqData[:, localWin] = out[batchWin]
# local to global map and increment local window
global2local[globalWin] = localWin
localWin = localWin + 1
# close spectra files for decimation level
for batch in spectraBatches:
for site in self.winSelector.sites:
batch[site].closeFile()
# data has been collected for each evaluation frequency, perform robust processing
for eIdx in range(0, numEvalFreq):
self.printText(
"Processing evaluation frequency = {:.6f} [Hz], period = {:.6f} [s]".format(
evalFreq[eIdx], 1 / evalFreq[eIdx]
)
)
# get the constrained windows for the evaluation frequency
evalFreqWindows = self.winSelector.getWindowsForFreq(declevel, eIdx)
if len(evalFreqWindows) == 0:
# no windows meet constraints
self.printText("No windows found - possibly due to masking")
continue
localWinIndices = []
for iW in evalFreqWindows:
localWinIndices.append(global2local[iW])
self.printText(
"{:d} windows will be solved for".format(len(localWinIndices))
)
# restrict processing to data that meets constraints for this evaluation frequency
self.evalFreq.append(evalFreq[eIdx])
# solution using all components
numSolveWindows, obs, reg = self.prepareLinearEqn(
evalFreqData[eIdx, localWinIndices]
)
out, var = self.robustProcess(numSolveWindows, obs, reg)
# out, var = self.olsProcess(numSolveWindows, obs, reg)
# out, var = self.stackedProcess(evalFreqData[eIdx, localWinIndices])
self.impedances.append(out)
self.variances.append(var)
if len(self.evalFreq) == 0:
self.printWarning(
"No data was found at any decimation level for insite {}, outsite {} and specdir {}".format(
self.inSite, self.outSite, self.winSelector.specdir
)
)
return
# write out all the data
self.writeTF(
self.winSelector.specdir,
self.postpend,
self.evalFreq,
self.impedances,
self.variances,
)
[docs] def getWindowSmooth(self, **kwargs):
"""Window smoothing length
Power spectra data is smoothed. This returns the size of the smoothing window.
Parameters
----------
datasize : int
The size of the data
Returns
-------
smoothLen : int
Smoothing size
"""
# check if window size specified by user
if self.winSmooth != -1 and self.winSmooth > 1:
return self.winSmooth
# if not, calculate based on datasize
if "datasize" in kwargs:
winSmooth = kwargs["datasize"] * 1.0 / 16.0
if winSmooth < 3:
return 3 # minimum smoothing
# otherwise round to nearest odd number
winSmooth = np.ceil(winSmooth) // 2 # this is floor division
return int(winSmooth * 2 + 1)
# otherwise, return a default value
return 15
[docs] def checkForBadValues(self, numWindows: int, data: np.ndarray):
"""Check data for bad values and remove
Parameters
----------
numWindows : int
The number of windows
data : np.ndarray
Cross-spectra data
Returns
-------
numGoodWindows : int
The number of good windows
goodData : np.ndarray
The cross-spectra data with bad windows removed
"""
finiteArray = np.ones(shape=(numWindows))
for iW in range(0, numWindows):
if not np.isfinite(data[iW]).all():
finiteArray[iW] = 0
numGoodWindows = sum(finiteArray)
if numGoodWindows == numWindows:
return numWindows, data
self.printWarning(
"Bad data found...number of windows reduced from {} to {}".format(
numWindows, numGoodWindows
)
)
goodWindowIndices = np.where(finiteArray == 1)
return numGoodWindows, data[goodWindowIndices]
[docs] def prepareLinearEqn(self, data: np.ndarray):
r"""Prepare data as a linear equation for the robust regression
This prepares the data for the following type of solution,
.. math::
y = Ax,
where :math:`y` is the observations, :math:`A` is the regressors and :math:`x` is the unknown.
The number of observations is number of windows * number of cross-power channels
The shapes of the arrays are as follows:
- y is (number of output channels, number of observations)
- A is (number of output channels, number of observations, number of input channels)
- x is (number of output channels, number of input channels)
Consider the impedance tensor,
.. math::
:nowrap:
\begin{eqnarray}
E_x & = & Z_{xx} H_x + Z_{xy} H_y \\
E_y & = & Z_{yx} H_x + Z_{yy} H_y
\end{eqnarray}
Here, there are two input channels, :math:`H_x`, :math:`H_y` and two output channels :math:`E_x` and :math:`E_y`. In total, there are four components of the unknown impedance tensor, :math:`Z_{xx}`, :math:`Z_{xy}`, :math:`Z_{yx}`, :math:`Z_{yy}` (number of input channels * number of output channels). The number of observations is the number of windows multiplied by the number of channels used for cross-power spectra.
Parameters
----------
data : np.ndarray
Cross-power spectral data at evaluation frequencies
Returns
-------
numWindows : int
The number of windows included in the regression (after bad value removal)
obs : np.ndarray
Observations array
reg : np.ndarray
Regressors array
"""
numWindows = data.shape[0]
# check for bad values
numWindows, data = self.checkForBadValues(numWindows, data)
crossSize = len(self.crossChannels)
# for each output variable, have number of input regressor variables
# construct our arrays
obs = np.empty(shape=(self.outSize, crossSize * numWindows), dtype="complex")
reg = np.empty(
shape=(self.outSize, crossSize * numWindows, self.inSize), dtype="complex"
)
for iW in range(0, numWindows):
iOffset = iW * crossSize
for i in range(0, self.outSize):
for j, crossChan in enumerate(self.crossChannels):
# this is the observation row where i is the observed output
crossIndex = self.allChannels.index(crossChan)
obs[i, iOffset + j] = data[iW, self.inSize + i, crossIndex]
for k in range(0, self.inSize):
reg[i, iOffset + j, k] = data[iW, k, crossIndex]
return numWindows, obs, reg
[docs] def robustProcess(
self, numWindows: int, obs: np.ndarray, reg: np.ndarray
) -> Tuple[np.ndarray]:
"""Robust regression processing
Perform robust regression processing using observations and regressors for a single evaluation frequency.
Parameters
----------
numWindows : int
The number of windows
obs : np.ndarray
The observations
reg : np.ndarray
The regressors
Returns
-------
output : np.ndarray
The solution to the regression problem
varOutput : np.ndarray
The variance
"""
crossSize = len(self.crossChannels)
# create array for output
output = np.empty(shape=(self.outSize, self.inSize), dtype="complex")
varOutput = np.empty(shape=(self.outSize, self.inSize), dtype="float")
# solve
for i in range(0, self.outSize):
observation = obs[i, :]
predictors = reg[i, :, :]
# save the output
out, resids, weights = chatterjeeMachler(
predictors, observation, intercept=self.intercept
)
# out, resids, scale, weights = mmestimateModel(predictors, observation, intercept=self.intercept)
# now take the weights, apply to the observations and predictors, stack the appropriate rows
observation2 = np.zeros(shape=(crossSize), dtype="complex")
predictors2 = np.zeros(shape=(crossSize, self.inSize), dtype="complex")
for iChan in range(0, crossSize):
# now need to have my indexing array
indexArray = np.arange(iChan, numWindows * crossSize, crossSize)
weightsLim = weights[indexArray]
# weightsLim = weightsLim/np.sum(weightsLim) # normalise weights to 1
observation2[iChan] = (
np.sum(obs[i, indexArray] * weightsLim) / numWindows
)
# now for the regressors
for j in range(0, self.inSize):
predictors2[iChan, j] = (
np.sum(reg[i, indexArray, j] * weightsLim) / numWindows
)
out, resids, weights = chatterjeeMachler(
predictors2, observation2, intercept=self.intercept
)
# out, resids, scale, weights = mmestimateModel(
# predictors2, observation2, intercept=self.intercept)
if self.intercept:
out = out[1:]
# now calculate out the varainces - have the solution out, have the weights
# recalculate out the residuals with the final solution
# calculate standard deviation of residuals
# and then use chatterjee machler formula to estimate variances
# this needs work - better to use an empirical bootstrap method, but this will do for now
resids = np.absolute(observation - np.dot(predictors, out))
scale = sampleMAD0(
resids
) # some measure of standard deviation, rather than using the standard deviation
residsVar = scale * scale
# varPred = np.dot(hermitianTranspose(predictors), weights*predictors) # need to fix this
varPred = np.dot(hermitianTranspose(predictors), predictors)
varPred = np.linalg.inv(varPred) # this is a pxp matrix
varOut = 1.91472 * residsVar * varPred
varOut = np.diag(varOut).real # this should be a real number
# if self.intercept:
# output[i] = out[1:]
# varOutput[i] = varOut[1:]
# else:
# output[i] = out
# varOutput[i] = varOut
output[i] = out
varOutput[i] = varOut
return output, varOutput
[docs] def olsProcess(
self, numWindows: int, obs: np.ndarray, reg: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""Ordinary least squares regression processing
Perform ordinary least regression processing using observations and regressors for a single evaluation frequency.
Parameters
----------
numWindows : int
The number of windows
obs : np.ndarray
The observations
reg : np.ndarray
The regressors
Returns
-------
output : np.ndarray
The solution to the regression problem
varOutput : np.ndarray
The variance
"""
# create array for output
output = np.empty(shape=(self.outSize, self.inSize), dtype="complex")
varOutput = np.empty(shape=(self.outSize, self.inSize), dtype="float")
# solve
for i in range(0, self.outSize):
observation = obs[i, :]
predictors = reg[i, :, :]
# save the output
out, resids, squareResid, rank, s = olsModel(
predictors, observation, intercept=self.intercept
)
if self.intercept:
out = out[1:]
# now calculate out the varainces - have the solution out, have the weights
# recalculate out the residuals with the final solution
# calculate standard deviation of residuals
# and then use chatterjee machler formula to estimate variances
# this needs work - better to use an empirical bootstrap method, but this will do for now
resids = np.absolute(observation - np.dot(predictors, out))
scale = sampleMAD0(
resids
) # some measure of standard deviation, rather than using the standard deviation
residsVar = scale * scale
# varPred = np.dot(hermitianTranspose(predictors), weights*predictors) # need to fix this
varPred = np.dot(hermitianTranspose(predictors), predictors)
varPred = np.linalg.inv(varPred) # this is a pxp matrix
varOut = 1.91472 * residsVar * varPred
varOut = np.diag(varOut).real # this should be a real number
# if self.intercept:
# output[i] = out[1:]
# varOutput[i] = varOut[1:]
# else:
# output[i] = out
# varOutput[i] = varOut
output[i] = out
varOutput[i] = varOut
return output, varOutput
[docs] def stackedProcess(self, data: np.ndarray) -> np.ndarray:
"""Ordinary least squares processing after stacking
Parameters
----------
data : np.ndarray
Cross-spectra data
Returns
-------
output : np.ndarray
The solution to the regression problem
varOutput : np.ndarray
The variance
"""
# then do various sums
numWindows = data.shape[0]
crossSize = len(self.crossChannels)
# unweighted sum (i.e. normal solution)
unWeightedSum = np.sum(data, axis=0)
unWeightedSum = unWeightedSum / numWindows
# for each output variable, have ninput regressor variables
# let's construct our arrays
obs = np.empty(shape=(self.outSize, crossSize), dtype="complex")
reg = np.empty(shape=(self.outSize, crossSize, self.inSize), dtype="complex")
for i in range(0, self.outSize):
for j, crossChan in enumerate(self.crossChannels):
crossIndex = self.allChannels.index(crossChan)
obs[i, j] = unWeightedSum[self.inSize + i, crossIndex]
for k in range(0, self.inSize):
reg[i, j, k] = unWeightedSum[k, crossIndex]
# create array for output
output = np.empty(shape=(self.outSize, self.inSize), dtype="complex")
for i in range(0, self.outSize):
observation = obs[i, :]
predictors = reg[i, :, :]
# save the output
out, resids, scale, weights = mmestimateModel(
predictors, observation, intercept=self.intercept
)
if self.intercept:
output[i] = out[1:]
else:
output[i] = out
return output, output*0.1
[docs] def writeTF(
self,
specdir: str,
postpend: str,
freq,
data: np.ndarray,
variances: np.ndarray,
**kwargs
):
"""Write the transfer function file
Parameters
----------
specdir : str
The spectra data being used for the transfer function estimate
postpend : str
The optional postpend to the transfer function file
data : np.ndarray
The transfer function estimates
variances : np.ndarray
The transfer function variances
remotesite : str, optional
Optionally, if there is a remote site
remotechans : List[str], optional
Optionally add the remote channels if there is a remote site
"""
# path for writing out to
sampleFreqStr = fileFormatSampleFreq(self.decParams.sampleFreq)
if postpend == "":
filename = "{}_fs{:s}_{}".format(self.outSite, sampleFreqStr, specdir)
else:
filename = "{}_fs{:s}_{}_{}".format(
self.outSite, sampleFreqStr, specdir, postpend
)
datapath = os.path.join(self.outpath, sampleFreqStr)
checkAndMakeDir(datapath)
outfile = os.path.join(datapath, filename)
# now construct the transferFunctionData object
numFreq = len(freq)
dataDict = {}
varDict = {}
for i in range(0, self.outSize):
for j in range(0, self.inSize):
key = "{}{}".format(self.outChannels[i], self.inChannels[j])
dataArray = np.empty(shape=(numFreq), dtype="complex")
varArray = np.empty(shape=(len(freq)), dtype="float")
for ifreq in range(0, numFreq):
dataArray[ifreq] = data[ifreq][i, j]
varArray[ifreq] = variances[ifreq][i, j]
dataDict[key] = dataArray
varDict[key] = varArray
tfData = TransferFunctionData(freq, dataDict, varDict)
# now make the writer and write out
tfWriter = TransferFunctionWriter(outfile, tfData)
tfWriter.setHeaders(
sampleFreq=self.decParams.sampleFreq,
insite=self.inSite,
inchans=self.inChannels,
outsite=self.outSite,
outchans=self.outChannels,
)
if "remotesite" in kwargs:
tfWriter.addHeader("remotesite", kwargs["remotesite"])
if "remotechans" in kwargs:
tfWriter.addHeader("remotechans", kwargs["remotechans"])
tfWriter.write()
[docs] def printList(self) -> List[str]:
"""Class information as a list of strings
Returns
-------
list
List of strings with information
"""
textLst = []
textLst.append("In Site = {}".format(self.inSite))
textLst.append("In Channels = {}".format(self.inChannels))
textLst.append("Out Site = {}".format(self.outSite))
textLst.append("Out Channels = {}".format(self.outChannels))
textLst.append("Sample frequency = {:.3f}".format(self.decParams.sampleFreq))
return textLst