import numpy as np
import pandas as pd
import xarray as xr
import warnings
from mhkit import dolfyn
from mhkit.river.performance import (circular, ducted, rectangular,
multiple_circular, tip_speed_ratio,
power_coefficient)
def _slice_circular_capture_area(diameter, hub_height, doppler_cell_size):
"""
Slices a circle (capture area) based on ADCP depth bins mapped
across the face of the capture area.
Parameters
-------------
diameter: numeric
Diameter of the capture area.
hub_height: numeric
Turbine hub height altitude above the seabed. Assumes ADCP
depth bins are referenced to the seafloor.
doppler_cell_size: numeric
ADCP depth bin size.
Returns
---------
capture_area_slice: xarray.DataArray
Capture area sliced into horizontal slices of height
`doppler_cell_size`, centered on `hub height`.
"""
def area_of_circle_segment(radius, angle):
# Calculating area of sector
area_of_sector = np.pi * radius**2 * (angle/360)
# Calculating area of triangle
area_of_triangle = 0.5 * radius**2 * np.sin((np.pi*angle)/180)
return area_of_sector - area_of_triangle
def point_on_circle(y, r):
return np.sqrt(r**2 - y**2)
# Capture area - from mhkit.river.performance
d = diameter
cs = doppler_cell_size
A_cap = np.pi*(d/2)**2 # m^2
# Need to chop up capture area into slices based on bin size
# For a cirle:
r_min = hub_height - d/2
r_max = hub_height + d/2
A_edge = np.arange(r_min, r_max+cs, cs)
A_rng = A_edge[:-1] + cs/2 # Center of each slice
# y runs from the bottom edge of the lower centerline slice to
# the top edge of the lowest slice
# Will need to figure out y if the hub height isn't centered
y = abs(A_edge - np.mean(A_edge))
y[np.where(abs(y) > (d/2))] = d/2
# Even vs odd number of slices
if y.size % 2:
odd = 1
else:
odd = 0
y = y[:len(y)//2]
y = np.append(y, 0)
x = point_on_circle(y, d/2)
radii = np.rad2deg(np.arctan(x/y)*2)
# Segments go from outside of circle towards middle
As = area_of_circle_segment(d/2, radii)
# Subtract segments to get area of slices
As_slc = As[1:] - As[:-1]
if not odd:
# Make middle slice half whole
As_slc[-1] = As_slc[-1]*2
# Copy-flip the other slices to get the whole circle
As_slc = np.append(As_slc, np.flip(As_slc[:-1]))
else:
As_slc = abs(As_slc)
return xr.DataArray(As_slc, coords={'range': A_rng})
def _slice_rectangular_capture_area(height, width, hub_height, doppler_cell_size):
"""
Slices a rectangular (capture area) based on ADCP depth bins mapped
across the face of the capture area.
Parameters
-------------
height: numeric
Height of the capture area.
width: numeric
Width of the capture area.
hub_height: numeric
Turbine hub height altitude above the seabed. Assumes ADCP depth
bins are referenced to the seafloor.
doppler_cell_size: numeric
ADCP depth bin size.
Returns
---------
capture_area_slice: xarray.DataArray
Capture area sliced into horizontal slices of height
`doppler_cell_size`, centered on `hub height`.
"""
# Need to chop up capture area into slices based on bin size
# For a rectangle it's pretty simple
cs = doppler_cell_size
r_min = hub_height - height/2
r_max = hub_height + height/2
A_edge = np.arange(r_min, r_max+cs, cs)
A_rng = A_edge[:-1] + cs/2 # Center of each slice
As_slc = np.ones(len(A_rng))*width*cs
return xr.DataArray(As_slc, coords={'range': A_rng})
def _check_dtype(var, var_name):
"""
Checks the datatype of a variable, converting pandas Series to xarray DataArray,
or raising an error if the datatype is neither.
Parameters
-------------
var: xr.DataArray or pd.Series
The variable to be checked.
var_name: str
The name of the variable, used for error message.
Returns
---------
var: xr.DataArray
The input variable, converted to xr.DataArray if it was a pd.Series.
"""
if isinstance(var, pd.Series):
var = var.to_xarray()
elif not isinstance(var, xr.DataArray):
raise TypeError(var_name.capitalize() +
' must be of type xr.DataArray or pd.Series')
return var
[docs]def power_curve(power,
velocity,
hub_height,
doppler_cell_size,
sampling_frequency,
window_avg_time=600,
turbine_profile='circular',
diameter=None,
height=None,
width=None):
"""
Calculates power curve and power statistics for a marine energy
device based on IEC/TS 62600-200 section 9.3.
Parameters
-------------
power: pandas.Series or xarray.DataArray (time)
Device power output timeseries.
velocity: pandas.Series or xarray.DataArray ([range,] time)
1D or 2D streamwise sea water velocity or sea water speed.
hub_height: numeric
Turbine hub height altitude above the seabed. Assumes ADCP
depth bins are referenced to the seafloor.
doppler_cell_size: numeric
ADCP depth bin size.
sampling_frequency: numeric
ADCP sampling frequency in Hz.
window_avg_time: int, optional
Time averaging window in seconds. Defaults to 600.
turbine_profile: 'circular' or 'rectangular', optional
Shape of swept area of the turbine. Defaults to 'circular'.
diameter: numeric, optional
Required for turbine_profile='circular'. Defaults to None.
height: numeric, optional
Required for turbine_profile='rectangular'. Defaults to None.
width: numeric, optional
Required for turbine_profile='rectangular'. Defaults to None.
Returns
---------
pandas.DataFrame
Power-weighted velocity, mean power, power std dev, max and
min power vs hub-height velocity.
"""
# Velocity should be a 2D xarray or pandas array and have dims (range, time)
# Power should have a timestamp coordinate/index
power = _check_dtype(power, 'power')
velocity = _check_dtype(velocity, 'velocity')
if len(velocity.shape) != 2:
raise ValueError("Velocity should be 2 dimensional and have \
dimensions of 'time' (temporal) and 'range' (spatial).")
# Numeric positive checks
numeric_params = [hub_height, doppler_cell_size,
sampling_frequency, window_avg_time]
numeric_param_names = ['hub_height', 'doppler_cell_size',
'sampling_frequency', 'window_avg_time']
for param, name in zip(numeric_params, numeric_param_names):
if not isinstance(param, (int, float)):
raise TypeError(f'{name} must be numeric.')
if param <= 0:
raise ValueError(f'{name} must be positive.')
# Turbine profile related checks
if turbine_profile not in ['circular', 'rectangular']:
raise ValueError(
"`turbine_profile` must be one of 'circular' or 'rectangular'.")
if turbine_profile == 'circular':
if diameter is None:
raise TypeError(
"`diameter` cannot be None for input `turbine_profile` = 'circular'.")
elif not isinstance(diameter, (int, float)) or diameter <= 0:
raise ValueError("`diameter` must be a positive number.")
else: # If the checks pass, calculate A_slc
A_slc = _slice_circular_capture_area(
diameter, hub_height, doppler_cell_size)
else: # Rectangular profile
if height is None or width is None:
raise TypeError(
"`height` and `width` cannot be None for input `turbine_profile` = 'rectangular'.")
elif not all(isinstance(val, (int, float)) and val > 0 for val in [height, width]):
raise ValueError("`height` and `width` must be positive numbers.")
else: # If the checks pass, calculate A_slc
A_slc = _slice_rectangular_capture_area(
height, width, hub_height, doppler_cell_size)
# Streamwise data
U = abs(velocity)
time = U['time'].values
# Interpolate power to velocity timestamps
P = power.interp(time=U['time'], method='linear')
# Power weighted velocity in capture area
# Interpolate U range to capture area slices, then cube and multiply by area
U_hat = U.interp(range=A_slc['range'], method='linear')**3 * A_slc
# Average the velocity across the capture area and divide out area
U_hat = (U_hat.sum('range') / A_slc.sum()) ** (-1/3)
# Time-average velocity at hub-height
bnr = dolfyn.VelBinner(n_bin=window_avg_time *
sampling_frequency, fs=sampling_frequency)
# Hub-height velocity mean
mean_hub_vel = xr.DataArray(bnr.mean(U.sel(range=hub_height, method='nearest').values),
coords={'time': bnr.mean(time)})
# Power-weighted hub-height velocity mean
U_hat_bar = xr.DataArray((bnr.mean(U_hat.values ** 3)) ** (-1/3),
coords={'time': bnr.mean(time)})
# Average power
P_bar = xr.DataArray(bnr.mean(P.values),
coords={'time': bnr.mean(time)})
# Then reorganize into 0.1 m velocity bins and average
U_bins = np.arange(0, np.nanmax(mean_hub_vel) + 0.1, 0.1)
U_hub_vel = mean_hub_vel.assign_coords(
{"time": mean_hub_vel}).rename({"time": "speed"})
U_hub_mean = U_hub_vel.groupby_bins("speed", U_bins).mean()
U_hat_vel = U_hat_bar.assign_coords(
{"time": mean_hub_vel}).rename({"time": "speed"})
U_hat_mean = U_hat_vel.groupby_bins("speed", U_bins).mean()
P_bar_vel = P_bar.assign_coords(
{"time": mean_hub_vel}).rename({"time": "speed"})
P_bar_mean = P_bar_vel.groupby_bins("speed", U_bins).mean()
P_bar_std = P_bar_vel.groupby_bins("speed", U_bins).std()
P_bar_max = P_bar_vel.groupby_bins("speed", U_bins).max()
P_bar_min = P_bar_vel.groupby_bins("speed", U_bins).min()
out = pd.DataFrame((U_hub_mean.to_series(),
U_hat_mean.to_series(),
P_bar_mean.to_series(),
P_bar_std.to_series(),
P_bar_max.to_series(),
P_bar_min.to_series(),
)).T
out.columns = ['U_avg', 'U_avg_power_weighted',
'P_avg', 'P_std', 'P_max', 'P_min']
out.index.name = 'U_bins'
return out
def _average_velocity_bins(U, U_hub, bin_size):
"""
Groups time-ensembles into velocity bins based on hub-height
velocity and averages them.
Parameters
-------------
U: xarray.DataArray
Input variable to group by velocity.
U_hub: xarray.DataArray
Sea water velocity at hub height.
bin_size: numeric
Velocity averaging window size in m/s.
Returns
---------
xarray.DataArray
Data grouped into velocity bins.
"""
# Reorganize into velocity bins and average
U_bins = np.arange(0, np.nanmax(U_hub) + bin_size, bin_size)
# Group time-ensembles into velocity bins based on hub-height velocity and average
out = U.assign_coords({"time": U_hub}).rename({"time": "speed"})
out = out.groupby_bins("speed", U_bins).mean()
return out
def _apply_function(function, bnr, U):
"""
Applies a specified function ('mean', 'rms', or 'std') to the input
data array U, grouped into bins as specified by the binning rules in bnr.
Parameters
-------------
function: str
The name of the function to apply. Must be one of 'mean',
'rms', or 'std'.
bnr: dolfyn.VelBinner or similar
The binning rule object that determines how data in U is
grouped into bins.
U: xarray.DataArray
The input data array to which the function is applied.
Returns
---------
xarray.DataArray
The input data array U after the specified function has been
applied, grouped into bins according to bnr.
"""
if function == 'mean':
# Average data into 5-10 minute ensembles
return xr.DataArray(
bnr.mean(abs(U).values),
coords={'range': U.range,
'time': bnr.mean(U['time'].values)})
elif function == 'rms':
# Reshape tidal velocity - returns (range, ensemble-time, ensemble elements)
U_reshaped = bnr.reshape(abs(U).values)
# Take root-mean-square
U_rms = np.sqrt(np.nanmean(U_reshaped**2, axis=-1))
return xr.DataArray(
U_rms,
coords={'range': U.range,
'time': bnr.mean(U['time'].values)})
elif function == 'std':
# Standard deviation
return xr.DataArray(
bnr.standard_deviation(U.values),
coords={'range': U.range,
'time': bnr.mean(U['time'].values)})
else:
raise ValueError(
f"Unknown function {function}. Should be one of 'mean', 'rms', or 'std'")
[docs]def velocity_profiles(velocity,
hub_height,
water_depth,
sampling_frequency,
window_avg_time=600,
function='mean',
):
"""
Calculates profiles of the mean, root-mean-square (RMS), or standard
deviation(std) of velocity. The chosen metric, specified by `function`,
is calculated for each `window_avg_time` and bin-averaged based on
ensemble velocity, as per IEC/TS 62600-200 sections 9.4 and 9.5.
Parameters
-------------
velocity : pandas.Series or xarray.DataArray ([range,] time)
1D or 2D streamwise sea water velocity or sea water speed.
hub_height : numeric
Turbine hub height altitude above the seabed. Assumes ADCP depth bins
are referenced to the seafloor.
water_depth : numeric
Water depth to seafloor, in same units as velocity `range` coordinate.
sampling_frequency : numeric
ADCP sampling frequency in Hz.
window_avg_time : int, optional
Time averaging window in seconds. Defaults to 600.
func : string
Function to apply. One of 'mean','rms', or 'std'
Returns
---------
pandas.DataFrame
Average velocity profiles based on ensemble mean velocity.
"""
velocity = _check_dtype(velocity, 'velocity')
if len(velocity.shape) != 2:
raise ValueError("Velocity should be 2 dimensional and have \
dimensions of 'time' (temporal) and 'range' (spatial).")
if function not in ['mean', 'rms', 'std']:
raise ValueError("`function` must be one of 'mean', 'rms', or 'std'.")
# Streamwise data
U = velocity
# Create binner
bnr = dolfyn.VelBinner(n_bin=window_avg_time *
sampling_frequency, fs=sampling_frequency)
# Take velocity at hub height
mean_hub_vel = bnr.mean(U.sel(range=hub_height, method='nearest').values)
# Apply mean, root-mean-square, or standard deviation
U_out = _apply_function(function, bnr, U)
# Then reorganize into 0.5 m/s velocity bins and average
profiles = _average_velocity_bins(U_out, mean_hub_vel, bin_size=0.5)
# Extend top and bottom of profiles to the seafloor and sea surface
# Clip off extra depth bins with nans
rdx = profiles.isel(speed_bins=0).notnull().sum().values
profiles = profiles.isel(range=slice(None, rdx+1))
# Set seafloor velocity to 0 m/s
out_data = np.insert(profiles.data, 0, 0, axis=0)
# Set max range to the user-provided water depth
new_range = np.insert(profiles['range'].data[:-1], 0, 0)
new_range = np.append(new_range, water_depth)
# Create a profiles with new range
iec_profiles = xr.DataArray(out_data, coords={'range': new_range,
'speed_bins': profiles['speed_bins']})
# Forward fill to surface
iec_profiles = iec_profiles.ffill('range', limit=None)
return iec_profiles.to_pandas()
[docs]def device_efficiency(power,
velocity,
water_density,
capture_area,
hub_height,
sampling_frequency,
window_avg_time=600):
"""
Calculates marine energy device efficiency based on IEC/TS 62600-200 Section 9.7.
Parameters
-------------
power : pandas.Series or xarray.DataArray (time)
Device power output timeseries in Watts.
velocity : pandas.Series or xarray.DataArray ([range,] time)
1D or 2D streamwise sea water velocity or sea water speed in m/s.
water_density : float, pandas.Series or xarray.DataArray
Sea water density in kg/m^3.
capture_area : numeric
Swept area of marine energy device.
hub_height : numeric
Turbine hub height altitude above the seabed. Assumes ADCP depth bins
are referenced to the seafloor.
sampling_frequency : numeric
ADCP sampling frequency in Hz.
window_avg_time : int, optional
Time averaging window in seconds. Defaults to 600.
Returns
---------
pandas.Series
Device efficiency (power coefficient) in percent.
"""
# Velocity should be a 2D xarray or pandas array and have dims (range, time)
# Power should have a timestamp coordinate/index
power = _check_dtype(power, 'power')
velocity = _check_dtype(velocity, 'velocity')
if len(velocity.shape) != 2:
raise ValueError("Velocity should be 2 dimensional and have \
dimensions of 'time' (temporal) and 'range' (spatial).")
# Streamwise data
U = abs(velocity)
time = U['time'].values
# Power: Interpolate to velocity timeseries
power = _interpolate_power_to_velocity_timeseries(power, U)
# Create binner
bnr = dolfyn.VelBinner(n_bin=window_avg_time *
sampling_frequency, fs=sampling_frequency)
# Hub-height velocity
mean_hub_vel = xr.DataArray(bnr.mean(U.sel(range=hub_height, method='nearest').values),
coords={'time': bnr.mean(time)})
vel_hub = _average_velocity_bins(mean_hub_vel, mean_hub_vel, bin_size=0.1)
# Water density
rho_vel = _calculate_density(water_density, bnr, mean_hub_vel, time)
# Bin average power
P_avg = xr.DataArray(bnr.mean(power.values),
coords={'time': bnr.mean(time)})
P_vel = _average_velocity_bins(P_avg, mean_hub_vel, bin_size=0.1)
# Theoretical power resource
P_resource = 1/2 * rho_vel * capture_area * vel_hub**3
# Efficiency
eta = P_vel / P_resource
out = pd.DataFrame((vel_hub.to_series(),
eta.to_series(),
)).T
out.columns = ['U_avg', 'Efficiency']
out.index.name = 'U_bins'
return out
def _interpolate_power_to_velocity_timeseries(power, U):
"""
Interpolates the power timeseries to match the velocity timeseries time points.
This function checks if the input power is an xarray DataArray or a pandas Series
with a DatetimeIndex and performs interpolation accordingly. If the input power
does not match either of these types, a warning is issued and the original power
timeseries is returned.
Parameters
-------------
power : xarray.DataArray or pandas.Series
The device power output timeseries.
U : xarray.DataArray
2D streamwise sea water velocity or sea water speed.
Returns
---------
xarray.DataArray or pandas.Series
Interpolated power timeseries.
Raises
---------
Warning
If the input power is not a xarray DataArray or pandas Series with
a DatetimeIndex, a warning is issued stating that the function assumes the
power timestamps match the velocity timestamps.
"""
if 'xarray' in type(power).__module__:
return power.interp(time=U['time'], method='linear')
elif 'pandas' in type(power).__module__ and isinstance(power.index, pd.DatetimeIndex):
return power.to_xarray().interp(time=U['time'], method='linear')
else:
warnings.warn(
"Assuming `power` timestamps match `velocity` timestamps")
return power
def _calculate_density(water_density, bnr, mean_hub_vel, time):
"""
Calculates the averaged density for the given time period.
This function first checks if the water_density is a scalar or an array.
If it is an array, the function calculates the mean density over the time
period using the binner object 'bnr', and then averages it over velocity bins.
If it is a scalar, it directly returns the input density.
Parameters
-------------
water_density : numpy.ndarray or float
Sea water density values in kg/m^3. It can be a scalar or a 1D array.
bnr : dolfyn.VelBinner object
Object for binning data over specified time periods.
mean_hub_vel : xarray.DataArray
Mean velocity at the hub height.
time : numpy.ndarray
Time data array.
Returns
---------
xarray.DataArray or float
The averaged water density over velocity bins if water_density is an array,
or the input scalar water_density.
"""
if np.size(water_density) > 1:
rho_avg = xr.DataArray(bnr.mean(water_density.values),
coords={'time': bnr.mean(time)})
return _average_velocity_bins(rho_avg, mean_hub_vel, bin_size=0.1)
else:
return water_density