Source code for geos.processing.post_processing.FaultStabilityAnalysis

# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright 2023-2026 TotalEnergies.
# SPDX-FileContributor: Nicolas Pillardou, Paloma Martinez
import logging
from typing_extensions import Self
from pathlib import Path

from vtkmodules.vtkCommonDataModel import vtkDataSet, vtkUnstructuredGrid

from geos.mesh.utils.multiblockModifiers import ( mergeBlocks )
from geos.mesh.io.vtkIO import ( PVDReader, createPVD, writeMesh, VtkOutput )

from geos.processing.tools.FaultGeometry import ( FaultGeometry )
from geos.processing.tools.FaultVisualizer import ( Visualizer )
from geos.processing.tools.MohrCoulomb import ( MohrCoulombAnalysis )
from geos.processing.tools.SensitivityAnalyzer import ( SensitivityAnalyzer )
from geos.processing.tools.StressProjector import ( StressProjector, StressProjectorWeightingScheme )

from geos.utils.GeosOutputsConstants import ( GeosMeshOutputsEnum, PostProcessingOutputsEnum )
from geos.utils.Logger import ( getLogger, Logger, CountVerbosityHandler, isHandlerInLogger, getLoggerHandlerType )

__doc__ = """
FaultStabilityAnalysis is a vtk filter that performs an analysis of requested faults in a mesh for several timesteps of simulation.

The pre-simulation mesh containing a fault mask attribute is required to identify the faults.


To use it:

.. code-block:: python

    from geos.processing.post_processing.FaultStabilityAnalysis import FaultStabilityAnalysis

    # Filter inputs.
    inputMesh: vtkDataSet
    faultAttribute: str
    faultValues: list[int|float]
    pvdFile: str
    speHandler: bool

    # Instantiate the filter
    faultStabilityFilter = FaultStabilityAnalysis( inputMesh, faultAttribute, faultValues, pvdFile, speHandler )

    # Set parameters [optional]

    faultStabilityFilter.setStressName( stressName: str )
    faultStabilityFilter.setBiotCoefficientName( biotName: str )
    faultStabilityFilter.setSensitivityFrictionAngles( list[ float ] )
    faultStabilityFilter.setSensitivityCohesions( list[ float] )
    faultStabilityFilter.setProfileStartPoints( list[ tuple[ np.float64, np.float64 ] ] )

    # Set your handler (only if speHandler is True).
    yourHandler: logging.Handler
    faultStabilityFilter.addLoggerHandler( yourHandler )

    # Do calculations
    try:
        faultStabilityFilter.applyFilter()
    except Exception as e:
        mess: str = f"The filter { faultStabilityFilter.logger.name } failed due to: { e }"
        faultStabilityFilter.logger.error( mess, exc_info=True )
"""

loggerTitle: str = "Fault Stability Analysis"


[docs] class FaultStabilityAnalysis: def __init__( self: Self, inputMesh: vtkDataSet, faultAttribute: str, faultValues: list[ int | float ], pvdFile: str, speHandler: bool = False, ) -> None: """Fault stability analysis workflow. Args: inputMesh (vtkDataSet): Pre-simulation mesh faultValues (list[ int | float ]): Fault attribute values to consider faultAttribute (str): Fault attribute name in the initial mesh volumeMeshInitial( vtkDataSet): Output mesh for time t=0 pvdFile (str): GEOS output PVD filename speHandler (bool, optional): True to use a specific handler, False to use the internal handler. Defaults to False. """ self.pvdFile = pvdFile self.timeIndexes: list[ int ] | None = None self.outputDir: Path = Path( "FaultStabilityAnalysis/" ) # Mechanical parameters self.frictionAngle: float = 10 # [degrees] self.cohesion: float = 0 # [bar] # Normal orientation: Rotate normals and tangents from 180° self.rotateNormals: bool = False # Visualization self.savePlots: bool = True self.zscale: float = 1.0 self.nDepthProfiles = 1 self.minDepthProfiles = None self.maxDepthProfiles = None self.saveContributionCells = True # Save vtu contributive cells self.weightingScheme: StressProjectorWeightingScheme = StressProjectorWeightingScheme.ARITHMETIC self.computePrincipalStresses = False self.profileStartPoints: list[ tuple[ float, float ] ] = [] self.profileSearchRadius = None # Sensitivity analysis self.runSensitivity: bool = True self.sensitivityFrictionAngles: list[ float ] = [] # degrees self.sensitivityCohesions: list[ float ] = [] # bar # Variable names self.stressName: str = GeosMeshOutputsEnum.AVERAGE_STRESS.value self.biotName: str = PostProcessingOutputsEnum.BIOT_COEFFICIENT.value # Faults attributes self.faultAttribute = faultAttribute self.faultValues = faultValues self.processFaultsSeparately: bool = True # Logger self.logger: Logger if not speHandler: self.logger = getLogger( loggerTitle, True ) else: self.logger = logging.getLogger( loggerTitle ) self.logger.setLevel( logging.INFO ) self.logger.propagate = False counter: CountVerbosityHandler = CountVerbosityHandler() self.counter: CountVerbosityHandler self.nbWarnings: int = 0 try: self.counter = getLoggerHandlerType( type( counter ), self.logger ) self.counter.resetWarningCount() except ValueError: self.counter = counter self.counter.setLevel( logging.INFO ) self.logger.addHandler( self.counter ) self.volumeMeshInitial = self._getInitialMesh() self.faultGeometry = FaultGeometry( inputMesh, faultValues, faultAttribute, self.volumeMeshInitial, self.outputDir, self.logger )
[docs] def setLoggerHandler( self: Self, handler: logging.Handler ) -> None: """Set a specific handler for the filter logger. In this filter 4 log levels are use, .info, .error, .warning and .critical, be sure to have at least the same 4 levels. Args: handler (logging.Handler): The handler to add. """ if not isHandlerInLogger( handler, self.logger ): self.logger.addHandler( handler ) else: self.logger.warning( "The logger already has this handler, it has not been added." )
def _getInitialMesh( self: Self ) -> vtkUnstructuredGrid: """Get the mesh from timestep 0 in the PVD output file and merge the blocks.""" reader = PVDReader( self.pvdFile, logger=self.logger ) datasetT0 = reader.getDataSetAtTimeIndex( 0 ) return mergeBlocks( datasetT0, keepPartialAttributes=True, logger=self.logger ) def _initializeFaultGeometry( self: Self ) -> None: """Extract faults and compute geometric properties such as normals and adjacency topology.""" self.logger.info( "=" * 70 ) self.logger.info( "Initialize fault geometry" ) self.logger.info( "=" * 70 ) self.faultGeometry.initialize( processFaultsSeparately=self.processFaultsSeparately, saveContributionCells=self.saveContributionCells )
[docs] def processAllTimeIndexesRequested( self: Self ) -> None: """Process all time steps using pre-computed fault geometry. Returns: vtkUnstructuredGrid: Fault mesh. """ self.logger.info( "Reading PVD file" ) reader = PVDReader( self.pvdFile, self.logger ) timeValues = reader.getAllTimestepsValues() if self.timeIndexes: timeValues = timeValues[ self.timeIndexes ] outputFiles = [] dataInitial = None # Get pre-computed data from faultGeometry surface = self.faultGeometry.faultSurface adjacencyMapping = self.faultGeometry.adjacencyMapping geometricProperties = self.faultGeometry.getGeometricProperties() # Initialize projector with pre-computed topology self.logger.info( "Initialize projector with pre-computed topology." ) projector = StressProjector( adjacencyMapping, geometricProperties, self.outputDir, self.logger ) projector.setStressName( self.stressName ) projector.setBiotCoefficientName( self.biotName ) self.logger.info( "=" * 70 ) self.logger.info( "Time Series Processing" ) self.logger.info( "=" * 70 ) for i, time in enumerate( timeValues ): self.logger.info( f"***Step {i+1}/{len(timeValues)}: {time/(365.25*24*3600):.2f} years***" ) # Read time step idx = self.timeIndexes[ i ] if self.timeIndexes else i dataset = reader.getDataSetAtTimeIndex( idx ) # Merge blocks volumeData = mergeBlocks( dataset, keepPartialAttributes=True, logger=self.logger ) if dataInitial is None: dataInitial = volumeData # ----------------------------------- # Projection using pre-computed topology # ----------------------------------- # Projection surfaceResult, volumeMarked, contributingCells = projector.projectStressToFault( volumeData, dataInitial, surface, time=timeValues[ i ], # Simulation time timestep=i, # Timestep index weightingScheme=self.weightingScheme, computePrincipalStresses=self.computePrincipalStresses ) # ----------------------------------- # Mohr-Coulomb analysis # ----------------------------------- cohesion = self.cohesion # bar frictionAngle = self.frictionAngle # degrees mc = MohrCoulombAnalysis( surfaceResult, cohesion, frictionAngle, logger=self.logger ) surfaceResult = mc.analyze() # ----------------------------------- # Visualize # ----------------------------------- self._plotResults( surfaceResult, contributingCells, time ) # ----------------------------------- # Sensitivity analysis # ----------------------------------- if self.runSensitivity: if len( self.sensitivityFrictionAngles ) == 0 or len( self.sensitivityCohesions ) == 0: raise ValueError( "Sensitivity friction angles and cohesions required if runSensitivity is set to True" ) analyzer = SensitivityAnalyzer( self.outputDir, self.logger ) analyzer.runAnalysis( surfaceResult, time, self.sensitivityFrictionAngles, self.sensitivityCohesions, self.profileStartPoints, self.profileSearchRadius ) # Save filename = self.outputDir / f'fault_analysis_{i:04d}.vtu' writeMesh( mesh=surfaceResult, vtkOutput=VtkOutput( str( filename ) ), canOverwrite=True, logger=self.logger ) outputFiles.append( ( time, filename ) ) self.logger.info( f" Saved: {filename}" ) self.logger.info( "=" * 60 ) # Create master PVD createPVD( self.outputDir, 'fault_analysis.pvd', outputFiles, self.logger ) return surfaceResult
[docs] def applyFilter( self: Self ) -> None: """Analyze the stability of the fault for all timesteps requested.""" self.logger.info( f"Apply filter { self.logger.name }." ) self._initializeFaultGeometry() self.processAllTimeIndexesRequested() result: str = f"The filter { self.logger.name } succeeded" if self.counter.warningCount > 0: self.logger.warning( f"{ result } but { self.counter.warningCount } warnings have been logged." ) else: self.logger.info( f"{ result }." ) # Keep number of warnings logged during the filter application and reset the warnings count in case the filter is applied again. self.nbWarnings = self.counter.warningCount self.counter.resetWarningCount() return
def _plotResults( self: Self, surface: vtkUnstructuredGrid, contributingCells: vtkUnstructuredGrid, time: float ) -> None: """Plot and save results for one timestep. Args: surface (vtkUnstructuredGrid): Fault mesh. contributingCells (vtkUnstructuredGrid): Cells contributing to the fault. time (float): Time. """ visualizer = Visualizer( self.profileSearchRadius, self.minDepthProfiles, self.maxDepthProfiles, savePlots=self.savePlots, logger=self.logger ) visualizer.plotMohrCoulombDiagram( surface, time, self.outputDir, save=self.savePlots, ) if self.computePrincipalStresses: # Plot principal stress from volume cells visualizer.plotVolumeStressProfiles( volumeMesh=contributingCells, faultSurface=surface, time=time, path=self.outputDir, profileStartPoints=self.profileStartPoints )
[docs] def setSensitivityFrictionAngles( self: Self, angles: list[ float ] ) -> None: """Set the friction angles for sensitivy analysis.""" self.sensitivityFrictionAngles = angles
[docs] def setSensitivityCohesions( self: Self, cohesions: list[ float ] ) -> None: """Set the cohesions for sensitivy analysis.""" self.sensitivityCohesions = cohesions
[docs] def setOutputDirectory( self: Self, outputDir: Path ) -> None: """Set the saving output directory. Args: outputDir (Path): Output directory """ if outputDir != "None": self.outputDir = outputDir
[docs] def savePlotsOff( self: Self ) -> None: """Switch off plot saving option.""" self.savePlots = False
[docs] def savePlotsOn( self: Self ) -> None: """Switch on plot saving option.""" self.savePlots = True
[docs] def setStressName( self: Self, stressName: str ) -> None: """Set the stress attribute name. Args: stressName (str): Stress attribute name in the input mesh. """ if stressName != "None": self.stressName = stressName
[docs] def setBiotCoefficientName( self: Self, biotName: str ) -> None: """Set the stress attribute name. Args: biotName (str): Biot coefficient attribute name in the input mesh. """ if biotName != "None": self.biotName = biotName
[docs] def setProfileStartPoints( self: Self, startPoints: list[ tuple[ float, float ] ] ) -> None: """Set the profile start points as a list of ( x, y ). Args: startPoints (list[tuple[np.float64, np.float64]]): List of starting points coordinates. """ for coords in startPoints: if not len( coords ) == 2: raise TypeError( f"Expected a vector of length 2, not {len(coords)}." ) self.profileStartPoints = startPoints