Source code for geos.processing.tools.StressProjector

# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright 2023-2026 TotalEnergies.
# SPDX-FileContributor: Nicolas Pillardou, Paloma Martinez
import logging
from pathlib import Path
import numpy as np
import numpy.typing as npt
from typing_extensions import Self, Any, Union, Set
from scipy.spatial import cKDTree
from xml.etree.ElementTree import ElementTree, Element, SubElement
from enum import Enum

from vtkmodules.vtkCommonDataModel import vtkUnstructuredGrid
from vtkmodules.util.numpy_support import vtk_to_numpy

from geos.geomechanics.model.StressTensor import ( StressTensor )

from geos.mesh.io.vtkIO import ( writeMesh, VtkOutput )
from geos.mesh.utils.genericHelpers import ( extractCellSelection )
from geos.mesh.utils.arrayHelpers import ( isAttributeInObject, getArrayInObject, computeCellCenterCoordinates )
from geos.mesh.utils.arrayModifiers import ( createAttribute, updateAttribute )
from geos.utils.pieceEnum import ( Piece )

from geos.utils.Logger import ( Logger, getLogger )
from geos.utils.GeosOutputsConstants import ( GeosMeshOutputsEnum, PostProcessingOutputsEnum )

loggerTitle = "Stress Projector"


[docs] class StressProjectorWeightingScheme( str, Enum ): """Enum for weighting scheme.""" ARITHMETIC = "arithmetic" HARM = "harmonic" DIST = "distance" VOL = "volume" DIST_VOL = "distanceVolume" INV_SQ_DIST = "inverseSquareDistance"
[docs] class StressProjector: """Projects volume stress onto fault surfaces and tracks principal stresses in VTU.""" def __init__( self: Self, adjacencyMapping: dict[ int, dict[ str, list[ int ] ] ], geometricProperties: dict[ str, Any ], outputDir: str = ".", logger: Union[ Logger, None ] = None ) -> None: """Initialize with pre-computed adjacency mapping and geometric properties. Args: adjacencyMapping (dict[int, list[ vtkUnstructuredGrid]]): Pre-computed dict mapping fault cells to volume cells geometricProperties (dict[str, Any]): Pre-computed geometric properties from FaultGeometry: - 'volumes': cell volumes - 'centers': cell centers - 'distances': distances to fault - 'faultTree': KDTree for fault outputDir (str, optional): Output directory to save plots. Defaults is current directory logger (Union[Logger, None], optional): A logger to manage the output messages. Defaults to None, an internal logger is used. """ self.adjacencyMapping = adjacencyMapping self.stressName: str = GeosMeshOutputsEnum.AVERAGE_STRESS.value self.biotName: str = PostProcessingOutputsEnum.BIOT_COEFFICIENT.value # Store pre-computed geometric properties self.volumeCellVolumes = geometricProperties[ 'volumes' ] self.volumeCenters = geometricProperties[ 'centers' ] self.distanceToFault = geometricProperties[ 'distances' ] self.faultTree = geometricProperties[ 'faultTree' ] # Storage for time series metadata self.timestepInfo: list[ dict[ str, Any ] ] = [] # Track which cells to monitor (optional) self.monitoredCells: set[ int ] | None = None # Output directory for VTU files self.vtuOutputDir = Path( outputDir ) / "principal_stresses" # Logger self.logger: Logger if logger is None: self.logger = getLogger( loggerTitle, True ) else: self.logger = logging.getLogger( f"{logger.name}" ) self.logger.setLevel( logging.INFO ) self.logger.propagate = False
[docs] def setMonitoredCells( self: Self, cellIndices: list[ int ] ) -> None: """Set specific cells to monitor (optional). Args: cellIndices (list[int], optional): List of volume cell indices to track. If None, all contributing cells are tracked """ if len( cellIndices ) != 0: self.monitoredCells = set( cellIndices )
[docs] def projectStressToFault( self: Self, volumeData: vtkUnstructuredGrid, volumeInitial: vtkUnstructuredGrid, faultSurface: vtkUnstructuredGrid, time: float, timestep: int | None = None, weightingScheme: StressProjectorWeightingScheme = StressProjectorWeightingScheme.ARITHMETIC, computePrincipalStresses: bool = False, frictionAngle: float = 10, cohesion: float = 0 ) -> tuple[ vtkUnstructuredGrid, vtkUnstructuredGrid, vtkUnstructuredGrid | None ]: """Project stress and save principal stresses to VTU. Now uses pre-computed geometric properties for efficiency Args: volumeData (vtkUnstructuredGrid): Volumic mesh. volumeInitial (vtkUnstructuredGrid): Pre-simulation mesh faultSurface (vtkUnstructureGrid): Fault mesh. time (float): Time. timestep (int | None, optional): Timestep considered. Defaults is None. weightingScheme (StressProjectorWeightingScheme, optional): Weighting scheme for projection. Defaults is "arithmetic". computePrincipalStresses (bool, optional): Flag to compute principal stresses. Defaults is False. frictionAngle (float, optional): Friction angle in degrees. Defaults is 10 degrees. cohesion (float, optional): Rock cohesion in bar. Defaults is 0 bar. Returns: tuple[vtkUnstructuredGrid, vtkUnstructuredGrid, vtkUnstructuredGrid]: Fault mesh, volumic mesh, cell contributing mesh. """ if not isAttributeInObject( volumeData, self.stressName, Piece.CELLS ): raise ValueError( f"No stress data '{self.stressName}' in dataset" ) # ===================================================================== # 1. EXTRACT STRESS DATA # ===================================================================== pressure = getArrayInObject( volumeData, "pressure", Piece.CELLS ) / 1e5 pressureFault = getArrayInObject( volumeInitial, "pressure", Piece.CELLS ) / 1e5 pressureInitial = getArrayInObject( volumeInitial, "pressure", Piece.CELLS ) / 1e5 biot = getArrayInObject( volumeData, self.biotName, Piece.CELLS ) stressEffective = StressTensor.buildFromArray( getArrayInObject( volumeData, self.stressName, Piece.CELLS ) / 1e5 ) stressEffectiveInitial = StressTensor.buildFromArray( getArrayInObject( volumeInitial, self.stressName, Piece.CELLS ) / 1e5 ) # Convert effective stress to total stress arrI = np.eye( 3 )[ None, :, : ] stressTotal = stressEffective - biot[ :, None, None ] * pressure[ :, None, None ] * arrI stressTotalInitial = stressEffectiveInitial - biot[ :, None, None ] * pressureInitial[ :, None, None ] * arrI # ===================================================================== # 3. PREPARE FAULT GEOMETRY # ===================================================================== normals = vtk_to_numpy( faultSurface.GetCellData().GetNormals() ) tangent1 = vtk_to_numpy( faultSurface.GetCellData().GetArray( "Tangents1" ) ) tangent2 = vtk_to_numpy( faultSurface.GetCellData().GetArray( "Tangents2" ) ) faultCenters = vtk_to_numpy( computeCellCenterCoordinates( faultSurface ) ) updateAttribute( faultSurface, faultCenters, 'elementCenter', Piece.CELLS, logger=self.logger ) nFault = faultSurface.GetNumberOfCells() # ===================================================================== # 4. COMPUTE PRINCIPAL STRESSES FOR CONTRIBUTING CELLS # ===================================================================== if computePrincipalStresses and timestep is not None: # Collect all unique contributing cells allContributingCells: Set[ int ] = set() for _faultIdx, neighbors in self.adjacencyMapping.items(): allContributingCells.update( neighbors[ 'plus' ] ) allContributingCells.update( neighbors[ 'minus' ] ) # Filter by monitored cells if specified if self.monitoredCells is not None: cellsToTrack = allContributingCells.intersection( self.monitoredCells ) else: cellsToTrack = allContributingCells self.logger.info( f"Computing principal stresses for {len(cellsToTrack)} contributing cells..." ) # Create mesh with only contributing cells contributingMesh = self._createVolumicContribMesh( volumeData, faultSurface, cellsToTrack, self.adjacencyMapping, computePrincipalStresses=computePrincipalStresses, frictionAngle=frictionAngle, cohesion=cohesion ) self._savePrincipalStressVTU( contributingMesh, time, timestep ) else: contributingMesh = None # ===================================================================== # 6. PROJECT STRESS FOR EACH FAULT CELL # ===================================================================== sigmaNArr = np.zeros( nFault ) tauArr = np.zeros( nFault ) tauDipArr = np.zeros( nFault ) tauStrikeArr = np.zeros( nFault ) deltaSigmaNArr = np.zeros( nFault ) deltaTauArr = np.zeros( nFault ) nContributors = np.zeros( nFault, dtype=int ) self.logger.info( f"Projecting stress to {nFault} fault cells..." ) self.logger.info( f" Weighting scheme: {weightingScheme.value}" ) for faultIdx in range( nFault ): if faultIdx not in self.adjacencyMapping: continue volPlus = self.adjacencyMapping[ faultIdx ][ 'plus' ] volMinus = self.adjacencyMapping[ faultIdx ][ 'minus' ] allVol = volPlus + volMinus if len( allVol ) == 0: continue # =================================================================== # CALCULATE WEIGHTS (using pre-computed properties) # =================================================================== if weightingScheme == StressProjectorWeightingScheme.ARITHMETIC or weightingScheme == StressProjectorWeightingScheme.HARM: weights = np.ones( len( allVol ) ) / len( allVol ) elif weightingScheme == StressProjectorWeightingScheme.DIST: # Use pre-computed distances dists = np.array( [ self.distanceToFault[ v ] for v in allVol ] ) dists = np.maximum( dists, 1e-6 ) invDists = 1.0 / dists weights = invDists / np.sum( invDists ) elif weightingScheme == StressProjectorWeightingScheme.VOL: # Use pre-computed volumes vols = np.array( [ self.volumeCellVolumes[ v ] for v in allVol ] ) weights = vols / np.sum( vols ) elif weightingScheme == StressProjectorWeightingScheme.DIST_VOL: # Use pre-computed volumes and distances vols = np.array( [ self.volumeCellVolumes[ v ] for v in allVol ] ) dists = np.array( [ self.distanceToFault[ v ] for v in allVol ] ) dists = np.maximum( dists, 1e-6 ) weights = vols / dists weights = weights / np.sum( weights ) elif weightingScheme == StressProjectorWeightingScheme.INV_SQ_DIST: # Use pre-computed distances dists = np.array( [ self.distanceToFault[ v ] for v in allVol ] ) dists = np.maximum( dists, 1e-6 ) invSquareDistance = 1.0 / ( dists**2 ) weights = invSquareDistance / np.sum( invSquareDistance ) else: raise ValueError( f"Unknown weighting scheme: {weightingScheme}" ) # =================================================================== # ACCUMULATE WEIGHTED CONTRIBUTIONS # =================================================================== sigmaN = 0.0 tau = 0.0 tauDip = 0.0 tauStrike = 0.0 deltaSigmaN = 0.0 deltaTau = 0.0 for volIdx, w in zip( allVol, weights ): # Total stress (with pressure) sigmaFinal = stressTotal[ volIdx ] + pressureFault[ volIdx ] * np.eye( 3 ) sigmaInit = stressTotalInitial[ volIdx ] + pressureInitial[ volIdx ] * np.eye( 3 ) # Rotate to fault frame resFinal = StressTensor.rotateToFaultFrame( sigmaFinal, normals[ faultIdx ], tangent1[ faultIdx ], tangent2[ faultIdx ] ) resInitial = StressTensor.rotateToFaultFrame( sigmaInit, normals[ faultIdx ], tangent1[ faultIdx ], tangent2[ faultIdx ] ) # Accumulate weighted contributions sigmaN += w * resFinal[ 'normalStress' ] tau += w * resFinal[ 'shearStress' ] tauDip += w * resFinal[ 'shearDip' ] tauStrike += w * resFinal[ 'shearStrike' ] deltaSigmaN += w * ( resFinal[ 'normalStress' ] - resInitial[ 'normalStress' ] ) deltaTau += w * ( resFinal[ 'shearStress' ] - resInitial[ 'shearStress' ] ) sigmaNArr[ faultIdx ] = sigmaN tauArr[ faultIdx ] = tau tauDipArr[ faultIdx ] = tauDip tauStrikeArr[ faultIdx ] = tauStrike deltaSigmaNArr[ faultIdx ] = deltaSigmaN deltaTauArr[ faultIdx ] = deltaTau nContributors[ faultIdx ] = len( allVol ) # ===================================================================== # 7. STORE RESULTS ON FAULT SURFACE # ===================================================================== for attributeName, value in zip( [ "sigmaNEffective", "tauEffective", "tauStrike", "tauDip", "deltaSigmaNEffective", "deltaTauEffective" ], [ sigmaNArr, tauDipArr, tauStrikeArr, tauDipArr, deltaSigmaNArr, deltaTauArr ] ): updateAttribute( faultSurface, value, attributeName, Piece.CELLS, logger=self.logger ) # ===================================================================== # 8. STATISTICS # ===================================================================== valid = nContributors > 0 nValid = np.sum( valid ) self.logger.info( f" Stress projected: {nValid}/{nFault} fault cells ({nValid/nFault*100:.1f}%)" ) if np.sum( valid ) > 0: self.logger.info( f" Contributors per fault cell: min={np.min(nContributors[valid])}, " f"max={np.max(nContributors[valid])}, " f"mean={np.mean(nContributors[valid]):.1f}" ) return faultSurface, volumeData, contributingMesh
[docs] @staticmethod def computePrincipalStresses( stressTensor: StressTensor ) -> dict[ str, npt.NDArray[ np.float64 ] ]: """Compute principal stresses and directions. Convention: Compression is NEGATIVE - sigma1 = most compressive (most negative) - sigma3 = least compressive (least negative, or most tensile) Args: stressTensor (StressTensor): Stress tensor object Returns: dict[str, npt.NDArray[ np.float64]]: dict with eigenvalues, eigenvectors, meanStress, deviatoricStress """ eigenvalues, eigenvectors = np.linalg.eigh( stressTensor ) # Sort from MOST NEGATIVE to LEAST NEGATIVE (most compressive to least) # Example: -600 < -450 < -200, so -600 is sigma1 (most compressive) idx = np.argsort( eigenvalues ) # Ascending order (most negative first) eigenvaluesSorted = eigenvalues[ idx ] eigenVectorsSorted = eigenvectors[ :, idx ] return { 'sigma1': eigenvaluesSorted[ 0 ], # Most compressive (most negative) 'sigma2': eigenvaluesSorted[ 1 ], # Intermediate 'sigma3': eigenvaluesSorted[ 2 ], # Least compressive (least negative) 'meanStress': np.mean( eigenvaluesSorted ), 'deviatoricStress': eigenvaluesSorted[ 0 ] - eigenvaluesSorted[ 2 ], # sigma1 - sigma3 (negative - more negative = positive or less negative) 'direction1': eigenVectorsSorted[ :, 0 ], # Direction of sigma1 'direction2': eigenVectorsSorted[ :, 1 ], # Direction of sigma2 'direction3': eigenVectorsSorted[ :, 2 ] # Direction of sigma3 }
def _createVolumicContribMesh( self: Self, volumeData: vtkUnstructuredGrid, faultSurface: vtkUnstructuredGrid, cellsToTrack: set[ int ], mapping: dict[ int, dict[ str, list[ int ] ] ], computePrincipalStresses: bool = False, frictionAngle: float = 10, cohesion: float = 0 ) -> vtkUnstructuredGrid: """Create a mesh containing only contributing cells with principal stress data and compute analytical normal/shear stresses based on fault dip angle. Args: volumeData (vtkUnstructuredGrid): Volume mesh with stress data (rock_stress or averageStress) faultSurface (vtkUnstructuredGrid): Fault surface with dipAngle and strikeAngle per cell cellsToTrack (set[int]): Set of volume cell indices to include mapping (dict[int, list[ vtkUnstructuredGrid]]): Adjacency mapping {faultIdx: {'plus': [...], 'minus': [...]}} computePrincipalStresses (bool): Flag to compute principal stresses frictionAngle (float): Friction angle in degrees. cohesion (float): Cohesion in bar. Returns: vtkUnstructuredGrid: Mesh subset from contributing cells with new/updated arrays. """ # =================================================================== # EXTRACT STRESS DATA FROM VOLUME # =================================================================== if not isAttributeInObject( volumeData, self.stressName, Piece.CELLS ): raise ValueError( f"No stress data '{self.stressName}' in volume dataset" ) # Extract effective stress and pressure pressure = getArrayInObject( volumeData, "pressure", Piece.CELLS ) / 1e5 # Convert to bar biot = getArrayInObject( volumeData, self.biotName, Piece.CELLS ) stressEffective = StressTensor.buildFromArray( getArrayInObject( volumeData, self.stressName, Piece.CELLS ) / 1e5 ) # Convert effective stress to total stress arrI = np.eye( 3 )[ None, :, : ] stressTotal = stressEffective - biot[ :, None, None ] * pressure[ :, None, None ] * arrI # =================================================================== # EXTRACT SUBSET OF CELLS # =================================================================== cellIndices = sorted( cellsToTrack ) cellMask = np.zeros( volumeData.GetNumberOfCells(), dtype=bool ) cellMask[ cellIndices ] = True subsetMesh = extractCellSelection( volumeData, cellMask ) if subsetMesh.GetNumberOfCells() == 0: raise ValueError( "No cells found in the subset mesh." ) # =================================================================== # REBUILD MAPPING: subsetIdx -> originalIdx # =================================================================== originalCenters = vtk_to_numpy( computeCellCenterCoordinates( volumeData ) )[ cellIndices ] subsetCenters = vtk_to_numpy( computeCellCenterCoordinates( subsetMesh ) ) tree = cKDTree( originalCenters ) subsetToOriginal = np.zeros( subsetMesh.GetNumberOfCells(), dtype=int ) for subsetIdx in range( subsetMesh.GetNumberOfCells() ): dist, idx = tree.query( subsetCenters[ subsetIdx ] ) if dist > 1e-6: self.logger.warning( f"Cell {subsetIdx} not matched (dist={dist})" ) subsetToOriginal[ subsetIdx ] = cellIndices[ idx ] # =================================================================== # MAP VOLUME CELLS TO FAULT DIP/STRIKE ANGLES # =================================================================== self.logger.info( "Mapping volume cells to fault dip/strike angles..." ) # Create mapping: volume_cell_id -> [dipAngles, strikeAngles] volumeToDip: dict[ int, list[ np.float64 ] ] = {} volumeToStrike: dict[ int, list[ np.float64 ] ] = {} for faultIdx, neighbors in mapping.items(): # Get dip and strike angle from fault cell faultDip = getArrayInObject( faultSurface, 'dipAngle', Piece.CELLS )[ faultIdx ] # Strike is optional if isAttributeInObject( faultSurface, 'strikeAngle', Piece.CELLS ): faultStrike = getArrayInObject( faultSurface, 'strikeAngle', Piece.CELLS )[ faultIdx ] else: faultStrike = np.nan # Assign to all contributing volume cells (plus and minus) for volIdx in neighbors[ 'plus' ] + neighbors[ 'minus' ]: if volIdx not in volumeToDip: volumeToDip[ volIdx ] = [] volumeToStrike[ volIdx ] = [] volumeToDip[ volIdx ].append( faultDip ) volumeToStrike[ volIdx ].append( faultStrike ) # Average if a volume cell contributes to multiple fault cells volumeToDipAvg = { volIdx: np.mean( dips ) for volIdx, dips in volumeToDip.items() } volumeToStrikeAvg = { volIdx: np.mean( strikes ) for volIdx, strikes in volumeToStrike.items() } self.logger.info( f"Mapped {len(volumeToDipAvg)} volume cells to fault angles" ) # Statistics allDips = [ np.mean( dips ) for dips in volumeToDip.values() ] if len( allDips ) > 0: self.logger.info( f"Dip angle range: [{np.min(allDips):.1f}, {np.max(allDips):.1f}]°" ) # =================================================================== # COMPUTE PRINCIPAL STRESSES AND ANALYTICAL FAULT STRESSES # =================================================================== nCells = subsetMesh.GetNumberOfCells() sigma1Arr = np.zeros( nCells ) sigma2Arr = np.zeros( nCells ) sigma3Arr = np.zeros( nCells ) meanStressArr = np.zeros( nCells ) deviatoricStressArr = np.zeros( nCells ) pressureArr = np.zeros( nCells ) direction1Arr = np.zeros( ( nCells, 3 ) ) direction2Arr = np.zeros( ( nCells, 3 ) ) direction3Arr = np.zeros( ( nCells, 3 ) ) # NEW: Analytical fault stresses sigmaNAnalyticalArr = np.zeros( nCells ) tauAnalyticalArr = np.zeros( nCells ) dipAngleArr = np.zeros( nCells ) strikeAngleArr = np.zeros( nCells ) deltaArr = np.zeros( nCells ) sideArr = np.zeros( nCells, dtype=int ) nFaultCellsArr = np.zeros( nCells, dtype=int ) self.logger.info( "Computing principal stresses and analytical projections..." ) for subsetIdx in range( nCells ): origIdx = subsetToOriginal[ subsetIdx ] # =============================================================== # COMPUTE PRINCIPAL STRESSES # =============================================================== # Total stress = effective stress + pore pressure sigmaTotalCell = stressTotal[ origIdx ] + pressure[ origIdx ] * np.eye( 3 ) principal = self.computePrincipalStresses( sigmaTotalCell ) sigma1Arr[ subsetIdx ] = principal[ 'sigma1' ] sigma2Arr[ subsetIdx ] = principal[ 'sigma2' ] sigma3Arr[ subsetIdx ] = principal[ 'sigma3' ] meanStressArr[ subsetIdx ] = principal[ 'meanStress' ] deviatoricStressArr[ subsetIdx ] = principal[ 'deviatoricStress' ] pressureArr[ subsetIdx ] = pressure[ origIdx ] direction1Arr[ subsetIdx ] = principal[ 'direction1' ] direction2Arr[ subsetIdx ] = principal[ 'direction2' ] direction3Arr[ subsetIdx ] = principal[ 'direction3' ] # =============================================================== # COMPUTE ANALYTICAL FAULT STRESSES (Anderson formulas) # =============================================================== if origIdx in volumeToDipAvg: dipDeg = volumeToDipAvg[ origIdx ] dipAngleArr[ subsetIdx ] = dipDeg strikeDeg = volumeToStrikeAvg.get( origIdx, np.nan ) strikeAngleArr[ subsetIdx ] = strikeDeg # d = 90° - dip (angle from horizontal) deltaDeg = 90.0 - dipDeg deltaRad = np.radians( deltaDeg ) deltaArr[ subsetIdx ] = deltaDeg # Extract principal stresses (compression negative) sigma1 = principal[ 'sigma1' ] # Most compressive (most negative) sigma3 = principal[ 'sigma3' ] # Least compressive (least negative) # Anderson formulas (1951) # sigma_n = (sigma1 + sigma3)/2 - (sigma1 - sigma3)/2 * cos(2d) # tau = |(sigma1 - sigma3)/2 * sin(2d)| sigmaMean = ( sigma1 + sigma3 ) / 2.0 sigmaDiff = ( sigma1 - sigma3 ) / 2.0 sigmaNAnalytical = sigmaMean - sigmaDiff * np.cos( 2 * deltaRad ) tauAnalytical = sigmaDiff * np.sin( 2 * deltaRad ) sigmaNAnalyticalArr[ subsetIdx ] = sigmaNAnalytical tauAnalyticalArr[ subsetIdx ] = np.abs( tauAnalytical ) else: # No fault association - set to NaN dipAngleArr[ subsetIdx ] = np.nan strikeAngleArr[ subsetIdx ] = np.nan deltaArr[ subsetIdx ] = np.nan sigmaNAnalyticalArr[ subsetIdx ] = np.nan tauAnalyticalArr[ subsetIdx ] = np.nan # =============================================================== # DETERMINE SIDE (plus/minus/both) # =============================================================== isPlus = False isMinus = False faultCellCount = 0 for _faultIdx, neighbors in mapping.items(): if origIdx in neighbors[ 'plus' ]: isPlus = True faultCellCount += 1 if origIdx in neighbors[ 'minus' ]: isMinus = True faultCellCount += 1 if isPlus and isMinus: sideArr[ subsetIdx ] = 3 # both elif isPlus: sideArr[ subsetIdx ] = 1 # plus elif isMinus: sideArr[ subsetIdx ] = 2 # minus else: sideArr[ subsetIdx ] = 0 # none (should not happen) nFaultCellsArr[ subsetIdx ] = faultCellCount # =================================================================== # ADD DATA TO MESH # =================================================================== for attributeName, attributeArray in zip( ( 'sigma1', 'sigma2', 'sigma3', 'meanStress', 'deviatoricStress', 'pressure_bar', 'sigma1Direction', 'sigma2Direction', 'sigma3Direction', 'sigmaNAnalytical', 'tauAnalytical', 'dipAngle', 'strikeAngle', 'deltaAngle', ), ( sigma1Arr, sigma2Arr, sigma3Arr, meanStressArr, deviatoricStressArr, pressureArr, direction1Arr, direction2Arr, direction3Arr, sigmaNAnalyticalArr, tauAnalyticalArr, dipAngleArr, strikeAngleArr, deltaArr ) ): updateAttribute( subsetMesh, attributeArray, attributeName, piece=Piece.CELLS, logger=self.logger ) # =================================================================== # COMPUTE SCU ANALYTICALLY (Mohr-Coulomb) # =================================================================== mu = np.tan( np.radians( frictionAngle ) ) # tau_crit = C - sigma_n * mu # Note: sigma_n is negative (compression), so -sigma_n * mu is positive tauCriticalArr: npt.NDArray[ np.float64 ] = cohesion - sigmaNAnalyticalArr * mu # SCU = tau / tau_crit SCUAnalyticalArr: npt.NDArray[ np.float64 ] = np.divide( tauAnalyticalArr, tauCriticalArr, out=np.zeros_like( tauAnalyticalArr ), where=tauCriticalArr != 0 ) for attributeName, attributeArray in zip( ( 'tauCriticalAnalytical', 'SCUAnalytical', 'side', 'nFaultCells', 'originalCellIds' ), ( tauCriticalArr, SCUAnalyticalArr, sideArr, nFaultCellsArr, subsetToOriginal ) ): createAttribute( subsetMesh, attributeArray, attributeName, piece=Piece.CELLS, logger=self.logger ) # =================================================================== # STATISTICS # =================================================================== validAnalytical = ~np.isnan( sigmaNAnalyticalArr ) nValid = np.sum( validAnalytical ) nCritical = np.sum( ( SCUAnalyticalArr >= 0.8 ) & ( SCUAnalyticalArr < 1.0 ) ) nUnstable = np.sum( SCUAnalyticalArr >= 1.0 ) if nValid > 0: self.logger.info( f"Analytical fault stresses computed for {nValid}/{nCells} cells" ) self.logger.info( f" sigma_n range: [{np.nanmin(sigmaNAnalyticalArr):.1f}, {np.nanmax(sigmaNAnalyticalArr):.1f}] bar" ) self.logger.info( f" tau range: [{np.nanmin(tauAnalyticalArr):.1f}, {np.nanmax(tauAnalyticalArr):.1f}] bar" ) self.logger.info( f" Dip angle range: [{np.nanmin(dipAngleArr):.1f}, {np.nanmax(dipAngleArr):.1f}]°" ) self.logger.info( f" SCU range: [{np.nanmin(SCUAnalyticalArr[validAnalytical]):.2f}, {np.nanmax(SCUAnalyticalArr[validAnalytical]):.2f}]" ) self.logger.info( f" Critical cells (SCU≥0.8): {nCritical} ({nCritical/nValid*100:.1f}%)" ) self.logger.info( f" Unstable cells (SCU≥1.0): {nUnstable} ({nUnstable/nValid*100:.1f}%)" ) else: self.logger.warning( " No analytical stresses computed (no fault mapping)" ) return subsetMesh def _savePrincipalStressVTU( self: Self, mesh: vtkUnstructuredGrid, time: float, timestep: int ) -> None: """Save principal stress mesh to VTU file. Parameters: mesh: PyVista mesh with principal stress data time: Simulation time timestep: Timestep index """ # Create output directory self.vtuOutputDir.mkdir( parents=True, exist_ok=True ) # Generate filename vtuFilename = f"principal_stresses_{timestep:05d}.vtu" vtuPath = self.vtuOutputDir / vtuFilename # Save mesh writeMesh( mesh=mesh, vtkOutput=VtkOutput( vtuPath ), logger=self.logger, canOverwrite=True ) # Store metadata for PVD self.timestepInfo.append( { 'time': time if time is not None else timestep, 'timestep': timestep, 'file': vtuFilename } ) self.logger.info( f" Saved principal stresses: {vtuFilename}" )
[docs] def savePVDCollection( self: Self, filename: str = "principal_stresses.pvd" ) -> None: """Create PVD file for time series visualization in ParaView. Args: filename: Name of PVD file. Defaults is "principal_stresses.pvd". """ if len( self.timestepInfo ) == 0: self.logger.error( " No timestep data to save in PVD" ) return pvdPath = self.vtuOutputDir / filename self.logger.info( f" Creating PVD collection: {pvdPath}" ) self.logger.info( f" Timesteps: {len(self.timestepInfo)}" ) # Create XML structure root = Element( 'VTKFile' ) root.set( 'type', 'Collection' ) root.set( 'version', '0.1' ) root.set( 'byte_order', 'LittleEndian' ) collection = SubElement( root, 'Collection' ) for info in self.timestepInfo: dataset = SubElement( collection, 'DataSet' ) dataset.set( 'timestep', str( info[ 'time' ] ) ) dataset.set( 'group', '' ) dataset.set( 'part', '0' ) dataset.set( 'file', info[ 'file' ] ) # Write to file tree = ElementTree( root ) tree.write( str( pvdPath ), encoding='utf-8', xml_declaration=True ) self.logger.info( "PVD file created successfully: {pvdPath}" )
[docs] def setStressName( self: Self, stressName: str ) -> None: """Set the stress attribute name. Args: stressName (str): Stress attribute name in the input mesh. """ if stressName != "None": self.stressName = stressName
[docs] def setBiotCoefficientName( self: Self, biotName: str ) -> None: """Set the stress attribute name. Args: biotName (str): Biot coefficient attribute name in the input mesh. """ if biotName != "None": self.biotName = biotName