Source code for resistics.project.time

import sys
import os
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from typing import Dict, Union, List

from resistics.project.data import ProjectData
from resistics.time.reader import TimeReader
from resistics.time.writer_internal import TimeWriterInternal
from resistics.common.checks import parseKeywords, isElectric
from resistics.project.utils import projectText, projectBlock, checkDateOptions


[docs]def getTimeReader(projData: ProjectData, site: str, meas: str) -> TimeReader: """Get a data reader object for a measurement Data readers will then return a timeData object when given a sample range of data to return for Parameters ---------- projData : ProjectData A project data object site : str The site for which to get the data reader meas : str The measurement in the site for which to get the data reader Returns ------- DataReader A data reader object which allows a user to get time data """ siteData = projData.getSiteData(site) return siteData.getMeasurement(meas)
[docs]def preProcess(projData: ProjectData, **kwargs) -> None: """Pre-process project time data Preprocess the time data using filters, notch filters, resampling or interpolation. A new measurement folder is created under the site. The name of the new measurement folder is: prepend_[name of input measurement]_postpend. By default, prepend is "proc" and postpend is empty. Processed time series data can be saved in a new site by using the outputsite option. Parameters ---------- projData : ProjectData A project data object sites : str, List[str], optional Either a single site or a list of sites sampleFreqs : int, float, List[float], optional The frequencies to preprocess start : str, optional Start date of data to preprocess in format "%Y-%m-%d %H:%M:%S" stop : str, optional Stop date of data to process in format "%Y-%m-%d %H:%M:%S" outputsite : str, optional A site to output the preprocessed time data to. If this site does not exist, it will be created polreverse : Dict[str, bool] Keys are channels and values are boolean flags for reversing scale : Dict[str, float] Keys are channels and values are floats to multiply the channel data by calibrate : bool, optional Boolean flag for calibrating the data. Default is false and setting to True will calibrate where files can be found. normalise : bool, optional Boolean flag for normalising the data. Default is False and setting to True will normalise each channel independently. filter : Dict, optional Filtering options in a dictionary notch : List[float], optional List of frequencies to notch in spectra given as a list of floats resamp : Dict, optional Resampling parameters in a dictionary with entries in the format: {sampleRateFrom: sampleRateTo}. All measurement directories of sampleRateFrom will be resampled to sampleRateTo interp : bool, optional Boolean flag for interpolating the data on to the second, so that sampling is coincident with seconds. This is not always the case. For example, SPAM data is not necessarily sampled on the second, whereas ATS data is. This function is useful when combining data of multiple formats. Interpolation does not change the sampling rate. Default is False. prepend : str, optional String to prepend to the output folder. Default is "proc". postpend : str, optional String to postpend to the output folder. Default is empty. """ from resistics.project.shortcuts import getCalibrator from resistics.project.preprocess import ( applyPolarisationReversalOptions, applyScaleOptions, applyCalibrationOptions, applyFilterOptions, applyInterpolationOptions, applyNormaliseOptions, applyNotchOptions, applyResampleOptions, ) options: Dict = {} options["sites"]: List = projData.getSites() options["sampleFreqs"]: List[float] = projData.getSampleFreqs() options["start"]: Union[bool, str] = False options["stop"]: Union[bool, str] = False options["outputsite"]: str = "" options["polreverse"]: Union[bool, Dict[str, bool]] = False options["scale"]: Union[bool, Dict[str, float]] = False options["calibrate"]: bool = False options["normalise"]: bool = False options["filter"]: Dict = {} options["notch"]: List[float] = [] options["resamp"]: Dict = {} options["interp"]: bool = False options["prepend"]: str = "proc" options["postpend"]: str = "" options = parseKeywords(options, kwargs) # print info text: List = ["Processing with options"] for op, val in options.items(): text.append("\t{} = {}".format(op, val)) projectBlock(text) if isinstance(options["sites"], str): options["sites"] = [options["sites"]] # outputting to another site if options["outputsite"] != "": projectText( "Preprocessed data will be saved to output site {}".format( options["outputsite"] ) ) # create the site projData.createSite(options["outputsite"]) projData.refresh() outputSitePath = projData.getSiteData(options["outputsite"]).timePath # output naming outPre = options["prepend"] + "_" if options["prepend"] != "" else "" outPost = "_" + options["postpend"] if options["postpend"] != "" else "" if outPre == "" and outPost == "" and options["outputsite"] == "": outPre = "proc_" # create a data calibrator writer instance cal = getCalibrator(projData.calPath, projData.config) if options["calibrate"]: cal.printInfo() writer = TimeWriterInternal() # format dates if options["start"]: options["start"] = datetime.strptime(options["start"], "%Y-%m-%d %H:%M:%S") if options["stop"]: options["stop"] = datetime.strptime(options["stop"], "%Y-%m-%d %H:%M:%S") for site in options["sites"]: siteData = projData.getSiteData(site) siteData.printInfo() # loop over frequencies for sampleFreq in options["sampleFreqs"]: measurements = siteData.getMeasurements(sampleFreq) if len(measurements) == 0: # no data files at this sample rate continue # otherwise, process for meas in measurements: # get the reader projectText("Processing site {}, measurement {}".format(site, meas)) reader = siteData.getMeasurement(meas) startTime = reader.getStartDatetime() stopTime = reader.getStopDatetime() if (options["start"] or options["stop"]) and not checkDateOptions( options, startTime, stopTime ): continue # if the data contributes, copy in the data if relevant if options["start"]: startTime = options["start"] if options["stop"]: stopTime = options["stop"] # calculate the samples sampleStart, sampleEnd = reader.time2sample(startTime, stopTime) # now get the data timeData = reader.getPhysicalSamples( startSample=sampleStart, endSample=sampleEnd ) timeData.printInfo() headers = reader.getHeaders() chanHeaders, _ = reader.getChanHeaders() # apply options applyPolarisationReversalOptions(options, timeData) applyScaleOptions(options, timeData) applyCalibrationOptions(options, cal, timeData, reader) applyFilterOptions(options, timeData) applyNotchOptions(options, timeData) applyInterpolationOptions(options, timeData) applyResampleOptions(options, timeData) applyNormaliseOptions(options, timeData) # output dataset path if options["outputsite"] != "": timePath = outputSitePath else: timePath = siteData.timePath outPath = os.path.join(timePath, "{}{}{}".format(outPre, meas, outPost)) # write time data - need to manually change some headers (hence the keywords) writer = TimeWriterInternal() writer.setOutPath(outPath) writer.writeData( headers, chanHeaders, timeData, start_time=timeData.startTime.strftime("%H:%M:%S.%f"), start_date=timeData.startTime.strftime("%Y-%m-%d"), stop_time=timeData.stopTime.strftime("%H:%M:%S.%f"), stop_date=timeData.stopTime.strftime("%Y-%m-%d"), numSamples=timeData.numSamples, sample_freq=timeData.sampleFreq, physical=True, ) writer.printInfo()
[docs]def viewTime( projData: ProjectData, startDate: str, endDate: str, **kwargs ) -> Union[Figure, None]: """View timeseries in the project Parameters ---------- projData : ProjectData The project data instance startDate : str The start of the data range to plot endDate : str The end of the date range to plot sites : List[str], optional List of sites sampleFreqs : List[float], optional List of sample frequencies to plot chans : List[str], optional List of channels to plot polreverse : Dict[str, bool] Keys are channels and values are boolean flags for reversing scale : Dict[str, float] Keys are channels and values are floats to multiply the channel data by calibrate : bool, optional Boolean flag to calibrate data normalise : bool, optional Boolean flag to normalise the data. Default is False and setting to True will normalise each channel independently. notch : List[float], optional List of frequencies to notch out filter : Dict, optional Filter parameters show : bool, optional Boolean flag to show the plot save : bool, optional Boolean flag to save the plot to images folder plotoptions : Dict Dictionary of plot options Returns ------- matplotlib.pyplot.figure or None A matplotlib figure unless the plot is not shown and is saved, in which case None and the figure is closed. """ from resistics.project.shortcuts import getCalibrator from resistics.project.preprocess import ( applyPolarisationReversalOptions, applyScaleOptions, applyCalibrationOptions, applyFilterOptions, applyNormaliseOptions, applyNotchOptions, ) from resistics.common.plot import savePlot, plotOptionsTime options = {} options["sites"]: List[str] = projData.sites options["sampleFreqs"]: Union[List[float], List[str]] = projData.getSampleFreqs() options["chans"]: List[str] = ["Ex", "Ey", "Hx", "Hy", "Hz"] options["polreverse"]: Union[bool, Dict[str, bool]] = False options["scale"]: Union[bool, Dict[str, float]] = False options["calibrate"]: bool = False options["normalise"]: bool = False options["filter"]: Dict = {} options["notch"]: List[float] = [] options["show"]: bool = True options["save"]: bool = False options["plotoptions"]: Dict = plotOptionsTime() options = parseKeywords(options, kwargs) # prepare calibrator cal = getCalibrator(projData.calPath, projData.config) if options["calibrate"]: cal.printInfo() # format startDate and endDate start = datetime.strptime("{}.000".format(startDate), "%Y-%m-%d %H:%M:%S.%f") stop = datetime.strptime("{}.000".format(endDate), "%Y-%m-%d %H:%M:%S.%f") # collect relevant data - dictionary to store timeData timeDataAll = {} for site in options["sites"]: siteData = projData.getSiteData(site) if isinstance(siteData, bool): # site does not exist continue siteData.printInfo() measurements = siteData.getMeasurements() timeDataAll[site] = {} # loop over measurements and save data for each one for meas in measurements: sampleFreq = siteData.getMeasurementSampleFreq(meas) if sampleFreq not in options["sampleFreqs"]: continue # check if data contributes to user defined time period siteStart = siteData.getMeasurementStart(meas) siteStop = siteData.getMeasurementEnd(meas) if siteStop < start or siteStart > stop: continue reader = siteData.getMeasurement(meas) # get the samples of the datetimes sampleStart, sampleStop = reader.time2sample(start, stop) # as the samples returned from time2sample are rounded use sample2time to get the appropriate start and end times for those samples readStart, readStop = reader.sample2time(sampleStart, sampleStop) # get the data for any available channels meaning even those sites with missing channels can be plotted timeData = reader.getPhysicalData(readStart, readStop) projectText( "Plotting measurement {} of site {} between {} and {}".format( meas, site, readStart, readStop ) ) # apply various options applyPolarisationReversalOptions(options, timeData) applyScaleOptions(options, timeData) applyCalibrationOptions(options, cal, timeData, reader) applyFilterOptions(options, timeData) applyNotchOptions(options, timeData) applyNormaliseOptions(options, timeData) timeDataAll[site][meas] = timeData # plot all the data plotfonts = options["plotoptions"]["plotfonts"] fig = plt.figure(figsize=options["plotoptions"]["figsize"]) for site in timeDataAll: for meas in timeDataAll[site]: timeData = timeDataAll[site][meas] timeData.view( sampleStop=timeDataAll[site][meas].numSamples - 1, fig=fig, chans=options["chans"], label="{} - {}".format(site, meas), xlim=[start, stop], plotfonts=plotfonts, ) # add the suptitle st = fig.suptitle( "Time data from {} to {}".format( start.strftime("%Y-%m-%d %H-%M-%S"), stop.strftime("%Y-%m-%d %H-%M-%S") ), fontsize=plotfonts["suptitle"], ) st.set_y(0.98) # do the axis labels numChans = len(options["chans"]) for idx, chan in enumerate(options["chans"]): plt.subplot(numChans, 1, idx + 1) # do the yaxis if isElectric(chan): plt.ylabel("mV/km", fontsize=plotfonts["axisLabel"]) if len(options["plotoptions"]["Eylim"]) > 0: plt.ylim(options["plotoptions"]["Eylim"]) else: if options["calibrate"]: plt.ylabel("nT", fontsize=plotfonts["axisLabel"]) else: plt.ylabel("mV", fontsize=plotfonts["axisLabel"]) if len(options["plotoptions"]["Hylim"]) > 0: plt.ylim(options["plotoptions"]["Hylim"]) plt.legend(loc=1, fontsize=plotfonts["legend"]) # plot format fig.tight_layout(rect=[0, 0.02, 1, 0.96]) fig.subplots_adjust(top=0.92) # plot show and save if options["save"]: impath = projData.imagePath filename = "timeData_{}_{}".format( start.strftime("%Y-%m-%d_%H-%M-%S_"), stop.strftime("%Y-%m-%d_%H-%M-%S") ) savename = savePlot(impath, filename, fig) projectText("Image saved to file {}".format(savename)) if options["show"]: plt.show(block=options["plotoptions"]["block"]) if not options["show"] and options["save"]: plt.close(fig) return None return fig