import os
import importlib.util
import sys
import argparse
import numpy as np
from scipy.interpolate import interp1d
import matplotlib.pyplot as plt
from geos.hdf5_wrapper import hdf5_wrapper
unit_map = {
'milliseconds': 1e-3,
'seconds': 1.0,
'minutes': 60.0,
'hours': 60.0 * 60.0,
'days': 60.0 * 60.0 * 24.0,
'years': 60.0 * 60.0 * 24.0 * 365.25
}
DEFAULT_SET_NAME = 'empty_setName'
[docs]
def interpolate_values_time( ta, xa, tb ):
"""
Interpolate array values in time
Args:
ta (np.ndarray): Target time array
xa (np.ndarray): Target value array
tb (np.ndarray): Baseline time array
Returns:
np.ndarray: Interpolated value array
"""
N = list( np.shape( xa ) )
M = len( tb )
ta = np.ascontiguousarray( np.squeeze( ta ) )
tb = np.ascontiguousarray( np.squeeze( tb ) )
if ( len( N ) == 1 ):
return interp1d( ta, xa )( tb )
else:
# Reshape the input array so that we can work on the non-time axes
S = np.prod( N[ 1: ] )
xc = np.reshape( xa, ( N[ 0 ], S ) )
xd = np.zeros( ( len( tb ), S ) )
for ii in range( S ):
xd[ :, ii ] = interp1d( ta, xc[ :, ii ], bounds_error=False, fill_value='extrapolate' )( tb )
# Return the array to it's expected shape
N[ 0 ] = M
return np.reshape( xd, N )
[docs]
def evaluate_external_script( script, fn, data ):
"""
Evaluate an external script to produce the curve
Args:
script (str): Path to a python script
fn (str): Name of the function to call
Returns:
np.ndarray: Curve values
"""
script = os.path.abspath( script )
if os.path.isfile( script ):
module_name = os.path.split( script )[ 1 ]
module_name = module_name[ :module_name.rfind( '.' ) ]
spec = importlib.util.spec_from_file_location( module_name, script )
module = importlib.util.module_from_spec( spec )
sys.modules[ module_name ] = module
spec.loader.exec_module( module )
if hasattr( module, fn ):
return getattr( module, fn )( **data )
else:
raise Exception( f'External script does not contain the expected function ({fn})' )
else:
raise FileNotFoundError( f'Could not find script: {script}' )
[docs]
def check_diff( parameter_name, set_name, target, baseline, tolerance, errors, modifier='baseline' ):
"""
Compute the L2-norm of the diff and compare to the set tolerance
Args:
parameter_name (str): Parameter name
set_name (str): Set name
target (np.ndarray): Target value array
baseline (np.ndarray): Baseline value array
tolerance (float): Required tolerance of diff
errors (list): List to add any errors
Returns:
np.ndarray: Interpolated value array
"""
dx = target - baseline
diff = np.sqrt( np.sum( dx * dx ) ) / dx.size
if ( diff > tolerance ):
errors.append(
f'{modifier}_{parameter_name}_{set_name} diff exceeds tolerance: ||t-b||/N={diff}, {modifier}_tolerance={tolerance}'
)
[docs]
def compare_time_history_curves( fname, baseline, curve, tolerance, output, output_n_column, units_time,
script_instructions ):
"""
Compute time history curves
Args:
fname (str): Target curve file name
baseline (str): Baseline curve file name
curve (list): list containing pairs of value and set names to test
tolerance (list): Tolerance for curve diffs
output (str): Path to place output figures
ncol (int): Number of columns to use in the figure
units_time (str): Time units for the figure
script_instructions (list): List of (script, function, parameter, setname) values
Returns:
tuple: warnings, errors
"""
# Setup
files = { 'target': fname, 'baseline': baseline }
warnings = []
errors = []
location_string_options = [ 'ReferencePosition', 'elementCenter' ]
location_strings = {}
tol = {}
if len( curve ) != len( tolerance ):
raise Exception(
f'Curvecheck inputs must be of the same length: curves ({len(curve)}) and tolerance ({len(tolerance)})' )
# Load data and check sizes
data = {}
data_sizes = {}
for k, f in files.items():
if os.path.isfile( f ):
data[ k ] = hdf5_wrapper( f ).get_copy()
else:
errors.append( f'{k} file not found: {f}' )
continue
for ( p, s ), t in zip( curve, tolerance ):
if s == DEFAULT_SET_NAME:
key = f'{p}'
else:
key = f'{p} {s}'
if f'{p} Time' not in data[ k ].keys():
errors.append( f'Value not found in {k} file: {p}' )
continue
if key not in data[ k ].keys():
errors.append( f'Set not found in {k} file: {s}' )
continue
# Check for a location string (this may not be consistent across the same file)
for kb in data[ k ].keys():
for kc in location_string_options:
if ( kc in kb ) and ( p in kb ):
location_strings[ p ] = kc
if p not in location_strings:
test_keys = ', '.join( location_string_options )
all_keys = ', '.join( data[ k ].keys() )
errors.append(
f'Could not find location string for parameter: {p}, search_options=({test_keys}), all_options={all_keys}'
)
# Check data sizes in the initial loop to make later logic easier
if p not in data_sizes:
data_sizes[ p ] = {}
tol[ p ] = {}
if s not in data_sizes[ p ]:
data_sizes[ p ][ s ] = {}
tol[ p ][ s ] = float( t[ 0 ] )
data_sizes[ p ][ s ][ k ] = list( np.shape( data[ k ][ key ] ) )
# Record requested tolerance
if p not in tol:
tol[ p ] = {}
if s not in tol[ p ]:
tol[ p ][ s ] = t
# Generate script-based curve
if script_instructions and ( len( data ) > 0 ):
data[ 'script' ] = {}
for script, fn, p, s in script_instructions:
k = location_strings[ p ]
data[ 'script' ][ f'{p} Time' ] = data[ 'target' ][ f'{p} Time' ]
key = f'{p} {k}'
key2 = f'{p}'
if s != DEFAULT_SET_NAME:
key += f' {s}'
key2 += f' {s}'
data[ 'script' ][ key ] = data[ 'target' ][ key ]
data[ 'script' ][ key2 ] = evaluate_external_script( script, fn, data[ 'target' ] )
data_sizes[ p ][ s ][ 'script' ] = list( np.shape( data[ 'script' ][ key2 ] ) )
# Reshape data if necessary so that they have a predictable number of dimensions
for k in data.keys():
for p, s in curve:
key = f'{p}'
if s != DEFAULT_SET_NAME:
key += f' {s}'
if ( len( data_sizes[ p ][ s ][ k ] ) == 1 ):
data[ k ][ key ] = np.reshape( data[ k ][ key ], ( -1, 1, 1 ) )
data_sizes[ p ][ s ][ k ].append( 1 )
elif ( len( data_sizes[ p ][ s ][ k ] ) == 2 ):
data[ k ][ key ] = np.expand_dims( data[ k ][ key ], -1 )
data_sizes[ p ][ s ][ k ].append( 1 )
# Check data diffs
size_err = '{}_{} values have different sizes: target=({},{},{}) baseline=({},{},{})'
for p, set_data in data_sizes.items():
for s, set_sizes in set_data.items():
key = f'{p}'
if s != DEFAULT_SET_NAME:
key += f' {s}'
if ( ( 'baseline' in set_sizes ) and ( 'target' in set_sizes ) ):
xa = data[ 'target' ][ key ]
xb = data[ 'baseline' ][ key ]
if set_sizes[ 'target' ] == set_sizes[ 'baseline' ]:
check_diff( p, s, xa, xb, tol[ p ][ s ], errors )
else:
warnings.append( size_err.format( p, s, *set_sizes[ 'target' ], *set_sizes[ 'baseline' ] ) )
# Check whether the data can be interpolated
if ( len( set_sizes[ 'baseline' ] ) == 1 ) or ( set_sizes[ 'target' ][ 1: ]
== set_sizes[ 'baseline' ][ 1: ] ):
warnings.append( f'Interpolating target curve in time: {p}_{s}' )
ta = data[ 'target' ][ f'{p} Time' ]
tb = data[ 'baseline' ][ f'{p} Time' ]
xc = interpolate_values_time( ta, xa, tb )
check_diff( p, s, xc, xb, tol[ p ][ s ], errors )
else:
errors.append( f'Cannot perform a curve check for {p}_{s}' )
if ( ( 'script' in set_sizes ) and ( 'target' in set_sizes ) ):
xa = data[ 'target' ][ key ]
xb = data[ 'script' ][ key ]
check_diff( p, s, xa, xb, tol[ p ][ s ], errors, modifier='script' )
# Render figures
output = os.path.expanduser( output )
os.makedirs( output, exist_ok=True )
for p, set_data in data_sizes.items():
for s, set_sizes in set_data.items():
curve_check_figure( p, location_strings[ p ], s, data, data_sizes, output, output_n_column, units_time )
return warnings, errors
[docs]
def curve_check_parser():
"""
Build the curve check parser
Returns:
argparse.parser: The curve check parser
"""
# Custom action class
class PairAction( argparse.Action ):
def __call__( self, parser, namespace, values, option_string=None ):
pairs = getattr( namespace, self.dest )
if len( values ) == 1:
pairs.append( ( values[ 0 ], DEFAULT_SET_NAME ) )
elif len( values ) == 2:
pairs.append( ( values[ 0 ], values[ 1 ] ) )
else:
raise Exception( 'Only a single value or a pair of values are expected' )
setattr( namespace, self.dest, pairs )
# Custom action class
class ScriptAction( argparse.Action ):
def __call__( self, parser, namespace, values, option_string=None ):
scripts = getattr( namespace, self.dest )
scripts.append( values )
N = len( values )
if ( N < 3 ) or ( N > 4 ):
raise Exception( 'The -s option requires 3 or 4 inputs' )
elif N == 3:
values.append( DEFAULT_SET_NAME )
setattr( namespace, self.dest, scripts )
parser = argparse.ArgumentParser()
parser.add_argument( "filename", help="Path to the time history file" )
parser.add_argument( "baseline", help="Path to the baseline file" )
parser.add_argument( '-c',
'--curve',
nargs='+',
action=PairAction,
help='Curves to check (value) or (value, setname)',
default=[] )
parser.add_argument( "-t",
"--tolerance",
nargs='+',
action='append',
help=f"The tolerance for each curve check diffs (||x-y||/N)",
default=[] )
parser.add_argument( "-w",
"--Werror",
action="store_true",
help="Force all warnings to be errors, default is False.",
default=False )
parser.add_argument( "-o", "--output", help="Output figures to this directory", default='./curve_check_figures' )
unit_choices = list( unit_map.keys() )
parser.add_argument( "-n", "--n-column", help="Number of columns for the output figure", default=1 )
parser.add_argument( "-u",
"--units-time",
help=f"Time units for plots (default=seconds)",
choices=unit_choices,
default='seconds' )
parser.add_argument( "-s",
"--script",
nargs='+',
action=ScriptAction,
help='Python script instructions (path, function, value, setname)',
default=[] )
return parser
[docs]
def main():
"""
Entry point for the curve check script
"""
parser = curve_check_parser()
args = parser.parse_args()
warnings, errors = compare_time_history_curves( args.filename, args.baseline, args.curve, args.tolerance,
args.output, args.n_column, args.units_time, args.script )
# Write errors/warnings to the screen
if args.Werror:
errors.extend( warnings[ : ] )
warnings = []
if len( warnings ):
print( 'Curve check warnings:' )
print( '\n'.join( warnings ) )
if len( errors ):
print( 'Curve check errors:' )
print( '\n'.join( errors ) )
raise Exception( f'Curve check produced {len(errors)} errors!' )
if __name__ == '__main__':
main()