"""
This module provides functions for retrieving and processing data from the United States
Geological Survey (USGS) National Water Information System (NWIS). It enables access to
river flow data and related measurements useful for hydrokinetic resource assessment.
"""
from typing import Dict, Union, Optional
import os
import json
import shutil
import requests
import pandas as pd
import xarray as xr
from pandas import DataFrame
from mhkit.utils.cache import handle_caching
def _read_usgs_json(text: Dict, to_pandas: bool = True) -> Union[DataFrame, xr.Dataset]:
"""
    Process USGS JSON response into a pandas DataFrame or xarray Dataset.

    Parameters
----------
text : dict
JSON response from USGS API containing time series data
to_pandas : bool, optional
        Flag to output pandas instead of xarray. Default = True.

    Returns
-------
data : pandas.DataFrame or xarray.Dataset
Processed time series data
"""
data = pd.DataFrame()
    for i, timeseries in enumerate(text["value"]["timeSeries"]):
        try:
            site_name = timeseries["variable"]["variableDescription"]
            site_data = pd.DataFrame(timeseries["values"][0]["value"])
site_data.set_index("dateTime", drop=True, inplace=True)
site_data.index = pd.to_datetime(site_data.index, utc=True)
site_data.rename(columns={"value": site_name}, inplace=True)
site_data[site_name] = pd.to_numeric(site_data[site_name])
site_data.index.name = None
del site_data["qualifiers"]
data = data.combine_first(site_data)
except (KeyError, ValueError, TypeError, pd.errors.OutOfBoundsDatetime) as e:
print(f"Warning: Failed to process time series {i}: {str(e)}")
continue
if not to_pandas:
        data = data.to_xarray()
return data


def read_usgs_file(
file_name: str, to_pandas: bool = True
) -> Union[DataFrame, xr.Dataset]:
"""
    Reads a USGS JSON data file (from https://waterdata.usgs.gov/nwis).

    Parameters
----------
file_name : str
Name of USGS JSON data file
    to_pandas : bool, optional
        Flag to output pandas instead of xarray. Default = True.

    Returns
    -------
    data : pandas.DataFrame or xarray.Dataset
        Data indexed by datetime with columns named according to the
        parameter's variable description
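
    Examples
    --------
    A minimal sketch; the file name is illustrative and assumes a JSON file
    previously downloaded from NWIS::

        data = read_usgs_file("usgs_data.json")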
"""
if not isinstance(to_pandas, bool):
raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
with open(file_name, encoding="utf-8") as json_file:
text = json.load(json_file)
data = _read_usgs_json(text, to_pandas)
return data


# pylint: disable=too-many-locals
def request_usgs_data(
station: str,
parameter: str,
start_date: str,
end_date: str,
options: Optional[Dict] = None,
) -> Union[DataFrame, xr.Dataset]:
"""
    Loads USGS data directly from https://waterdata.usgs.gov/nwis using a
    GET request. The request URL prints to the screen.

    Parameters
----------
station : str
USGS station number (e.g. '08313000')
parameter : str
USGS parameter ID (e.g. '00060' for Discharge, cubic feet per second)
start_date : str
Start date in the format 'YYYY-MM-DD' (e.g. '2018-01-01')
end_date : str
End date in the format 'YYYY-MM-DD' (e.g. '2018-12-31')
options : dict, optional
Dictionary containing optional parameters:
- data_type: str
Data type, options include 'Daily' (return the mean daily value) and
'Instantaneous'. Default = 'Daily'
- proxy: dict or None
Proxy settings for the request. Default = None
- write_json: str or None
Name of json file to write data. Default = None
- clear_cache: bool
If True, the cache for this specific request will be cleared. Default = False
- to_pandas: bool
Flag to output pandas instead of xarray. Default = True
        - timeout: int or float
            Timeout in seconds for the HTTP request. Default = 30

    Returns
    -------
    data : pandas.DataFrame or xarray.Dataset
        Data indexed by datetime with columns named according to the
        parameter's variable description
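
    Examples
    --------
    A minimal sketch of a daily-value discharge request; the station,
    parameter, and dates are illustrative::

        data = request_usgs_data(
            station="08313000",
            parameter="00060",
            start_date="2018-01-01",
            end_date="2018-12-31",
            options={"data_type": "Daily", "timeout": 30},
        )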
"""
# Set default options
options = options or {}
data_type = options.get("data_type", "Daily")
proxy = options.get("proxy", None)
write_json = options.get("write_json", None)
clear_cache = options.get("clear_cache", False)
to_pandas = options.get("to_pandas", True)
timeout = options.get("timeout", 30) # 30 seconds default timeout
if data_type not in ["Daily", "Instantaneous"]:
raise ValueError(f"data_type must be Daily or Instantaneous. Got: {data_type}")
if not isinstance(to_pandas, bool):
raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
if not isinstance(timeout, (int, float)) or timeout <= 0:
raise ValueError(f"timeout must be a positive number. Got: {timeout}")
# Define the path to the cache directory
cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "mhkit", "usgs")
# Create a unique filename based on the function parameters
hash_params = f"{station}_{parameter}_{start_date}_{end_date}_{data_type}"
cached_data, _, cache_filepath = handle_caching(
hash_params,
cache_dir,
cache_content={"data": None, "metadata": None, "write_json": write_json},
clear_cache_file=clear_cache,
)
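    # A cache hit for these exact parameters returns immediately and skips
    # the web request entirely.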
if cached_data is not None:
return cached_data
# If no cached data, proceed with the API request
if data_type == "Daily":
data_url = "https://waterservices.usgs.gov/nwis/dv"
api_query = (
"/?format=json&sites="
+ station
+ "&startDT="
+ start_date
+ "&endDT="
+ end_date
+ "&statCd=00003"
+ "¶meterCd="
+ parameter
+ "&siteStatus=all"
)
else:
data_url = "https://waterservices.usgs.gov/nwis/iv"
api_query = (
"/?format=json&sites="
+ station
+ "&startDT="
+ start_date
+ "&endDT="
+ end_date
+ "¶meterCd="
+ parameter
+ "&siteStatus=all"
)
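    # Example of a fully assembled daily-values URL (values illustrative):
    # https://waterservices.usgs.gov/nwis/dv/?format=json&sites=08313000
    #     &startDT=2018-01-01&endDT=2018-12-31&statCd=00003&parameterCd=00060&siteStatus=all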
print("Data request URL: ", data_url + api_query)
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
response = requests.get(
url=data_url + api_query, proxies=proxy, timeout=timeout, verify=True
)
text = json.loads(response.text)
break
except requests.exceptions.SSLError as e:
retry_count += 1
if retry_count == max_retries:
                raise
print(
f"SSL Error occurred, retrying... (Attempt {retry_count}/{max_retries})"
)
continue
    # handle_caching is only set up for pandas, so force pandas output for now
data = _read_usgs_json(text, True)
# After making the API request and processing the response, write the
# response to a cache file
handle_caching(
hash_params,
cache_dir,
cache_content={"data": data, "metadata": None, "write_json": None},
clear_cache_file=clear_cache,
)
if write_json:
shutil.copy(cache_filepath, write_json)
if not to_pandas:
        data = data.to_xarray()
return data