# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright 2023-2026 TotalEnergies.
# SPDX-FileContributor: Nicolas Pillardou, Paloma Martinez
import logging
import numpy as np
from pathlib import Path
from typing_extensions import Self, Any
import numpy.typing as npt
from scipy.spatial import cKDTree
from vtkmodules.vtkCommonDataModel import vtkCellLocator, vtkMultiBlockDataSet, vtkUnstructuredGrid
from vtkmodules.util.numpy_support import vtk_to_numpy, numpy_to_vtk
from geos.mesh.utils.arrayModifiers import ( createAttribute, updateAttribute )
from geos.mesh.utils.arrayHelpers import ( getArrayInObject, computeCellCenterCoordinates )
from geos.mesh.utils.multiblockModifiers import ( mergeBlocks )
from geos.mesh.utils.genericHelpers import ( extractCellSelection, extractSurface, computeNormals, computeCellVolumes )
from geos.utils.pieceEnum import ( Piece )
from geos.mesh.io.vtkIO import ( writeMesh, VtkOutput )
from geos.utils.Logger import ( Logger, getLogger )
loggerTitle: str = "Fault Geometry"
[docs]
class FaultGeometry:
"""Handles fault surface extraction and normal computation with optimizations."""
def __init__( self: Self,
mesh: vtkUnstructuredGrid,
faultValues: list[ int | float ],
faultAttribute: str,
volumeMesh: vtkUnstructuredGrid,
outputDir: str = ".",
logger: logging.Logger | None = None ) -> None:
"""Initialize fault geometry with pre-computed topology.
Args:
mesh (vtkUnstructuredGrid): Pre-simulation mesh
faultValues (list[int]): Values of fault attribute to consider
faultAttribute (str): Fault attribute name in the mesh
volumeMesh (vtkUnstructuredGrid): Volumic mesh
outputDir (str, optional): Output directory
Defaults is ".".
logger (Union[Logger, None], optional): A logger to manage the output messages.
Defaults to None, an internal logger is used.
"""
self.initialized = False
self.mesh = mesh
self.faultValues = faultValues
self.faultAttribute = faultAttribute
self.volumeMesh = volumeMesh
self.faultSurface: vtkUnstructuredGrid
self.surfaces: list[ vtkUnstructuredGrid ] = []
self.adjacencyMapping: dict[ int, dict[ str, list[ int ] ] ]
self.contributingCells: vtkUnstructuredGrid
self.contributingCellsPlus: vtkUnstructuredGrid
self.contributingCellsMinus: vtkUnstructuredGrid
# NEW: Pre-computed geometric properties
self.volumeCellVolumes: npt.NDArray[ np.float64 ] # Volume of each cell
self.volumeCenters: npt.NDArray[ np.float64 ] # Center coordinates
self.distanceToFault: npt.NDArray[ np.float64 ] # Distance from each volume cell to nearest fault
self.faultTree: cKDTree # KDTree for fault surface
# Config
self.outputDir = Path( outputDir )
self.outputDir.mkdir( parents=True, exist_ok=True )
# Logger
self.logger: Logger
if logger is None:
self.logger = getLogger( loggerTitle, True )
else:
self.logger = logging.getLogger( f"{logger.name}" )
self.logger.setLevel( logging.INFO )
self.logger.propagate = False
[docs]
def initialize( self: Self, processFaultsSeparately: bool = True, saveContributionCells: bool = True ) -> None:
"""One-time initialization: compute normals, adjacency topology, and geometric properties.
Args:
processFaultsSeparately (bool): Flag to process faults separately or not.
Defaults is True.
saveContributionCells (bool): Save the contributing cells as VTU.
Defaults is True.
"""
self.logger.info( "***Computing normals and adjacency topology***" )
# Extract and compute normals
self.faultSurface, self.surfaces = self._extractAndComputeNormals()
# Pre-compute adjacency mapping
self.logger.info( "Pre-computing volume-fault adjacency topology with face-sharing (adaptive epsilon) method." )
self.adjacencyMapping = self._buildAdjacencyMappingFaceSharing(
processFaultsSeparately=processFaultsSeparately )
# Mark and optionally save contributing cells
self._markContributingCells( saveContributionCells )
# NEW: Pre-compute geometric properties
self._precomputeGeometricProperties()
nMapped = len( self.adjacencyMapping )
nWithBoth = sum( 1 for m in self.adjacencyMapping.values()
if len( m[ 'plus' ] ) > 0 and len( m[ 'minus' ] ) > 0 )
self.logger.info( "Adjacency topology computed:" )
self.logger.info( f" {nMapped}/{self.faultSurface.GetNumberOfCells()} fault cells mapped" )
self.logger.info( f" {nWithBoth} cells have neighbors on both sides" )
self.initialized = True
self.logger.info( "-" * 60 )
def _markContributingCells( self: Self, saveContributionCells: bool = True ) -> None:
"""Mark volume cells that contribute to fault stress projection.
Args:
saveContributionCells (bool): Save contributing cells as VTU.
Defaults is True.
"""
self.logger.info( "Marking contributing volume cells..." )
nVolume = self.volumeMesh.GetNumberOfCells()
# Collect contributing cells by side
allPlus = set()
allMinus = set()
for _faultIdx, neighbors in self.adjacencyMapping.items():
allPlus.update( neighbors[ 'plus' ] )
allMinus.update( neighbors[ 'minus' ] )
# Create classification array
contributionSide = np.zeros( nVolume, dtype=int )
for idx in allPlus:
if 0 <= idx < nVolume:
contributionSide[ idx ] += 1
for idx in allMinus:
if 0 <= idx < nVolume:
contributionSide[ idx ] += 2
# Add classification to volume mesh
contribMask = contributionSide > 0
contribMask = contribMask.astype( int )
createAttribute( self.volumeMesh, contributionSide, "contributionSide", logger=self.logger )
createAttribute( self.volumeMesh, contribMask, "contributionToFaults", logger=self.logger )
# Extract subsets
maskAll = np.where( contribMask )[ 0 ]
maskPlus = np.where( ( contributionSide == 1 ) | ( contributionSide == 3 ) )[ 0 ]
maskMinus = np.where( ( contributionSide == 2 ) | ( contributionSide == 3 ) )[ 0 ]
self.contributingCells = extractCellSelection( self.volumeMesh, maskAll )
self.contributingCellsPlus = extractCellSelection( self.volumeMesh, maskPlus )
self.contributingCellsMinus = extractCellSelection( self.volumeMesh, maskMinus )
# Statistics
nContrib = len( maskAll )
nPlus = np.sum( contributionSide == 1 )
nMinus = np.sum( contributionSide == 2 )
nBoth = np.sum( contributionSide == 3 )
pctContrib = nContrib / nVolume * 100
self.logger.info( f"Total contributing: {nContrib}/{nVolume} ({pctContrib:.1f}%)" )
self.logger.info( f" Plus side only: {nPlus} cells" )
self.logger.info( f" Minus side only: {nMinus} cells" )
self.logger.info( f" Both sides: {nBoth} cells" )
# Save to files if requested
if saveContributionCells:
self._saveContributingCells()
def _saveContributingCells( self: Self ) -> None:
"""Save contributing volume cells to VTU files.
Saves three files: all, plus side, minus side.
"""
# Save all contributing cells
filenameAll = self.outputDir / "contributing_cells_all.vtu"
writeMesh( mesh=self.contributingCells,
vtkOutput=VtkOutput( filenameAll ),
canOverwrite=True,
logger=self.logger )
# Save plus side
filenamePlus = self.outputDir / "contributingCellsPlus.vtu"
writeMesh( mesh=self.contributingCellsPlus,
vtkOutput=VtkOutput( filenamePlus ),
canOverwrite=True,
logger=self.logger )
# Save minus side
filenameMinus = self.outputDir / "contributingCellsMinus.vtu"
writeMesh( mesh=self.contributingCellsMinus,
vtkOutput=VtkOutput( filenameMinus ),
canOverwrite=True,
logger=self.logger )
self.logger.info( "Contribution cells saved:" )
self.logger.info(
f" *All* contributing cells: {filenameAll} - "
f" ({self.contributingCells.GetNumberOfCells()} cells, {self.contributingCells.GetNumberOfPoints()} points)"
)
self.logger.info(
f" *Plus* side cells: {filenamePlus} -"
f" ({self.contributingCellsPlus.GetNumberOfCells()} cells, {self.contributingCellsPlus.GetNumberOfPoints()} points)"
)
self.logger.info(
f" *Minus* side cells: {filenameMinus} -"
f" ({self.contributingCellsMinus.GetNumberOfCells()} cells, {self.contributingCellsMinus.GetNumberOfPoints()} points)"
)
[docs]
def getContributingCells( self: Self, side: str = 'all' ) -> vtkUnstructuredGrid:
"""Get the extracted contributing cells.
Args:
side (str): 'all', 'plus', or 'minus'
Returns:
vtkUnstructuredGrid: Contributing volume cells
Raises:
ValueError: Contributed cells not computed
ValueError: Invalid requested side
"""
if not self.initialized:
raise ValueError( "Contributing cells not yet computed. Call initialize() first." )
if side == 'all':
return self.contributingCells
elif side == 'plus':
return self.contributingCellsPlus
elif side == 'minus':
return self.contributingCellsMinus
else:
raise ValueError( f"Invalid side '{side}'. Must be 'all', 'plus', or 'minus'." )
[docs]
def getGeometricProperties( self: Self ) -> dict[ str, Any ]:
"""Get pre-computed geometric properties.
Properties include:
- 'volumes': ndarray of cell volumes
- 'centers': ndarray of cell centers (nCells, 3)
- 'distances': ndarray of distances to nearest fault cell
- 'faultTree': KDTree for fault surface
Returns:
dict[ str, Any ]: geometric properties
"""
if not self.initialized:
raise ValueError( "Geometric properties not computed. Call initialize() first." )
return {
'volumes': self.volumeCellVolumes,
'centers': self.volumeCenters,
'distances': self.distanceToFault,
'faultTree': self.faultTree
}
def _precomputeGeometricProperties( self: Self ) -> None:
"""Pre-compute geometric properties of volume mesh for efficient stress projection.
Computes:
- Cell volumes (for volume-weighted averaging)
- Cell centers (for distance calculations)
- Distance from each volume cell to nearest fault cell
- KDTree for fault surface
"""
self.logger.info( "Pre-computing geometric properties..." )
nVolume = self.volumeMesh.GetNumberOfCells()
# 1. Compute volume centers
self.volumeCenters = vtk_to_numpy( computeCellCenterCoordinates( self.volumeMesh ) )
# 2. Compute cell volumes
volumeWithSizes = computeCellVolumes( self.volumeMesh )
self.volumeCellVolumes = getArrayInObject( volumeWithSizes, 'Volume', Piece.CELLS )
self.logger.info( f" Cells volume range: [{np.min(self.volumeCellVolumes):.1e}, "
f"{np.max(self.volumeCellVolumes):.1e}] m³" )
# 3. Build KDTree for fault surface (for fast distance queries)
faultCenters = computeCellCenterCoordinates( self.faultSurface )
self.faultTree = cKDTree( faultCenters )
# 4. Compute distance from each volume cell to nearest fault cell
self.distanceToFault = np.zeros( nVolume )
# Vectorized query for all points at once (much faster)
distances, _ = self.faultTree.query( self.volumeCenters )
self.distanceToFault = distances
self.logger.info( f" Distance to fault range: [{np.min(self.distanceToFault):.1f},"
f"{np.max(self.distanceToFault):.1f}] m" )
# 5. Add these properties to volume mesh for reference
createAttribute( self.volumeMesh, self.volumeCellVolumes, 'cellVolume', Piece.CELLS, logger=self.logger )
createAttribute( self.volumeMesh, self.distanceToFault, 'distanceToFault', Piece.CELLS, logger=self.logger )
def _buildAdjacencyMappingFaceSharing( self: Self,
processFaultsSeparately: bool = True
) -> dict[ int, dict[ str, list[ int ] ] ]:
"""Build adjacency for cells sharing faces with fault.
Uses adaptive epsilon optimization.
"""
faultIds = np.unique( getArrayInObject( self.faultSurface, self.faultAttribute, Piece.CELLS ) )
nFaults = len( faultIds )
self.logger.info( f"Processing {nFaults} separate faults: {faultIds}" )
allMappings: dict[ int, dict[ str, list[ int ] ] ] = {}
for faultId in faultIds:
mask = getArrayInObject( self.faultSurface, self.faultAttribute, Piece.CELLS ) == faultId
indices = np.where( mask )[ 0 ]
singleFault = extractCellSelection( self.faultSurface, indices )
self.logger.info( f" Mapping Fault {faultId}..." )
# Build face-sharing mapping with adaptive epsilon
localMapping = self._findFaceSharingCells( singleFault )
# Remap local indices to global fault indices
for localIdx, neighbors in localMapping.items():
globalIdx = indices[ localIdx ]
allMappings[ globalIdx ] = neighbors
return allMappings
def _findFaceSharingCells( self: Self, faultSurface: vtkUnstructuredGrid ) -> dict[ int, dict[ str, list[ int ] ] ]:
"""Find volume cells that share a FACE with fault cells.
Uses FindCell with adaptive epsilon to maximize cells with both neighbors
Args:
faultSurface (vtkUnstructuredGrid): Fault surface mesh
Returns:
dict[int, dict[str, list[int]]] : Mapping from the best epsilon found.
"""
volCenters = vtk_to_numpy( computeCellCenterCoordinates( self.volumeMesh ) )
faultNormals = vtk_to_numpy( faultSurface.GetCellData().GetNormals() )
faultCenters = vtk_to_numpy( computeCellCenterCoordinates( faultSurface ) )
# Determine base epsilon based on mesh size
volBounds = self.volumeMesh.bounds
typicalSize = np.mean( [
volBounds[ 1 ] - volBounds[ 0 ], volBounds[ 3 ] - volBounds[ 2 ], volBounds[ 5 ] - volBounds[ 4 ]
] ) / 100.0
# Build VTK cell locator (once)
locator = vtkCellLocator()
locator.SetDataSet( self.volumeMesh )
locator.BuildLocator()
# Try multiple epsilon values and keep the best
epsilonCandidates = [
typicalSize * 0.005, typicalSize * 0.01, typicalSize * 0.05, typicalSize * 0.1, typicalSize * 0.2,
typicalSize * 0.5, typicalSize * 1.0
]
self.logger.info( f" Testing {len(epsilonCandidates)} epsilon values..." )
bestEpsilon: float
bestMapping: dict
bestScore: float = -1
bestStats: dict = {}
for epsilon in epsilonCandidates:
# Test this epsilon
mapping, stats = self._testEpsilon( faultSurface, locator, epsilon, faultCenters, faultNormals, volCenters )
# Score = percentage with both sides + penalty for no neighbors
score = stats[ 'pctBoth' ] - 2.0 * stats[ 'pctNone' ]
self.logger.info( f" ε={epsilon:.3f}m -> Both: {stats['pctBoth']:.1f}%, "
f" One: {stats['pctOne']:.1f}%, None: {stats['pctNone']:.1f}%, "
f" Avg: {stats['avgNeighbors']:.2f} (score: {score:.1f})" )
if score > bestScore:
bestScore = score
bestEpsilon = epsilon
bestMapping = mapping
bestStats = stats
self.logger.info( f" Best epsilon: {bestEpsilon:.6f}m" )
self.logger.info( " Face-sharing mapping completed:" )
self.logger.info( f" Both sides: {bestStats['nBoth']} ({bestStats['pctBoth']:.1f}%)" )
self.logger.info( f" One side: {bestStats['nOne']} ({bestStats['pctOne']:.1f}%)" )
self.logger.info( f" No neighbors: {bestStats['nNone']} ({bestStats['pctNone']:.1f}%)" )
self.logger.info( f" Average neighbors per fault cell: {bestStats['avgNeighbors']:.2f}" )
return bestMapping
def _testEpsilon(
self: Self, faultSurface: vtkUnstructuredGrid, locator: vtkCellLocator, epsilon: float,
faultCenters: npt.NDArray[ np.float64 ], faultNormals: npt.NDArray[ np.float64 ],
volCenters: npt.NDArray[ np.float64 ]
) -> tuple[ dict[ int, dict[ str, list[ int ] ] ], dict[ str, float | int ] ]:
"""Test a specific epsilon value and return mapping and statistics.
Statistics include:
- 'nBoth': nFoundBoth,
- 'nOne': nFoundOne,
- 'nNone': nFoundNone,
- 'pctBoth': nFoundBoth / nCells * 100,
- 'pctOne': nFoundOne / nCells * 100,
- 'pctNone': nFoundNone / nCells * 100,
- 'avgNeighbors': avgNeighbors
Args:
faultSurface (vtkUnstructuredGrid): Fault mesh
locator (vtkCellLocator): Cell locator
epsilon (float): Epsilon to consider
faultCenters (npt.NDArray[np.float64]): Fault mesh cells centers
faultNormals: npt.NDArray[ np.float64 ]: Fault mesh normals
volCenters (npt.NDArray[np.float64]): Volumic mesh cells centers
Returns:
tuple[ dict[int, dict[str, list[int]]], dict[ str, float | int ]]]: Mapping of plus and minus cells, statistics
"""
mapping = {}
nFoundBoth = 0
nFoundOne = 0
nFoundNone = 0
totalNeighbors = 0
for fid in range( faultSurface.GetNumberOfCells() ):
fcenter = faultCenters[ fid ]
fnormal = faultNormals[ fid ]
plusCells = []
minusCells = []
# Search on PLUS side
pointPlus = fcenter + epsilon * fnormal
cellIdPlus = locator.FindCell( pointPlus )
if cellIdPlus >= 0:
plusCells.append( cellIdPlus )
# Search on MINUS side
pointMinus = fcenter - epsilon * fnormal
cellIdMinus = locator.FindCell( pointMinus )
if cellIdMinus >= 0:
minusCells.append( cellIdMinus )
mapping[ fid ] = { "plus": plusCells, "minus": minusCells }
# Statistics
nNeighbors = len( plusCells ) + len( minusCells )
totalNeighbors += nNeighbors
if len( plusCells ) > 0 and len( minusCells ) > 0:
nFoundBoth += 1
elif len( plusCells ) > 0 or len( minusCells ) > 0:
nFoundOne += 1
else:
nFoundNone += 1
nCells = faultSurface.GetNumberOfCells()
if nCells <= 0:
raise ValueError( "No cell in the fault surface." )
avgNeighbors = totalNeighbors / nCells if nCells > 0 else 0
stats = {
'nBoth': nFoundBoth,
'nOne': nFoundOne,
'nNone': nFoundNone,
'pctBoth': nFoundBoth / nCells * 100,
'pctOne': nFoundOne / nCells * 100,
'pctNone': nFoundNone / nCells * 100,
'avgNeighbors': avgNeighbors
}
return mapping, stats
def _extractAndComputeNormals( self: Self ) -> tuple[ vtkUnstructuredGrid, list[ vtkUnstructuredGrid ] ]:
"""Extract fault surfaces and compute oriented normals/tangents.
Returns:
tuple[vtkUnstructuredGrid, list[vtkUnstructuredGrid]]: Merged mesh containing all fault surfaces, list containing fault meshes.
"""
surfaces = []
mb = vtkMultiBlockDataSet()
mb.SetNumberOfBlocks( len( self.faultValues ) )
for i, faultId in enumerate( self.faultValues ):
# Extract fault cells
faultMask = np.where(
getArrayInObject( self.mesh, self.faultAttribute, piece=Piece.CELLS ) == faultId )[ 0 ]
faultCells = extractCellSelection( self.mesh, ids=faultMask )
if faultCells.GetNumberOfCells() == 0:
self.logger.warning( f" No cells for fault {faultId}" )
continue
# Extract surface
surf = extractSurface( faultCells )
if surf.GetNumberOfCells() == 0:
self.logger.warning( f" No cells found during surface extraction of fault {faultId}" )
continue
# Compute normals
surf = computeNormals( surf, self.logger )
# Orient normals consistently within the fault
surf = self._orientNormals( surf )
mb.SetBlock( i, surf )
surfaces.append( surf )
if len( surfaces ) == 0:
raise ValueError( "No surface could be extracted with {self.faultAttribute} fault attribute name." )
merged = mergeBlocks( mb, keepPartialAttributes=True, logger=self.logger )
self.logger.info( f"Normals computed for {merged.GetNumberOfCells()} fault cells" )
return merged, surfaces
def _orientNormals( self: Self, surf: vtkUnstructuredGrid, rotateNormals: bool = False ) -> vtkUnstructuredGrid:
"""Ensure normals point in consistent direction within the fault.
Args:
surf (vtkUnstructuredGrid): Fault mesh
rotateNormals (bool, optional): Flag to flip the normals.
Defaults is False.
Returns:
vtkUnstructuredGrid: Fault mesh with normals and tangents attributes appended.
"""
normals = vtk_to_numpy( surf.GetCellData().GetNormals() )
meanNormal = np.mean( normals, axis=0 )
meanNormal /= np.linalg.norm( meanNormal )
nCells = len( normals )
tangents1 = np.zeros( ( nCells, 3 ) )
tangents2 = np.zeros( ( nCells, 3 ) )
for i, normal in enumerate( normals ):
# Flip if pointing opposite to mean
if np.dot( normal, meanNormal ) < 0:
normals[ i ] = -normal
if rotateNormals:
normals[ i ] = -normal
# Compute orthogonal tangents
normal = normals[ i ]
if abs( normal[ 0 ] ) > 1e-6 or abs( normal[ 1 ] ) > 1e-6:
t1 = np.array( [ -normal[ 1 ], normal[ 0 ], 0 ] )
else:
t1 = np.array( [ 0, -normal[ 2 ], normal[ 1 ] ] )
t1 /= np.linalg.norm( t1 )
t2 = np.cross( normal, t1 )
t2 /= np.linalg.norm( t2 )
tangents1[ i ] = t1
tangents2[ i ] = t2
surf.GetCellData().SetNormals( numpy_to_vtk( normals.ravel() ) )
createAttribute( surf, tangents1, "Tangents1", logger=self.logger )
createAttribute( surf, tangents2, "Tangents2", logger=self.logger )
surf.GetCellData().SetActiveTangents( "Tangents1" )
dipAngles, strikeAngles = self.computeDipStrikeFromCellBase( normals, tangents1, tangents2 )
createAttribute( surf, dipAngles, "dipAngle", logger=self.logger )
createAttribute( surf, strikeAngles, "strikeAngle", logger=self.logger )
return surf
[docs]
def computeDipStrikeFromCellBase(
self: Self, normals: npt.NDArray[ np.float64 ], tangent1: npt.NDArray[ np.float64 ],
tangent2: npt.NDArray[ np.float64 ] ) -> tuple[ npt.NDArray[ np.float64 ], npt.NDArray[ np.float64 ] ]:
"""Compute dip and strike angles from cell normal and tangent vectors.
Assumptions:
* Coordinate system: X = East, Y = North, Z = Up.
* Vectors are provided per cell (shape: (nCells, 3)).
* Input vectors are assumed to be orthonormal (n = t1 × t2).
Args:
normals (npt.NDArray[np.float64]): Normal vectors
tangent1 (npt.NDArray[np.float64]): First tangent vector
tangent2 (npt.NDArray[np.float64]): Second tangent vector
Returns:
tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: Dip and strike.
"""
# 1. Identify the strike vector (the most horizontal tangent)
t1Horizontal = tangent1 - ( tangent1[ :, 2 ][ :, np.newaxis ] * np.array( [ 0, 0, 1 ] ) )
t2Horizontal = tangent2 - ( tangent2[ :, 2 ][ :, np.newaxis ] * np.array( [ 0, 0, 1 ] ) )
normT1Horizontal = np.linalg.norm( t1Horizontal, axis=1 )
normT2Horizontal = np.linalg.norm( t2Horizontal, axis=1 )
useT1 = normT1Horizontal > normT2Horizontal
strikeVector = np.zeros_like( tangent1 )
strikeVector[ useT1 ] = t1Horizontal[ useT1 ]
strikeVector[ ~useT1 ] = t2Horizontal[ ~useT1 ]
# Normalize
strikeNorm = np.linalg.norm( strikeVector, axis=1 )
strikeNorm[ strikeNorm == 0 ] = 1.0
strikeVector = strikeVector / strikeNorm[ :, np.newaxis ]
# 2. Compute the strike (azimuth clockwise from North)
strikeRad = np.arctan2( strikeVector[ :, 0 ], strikeVector[ :, 1 ] ) # atan2(E, N)
strikeDeg = np.degrees( strikeRad )
strikeDeg = np.where( strikeDeg < 0, strikeDeg + 360, strikeDeg )
# 3. Compute the dip
normHorizontal = np.linalg.norm( normals[ :, :2 ], axis=1 )
dipRad = np.arcsin( np.clip( normHorizontal, 0, 1 ) ) # clip to prevent rounding errors
dipDeg = np.degrees( dipRad )
return dipDeg, strikeDeg
[docs]
def diagnoseNormals( self: Self ) -> vtkUnstructuredGrid:
"""Diagnostic visualization to check normal quality.
Shows orthogonality and orientation issues.
Returns:
vtkUnstructuredGrid: The input surface, possibly annotated with an
additional field indicating orthogonality errors.
"""
surface = self.faultSurface
self.logger.info( "Diagnostic of normals" )
normals = surface.GetCellData().GetNormals()
tangent1 = surface.GetCellData().GetTangents()
tangent2 = surface.GetCellData().GetArray( "Tangents2" )
nCells = len( normals )
# Check orthogonality
dotNormT1 = np.array( [ np.dot( normals[ i ], tangent1[ i ] ) for i in range( nCells ) ] )
dotNormT2 = np.array( [ np.dot( normals[ i ], tangent2[ i ] ) for i in range( nCells ) ] )
dotT1T2 = np.array( [ np.dot( tangent1[ i ], tangent2[ i ] ) for i in range( nCells ) ] )
self.logger.info( "Orthogonality (should be close to 0):" )
self.logger.info(
f" Normal · Tangent1 : max={np.max(np.abs(dotNormT1)):.2e}, mean={np.mean(np.abs(dotNormT1)):.2e}" )
self.logger.info(
f" Normal · Tangent2 : max={np.max(np.abs(dotNormT2)):.2e}, mean={np.mean(np.abs(dotNormT2)):.2e}" )
self.logger.info(
f" Tangent1 · Tangent2: max={np.max(np.abs(dotT1T2)):.2e}, mean={np.mean(np.abs(dotT1T2)):.2e}" )
# Check vector norms (should be unit length)
normN = np.linalg.norm( normals, axis=1 )
normT1 = np.linalg.norm( tangent1, axis=1 )
normT2 = np.linalg.norm( tangent2, axis=1 )
self.logger.info( "Norms (should be close to 1):" )
self.logger.info( f" Normals : min={np.min(normN):.6f}, max={np.max(normN):.6f}" )
self.logger.info( f" Tangent1 : min={np.min(normT1):.6f}, max={np.max(normT1):.6f}" )
self.logger.info( f" Tangent2 : min={np.min(normT2):.6f}, max={np.max(normT2):.6f}" )
# Check orientation consistency
meanNormal = np.mean( normals, axis=0 )
meanNormal = meanNormal / np.linalg.norm( meanNormal )
dotsWithMean = np.array( [ np.dot( normals[ i ], meanNormal ) for i in range( nCells ) ] )
nReversed = np.sum( dotsWithMean < 0 )
self.logger.info( "Orientation consistency:" )
self.logger.info( f" Mean normal: [{meanNormal[0]:.3f}, {meanNormal[1]:.3f}, {meanNormal[2]:.3f}]" )
self.logger.info( f" Reversed normals: {nReversed}/{nCells} ({nReversed/nCells*100:.1f}%)" )
# Visual consistency check
if nReversed > nCells * 0.1:
self.logger.warning( "More than 10% of normals point in the opposite direction!" )
else:
self.logger.info( "Normals orientation is consistent" )
# Identify problematic cells (poor orthogonality)
badOrtho = ( ( np.abs( dotNormT1 ) > 1e-3 ) | ( np.abs( dotNormT2 ) > 1e-3 ) | ( np.abs( dotT1T2 ) > 1e-3 ) )
nBad = np.sum( badOrtho )
if nBad > 0:
self.logger.warning( f" {nBad} cells with poor orthogonality (|dot| > 1e-3)" )
updateAttribute( surface,
np.maximum.reduce( [ np.abs( dotNormT1 ),
np.abs( dotNormT2 ),
np.abs( dotT1T2 ) ] ),
"orthogonalityError",
Piece.CELLS,
logger=self.logger )
else:
self.logger.info( "All cells have good orthogonality" )
return surface