Source code for resistics.regression.remote

import os
import numpy as np
from typing import List, Tuple, Dict

from resistics.common.smooth import smooth1d
from resistics.regression.local import LocalRegressor
from resistics.regression.compute import remoteMatrices
from resistics.regression.robust import (
    chatterjeeMachler,
    sampleMAD0,
    hermitianTranspose,
    olsModel,
    mmestimateModel,
)


[docs]class RemoteRegressor(LocalRegressor): """Perform remote reference processing of magnetotelluric data Remote reference processing uses a different equation for spectral power than the single site processor. The equation that the processor uses is: Attributes ---------- winSelector : WindowSelector A window selector object which defines which windows to use in the linear model decParams : DecimationParameters DecimationParameters object with information about the decimation scheme winParams : WindowParameters WindowParameters object with information about the windowing outpath : str Location to put the calculated transfer functions (Edi files) inSite : str The site to use for the input channels inChannels: List[str] (["Hx", "Hy"]) List of hannels to use as input channels for the linear system inSize : int Number of input channels outSite : str The site to use for the output channels outChannels : List[str] (["Ex", "Ey"]) List of channels to use as output channels for the linear system outSize : int Number of output channels remoteSite : str The site to use as a remote reference remoteChannels : List[str] The channels to use as remote reference channels remoteSize : int Number of remote reference channels intercept : bool (default False) Flag for including an intercept (static) term in the linear system method : str (options, "ols", "cm") String for describing what solution method to use win : str (default hanning) Window function to use in robust solution winSmooth : int (default -1) The size of the window smoother. If -1, this will be autocalculated based on data size postpend : str (default "") String to postpend to the output filename to help file management evalFreq : List[float] or np.ndarray The evaluation frequencies impedances : List variances : List Methods ------- __init__(winSelector, outpath) Initialise the remote reference processor setRemote(remoteSite, remoteChannels) Set the remote site and channels process() Perform the remote reference processing checkRemote() Ensure remote reference site and channels are properly defined prepareLinearEqn(data) Prepare regressors and observations for regression from cross-power data robustProcess(numWindows, obs, reg) Robust regression processing olsProcess(numWindows, obs, reg) Ordinary least squares processing stackedProcess(data) Stacked processing printList() Class status returned as list of strings """ def __init__(self, winSelector, outpath: str): # required self.winSelector = winSelector self.decParams = winSelector.decParams self.winParams = winSelector.winParams self.outpath: str = outpath # default parameters for user options self.inSite: str = "dummy" self.inChannels: List[str] = ["Hx", "Hy"] self.inSize: int = len(self.inChannels) self.outSite: str = "dummy" self.outChannels: List[str] = ["Ex", "Ey"] self.outSize: int = len(self.outChannels) self.remoteSite: str = "" self.remoteChannels: List[str] = ["Hx", "Hy"] self.remoteSize = len(self.remoteChannels) # intercept options self.intercept: bool = False # regression method self.method: str = "mm" # smoothing options self.win: str = "hanning" self.winSmooth: int = -1 # output filename self.postpend: str = "" # evaluation frequency data self.evalFreq = [] self.impedances = [] self.variances = []
[docs] def setRemote(self, remoteSite: str, remoteChannels: List[str]) -> None: """Set information about input site and channels Parameters ---------- remoteSite : str Site to use for input channel data remoteChannels : List[str] Channels to use as the remote reference in the linear system """ self.remoteSite = remoteSite self.remoteChannels = remoteChannels self.remoteSize = len(remoteChannels) self.printText( "Remote reference set with site {} and channels {}".format( self.remoteSite, self.remoteChannels ) )
[docs] def process(self, ncores: int = 0) -> None: """Process spectra data The processing sequence for each decimation level is as below: 1. Get shared (unmasked) windows for all relevant sites (inSite and outSite) 2. For shared unmasked windows - Calculate out the cross-power spectra. - Interpolate calculated cross-power data to the evaluation frequencies for the decimation level. 3. For each evaluation frequency - Do the robust processing to calculate the transfer function at that evaluation frequency. The spectral power data is smoothed as this tends to improve results. The smoothing can be changed by setting the smoothing parameters. This method is still subject to change in the future as it is an area of active work """ numLevels = self.decParams.numLevels for declevel in range(0, numLevels): self.printText("Processing decimation level {}".format(declevel)) numWindows = self.winSelector.getNumSharedWindows(declevel) unmaskedWindows = self.winSelector.getUnmaskedWindowsLevel(declevel) numUnmasked = len(unmaskedWindows) self.printText( "Total shared windows for decimation level = {}".format(numWindows) ) self.printText( "Total unmasked windows for decimation level = {}".format(numUnmasked) ) if numUnmasked == 0: self.printText( "No unmasked windows found at this decimation level ({:d}), continuing to next level".format( declevel ) ) continue self.printText("{} windows will be processed".format(numUnmasked)) # set variables evalFreq = self.decParams.getEvalFrequenciesForLevel(declevel) totalSize: int = self.inSize + self.outSize numEvalFreq: int = len(evalFreq) dataSize: int = self.winSelector.getDataSize(declevel) smoothLen: int = self.getWindowSmooth(datasize=dataSize) # data array for each evaluation frequency and keep spectral power information for all windows evalFreqData = np.empty( shape=(numEvalFreq, numWindows, totalSize, self.remoteSize), dtype="complex", ) # global to local map to help choose windows for each evaluation frequency localWin: int = 0 global2local: Dict = {} # process spectral batches spectraBatches = self.winSelector.getSpecReaderBatches(declevel) numBatches = len(spectraBatches) for batchIdx, batch in enumerate(spectraBatches): # find the unmasked batched windows and add the data batchedWindows = unmaskedWindows.intersection( set(range(batch["globalrange"][0], batch["globalrange"][1] + 1)) ) self.printText( "Processing batch {:d} of {:d}: Global window range {:d} to {:d}, {:d} windows, {:.3%} of data".format( batchIdx + 1, numBatches, batch["globalrange"][0], batch["globalrange"][1], len(batchedWindows), len(batchedWindows) / numUnmasked, ) ) if len(batchedWindows) == 0: self.printText("Batch does not contribute any windows. Continuing to next batch.") continue # collect spectrum data and compute spectral matrices # set batchedWindows from return to ensure proper order inReader = batch[self.inSite] inData, batchedWindows = inReader.readBinaryBatchGlobal( globalIndices=batchedWindows ) if self.outSite != self.inSite: outReader = batch[self.outSite] outData, _gIndicesOut = outReader.readBinaryBatchGlobal( globalIndices=batchedWindows ) else: outData = inData # remote data remoteReader = batch[self.remoteSite] remoteData, _rgIndicesOut = remoteReader.readBinaryBatchGlobal( globalIndices=batchedWindows ) out = remoteMatrices( ncores, inData, outData, remoteData, self.inChannels, self.outChannels, self.remoteChannels, smoothLen, self.win, evalFreq, ) for batchWin, globalWin in enumerate(batchedWindows): evalFreqData[:, localWin] = out[batchWin] # local to global map and increment local window global2local[globalWin] = localWin localWin = localWin + 1 # close spectra files for decimation level for batch in spectraBatches: for site in self.winSelector.sites: batch[site].closeFile() # data has been collected for each evaluation frequency, perform robust processing for eIdx in range(0, numEvalFreq): self.printText( "Processing evaluation frequency = {:.6f} [Hz], period = {:.6f} [s]".format( evalFreq[eIdx], 1 / evalFreq[eIdx] ) ) # get the constrained windows for the evaluation frequency evalFreqWindows = self.winSelector.getWindowsForFreq(declevel, eIdx) if len(evalFreqWindows) == 0: # no windows meet constraints self.printText("No windows found - possibly due to masking") continue localWinIndices = [] for iW in evalFreqWindows: localWinIndices.append(global2local[iW]) self.printText( "{:d} windows will be solved for".format(len(localWinIndices)) ) # restrict processing to data that meets constraints for this evaluation frequency self.evalFreq.append(evalFreq[eIdx]) # solution using all components numSolveWindows, obs, reg = self.prepareLinearEqn( evalFreqData[eIdx, localWinIndices] ) out, var = self.robustProcess(numSolveWindows, obs, reg) # out, var = self.olsProcess(numSolveWindows, obs, reg) self.impedances.append(out) self.variances.append(var) if len(self.evalFreq) == 0: self.printWarning( "No data was found at any decimation level for insite {}, outsite {}, remotesite {} and specdir {}".format( self.inSite, self.outSite, self.remoteSite, self.winSelector.specdir ) ) return # write out all the data self.writeTF( self.winSelector.specdir, self.postpend, self.evalFreq, self.impedances, self.variances, remotesite=self.remoteSite, remotechans=self.remoteChannels, )
[docs] def checkRemote(self): """Check that the remote channels are the same as the input channels Returns ------- bool Check outcome """ check = True check = check and self.remoteSize == self.inSize check = check and self.remoteChannels == self.inChannels return check
[docs] def prepareLinearEqn(self, data): r"""Prepare data as a linear equation for the robust regression This prepares the data for the following type of solution, .. math:: y = Ax, where :math:`y` is the observations, :math:`A` is the regressors and :math:`x` is the unknown. The number of observations is number of windows * number of cross-power channels The shapes of the arrays are as follows: - y is (number of output channels, number of observations) - A is (number of output channels, number of observations, number of input channels) - x is (number of output channels, number of input channels) Consider the impedance tensor, .. math:: :nowrap: \begin{eqnarray} E_x & = & Z_{xx} H_x + Z_{xy} H_y \\ E_y & = & Z_{yx} H_x + Z_{yy} H_y \end{eqnarray} Here, there are two input channels, :math:`H_x`, :math:`H_y` and two output channels :math:`E_x` and :math:`E_y`. In total, there are four components of the unknown impedance tensor, :math:`Z_{xx}`, :math:`Z_{xy}`, :math:`Z_{yx}`, :math:`Z_{yy}` (number of input channels * number of output channels). The number of observations is the number of windows multiplied by the number of channels used for cross-power spectra. Parameters ---------- data : np.ndarray Cross-power spectral data at evaluation frequencies Returns ------- numWindows : int The number of windows included in the regression (after bad value removal) obs : np.ndarray Observations array reg : np.ndarray Regressors array """ numWindows = data.shape[0] numWindows, data = self.checkForBadValues(numWindows, data) # for each output variable, have ninput regressor variables # let's construct our arrays obs = np.empty( shape=(self.outSize, self.remoteSize * numWindows), dtype="complex" ) reg = np.empty( shape=(self.outSize, self.remoteSize * numWindows, self.inSize), dtype="complex", ) for iW in range(0, numWindows): iOffset = iW * self.remoteSize for i in range(0, self.outSize): for j in range(0, self.remoteSize): # this is the observation row where,i is the observed output obs[i, iOffset + j] = data[iW, self.inSize + i, j] for k in range(0, self.inSize): reg[i, iOffset + j, k] = data[iW, k, j] return numWindows, obs, reg
[docs] def robustProcess( self, numWindows: int, obs: np.ndarray, reg: np.ndarray ) -> Tuple[np.ndarray]: """Robust regression processing Perform robust regression processing using observations and regressors for a single evaluation frequency. Parameters ---------- numWindows : int The number of windows obs : np.ndarray The observations reg : np.ndarray The regressors Returns ------- output : np.ndarray The solution to the regression problem varOutput : np.ndarray The variance """ # create array for output output = np.empty(shape=(self.outSize, self.inSize), dtype="complex") varOutput = np.empty(shape=(self.outSize, self.inSize), dtype="float") # solve for i in range(0, self.outSize): observation = obs[i, :] predictors = reg[i, :, :] # save the output out, resids, weights = chatterjeeMachler( predictors, observation, intercept=self.intercept ) # out, resids, scale, weights = mmestimateModel(predictors, observation, intercept=self.intercept) # now take the weights, apply to the observations and predictors, stack the appropriate rows and test observation2 = np.zeros(shape=(self.remoteSize), dtype="complex") predictors2 = np.zeros( shape=(self.remoteSize, self.inSize), dtype="complex" ) for iChan in range(0, self.remoteSize): # now need to have my indexing array indexArray = np.arange( iChan, numWindows * self.remoteSize, self.remoteSize ) weightsLim = weights[indexArray] # weightsLim = weightsLim/np.sum(weightsLim) # normalise weights to 1 observation2[iChan] = ( np.sum(obs[i, indexArray] * weightsLim) / numWindows ) # now for the regressors for j in range(0, self.inSize): predictors2[iChan, j] = ( np.sum(reg[i, indexArray, j] * weightsLim) / numWindows ) out, resids, weights = chatterjeeMachler( predictors2, observation2, intercept=self.intercept ) # out, resids, scale, weights = mmestimateModel(predictors2, observation2, intercept=self.intercept) # now calculate out the varainces - have the solution out, have the weights # recalculate out the residuals with the final solution # calculate standard deviation of residuals # and then use chatterjee machler formula to estimate variances # this needs work - better to use an empirical bootstrap method, but this will do for now resids = np.absolute(observation - np.dot(predictors, out)) scale = sampleMAD0( resids ) # some measure of standard deviation, rather than using the standard deviation residsVar = scale * scale varPred = np.dot(hermitianTranspose(predictors), weights * predictors) varPred = np.linalg.inv(varPred) # this is a pxp matrix varOut = 1.91472 * residsVar * varPred varOut = np.diag(varOut).real # this should be a real number if self.intercept: output[i] = out[1:] varOutput[i] = varOut[1:] else: output[i] = out varOutput[i] = varOut return output, varOutput
[docs] def olsProcess( self, numWindows: int, obs: np.ndarray, reg: np.ndarray ) -> np.ndarray: """Ordinary least squares regression processing Perform ordinary least regression processing using observations and regressors for a single evaluation frequency. Parameters ---------- numWindows : int The number of windows obs : np.ndarray The observations reg : np.ndarray The regressors Returns ------- output : np.ndarray The solution to the regression problem varOutput : np.ndarray The variance """ # create array for output output = np.empty(shape=(self.outSize, self.inSize), dtype="complex") # solve for i in range(0, self.outSize): observation = obs[i, :] predictors = reg[i, :, :] # save the output out, resids, squareResid, rank, s = olsModel( predictors, observation, intercept=self.intercept ) if self.intercept: output[i] = out[1:] else: output[i] = out return output
[docs] def stackedProcess(self, data: np.ndarray) -> np.ndarray: """Ordinary least squares processing after stacking Parameters ---------- data : np.ndarray Cross-spectra data Returns ------- output : np.ndarray The solution to the regression problem varOutput : np.ndarray The variance """ # do various sums numWindows = data.shape[0] numWindows, data = self.checkForBadValues(numWindows, data) # unweighted sum (i.e. normal solution) unWeightedSum = np.sum(data, axis=0) unWeightedSum = unWeightedSum / numWindows # for each output variable, have ninput regressor variables # let's construct our arrays obs = np.empty(shape=(self.outSize, self.remoteSize), dtype="complex") reg = np.empty( shape=(self.outSize, self.remoteSize, self.inSize), dtype="complex" ) for i in range(0, self.outSize): for j in range(0, self.remoteSize): obs[i, j] = unWeightedSum[self.inSize + i, j] for k in range(0, self.inSize): reg[i, j, k] = unWeightedSum[k, j] # create array for output output = np.empty(shape=(self.outSize, self.inSize), dtype="complex") for i in range(0, self.outSize): observation = obs[i, :] predictors = reg[i, :, :] # save the output out, resids, scale, weights = mmestimateModel( predictors, observation, intercept=self.intercept ) if self.intercept: output[i] = out[1:] else: output[i] = out return output
[docs] def printList(self) -> List[str]: """Class information as a list of strings Returns ------- out : list List of strings with information """ textLst: List[str] = [] textLst.append("In Site = {}".format(self.inSite)) textLst.append("In Channels = {}".format(self.inChannels)) textLst.append("Out Site = {}".format(self.outSite)) textLst.append("Out Channels = {}".format(self.outChannels)) textLst.append("Remote Site = {}".format(self.remoteSite)) textLst.append("Remote Channels = {}".format(self.remoteChannels)) textLst.append("Sample frequency = {:.3f}".format(self.decParams.sampleFreq)) return textLst