# Source code for resistics.spectra.calculator

import numpy as np
import scipy.signal as signal
import pyfftw
from typing import List, Dict, Union

from resistics.common.base import ResisticsBase
from resistics.spectra.data import SpectrumData
from resistics.time.data import TimeData
from resistics.config.io import loadConfig


class SpectrumCalculator(ResisticsBase):
    """Class for calculating spectra from time data windows

    The class requires the pyfftw and FFTW3 libraries, which allow for fast
    fourier transforms for spectra calculations

    Attributes
    ----------
    sampleFreq : float
        Sample frequency of the data
    numSamples : int
        The number of samples in each window
    window : str
        Window function specification, read from the "windowfunc" entry of the
        "Spectra" configuration section
    applywindow : bool
        Flag for applying the window function before fourier transform, read
        from the "applywindow" entry of the "Spectra" configuration section
    windowFunc : np.ndarray
        Window function to apply to time data before fourier transform
    dataArray : np.ndarray
        numpy array to copy data into before performing fourier transform
    fftObj : pyfftw.FFTW
        fourier transform object

    Methods
    -------
    __init__(sampleFreq, winSamples, config=None)
        Initialise with time data sampling frequency and the number of samples
        in a window
    calcFourierCoeff(timeData)
        Perform fourier transform for timeData and return specData object
    printList()
        Class status returned as list of strings
    """

    def __init__(
        self, sampleFreq: float, winSamples: int, config: Union[Dict, None] = None
    ) -> None:
        """Initialise

        Parameters
        ----------
        sampleFreq : float
            Sampling frequency of time data
        winSamples : int
            Number of samples in a window
        config : Dict, optional
            Configuration holding a "Spectra" section with the "windowfunc"
            and "applywindow" entries. When None, the configuration returned
            by loadConfig() is used.
        """
        self.sampleFreq: float = sampleFreq
        self.numSamples: int = winSamples
        if config is None:
            config = loadConfig()
        spectraConfig = config["Spectra"]
        self.window: str = spectraConfig["windowfunc"]
        self.applywindow: bool = spectraConfig["applywindow"]
        self.windowFunc: np.ndarray = signal.get_window(self.window, winSamples)
        # create a pyfftw plan: an aligned float64 input buffer and a real FFT
        # object built on that buffer
        self.dataArray: np.ndarray = pyfftw.empty_aligned(
            self.numSamples, dtype="float64"
        )
        self.fftObj: pyfftw.FFTW = pyfftw.builders.rfft(self.dataArray)

    def calcFourierCoeff(self, timeData: TimeData) -> SpectrumData:
        """Fast fourier transform of timeData

        Every channel is copied into the internal buffer, linearly detrended,
        optionally multiplied by the window function and then transformed.

        Parameters
        ----------
        timeData : TimeData
            A TimeData object

        Returns
        -------
        specData : SpectrumData
            A SpectrumData object

        Notes
        -----
        The spectra size is read from the first channel in timeData.chans, so
        timeData is expected to hold at least one channel.
        """
        fftData: Dict = {}
        for chan in timeData.chans:
            # first copy data into dataArray
            # no need to pad, these are usually multiples of two
            self.dataArray[:] = timeData.data[chan]
            # detrend and apply window function if set
            self.dataArray[:] = signal.detrend(self.dataArray, type="linear")
            if self.applywindow:
                self.dataArray[:] = self.dataArray * self.windowFunc
            # use pyfftw here; np.array stores a copy of the returned array
            # for this channel
            fftData[chan] = np.array(self.fftObj())

        # size of the spectra, taken from the first channel
        dataSize = fftData[timeData.chans[0]].size

        # comments
        specComments = []
        if self.applywindow:
            specComments.append(
                f"Time data multiplied by {self.window} window function"
            )
        specComments.append(
            f"Fourier transform performed, time data size = {self.numSamples}, "
            f"spectra data size = {dataSize}"
        )
        return SpectrumData(
            windowSize=self.numSamples,
            dataSize=dataSize,
            sampleFreq=timeData.sampleFreq,
            startTime=timeData.startTime,
            stopTime=timeData.stopTime,
            data=fftData,
            comments=timeData.comments + specComments,
        )

    def printList(self) -> List[str]:
        """Class information as a list of strings

        Returns
        -------
        out : list
            List of strings with information
        """
        return [f"Sample frequency = {self.sampleFreq:f} [Hz]"]