Source code for mhkit.wave.io.swan

from scipy.io import loadmat
from os.path import isfile
import pandas as pd
import numpy as np
import re 
  

def read_table(swan_file):
    '''
    Reads in SWAN table format output

    Parameters
    ----------
    swan_file: str
        filename to import

    Returns
    -------
    swan_data: DataFrame
        Dataframe of swan output
    metaDict: Dictionary
        Dictionary of metaData
    '''
    assert isinstance(swan_file, str), 'swan_file must be of type str'
    assert isfile(swan_file), f'File not found: {swan_file}'

    header_line_number = 4
    # Walk the commented header: the '% Run' line carries run metadata,
    # line header_line_number holds the column names, and the line after
    # it holds the units.
    with open(swan_file, 'r') as f:
        for i in range(header_line_number + 2):
            line = f.readline()
            if line.startswith('% Run'):
                metaDict = _parse_line_metadata(line)
                # Strip the trailing ' SWAN' token while keeping 'Table'
                # a plain string (the previous slice left a 1-element list).
                if metaDict['Table'].endswith('SWAN'):
                    metaDict['Table'] = ' '.join(
                        metaDict['Table'].split(' SWAN')[:-1])
            if i == header_line_number:
                header = re.split(r"\s+", line.rstrip().strip('%').lstrip())
                metaDict['header'] = header
            if i == header_line_number + 1:
                units = re.split(r'\s+', line.strip(' %\n')
                                 .replace('[', '').replace(']', ''))
                metaDict['units'] = units

    # The data body is whitespace-delimited; '%' lines are comments.
    swan_data = pd.read_csv(swan_file, sep=r'\s+', comment='%',
                            names=metaDict['header'])
    return swan_data, metaDict
def read_block(swan_file):
    '''
    Reads in SWAN block output with headers and creates a dictionary
    of DataFrames for each SWAN output variable in the output file.

    Parameters
    ----------
    swan_file: str
        swan block file to import

    Returns
    -------
    data: Dictionary
        Dictionary of DataFrame of swan output variables
    metaDict: Dictionary
        Dictionary of metaData dependent on file type
    '''
    assert isinstance(swan_file, str), 'swan_file must be of type str'
    assert isfile(swan_file), f'File not found: {swan_file}'

    # Use the LAST dot-separated token so paths containing extra dots
    # (e.g. 'run.v2/output.mat') still yield the real extension; the
    # previous split('.')[1] grabbed the first token instead.
    extension = swan_file.split('.')[-1].lower()
    if extension == 'mat':
        dataDict = _read_block_mat(swan_file)
        metaData = {'filetype': 'mat',
                    'variables': [var for var in dataDict.keys()]}
    else:
        # Any non-mat file is assumed to be SWAN ASCII block output.
        dataDict, metaData = _read_block_txt(swan_file)
    return dataDict, metaData
def _read_block_txt(swan_file):
    '''
    Reads in SWAN block output with headers and creates a dictionary
    of DataFrames for each SWAN output variable in the output file.

    Parameters
    ----------
    swan_file: str
        swan block file to import (must be written with headers)

    Returns
    -------
    dataDict: Dictionary
        Dictionary of DataFrame of swan output variables
    metaDict: Dictionary
        Dictionary of metaData dependent on file type
    '''
    assert isinstance(swan_file, str), 'swan_file must be of type str'
    assert isfile(swan_file), f'File not found: {swan_file}'

    metaDict = {}
    column_position = None
    dataDict = {}
    with open(swan_file) as f:
        for position, line in enumerate(f):
            if line.startswith('% Run'):
                # New variable section; its column-header line is always
                # 5 lines below the '% Run' line in SWAN block output.
                varPosition = position
                column_position = position + 5
                varDict = _parse_line_metadata(line)
                # 'Unit' is e.g. '0.1 m': first token is the scale factor
                # applied to the (scaled) printed values.
                varDict['unitMultiplier'] = float(varDict['Unit'].split(' ')[0])
                metaDict[varPosition] = varDict
                variable = varDict['vars']
                dataDict[variable] = {}
            if position == column_position and column_position != None:
                columns = line.strip('% \n').split()
                metaDict[varPosition]['cols'] = columns
                # Reset so later comment lines cannot be mistaken for a
                # second header (original assigned a misspelled variable).
                column_position = None
            if not line.startswith('%'):
                # Data row: first token is the row index, the rest are
                # values. Splitting on '.' as well mirrors the original
                # tokenizing of SWAN's scaled-integer output.
                raw_data = ' '.join(
                    re.split(r' |\.', line.strip(' \n'))).split()
                index_number = int(raw_data[0])
                columns_data = raw_data[1:]
                data = []
                # SWAN prints '****' for undefined points; each run of
                # asterisks maps to one NaN per occurrence.
                possibleNaNs = ['****']
                NNaNsTotal = sum([line.count(nanVal)
                                  for nanVal in possibleNaNs])
                if NNaNsTotal > 0:
                    for vals in columns_data:
                        NNaNs = 0
                        for nanVal in possibleNaNs:
                            NNaNs += vals.count(nanVal)
                        if NNaNs > 0:
                            for i in range(NNaNs):
                                data.extend([np.nan])
                        else:
                            data.extend([float(vals)])
                else:
                    data.extend([float(val) for val in columns_data])
                dataDict[variable][index_number] = data

    metaData = pd.DataFrame(metaDict).T
    for var in metaData.vars.values:
        df = pd.DataFrame(dataDict[var]).T
        varCols = metaData[metaData.vars == var].cols.values.tolist()[0]
        colsDict = dict(zip(df.columns.values.tolist(), varCols))
        # BUG FIX: rename returns a new frame; the original discarded the
        # result, leaving the default integer column labels in place.
        df = df.rename(columns=colsDict)
        unitMultiplier = metaData[metaData.vars ==
                                  var].unitMultiplier.values[0]
        # Convert the scaled values back to physical units.
        dataDict[var] = df * unitMultiplier
    metaData.pop('cols')
    metaData = metaData.set_index('vars').T.to_dict()
    return dataDict, metaData


def _read_block_mat(swan_file):
    '''
    Reads in SWAN matlab output and creates a dictionary of DataFrames
    for each swan output variable.

    Parameters
    ----------
    swan_file: str
        filename to import

    Returns
    -------
    dataDict: Dictionary
        Dictionary of DataFrame of swan output variables
    '''
    assert isinstance(swan_file, str), 'swan_file must be of type str'
    assert isfile(swan_file), f'File not found: {swan_file}'

    dataDict = loadmat(swan_file, struct_as_record=False, squeeze_me=True)
    # Drop the bookkeeping entries loadmat always injects.
    removeKeys = ['__header__', '__version__', '__globals__']
    for key in removeKeys:
        dataDict.pop(key, None)
    for key in dataDict.keys():
        dataDict[key] = pd.DataFrame(dataDict[key])
    return dataDict


def _parse_line_metadata(line):
    '''
    Parses the variable metadata into a dictionary

    Parameters
    ----------
    line: str
        line from block swan data to parse

    Returns
    -------
    metaDict: Dictionary
        Dictionary of variable metadata
    '''
    assert isinstance(line, str), 'line must be of type str'

    metaDict = {}
    # Normalize: drop commas, trim the comment marker, turn the '**'
    # marker (which precedes the variable name) into an explicit
    # 'vars:' key, and collapse runs of whitespace.
    meta = re.sub(r'\s+', " ", line.replace(',', ' ')
                  .strip('% \n').replace('**', 'vars:'))
    mList = meta.split(':')
    elms = [elm.split(' ') for elm in mList]
    for elm in elms:
        # Discard one empty token left behind by ': ' separators.
        try:
            elm.remove('')
        except ValueError:
            pass
    # Each key is the last token before a ':'; its value is every token
    # of the next chunk except that chunk's own trailing key.
    for i in range(len(elms) - 1):
        elm = elms[i]
        key = elm[-1]
        val = ' '.join(elms[i + 1][:-1])
        metaDict[key] = val
    # The final chunk belongs entirely to the last key.
    metaDict[key] = ' '.join(elms[-1])
    return metaDict
def dictionary_of_block_to_table(dictionary_of_DataFrames, names=None):
    '''
    Converts a dictionary of structured 2D grid SWAN block format
    x (columns), y (index) to SWAN table format x (column), y (column),
    values (column) DataFrame.

    Parameters
    ----------
    dictionary_of_DataFrames: Dictionary
        Dictionary of DataFrames in with columns as X indicie and Y as
        index.
    names: List (Optional)
        Name of data column in returned table. Default=Dictionary.keys()

    Returns
    -------
    swanTables: DataFrame
        DataFrame with columns x,y,values where values = Dictionary.keys()
        or names
    '''
    assert isinstance(dictionary_of_DataFrames, dict), (
        'dictionary_of_DataFrames must be of type Dict')
    assert bool(dictionary_of_DataFrames), 'dictionary_of_DataFrames is empty'
    for key in dictionary_of_DataFrames:
        assert isinstance(dictionary_of_DataFrames[key], pd.DataFrame), (
            f'Dictionary key:{key} must be of type pd.DataFrame')
    # Use 'is None' identity checks (the original compared with '== None').
    if names is not None:
        assert isinstance(names, list), (
            'If specified names must be of type list')
        assert all([isinstance(elm, str) for elm in names]), (
            'If specified all elements in names must be of type string')
        assert len(names) == len(dictionary_of_DataFrames), (
            'If specified names must the same length as dictionary_of_DataFrames')
        variables = names
    else:
        variables = list(dictionary_of_DataFrames.keys())

    # Build the table for the first variable, then join each remaining
    # variable's value column on the shared row index.
    # NOTE(review): assumes every DataFrame covers the same x/y grid —
    # confirm against the SWAN run that produced them.
    var0 = variables[0]
    swanTables = block_to_table(dictionary_of_DataFrames[var0], name=var0)
    for var in variables[1:]:
        tmp_dat = block_to_table(dictionary_of_DataFrames[var], name=var)
        swanTables[var] = tmp_dat[var]
    return swanTables
def block_to_table(data, name='values'):
    '''
    Converts structured 2D grid SWAN block format x (columns), y (index)
    to SWAN table format x (column), y (column), values (column)
    DataFrame.

    Parameters
    ----------
    data: DataFrame
        DataFrame in with columns as X indicie and Y as index.
    name: string (Optional)
        Name of data column in returned table. Default='values'

    Returns
    -------
    table: DataFrame
        DataFrame with columns x,y,values
    '''
    assert isinstance(data, pd.DataFrame), 'data must be of type pd.DataFrame'
    assert isinstance(name, str), 'Name must be of type str'

    # Unstacking yields one row per (column, index) pair; the two
    # auto-generated index levels become the x and y columns.
    stacked = data.unstack().reset_index(name=name)
    stacked.rename(columns={'level_0': 'x', 'level_1': 'y'}, inplace=True)
    # Order rows by x, then y, matching SWAN table conventions.
    return stacked.sort_values(by=['x', 'y'])