Source code for geos.pygeos_tools.solvers.AcousticSolver

# ------------------------------------------------------------------------------------------------------------
# SPDX-License-Identifier: LGPL-2.1-only
#
# Copyright (c) 2016-2024 Lawrence Livermore National Security LLC
# Copyright (c) 2018-2024 TotalEnergies
# Copyright (c) 2018-2024 The Board of Trustees of the Leland Stanford Junior University
# Copyright (c) 2023-2024 Chevron
# Copyright (c) 2019-     GEOS/GEOSX Contributors
# Copyright (c) 2019-     INRIA project-team Makutu
# All rights reserved
#
# See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details.
# ------------------------------------------------------------------------------------------------------------
import h5py
import os
import numpy as np
import numpy.typing as npt
import shutil
from typing import Optional
from typing_extensions import Self
from geos.pygeos_tools.solvers.helpers import MODEL_FOR_GRADIENT
from geos.pygeos_tools.solvers.WaveSolver import WaveSolver
from geos.utils.errors_handling.classes import required_attributes

__doc__ = """
AcousticSolver class inherits from WaveSolver class.

This adds method to read / set pressures at receiver, compute partial gradients and accessors for Density and Velocity
models.
"""


[docs] class AcousticSolver( WaveSolver ): """ AcousticSolver Object containing all methods to run AcousticSEM simulation with GEOSX Attributes ----------- The ones inherited from WaveSolver class modelForGradient : str Gradient model used """ def __init__( self: Self, solverType: str = "AcousticSEM", dt: float = None, minTime: float = 0.0, maxTime: float = None, dtSeismo: float = None, dtWaveField: float = None, sourceType: str = None, sourceFreq: float = None, **kwargs ): """ Parameters ---------- solverType: str The solverType targeted in GEOS XML deck. Defaults to "AcousticSEM" dt : float Time step for simulation minTime : float Starting time of simulation Default is 0 maxTime : float End Time of simulation dtSeismo : float Time step to save pressure for seismic trace dtWaveField : float Time step to save fields sourceType : str Type of source Default is None sourceFreq : float Frequency of the source Default is None kwargs : keyword args geosx_argv : list GEOSX arguments or command line as a splitted line """ super().__init__( solverType=solverType, dt=dt, minTime=minTime, maxTime=maxTime, dtSeismo=dtSeismo, dtWaveField=dtWaveField, sourceType=sourceType, sourceFreq=sourceFreq, **kwargs ) self.modelForGradient: str = None def __repr__( self: Self ): string_list = [] string_list.append( "Solver type : " + type( self ).__name__ + "\n" ) string_list.append( "dt : " + str( self.dt ) + "\n" ) string_list.append( "maxTime : " + str( self.maxTime ) + "\n" ) string_list.append( "dtSeismo : " + str( self.dtSeismo ) + "\n" ) string_list.append( "Outputs : " + str( self.hdf5Targets ) + "\n" + str( self.vtkTargets ) + "\n" ) rep = "" for string in string_list: rep += string return rep """ Accessors """
[docs] def getFullPressureAtReceivers( self: Self, comm ) -> npt.NDArray: """ Return all pressures at receivers values on all ranks Note that for a too large 2d array this may not work. Parameters: ----------- comm : MPI_COMM MPI communicators """ rank = comm.Get_rank() allPressure = comm.gather( self.getPressureAtReceivers(), root=0 ) pressure = np.zeros( self.getPressureAtReceivers().shape ) if rank == 0: for p in allPressure: for i in range( p.shape[ 1 ] ): if any( p[ 1:, i ] ): pressure[ :, i ] = p[ :, i ] pressure = comm.bcast( pressure, root=0 ) return pressure
[docs] def getFullWaveFieldAtReceivers( self: Self, comm ) -> npt.NDArray: return self.getFullPressureAtReceivers( comm )[ :, :-1 ]
@required_attributes( "modelForGradient" ) def getModelForGradient( self: Self ) -> str: return self.modelForGradient
[docs] def getPartialGradientFor1RegionWith1CellBlock( self: Self, filterGhost=False, **kwargs ) -> Optional[ npt.NDArray ]: """ Get the local rank gradient value WARNING: this function aims to work in the specific case of having only 1 CellElementRegion in your XML file and that this CellElementRegion contains only one cellBlock. Returns -------- partialGradient : Numpy Array- Array containing the element id list for the local rank """ partialGradient = self.getSolverFieldWithPrefix( "partialGradient", **kwargs ) if partialGradient is not None: if filterGhost: partialGradient_filtered = self.filterGhostRankFor1RegionWith1CellBlock( partialGradient, **kwargs ) if partialGradient_filtered is not None: return partialGradient_filtered else: print( "getPartialGradientFor1RegionWith1CellBlock->filterGhostRank: Filtering of ghostRank could" + "not be performed. No partialGradient returned." ) else: return partialGradient else: print( "getPartialGradientFor1RegionWith1CellBlock: No partialGradient was found." )
[docs] def getPressureAtReceivers( self: Self ) -> npt.NDArray: """ Get the pressure values at receivers coordinates Returns ------ numpy Array : Array containing the pressure values at all time step at all receivers coordinates """ return self.getGeosWrapperByName( "pressureNp1AtReceivers", [ "Solvers" ] )
[docs] def getWaveField( self: Self ) -> npt.NDArray: return self.getPressureAtReceivers()[ :, :-1 ]
""" Mutators """
[docs] def setModelForGradient( self: Self, modelForGradient: str ) -> None: f""" Set the model for the gradient Parameters ----------- model : str Model for the gradients available are: {list( MODEL_FOR_GRADIENT.__members__.keys() )} """ if modelForGradient in MODEL_FOR_GRADIENT.__members__: self.modelForGradient = MODEL_FOR_GRADIENT[ modelForGradient ].value else: raise ValueError( f"The model for gradient chosen '{modelForGradient}' is not implemented. The available" + f" ones are '{list( MODEL_FOR_GRADIENT.__members__.keys() )}'." )
""" Update methods """
[docs] def updateDensityModel( self: Self, density: npt.NDArray ) -> None: """ Update density values in GEOS Parameters ----------- density : array New values for the density """ self.setGeosWrapperValueByName( "acousticDensity", value=density, filters=[ self.discretization ] )
[docs] def updateVelocityModel( self: Self, vel: npt.NDArray ) -> None: """ Update velocity value in GEOS Parameters ---------- vel : float/array Value(s) for velocity field """ self.setGeosWrapperValueByName( "acousticVelocity", value=vel, filters=[ self.discretization ] )
[docs] def updateVelocityModelFromHDF5( self: Self, filename: str, low: float, high: float, comm, velocityModelName: str, **kwargs ) -> None: """ Update velocity model WARNING: this function aims to work in the specific case of having only 1 CellElementRegion in your XML file and that this CellElementRegion contains only one cellBlock. Parameters ----------- filename : str .hdf5 file where to get the new model low : float Min value threshold. All new values < low are set to low high : float Max value threshold. All new values > high are set to high comm : MPI_COMM MPI communicators """ root = 0 rank = comm.Get_rank() x = None if rank == root: with h5py.File( filename, 'r' ) as f: x = f[ "velocity" ][ : ] imin = np.where( x < low )[ 0 ] imax = np.where( x > high )[ 0 ] x[ imin ] = low x[ imax ] = high if self.modelForGradient == MODEL_FOR_GRADIENT.SLOWNESS_SQUARED.value: x = np.sqrt( 1 / x ) elif self.modelForGradient == MODEL_FOR_GRADIENT.SLOWNESS.value: x = 1 / x elif self.modelForGradient == MODEL_FOR_GRADIENT.VELOCITY.value: pass else: raise ValueError( "Not implemented" ) startModel = self.bcastFieldFor1RegionWith1CellBlock( x, comm, root, **kwargs ) self.setGeosWrapperValueByName( velocityModelName, startModel )
""" Methods for computation and reset of values """
[docs] def computePartialGradientFor1RegionWith1CellBlock( self: Self, shotId: int, minDepth: float, comm, velocityName: str, gradDirectory="partialGradient", filterGhost: bool = False, **kwargs ) -> None: """ Compute the partial Gradient WARNING: this function aims to work in the specific case of having only 1 CellElementRegion in your XML file and that this CellElementRegion contains only one cellBlock. Parameters ----------- shotId : string Number of the shot as string minDepth : float Depth at which gradient values are kept, otherwise it is set to 0. NOTE : this is done in this routine to avoid storage \ of elementCenter coordinates in the .hdf5 \ but might be problem for WolfeConditions later on \ if minDepth is too large comm : MPI_COMM MPI communicators velocity : str Name of the velocity model in GEOS gradDirectory : str, optional Partial gradient directory \ Default is `partialGradient` """ rank = comm.Get_rank() root = 0 # Get local gradient grad = self.getPartialGradientFor1RegionWith1CellBlock( filterGhost, **kwargs ) if self.modelForGradient == MODEL_FOR_GRADIENT.SLOWNESS_SQUARED.value: x = self.getVelocityModel( velocityName, filterGhost, **kwargs ) grad = -( x * x * x / 2 ) * grad elif self.modelForGradient == MODEL_FOR_GRADIENT.SLOWNESS.value: x = self.getVelocityModel( velocityName, filterGhost=True, **kwargs ) grad = -x * x * grad elif self.modelForGradient == MODEL_FOR_GRADIENT.VELOCITY.value: pass else: raise ValueError( "Not implemented" ) grad = grad.astype( np.float64 ) zind = np.where( self.getElementCenterFor1RegionWith1CellBlock( filterGhost, **kwargs )[ :, 2 ] < minDepth )[ 0 ] grad[ zind ] = 0.0 # Gather global gradient gradFull, ntot = self.gatherFieldFor1RegionWith1CellBlock( field=grad, comm=comm, root=root, **kwargs ) if rank == root: os.makedirs( gradDirectory, exist_ok=True ) with h5py.File( f"{gradDirectory}/partialGradient_" + shotId + ".hdf5", 'w' ) as h5p: h5p.create_dataset( "partialGradient", data=np.zeros( ntot ), chunks=True, maxshape=( ntot, ) ) h5p[ "partialGradient" ][ : ] = self.dtWaveField * gradFull shutil.move( f"{gradDirectory}/partialGradient_" + shotId + ".hdf5", f"{gradDirectory}/partialGradient_ready_" + shotId + ".hdf5" ) comm.Barrier()
[docs] def resetWaveField( self: Self, **kwargs ) -> None: """ Reinitialize all pressure values on the Wavefield to zero in GEOSX """ self.setGeosWrapperValueByTargetKey( "Solvers/" + self.name + "/indexSeismoTrace", value=0 ) nodeManagerPath = f"domain/MeshBodies/{self.meshName}/meshLevels/{self.discretization}/nodeManager/" if self.type == "AcousticSEM": for ts in ( "nm1", "n", "np1" ): self.setGeosWrapperValueByTargetKey( nodeManagerPath + f"pressure_{ts}", value=0.0 ) elif self.type == "AcousticFirstOrderSEM": self.setGeosWrapperValueByTargetKey( nodeManagerPath + "pressure_np1", value=0.0 ) prefix = self._getPrefixPathFor1RegionWith1CellBlock( **kwargs ) for component in ( "x", "y", "z" ): self.setGeosWrapperValueByTargetKey( prefix + f"velocity_{component}", value=0.0 )
[docs] def resetPressureAtReceivers( self: Self ) -> None: """ Reinitialize pressure values at receivers to 0 """ self.setGeosWrapperValueByTargetKey( "/Solvers/" + self.name + "/" + "pressureNp1AtReceivers", value=0.0 )