Source code for geos.pygeos_tools.solvers.WaveSolver

# ------------------------------------------------------------------------------------------------------------
# SPDX-License-Identifier: LGPL-2.1-only
#
# Copyright (c) 2016-2024 Lawrence Livermore National Security LLC
# Copyright (c) 2018-2024 TotalEnergies
# Copyright (c) 2018-2024 The Board of Trustees of the Leland Stanford Junior University
# Copyright (c) 2023-2024 Chevron
# Copyright (c) 2019-     GEOS/GEOSX Contributors
# Copyright (c) 2019-     INRIA project-team Makutu
# All rights reserved
#
# See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details.
# ------------------------------------------------------------------------------------------------------------
import numpy as np
import numpy.typing as npt
import pygeosx
from scipy.fftpack import fftfreq, ifft, fft
from typing import List, Union
from typing_extensions import Self
from geos.pygeos_tools.solvers.Solver import Solver

__doc__ = """
WaveSolver class which is the base class for every AcousticSolver and ElasticSolver classes, and inherits the Solver
capabilities.

This adds more methods to the Solver class to handle seismic sources and receivers.
"""


[docs] class WaveSolver( Solver ): """ WaveSolver Object containing methods useful for simulation using wave solvers in GEOS Attributes ----------- The ones herited from Solver class and: dtSeismo : float Time step to save pressure for seismic trace dtWaveField : float Time step to save fields minTime : float Min time to consider maxTime : float End time to consider minTimeSim : float Starting time of simulation maxTimeSim : float End Time of simulation sourceType : str Type of source sourceFreq : float Frequency of the source """ def __init__( self: Self, solverType: str, dt: float = None, minTime: float = 0.0, maxTime: float = None, dtSeismo: float = None, dtWaveField: float = None, sourceType: str = None, sourceFreq: float = None, **kwargs ): """ Parameters ---------- solverType: str The solverType targeted in GEOS XML deck dt : float Time step for simulation minTime : float Starting time of simulation Default is 0 maxTime : float End Time of simulation dtSeismo : float Time step to save pressure for seismic trace dtWaveField : float Time step to save fields sourceType : str Type of source Default is None sourceFreq : float Frequency of the source Default is None kwargs : keyword args geosx_argv : list GEOSX arguments or command line as a splitted line """ super().__init__( solverType, **kwargs ) self.dt: float = dt self.dtSeismo: float = dtSeismo self.dtWaveField: float = dtWaveField self.minTime: float = minTime self.minTimeSim: float = minTime self.maxTime: float = maxTime self.maxTimeSim: float = maxTime self.sourceType: str = sourceType self.sourceFreq: float = sourceFreq def __repr__( self: Self ): string_list: List[ str ] = list() string_list.append( "Solver type : " + self.type + "\n" ) string_list.append( "dt : " + str( self.dt ) + "\n" ) string_list.append( "maxTime : " + str( self.maxTime ) + "\n" ) string_list.append( "dtSeismo : " + str( self.dtSeismo ) + "\n" ) string_list.append( "Outputs : " + str( self.hdf5Targets ) + "\n" + str( self.vtkTargets ) + "\n" ) rep = "" for string in string_list: rep += string return rep
[docs] def initialize( self: Self, rank: int = 0, xml=None ) -> None: """ Initialization or reinitialization of GEOSX Parameters ---------- rank : int Process rank xml : XML XML object containing parameters for GEOSX initialization. Only required if not set in the __init__ OR if different from it """ super().initialize( rank, xml ) self.updateSourceProperties()
""" Accessors """
[docs] def getVelocityModel( self: Self, velocityName: str, filterGhost: bool = False, **kwargs ) -> npt.NDArray: """ Get the velocity values WARNING: this function aims to work in the specific case of having only 1 CellElementRegion in your XML file and that this CellElementRegion contains only one cellBlock. Parameters ----------- velocityName : str Name of velocity array in GEOS filterGhost : bool Filter the ghost ranks Returns ------- Numpy Array : Array containing the velocity values """ velocity = self.getSolverFieldWithPrefix( velocityName, **kwargs ) if velocity is not None: if filterGhost: velocity_filtered = self.filterGhostRankFor1RegionWith1CellBlock( velocity, **kwargs ) if velocity_filtered is not None: return velocity_filtered else: print( "getVelocityModelFor1RegionWith1CellBlock->filterGhostRank: No ghostRank was found." ) else: return velocity else: print( "getVelocityModelFor1RegionWith1CellBlock: No velocity was found." )
""" Mutators """
[docs] def setSourceAndReceivers( self: Self, sourcesCoords: List = [], receiversCoords: List = [] ) -> None: """ Update sources and receivers positions in GEOS Parameters ---------- sourcesCoords : list List of coordinates for the sources receiversCoords : list List of coordinates for the receivers """ src_pos_geosx = self.solver.get_wrapper( "sourceCoordinates" ).value() src_pos_geosx.set_access_level( pygeosx.pylvarray.RESIZEABLE ) rcv_pos_geosx = self.solver.get_wrapper( "receiverCoordinates" ).value() rcv_pos_geosx.set_access_level( pygeosx.pylvarray.RESIZEABLE ) src_pos_geosx.resize_all( ( len( sourcesCoords ), 3 ) ) if len( sourcesCoords ) == 0: src_pos_geosx.to_numpy()[ : ] = np.zeros( ( 0, 3 ) ) else: src_pos_geosx.to_numpy()[ : ] = sourcesCoords[ : ] rcv_pos_geosx.resize_all( ( len( receiversCoords ), 3 ) ) if len( receiversCoords ) == 0: rcv_pos_geosx.to_numpy()[ : ] = np.zeros( ( 0, 3 ) ) else: rcv_pos_geosx.to_numpy()[ : ] = receiversCoords[ : ] self.solver.reinit()
[docs] def setSourceFrequency( self: Self, freq: float ) -> None: """ Overwrite GEOSX source frequency and set self.sourceFreq Parameters ---------- freq : float Frequency of the source in Hz """ self.setGeosWrapperValueByTargetKey( "/Solvers/" + self.name + "/timeSourceFrequency", freq ) self.sourceFreq = freq
[docs] def setSourceValue( self: Self, value: Union[ npt.NDArray, List ] ) -> None: """ Set the value of the source in GEOS Parameters ---------- value : array/list List/array containing the value of the source at each time step """ src_value = self.solver.get_wrapper( "sourceValue" ).value() src_value.set_access_level( pygeosx.pylvarray.RESIZEABLE ) src_value.resize_all( value.shape ) src_value.to_numpy()[ : ] = value[ : ] self.maxTimeSim = ( value.shape[ 0 ] - 1 ) * self.dt self.setGeosWrapperValueByTargetKey( "Events/minTime", self.minTime ) self.sourceValue = value[ : ]
""" Update method """
[docs] def updateSourceProperties( self: Self ) -> None: """ Updates the frequency and type of source to match the XML """ if self.sourceFreq is None: solverdict = self.xml.solvers[ self.type ] for k, v in solverdict.items(): if k == "timeSourceFrequency": self.sourceFreq = v break if self.sourceType is None: if hasattr( self.xml, "events" ): events = self.xml.events try: for event in events[ "PeriodicEvent" ]: if isinstance( event, dict ): if event[ "target" ] == "/Solvers/" + self.name: self.sourceType = "ricker" + event[ 'rickerOrder' ] else: if event == "target" and events[ "PeriodicEvent" ][ "target" ] == "/Solvers/" + self.name: self.sourceType = "ricker" + events[ "PeriodicEvent" ][ "rickerOrder" ] except KeyError: self.sourceType = "ricker2"
""" Utils """
[docs] def evaluateSource( self: Self ) -> None: """ Evaluate source and update on GEOS Only ricker order {0 - 4} accepted """ sourceTypes = ( "ricker0", "ricker1", "ricker2", "ricker3", "ricker4" ) assert self.sourceType in sourceTypes, f"Only {sourceTypes} are allowed" f0 = self.sourceFreq delay = 1.0 / f0 alpha = -( f0 * np.pi )**2 nsamples = int( round( ( self.maxTime - self.minTime ) / self.dt ) ) + 1 sourceValue = np.zeros( ( nsamples, 1 ) ) order = int( self.sourceType[ -1 ] ) sgn = ( -1 )**( order + 1 ) time = self.minTime for nt in range( nsamples ): if self.minTime <= -1.0 / f0: tmin = -2.9 / f0 tmax = 2.9 / f0 time_d = time else: time_d = time - delay tmin = 0.0 tmax = 2.9 / f0 if ( time > tmin and time < tmax ) or ( self.minTime < -1 / f0 and time == tmin ): gaussian = np.exp( alpha * time_d**2 ) if order == 0: sourceValue[ nt, 0 ] = sgn * gaussian elif order == 1: sourceValue[ nt, 0 ] = sgn * ( 2 * alpha * time_d ) * gaussian elif order == 2: sourceValue[ nt, 0 ] = sgn * ( 2 * alpha + 4 * alpha**2 * time_d**2 ) * gaussian elif order == 3: sourceValue[ nt, 0 ] = sgn * ( 12 * alpha**2 * time_d + 8 * alpha**3 * time_d**3 ) * gaussian elif order == 4: sourceValue[ nt, 0 ] = sgn * ( 12 * alpha**2 + 48 * alpha**3 * time_d**2 + 16 * alpha**4 * time_d**4 ) * gaussian time += self.dt self.setSourceFrequency( self.sourceFreq ) self.setSourceValue( sourceValue ) self.sourceValue = sourceValue
[docs] def filterSource( self: Self, fmax: Union[ str, float ] ) -> None: """ Filter the source value and give the value to GEOSX. Note that is can also modify the start and end time of simulation to avoid discontinuity. Parameters ----------- fmax : float/string Max frequency of the source wanted. The source then have frequencies in the interval [0,fmax+1] """ if str( fmax ) == "all": return minTime = self.minTime maxTime = self.maxTime dt = self.dt f0 = self.sourceFreq sourceValue = self.sourceValue pad = int( round( sourceValue.shape[ 0 ] / 2 ) ) n = sourceValue.shape[ 0 ] + 2 * pad tf = fftfreq( n, dt ) y_fft = np.zeros( ( n, sourceValue.shape[ 1 ] ), dtype="complex_" ) y = np.zeros( y_fft.shape, dtype="complex_" ) for i in range( y_fft.shape[ 1 ] ): y_fft[ pad:n - pad, i ] = sourceValue[ :, i ] y_fft[ :, i ] = fft( y_fft[ :, i ] ) # Perform fourier transform isup = np.where( tf >= fmax )[ 0 ] imax = np.where( tf[ isup ] >= fmax + 1 )[ 0 ][ 0 ] i1 = isup[ 0 ] i2 = isup[ imax ] iinf = np.where( tf <= -fmax )[ 0 ] imin = np.where( tf[ iinf ] <= -fmax - 1 )[ 0 ][ -1 ] i3 = iinf[ imin ] i4 = iinf[ -1 ] for i in range( y_fft.shape[ 1 ] ): y_fft[ i1:i2, i ] = np.cos( ( isup[ 0:imax ] - i1 ) / ( i2 - i1 ) * np.pi / 2 )**2 * y_fft[ i1:i2, i ] y_fft[ i3:i4, i ] = np.cos( ( iinf[ imin:-1 ] - i4 ) / ( i3 - i4 ) * np.pi / 2 )**2 * y_fft[ i3:i4, i ] y_fft[ i2:i3, i ] = 0 for i in range( y.shape[ 1 ] ): y[ :, i ] = ifft( y_fft[ :, i ] ) # Perform inverse fourier transform it0 = int( round( abs( minTime / dt ) ) ) + pad d = int( round( 1 / f0 / dt ) ) i1 = max( it0 - 4 * d, 0 ) i2 = int( round( i1 + d / 4 ) ) i4 = min( n, n - pad + 4 * d ) i3 = int( round( i4 - d / 4 ) ) for i in range( y.shape[ 1 ] ): y[ i1:i2, i ] = np.cos( ( np.arange( i1, i2 ) - i2 ) / ( i2 - i1 ) * np.pi / 2 )**2 * y[ i1:i2, i ] y[ i3:i4, i ] = np.cos( ( np.arange( i3, i4 ) - i3 ) / ( i4 - i3 ) * np.pi / 2 )**2 * y[ i3:i4, i ] y[ max( i1 - d, 0 ):i1, i ] = 0.0 y[ i4:min( i4 + d, n ), i ] = 0.0 t = np.arange( minTime - pad * dt, maxTime + pad * dt + dt / 2, dt ) self.setSourceValue( np.real( y[ max( i1 - d, 0 ):min( i4 + d, n ), : ] ) ) self.minTimeSim = t[ max( i1 - d, 0 ) ] self.maxTimeSim = t[ min( i4 + d, n - 1 ) ] self.setGeosWrapperValueByTargetKey( "Events/minTime", self.minTimeSim ) self.sourceValue = np.real( y[ max( i1 - d, 0 ):min( i4 + d, n ), : ] )
[docs] def outputWaveField( self: Self, time: float, cycleNumber: int ) -> None: """ Trigger the wavefield output Parameters ---------- time : float Current time of simulation cycleNumber : int Current cycle number """ self.collections[ 0 ].collect( time, self.dt, cycleNumber ) self.hdf5Outputs[ 0 ].output( time, self.dt, cycleNumber )