# Source code for resistics.time.writer
import os
import glob
from datetime import datetime, timedelta
import numpy as np
from typing import List, Dict, Union, Any
from resistics.common.base import ResisticsBase
from resistics.common.io import checkAndMakeDir
from resistics.common.print import breakComment
from resistics.time.data import TimeData
from resistics.time.reader import TimeReader
class TimeWriter(ResisticsBase):
    """Base class for time series data writers

    All writers output headers in a common format.

    Attributes
    ----------
    outPath : str
        The path to write to
    extension : str
        The extension to give to the data files
    dtype : data type
        Data format to write out, default is np.int32
    headers : Dict, None
        Header information to write out
    chans : List[str], None
        Channels to write out
    chanMap : Dict, None
        Map between channel and index in the channel headers
    chanHeaders : List, None
        Channel specific headers

    Methods
    -------
    __init__()
        Initialise writer
    getOutPath()
        Get the path to write out to
    setOutPath(path)
        Set the path to write out to
    setExtension()
        For subclasses to set their extension
    setGlobalHeadersFromKeywords(headers, keywords)
        Set the global headers
    setChanHeadersFromKeywords(chanHeaders, keywords)
        Set channel headers
    calcStopDateTime(sampleFreq, numSamples, datetimeStart)
        Calculate time series stop time
    globalHeaderwords()
        Get a list of the global headers of interest
    chanHeaderwords()
        Get a list of the channel headers of interest
    writeTemplateHeaderFiles(chans, chanFileMap, sampleFreq, numSamples, startDate)
        Write out a set of template header files for a case in which ASCII data without headers is to be loaded in
    writeDataset(reader, **kwargs)
        Write an existing dataset as a different format
    writeData(headers, chanHeaders, timeData, **kwargs)
        Write data based on headers, channel headers and time series data
    write(headers, chanHeaders, chanMap, timeData, **kwargs)
        Write out a dataset
    writeHeaders(headers, chans, chanMap, chanHeaders)
        Write out headers
    writeComments(comments)
        Write out comments
    writeDataFiles(chans, timeData)
        Write out time series data - not implemented in base class
    printList()
        Class status returned as list of strings
    """
def __init__(self):
"""Initialise"""
self.outPath: str = ""
# in subclasses, extension might change i.e. .ascii
self.setExtension()
# data type - the format of the data being written out
self.dtype = np.int32
# information about data being written
self.headers: Union[Dict, None] = None
self.chans: Union[List[str], None] = None
self.chanMap: Union[Dict[str, int], None] = None
self.chanHeaders: Union[List, None] = None
[docs] def getOutPath(self) -> str:
"""Get the out path
Parameters
----------
str
The outpath defining where data is written
"""
return self.outPath
[docs] def setOutPath(self, path: str) -> None:
"""Set the out path
Parameters
----------
path : str
The new outpath defining where data is written
"""
self.outPath = path
[docs] def setExtension(self) -> None:
"""For subclasses to set their own extension type"""
self.extension = ".dat"
[docs] def setGlobalHeadersFromKeywords(self, headers: Dict, keywords: Dict) -> Dict:
"""Set the global headers
Before writing out data, global headers are set. The priority order is:
1. keywords[headername] if headername exists in keywords
2. headers[headername] if headername exists in headers
3. "" where the header is not defined in either keywords or headers
The reason keywords takes top priority is there may be instances where the headers defined in a reader may need to be altered due to processing of time data.
Parameters
----------
headers : Dict
Dictionary of header values
keywords : Dict
A dictionary of header values to overwrite those in headers
"""
globalHeaderwords = self.globalHeaderwords()
for gH in globalHeaderwords:
hdrVal = ""
if gH in headers:
hdrVal = headers[gH]
if gH in keywords:
hdrVal = keywords[gH]
headers[gH] = hdrVal
return headers
[docs] def setChanHeadersFromKeywords(self, chanHeaders: List, keywords: Dict) -> List:
"""Set the channel headers
Before writing out data, channel headers are set. The priority order is:
1. keywords[headername] if headername exists in keywords
2. headers[headername] if headername exists in headers
3. "" where the channel header is not defined in either keywords or headers
The reason keywords takes top priority is there may be instances where the headers defined in a reader may need to be altered due to processing of time data.
Parameters
----------
chanHeaders : List
List of channel headers
keywords : Dict
A dictionary of header values to overwrite those in channel headers
"""
chanHeaderwords = self.chanHeaderwords()
for iChan in range(0, len(chanHeaders)):
for cH in chanHeaderwords:
hdrVal = ""
if cH in chanHeaders[iChan]:
hdrVal = chanHeaders[iChan][cH]
if cH in keywords:
hdrVal = keywords[cH]
chanHeaders[iChan][cH] = hdrVal
return chanHeaders
[docs] def calcStopDateTime(
self, sampleFreq: float, numSamples: int, datetimeStart: datetime
) -> datetime:
"""Calculate time of last sample
Parameters
----------
sampleFreq : float
Sampling frequency in Hz of the time series data
numSamples : int
The number of samples in the time series data
datetimeStart : datetime
The time of the first sample
"""
# calculate duration in seconds
# numSamples - 1 because have to remove the initial sample which is taken at start time
duration = 1.0 * (numSamples - 1) / sampleFreq
datetimeStop = datetimeStart + timedelta(seconds=duration)
return datetimeStop
[docs] def globalHeaderwords(self) -> List[str]:
"""Get a list of global headerwords to write out
Returns
-------
List[str]
A list of the global header words of interest for writing out
"""
gHeaders = [
"sample_freq",
"num_samples",
"start_time",
"start_date",
"stop_time",
"stop_date",
"meas_channels",
]
return gHeaders
[docs] def chanHeaderwords(self) -> List[str]:
"""Get a list of channel headerwords to write out
Returns
-------
List[str]
A list of the global header words of interest for writing out
"""
cHeaders = [
"sample_freq",
"num_samples",
"start_time",
"start_date",
"stop_time",
"stop_date",
"ats_data_file",
"sensor_type",
"channel_type",
"ts_lsb",
"scaling_applied",
"pos_x1",
"pos_x2",
"pos_y1",
"pos_y2",
"pos_z1",
"pos_z2",
"sensor_sernum",
"gain_stage1",
"gain_stage2",
"hchopper",
"echopper",
]
return cHeaders
    def writeTemplateHeaderFiles(
        self,
        chans: List[str],
        chanFileMap: Dict,
        sampleFreq: float,
        numSamples: int,
        startDate: str,
    ):
        """Write a set of template header files

        Template headers are useful for reading in ascii files where no
        headers exist. By giving a few header words, many options can be set.
        Channel headers that cannot be inferred are given sensible defaults
        (unit scalings and gains, unit electrode spacings).

        Parameters
        ----------
        chans : List[str]
            List of chans (e.g. Ex, Ey, Hx, Hy, Hz)
        chanFileMap : Dict[str, str]
            Map from channel to data file
        sampleFreq : float
            Sampling frequency of the data in Hz
        numSamples : int
            Number of samples of data
        startDate : str
            The start date of the recording in format %Y-%m-%d %H:%M:%S
        """
        # calculate start and end datetime
        datetimeStart = datetime.strptime(startDate, "%Y-%m-%d %H:%M:%S")
        datetimeStop = self.calcStopDateTime(sampleFreq, numSamples, datetimeStart)
        # set global header words
        globalKeywords = dict()
        globalKeywords["sample_freq"] = sampleFreq
        globalKeywords["num_samples"] = numSamples
        globalKeywords["start_date"] = datetimeStart.strftime("%Y-%m-%d")
        globalKeywords["start_time"] = datetimeStart.strftime("%H:%M:%S.%f")
        globalKeywords["stop_date"] = datetimeStop.strftime("%Y-%m-%d")
        globalKeywords["stop_time"] = datetimeStop.strftime("%H:%M:%S.%f")
        globalKeywords["meas_channels"] = len(chans)
        # empty dictionary so that some headers get defaulted
        emptyDict = dict()
        # set global headers for keyword arguments
        headers = self.setGlobalHeadersFromKeywords(emptyDict, globalKeywords)
        # set channel headers for keyword arguments; defaults are unit
        # scalings/gains and unit electrode positions
        chanMap: Dict = dict()
        chanHeaders: List[Dict] = list()
        for idx, chan in enumerate(chans):
            chanMap[chan] = idx
            chanKeywords = dict(globalKeywords)
            chanKeywords["scaling_applied"] = True
            chanKeywords["ts_lsb"] = 1
            chanKeywords["gain_stage1"] = 1
            chanKeywords["gain_stage2"] = 1
            chanKeywords["hchopper"] = 0
            chanKeywords["echopper"] = 0
            chanKeywords["pos_x1"] = 0
            chanKeywords["pos_x2"] = 1
            chanKeywords["pos_y1"] = 0
            chanKeywords["pos_y2"] = 1
            chanKeywords["pos_z1"] = 0
            chanKeywords["pos_z2"] = 1
            chanKeywords["sensor_sernum"] = 1
            chanHeaders.append(chanKeywords)
        # set the chan header words
        # NOTE(review): emptyDict was mutated by setGlobalHeadersFromKeywords
        # above and now holds the global header values; those keys overwrite
        # the same-valued entries in chanHeaders, so there is no visible
        # effect, but consider passing a fresh dict here
        self.setChanHeadersFromKeywords(chanHeaders, emptyDict)
        for idx, chan in enumerate(chans):
            # amend the data file in the chan headers
            chanHeaders[idx]["ats_data_file"] = chanFileMap[chan]
            chanHeaders[idx]["channel_type"] = chan
            # any header word still defaulted to "" is written as "None"
            for cH in chanHeaders[idx].keys():
                if chanHeaders[idx][cH] == "":
                    chanHeaders[idx][cH] = "None"
        # rename=False keeps the ats_data_file values from chanFileMap
        self.writeHeaders(headers, chans, chanMap, chanHeaders, rename=False)
[docs] def writeDataset(self, reader: TimeReader, physical: bool = True, **kwargs) -> None:
"""Write out a dataset by passing a data reader
This method is intended to transform an existing dataset into internal format
Parameters
----------
reader : DataReader
A list of the global header words of interest for writing out
physical : bool, optional
An optional flag designating whether to use physical samples or not. Default is true
"""
if self.getOutPath() == "":
self.printError("No output filepath given", quitRun=True)
checkAndMakeDir(self.getOutPath())
# write using information from a reader file
headers = reader.getHeaders()
chanHeaders, chanMap = reader.getChanHeaders()
# now write depending on whether scaling_applied or not
if physical:
# make sure dataset is written out in float and with scaling_applied header set to True
self.dtype = np.float32
kwargs["scaling_applied"] = True
# write out
self.write(
headers, chanHeaders, chanMap, reader.getPhysicalSamples(), **kwargs
)
else:
# write out unscaled samples
self.printWarning(
"Wrinting out of unscaled samples is not recommended due to scaling differences between the formats."
)
self.printWarning(
"Dataset will be written out but problems may be encountered in the future."
)
self.write(
headers, chanHeaders, chanMap, reader.getUnscaledSamples(), **kwargs
)
[docs] def writeData(
self, headers, chanHeaders, timeData, physical: bool = True, **kwargs
):
"""Write out time data
This method requires the user to pass global headers and chan headers explicitly.
Parameters
----------
headers : Dict
Dictionary of headers
chanHeaders : List
List of channel headers
timeData : TimeData
Time series data to write out
physical : bool, optional
An optional flag designating whether the data is in field units (i.e. all scalings have been applied). This will result in the scaling_applied header being set to True. Default value for physical is True (i.e. data is assumed to be in field units).
"""
if self.getOutPath() == "":
self.printWarning("No output filepath given")
return
# make the directory
checkAndMakeDir(self.getOutPath())
# calculate our own cMap
chanMap = {}
for iChan in range(0, len(chanHeaders)):
chanType = chanHeaders[iChan]["channel_type"]
chanMap[chanType] = iChan
# check if in physical units
if physical:
kwargs["scaling_applied"] = True
self.dtype = np.float32
# write the data
self.write(headers, chanHeaders, chanMap, timeData, **kwargs)
[docs] def write(
self,
headers: Dict,
chanHeaders: List,
chanMap: Dict,
timeData: TimeData,
**kwargs
):
"""Write out the header file
Parameters
----------
headers : Dict
Dictionary of headers
chanHeaders : List
List of channel headers
chanMap : Dict
Maps channel to index for chanHeaders
timeData : TimeData
Time series data as TimeData object
"""
# set global headers for keyword arguments
headers = self.setGlobalHeadersFromKeywords(headers, kwargs)
# set channel headers for keyword arguments
chanHeaders = self.setChanHeadersFromKeywords(chanHeaders, kwargs)
# now overwrite the options by checking the TimeData object
# number of samples and sample frequency
# Current method favours the time data object
chans = sorted(list(timeData.chans))
dataSizes = []
for c in chans:
dataSizes.append(timeData.data[c].size)
if min(dataSizes) != max(dataSizes):
self.printWarning(
"Channels do not have the same number of samples: {} - {}".format(
", ".join(chans), ", ".join(dataSizes)
)
)
self.printWarning("Only the smallest number of samples will be written out")
numSamples = min(dataSizes)
if headers["num_samples"] != numSamples:
self.printWarning(
"Number of samples {} in headers does not match number of samples in TimeData object {}. TimeData info will be used.".format(
headers["num_samples"], numSamples
)
)
headers["num_samples"] = numSamples
timeData.numSamples = numSamples
# sample freq
if headers["sample_freq"] != timeData.sampleFreq:
self.printWarning(
"Sample frequency of {} Hz in headers does not match {} Hz in TimeData object".format(
headers["sample_freq"], timeData.sampleFreq
)
)
self.printWarning("Sample frequency in TimeData object will be used")
headers["sample_freq"] = timeData.sampleFreq
# deal with start and end time and create datetime objects
# the start time does not change on resampling, only the end time
datetimeStart = datetime.strptime(
"{} {}".format(headers["start_date"], headers["start_time"]),
"%Y-%m-%d %H:%M:%S.%f",
)
datetimeStop = datetime.strptime(
"{} {}".format(headers["stop_date"], headers["stop_time"]),
"%Y-%m-%d %H:%M:%S.%f",
)
# now let's compare to the time data
if datetimeStart != timeData.startTime:
self.printWarning(
"Start in headers {} does not match that in TimeData object {}. TimeData start time will be used".format(
datetimeStart, timeData.startTime
)
)
datetimeStart = timeData.startTime
if datetimeStop != timeData.stopTime:
self.printWarning(
"Stop in headers {} does not match that in TimeData object {}. TimeData stop time will be used".format(
datetimeStop, timeData.stopTime
)
)
datetimeStop = timeData.stopTime
# now recalculate datetime using the number of samples and compare again
datetimeRecalc = self.calcStopDateTime(
timeData.sampleFreq, numSamples, datetimeStart
)
if datetimeRecalc != datetimeStop:
self.printWarning(
"Note, discrepancy between stop time in given headers and those calculated from data"
)
self.printWarning(
"Causes of this might be resampling or interpolation processes and the limiting of data"
)
self.printWarning(
"If no resampling, interpolation or limiting of data has been performed, please check all times"
)
self.printWarning(
"Stop time {} calculated from data will be used instead of that in data {}".format(
datetimeRecalc, datetimeStop
)
)
datetimeStop = datetimeRecalc
headers["start_date"] = datetimeStart.strftime("%Y-%m-%d")
headers["start_time"] = datetimeStart.strftime("%H:%M:%S.%f")
headers["stop_date"] = datetimeStop.strftime("%Y-%m-%d")
headers["stop_time"] = datetimeStop.strftime("%H:%M:%S.%f")
# now update all the chan headers and limit data to numSamples
for c in chans:
timeData.data[c] = timeData.data[c][:numSamples]
cIndex = chanMap[c]
chanHeaders[cIndex]["num_samples"] = headers["num_samples"]
chanHeaders[cIndex]["sample_freq"] = headers["sample_freq"]
chanHeaders[cIndex]["start_date"] = headers["start_date"]
chanHeaders[cIndex]["start_time"] = headers["start_time"]
chanHeaders[cIndex]["stop_date"] = headers["stop_date"]
chanHeaders[cIndex]["stop_time"] = headers["stop_time"]
# finally, check the number of measurement channels
headers["meas_channels"] = len(chans)
# now write out the headers and save to class variables
self.writeHeaders(headers, chans, chanMap, chanHeaders)
self.headers = headers
self.chans = chans
self.chanMap = chanMap
self.chanHeaders = chanHeaders
# write out comment file
self.writeComments(timeData.comments)
# write out the data files
self.writeDataFiles(chans, timeData)
[docs] def writeHeaders(
self,
headers: Dict[str, Any],
chans: List[str],
chanMap: Dict[str, int],
chanHeaders: List[Dict],
rename: bool = True,
ext: str = "hdr",
) -> bool:
"""Write out the header file
Parameters
----------
headers : Dict
Dictionary of headers
chans : List[str]
Channels as a list of strings
chanMap : Dict
Maps channel to index for chanHeaders
chanHeaders : List
List of channel headers
rename : bool, optional
Rename the output ats_data_files. Default is True and this is the case when writing out data which has been read in from a different source with pre-existing headers. However, if creating template header files, then set this to False.
ext : str, optional
The extension for the headers. Default is hdr
"""
# write out the global headers
f = open(os.path.join(self.getOutPath(), "global.{}".format(ext)), "w")
f.write("HEADER = GLOBAL\n")
globalHeaderwords = self.globalHeaderwords()
for gH in globalHeaderwords:
f.write("{} = {}\n".format(gH, headers[gH]))
f.close()
# write out the channel headers
chanHeaderwords = self.chanHeaderwords()
for idx, c in enumerate(chans):
cf = open(
os.path.join(self.getOutPath(), "chan_{:02d}.{}".format(idx, ext)), "w"
)
cf.write("HEADER = CHANNEL\n")
# use the chanMap to get the index of the chanHeaders list
cIndex = chanMap[c]
# change the data file if necessary
if rename:
chanHeaders[cIndex]["ats_data_file"] = "chan_{:02d}{}".format(
idx, self.extension
)
# write out all the header words
for cH in chanHeaderwords:
cf.write("{} = {}\n".format(cH, chanHeaders[cIndex][cH]))
cf.close()
return True
[docs] def writeComments(self, comments: List[str]) -> None:
"""Write out a comments file
Parameters
----------
comments : List[str]
List of strings with data comments
"""
import resistics
with open(os.path.join(self.getOutPath(), "comments.txt"), "w") as f:
for c in comments:
f.write("{}\n".format(c))
f.write(
"Time series dataset written to {} on {} using resistics {}\n".format(
self.getOutPath(), datetime.now(), resistics.__version__
)
)
f.write(breakComment())
[docs] def writeDataFiles(self, chans, timeData) -> None:
"""Write out data files"""
raise NotImplementedError(
"Write data files not implemented in base class. Only child classes should ever be instantiated."
)
[docs] def printList(self) -> List[str]:
"""Class information as a list of strings
Returns
-------
out : List[str]
List of strings with information
"""
textLst = []
textLst.append("Output file path for data = {}".format(self.getOutPath()))
# if it exists, print out the headers
if self.headers:
textLst.append("Global Headers")
textLst.append(self.headers)
# if exists, print out a list of chans
if self.chans:
textLst.append("Channels found:")
textLst.append(self.chans)
# if exists, print out the chanMap
if self.chanMap:
textLst.append("Channel Map")
textLst.append(self.chanMap)
# if it exists, print out the chanHeaders
if self.chanHeaders:
textLst.append("Channel Headers")
for c in self.chans:
textLst.append(c)
textLst.append(self.chanHeaders[self.chanMap[c]])
return textLst