Source code for mhkit.utils

from pecos.utils import index_to_datetime
import matplotlib.pyplot as plt 
import datetime as dt
from mhkit import qc
import pandas as pd 
import numpy as np 


_matlab = False  # Private variable indicating if MHKiT is run through MATLAB

def get_statistics(data, freq, period=600, vector_channels=[]):
    """
    Calculate mean, max, min and stdev statistics of continuous data for a
    given statistical window. Default length of statistical window (period)
    is based on IEC TS 62600-3:2020 ED1. Also allows calculation of
    statistics for multiple statistical windows of continuous data and
    accounts for vector/directional channels.

    Parameters
    ------------
    data : pandas DataFrame
        Data indexed by datetime with columns of data to be analyzed
    freq : float/int
        Sample rate of data [Hz]
    period : float/int
        Statistical window of interest [sec], default = 600
    vector_channels : string or list (optional)
        List of vector/directional channel names formatted in deg (0-360)

    Returns
    ---------
    means, maxs, mins, stdevs : pandas DataFrame
        Calculated statistical values from the data, indexed by the first
        timestamp
    """
    # Check data types
    assert isinstance(data, pd.DataFrame), 'data must be of type pd.DataFrame'
    assert isinstance(freq, (float, int)), 'freq must be of type int or float'
    assert isinstance(period, (float, int)), 'period must be of type int or float'
    # Catch if vector_channels is passed as a single string
    if isinstance(vector_channels, str):
        vector_channels = [vector_channels]
    assert isinstance(vector_channels, list), 'vector_channels must be a list of strings'

    # Check timestamp using qc module
    data.index = data.index.round('1ms')
    dataQC = qc.check_timestamp(data, 1/freq)
    dataQC = dataQC['cleaned_data']

    # Check to see if data length contains enough data points for the
    # statistical window; trim any partial window at the end
    if len(dataQC) % (period*freq) > 0:
        remain = len(dataQC) % (period*freq)
        dataQC = dataQC.iloc[0:-int(remain)]
        print('WARNING: there were not enough data points in the last '
              'statistical period. Last ' + str(remain) + ' points were removed.')

    # Pre-allocate lists
    time = []
    means = []
    maxs = []
    mins = []
    stdev = []

    # Get data chunks to perform stats on
    step = int(period*freq)  # cast to int so iloc slicing accepts float inputs
    for i in range(int(len(dataQC)/(period*freq))):
        datachunk = dataQC.iloc[i*step:(i+1)*step]
        # Check whether there are any NaNs in datachunk
        if datachunk.isnull().any().any():
            print('NaNs found in statistical window...check timestamps!')
            input('Press <ENTER> to continue')
            continue
        else:
            # Get stats
            time.append(datachunk.index.values[0])  # first timestamp of window
            maxs.append(datachunk.max())            # maxes
            mins.append(datachunk.min())            # mins
            means.append(datachunk.mean())          # means
            stdev.append(datachunk.std())           # standard deviation
            # Calculate vector averages and std; index with [-1] so that
            # skipped windows do not misalign the lists
            for v in vector_channels:
                vector_avg, vector_std = vector_statistics(datachunk[v])
                means[-1][v] = vector_avg  # overwrite scalar average for channel
                stdev[-1][v] = vector_std  # overwrite scalar std for channel

    # Convert to DataFrames and set index
    means = pd.DataFrame(means, index=time)
    maxs = pd.DataFrame(maxs, index=time)
    mins = pd.DataFrame(mins, index=time)
    stdevs = pd.DataFrame(stdev, index=time)

    return means, maxs, mins, stdevs
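
# Example usage (illustrative sketch, not part of the original module).
# Builds 20 minutes of synthetic 1 Hz data -- the channel names 'speed'
# and 'direction' are hypothetical -- and computes 10-minute statistics:
#
#     idx = pd.date_range('2020-01-01', periods=1200, freq='s')
#     df = pd.DataFrame({'speed': np.random.rand(1200),
#                        'direction': np.random.rand(1200)*360}, index=idx)
#     means, maxs, mins, stdevs = get_statistics(df, freq=1, period=600,
#                                                vector_channels='direction')
#     # means, maxs, mins, stdevs each have 2 rows, one per 600 s window
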
def vector_statistics(data):
    """
    Function used to calculate statistics for vector/directional channels
    based on routine from Campbell data logger and Yamartino algorithm

    Parameters
    ----------
    data : pandas Series, numpy array, list
        Vector channel to calculate statistics on [deg, 0-360]

    Returns
    -------
    vector_avg : numpy array
        Vector mean statistic
    vector_std : numpy array
        Vector standard deviation statistic
    """
    try:
        data = np.array(data)
    except:
        pass
    assert isinstance(data, np.ndarray), 'data must be of type np.ndarray'

    # Calculate mean of the unit-vector components
    Ux = sum(np.sin(data*np.pi/180))/len(data)
    Uy = sum(np.cos(data*np.pi/180))/len(data)
    vector_avg = 90 - np.arctan2(Uy, Ux)*180/np.pi
    if vector_avg < 0:
        vector_avg = vector_avg + 360
    elif vector_avg > 360:
        vector_avg = vector_avg - 360

    # Calculate standard deviation
    # round to 8th decimal place to reduce roundoff error
    magsum = round((Ux**2 + Uy**2)*1e8)/1e8
    epsilon = (1 - magsum)**0.5
    if not np.isreal(epsilon):  # check if epsilon is imaginary (error)
        vector_std = 0
        print('WARNING: epsilon contains imaginary value')
    else:
        vector_std = np.arcsin(epsilon)*(1 + 0.1547*epsilon**3)*180/np.pi

    return vector_avg, vector_std
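
# Example usage (illustrative; not part of the original module). Two
# headings straddling north average to ~0 deg rather than the scalar
# mean of 180 deg:
#
#     avg, std = vector_statistics([350.0, 10.0])
#     # avg -> ~0.0 deg, std -> ~10.0 deg
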
def unwrap_vector(data):
    """
    Function used to unwrap vectors into 0-360 deg range

    Parameters
    ------------
    data : pandas Series, numpy array, list
        Data points to be unwrapped [deg]

    Returns
    ---------
    data : numpy array
        Data points unwrapped between 0-360 deg
    """
    # Check data types
    try:
        data = np.array(data)
    except:
        pass
    assert isinstance(data, np.ndarray), 'data must be of type np.ndarray'

    # Loop through and unwrap points
    for i in range(len(data)):
        if data[i] < 0:
            data[i] = data[i] + 360
        elif data[i] > 360:
            data[i] = data[i] - 360

    # Recurse until all points fall within the 0-360 deg range
    if max(data) > 360 or min(data) < 0:
        data = unwrap_vector(data)

    return data
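
# Example usage (illustrative; not part of the original module). Note the
# input array is modified in place and re-processed recursively until all
# values fall in 0-360 deg:
#
#     unwrap_vector(np.array([-30.0, 370.0, 725.0]))
#     # -> array([330.,  10.,   5.])
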
def matlab_to_datetime(matlab_datenum):
    """
    Convert MATLAB datenum format to Python datetime

    Parameters
    ------------
    matlab_datenum : numpy array
        MATLAB datenum to be converted

    Returns
    ---------
    time : DateTimeIndex
        Python datetime values
    """
    # Check data types
    try:
        matlab_datenum = np.array(matlab_datenum, ndmin=1)
    except:
        pass
    assert isinstance(matlab_datenum, np.ndarray), 'data must be of type np.ndarray'

    # Pre-allocate
    time = []
    # Loop through dates and convert; subtract the 366-day offset between
    # the MATLAB datenum epoch and the Python ordinal epoch
    for t in matlab_datenum:
        day = dt.datetime.fromordinal(int(t))
        dayfrac = dt.timedelta(days=t % 1) - dt.timedelta(days=366)
        time.append(day + dayfrac)
    time = np.array(time)
    time = pd.to_datetime(time)

    return time
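
# Example usage (illustrative; not part of the original module). MATLAB
# datenum 730486.5 corresponds to 2000-01-01 12:00:00:
#
#     matlab_to_datetime(np.array([730486.5]))
#     # -> DatetimeIndex(['2000-01-01 12:00:00'], ...)
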
def excel_to_datetime(excel_num):
    """
    Convert Excel datenum format to Python datetime

    Parameters
    ------------
    excel_num : numpy array
        Excel datenums to be converted

    Returns
    ---------
    time : DateTimeIndex
        Python datetime values
    """
    # Check data types
    try:
        excel_num = np.array(excel_num)
    except:
        pass
    assert isinstance(excel_num, np.ndarray), 'data must be of type np.ndarray'

    # Convert to datetime; Excel serial day 0 is 1899-12-30
    time = pd.to_datetime('1899-12-30') + pd.to_timedelta(excel_num, 'D')

    return time
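
# Example usage (illustrative; not part of the original module). Excel
# serial day 43831 corresponds to 2020-01-01:
#
#     excel_to_datetime(np.array([43831.5]))
#     # -> DatetimeIndex(['2020-01-01 12:00:00'], ...)
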
def magnitude_phase(x, y, z=None):
    '''
    Returns magnitude and phase in two or three dimensions.

    Parameters
    ----------
    x: array_like
        x-component
    y: array_like
        y-component
    z: array_like
        z-component defined positive up. (Optional) Default None.

    Returns
    -------
    mag: float or array
        magnitude of the vector
    theta: float or array
        radians from the x-axis
    phi: float or array
        radians from z-axis defined as positive up. Optional: only
        returned when z is passed.
    '''
    x = np.array(x)
    y = np.array(y)

    threeD = False
    if not isinstance(z, type(None)):
        z = np.array(z)
        threeD = True

    assert isinstance(x, (float, int, np.ndarray))
    assert isinstance(y, (float, int, np.ndarray))
    assert isinstance(z, (type(None), float, int, np.ndarray))

    if threeD:
        mag = np.sqrt(x**2 + y**2 + z**2)
        theta = np.arctan2(y, x)
        phi = np.arctan2(np.sqrt(x**2 + y**2), z)
        return mag, theta, phi
    else:
        mag = np.sqrt(x**2 + y**2)
        theta = np.arctan2(y, x)
        return mag, theta
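
# Example usage (illustrative; not part of the original module):
#
#     mag, theta = magnitude_phase(1.0, 1.0)
#     # mag -> ~1.414, theta -> ~0.785 rad (45 deg from the x-axis)
#     mag, theta, phi = magnitude_phase(1.0, 1.0, 1.0)
#     # mag -> ~1.732, phi -> ~0.955 rad from the +z axis
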
def unorm(x, y, z):
    '''
    Calculates the root sum of squares (Euclidean norm) given three arrays.

    Parameters
    ----------
    x: array
        One input for the root sum of squares calculation. (e.g. x velocity)
    y: array
        One input for the root sum of squares calculation. (e.g. y velocity)
    z: array
        One input for the root sum of squares calculation. (e.g. z velocity)

    Returns
    -------
    unorm : array
        The root sum of squares of x, y, and z.

    Example
    -------
    If the inputs are [1,2,3], [4,5,6], and [7,8,9], the code takes the
    corresponding value from each array and calculates the root sum of
    squares. The resulting output is [8.1240384, 9.64365076, 11.22497216].
    '''
    assert isinstance(x, (np.ndarray, np.float64, pd.Series)), 'x must be an array'
    assert isinstance(y, (np.ndarray, np.float64, pd.Series)), 'y must be an array'
    assert isinstance(z, (np.ndarray, np.float64, pd.Series)), 'z must be an array'
    assert all([len(x) == len(y), len(y) == len(z)]), 'lengths of arrays must match'

    xyz = np.array([x, y, z])
    unorm = np.linalg.norm(xyz, axis=0)

    return unorm
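
# Example usage (illustrative; not part of the original module), matching
# the docstring example:
#
#     unorm(np.array([1., 2., 3.]), np.array([4., 5., 6.]),
#           np.array([7., 8., 9.]))
#     # -> array([ 8.1240384 ,  9.64365076, 11.22497216])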