GEOS
LifoStorageCommon.hpp
1 /*
2  * ------------------------------------------------------------------------------------------------------------
3  * SPDX-License-Identifier: LGPL-2.1-only
4  *
5  * Copyright (c) 2016-2024 Lawrence Livermore National Security LLC
6  * Copyright (c) 2018-2024 TotalEnergies
7  * Copyright (c) 2018-2024 The Board of Trustees of the Leland Stanford Junior University
8  * Copyright (c) 2023-2024 Chevron
9  * Copyright (c) 2019- GEOS/GEOSX Contributors
10  * All rights reserved
11  *
12  * See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details.
13  * ------------------------------------------------------------------------------------------------------------
14  */
15 
16 #ifndef LIFOSTORAGECOMMON_HPP
17 #define LIFOSTORAGECOMMON_HPP
18 
19 #include <deque>
20 #include <future>
21 #include <mutex>
22 #include <condition_variable>
23 #include <camp/camp.hpp>
24 #include <sys/stat.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <algorithm>
28 
29 #ifdef LIFO_DISABLE_CALIPER
30 #define LIFO_MARK_FUNCTION
31 #define LIFO_MARK_SCOPE( a )
32 #define LIFO_LOG_RANK( a ) std::cerr << a << std::endl;
33 #else
34 #define LIFO_MARK_FUNCTION GEOS_MARK_FUNCTION
35 #define LIFO_MARK_SCOPE( a ) GEOS_MARK_SCOPE( a )
36 #define LIFO_LOG_RANK( a ) GEOS_LOG_RANK( a )
37 #endif
38 
39 #include "common/GEOS_RAJA_Interface.hpp"
40 #include "common/TimingMacros.hpp"
41 #include "common/FixedSizeDequeWithMutexes.hpp"
42 #include "common/MultiMutexesLock.hpp"
43 
44 
45 namespace geos
46 {
50 template< typename T, typename INDEX_TYPE >
52 {
53 public:
54 
55 
/**
 * @brief Construct the common part of a LIFO storage.
 * @param name                         Stored in m_name; used as the prefix of the on-disk file names.
 * @param elemCnt                      Number of elements of type T in one buffer; the buffer size in bytes
 *                                     is elemCnt * sizeof( T ).
 * @param numberOfBuffersToStoreOnHost First argument forwarded to the host deque constructor.
 * @param maxNumberOfBuffers           Stored in m_maxNumberOfBuffers.
 *
 * The host deque is created in LvArray::MemorySpace::host. m_continue starts as true and
 * m_hasPoppedBefore as false.
 *
 * NOTE(review): the listing numbers embedded in this block jump from 69 to 71 and from 73 to 76,
 * so some initializers and the statements of the constructor body are probably missing from this
 * view -- confirm against the real header before editing.
 */
65  LifoStorageCommon( std::string name, size_t elemCnt, int numberOfBuffersToStoreOnHost, int maxNumberOfBuffers ):
66  m_maxNumberOfBuffers( maxNumberOfBuffers ),
67  m_bufferSize( elemCnt*sizeof( T ) ),
68  m_name( name ),
69  m_hostDeque( numberOfBuffersToStoreOnHost, elemCnt, LvArray::MemorySpace::host ),
71  m_continue( true ),
72  m_hasPoppedBefore( false )
73  {
76  }
77 
78  virtual ~LifoStorageCommon()
79  {
80  m_continue = false;
81  m_task_queue_not_empty_cond[0].notify_all();
82  m_task_queue_not_empty_cond[1].notify_all();
83  m_worker[0].join();
84  m_worker[1].join();
85  }
86 
// Pure virtual: no implementation is provided here, derived classes must override it.
// Receives a 1D array view of T.
// NOTE(review): presumably starts an asynchronous copy of `array` into the LIFO -- confirm in the implementations.
92  virtual void pushAsync( arrayView1d< T > array ) = 0;
93 
// Pure virtual: no implementation is provided here, derived classes must override it.
// Called at the start of the pop prelude in this class to wait for the last push and avoid a race condition.
97  virtual void pushWait() = 0;
98 
// Pure virtual: no implementation is provided here, derived classes must override it.
// Receives a 1D array view of T.
// NOTE(review): presumably starts an asynchronous copy of the last stored buffer into `array` -- confirm in the implementations.
104  virtual void popAsync( arrayView1d< T > array ) = 0;
105 
110  {
111  LIFO_MARK_FUNCTION;
112  //wait the last push to avoid race condition
113  pushWait();
114  if( m_hasPoppedBefore )
115  {
116  // Ensure last pop is finished
117  popWait();
118  }
119  else
120  {
122  LIFO_LOG_RANK( " LIFO : warning number of entered buffered (" << m_bufferCount
123  << ") != max LIFO size (" << m_maxNumberOfBuffers << ") !" );
124  // Ensure that all push step are ended
125  for( int queueId = 0; queueId < 2; queueId++ )
126  {
127  std::unique_lock< std::mutex > lock( m_task_queue_mutex[queueId] );
128  m_task_queue_not_empty_cond[queueId].wait( lock, [ this, &queueId ] { return m_task_queue[queueId].empty(); } );
129  }
130  }
131  m_hasPoppedBefore = true;
132  }
133 
// Pure virtual: no implementation is provided here, derived classes must override it.
// Called by the pop prelude in this class, once a pop has already happened, to ensure the last pop is finished.
137  virtual void popWait() = 0;
138 
144  bool empty()
145  {
146  return m_bufferCount == 0;
147  }
148 
157  static int computeNumberOfBufferOnHost( int percent, size_t bufferSize, int maxNumberOfBuffers, int numberOfBuffersToStoreOnDevice )
158  {
159  GEOS_ERROR_IF( percent > 100, "Error, percentage of memory should be smallerer than -100, check lifoOnHost (should be greater that -100)" );
160 #if defined( _SC_AVPHYS_PAGES ) && defined( _SC_PAGESIZE )
161  size_t const free = sysconf( _SC_AVPHYS_PAGES ) * sysconf( _SC_PAGESIZE );
162 #else
163  size_t const free = 0;
164  GEOS_ERROR( "To use LifoStorage, both _SC_AVPHYS_PAGES and _SC_PAGESIZE must be defined." );
165 #endif
166  int numberOfBuffersToStoreOnHost = std::max( 1, std::min( ( int )( 0.01 * percent * free / bufferSize ), maxNumberOfBuffers - numberOfBuffersToStoreOnDevice ) );
167  double freeGB = ( ( double ) free ) / ( 1024.0 * 1024.0 * 1024.0 ) / MpiWrapper::nodeCommSize();
168  LIFO_LOG_RANK( " LIFO : available memory on host " << freeGB << " GB" );
169  return numberOfBuffersToStoreOnHost;
170  }
171 
172 protected:
173 
// Size of one buffer in bytes; the constructor sets it to elemCnt * sizeof( T ).
177  size_t m_bufferSize;
182 
189 
190 
// Per-queue condition variable: notified by the worker after it removes a task and by the
// destructor when the workers have to stop.
192  std::condition_variable m_task_queue_not_empty_cond[2];
// Per-queue mutex protecting access to m_task_queue.
194  std::mutex m_task_queue_mutex[2];
// Per-queue list of tasks to be executed by the matching m_worker.
196  std::deque< std::packaged_task< void() > > m_task_queue[2];
// Threads executing the tasks; both are joined in the destructor.
198  std::thread m_worker[2];
203 
209  void hostToDisk( int id )
210  {
211  LIFO_MARK_FUNCTION;
212  {
213  auto lock = make_multilock( m_hostDeque.m_popMutex, m_hostDeque.m_backMutex );
214  writeOnDisk( m_hostDeque.back().dataIfContiguous(), id );
215  m_hostDeque.pop_back();
216  }
217  m_hostDeque.m_notFullCond.notify_all();
218  }
219 
225  void diskToHost( int id )
226  {
227  LIFO_MARK_FUNCTION;
228  {
229  auto lock = make_multilock( m_hostDeque.m_emplaceMutex, m_hostDeque.m_backMutex );
230  m_hostDeque.m_notFullCond.wait( lock, [ this ] { return !( m_hostDeque.full() ); } );
231  readOnDisk( const_cast< T * >(m_hostDeque.next_back().dataIfContiguous()), id );
232  m_hostDeque.inc_back();
233  }
234  m_hostDeque.m_notEmptyCond.notify_all();
235  }
/**
 * @brief Check whether a directory exists.
 * @param dirName Path to test.
 * @return true if stat() succeeds on dirName and the entry is a directory, false otherwise
 *         (missing path, stat() failure, or a path naming something that is not a directory).
 */
bool dirExists( const std::string & dirName )
{
  struct stat info;
  if( stat( dirName.c_str(), &info ) != 0 )
  {
    return false;
  }
  // A regular file with that name is not a usable directory.
  return S_ISDIR( info.st_mode ) != 0;
}
247 
254  void writeOnDisk( const T * d, int id )
255  {
256  LIFO_MARK_FUNCTION;
257  std::string fileName = GEOS_FMT( "{}_{:08}.dat", m_name, id );
258  int lastDirSeparator = fileName.find_last_of( "/\\" );
259  std::string dirName = fileName.substr( 0, lastDirSeparator );
260  if( string::npos != (size_t)lastDirSeparator && !dirExists( dirName ))
261  makeDirsForPath( dirName );
262 
263  std::ofstream wf( fileName, std::ios::out | std::ios::binary );
264  GEOS_ERROR_IF( !wf || wf.fail() || !wf.is_open(),
265  "Could not open file "<< fileName << " for writting" );
266  wf.write( (const char *)d, m_bufferSize );
267  GEOS_ERROR_IF( wf.bad() || wf.fail(),
268  "An error occured while writting "<< fileName );
269  wf.close();
270  }
271 
278  void readOnDisk( T * d, int id )
279  {
280  LIFO_MARK_FUNCTION;
281  std::string fileName = GEOS_FMT( "{}_{:08}.dat", m_name, id );
282  std::ifstream wf( fileName, std::ios::in | std::ios::binary );
283  GEOS_ERROR_IF( !wf,
284  "Could not open file "<< fileName << " for reading" );
285  wf.read( (char *)d, m_bufferSize );
286  wf.close();
287  remove( fileName.c_str() );
288  }
289 
290 
291 private:
297  void wait_and_consume_tasks( int queueId )
298  {
299  LIFO_MARK_FUNCTION;
300  while( m_continue )
301  {
302  std::unique_lock< std::mutex > lock( m_task_queue_mutex[queueId] );
303  {
304  LIFO_MARK_SCOPE( waitForTask );
305  m_task_queue_not_empty_cond[queueId].wait( lock, [ this, &queueId ] { return !( m_task_queue[queueId].empty() && m_continue ); } );
306  }
307  if( m_continue == false ) break;
308  std::packaged_task< void() > task( std::move( m_task_queue[queueId].front() ) );
309  m_task_queue[queueId].pop_front();
310  lock.unlock();
311  m_task_queue_not_empty_cond[queueId].notify_all();
312  {
313  LIFO_MARK_SCOPE( runningTask );
314  task();
315  }
316  }
317  }
318 };
319 }
320 #endif // LIFOSTORAGECOMMON_HPP
#define GEOS_ERROR(msg)
Raise a hard error and terminate the program.
Definition: Logger.hpp:157
#define GEOS_ERROR_IF(EXP, msg)
Conditionally raise a hard error and terminate the program.
Definition: Logger.hpp:142
Associate mutexes with the fixedSizeDeque.
virtual void pushAsync(arrayView1d< T > array)=0
int m_bufferCount
counter of buffer stored in LIFO
virtual void popWait()=0
int m_bufferToDiskCount
counter of buffer pushed to disk
static int computeNumberOfBufferOnHost(int percent, size_t bufferSize, int maxNumberOfBuffers, int numberOfBuffersToStoreOnDevice)
std::string m_name
name used to store data on disk
FixedSizeDequeWithMutexes< T, INDEX_TYPE > m_hostDeque
Queue of data stored on host memory.
void readOnDisk(T *d, int id)
virtual void popAsync(arrayView1d< T > array)=0
std::deque< std::packaged_task< void() > > m_task_queue[2]
queue of task to be executed by m_worker.
std::mutex m_task_queue_mutex[2]
mutex to protect access to m_task_queue.
std::thread m_worker[2]
thread to execute tasks.
std::condition_variable m_task_queue_not_empty_cond[2]
Condition used to tell m_worker that the queue has been filled or that processing is stopped.
size_t m_bufferSize
size of one buffer in bytes
virtual void pushWait()=0
LifoStorageCommon(std::string name, size_t elemCnt, int numberOfBuffersToStoreOnHost, int maxNumberOfBuffers)
bool dirExists(const std::string &dirName)
int m_bufferToHostCount
counter of buffer pushed to host
bool m_hasPoppedBefore
marker to detect first pop
bool m_continue
boolean to keep m_worker alive.
int m_maxNumberOfBuffers
number of buffers to be inserted into the LIFO
void writeOnDisk(const T *d, int id)
ArrayView< T, 1 > arrayView1d
Alias for 1D array view.
Definition: DataTypes.hpp:180
auto make_multilock(Mutexes &&... mutexes)
Helper to construct MultiMutexesLock (usage auto lock = make_multilock( mutex1, mutex2,...
std::string string
String type.
Definition: DataTypes.hpp:91
void makeDirsForPath(std::string const &path)
Make directories for path.
static int nodeCommSize()
Compute the number of ranks allocated on the same node.