Source code for resistics.time.reader_ascii

import os
import glob
from datetime import datetime, timedelta
import numpy as np
from typing import List, Tuple

from resistics.time.data import TimeData
from resistics.time.reader_internal import TimeReaderInternal
from resistics.time.clean import removeZerosSingle, removeNansSingle


class TimeReaderAscii(TimeReaderInternal):
    """Time data reader for ascii formatted data files

    Reads ascii data files together with internally formatted header files.
    Neither getUnscaledSamples nor getPhysicalSamples applies any scaling to
    the values read from file, so the data is assumed to already be in the
    correct units. If the data does not need to be calibrated, any units can
    be used provided they are internally consistent.

    Methods
    -------
    setParameters()
        Set data format parameters
    getUnscaledSamples()
        Get the raw unscaled samples from the ascii files
    getPhysicalSamples()
        Get data in field units. No further scaling is applied here, ascii
        data is assumed to already be in field units
    """

    def setParameters(self) -> None:
        """Set the data reader parameters for the ascii format

        Collects the header files (``*.hdr``) and the data files (``*.ascii``)
        found in the data directory and sets the byte offset and byte size
        attributes.
        """
        headerPattern = os.path.join(self.dataPath, "*.hdr")
        dataPattern = os.path.join(self.dataPath, "*.ascii")
        self.headerF = glob.glob(headerPattern)
        self.dataF = glob.glob(dataPattern)
        # NOTE(review): the byte attributes are not used by the ascii reading
        # methods of this class; presumably the parent class expects them to
        # be set - confirm
        self.dataByteOffset = 0
        self.dataByteSize = 4

    def getUnscaledSamples(self, **kwargs) -> TimeData:
        """Get raw data from the ascii data files

        Each channel file is read line by line, with the zero-based line index
        treated as the sample index and every line in the requested range
        converted to a single float. If a file has fewer lines than requested,
        the samples past the end of the file stay at zero.

        Parameters
        ----------
        chans : List[str], optional
            List of channels to return if not all are required
        startSample : int, optional
            First sample to return
        endSample : int, optional
            Last sample to return (inclusive)

        Returns
        -------
        TimeData
            Time data object
        """
        # fill in chans, startSample and endSample where they were not given
        options = self.parseGetDataKeywords(kwargs)
        firstSample = options["startSample"]
        lastSample = options["endSample"]
        # the sample range includes both end points
        numSamples = lastSample - firstSample + 1

        data = {}
        for chan in options["chans"]:
            # make sure the channel exists before trying to read it
            self.checkChan(chan)
            chanPath = os.path.join(self.dataPath, self.getChanDataFile(chan))
            values = np.zeros(shape=numSamples, dtype=np.float32)
            with open(chanPath) as chanFile:
                for lineNum, line in enumerate(chanFile):
                    if lineNum > lastSample:
                        # past the requested range, no need to read further
                        break
                    if lineNum < firstSample:
                        continue
                    values[lineNum - firstSample] = float(line.strip())
            data[chan] = values

        # times of the first and last samples read
        startTime, stopTime = self.sample2time(firstSample, lastSample)

        # comments describing this read, placed after any existing comments
        readComments = [
            "Unscaled data {} to {} read in from measurement {}, samples {} to {}".format(
                startTime, stopTime, self.dataPath, firstSample, lastSample
            ),
            "Sampling frequency {}".format(self.getSampleFreq()),
        ]
        if len(self.comments) > 0:
            readComments = self.comments + readComments

        return TimeData(
            sampleFreq=self.getSampleFreq(),
            startTime=startTime,
            stopTime=stopTime,
            data=data,
            comments=readComments,
        )

    def getPhysicalSamples(self, **kwargs):
        """Get ascii data as physical values

        Warnings
        --------
        No scaling happens in getPhysicalSamples. Ascii data is assumed to be
        properly scaled to mV for magnetic channels and mV/km for electric
        channels (i.e. field units)

        Parameters
        ----------
        chans : List[str]
            List of channels to return if not all are required
        startSample : int
            First sample to return
        endSample : int
            Last sample to return
        remaverage : bool
            Remove average from the data
        remzeros : bool
            Remove zeroes from the data
        remnans: bool
            Remove NaNs from the data

        Returns
        -------
        TimeData
            Time data object
        """
        options = self.parseGetDataKeywords(kwargs)
        timeData = self.getUnscaledSamples(
            chans=options["chans"],
            startSample=options["startSample"],
            endSample=options["endSample"],
        )
        dropZeros = options["remzeros"]
        dropNans = options["remnans"]
        dropAverage = options["remaverage"]

        # no scaling is applied to ascii data, only the optional cleaning
        chanData = timeData.data
        for chan in options["chans"]:
            # removing zeros - False by default
            if dropZeros:
                chanData[chan] = removeZerosSingle(chanData[chan])
            # removing nans - False by default
            if dropNans:
                chanData[chan] = removeNansSingle(chanData[chan])
            # removing the average - True by default
            # done last so it acts on the data after zeros and nans are dealt with
            if dropAverage:
                chanData[chan] = chanData[chan] - np.average(chanData[chan])

        timeData.addComment(
            "Remove zeros: {}, remove nans: {}, remove average: {}".format(
                dropZeros, dropNans, dropAverage
            )
        )
        return timeData