Source code for mhkit.utils.stat_utils

"""
This module contains functions to perform various statistical calculations 
on continuous data. It includes functions for calculating statistics such as
mean, max, min, and standard deviation over specific windows, as well as functions
for vector/directional statistics. The module also provides utility functions 
to unwrap vectors, compute magnitudes and phases in 2D/3D, and calculate 
the root mean squared values of vector components.

Functions:
----------
- get_statistics: Calculates statistics for continuous data.
- vector_statistics: Calculates vector mean and standard deviation.
- unwrap_vector: Unwraps vector data to fall within a 0-360 degree range.
- magnitude_phase: Computes magnitude and phase for 2D or 3D data.
- unorm: Computes root mean squared value of 3D vectors.
"""

from typing import List, Dict, Optional, Tuple, Union
import pandas as pd
import numpy as np
from mhkit import qc


def _calculate_statistics(
    datachunk: pd.DataFrame, vector_channels: List[str]
) -> Dict[str, Union[pd.Series, float]]:
    """
    Calculate the mean, max, min, and standard deviation for the given datachunk.
    Also calculate vector statistics for vector_channels.

    Parameters
    ----------
    datachunk : pandas DataFrame
        A chunk of data on which to perform statistics.
    vector_channels : list
        List of vector channel names formatted in deg (0-360).

    Returns
    -------
    stats : dict
        A dictionary containing 'means', 'maxs', 'mins', and 'stdevs'.
    """
    means = datachunk.mean()
    maxs = datachunk.max()
    mins = datachunk.min()
    stdevs = datachunk.std()

    for v in vector_channels:
        vector_avg, vector_std = vector_statistics(datachunk[v])
        # overwrite scalar average and std for channel
        means[v] = vector_avg
        stdevs[v] = vector_std

    return {"means": means, "maxs": maxs, "mins": mins, "stdevs": stdevs}


[docs] def get_statistics( data: pd.DataFrame, freq: Union[float, int], period: Union[float, int] = 600, vector_channels: Optional[Union[str, List[str]]] = None, ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: """ Calculate mean, max, min and stdev statistics of continuous data for a given statistical window. Default length of statistical window (period) is based on IEC TS 62600-3:2020 ED1. Also allows calculation of statistics for multiple statistical windows of continuous data and accounts for vector/directional channels. Parameters ------------ data : pandas DataFrame Data indexed by datetime with columns of data to be analyzed freq : float/int Sample rate of data [Hz] period : float/int Statistical window of interest [sec], default = 600 vector_channels : string or list (optional) List of vector/directional channel names formatted in deg (0-360) Returns --------- means,maxs,mins,stdevs : pandas DataFrame Calculated statistical values from the data, indexed by the first timestamp """ if vector_channels is None: vector_channels = [] if isinstance(vector_channels, str): vector_channels = [vector_channels] if not isinstance(data, pd.DataFrame): raise TypeError(f"data must be of type pd.DataFrame. Got: {type(data)}") if not isinstance(freq, (float, int)): raise TypeError(f"freq must be of type int or float. Got: {type(freq)}") if not isinstance(period, (float, int)): raise TypeError(f"period must be of type int or float. Got: {type(period)}") if not isinstance(vector_channels, list): raise TypeError( f"vector_channels must be a list of strings. Got: {type(vector_channels)}" ) data.index = data.index.round("1ms") data_qc = qc.check_timestamp(data, 1 / freq)["cleaned_data"] if len(data_qc) % (period * freq) > 0: remain = len(data_qc) % (period * freq) data_qc = data_qc.iloc[0 : -int(remain)] print( f"WARNING: there were not enough data points in the last statistical period. \ Last {remain} points were removed." ) time = [] means = [] maxs = [] mins = [] stdevs = [] step = period * freq for i in range(int(len(data_qc) / step)): datachunk = data_qc.iloc[i * step : (i + 1) * step] if datachunk.isnull().any().any(): print("NaNs found in statistical window...check timestamps!") input("Press <ENTER> to continue") continue time.append(datachunk.index.values[0]) # Calculate statistics for this chunk stats = _calculate_statistics(datachunk, vector_channels) means.append(stats["means"]) maxs.append(stats["maxs"]) mins.append(stats["mins"]) stdevs.append(stats["stdevs"]) # Convert lists to DataFrames means = pd.DataFrame(means, index=time) maxs = pd.DataFrame(maxs, index=time) mins = pd.DataFrame(mins, index=time) stdevs = pd.DataFrame(stdevs, index=time) return means, maxs, mins, stdevs
[docs] def vector_statistics( data: Union[pd.Series, np.ndarray, list] ) -> Tuple[np.ndarray, np.ndarray]: """ Function used to calculate statistics for vector/directional channels based on routine from Campbell data logger and Yamartino algorithm Parameters ---------- data : pandas Series, numpy array, list Vector channel to calculate statistics on [deg, 0-360] Returns ------- vector_avg : numpy array Vector mean statistic vector_std : numpy array Vector standard deviation statistic """ try: data = np.array(data) except (TypeError, ValueError) as e: raise TypeError(f"Error converting data to numpy array: {e}") from e if not isinstance(data, np.ndarray): raise TypeError(f"data must be of type np.ndarray. Got: {type(data)}") # calculate mean u_x = sum(np.sin(data * np.pi / 180)) / len(data) u_y = sum(np.cos(data * np.pi / 180)) / len(data) vector_avg = 90 - np.arctan2(u_y, u_x) * 180 / np.pi if vector_avg < 0: vector_avg = vector_avg + 360 elif vector_avg > 360: vector_avg = vector_avg - 360 # calculate standard deviation # round to 8th decimal place to reduce roundoff error magsum = round((u_x**2 + u_y**2) * 1e8) / 1e8 epsilon = (1 - magsum) ** 0.5 if not np.isreal(epsilon): # check if epsilon is imaginary (error) vector_std = 0 print("WARNING: epsilon contains imaginary value") else: vector_std = np.arcsin(epsilon) * (1 + 0.1547 * epsilon**3) * 180 / np.pi return vector_avg, vector_std
[docs] def unwrap_vector(data: Union[pd.Series, np.ndarray, list]) -> np.ndarray: """ Function used to unwrap vectors into 0-360 deg range Parameters ------------ data : pandas Series, numpy array, list Data points to be unwrapped [deg] Returns --------- data : numpy array Data points unwrapped between 0-360 deg """ # Check data types try: data = np.array(data) except (TypeError, ValueError) as e: raise TypeError(f"Error converting data to numpy array: {e}") from e if not isinstance(data, np.ndarray): raise TypeError(f"data must be of type np.ndarray. Got: {type(data)}") # Loop through and unwrap points for i, value in enumerate(data): if value < 0: data[i] = value + 360 elif value > 360: data[i] = value - 360 if max(data) > 360 or min(data) < 0: data = unwrap_vector(data) return data
[docs] def magnitude_phase( x: Union[float, int, np.ndarray], y: Union[float, int, np.ndarray], z: Optional[Union[float, int, np.ndarray]] = None, ) -> Union[ Tuple[Union[float, np.ndarray], Union[float, np.ndarray]], Tuple[Union[float, np.ndarray], Union[float, np.ndarray], Union[float, np.ndarray]], ]: """ Retuns magnitude and phase in two or three dimensions. Parameters ---------- x: array_like x-component y: array_like y-component z: array_like z-component defined positive up. (Optional) Default None. Returns ------- mag: float or array magnitude of the vector theta: float or array radians from the x-axis phi: float or array radians from z-axis defined as positive up. Optional: only returned when z is passed. """ x = np.array(x) y = np.array(y) three_d = False if not isinstance(z, type(None)): z = np.array(z) three_d = True if not isinstance(x, (float, int, np.ndarray)): raise TypeError(f"x must be of type float, int, or np.ndarray. Got: {type(x)}") if not isinstance(y, (float, int, np.ndarray)): raise TypeError(f"y must be of type float, int, or np.ndarray. Got: {type(y)}") if not isinstance(z, (type(None), float, int, np.ndarray)): raise TypeError( f"If specified, z must be of type float, int, or np.ndarray. Got: {type(z)}" ) if three_d: mag = np.sqrt(x**2 + y**2 + z**2) theta = np.arctan2(y, x) phi = np.arctan2(np.sqrt(x**2 + y**2), z) return mag, theta, phi mag = np.sqrt(x**2 + y**2) theta = np.arctan2(y, x) return mag, theta
[docs] def unorm( x: Union[np.ndarray, np.float64, pd.Series], y: Union[np.ndarray, np.float64, pd.Series], z: Union[np.ndarray, np.float64, pd.Series], ) -> Union[np.ndarray, np.float64]: """ Calculates the root mean squared value given three arrays. Parameters ---------- x: array One input for the root mean squared calculation.(eq. x velocity) y: array One input for the root mean squared calculation.(eq. y velocity) z: array One input for the root mean squared calculation.(eq. z velocity) Returns ------- u_norm : array The root mean squared of x, y, and z. Example ------- If the inputs are [1,2,3], [4,5,6], and [7,8,9] the code take the cordinationg value from each array and calculates the root mean squared. The resulting output is [ 8.1240384, 9.64365076, 11.22497216]. """ if not isinstance(x, (np.ndarray, np.float64, pd.Series)): raise TypeError( f"x must be of type np.ndarray, np.float64, or pd.Series. Got: {type(x)}" ) if not isinstance(y, (np.ndarray, np.float64, pd.Series)): raise TypeError( f"y must be of type np.ndarray, np.float64, or pd.Series. Got: {type(y)}" ) if not isinstance(z, (np.ndarray, np.float64, pd.Series)): raise TypeError( f"z must be of type np.ndarray, np.float64, or pd.Series. Got: {type(z)}" ) if not all([len(x) == len(y), len(y) == len(z)]): raise ValueError("lengths of arrays must match") xyz = np.array([x, y, z]) u_norm = np.linalg.norm(xyz, axis=0) return u_norm