Source code for resistics.time.reader_lemib423e
import os
import glob
import struct
from datetime import datetime, timedelta
import numpy as np
from typing import List, Tuple, Dict, Any
from resistics.time.reader_lemib423 import (
TimeReaderLemiB423,
readB423Params,
readB423Header,
)
from resistics.time.reader_internal import TimeReaderInternal
from resistics.time.data import TimeData
from resistics.common.checks import isMagnetic, isElectric, consistentChans
from resistics.common.print import blockPrint
from resistics.time.clean import removeZeros, removeZerosSingle, removeNansSingle
[docs]def folderB423EHeaders(
folderpath: str,
sampleFreq: float,
ex: str = "E1",
ey: str = "E2",
dx: float = 1,
dy: float = 1,
folders: List = [],
) -> None:
"""Construct B423E headers for subfolders of a folder
Parameters
----------
folderpath : str
The path to the folder
sampleFreq : float
The sampling frequency of the data
ex : str, optional
The channel E1, E2, E3 or E4 in the data that represents Ex. Default E1.
ey : str, optional
The channel E1, E2, E3 or E4 in the data that represents Ey. Default E2.
dx : float, optional
Distance between x electrodes
dy : float, optional
Distance between y electrodes
folder : List, optional
An optional list of subfolders
"""
if len(folders) == 0:
folders = [f.path for f in os.scandir(folderpath) if f.is_dir()]
# now construct headers for each folder
for folder in folders:
measB423EHeaders(folder, sampleFreq, ex=ex, ey=ey, dx=dx, dy=dy)
[docs]def measB423EHeaders(
datapath: str,
sampleFreq: float,
ex: str = "E1",
ey: str = "E2",
dx: float = 1,
dy: float = 1,
) -> None:
"""Read a single B423 measurement directory and construct headers
Parameters
----------
site : str
The path to the site
sampleFreq : float
The sampling frequency of the data
ex : str, optional
The channel E1, E2, E3 or E4 in the data that represents Ex. Default E1.
ey : str, optional
The channel E1, E2, E3 or E4 in the data that represents Ey. Default E2.
dx : float, optional
Distance between x electrodes
dy : float, optional
Distance between y electrodes
"""
from resistics.common.print import generalPrint, warningPrint, errorPrint
from resistics.time.writer import TimeWriter
dataFiles = glob.glob(os.path.join(datapath, "*.B423"))
dataFilenames = [os.path.basename(dFile) for dFile in dataFiles]
starts = []
stops = []
cumSamples = 0
for idx, dFile in enumerate(dataFiles):
generalPrint("constructB423EHeaders", "Reading data file {}".format(dFile))
dataHeaders, firstDatetime, lastDatetime, numSamples = readB423Params(
dFile, sampleFreq, 1024, 26
)
print(dataHeaders)
generalPrint(
"constructB423EHeaders",
"start time = {}, end time = {}".format(firstDatetime, lastDatetime),
)
generalPrint(
"constructB423EHeaders", "number of samples = {}".format(numSamples)
)
cumSamples += numSamples
starts.append(firstDatetime)
stops.append(lastDatetime)
# now need to search for any missing data
sampleTime = timedelta(seconds=1.0 / sampleFreq)
# sort by start times
sortIndices = sorted(list(range(len(starts))), key=lambda k: starts[k])
check = True
for i in range(1, len(dataFiles)):
# get the stop time of the previous dataset
stopTimePrev = stops[sortIndices[i - 1]]
startTimeNow = starts[sortIndices[i]]
if startTimeNow != stopTimePrev + sampleTime:
warningPrint(
"constructB423EHeaders", "There is a gap between the datafiles"
)
warningPrint(
"constructB423EHeaders",
"Please separate out datasets with gaps into separate folders",
)
warningPrint("constructB423EHeaders", "Gap found between datafiles:")
warningPrint(
"constructB423EHeaders", "1. {}".format(dataFiles[sortIndices[i - 1]])
)
warningPrint(
"constructB423EHeaders", "2. {}".format(dataFiles[sortIndices[i]])
)
check = False
# if did not pass check, then exit
if not check:
errorPrint(
"constructB423EHeaders",
"All data for a single recording must be continuous.",
quitRun=True,
)
# time of first and last sample
datetimeStart = starts[sortIndices[0]]
datetimeStop = stops[sortIndices[-1]]
# global headers
globalHeaders = {
"sample_freq": sampleFreq,
"num_samples": cumSamples,
"start_time": datetimeStart.strftime("%H:%M:%S.%f"),
"start_date": datetimeStart.strftime("%Y-%m-%d"),
"stop_time": datetimeStop.strftime("%H:%M:%S.%f"),
"stop_date": datetimeStop.strftime("%Y-%m-%d"),
"meas_channels": 4,
}
writer = TimeWriter()
globalHeaders = writer.setGlobalHeadersFromKeywords({}, globalHeaders)
# channel headers
channels = ["E1", "E2", "E3", "E4"]
chanMap = {"E1": 0, "E2": 1, "E3": 2, "E4": 3}
sensors = {"E1": "0", "E2": "0", "E3": "0", "E4": "0"}
posX2 = {"E1": 1, "E2": 1, "E3": 1, "E4": 1}
posY2 = {"E1": 1, "E2": 1, "E3": 1, "E4": 1}
posX2[ex] = dx
posY2[ey] = dy
chanHeaders = []
for chan in channels:
# sensor serial number
cHeader = dict(globalHeaders)
cHeader["ats_data_file"] = ", ".join(dataFilenames)
if ex == chan:
cHeader["channel_type"] = "Ex"
elif ey == chan:
cHeader["channel_type"] = "Ey"
else:
cHeader["channel_type"] = chan
cHeader["scaling_applied"] = False
cHeader["ts_lsb"] = 1
cHeader["gain_stage1"] = 1
cHeader["gain_stage2"] = 1
cHeader["hchopper"] = 0
cHeader["echopper"] = 0
cHeader["pos_x1"] = 0
cHeader["pos_x2"] = posX2[chan]
cHeader["pos_y1"] = 0
cHeader["pos_y2"] = posY2[chan]
cHeader["pos_z1"] = 0
cHeader["pos_z2"] = 1
cHeader["sensor_sernum"] = sensors[chan]
chanHeaders.append(cHeader)
chanHeaders = writer.setChanHeadersFromKeywords(chanHeaders, {})
writer.setOutPath(datapath)
writer.writeHeaders(
globalHeaders, channels, chanMap, chanHeaders, rename=False, ext="h423E"
)
[docs]class TimeReaderLemiB423E(TimeReaderLemiB423):
"""Data reader for Lemi B423E data
Lemi B423E data has the following characteristics:
- 1024 bytes of ASCII headers in the data file with basic scaling information
- There is no separate header file for Lemi B423E data. No information for number of samples, sampling rate etc
- Header files need to be constructed before Lemi B423E data can be read in by resistics. There are helper methods to do this.
- Lemi B423 raw measurement data is signed long integer format
- Getting unscaled samples returns data with unit count for electric fields.
- Optionally, a scaling can be applied for unscaled samples which returns electric fields in microvolt.
In situations where a Lemi B423E dataset is recorded in multiple files, it is required that the recording is continuous.
Attributes
----------
recChannels : Dict
Channels in each data file
dtype : np.float32
The data type
numHeaderFiles : int
The number of header files
numDataFiles : int
The number of data files
Methods
-------
setParameters()
Set parameters specific to a data format
getPhysicalSamples(**kwargs)
Get data in physical units
getScalars(paramsDict)
Get the scalars for each channel as given in the data files
"""
[docs] def setParameters(self) -> None:
"""Set some data reader parameters for reading Lemi B423E data"""
# get a list of the header and data files in the folder
self.headerExt = "h423E"
self.headerF = glob.glob(
os.path.join(self.dataPath, "*.{}".format(self.headerExt))
)
self.dataF = glob.glob(os.path.join(self.dataPath, "*.B423"))
# data byte information
self.dataByteSize = 4
self.recordByteSize = 26
self.dataByteOffset = 1024
# data type
self.dtype = np.int_
# get the number of data files and header files - this should be equal
self.numHeaderFiles: int = len(self.headerF)
self.numDataFiles: int = len(self.dataF)
[docs] def getPhysicalSamples(self, **kwargs):
"""Get data scaled to physical values
resistics uses field units, meaning physical samples will return the following:
- Electrical channels in mV/km
- Magnetic channels in mV
- To get magnetic fields in nT, calibration needs to be performed
Notes
-----
Once Lemi B423E data is scaled (which optionally happens in getUnscaledSamples), the electric channels are in uV (micro volts). Therefore:
- Electric channels need to divided by 1000 along with dipole length division in km (east-west spacing and north-south spacing) to return mV/km.
Parameters
----------
chans : List[str]
List of channels to return if not all are required
startSample : int
First sample to return
endSample : int
Last sample to return
remaverage : bool
Remove average from the data
remzeros : bool
Remove zeroes from the data
remnans: bool
Remove NanNs from the data
Returns
-------
TimeData
Time data object
"""
# initialise chans, startSample and endSample with the whole dataset
options = self.parseGetDataKeywords(kwargs)
# get unscaled data but with gain scalings applied
timeData = self.getUnscaledSamples(
chans=options["chans"],
startSample=options["startSample"],
endSample=options["endSample"],
scale=True,
)
# convert to field units and divide by dipole lengths
for chan in options["chans"]:
# divide by the 1000 to convert electric channels from microvolt to millivolt
timeData.data[chan] = timeData.data[chan] / 1000.0
timeData.addComment(
"Dividing channel {} by 1000 to convert microvolt to millivolt".format(
chan
)
)
if chan == "Ex":
# multiply by 1000/self.getChanDx same as dividing by dist in km
timeData.data[chan] = (
1000.0 * timeData.data[chan] / self.getChanDx(chan)
)
timeData.addComment(
"Dividing channel {} by electrode distance {} km to give mV/km".format(
chan, self.getChanDx(chan) / 1000.0
)
)
if chan == "Ey":
# multiply by 1000/self.getChanDy same as dividing by dist in km
timeData.data[chan] = 1000 * timeData.data[chan] / self.getChanDy(chan)
timeData.addComment(
"Dividing channel {} by electrode distance {} km to give mV/km".format(
chan, self.getChanDy(chan) / 1000.0
)
)
# if remove zeros - False by default
if options["remzeros"]:
timeData.data[chan] = removeZerosSingle(timeData.data[chan])
# if remove nans - False by default
if options["remnans"]:
timeData.data[chan] = removeNansSingle(timeData.data[chan])
# remove the average from the data - True by default
if options["remaverage"]:
timeData.data[chan] = timeData.data[chan] - np.average(
timeData.data[chan]
)
# add comments
timeData.addComment(
"Remove zeros: {}, remove nans: {}, remove average: {}".format(
options["remzeros"], options["remnans"], options["remaverage"]
)
)
return timeData
[docs] def getScalars(self, paramsDict: Dict) -> Dict:
"""Returns the scalars from a parameter dictionary
Parameters
----------
paramsDict : Dict
The parameter dictionary for a data file usually read from the headers in the file
Returns
-------
Dict
Dictionary with channels as keys and scalings as values
"""
# need to get the channel orders here
chans = []
for cH in self.chanHeaders:
chans.append(cH["channel_type"])
return {
chans[0]: [paramsDict["Ke1"], paramsDict["Ae1"]],
chans[1]: [paramsDict["Ke2"], paramsDict["Ae2"]],
chans[2]: [paramsDict["Ke3"], paramsDict["Ae3"]],
chans[3]: [paramsDict["Ke4"], paramsDict["Ae4"]],
}