Source code for mhkit.wave.io.hindcast.hindcast

"""
This module provides functions to access and process WPTO wave hindcast data
hosted on AWS at specified latitude and longitude points or the closest
available points. It includes functions to retrieve data for predefined
regions, request point data for various parameters, and request directional
spectrum data.

Functions:
    - region_selection(lat_lon): Returns the name of the predefined region for
      given latitude and longitude coordinates.
    - request_wpto_point_data(data_type, parameter, lat_lon, years, tree=None,
      unscale=True, str_decode=True, hsds=True, path=None, to_pandas=True):
      Returns data from the WPTO wave hindcast hosted on AWS at the specified
      latitude and longitude point(s) for the requested data type, parameter,
      and years.
    - request_wpto_directional_spectrum(lat_lon, year, tree=None, unscale=True,
      str_decode=True, hsds=True, path=None): Returns directional spectra data
      from the WPTO wave hindcast hosted on AWS at the specified latitude and
      longitude point(s) for the given year.

Dependencies:
    - os
    - sys
    - time.sleep
    - pandas
    - xarray
    - numpy
    - rex.MultiYearWaveX, rex.WaveX

Author: rpauly, aidanbharath, ssolson
Date: 2023-09-26
"""

import os
import sys
from time import sleep
import pandas as pd
import xarray as xr
import numpy as np
from rex import MultiYearWaveX, WaveX
from mhkit.utils.cache import handle_caching
from mhkit.utils.type_handling import convert_to_dataset
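
# Hosted access (hsds=True) goes through h5pyd and NREL's HSDS service, which
# must be configured before the request functions below will work. A minimal
# sketch of the ~/.hscfg file h5pyd reads, assuming the public NREL endpoint
# and a developer.nrel.gov API key (see the WPTO_hindcast_example notebook for
# the authoritative setup steps):
#
#   hs_endpoint = https://developer.nrel.gov/api/hsds
#   hs_username = None
#   hs_password = None
#   hs_api_key = <your API key>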


def region_selection(lat_lon):
    """
    Returns the name of the predefined region in which the given
    coordinates reside. Can be used to check if the passed lat/lon
    pair is within the WPTO hindcast dataset.

    Parameters
    ----------
    lat_lon : list or tuple
        Latitude and longitude coordinates as floats or integers

    Returns
    -------
    region : string
        Name of predefined region for given coordinates
    """
    if not isinstance(lat_lon, (list, tuple)):
        raise TypeError(f"lat_lon must be of type list or tuple. Got: {type(lat_lon)}")
    if not all(isinstance(coord, (float, int)) for coord in lat_lon):
        raise TypeError(
            f"lat_lon values must be of type float or int. Got: {type(lat_lon[0])}"
        )

    regions = {
        "Hawaii": {"lat": [15.0, 27.000002], "lon": [-164.0, -151.0]},
        "West_Coast": {"lat": [30.0906, 48.8641], "lon": [-130.072, -116.899]},
        "Atlantic": {"lat": [24.382, 44.8247], "lon": [-81.552, -65.721]},
    }

    def region_search(lat_lon, region, regions):
        return all(
            regions[region][dk][0] <= d <= regions[region][dk][1]
            for dk, d in {"lat": lat_lon[0], "lon": lat_lon[1]}.items()
        )

    region = [region for region in regions if region_search(lat_lon, region, regions)]

    if not region:
        raise ValueError("ERROR: coordinates out of bounds.")

    return region[0]

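# Illustrative use of region_selection (hypothetical coordinates chosen to
# fall inside the West_Coast and Hawaii bounding boxes defined above):
#
#   region_selection((44.62, -124.28))   # -> "West_Coast"
#   region_selection((21.3, -158.0))     # -> "Hawaii"
#   region_selection((0.0, 0.0))         # raises ValueError (out of bounds)
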
def request_wpto_point_data(
    data_type,
    parameter,
    lat_lon,
    years,
    tree=None,
    unscale=True,
    str_decode=True,
    hsds=True,
    path=None,
    to_pandas=True,
):
    """
    Returns data from the WPTO wave hindcast hosted on AWS at the specified
    latitude and longitude point(s), or the closest available point(s).
    Visit https://registry.opendata.aws/wpto-pds-us-wave/ for more
    information about the dataset and available locations and years.

    Note: To access the WPTO hindcast data, you will need to configure
    h5pyd for data access on HSDS. Please see the WPTO_hindcast_example
    notebook for setup instructions.

    Parameters
    ----------
    data_type : string
        Data set type of interest
        Options: '3-hour', '1-hour'
    parameter : string or list of strings
        Dataset parameter to be downloaded
        3-hour dataset options: 'directionality_coefficient',
            'energy_period', 'maximum_energy_direction',
            'mean_absolute_period', 'mean_zero-crossing_period',
            'omni-directional_wave_power', 'peak_period',
            'significant_wave_height', 'spectral_width', 'water_depth'
        1-hour dataset options: 'directionality_coefficient',
            'energy_period', 'maximum_energy_direction',
            'mean_absolute_period', 'mean_zero-crossing_period',
            'omni-directional_wave_power', 'peak_period',
            'significant_wave_height', 'spectral_width', 'water_depth',
            'mean_wave_direction', 'frequency_bin_edges'
    lat_lon : tuple or list of tuples
        Latitude longitude pairs at which to extract data
    years : list
        Year(s) to be accessed. The years 1979-2010 are available.
        Examples: [1996] or [2004, 2006, 2007]
    tree : str | cKDTree (optional)
        cKDTree or path to .pkl file containing pre-computed tree of
        lat, lon coordinates, default = None
    unscale : bool (optional)
        Boolean flag to automatically unscale variables on extraction.
        Default = True
    str_decode : bool (optional)
        Boolean flag to decode the bytestring meta data into normal
        strings. Setting this to False will speed up the meta data read.
        Default = True
    hsds : bool (optional)
        Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
        behind HSDS. Setting to False will indicate to look for files on
        the local machine, not AWS. Default = True
    path : string (optional)
        Optionally override with a custom .h5 filepath. Useful when
        setting `hsds=False`.
    to_pandas : bool (optional)
        Flag to output pandas instead of xarray. Default = True.

    Returns
    -------
    data : pandas DataFrame or xarray Dataset
        Data indexed by datetime with columns named for parameter and
        corresponding metadata index
    meta : DataFrame
        Location metadata for the requested data location
    """
    if not isinstance(parameter, (str, list)):
        raise TypeError(
            f"parameter must be of type string or list. Got: {type(parameter)}"
        )
    if not isinstance(lat_lon, (list, tuple)):
        raise TypeError(f"lat_lon must be of type list or tuple. Got: {type(lat_lon)}")
    if not isinstance(data_type, str):
        raise TypeError(f"data_type must be a string. Got: {type(data_type)}")
    if not isinstance(years, list):
        raise TypeError(f"years must be a list. Got: {type(years)}")
    if not isinstance(tree, (str, type(None))):
        raise TypeError(f"If specified, tree must be a string. Got: {type(tree)}")
    if not isinstance(unscale, bool):
        raise TypeError(
            f"If specified, unscale must be bool type. Got: {type(unscale)}"
        )
    if not isinstance(str_decode, bool):
        raise TypeError(
            f"If specified, str_decode must be bool type. Got: {type(str_decode)}"
        )
    if not isinstance(hsds, bool):
        raise TypeError(f"If specified, hsds must be bool type. Got: {type(hsds)}")
    if not isinstance(path, (str, type(None))):
        raise TypeError(f"If specified, path must be a string. Got: {type(path)}")
    if not isinstance(to_pandas, bool):
        raise TypeError(
            f"If specified, to_pandas must be bool type. Got: {type(to_pandas)}"
        )

    # Attempt to load data from cache
    # Construct a string representation of the function parameters
    hash_params = f"{data_type}_{parameter}_{lat_lon}_{years}_{tree}_{unscale}_{str_decode}_{hsds}_{path}_{to_pandas}"
    cache_dir = _get_cache_dir()
    data, meta, _ = handle_caching(
        hash_params,
        cache_dir,
        cache_content={"data": None, "metadata": None, "write_json": None},
    )

    if data is not None:
        return data, meta
    else:
        if "directional_wave_spectrum" in parameter:
            sys.exit("This function does not support directional_wave_spectrum output")

        # Check for multiple region selection
        if isinstance(lat_lon[0], float):
            region = region_selection(lat_lon)
        else:
            region_list = []
            for loc in lat_lon:
                region_list.append(region_selection(loc))
            if region_list.count(region_list[0]) == len(lat_lon):
                region = region_list[0]
            else:
                sys.exit("Coordinates must be within the same region!")

        if path:
            wave_path = path
        elif data_type == "3-hour":
            wave_path = f"/nrel/US_wave/{region}/{region}_wave_*.h5"
        elif data_type == "1-hour":
            wave_path = (
                f"/nrel/US_wave/virtual_buoy/{region}/{region}_virtual_buoy_*.h5"
            )
        else:
            print("ERROR: invalid data_type")

        wave_kwargs = {
            "tree": tree,
            "unscale": unscale,
            "str_decode": str_decode,
            "hsds": hsds,
            "years": years,
        }
        data_list = []

        with MultiYearWaveX(wave_path, **wave_kwargs) as rex_waves:
            if isinstance(parameter, list):
                for param in parameter:
                    temp_data = rex_waves.get_lat_lon_df(param, lat_lon)
                    gid = rex_waves.lat_lon_gid(lat_lon)
                    cols = temp_data.columns[:]

                    for i, col in zip(range(len(cols)), cols):
                        temp = f"{param}_{i}"
                        temp_data = temp_data.rename(columns={col: temp})

                    data_list.append(temp_data)
                data = pd.concat(data_list, axis=1)
            else:
                data = rex_waves.get_lat_lon_df(parameter, lat_lon)
                cols = data.columns[:]

                for i, col in zip(range(len(cols)), cols):
                    temp = f"{parameter}_{i}"
                    data = data.rename(columns={col: temp})

            meta = rex_waves.meta.loc[cols, :]
            meta = meta.reset_index(drop=True)
            gid = rex_waves.lat_lon_gid(lat_lon)
            meta["gid"] = gid

            if not to_pandas:
                data = convert_to_dataset(data)
                data["time_index"] = pd.to_datetime(data.time_index)

                if isinstance(parameter, list):
                    param_coords = [f"{param}_{i}" for param in parameter]
                    data.coords["parameter"] = xr.DataArray(
                        param_coords, dims="parameter"
                    )

                data.coords["year"] = xr.DataArray(years, dims="year")

                meta_ds = meta.to_xarray()
                data = xr.merge([data, meta_ds])

                # Remove the 'index' coordinate
                data = data.drop_vars("index")

        # save_to_cache(hash_params, data, meta)
        handle_caching(
            hash_params,
            cache_dir,
            cache_content={"data": data, "metadata": meta, "write_json": None},
        )

        return data, meta

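# Illustrative call of request_wpto_point_data, assuming h5pyd/HSDS is
# configured and using an example West_Coast coordinate and year (values are
# for demonstration only):
#
#   hs, meta = request_wpto_point_data(
#       data_type="3-hour",
#       parameter="significant_wave_height",
#       lat_lon=(44.624076, -124.280097),
#       years=[1995],
#   )
#   # hs:   DataFrame indexed by datetime, column "significant_wave_height_0"
#   # meta: one-row DataFrame of site metadata (latitude, longitude, gid, ...)
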
def request_wpto_directional_spectrum(
    lat_lon,
    year,
    tree=None,
    unscale=True,
    str_decode=True,
    hsds=True,
    path=None,
):
    """
    Returns directional spectra data from the WPTO wave hindcast hosted on
    AWS at the specified latitude and longitude point(s), or the closest
    available point(s). The data is returned as an xarray Dataset with keys
    indexed by a graphical identifier (gid). `gid`s are integers which
    represent a lat, lon pair on which data is stored. Requesting an array
    of lat_lon pairs will return a dataset with multiple `gid`s representing
    the data closest to each requested `lat`, `lon`.

    Visit https://registry.opendata.aws/wpto-pds-us-wave/ for more
    information about the dataset and available locations and years.

    Note: To access the WPTO hindcast data, you will need to configure
    h5pyd for data access on HSDS. Please see the WPTO_hindcast_example
    notebook for more information.

    Parameters
    ----------
    lat_lon : tuple or list of tuples
        Latitude longitude pairs at which to extract data
    year : string
        Year to be accessed. The years 1979-2010 are available.
        Only one year can be requested at a time.
    tree : str | cKDTree (optional)
        cKDTree or path to .pkl file containing pre-computed tree of
        lat, lon coordinates, default = None
    unscale : bool (optional)
        Boolean flag to automatically unscale variables on extraction.
        Default = True
    str_decode : bool (optional)
        Boolean flag to decode the bytestring meta data into normal
        strings. Setting this to False will speed up the meta data read.
        Default = True
    hsds : bool (optional)
        Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
        behind HSDS. Setting to False will indicate to look for files on
        the local machine, not AWS. Default = True
    path : string (optional)
        Optionally override with a custom .h5 filepath. Useful when
        setting `hsds=False`.

    Returns
    -------
    data : xarray Dataset
        Coordinates as datetime, frequency, and direction for data at
        specified location(s)
    meta : DataFrame
        Location metadata for the requested data location
    """
    if not isinstance(lat_lon, (list, tuple)):
        raise TypeError(f"lat_lon must be of type list or tuple. Got: {type(lat_lon)}")
    if not isinstance(year, str):
        raise TypeError(f"year must be a string. Got: {type(year)}")
    if not isinstance(tree, (str, type(None))):
        raise TypeError(f"If specified, tree must be a string. Got: {type(tree)}")
    if not isinstance(unscale, bool):
        raise TypeError(
            f"If specified, unscale must be bool type. Got: {type(unscale)}"
        )
    if not isinstance(str_decode, bool):
        raise TypeError(
            f"If specified, str_decode must be bool type. Got: {type(str_decode)}"
        )
    if not isinstance(hsds, bool):
        raise TypeError(f"If specified, hsds must be bool type. Got: {type(hsds)}")
    if not isinstance(path, (str, type(None))):
        raise TypeError(f"If specified, path must be a string. Got: {type(path)}")

    # Check for multiple region selection
    if isinstance(lat_lon[0], float):
        region = region_selection(lat_lon)
    else:
        reglist = [region_selection(loc) for loc in lat_lon]
        if reglist.count(reglist[0]) == len(lat_lon):
            region = reglist[0]
        else:
            sys.exit("Coordinates must be within the same region!")

    # Attempt to load data from cache
    hash_params = f"{lat_lon}_{year}_{tree}_{unscale}_{str_decode}_{hsds}_{path}"
    cache_dir = _get_cache_dir()
    data, meta, _ = handle_caching(
        hash_params,
        cache_dir,
        cache_content={"data": None, "metadata": None, "write_json": None},
    )

    if data is not None:
        return data, meta

    wave_path = path or (
        f"/nrel/US_wave/virtual_buoy/{region}/{region}_virtual_buoy_{year}.h5"
    )
    parameter = "directional_wave_spectrum"
    wave_kwargs = {
        "tree": tree,
        "unscale": unscale,
        "str_decode": str_decode,
        "hsds": hsds,
    }

    with WaveX(wave_path, **wave_kwargs) as rex_waves:
        # Get graphical identifier
        gid = rex_waves.lat_lon_gid(lat_lon)

        # Setup index and columns
        columns = [gid] if isinstance(gid, (int, np.integer)) else gid
        time_index = rex_waves.time_index
        frequency = rex_waves["frequency"]
        direction = rex_waves["direction"]
        index = pd.MultiIndex.from_product(
            [time_index, frequency, direction],
            names=["time_index", "frequency", "direction"],
        )

        # Create bins for multiple smaller API dataset requests
        N = 6
        length = len(rex_waves)
        quotient, remainder = divmod(length, N)
        bins = [i * quotient for i in range(N + 1)]
        bins[-1] += remainder
        index_bins = (np.array(bins) * len(frequency) * len(direction)).tolist()

        # Request multiple datasets and add to dictionary
        datas = {}
        for i in range(len(bins) - 1):
            idx = index[index_bins[i] : index_bins[i + 1]]

            # Request with exponential back off wait time
            sleep_time = 2
            num_retries = 4
            for _ in range(num_retries):
                try:
                    data_array = rex_waves[
                        parameter, bins[i] : bins[i + 1], :, :, gid
                    ]
                    str_error = None
                except Exception as err:
                    str_error = str(err)

                if str_error:
                    sleep(sleep_time)
                    sleep_time *= 2
                else:
                    break

            ax1 = np.prod(data_array.shape[:3])
            ax2 = data_array.shape[-1] if len(data_array.shape) == 4 else 1
            datas[i] = pd.DataFrame(
                data_array.reshape(ax1, ax2), columns=columns, index=idx
            )

        data_raw = pd.concat(datas.values())
        data = data_raw.to_xarray()
        data["time_index"] = pd.to_datetime(data.time_index)

        # Get metadata
        meta = rex_waves.meta.loc[columns, :]
        meta = meta.reset_index(drop=True)
        meta["gid"] = gid

        # Convert gid to integer or list of integers
        gid_list = (
            [int(g) for g in gid]
            if isinstance(gid, (list, np.ndarray))
            else [int(gid)]
        )
        data_var_concat = xr.concat([data[g] for g in gid_list], dim="gid")

        # Create a new DataArray with the correct dimensions and coordinates
        spectral_density = xr.DataArray(
            data_var_concat.data.reshape(
                -1, len(frequency), len(direction), len(gid_list)
            ),
            dims=["time_index", "frequency", "direction", "gid"],
            coords={
                "time_index": data["time_index"],
                "frequency": data["frequency"],
                "direction": data["direction"],
                "gid": gid_list,
            },
        )

        # Create the new dataset
        data = xr.Dataset(
            {"spectral_density": spectral_density},
            coords={
                "time_index": data["time_index"],
                "frequency": data["frequency"],
                "direction": data["direction"],
                "gid": gid_list,
            },
        )

    handle_caching(
        hash_params,
        cache_dir,
        cache_content={"data": data, "metadata": meta, "write_json": None},
    )

    return data, meta

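# Illustrative call of request_wpto_directional_spectrum, assuming h5pyd/HSDS
# is configured (coordinates and year are example values):
#
#   spectra, meta = request_wpto_directional_spectrum(
#       lat_lon=(44.624076, -124.280097),
#       year="1995",
#   )
#   # spectra: xarray Dataset with "spectral_density" dimensioned
#   #          (time_index, frequency, direction, gid)
#   # meta:    DataFrame of site metadata for the returned gid(s)
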
def _get_cache_dir():
    """
    Returns the path to the cache directory.
    """
    return os.path.join(os.path.expanduser("~"), ".cache", "mhkit", "hindcast")