Source code for resistics.time.reader_ats

import os
import glob
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
import numpy as np

from resistics.time.reader import TimeReader


[docs]class TimeReaderATS(TimeReader): """Data reader for ATS formatted data For ATS files, header information is XML formatted. The end time in ATS header files is actually one sample past the time of the last sample. The dataReader handles this and gives an end time corresponding to the actual time of the last sample. Methods ------- setParameters() Set data format parameters dataHeaders() Headers to read in readHeader() Specific function for reading the headers for ATS format lineToKeyAndValue(line) Separate a line into key and value with = as a delimiter Notes ----- The raw data units for ATS data are in counts. To get data in field units, ATS data is first multiplied by the least significant bit (lsb) defined in the header files, .. code-block:: text data = data * leastSignificantBit, giving data in mV. The lsb includes the gain removal, so no separate gain removal needs to be performed. For electrical channels, there is an additional step of dividing by the electrode spacing, which is provided in metres. The extra factor of 1000 is to convert this to km to give mV/km for electric channels .. code-block:: text data = (1000 * data)/electrodeSpacing Finally, to get magnetic channels in nT, the magnetic channels need to be calibrated. """
def setParameters(self) -> None:
    """Set data reader parameters for ATS files

    Looks in self.dataPath for the XML header files (``*.xml``) and the ATS
    data files (``*.ats``) and stores the matching paths, sorted, in
    self.headerF and self.dataF. Also sets self.dataByteOffset to 1024 and
    self.dataByteSize to 4.

    Notes
    -----
    The byte values are presumably the length of the binary header at the
    start of an ATS file and the size of a single sample - confirm against
    the code that reads the data files.
    """
    # escape the directory so that characters such as [ ] * ? in the path are
    # matched literally rather than being interpreted as glob patterns
    searchRoot = glob.escape(self.dataPath)
    # glob gives no guarantee on ordering; sort so that the file picked by
    # self.headerF[0] is the same on every platform and every run
    self.headerF = sorted(glob.glob(os.path.join(searchRoot, "*.xml")))
    self.dataF = sorted(glob.glob(os.path.join(searchRoot, "*.ats")))
    self.dataByteOffset = 1024
    self.dataByteSize = 4
def dataHeaders(self):
    """Return the names of the headers to read from the ATS XML header file

    Returns
    -------
    recordingHeaders : List[str]
        Headers with information about the recording
    globalHeaders : List[str]
        Common headers with information about the recording
    channelHeadersInput : List[str]
        Channel setup headers
    channelHeadersOutput : List[str]
        Channel recording headers
    """
    # start and stop of the recording as a whole
    perRecording = ["start_time", "start_date", "stop_time", "stop_date"]
    # values shared by every channel
    shared = ["meas_channels", "sample_freq"]
    # per channel setup values
    chanSetup = ["gain_stage1", "gain_stage2", "hchopper", "echopper"]

    # per channel recording values, assembled from related groups; the order
    # of the combined list is the order in which the headers are returned
    timing = ["start_time", "start_date", "sample_freq", "num_samples"]
    source = ["ats_data_file", "sensor_type", "channel_type", "ts_lsb"]
    positions = ["pos_x1", "pos_x2", "pos_y1", "pos_y2", "pos_z1", "pos_z2"]
    chanRecorded = timing + source + positions + ["sensor_sernum"]

    return perRecording, shared, chanSetup, chanRecorded
def readHeader(self):
    """Read time data header file for ATS format

    Headers for ATS files are XML formatted. The first file in self.headerF
    is parsed and used to populate:

    - self.headers : recording and global headers
    - self.chanHeaders : a list with one dictionary of headers per channel

    Values are stored as the text of the XML elements, i.e. as strings. The
    exceptions are the "scaling_applied" channel header, which is set to
    False, and "ts_lsb", which is stored with a leading minus sign.

    Notes
    -----
    If the ATSWriter section, or the channels inside it, cannot be found, the
    problem is reported through printError with quitRun=True and the method
    returns without post-processing the headers.
    """
    if len(self.headerF) > 1:
        self.printWarning(
            "More xml files than expected. Using: {}".format(self.headerF[0])
        )
    tree = ET.parse(self.headerF[0])
    root = tree.getroot()
    # get header names
    rHeaders, gHeaders, cHeadersInput, cHeadersOutput = self.dataHeaders()

    # get recording headers
    recording = root.find("./recording")
    self.headers = {rH: recording.find(rH).text for rH in rHeaders}
    # get global config headers
    globalConfig = recording.find("./input/ADU07Hardware/global_config")
    for gH in gHeaders:
        self.headers[gH] = globalConfig.find(gH).text

    # get the channel headers in the input section
    self.chanHeaders = [
        {cH: chan.find(cH).text for cH in cHeadersInput}
        for chan in root.findall(
            "./recording/input/ADU07Hardware/channel_config/channel"
        )
    ]

    # get the channel headers in the ATSWriter section of the output
    # find() returns None for a missing element, so check each step rather
    # than relying on a catch-all except; outputSec always ends up defined
    outputSec = []
    recordingOutput = recording.find("output")
    atsWriter = None
    if recordingOutput is not None:
        atsWriter = recordingOutput.find(".//ATSWriter")
    if atsWriter is None:
        self.printError(
            "ATSWriter section not found or channel information not found in ATSWriter"
        )
    else:
        outputSec = atsWriter.findall("configuration/channel")
    if not outputSec:
        self.printError(
            "No channels found in the ATSWriter. Unable to fully construct channel headers. \nExiting.",
            quitRun=True,
        )
        # quitRun=True presumably stops the run; return in case it does not,
        # as the channel headers needed below have not been read
        return

    # input and output channels are paired by their position in the file
    for chan, chanH in zip(outputSec, self.chanHeaders):
        for cH in cHeadersOutput:
            chanH[cH] = chan.find(cH).text

    # a couple of things to do: add microseconds to the times
    # remember, the actual end time is one sample back
    # if you do a calculation with the number of samples and the start time
    self.headers["start_time"] = self.headers["start_time"] + ".000000"
    self.headers["stop_time"] = self.headers["stop_time"] + ".000000"
    for chanH in self.chanHeaders:
        chanH["start_time"] = chanH["start_time"] + ".000000"
        # set the lsb applied header in chans
        # for ats files, this is not applied in the raw data files
        chanH["scaling_applied"] = False
        # NOTE(review): the minus sign is prepended to the header text, so a
        # value that is already negative would become "--..." - confirm the
        # header value is always positive
        chanH["ts_lsb"] = "-{}".format(chanH["ts_lsb"])