Source code for resistics.calibrate.calibrator

import os
import glob
import numpy as np
import scipy.interpolate as interp
from typing import Dict, List

from resistics.common.base import ResisticsBase
from resistics.common.checks import isMagnetic
from resistics.common.math import (
    forwardFFT,
    inverseFFT,
    getFrequencyArray,
    padNextPower2,
)
from resistics.common.print import generalPrint, warningPrint, blockPrint
from resistics.calibrate.data import CalibrationData
from resistics.calibrate.io import CalibrationIO
from resistics.calibrate.utils import getKnownCalibrationFormats, getCalName
from resistics.config.io import loadConfig
from resistics.time.data import TimeData


[docs]class Calibrator(ResisticsBase): """Class for calibrating a time dataset Attributes ---------- calExt : list Accepted calibration extensions calFormats : list The calibration file formats of the respective extensions calDir : str Directory path of calibration files calFiles : list[str] List of calibration files found in calibration directory numCalFiles : int Number of found calibration files in calibration directory useTheoretical : bool Flag to use theoretical calibration function Methods ------- __init__(calDirectory) Initialise with calibration directory setCalDir(calDirectory) Set the calibration directory path findCalFiles() Find calibration files in calibration directory calibrate(timeData, sensor, serial, chopper) Calibrate time data getCalFile(sensor, serial, chopper) Get index of matching calibration flile in calFiles list calibrateChan(data, fs, calData) Calibrate an individual channel interpolateCalData(calData, f, spline) Interpolate calibration data to same frequencies as channel data getTheoreticalCalData(sensor) Get theoretical calibration data printList() Class status returned as list of strings """ def __init__(self, calDirectory: str) -> None: """Set the calibration directory and initialise Calibrator will automatically find calibration files in the provided directory Parameters ---------- calDirectory : str Path of directory containing calibration files """ self.calExt, self.calFormats = getKnownCalibrationFormats() self.calFiles: List[str] = [] self.numCalFiles = 0 self.calDir: str = calDirectory self.findCalFiles() # set whether to use theoretical calibration functions conf = loadConfig() self.extend: bool = conf["Calibration"]["extend"] self.useTheoretical: bool = conf["Calibration"]["usetheoretical"]
[docs] def setCalDir(self, calDirectory: str) -> None: """Set the calibration directory and find files Calibrator will automatically find calibration files in the provided directory Parameters ---------- calDirectory : str Path of directory containing calibration files """ self.calDir = calDirectory self.findCalFiles()
[docs] def findCalFiles(self) -> None: """Find calibration files in calibration directory""" self.calFiles.clear() # get unique extensions extensions = list(set(self.calExt)) for calExt in extensions: # get all files of format cE self.calFiles = self.calFiles + glob.glob( os.path.join(self.calDir, "*.{}".format(calExt)) ) self.numCalFiles = len(self.calFiles)
[docs] def calibrate( self, timeData: TimeData, sensor: Dict[str, str], serial: Dict[str, int], chopper: Dict[str, bool], ) -> TimeData: """Calibrate time data For each channel in timeData, searches for a matching calibration file based on sensor type, serial number and chopper. If a calibration file is found, the channel is calibrated using the data in the file. If useTheoretical is False and no file is found, the data is not calibrated todo: If no calibration file is found and the channel is a magnetic data channel, a theoretical function can be used Parameters ---------- timeData : TimeData TimeData object sensor : Dict Dictionary of sensor information with channels as the key and sensor as the value (sensor is a string) serial : Dictionary of serial information with channels as the key and sensor as the value (serial is a number) chopper : Dictionary of chopper information with channels as the key and sensor as the value (chopper is a bool) Returns ------- timeData : TimeData Calibration TimeData object """ calIO = CalibrationIO() # iterate over data for chan in timeData.chans: # output some info self.printText("Calibrating channel {}".format(chan)) # try and find calibration file calFile, calFormat = self.getCalFile( sensor[chan], serial[chan], chopper[chan] ) if calFile == "": # no file found if self.useTheoretical and isMagnetic(chan): # use theoretical calData = self.getTheoreticalCalData(sensor[chan]) timeData.data[chan] = self.calibrateChan( timeData.data[chan], timeData.sampleFreq, calData ) timeData.addComment( "Channel {} calibrated with theoretical calibration function".format( chan ) ) continue else: self.printText( "No Calibration data found - channel will not be calibrated" ) timeData.addComment("Channel {} not calibrated".format(chan)) continue # nothing to do # else file found # no need to separately apply static gain, already included in cal data calIO.refresh(calFile, calFormat, chopper=chopper[chan], extend=self.extend) calData = calIO.read() self.printText( "Calibration file found for sensor {}, serial number {}, chopper {}: {}".format( sensor[chan], serial[chan], chopper[chan], calFile ) ) self.printText("Format: {}".format(calFormat)) self.printText( "Static gain correction of {} applied to calibration data".format( calData.staticGain ) ) # calibrate time data timeData.data[chan] = self.calibrateChan( timeData.data[chan], timeData.sampleFreq, calData ) timeData.addComment( "Channel {} calibrated with calibration data from file {}".format( chan, calFile ) ) # return calibrated time data return timeData
[docs] def getCalFile(self, sensor, serial, chopper) -> int: """Get calibration file for a sensor, serial and chopper combination Parameters ---------- sensor : str Channel data serial : int Sampling frequency Hz chopper : bool Calibration data Returns ------- calFile : str The mathing calibration file calFormat : str The calibration format """ if self.numCalFiles == 0: # no calibration files to calibrate with return "", "" for extension, fileformat in zip(self.calExt, self.calFormats): # get the name for this format - there could be more than one acceptable name calNames = getCalName(fileformat, extension, sensor, serial, chopper) if calNames is None: continue # if a string rather than a list if isinstance(calNames, str): calNames = [calNames] # search to find a calibration file with that name and take the first encountered for calName in calNames: for calFile in self.calFiles: calFileExt = os.path.splitext(calFile)[1].lower() if ( calFileExt == "." + extension.lower() and calName.lower() in calFile.lower() ): return calFile, fileformat # else return empty strings return "", ""
[docs] def calibrateChan( self, data: np.ndarray, sampleFreq: float, calData: CalibrationData ) -> np.ndarray: """Calibrate a channel Perform fourier transform of channel data, deconvolve (division) sensor impulse response and inverse fourier transform. Parameters ---------- data : np.ndarray Channel data sampleFreq : float Sampling frequency Hz calData : CalibrationData Calibration data Returns ------- out : np.ndarray Calibrated channel data """ # do the forward transform dataSize = data.size # pad end of array data = np.pad(data, (0, padNextPower2(dataSize)), "constant") fftData = forwardFFT(data) f = getFrequencyArray(sampleFreq, fftData.size) # do calibration in frequency domain # interpolate calibration info to frequencies in data transferFunc = self.interpolateCalData(calData, f) # recall, zero element excluded, so start from 1 # fft zero element should really be zero because average is removed from data fftData[1:] = fftData[1:] / transferFunc # return the non padded part of the array return inverseFFT(fftData, data.size)[:dataSize]
[docs] def interpolateCalData(self, calData: CalibrationData, f: np.ndarray) -> np.ndarray: """Interpolation calibration data on to frequency points Parameters ---------- calData : CalibrationData Calibration data f : np.ndarray Frequency array where calibration data is defined Returns ------- out : np.ndarray Calibration data interpolated to the frequency array f """ # linear interpolate interpFuncMag = interp.interp1d(calData.freqs, calData.magnitude) interpFuncPhase = interp.interp1d(calData.freqs, calData.phase) return interpFuncMag(f[1:]) * np.exp(1j * interpFuncPhase(f[1:]))
[docs] def getTheoreticalCalData(self, sensor: str) -> np.ndarray: """Get theoretical calibration data for magnetic channels Parameters ---------- sensor : str Sensor type Returns ------- out : np.ndarray Theoretical calibration information """ # should use the sensor to figure out what calibration func if "mfs06" in sensor or "MFS06" in sensor: return CalibrationIO().unitCalibration()
[docs] def printList(self) -> List[str]: """Class information as a list of strings Returns ------- out : list List of strings with information """ textLst = [] textLst.append("Known extensions and calibration formats") textLst.append("Extensions:") textLst.append(", ".join(self.calExt)) textLst.append("Associated formats") textLst.append(", ".join(self.calFormats)) # now print the actual calibration files textLst.append("{} calibration files found:".format(self.numCalFiles)) if self.numCalFiles == 0: textLst.append("\t\tNo calibration files found") else: for calFile in self.calFiles: textLst.append("\t\t{}".format(calFile)) return textLst