Source code for resistics.time.reader_lemib423e

import os
import glob
import struct
from datetime import datetime, timedelta
import numpy as np
from typing import List, Tuple, Dict, Any

from resistics.time.reader_lemib423 import (
from resistics.time.reader_internal import TimeReaderInternal
from import TimeData
from resistics.common.checks import isMagnetic, isElectric, consistentChans
from resistics.common.print import blockPrint
from resistics.time.clean import removeZeros, removeZerosSingle, removeNansSingle

[docs]def folderB423EHeaders( folderpath: str, sampleFreq: float, ex: str = "E1", ey: str = "E2", dx: float = 1, dy: float = 1, folders: List = [], ) -> None: """Construct B423E headers for subfolders of a folder Parameters ---------- folderpath : str The path to the folder sampleFreq : float The sampling frequency of the data ex : str, optional The channel E1, E2, E3 or E4 in the data that represents Ex. Default E1. ey : str, optional The channel E1, E2, E3 or E4 in the data that represents Ey. Default E2. dx : float, optional Distance between x electrodes dy : float, optional Distance between y electrodes folder : List, optional An optional list of subfolders """ if len(folders) == 0: folders = [f.path for f in os.scandir(folderpath) if f.is_dir()] # now construct headers for each folder for folder in folders: measB423EHeaders(folder, sampleFreq, ex=ex, ey=ey, dx=dx, dy=dy)
[docs]def measB423EHeaders( datapath: str, sampleFreq: float, ex: str = "E1", ey: str = "E2", dx: float = 1, dy: float = 1, ) -> None: """Read a single B423 measurement directory and construct headers Parameters ---------- site : str The path to the site sampleFreq : float The sampling frequency of the data ex : str, optional The channel E1, E2, E3 or E4 in the data that represents Ex. Default E1. ey : str, optional The channel E1, E2, E3 or E4 in the data that represents Ey. Default E2. dx : float, optional Distance between x electrodes dy : float, optional Distance between y electrodes """ from resistics.common.print import generalPrint, warningPrint, errorPrint from resistics.time.writer import TimeWriter dataFiles = glob.glob(os.path.join(datapath, "*.B423")) dataFilenames = [os.path.basename(dFile) for dFile in dataFiles] starts = [] stops = [] cumSamples = 0 for idx, dFile in enumerate(dataFiles): generalPrint("constructB423EHeaders", "Reading data file {}".format(dFile)) dataHeaders, firstDatetime, lastDatetime, numSamples = readB423Params( dFile, sampleFreq, 1024, 26 ) print(dataHeaders) generalPrint( "constructB423EHeaders", "start time = {}, end time = {}".format(firstDatetime, lastDatetime), ) generalPrint( "constructB423EHeaders", "number of samples = {}".format(numSamples) ) cumSamples += numSamples starts.append(firstDatetime) stops.append(lastDatetime) # now need to search for any missing data sampleTime = timedelta(seconds=1.0 / sampleFreq) # sort by start times sortIndices = sorted(list(range(len(starts))), key=lambda k: starts[k]) check = True for i in range(1, len(dataFiles)): # get the stop time of the previous dataset stopTimePrev = stops[sortIndices[i - 1]] startTimeNow = starts[sortIndices[i]] if startTimeNow != stopTimePrev + sampleTime: warningPrint( "constructB423EHeaders", "There is a gap between the datafiles" ) warningPrint( "constructB423EHeaders", "Please separate out datasets with gaps into separate folders", ) warningPrint("constructB423EHeaders", "Gap found between datafiles:") warningPrint( "constructB423EHeaders", "1. {}".format(dataFiles[sortIndices[i - 1]]) ) warningPrint( "constructB423EHeaders", "2. {}".format(dataFiles[sortIndices[i]]) ) check = False # if did not pass check, then exit if not check: errorPrint( "constructB423EHeaders", "All data for a single recording must be continuous.", quitRun=True, ) # time of first and last sample datetimeStart = starts[sortIndices[0]] datetimeStop = stops[sortIndices[-1]] # global headers globalHeaders = { "sample_freq": sampleFreq, "num_samples": cumSamples, "start_time": datetimeStart.strftime("%H:%M:%S.%f"), "start_date": datetimeStart.strftime("%Y-%m-%d"), "stop_time": datetimeStop.strftime("%H:%M:%S.%f"), "stop_date": datetimeStop.strftime("%Y-%m-%d"), "meas_channels": 4, } writer = TimeWriter() globalHeaders = writer.setGlobalHeadersFromKeywords({}, globalHeaders) # channel headers channels = ["E1", "E2", "E3", "E4"] chanMap = {"E1": 0, "E2": 1, "E3": 2, "E4": 3} sensors = {"E1": "0", "E2": "0", "E3": "0", "E4": "0"} posX2 = {"E1": 1, "E2": 1, "E3": 1, "E4": 1} posY2 = {"E1": 1, "E2": 1, "E3": 1, "E4": 1} posX2[ex] = dx posY2[ey] = dy chanHeaders = [] for chan in channels: # sensor serial number cHeader = dict(globalHeaders) cHeader["ats_data_file"] = ", ".join(dataFilenames) if ex == chan: cHeader["channel_type"] = "Ex" elif ey == chan: cHeader["channel_type"] = "Ey" else: cHeader["channel_type"] = chan cHeader["scaling_applied"] = False cHeader["ts_lsb"] = 1 cHeader["gain_stage1"] = 1 cHeader["gain_stage2"] = 1 cHeader["hchopper"] = 0 cHeader["echopper"] = 0 cHeader["pos_x1"] = 0 cHeader["pos_x2"] = posX2[chan] cHeader["pos_y1"] = 0 cHeader["pos_y2"] = posY2[chan] cHeader["pos_z1"] = 0 cHeader["pos_z2"] = 1 cHeader["sensor_sernum"] = sensors[chan] chanHeaders.append(cHeader) chanHeaders = writer.setChanHeadersFromKeywords(chanHeaders, {}) writer.setOutPath(datapath) writer.writeHeaders( globalHeaders, channels, chanMap, chanHeaders, rename=False, ext="h423E" )
[docs]class TimeReaderLemiB423E(TimeReaderLemiB423): """Data reader for Lemi B423E data Lemi B423E data has the following characteristics: - 1024 bytes of ASCII headers in the data file with basic scaling information - There is no separate header file for Lemi B423E data. No information for number of samples, sampling rate etc - Header files need to be constructed before Lemi B423E data can be read in by resistics. There are helper methods to do this. - Lemi B423 raw measurement data is signed long integer format - Getting unscaled samples returns data with unit count for electric fields. - Optionally, a scaling can be applied for unscaled samples which returns electric fields in microvolt. In situations where a Lemi B423E dataset is recorded in multiple files, it is required that the recording is continuous. Attributes ---------- recChannels : Dict Channels in each data file dtype : np.float32 The data type numHeaderFiles : int The number of header files numDataFiles : int The number of data files Methods ------- setParameters() Set parameters specific to a data format getPhysicalSamples(**kwargs) Get data in physical units getScalars(paramsDict) Get the scalars for each channel as given in the data files """
[docs] def setParameters(self) -> None: """Set some data reader parameters for reading Lemi B423E data""" # get a list of the header and data files in the folder self.headerExt = "h423E" self.headerF = glob.glob( os.path.join(self.dataPath, "*.{}".format(self.headerExt)) ) self.dataF = glob.glob(os.path.join(self.dataPath, "*.B423")) # data byte information self.dataByteSize = 4 self.recordByteSize = 26 self.dataByteOffset = 1024 # data type self.dtype = np.int_ # get the number of data files and header files - this should be equal self.numHeaderFiles: int = len(self.headerF) self.numDataFiles: int = len(self.dataF)
[docs] def getPhysicalSamples(self, **kwargs): """Get data scaled to physical values resistics uses field units, meaning physical samples will return the following: - Electrical channels in mV/km - Magnetic channels in mV - To get magnetic fields in nT, calibration needs to be performed Notes ----- Once Lemi B423E data is scaled (which optionally happens in getUnscaledSamples), the electric channels are in uV (micro volts). Therefore: - Electric channels need to divided by 1000 along with dipole length division in km (east-west spacing and north-south spacing) to return mV/km. Parameters ---------- chans : List[str] List of channels to return if not all are required startSample : int First sample to return endSample : int Last sample to return remaverage : bool Remove average from the data remzeros : bool Remove zeroes from the data remnans: bool Remove NanNs from the data Returns ------- TimeData Time data object """ # initialise chans, startSample and endSample with the whole dataset options = self.parseGetDataKeywords(kwargs) # get unscaled data but with gain scalings applied timeData = self.getUnscaledSamples( chans=options["chans"], startSample=options["startSample"], endSample=options["endSample"], scale=True, ) # convert to field units and divide by dipole lengths for chan in options["chans"]: # divide by the 1000 to convert electric channels from microvolt to millivolt[chan] =[chan] / 1000.0 timeData.addComment( "Dividing channel {} by 1000 to convert microvolt to millivolt".format( chan ) ) if chan == "Ex": # multiply by 1000/self.getChanDx same as dividing by dist in km[chan] = ( 1000.0 *[chan] / self.getChanDx(chan) ) timeData.addComment( "Dividing channel {} by electrode distance {} km to give mV/km".format( chan, self.getChanDx(chan) / 1000.0 ) ) if chan == "Ey": # multiply by 1000/self.getChanDy same as dividing by dist in km[chan] = 1000 *[chan] / self.getChanDy(chan) timeData.addComment( "Dividing channel {} by electrode distance {} km to give mV/km".format( chan, self.getChanDy(chan) / 1000.0 ) ) # if remove zeros - False by default if options["remzeros"]:[chan] = removeZerosSingle([chan]) # if remove nans - False by default if options["remnans"]:[chan] = removeNansSingle([chan]) # remove the average from the data - True by default if options["remaverage"]:[chan] =[chan] - np.average([chan] ) # add comments timeData.addComment( "Remove zeros: {}, remove nans: {}, remove average: {}".format( options["remzeros"], options["remnans"], options["remaverage"] ) ) return timeData
[docs] def getScalars(self, paramsDict: Dict) -> Dict: """Returns the scalars from a parameter dictionary Parameters ---------- paramsDict : Dict The parameter dictionary for a data file usually read from the headers in the file Returns ------- Dict Dictionary with channels as keys and scalings as values """ # need to get the channel orders here chans = [] for cH in self.chanHeaders: chans.append(cH["channel_type"]) return { chans[0]: [paramsDict["Ke1"], paramsDict["Ae1"]], chans[1]: [paramsDict["Ke2"], paramsDict["Ae2"]], chans[2]: [paramsDict["Ke3"], paramsDict["Ae3"]], chans[3]: [paramsDict["Ke4"], paramsDict["Ae4"]], }