Source code for mhkit.wave.io.swan

from scipy.io import loadmat
from os.path import isfile
import pandas as pd
import xarray as xr
import numpy as np
import re
from mhkit.utils import convert_to_dataset, convert_nested_dict_and_pandas


def read_table(swan_file, to_pandas=True):
    """
    Reads in SWAN table format output.

    Parameters
    ----------
    swan_file: str
        filename to import
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.

    Returns
    -------
    swan_data: pandas DataFrame or xarray Dataset
        DataFrame or Dataset of SWAN output
    metaDict: Dictionary
        Dictionary of metadata
    """
    if not isinstance(swan_file, str):
        raise TypeError(f"swan_file must be of type str. Got: {type(swan_file)}")
    if not isfile(swan_file):
        raise ValueError(f"File not found: {swan_file}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")

    f = open(swan_file, "r")
    header_line_number = 4
    for i in range(header_line_number + 2):
        line = f.readline()
        if line.startswith("% Run"):
            metaDict = _parse_line_metadata(line)
            if metaDict["Table"].endswith("SWAN"):
                metaDict["Table"] = metaDict["Table"].split(" SWAN")[:-1]
        if i == header_line_number:
            header = re.split("\\s+", line.rstrip().strip("%").lstrip())
            metaDict["header"] = header
        if i == header_line_number + 1:
            units = re.split(
                "\\s+", line.strip(" %\n").replace("[", "").replace("]", "")
            )
            metaDict["units"] = units
    f.close()

    swan_data = pd.read_csv(
        swan_file, sep="\\s+", comment="%", names=metaDict["header"]
    )

    if not to_pandas:
        swan_data = convert_to_dataset(swan_data)

    return swan_data, metaDict
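
# A minimal usage sketch for read_table (hypothetical file name; any SWAN
# table file written with headers should work):
#
#     swan_data, meta = read_table("SWAN_table_output.txt")
#     print(meta["header"])    # column names parsed from the '%' header line
#     print(meta["units"])     # units parsed from the line below the header
#     print(swan_data.head())
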
def read_block(swan_file, to_pandas=True):
    """
    Reads in SWAN block output with headers and creates a dictionary of
    DataFrames or Datasets for each SWAN output variable in the output
    file.

    Parameters
    ----------
    swan_file: str
        swan block file to import
    to_pandas: bool (optional)
        Flag to output a dictionary of pandas objects instead of a
        dictionary of xarray objects. Default = True.

    Returns
    -------
    data: Dictionary
        Dictionary of DataFrames or Datasets of swan output variables
    metaDict: Dictionary
        Dictionary of metaData dependent on file type
    """
    if not isinstance(swan_file, str):
        raise TypeError(f"swan_file must be of type str. Got: {type(swan_file)}")
    if not isfile(swan_file):
        raise ValueError(f"File not found: {swan_file}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")

    extension = swan_file.split(".")[1].lower()
    if extension == "mat":
        dataDict = _read_block_mat(swan_file)
        metaData = {
            "filetype": "mat",
            "variables": [var for var in dataDict.keys()],
        }
    else:
        dataDict, metaData = _read_block_txt(swan_file)

    if not to_pandas:
        dataDict = convert_nested_dict_and_pandas(dataDict)

    return dataDict, metaData
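
# A usage sketch for read_block (hypothetical file names; 'Hsig' assumes the
# SWAN run wrote a significant-wave-height block). The file extension picks
# the parser: ".mat" goes through scipy.io.loadmat, anything else is parsed
# as headered block text:
#
#     data, meta = read_block("SWAN_block_output.mat")
#     hsig = data["Hsig"]
#
#     data, meta = read_block("SWAN_block_output.txt")
#     print(meta)    # per-variable metadata keyed by variable name
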
def _read_block_txt(swan_file):
    """
    Reads in SWAN block output with headers and creates a dictionary of
    DataFrames for each SWAN output variable in the output file.

    Parameters
    ----------
    swan_file: str
        swan block file to import (must be written with headers)

    Returns
    -------
    dataDict: Dictionary
        Dictionary of DataFrames of swan output variables
    metaDict: Dictionary
        Dictionary of metaData dependent on file type
    """
    if not isinstance(swan_file, str):
        raise TypeError(f"swan_file must be of type str. Got: {type(swan_file)}")
    if not isfile(swan_file):
        raise ValueError(f"File not found: {swan_file}")

    f = open(swan_file)
    runLines = []
    metaDict = {}
    column_position = None
    dataDict = {}
    for position, line in enumerate(f):
        # Each "% Run" line starts a new variable block; the column
        # header for that block sits five lines below it.
        if line.startswith("% Run"):
            varPosition = position
            runLines.extend([position])
            column_position = position + 5
            varDict = _parse_line_metadata(line)
            varDict["unitMultiplier"] = float(varDict["Unit"].split(" ")[0])
            metaDict[varPosition] = varDict
            variable = varDict["vars"]
            dataDict[variable] = {}
        if position == column_position and column_position is not None:
            columns = line.strip("% \n").split()
            metaDict[varPosition]["cols"] = columns
            N_columns = len(columns)
            column_position = None
        if not line.startswith("%"):
            raw_data = " ".join(re.split(" |\\.", line.strip(" \n"))).split()
            index_number = int(raw_data[0])
            columns_data = raw_data[1:]
            data = []
            # SWAN writes "****" for values it could not compute;
            # convert these to NaN.
            possibleNaNs = ["****"]
            NNaNsTotal = sum([line.count(nanVal) for nanVal in possibleNaNs])
            if NNaNsTotal > 0:
                for vals in columns_data:
                    NNaNs = 0
                    for nanVal in possibleNaNs:
                        NNaNs += vals.count(nanVal)
                    if NNaNs > 0:
                        for i in range(NNaNs):
                            data.extend([np.nan])
                    else:
                        data.extend([float(vals)])
            else:
                data.extend([float(val) for val in columns_data])
            dataDict[variable][index_number] = data
    metaData = pd.DataFrame(metaDict).T
    f.close()

    for var in metaData.vars.values:
        df = pd.DataFrame(dataDict[var]).T
        varCols = metaData[metaData.vars == var].cols.values.tolist()[0]
        colsDict = dict(zip(df.columns.values.tolist(), varCols))
        df = df.rename(columns=colsDict)
        unitMultiplier = metaData[metaData.vars == var].unitMultiplier.values[0]
        dataDict[var] = df * unitMultiplier

    metaData.pop("cols")
    metaData = metaData.set_index("vars").T.to_dict()

    return dataDict, metaData


def _read_block_mat(swan_file):
    """
    Reads in SWAN MATLAB output and creates a dictionary of DataFrames
    for each SWAN output variable.

    Parameters
    ----------
    swan_file: str
        filename to import

    Returns
    -------
    dataDict: Dictionary
        Dictionary of DataFrames of swan output variables
    """
    if not isinstance(swan_file, str):
        raise TypeError(f"swan_file must be of type str. Got: {type(swan_file)}")
    if not isfile(swan_file):
        raise ValueError(f"File not found: {swan_file}")

    dataDict = loadmat(swan_file, struct_as_record=False, squeeze_me=True)
    removeKeys = ["__header__", "__version__", "__globals__"]
    for key in removeKeys:
        dataDict.pop(key, None)
    for key in dataDict.keys():
        dataDict[key] = pd.DataFrame(dataDict[key])
    return dataDict


def _parse_line_metadata(line):
    """
    Parses the variable metadata from a SWAN header line into a
    dictionary.

    Parameters
    ----------
    line: str
        line from block swan data to parse

    Returns
    -------
    metaDict: Dictionary
        Dictionary of variable metadata
    """
    if not isinstance(line, str):
        raise TypeError(f"line must be of type str. Got: {type(line)}")

    metaDict = {}
    meta = re.sub(
        "\\s+", " ", line.replace(",", " ").strip("% \n").replace("**", "vars:")
    )
    mList = meta.split(":")
    elms = [elm.split(" ") for elm in mList]
    # Drop empty tokens left over from splitting on spaces
    for elm in elms:
        try:
            elm.remove("")
        except ValueError:
            pass
    # Each colon-separated chunk ends with the key of the next
    # "key: value" pair.
    for i in range(len(elms) - 1):
        elm = elms[i]
        key = elm[-1]
        val = " ".join(elms[i + 1][:-1])
        metaDict[key] = val
    # The last value has no trailing key, so keep all of its tokens
    metaDict[key] = " ".join(elms[-1])
    return metaDict
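
# A sketch of what _parse_line_metadata recovers from a "% Run" header line
# (illustrative line; the exact fields depend on the SWAN run):
#
#     line = "% Run:01, Table:SWAN, ** Hsig, Unit: 1 m\n"
#     _parse_line_metadata(line)
#     # -> {'Run': '01', 'Table': 'SWAN', 'vars': 'Hsig', 'Unit': '1 m'}
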
def dictionary_of_block_to_table(dictionary_of_DataFrames, names=None, to_pandas=True):
    """
    Converts a dictionary of structured 2D grid SWAN block format,
    x (columns), y (index), to SWAN table format, x (column),
    y (column), values (column), as a DataFrame or Dataset.

    Parameters
    ----------
    dictionary_of_DataFrames: Dictionary
        Dictionary of DataFrames with the x coordinate as columns and
        the y coordinate as the index.
    names: list (optional)
        Names of the data columns in the returned table.
        Default = dictionary_of_DataFrames.keys()
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.

    Returns
    -------
    swanTables: pandas DataFrame or xarray Dataset
        DataFrame/Dataset with columns x, y, and one value column per
        variable, named by dictionary_of_DataFrames.keys() or names
    """
    if not isinstance(dictionary_of_DataFrames, dict):
        raise TypeError(
            f"dictionary_of_DataFrames must be of type dict. Got: {type(dictionary_of_DataFrames)}"
        )
    if not bool(dictionary_of_DataFrames):
        raise ValueError(
            f"dictionary_of_DataFrames is empty. Got: {dictionary_of_DataFrames}"
        )
    for key in dictionary_of_DataFrames:
        if not isinstance(dictionary_of_DataFrames[key], pd.DataFrame):
            raise TypeError(
                f"Dictionary key:{key} must be of type pd.DataFrame. Got: {type(dictionary_of_DataFrames[key])}"
            )
    if names is not None:
        if not isinstance(names, list):
            raise TypeError(
                f"If specified, names must be of type list. Got: {type(names)}"
            )
        if not all([isinstance(elm, str) for elm in names]):
            raise ValueError(
                f"If specified, all elements in names must be of type string. Got: {names}"
            )
        if not len(names) == len(dictionary_of_DataFrames):
            raise ValueError(
                "If specified, names must be the same length as dictionary_of_DataFrames"
            )
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")

    # names (when given) rename the value columns positionally, so pair
    # them with the dictionary keys rather than indexing the dictionary
    # by name.
    keys = list(dictionary_of_DataFrames.keys())
    if names is None:
        names = keys

    swanTables = block_to_table(dictionary_of_DataFrames[keys[0]], name=names[0])
    for key, name in zip(keys[1:], names[1:]):
        tmp_dat = block_to_table(dictionary_of_DataFrames[key], name=name)
        swanTables[name] = tmp_dat[name]

    if not to_pandas:
        swanTables = convert_to_dataset(swanTables)

    return swanTables
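
# A usage sketch for dictionary_of_block_to_table, assuming 'Hsig' and 'Dir'
# blocks were read with read_block (hypothetical variable names):
#
#     data, meta = read_block("SWAN_block_output.txt")
#     tables = dictionary_of_block_to_table(
#         {"Hsig": data["Hsig"], "Dir": data["Dir"]}
#     )
#     # tables has columns x, y, Hsig, Dir -- one row per grid point
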
def block_to_table(data, name="values", to_pandas=True):
    """
    Converts structured 2D grid SWAN block format, x (columns),
    y (index), to SWAN table format, x (column), y (column),
    values (column), as a DataFrame.

    Parameters
    ----------
    data: pandas DataFrame or xarray Dataset
        DataFrame with the x coordinate as columns and the y coordinate
        as the index.
    name: string (optional)
        Name of the data column in the returned table. Default = 'values'
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.

    Returns
    -------
    table: pandas DataFrame or xarray Dataset
        DataFrame with columns x, y, values
    """
    if isinstance(data, xr.Dataset):
        data = data.to_pandas()
    if not isinstance(data, pd.DataFrame):
        raise TypeError(f"data must be of type pd.DataFrame. Got: {type(data)}")
    if not isinstance(name, str):
        raise TypeError(f"If specified, name must be of type str. Got: {type(name)}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")

    # Flatten the grid: unstack yields a Series indexed by (x, y), then
    # reset_index turns the index levels into the x and y columns.
    table = data.unstack().reset_index(name=name)
    table = table.rename(columns={"level_0": "x", "level_1": "y"})
    table.sort_values(["x", "y"], ascending=[True, True], inplace=True)

    if not to_pandas:
        table = convert_to_dataset(table)

    return table
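
# A self-contained sketch of block_to_table on a tiny 2x2 grid (synthetic
# data, not SWAN output):
#
#     df = pd.DataFrame([[1.0, 2.0], [3.0, 4.0]], index=[0, 1], columns=[0, 1])
#     block_to_table(df, name="Hsig")
#     # -> columns x, y, Hsig with one row per (x, y) pair; the row
#     #    x=0, y=1, Hsig=3.0 comes from df.loc[1, 0] (index=y, columns=x)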