Source code for resistics.time.reader_spam
import os
import glob
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
import numpy as np
from typing import List, Tuple, Dict, Any
from resistics.common.checks import isMagnetic, isElectric, consistentChans
from resistics.common.print import blockPrint
from resistics.time.reader import TimeReader
from resistics.time.data import TimeData
from resistics.time.clean import removeZeros, removeZerosSingle, removeNansSingle
class TimeReaderSPAM(TimeReader):
"""Data reader for SPAM data
SPAM data has the following characteristics:
- SPAM raw data is single precision floats with unit Volts.
- Getting unscaled samples returns data with unit mV for both the electric and magnetic fields. This is because gain is removed in unscaled samples to ensure consistency when a single recording is made up of multiple data files, each with different gain settings
- The start time in XTR files is the time of the first sample in the data
- The end time in XTR files is the time of the last sample in the data
In situations where a SPAM dataset is recorded in multiple small files, it is required that the recording is continuous.
Attributes
----------
recChannels : Dict
Channels in each data file
dtype : np.float32
The data type
numHeaderFiles : int
The number of header files
numDataFiles : int
The number of data files
Methods
-------
setParameters()
Set parameters specific to a data format
getUnscaledSamples(**kwargs)
Get raw, unscaled data
getDataFilesForSamples(startSample, endSample)
Get the data files that contribute to the requested samples so they can be read
getPhysicalSamples(**kwargs)
Get data in physical units
spamHeaders()
Get sections and section headers to be read in for SPAM data
chanDefaults()
Get defaults values for channel headers
readHeader()
Read SPAM header files
    readHeaderXTR(headerFile)
        Read an XTR header file
    readHeaderXTRX(headerFile)
        Read an XTRX header file
headersFromRawFile(rawFile, headers)
Read headers from the data files
mergeHeaders(headersList, chanHeadersList)
Merge the headers from all the data files
printDataFileList()
Get data file information as a list of strings
    printDataFileInfo()
        Print data file information to terminal
Notes
-----
    Getting unscaled samples for SPAM data removes the gain rather than returning exactly the values in the data files. Where a recording has multiple data files, they have not necessarily all been recorded with the same gain. Therefore, to ensure consistency when looking at raw data, the gain is removed at the getUnscaledSamples stage rather than at getPhysicalSamples, where it would probably have been more appropriate. This means that getUnscaledSamples returns data where all channels are in mV.
    The scalings to convert the raw data to mV are stored in the ts_lsb chan header and are calculated as the header files are read.
.. todo::
Implement reading of XTRX header files
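
    Examples
    --------
    A hypothetical usage sketch; the data path is illustrative and the reader is constructed as for other TimeReader subclasses:

    >>> from resistics.time.reader_spam import TimeReaderSPAM
    >>> reader = TimeReaderSPAM("path/to/spam/measurement")
    >>> unscaled = reader.getUnscaledSamples()  # all channels in mV, gain removed
    >>> physical = reader.getPhysicalSamples()  # electric channels now in mV/km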
"""
    def setParameters(self) -> None:
"""Set some data reader parameters for reading SPAM data"""
# get a list of the header and data files in the folder
self.headerF = glob.glob(os.path.join(self.dataPath, "*.XTR"))
if len(self.headerF) == 0:
self.headerF = glob.glob(os.path.join(self.dataPath, "*.XTRX"))
self.dataF = glob.glob(os.path.join(self.dataPath, "*.RAW"))
# data byte information might be different for each file
# so it is a dictionary
self.dataByteOffset: Dict = {}
self.recChannels = {}
self.dataByteSize = 4
# data type
self.dtype = np.float32
# get the number of data files and header files - this should be equal
self.numHeaderFiles: int = len(self.headerF)
self.numDataFiles: int = len(self.dataF)
    def getUnscaledSamples(self, **kwargs) -> TimeData:
"""Get raw data from data file, returned in mV
        SPAM raw data is single precision floats with unit Volts. Calling this method applies the ts_lsb scalings calculated when the headers are read. This is because when a recording consists of multiple data files, each channel of each data file might have a different scaling. The only way to make the data consistent is to apply the ts_lsb scaling.
Therefore, this method returns the data in mV for all channels.
Parameters
----------
chans : List[str], optional
List of channels to return if not all are required
startSample : int, optional
First sample to return
endSample : int, optional
Last sample to return
Returns
-------
TimeData
Time data object
"""
# initialise chans, startSample and endSample with the whole dataset
options = self.parseGetDataKeywords(kwargs)
# get the files to read and the samples to take from them, in the correct order
dataFilesToRead, samplesToRead, scalings = self.getDataFilesForSamples(
options["startSample"], options["endSample"]
)
numSamples = options["endSample"] - options["startSample"] + 1
# set up the dictionary to hold the data
data = {}
for chan in options["chans"]:
data[chan] = np.zeros(shape=(numSamples), dtype=self.dtype)
# loop through chans and get data
sampleCounter = 0
for dFile, sToRead, scalar in zip(dataFilesToRead, samplesToRead, scalings):
# get samples - this is inclusive
dSamples = sToRead[1] - sToRead[0] + 1
            # SPAM files multiplex all recorded channels (typically five)
dSamplesRead = dSamples * self.recChannels[dFile]
# read the data
byteOff = (
self.dataByteOffset[dFile]
+ sToRead[0] * self.recChannels[dFile] * self.dataByteSize
)
dFilePath = os.path.join(self.dataPath, dFile)
dataRead = np.memmap(
dFilePath,
dtype=self.dtype,
mode="r",
offset=byteOff,
shape=(dSamplesRead),
)
# now need to unpack this
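            # the data in a RAW file is multiplexed sample by sample, e.g. a
            # hypothetical five channel file is laid out as:
            # Ex0 Ey0 Hx0 Hy0 Hz0 Ex1 Ey1 Hx1 Hy1 Hz1 ...
            # so a single channel is recovered below by slicing from its
            # channel index with a stride equal to the number of recorded channels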
for chan in options["chans"]:
# check to make sure channel exists
self.checkChan(chan)
# get the channel index - the chanIndex should give the right order in the data file
# as it is the same order as in the header file
chanIndex = self.chanMap[chan]
# use the range sampleCounter -> sampleCounter + dSamples, because this actually means sampleCounter + dSamples - 1 as python ranges are not inclusive of the end value
# scale by the lsb scalar here - note that these can be different for each file in the run
data[chan][sampleCounter : sampleCounter + dSamples] = (
dataRead[chanIndex : dSamplesRead : self.recChannels[dFile]]
* scalar[chan]
)
# increment sample counter
sampleCounter = sampleCounter + dSamples # get ready for the next data read
# return data
startTime, stopTime = self.sample2time(
options["startSample"], options["endSample"]
)
comments = []
comments.append(
"Unscaled data {} to {} read in from measurement {}, samples {} to {}".format(
startTime,
stopTime,
self.dataPath,
options["startSample"],
options["endSample"],
)
)
comments.append("Data read from {} files in total".format(len(dataFilesToRead)))
comments.append(
"Data scaled to mV for all channels using scalings in header files"
)
comments.append("Sampling frequency {}".format(self.getSampleFreq()))
return TimeData(
sampleFreq=self.getSampleFreq(),
startTime=startTime,
stopTime=stopTime,
data=data,
comments=comments,
)
    def getDataFilesForSamples(
self, startSample: int, endSample: int
    ) -> Tuple[List[str], List[List[int]], List[Dict]]:
"""Get the data files that have to be read to cover the sample range
Parameters
----------
startSample : int
Starting sample of the sample range
        endSample : int
Ending sample of the sample range
Returns
-------
        dataFilesToRead : List[str]
            The data files that need to be read
        samplesToRead : List[List[int]]
            The inclusive sample ranges to read from each data file
        scalings : List[Dict]
            The ts_lsb scalings for each data file
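
        Notes
        -----
        As a worked example with hypothetical numbers, suppose the recording comprises two files covering samples [0, 99] and [100, 199]. A request for samples 50 to 149 returns both files, with samplesToRead of [[50, 99], [0, 49]] and the matching scalings for each file.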
"""
# have the datafiles saved in sample order beginning with the earliest first
# go through each datafile and find the range to be read
dataFilesToRead = []
samplesToRead = []
scalings = []
for idx, dFile in enumerate(self.dataFileList):
fileStartSamp = self.dataRanges[idx][0]
fileEndSamp = self.dataRanges[idx][1]
if fileStartSamp > endSample or fileEndSamp < startSample:
continue # nothing to read from this file
# in this case, there is some overlap with the samples to read
dataFilesToRead.append(dFile)
readFrom = 0 # i.e. the first sample in the datafile
            readTo = fileEndSamp - fileStartSamp  # this is the last sample in the file
if fileStartSamp < startSample:
readFrom = startSample - fileStartSamp
if fileEndSamp > endSample:
readTo = endSample - fileStartSamp
# this is an inclusive number readFrom to readTo including readTo
samplesToRead.append([readFrom, readTo])
scalings.append(self.scalings[idx])
return dataFilesToRead, samplesToRead, scalings
    def getPhysicalSamples(self, **kwargs):
"""Get data scaled to physical values
resistics uses field units, meaning physical samples will return the following:
- Electrical channels in mV/km
- Magnetic channels in mV
- To get magnetic fields in nT, calibration needs to be performed
Notes
-----
The method getUnscaledSamples multiplies the raw data by the ts_lsb converting it to mV. Because gain is removed when getting the unscaledSamples and all channel data is in mV, the only calculation that has to be done is to divide by the dipole lengths (east-west spacing and north-south spacing).
To get magnetic fields in nT, they have to be calibrated.
Parameters
----------
chans : List[str]
List of channels to return if not all are required
startSample : int
First sample to return
endSample : int
Last sample to return
remaverage : bool
Remove average from the data
remzeros : bool
Remove zeroes from the data
        remnans : bool
            Remove NaNs from the data
Returns
-------
TimeData
Time data object
"""
# initialise chans, startSample and endSample with the whole dataset
options = self.parseGetDataKeywords(kwargs)
# get data
timeData = self.getUnscaledSamples(
chans=options["chans"],
startSample=options["startSample"],
endSample=options["endSample"],
)
# Scalars are applied in getUnscaledSamples to convert to mV - this is for ease of calculation and because each data file in the run might have a separate scaling
# all that is left is to divide by the dipole length in km and remove the average
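        # e.g. with a hypothetical 100 m Ex dipole, mV * (1000 / 100) gives mV/km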
for chan in options["chans"]:
if chan == "Ex":
# multiply by 1000/self.getChanDx same as dividing by dist in km
timeData.data[chan] = 1000 * timeData.data[chan] / self.getChanDx(chan)
timeData.addComment(
"Dividing channel {} by electrode distance {} km to give mV/km".format(
chan, self.getChanDx(chan) / 1000.0
)
)
if chan == "Ey":
# multiply by 1000/self.getChanDy same as dividing by dist in km
timeData.data[chan] = 1000 * timeData.data[chan] / self.getChanDy(chan)
timeData.addComment(
"Dividing channel {} by electrode distance {} km to give mV/km".format(
chan, self.getChanDy(chan) / 1000.0
)
)
# if remove zeros - False by default
if options["remzeros"]:
timeData.data[chan] = removeZerosSingle(timeData.data[chan])
# if remove nans - False by default
if options["remnans"]:
timeData.data[chan] = removeNansSingle(timeData.data[chan])
# remove the average from the data - True by default
if options["remaverage"]:
timeData.data[chan] = timeData.data[chan] - np.average(
timeData.data[chan]
)
# add comments
timeData.addComment(
"Remove zeros: {}, remove nans: {}, remove average: {}".format(
options["remzeros"], options["remnans"], options["remaverage"]
)
)
return timeData
    def spamHeaders(self) -> Tuple[List[str], Dict[str, str]]:
"""Get the sections in SPAM header files (XTR and XTRX)
Returns
-------
sections : List[str]
The sections in the header files
sectionHeaders : Dict[str, str]
The headers in each section to be read in
"""
sections = ["STATUS", "TITLE", "PROJECT", "FILE", "SITE", "CHANNAME", "DATA"]
sectionHeaders = {}
sectionHeaders["STATUS"] = ["STATUS"]
sectionHeaders["TITLE"] = ["AUTHOR", "VERSION", "DATE", "COMMENT"]
sectionHeaders["FILE"] = ["NAME", "FREQBAND", "DATE"]
sectionHeaders["CHANNAME"] = ["ITEMS", "NAME"]
sectionHeaders["DATA"] = ["ITEMS", "CHAN"]
return sections, sectionHeaders
    def chanDefaults(self) -> Dict[str, Any]:
"""Get defaults for channel headers
Returns
-------
Dict[str, Any]
Dictionary of headers for channels and default values
"""
chanH = {}
chanH["gain_stage1"] = 1
chanH["gain_stage2"] = 1
chanH["hchopper"] = 0 # this depends on sample frequency
chanH["echopper"] = 0
# channel output information (sensor_type, channel_type, ts_lsb, pos_x1, pos_x2, pos_y1, pos_y2, pos_z1, pos_z2, sensor_sernum)
chanH["ats_data_file"] = ""
chanH["num_samples"] = 0
chanH["sensor_type"] = ""
chanH["channel_type"] = ""
chanH["ts_lsb"] = 1
# the lsb/scaling is not applied. data is raw voltage which needs to be scaled
# an lsb is constructed from the scaling in the XTR/XTRX file to take the data to mV
chanH["scaling_applied"] = False # check this
chanH["pos_x1"] = 0
chanH["pos_x2"] = 0
chanH["pos_y1"] = 0
chanH["pos_y2"] = 0
chanH["pos_z1"] = 0
chanH["pos_z2"] = 0
chanH["sensor_sernum"] = 0
return chanH
    def readHeader(self) -> None:
"""Read header files
        For SPAM data, there may be more than one header file, as data can be split up into smaller files as it is recorded. In that case, the header information has to be merged.
All sampling frequencies should be the same
"""
# read header files
self.headersList = []
self.chanHeadersList = []
for headerFile in self.headerF:
if "xtrx" in headerFile.lower():
headers, chanHeaders = self.readHeaderXTRX(headerFile)
else:
headers, chanHeaders = self.readHeaderXTR(headerFile)
self.headersList.append(headers)
self.chanHeadersList.append(chanHeaders)
# check to make sure no gaps, calculate out the sample ranges and list the data files for each sample
self.mergeHeaders(self.headersList, self.chanHeadersList)
    def readHeaderXTR(self, headerFile: str) -> Tuple[Dict, List]:
"""Read a XTR header file
The raw data for SPAM is in single precision Volts. However, if there are multiple data files for a single recording, each one may have a different gain. Therefore, a scaling has to be calculated for each data file and channel. This scaling will convert all channels to mV.
        For the most part, this method only reads recording information. However, it additionally calculates the lsb scaling and stores it in the ts_lsb channel header. More information is provided in the notes.
Notes
-----
        The raw data for SPAM is in single precision floats and records the raw voltage measurements of the sensors. However, if there are multiple data files for a single continuous recording, each one may have a different gain. Therefore, a scaling has to be calculated for each data file.
        For electric channels, the scaling begins with the scaling provided in the DATA section of the header file. This incorporates any gain occurring in the device. This scaling is further amended by a conversion to mV and a polarity reversal:
        .. math::

            scaling &= \text{scaling from DATA section of header file} \\
            scaling &= 1000 \times scaling \\
            scaling &= -1000 \times scaling \\
            ts\_lsb &= scaling

        where the reason for the factor of 1000 in the second line is not clear, nor is the polarity reversal. However, this information was provided by people more familiar with the data format.
For magnetic channels, the scaling in the header file DATA section is ignored. This is because it includes a static gain correction, which would be duplicated at the calibration stage. Therefore, this is not included at this point.
        .. math::

            scaling &= -1000 \\
            ts\_lsb &= scaling
This scaling converts the magnetic data from V to mV.
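        As a worked example with a hypothetical header value, an electric channel with a DATA section scaling of :math:`2.5 \times 10^{-6}` ends up with :math:`ts\_lsb = -1000 \times 1000 \times 2.5 \times 10^{-6} = -2.5`.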
Parameters
----------
headerFile : str
The XTR header file to read in
"""
with open(headerFile, "r") as f:
lines = f.readlines()
sectionLines = {}
# let's get data
for line in lines:
line = line.strip()
line = line.replace("'", " ")
# continue if line is empty
if line == "":
continue
if "[" in line:
sec = line[1:-1]
sectionLines[sec] = []
else:
sectionLines[sec].append(line)
# the base class is built around a set of headers based on ATS headers
# though this is a bit more work here, it saves lots of code repetition
headers = {}
# recording information (start_time, start_date, stop_time, stop_date, ats_data_file)
fileLine = sectionLines["FILE"][0]
fileSplit = fileLine.split()
headers["sample_freq"] = np.absolute(float(fileSplit[-1]))
timeLine = sectionLines["FILE"][2]
timeSplit = timeLine.split()
# these are the unix time stamps
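        # the stamps are split into whole seconds and fractional parts, e.g.
        # hypothetical tokens "1453901823" and "5" combine to 1453901823.5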
startDate = float(timeSplit[1] + "." + timeSplit[2])
datetimeStart = datetime.utcfromtimestamp(startDate)
stopDate = float(timeSplit[3] + "." + timeSplit[4])
datetimeStop = datetime.utcfromtimestamp(stopDate)
headers["start_date"] = datetimeStart.strftime("%Y-%m-%d")
headers["start_time"] = datetimeStart.strftime("%H:%M:%S.%f")
headers["stop_date"] = datetimeStop.strftime("%Y-%m-%d")
headers["stop_time"] = datetimeStop.strftime("%H:%M:%S.%f")
# here calculate number of samples
deltaSeconds = (datetimeStop - datetimeStart).total_seconds()
# calculate number of samples - have to add one because the time given in SPAM recording is the actual time of the last sample
numSamples = int(deltaSeconds * headers["sample_freq"]) + 1
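        # e.g. a file spanning exactly 10 seconds at 250 Hz holds 2501 samples, not 2500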
# put these in headers for ease of future calculations in merge headers
headers["num_samples"] = numSamples
# spam datasets only have the one data file for all channels
headers["ats_data_file"] = fileSplit[1]
# data information (meas_channels, sample_freq)
chanLine = sectionLines["CHANNAME"][0]
# this gets reformatted to an int later
headers["meas_channels"] = chanLine.split()[1]
numChansInt = int(headers["meas_channels"])
# deal with the channel headers
chanHeaders = []
for iChan in range(0, numChansInt):
chanH = self.chanDefaults()
# set the sample frequency from the main headers
chanH["sample_freq"] = headers["sample_freq"]
# line data - read through the data in the correct channel order
chanLine = sectionLines["CHANNAME"][iChan + 1]
chanSplit = chanLine.split()
dataLine = sectionLines["DATA"][iChan + 1]
dataSplit = dataLine.split()
# channel input information (gain_stage1, gain_stage2, hchopper, echopper)
chanH["gain_stage1"] = 1
chanH["gain_stage2"] = 1
# channel output information (sensor_type, channel_type, ts_lsb, pos_x1, pos_x2, pos_y1, pos_y2, pos_z1, pos_z2, sensor_sernum)
chanH["ats_data_file"] = fileSplit[1]
chanH["num_samples"] = numSamples
# channel information
# spams often use Bx, By - use H within the software as a whole
chanH["channel_type"] = consistentChans(chanSplit[2])
# the sensor number is a bit of a hack - want MFSXXe or something - add MFS in front of the sensor number - this is liable to break
# at the same time, set the chopper
calLine = sectionLines["200{}003".format(iChan + 1)][0]
calSplit = calLine.split()
if isMagnetic(chanH["channel_type"]):
chanH["sensor_sernum"] = calSplit[
2
] # the last three digits is the serial number
sensorType = calSplit[1].split("_")[1][-2:]
chanH["sensor_type"] = "MFS{:02d}".format(int(sensorType))
if "LF" in calSplit[1]:
chanH["hchopper"] = 1
else:
chanH["sensor_type"] = "ELC00"
if "LF" in calLine:
chanH["echopper"] = 1
# data is raw voltage of sensors
# both E and H fields need polarity reversal (from email with Reinhard)
# get scaling from headers
scaling = float(dataSplit[-2])
if isElectric(chanH["channel_type"]):
# the factor of 1000 is not entirely clear
lsb = 1000.0 * scaling
# volts to millivolts and a minus to switch polarity giving data in mV
lsb = -1000.0 * lsb
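            # net effect: ts_lsb = -1e6 * the scaling read from the header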
else:
# volts to millivolts and a minus to switch polarity giving data in mV
# scaling in header file is ignored because it duplicates static gain correction in calibration
lsb = -1000.0
chanH["ts_lsb"] = lsb
# the distances
if chanSplit[2] == "Ex":
chanH["pos_x1"] = float(dataSplit[4]) / 2
chanH["pos_x2"] = chanH["pos_x1"]
if chanSplit[2] == "Ey":
chanH["pos_y1"] = float(dataSplit[4]) / 2
chanH["pos_y2"] = chanH["pos_y1"]
if chanSplit[2] == "Ez":
chanH["pos_z1"] = float(dataSplit[4]) / 2
chanH["pos_z2"] = chanH["pos_z1"]
# append chanHeaders to the list
chanHeaders.append(chanH)
# check information from raw file headers
self.headersFromRawFile(headers["ats_data_file"], headers)
# return the headers and chanHeaders from this file
return headers, chanHeaders
    def readHeaderXTRX(self, headerFile):
"""Read a XTRX header files
XTRX are newer header files and will supercede XTR
Parameters
----------
headerFile : str
The XTRX header file to read in
"""
raise NotImplementedError("Support for XTRX files has not yet been implemented")
    def headersFromRawFile(self, rawFile: str, headers: Dict) -> None:
"""Read headers from the raw data files
Read the headers from the raw file and figure out the data byte offset.
Parameters
----------
rawFile : str
The .RAW data file
headers : Dict
A headers dictionary
Notes
-----
        Open with encoding ISO-8859-1 because it has a value for every byte, unlike some other encodings. In particular, we want to find the number of samples and the size of the header. The extended header is ignored.
"""
dFile = open(os.path.join(self.dataPath, rawFile), "r", encoding="ISO-8859-1")
generalHeaderString = dFile.read(1000) # this should be long enough
generalSplit = generalHeaderString.split()
# read GENERAL HEADER
generalHeader = {}
generalHeader["recLength"] = int(generalSplit[0])
generalHeader["fileType"] = generalSplit[1]
generalHeader["wordLength"] = int(generalSplit[2])
generalHeader["version"] = generalSplit[3]
generalHeader["procId"] = generalSplit[4]
generalHeader["numCh"] = int(generalSplit[5])
generalHeader["totalRec"] = int(generalSplit[6])
generalHeader["firstEvent"] = int(generalSplit[7])
generalHeader["numEvent"] = int(generalSplit[8])
generalHeader["extend"] = int(generalSplit[9])
# read EVENT HEADER - there can be multiple of these, but normally only the one
# Multiple events are largely deprecated. Only a single event is used
eventHeaders = []
fileSize = os.path.getsize(os.path.join(self.dataPath, rawFile))
record = generalHeader["firstEvent"]
for ir in range(0, generalHeader["numEvent"]):
seekPt = (record - 1) * generalHeader["recLength"]
if not seekPt > fileSize:
# seek from beginning of file
dFile.seek(seekPt, 0)
# read extra to make sure
eventString = dFile.read(1000)
eventSplit = eventString.split()
eH = {}
eH["start"] = int(eventSplit[0])
eH["startms"] = int(eventSplit[1])
eH["stop"] = int(eventSplit[2])
eH["stopms"] = int(eventSplit[3])
eH["cvalue1"] = float(eventSplit[4])
eH["cvalue2"] = float(eventSplit[5])
eH["cvalue3"] = float(eventSplit[6])
eH["EHInfile"] = int(eventSplit[7])
eH["nextEH"] = int(eventSplit[8])
eH["previousEH"] = int(eventSplit[9])
eH["numData"] = int(eventSplit[10])
eH["startData"] = int(eventSplit[11])
eH["extended"] = int(eventSplit[12])
eventHeaders.append(eH)
if eH["nextEH"] < generalHeader["totalRec"]:
record = eH["nextEH"] # set to go to next eH
else:
break # otherwise break out of for loops
# close the data file
dFile.close()
# now compare number of samples with that calculated previously
if eventHeaders[0]["numData"] != headers["num_samples"]:
            self.printWarning("Data file: {}".format(rawFile))
self.printWarning(
"Number of samples in raw file header {} does not equal that calculated from data {}".format(
eventHeaders[0]["numData"], headers["num_samples"]
)
)
self.printWarning("Number of samples calculated from data will be used")
# set the byte offset for the file
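        # records are 1-indexed, so data beginning at record startData sits
        # (startData - 1) * recLength bytes into the file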
self.dataByteOffset[rawFile] = (
eventHeaders[0]["startData"] - 1
) * generalHeader["recLength"]
self.recChannels[rawFile] = generalHeader["numCh"]
    def mergeHeaders(self, headersList: List, chanHeadersList: List) -> None:
"""Merge headers from all the header files
Checks all the header files to see if there are any gaps and calculates the sample ranges for each file together with the total number of samples. Sets the start and end time of the recording and class variables datetimeStart and datetimeStop.
Parameters
----------
headersList : List
List of headers from each data file
        chanHeadersList : List
            List of chan headers from each data file
"""
# take the first header as an example
self.headers = headersList[0]
self.chanHeaders = chanHeadersList[0]
if len(headersList) == 1:
# just fill in the data file list and data ranges
self.dataFileList = [self.headers["ats_data_file"]]
self.dataRanges = [[0, self.headers["num_samples"] - 1]]
self.scalings = []
tmp = {}
for cHeader in self.chanHeaders:
tmp[cHeader["channel_type"]] = cHeader["ts_lsb"]
self.scalings.append(tmp)
return # then there was only one file - no need to do all the below
# make sure that all headers have the same sample rate
# and save the start and stop times and dates
startTimes = []
stopTimes = []
numSamples = []
for idx, header in enumerate(headersList):
if header["sample_freq"] != self.headers["sample_freq"]:
self.printError(
"Not all datasets in {} have the same sample frequency.\nExiting...".format(
self.dataPath
),
quitRun=True,
)
if header["meas_channels"] != self.headers["meas_channels"]:
self.printError(
"Not all datasets in {} have the same number of channels.\nExiting...".format(
self.dataPath
),
quitRun=True,
)
# now store startTimes, stopTimes and numSamples
# do this as datetimes, will be easier
startString = "{} {}".format(header["start_date"], header["start_time"])
stopString = "{} {}".format(header["stop_date"], header["stop_time"])
datetimeStart = datetime.strptime(startString, "%Y-%m-%d %H:%M:%S.%f")
datetimeStop = datetime.strptime(stopString, "%Y-%m-%d %H:%M:%S.%f")
startTimes.append(datetimeStart)
stopTimes.append(datetimeStop)
numSamples.append(header["num_samples"])
# check the start and end times
sampleTime = timedelta(seconds=1.0 / self.headers["sample_freq"])
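        # e.g. at a hypothetical 250 Hz, the next file must begin exactly
        # 1/250 s = 4 ms after the last sample of the previous file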
# sort by start times
sortIndices = sorted(list(range(len(startTimes))), key=lambda k: startTimes[k])
# now sort stop times by the same indices
check = True
for i in range(1, self.numHeaderFiles):
# get the stop time of the previous dataset
stopTimePrev = stopTimes[sortIndices[i - 1]]
startTimeNow = startTimes[sortIndices[i]]
if startTimeNow != stopTimePrev + sampleTime:
self.printWarning(
"There is a gap between the datafiles in {}".format(self.dataPath)
)
self.printWarning(
"Please separate out datasets with gaps into separate folders"
)
# print out where the gap was found
self.printWarning("Gap found between datafiles:")
self.printWarning(
"1. {}".format(headersList[sortIndices[i - 1]]["ats_data_file"])
)
self.printWarning(
"2. {}".format(headersList[sortIndices[i]]["ats_data_file"])
)
# set check as false
check = False
# if did not pass check, then exit
if not check:
self.printError(
"Gaps in data. All data for a single recording must be continuous. Exiting...",
quitRun=True,
)
        # the data is continuous (no gaps), so the total is the sum of samples in each file
totalSamples = sum(numSamples)
# get a list of all the datafiles, scalings and the sample ranges
self.dataFileList = []
self.dataRanges = []
self.scalings = []
sample = -1
# now need some sort of lookup table to say where the sample ranges are
for i in range(0, self.numHeaderFiles):
iSort = sortIndices[i] # get the sorted index
self.dataFileList.append(headersList[iSort]["ats_data_file"])
startSample = sample + 1
endSample = (
startSample + numSamples[iSort] - 1
) # -1 because this is inclusive of the start sample
self.dataRanges.append([startSample, endSample])
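            # e.g. two hypothetical files of 1000 samples each give ranges
            # [0, 999] and [1000, 1999]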
# increment sample
sample = endSample
# save the scalings for each chan
tmp = {}
for cHeader in self.chanHeadersList[iSort]:
tmp[cHeader["channel_type"]] = cHeader["ts_lsb"]
self.scalings.append(tmp)
# now set the LSB information for the chanHeaders
# i.e. if they change, this should reflect that
for i in range(0, len(self.chanHeaders)):
chan = self.chanHeaders[i]["channel_type"]
lsbSet = set()
for scalar in self.scalings:
lsbSet.add(scalar[chan])
if len(lsbSet) == 1:
self.chanHeaders[i]["ts_lsb"] = list(lsbSet)[0]
else:
self.printWarning(
"Multiple different LSB values found for chan {}: {}".format(
chan, list(lsbSet)
)
)
self.printWarning(
"This is handled, but the header information given will show only a single LSB value"
)
self.chanHeaders[i]["ts_lsb"] = list(lsbSet)[0]
# set start and end time for headers and chan headers
# do the same with number of samples
datetimeStart = min(startTimes)
datetimeStop = max(stopTimes)
self.headers["start_date"] = datetimeStart.strftime("%Y-%m-%d")
self.headers["start_time"] = datetimeStart.strftime("%H:%M:%S.%f")
self.headers["stop_date"] = datetimeStop.strftime("%Y-%m-%d")
self.headers["stop_time"] = datetimeStop.strftime("%H:%M:%S.%f")
self.headers["num_samples"] = totalSamples
# set datafiles = the whole list of datafiles
self.headers["ats_data_file"] = self.dataFileList
for iChan in range(0, len(self.chanHeaders)):
self.chanHeaders[iChan]["start_date"] = datetimeStart.strftime("%Y-%m-%d")
self.chanHeaders[iChan]["start_time"] = datetimeStart.strftime(
"%H:%M:%S.%f"
)
self.chanHeaders[iChan]["stop_date"] = datetimeStop.strftime("%Y-%m-%d")
self.chanHeaders[iChan]["stop_time"] = datetimeStop.strftime("%H:%M:%S.%f")
self.chanHeaders[iChan]["num_samples"] = totalSamples
self.chanHeaders[iChan]["ats_data_file"] = self.dataFileList
    def printDataFileList(self) -> List[str]:
"""Information about the data files as a list of strings
Returns
-------
List[str]
List of information about the data files
"""
textLst: List[str] = []
textLst.append("Data File\t\tSample Ranges")
for dFile, sRanges in zip(self.dataFileList, self.dataRanges):
textLst.append("{}\t\t{} - {}".format(dFile, sRanges[0], sRanges[1]))
textLst.append("Total samples = {}".format(self.getNumSamples()))
return textLst
    def printDataFileInfo(self) -> None:
"""Print a list of the data files"""
blockPrint(
"{} Data File List".format(self.__class__.__name__),
self.printDataFileList(),
)