import os
import glob
import struct
from datetime import datetime, timedelta
import numpy as np
from typing import List, Tuple, Dict, Any
from resistics.time.reader_internal import TimeReaderInternal
from resistics.time.data import TimeData
from resistics.common.checks import isMagnetic, isElectric, consistentChans
from resistics.common.print import blockPrint
from resistics.time.clean import removeZeros, removeZerosSingle, removeNansSingle
[docs]def readB423Params(
dataFile: str, sampleFreq: float, dataByteOffset: int, recordByteSize: int
):
"""Get the parameters of the B423 data file
Parameters
----------
dataFile : str
The data file as a string
sampleFreq : float
The sampling frequency in Hz
dataByteOffset : int
The offset till the start of the data in bytes
recordByteSize : int
The size in bytes of a record
Returns
-------
headers : dict
The header values as a dictionary
firstDatetime : datetime
The time of the first sample in the data file
lastDatetime : datetime
The time of the last sample in the data file
numSamples : int
The number of samples in the data file given the sampling frequency
"""
from resistics.common.print import errorPrint
filesize = os.path.getsize(dataFile)
numSamples: float = (filesize - dataByteOffset) / recordByteSize
if not numSamples.is_integer():
errorPrint(
"readB423Params",
"Non-integer number of samples. Maybe the sampling frequency is incorrect",
quitRun=True,
)
else:
numSamples = int(numSamples)
f = open(dataFile, "rb")
hdrBytes = f.read(dataByteOffset)
dataHeaders = readB423Header(hdrBytes)
# read the first record and get the timestamp
bts = f.read(6)
firstTimestamp = struct.unpack("L", bts[0:4])[0]
firstSampleNum = struct.unpack("H", bts[4:])[0]
firstDatetime = datetime.utcfromtimestamp(
firstTimestamp + (firstSampleNum / sampleFreq)
)
# now seek to the end
f.seek(filesize - recordByteSize)
bts = f.read(6)
lastTimestamp = struct.unpack("L", bts[0:4])[0]
lastSampleNum = struct.unpack("H", bts[4:])[0]
lastDatetime = datetime.utcfromtimestamp(
lastTimestamp + (lastSampleNum / sampleFreq)
)
# now check number of samples based on datetimes
numSeconds = np.timedelta64(lastDatetime - firstDatetime) / np.timedelta64(1, "s")
# +1 to numSamplesCalc because needs to be inclusive of the end
numSamplesCalc = int(numSeconds * sampleFreq) + 1
if numSamplesCalc != numSamples:
errorPrint("readB423Params", "There is a gap in data file {}".format(dataFile))
errorPrint("readB423Params", "No gaps allowed within data files")
errorPrint(
"readB423Params", "Please remove this file from the recording", quitRun=True
)
return dataHeaders, firstDatetime, lastDatetime, numSamples
[docs]class TimeReaderLemiB423(TimeReaderInternal):
"""Data reader for Lemi B423 data
Lemi B423 data has the following characteristics:
- 1024 bytes of ASCII headers in the data file with basic scaling information
- There is no separate header file for Lemi B423 data. No information for number of samples, sampling rate etc
- Header files need to be constructed before Lemi B423 data can be read in by resistics. There are helper methods to do this
- Lemi B423 raw measurement data is signed long integer format
- Getting unscaled samples returns data with unit count for both the electric and magnetic fields.
- Scalings specified in B423 files convert electric channels to uV (microvolt) and magnetic channels to millivolts with internal gain still applied
- Getting physical samples converts the electric channels to mV/km by dividing the uV by 1000 and then dividing by the dipole length in km
- For the magnetic channels, the gain is removed to give the magnetic measurements in mV. Calibrating these will give measurements in nT
In situations where a Lemi B423 dataset is recorded in multiple files, it is required that the recording is continuous.
Attributes
----------
dtype : np.int_
The data type, a long integer
recordByteSize : int
The size of a record in bytes
numHeaderFiles : int
The number of header files
numDataFiles : int
The number of data files
Methods
-------
setParameters()
Set parameters specific to a data format
getUnscaledSamples(**kwargs)
Get raw, unscaled data
getDataFilesForSamples(startSample, endSample)
Get the data files that contribute to the request data
readRecords(bts, numRecords)
Read the data records from a provided bytes object
getPhysicalSamples(**kwargs)
Get data in physical units
def readHeader()
Read the B423 measurement file headers
readMeasParams(sampleFreq)
Calculate out the a B423 parameters including start and end date and number of samples
getScalars(paramsDict)
Get the scalars for each channel as given in the data files
printDataFileList()
Get data file information as a list of strings
printDataFiles()
Print data file information to terminal
"""
[docs] def setParameters(self) -> None:
"""Set some data reader parameters for reading Lemi B423 data"""
# get a list of the header and data files in the folder
self.headerExt = "h423"
self.headerF = glob.glob(
os.path.join(self.dataPath, "*.{}".format(self.headerExt))
)
self.dataF = glob.glob(os.path.join(self.dataPath, "*.B423"))
# data byte information
self.dataByteSize = 4
self.recordByteSize = 30
self.dataByteOffset = 1024
# data type
self.dtype = np.int_
# get the number of data files and header files - this should be equal
self.numHeaderFiles: int = len(self.headerF)
self.numDataFiles: int = len(self.dataF)
[docs] def getUnscaledSamples(self, **kwargs) -> TimeData:
"""Get raw data from data file, returned in mV
Lemi B423 data always has five channels, in order Hx, Hy, Hz, Ex, Ey. The raw data is integer counts. Therefore, getting unscaled samples returns raw counts for the measurement. There are additional scalings which can be applied using the scale optional argument. Lemi B423 is recorded in multiple files. It has not been verified whether it is possible for each individual file to have different scaling.
Without the scale option, the data is returned in:
- Counts for both magnetic and electric channels (reading long integers)
With the scaling option, the data is returned in:
- microvolts for the electric channels
- millivolts for the magnetic with the gain applied
Applying the scaling does not appear to remove the internal gain of the Lemi. This will be removed when getting physical samples and the appropriate value must be set in the headers.
Parameters
----------
chans : List[str], optional
List of channels to return if not all are required
startSample : int, optional
First sample to return
endSample : int, optional
Last sample to return
scale : bool, optional
Boolean flag for applying the gain scaling
Returns
-------
TimeData
Time data object
"""
# initialise chans, startSample and endSample with the whole dataset
options = self.parseGetDataKeywords(kwargs)
# get the files to read and the samples to take from them, in the correct order
dataFilesToRead, samplesToRead, scalings = self.getDataFilesForSamples(
options["startSample"], options["endSample"]
)
numSamples = options["endSample"] - options["startSample"] + 1
# set up the dictionary to hold the data
dtype = np.float32 if options["scale"] else self.dtype
data = {}
for chan in options["chans"]:
data[chan] = np.zeros(shape=(numSamples), dtype=dtype)
# prepare comments
startTime, stopTime = self.sample2time(
options["startSample"], options["endSample"]
)
comments = []
comments.append(
"Unscaled data {} to {} read in from measurement {}, samples {} to {}".format(
startTime,
stopTime,
self.dataPath,
options["startSample"],
options["endSample"],
)
)
comments.append("Sampling frequency {}".format(self.getSampleFreq()))
comments.append("Data read from {} files in total".format(len(dataFilesToRead)))
comments.append("Scaling = {}".format(options["scale"]))
# loop through chans and get data
sampleCounter = 0
for dFile, sToRead, scalar in zip(dataFilesToRead, samplesToRead, scalings):
# calculate the starting byte and the number of bytes to read
byteReadStart = self.dataByteOffset + sToRead[0] * self.recordByteSize
dSamples = sToRead[1] - sToRead[0] + 1
dSamplesRead = dSamples * self.getNumChannels()
bytesToRead = dSamples * self.recordByteSize
# read
dFileHandle = open(dFile, "rb")
dFileHandle.seek(byteReadStart, 0) # seek to start byte from start of file
dataBytes = dFileHandle.read(bytesToRead)
dFileHandle.close()
dataRead = self.readRecords(dataBytes, dSamples)
# now need to unpack this
for chan in options["chans"]:
# check to make sure channel exists
self.checkChan(chan)
# get the channel index - the chanIndex should give the right order in the data file
chanIndex = self.chanMap[chan]
# use the range sampleCounter -> sampleCounter + dSamples, because this actually means sampleCounter + dSamples - 1 as python ranges are not inclusive of the end value
data[chan][sampleCounter : sampleCounter + dSamples] = dataRead[
chanIndex : dSamplesRead : self.getNumChannels()
]
if options["scale"]:
data[chan][sampleCounter : sampleCounter + dSamples] = (
data[chan][sampleCounter : sampleCounter + dSamples]
* scalar[chan][0]
+ scalar[chan][1]
)
comments.append(
"Scaling channel {} of file {} with multiplier {} and adding {}".format(
chan, dFile, scalar[chan][0], scalar[chan][1]
)
)
# increment sample counter
sampleCounter = sampleCounter + dSamples # get ready for the next data read
# return data
return TimeData(
sampleFreq=self.getSampleFreq(),
startTime=startTime,
stopTime=stopTime,
data=data,
comments=comments,
)
[docs] def getDataFilesForSamples(
self, startSample: int, endSample: int
) -> Tuple[List[str], List[List[int]], List[float]]:
"""Get the data files that have to be read to cover the sample range
Parameters
----------
startSample : int
Starting sample of the sample range
endSamples : int
Ending sample of the sample range
Returns
-------
dataFilesToRead
Time data object
"""
# have the datafiles saved in sample order beginning with the earliest first
# go through each datafile and find the range to be read
dataFilesToRead = []
samplesToRead = []
scalings = []
for idx, dFile in enumerate(self.dataFileList):
fileStartSamp = self.dataRanges[idx][0]
fileEndSamp = self.dataRanges[idx][1]
if fileStartSamp > endSample or fileEndSamp < startSample:
continue # nothing to read from this file
# in this case, there is some overlap with the samples to read
dataFilesToRead.append(dFile)
readFrom = 0 # i.e. the first sample in the datafile
readTo = fileEndSamp - fileStartSamp # this the last sample in the file
if fileStartSamp < startSample:
readFrom = startSample - fileStartSamp
if fileEndSamp > endSample:
readTo = endSample - fileStartSamp
# this is an inclusive number readFrom to readTo including readTo
samplesToRead.append([readFrom, readTo])
scalings.append(self.scalings[idx])
return dataFilesToRead, samplesToRead, scalings
[docs] def readRecords(self, bts: bytes, numRecords: int):
"""Read a number of B423 records from bytes
Parameters
----------
bts : bytes
The bytes to be read
numRecords : int
The number of records to read. Size of bytes should be numRecords * recordByteSize
Returns
-------
data : np.ndarray
Array holding the data. The data repeats in channel blocks e.g. [Hx, Hy, Hx, Ex, Ey][Hx, Hy, Hx, Ex, Ey]
"""
numChans = self.getNumChannels()
structStr = "{:d}l".format(numChans)
data = np.zeros(shape=(numRecords * numChans), dtype=self.dtype)
for ii in range(0, numRecords):
recordStart = ii * self.recordByteSize
readBytes = bts[recordStart : recordStart + self.recordByteSize]
recordValues = np.array(
struct.unpack(structStr, readBytes[6:-4]), dtype=np.int_
)
dataStart = ii * numChans
data[dataStart : dataStart + numChans] = recordValues
return data
[docs] def getPhysicalSamples(self, **kwargs):
"""Get data scaled to physical values
resistics uses field units, meaning physical samples will return the following:
- Electrical channels in mV/km
- Magnetic channels in mV
- To get magnetic fields in nT, calibration needs to be performed
Notes
-----
Once Lemi B423 data is scaled (which optionally happens in getUnscaledSamples), the magnetic channels is in mV with gain applied and the electric channels is uV (microvolts). Therefore:
- Electric channels need to divided by 1000 along with dipole length division in km (east-west spacing and north-south spacing) to return mV/km.
- Magnetic channels need to be divided by the internal gain value which should be set in the headers
To get magnetic fields in nT, they have to be calibrated.
Parameters
----------
chans : List[str]
List of channels to return if not all are required
startSample : int
First sample to return
endSample : int
Last sample to return
remaverage : bool
Remove average from the data
remzeros : bool
Remove zeroes from the data
remnans: bool
Remove NanNs from the data
Returns
-------
TimeData
Time data object
"""
# initialise chans, startSample and endSample with the whole dataset
options = self.parseGetDataKeywords(kwargs)
# get unscaled data but with gain scalings applied
timeData = self.getUnscaledSamples(
chans=options["chans"],
startSample=options["startSample"],
endSample=options["endSample"],
scale=True,
)
# convert to field units and divide by dipole lengths
for chan in options["chans"]:
if isElectric(chan):
timeData.data[chan] = timeData.data[chan] / 1000.0
timeData.addComment("Dividing channel {} by 1000 to convert microvolt to millivolt".format(chan))
if isMagnetic(chan):
timeData.data[chan] = timeData.data[chan] / self.getChanGain1(chan)
timeData.addComment("Removing gain of {} from channel {}".format(self.getChanGain1(chan), chan))
if chan == "Ex":
# multiply by 1000/self.getChanDx same as dividing by dist in km
timeData.data[chan] = (
1000.0 * timeData.data[chan] / self.getChanDx(chan)
)
timeData.addComment(
"Dividing channel {} by electrode distance {} km to give mV/km".format(
chan, self.getChanDx(chan) / 1000.0
)
)
if chan == "Ey":
# multiply by 1000/self.getChanDy same as dividing by dist in km
timeData.data[chan] = 1000 * timeData.data[chan] / self.getChanDy(chan)
timeData.addComment(
"Dividing channel {} by electrode distance {:.6f} km to give mV/km".format(
chan, self.getChanDy(chan) / 1000.0
)
)
# if remove zeros - False by default
if options["remzeros"]:
timeData.data[chan] = removeZerosSingle(timeData.data[chan])
# if remove nans - False by default
if options["remnans"]:
timeData.data[chan] = removeNansSingle(timeData.data[chan])
# remove the average from the data - True by default
if options["remaverage"]:
timeData.data[chan] = timeData.data[chan] - np.average(
timeData.data[chan]
)
# add comments
timeData.addComment(
"Remove zeros: {}, remove nans: {}, remove average: {}".format(
options["remzeros"], options["remnans"], options["remaverage"]
)
)
return timeData
[docs] def readMeasParams(self, sampleFreq: float = None) -> None:
"""Get the recording parameters for each of the subfiles
Parameters
----------
sampleFreq : float
The sampling frequency in Hz
"""
if sampleFreq is None:
if "sample_freq" in self.headers:
sampleFreq = float(self.getSampleFreq())
else:
self.printError(
"Lemi data has not been setup correctly. Please refer to documentation"
)
self.dataParamsDict: Dict = {}
starts: List = []
stops: List = []
numSamples: List = []
for dFile in self.dataF:
# dataHeaders, startTime, stopTime, nSamples = self.readB423Params(
# dFile, sampleFreq
# )
dataHeaders, startTime, stopTime, nSamples = readB423Params(
dFile, sampleFreq, self.dataByteOffset, self.recordByteSize
)
self.printText(
"Reading data file {}, start = {}, end = {}, samples = {}".format(
dFile, startTime, stopTime, nSamples
)
)
dFileParams: Dict = {}
dFileParams["start"] = startTime
dFileParams["stop"] = stopTime
dFileParams["headers"] = dataHeaders
starts.append(startTime)
stops.append(stopTime)
numSamples.append(nSamples)
self.dataParamsDict[dFile] = dataHeaders
# search for any missing data
sampleTime = timedelta(seconds=1.0 / sampleFreq)
# sort by start times
sortIndices = sorted(list(range(len(starts))), key=lambda k: starts[k])
check = True
for ii in range(1, len(self.dataF)):
# get the stop time of the previous dataset
stopTimePrev = stops[sortIndices[ii - 1]]
startTimeNow = starts[sortIndices[ii]]
if startTimeNow != stopTimePrev + sampleTime:
self.printWarning("There is a gap between the datafiles")
self.printWarning(
"Please separate out datasets with gaps into separate folders"
)
self.printWarning("Gap found between datafiles:")
self.printWarning("1. {}".format(self.dataF[sortIndices[ii - 1]]))
self.printWarning("2. {}".format(self.dataF[sortIndices[ii]]))
check = False
# if did not pass check, then exit
if not check:
self.printError(
"All data for a single recording must be continuous.", quitRun=True
)
# get a list of all the datafiles, scalings and the sample ranges
self.dataFileList: List = []
self.dataRanges: List = []
self.scalings: List = []
sample = -1
# create lookup table to say where the sample ranges are
for ii in range(0, len(self.dataF)):
iSort = sortIndices[ii] # get the sorted index
self.dataFileList.append(self.dataF[iSort])
startSample = sample + 1
# end sample -1 because this is inclusive of the start sample
endSample = startSample + numSamples[iSort] - 1
self.dataRanges.append([startSample, endSample])
# the scalings for the file
self.scalings.append(self.getScalars(self.dataParamsDict[dFile]))
# increment sample
sample = endSample
[docs] def getScalars(self, paramsDict: Dict) -> Dict:
"""Returns the scalars from a parameter dictionary
Parameters
----------
paramsDict : Dict
The parameter dictionary for a data file usually read from the headers in the file
Returns
-------
Dict
Dictionary with channels as keys and scalings as values
"""
return {
"Hx": [paramsDict["Kmx"], paramsDict["Ax"]],
"Hy": [paramsDict["Kmy"], paramsDict["Ay"]],
"Hz": [paramsDict["Kmz"], paramsDict["Az"]],
"Ex": [paramsDict["Ke1"], paramsDict["Ae1"]],
"Ey": [paramsDict["Ke2"], paramsDict["Ae2"]],
}
[docs] def printDataFileList(self) -> List[str]:
"""Information about the data files as a list of strings
Returns
-------
List[str]
List of information about the data files
"""
textLst: List[str] = []
textLst.append("Data File\t\tSample Ranges")
for dFile, sRanges in zip(self.dataFileList, self.dataRanges):
textLst.append("{}\t\t{} - {}".format(dFile, sRanges[0], sRanges[1]))
textLst.append("Total samples = {}".format(self.getNumSamples()))
return textLst
[docs] def printDataFileInfo(self) -> None:
"""Print a list of the data files"""
blockPrint(
"{} Data File List".format(self.__class__.__name__),
self.printDataFileList(),
)