"""
This module contains functions for calculating various aspects of power quality, 
particularly focusing on the analysis of harmonics, interharmonics and distortion 
in electrical power systems. These functions are designed to assist in power 
quality assessments by providing tools to analyze voltage and current signals 
for their harmonic and interharmonic components based on the guidelines and methodologies 
outlined in IEC 61000-4-7:2008 ED2 and in IEC 62600-30:2018 ED1.
Functions in this module include:
- harmonics: Calculates the harmonics from time series of voltage or current. 
  This function returns the amplitude of the time-series data harmonics indexed by 
  the harmonic frequency, aiding in the identification of harmonic distortions 
  within the power system.
- harmonic_subgroups: Computes the harmonic subgroups as per IEC 61000-4-7 standards. 
  Harmonic subgroups provide insights into the distribution of power across 
  different harmonic frequencies, which is crucial for understanding the behavior 
  of non-linear loads and their impact on the power quality.
- total_harmonic_current_distortion (THCD): Determines the total harmonic current 
  distortion, offering a summary metric that quantifies the overall level of 
  harmonic distortion present in the current waveform. This metric is essential 
  for assessing compliance with power quality standards and guidelines.
- interharmonics: Identifies and calculates the interharmonics present in the 
  power system. Interharmonics, which are frequencies that occur between the 
  fundamental and harmonic frequencies, can arise from various sources and 
  potentially lead to power quality issues.
"""
from typing import Union
import pandas as pd
import numpy as np
from scipy import fftpack
import xarray as xr
from mhkit.utils import convert_to_dataset
[docs]
def harmonics(
    signal_data: Union[pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset],
    freq: Union[float, int],
    grid_freq: int,
    to_pandas: bool = True,
) -> Union[pd.DataFrame, xr.Dataset]:
    """
    Calculates the harmonics from time series of voltage or current based on IEC 61000-4-7.
    Parameters
    -----------
    signal_data: pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset
        Time-series of voltage [V] or current [A]
    freq: float or Int
        Frequency of the time-series data [Hz]
    grid_freq: int
        Value indicating if the power supply is 50 or 60 Hz. Options = 50 or 60
    to_pandas: bool (Optional)
        Flag to save output to pandas instead of xarray. Default = True.
    Returns
    --------
    harmonic_amplitudes: pandas DataFrame or xarray Dataset
        Amplitude of the time-series data harmonics indexed by the harmonic
        frequency with signal name columns
    """
    if not isinstance(signal_data, (pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)):
        raise TypeError(
            "signal_data must be of type pd.Series, pd.DataFrame, "
            + f"xr.DataArray, or xr.Dataset. Got {type(signal_data)}"
        )
    if not isinstance(freq, (float, int)):
        raise TypeError(f"freq must be of type float or integer. Got {type(freq)}")
    if grid_freq not in [50, 60]:
        raise ValueError(f"grid_freq must be either 50 or 60. Got {grid_freq}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got {type(to_pandas)}")
    # Convert input to xr.Dataset
    signal_data = convert_to_dataset(signal_data, "data")
    sample_spacing = 1.0 / freq
    # Loop through all variables in signal_data
    harmonic_amplitudes = xr.Dataset()
    for var in signal_data.data_vars:
        dataarray = signal_data[var]
        dataarray = dataarray.to_numpy()
        frequency_bin_centers = fftpack.fftfreq(len(dataarray), d=sample_spacing)
        harmonics_amplitude = np.abs(np.fft.fft(dataarray, axis=0))
        harmonic_amplitudes = harmonic_amplitudes.assign(
            {var: (["frequency"], harmonics_amplitude)}
        )
        harmonic_amplitudes = harmonic_amplitudes.assign_coords(
            {"frequency": frequency_bin_centers}
        )
    harmonic_amplitudes = harmonic_amplitudes.sortby("frequency")
    if grid_freq == 60:
        hertz = np.arange(0, 3060, 5)
    elif grid_freq == 50:
        hertz = np.arange(0, 2570, 5)
    else:
        raise ValueError(f"grid_freq must be either 50 or 60. Got {grid_freq}")
    harmonic_amplitudes = harmonic_amplitudes.reindex(
        {"frequency": hertz}, method="nearest"
    )
    harmonic_amplitudes = (
        harmonic_amplitudes / len(signal_data[list(signal_data.dims)[0]]) * 2
    )
    if to_pandas:
        harmonic_amplitudes = harmonic_amplitudes.to_pandas()
    return harmonic_amplitudes 
[docs]
def harmonic_subgroups(
    harmonic_amplitudes: Union[pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset],
    grid_freq: int,
    frequency_dimension: str = "",
    to_pandas: bool = True,
) -> Union[pd.DataFrame, xr.Dataset]:
    """
    Calculates the harmonic subgroups based on IEC 61000-4-7
    Parameters
    ----------
    harmonic_amplitudes: pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset
        Harmonic amplitude indexed by the harmonic frequency
    grid_freq: int
        Value indicating if the power supply is 50 or 60 Hz. Options = 50 or 60
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    to_pandas: bool (Optional)
        Flag to save output to pandas instead of xarray. Default = True.
    Returns
    --------
    subgroup_results: pandas DataFrame or xarray Dataset
        Harmonic subgroups indexed by harmonic frequency
        with signal name columns
    """
    if not isinstance(
        harmonic_amplitudes, (pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)
    ):
        raise TypeError(
            "harmonic_amplitudes must be of type pd.Series, pd.DataFrame, "
            + f"xr.DataArray, or xr.Dataset. Got {type(harmonic_amplitudes)}"
        )
    if grid_freq not in [50, 60]:
        raise ValueError(f"grid_freq must be either 50 or 60. Got {grid_freq}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    if not isinstance(frequency_dimension, str):
        raise TypeError(
            f"frequency_dimension must be of type str. Got: {type(frequency_dimension)}"
        )
    # Convert input to xr.Dataset
    harmonic_amplitudes = convert_to_dataset(harmonic_amplitudes, "harmonic_amplitudes")
    if (
        frequency_dimension != ""
        and frequency_dimension not in harmonic_amplitudes.coords
    ):
        raise ValueError(
            "frequency_dimension was supplied but is not a dimension "
            + f"of harmonic_amplitudes. Got {frequency_dimension}"
        )
    if grid_freq == 60:
        hertz = np.arange(0, 3060, 60)
    else:
        hertz = np.arange(0, 2550, 50)
    # Sort input data index
    if frequency_dimension == "":
        frequency_dimension = list(harmonic_amplitudes.dims)[0]
    harmonic_amplitudes = harmonic_amplitudes.sortby(frequency_dimension)
    # Loop through all variables in harmonics
    subgroup_results = xr.Dataset()
    for var in harmonic_amplitudes.data_vars:
        dataarray = harmonic_amplitudes[var]
        subgroup = np.zeros(np.size(hertz))
        for ihz in np.arange(0, len(hertz)):
            current_frequency = hertz[ihz]
            ind = dataarray.indexes[frequency_dimension].get_loc(current_frequency)
            data_subset = dataarray.isel({frequency_dimension: [ind - 1, ind, ind + 1]})
            subgroup[ihz] = (data_subset**2).sum() ** 0.5
        subgroup_results = subgroup_results.assign({var: (["frequency"], subgroup)})
        subgroup_results = subgroup_results.assign_coords({"frequency": hertz})
    if to_pandas:
        subgroup_results = subgroup_results.to_pandas()
    return subgroup_results 
[docs]
def total_harmonic_current_distortion(
    harmonics_subgroup: Union[pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset],
    frequency_dimension: str = "",
    to_pandas: bool = True,
) -> Union[pd.DataFrame, xr.Dataset]:
    """
    Calculates the total harmonic current distortion (THC) based on IEC/TS 62600-30
    Parameters
    ----------
    harmonics_subgroup: pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset
        Subgrouped current harmonics indexed by harmonic frequency
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    to_pandas: bool (optional)
        Flag to save output to pandas instead of xarray. Default = True.
    Returns
    --------
    thcd_result: pd.DataFrame or xarray Dataset
        Total harmonic current distortion indexed by signal name with THCD column
    """
    if not isinstance(
        harmonics_subgroup, (pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)
    ):
        raise TypeError(
            "harmonics_subgroup must be of type pd.Series, pd.DataFrame, "
            + f"xr.DataArray, or xr.Dataset. Got {type(harmonics_subgroup)}"
        )
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    if not isinstance(frequency_dimension, str):
        raise TypeError(
            f"frequency_dimension must be of type bool. Got: {type(frequency_dimension)}"
        )
    # Convert input to xr.Dataset
    harmonics_subgroup = convert_to_dataset(harmonics_subgroup, "harmonics_subgroup")
    if (
        frequency_dimension != ""
        and frequency_dimension not in harmonics_subgroup.coords
    ):
        raise ValueError(
            "frequency_dimension was supplied but is not a dimension "
            + f"of harmonics. Got {frequency_dimension}"
        )
    if frequency_dimension == "":
        frequency_dimension = list(harmonics_subgroup.dims)[0]
    harmonics_sq = harmonics_subgroup.isel({frequency_dimension: slice(2, 50)}) ** 2
    harmonics_sum = harmonics_sq.sum()
    thcd_result = (
        np.sqrt(harmonics_sum) / harmonics_subgroup.isel({frequency_dimension: 1})
    ) * 100
    if isinstance(thcd_result, xr.DataArray):
        thcd_result.name = ["THCD"]
    if to_pandas:
        thcd_result = thcd_result.to_pandas()
    return thcd_result 
[docs]
def interharmonics(
    harmonic_amplitudes: Union[pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset],
    grid_freq: int,
    frequency_dimension: str = "",
    to_pandas: bool = True,
) -> Union[pd.DataFrame, xr.Dataset]:
    """
    Calculates the interharmonics from the harmonic_amplitudes of current
    Parameters
    -----------
    harmonic_amplitudes: pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset
        Harmonic amplitude indexed by the harmonic frequency
    grid_freq: int
        Value indicating if the power supply is 50 or 60 Hz. Options = 50 or 60
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    to_pandas: bool (Optional)
        Flag to save output to pandas instead of xarray. Default = True.
    Returns
    -------
    interharmonic_groups: pandas DataFrame or xarray Dataset
        Interharmonics groups
    """
    if not isinstance(
        harmonic_amplitudes, (pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)
    ):
        raise TypeError(
            "harmonic_amplitudes must be of type pd.Series, pd.DataFrame, "
            + f"xr.DataArray, or xr.Dataset. Got {type(harmonic_amplitudes)}"
        )
    if grid_freq == 60:
        hertz = np.arange(0, 3060, 60)
    elif grid_freq == 50:
        hertz = np.arange(0, 2550, 50)
    else:
        raise ValueError(f"grid_freq must be either 50 or 60. Got {grid_freq}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    # Convert input to xr.Dataset
    harmonic_amplitudes = convert_to_dataset(harmonic_amplitudes, "harmonic_amplitudes")
    if (
        frequency_dimension != ""
        and frequency_dimension not in harmonic_amplitudes.coords
    ):
        raise ValueError(
            "frequency_dimension was supplied but is not a dimension "
            + f"of harmonic_amplitudes. Got {frequency_dimension}"
        )
    # Sort input data index
    if frequency_dimension == "":
        frequency_dimension = list(harmonic_amplitudes.dims)[0]
    harmonic_amplitudes = harmonic_amplitudes.sortby(frequency_dimension)
    # Loop through all variables in harmonic_amplitudes
    interharmonic_groups = xr.Dataset()
    for var in harmonic_amplitudes.data_vars:
        dataarray = harmonic_amplitudes[var]
        subset = np.zeros(np.size(hertz))
        for ihz in np.arange(0, len(hertz)):
            current_frequency = hertz[ihz]
            ind = dataarray.indexes[frequency_dimension].get_loc(current_frequency)
            if grid_freq == 60:
                data = dataarray.isel({frequency_dimension: slice(ind + 1, ind + 11)})
                subset[ihz] = (data**2).sum() ** 0.5
            else:
                data = dataarray.isel({frequency_dimension: slice(ind + 1, ind + 7)})
                subset[ihz] = (data**2).sum() ** 0.5
        interharmonic_groups = interharmonic_groups.assign(
            {var: (["frequency"], subset)}
        )
        interharmonic_groups = interharmonic_groups.assign_coords({"frequency": hertz})
    if to_pandas:
        interharmonic_groups = interharmonic_groups.to_pandas()
    return interharmonic_groups