Source code for resistics.time.reader_internal

import os
import glob
from datetime import datetime, timedelta
import numpy as np
from typing import List, Tuple

from resistics.time.reader import TimeReader
from resistics.common.io import checkFilepath, lineToKeyAndValue


class TimeReaderInternal(TimeReader):
    """Data reader for internal formatted data

    Internal formatted data is straightforward. Header information is read in
    from a global header file and header files for each channel, all of them
    ascii formatted files. Each channel has its own data file written out using
    numpy's binary write function and with a .dat extension.

    As raw data is not usually in the internal data format, to avoid any
    problems, the following workflow is suggested:

    - Get physical data from the raw data files (ATS, SPAM, Phoenix)
    - Perform any pre-processing required
    - Save as internal format, ensuring to set the scaling_applied channel
      header for every channel to True. This will avoid any further scaling
      when the data is read in again.

    Methods
    -------
    dataHeaders()
        Headers to read in
    readHeader()
        Specific function for reading the headers for internal format
    """

    def dataHeaders(self) -> Tuple[List[str], List[str], List[str], List[str]]:
        """Return the data headers in the internal file format

        Returns
        -------
        recordingHeaders : List[str]
            Headers with information about the recording
        globalHeaders : List[str]
            Common headers with information about the recording
        channelHeadersInput : List[str]
            Channel setup headers
        channelHeadersOutput : List[str]
            Channel recording headers
        """
        # note, in comparison to ats headers, this also has one called scaling_applied
        recordingHeaders = ["start_time", "start_date", "stop_time", "stop_date"]
        globalHeaders = ["meas_channels", "sample_freq"]
        channelHeadersInput = ["gain_stage1", "gain_stage2", "hchopper", "echopper"]
        channelHeadersOutput = [
            "start_time",
            "start_date",
            "sample_freq",
            "num_samples",
            "ats_data_file",
            "sensor_type",
            "channel_type",
            "ts_lsb",
            "scaling_applied",
            "pos_x1",
            "pos_x2",
            "pos_y1",
            "pos_y2",
            "pos_z1",
            "pos_z2",
            "sensor_sernum",
        ]
        return (
            recordingHeaders,
            globalHeaders,
            channelHeadersInput,
            channelHeadersOutput,
        )

    def readHeader(self) -> None:
        """Read time data header file for internal format

        Populates three attributes on the instance:

        - ``self.headers``: key/value pairs parsed from ``global.<ext>`` in
          ``self.dataPath``
        - ``self.chanHeaders``: one dictionary per channel, parsed from
          ``chan_00.<ext>``, ``chan_01.<ext>``, ... The number of channels is
          taken from the ``meas_channels`` global header
        - ``self.comments``: the right-stripped lines of ``comments.txt`` if
          that file passes ``checkFilepath``, otherwise an empty list

        The header extension is ``hdr`` unless the instance defines a
        ``headerExt`` attribute (e.g. in a child class).

        Raises
        ------
        KeyError
            If the global header does not define ``meas_channels``
        ValueError
            If ``meas_channels`` cannot be converted to an integer
        OSError
            If a required header file cannot be opened
        """
        # a child class can override the header extension through headerExt
        ext = getattr(self, "headerExt", "hdr")
        globalFile = "global.{}".format(ext)
        globalPath = os.path.join(self.dataPath, globalFile)
        # look in headerF for global.hdr
        if globalPath not in self.headerF:
            # NOTE(review): quitRun=True presumably stops execution here - confirm
            self.printError(
                "Global header not found. The {} file is required".format(globalFile),
                quitRun=True,
            )
        self.headers = self._readHeaderFile(globalPath)

        # now want to deal with the chan headers
        numChans = int(self.headers["meas_channels"])
        self.chanHeaders = [
            self._readHeaderFile(
                os.path.join(self.dataPath, "chan_{:02d}.{}".format(iChan, ext))
            )
            for iChan in range(numChans)
        ]

        # finally, read the comments
        commentPath = os.path.join(self.dataPath, "comments.txt")
        if checkFilepath(commentPath):
            with open(commentPath, "r") as commentF:
                self.comments = [comment.rstrip() for comment in commentF]
        else:
            self.comments = []

    @staticmethod
    def _readHeaderFile(filepath: str) -> dict:
        """Parse a single internal format header file into a dictionary

        The first line of the file is ignored. Every following non-blank line
        is split into a key and a value using lineToKeyAndValue.

        Parameters
        ----------
        filepath : str
            Path to the header file

        Returns
        -------
        dict
            Header keys mapped to their values as returned by lineToKeyAndValue
        """
        fileHeaders = {}
        # the context manager closes the file even if parsing a line fails
        with open(filepath) as headerFile:
            # ignore the first line; the default avoids an error on an empty file
            next(headerFile, None)
            for line in headerFile:
                # strip before testing for emptiness: lines read from the file
                # still carry their newline, so comparing the raw line to ""
                # never skips a blank line
                line = line.strip()
                if not line:
                    continue
                key, val = lineToKeyAndValue(line)
                fileHeaders[key] = val
        return fileHeaders