GEOSX
LifoStorageCommon.hpp
1 /*
2  * ------------------------------------------------------------------------------------------------------------
3  * SPDX-License-Identifier: LGPL-2.1-only
4  *
5  * Copyright (c) 2018-2020 Lawrence Livermore National Security LLC
6  * Copyright (c) 2018-2020 The Board of Trustees of the Leland Stanford Junior University
7  * Copyright (c) 2018-2020 TotalEnergies
8  * Copyright (c) 2019- GEOSX Contributors
9  * All rights reserved
10  *
11  * See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details.
12  * ------------------------------------------------------------------------------------------------------------
13  */
14 #ifndef LIFOSTORAGECOMMON_HPP
15 #define LIFOSTORAGECOMMON_HPP
16 
17 #include <deque>
18 #include <future>
19 #include <mutex>
20 #include <condition_variable>
21 #include <camp/camp.hpp>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <unistd.h>
25 #include <algorithm>
26 
// Instrumentation / logging hooks used throughout this header.
// With LIFO_DISABLE_CALIPER defined, the two marker macros expand to nothing and
// LIFO_LOG_RANK streams its argument to std::cerr; otherwise all three forward to
// the GEOS_* macros of the same name.
// NOTE(review): the std::cerr variant of LIFO_LOG_RANK already ends with ';', so a
// call site written `LIFO_LOG_RANK( x );` expands to two statements.
27 #ifdef LIFO_DISABLE_CALIPER
28 #define LIFO_MARK_FUNCTION
29 #define LIFO_MARK_SCOPE( a )
30 #define LIFO_LOG_RANK( a ) std::cerr << a << std::endl;
31 #else
32 #define LIFO_MARK_FUNCTION GEOS_MARK_FUNCTION
33 #define LIFO_MARK_SCOPE( a ) GEOS_MARK_SCOPE( a )
34 #define LIFO_LOG_RANK( a ) GEOS_LOG_RANK( a )
35 #endif
36 
37 #include "common/GEOS_RAJA_Interface.hpp"
38 #include "common/TimingMacros.hpp"
39 #include "common/FixedSizeDequeWithMutexes.hpp"
40 #include "common/MultiMutexesLock.hpp"
41 
42 
43 namespace geos
44 {
48 template< typename T, typename INDEX_TYPE >
50 {
51 public:
52 
53 
/**
 * @brief Build the state shared by the LIFO storage implementations.
 * @param name stored in m_name; writeOnDisk() / readOnDisk() use it as the prefix of the file names
 * @param elemCnt number of elements of type T in one buffer (m_bufferSize is elemCnt*sizeof( T ) bytes)
 * @param numberOfBuffersToStoreOnHost forwarded, with elemCnt, to the m_hostDeque constructor
 * @param maxNumberOfBuffers stored in m_maxNumberOfBuffers
 *
 * NOTE(review): as shown here the body is empty, the m_worker threads joined by the
 * destructor are never started and the buffer counters are not initialised. The
 * embedded listing numbers skip (67 -> 69 and 71 -> 74), so lines are probably
 * missing from this view - confirm against the repository.
 */
63  LifoStorageCommon( std::string name, size_t elemCnt, int numberOfBuffersToStoreOnHost, int maxNumberOfBuffers ):
64  m_maxNumberOfBuffers( maxNumberOfBuffers ),
65  m_bufferSize( elemCnt*sizeof( T ) ),
66  m_name( name ),
67  m_hostDeque( numberOfBuffersToStoreOnHost, elemCnt, LvArray::MemorySpace::host ),
69  m_continue( true ),
70  m_hasPoppedBefore( false )
71  {
74  }
75 
76  virtual ~LifoStorageCommon()
77  {
78  m_continue = false;
79  m_task_queue_not_empty_cond[0].notify_all();
80  m_task_queue_not_empty_cond[1].notify_all();
81  m_worker[0].join();
82  m_worker[1].join();
83  }
84 
/// Pure virtual, implemented by the concrete storages.
/// NOTE(review): presumably starts an asynchronous copy of @p array into the LIFO - confirm in the derived classes.
90  virtual void pushAsync( arrayView1d< T > array ) = 0;
91 
/// Pure virtual. Called in this class to "wait the last push" before a pop sequence starts.
95  virtual void pushWait() = 0;
96 
/// Pure virtual, implemented by the concrete storages.
/// NOTE(review): presumably starts an asynchronous copy from the LIFO into @p array - confirm in the derived classes.
102  virtual void popAsync( arrayView1d< T > array ) = 0;
103 
// NOTE(review): the signature line of this member is not present in this listing
// (the embedded numbering jumps from 103 to 108); only the body is documented here.
// The body waits for the pending push, then either waits for the previous pop or,
// the first time it runs, waits until both task queues are empty. It finally
// records in m_hasPoppedBefore that a pop has been started.
108  {
109  LIFO_MARK_FUNCTION;
110  //wait the last push to avoid race condition
111  pushWait();
112  if( m_hasPoppedBefore )
113  {
114  // Ensure last pop is finished
115  popWait();
116  }
117  else
118  {
// NOTE(review): as listed, this warning is emitted unconditionally on the first
// call; the embedded numbering skips 119, so a guarding condition may be missing
// from this view - confirm against the repository.
120  LIFO_LOG_RANK( " LIFO : warning number of entered buffered (" << m_bufferCount
121  << ") != max LIFO size (" << m_maxNumberOfBuffers << ") !" );
122  // Ensure that all push step are ended
123  for( int queueId = 0; queueId < 2; queueId++ )
124  {
125  std::unique_lock< std::mutex > lock( m_task_queue_mutex[queueId] );
// The same condition variable is notified by wait_and_consume_tasks() after it
// pops a task, which is what lets this wait observe the queue becoming empty.
126  m_task_queue_not_empty_cond[queueId].wait( lock, [ this, &queueId ] { return m_task_queue[queueId].empty(); } );
127  }
128  }
129  m_hasPoppedBefore = true;
130  }
131 
/// Pure virtual. Called in this class to "ensure last pop is finished" before starting another pop.
135  virtual void popWait() = 0;
136 
142  bool empty()
143  {
144  return m_bufferCount == 0;
145  }
146 
155  static int computeNumberOfBufferOnHost( int percent, size_t bufferSize, int maxNumberOfBuffers, int numberOfBuffersToStoreOnDevice )
156  {
157  GEOS_ERROR_IF( percent > 100, "Error, percentage of memory should be smallerer than -100, check lifoOnHost (should be greater that -100)" );
158 #if defined( _SC_AVPHYS_PAGES ) && defined( _SC_PAGESIZE )
159  size_t const free = sysconf( _SC_AVPHYS_PAGES ) * sysconf( _SC_PAGESIZE );
160 #else
161  size_t const free = 0;
162  GEOS_ERROR( "To use LifoStorage, both _SC_AVPHYS_PAGES and _SC_PAGESIZE must be defined." );
163 #endif
164  int numberOfBuffersToStoreOnHost = std::max( 1, std::min( ( int )( 0.01 * percent * free / bufferSize ), maxNumberOfBuffers - numberOfBuffersToStoreOnDevice ) );
165  double freeGB = ( ( double ) free ) / ( 1024.0 * 1024.0 * 1024.0 ) / MpiWrapper::nodeCommSize();
166  LIFO_LOG_RANK( " LIFO : available memory on host " << freeGB << " GB" );
167  return numberOfBuffersToStoreOnHost;
168  }
169 
170 protected:
171 
/// Size of one buffer in bytes (set to elemCnt*sizeof( T ) by the constructor).
175  size_t m_bufferSize;
180 
187 
188 
/// Per-queue condition variable: notified when the destructor requests a stop and
/// after a worker has popped a task from the matching queue.
190  std::condition_variable m_task_queue_not_empty_cond[2];
/// Per-queue mutex protecting access to m_task_queue.
192  std::mutex m_task_queue_mutex[2];
/// Per-queue list of tasks to be executed by the matching m_worker thread.
194  std::deque< std::packaged_task< void() > > m_task_queue[2];
/// Threads executing the tasks; joined by the destructor.
196  std::thread m_worker[2];
201 
207  void hostToDisk( int id )
208  {
209  LIFO_MARK_FUNCTION;
210  {
211  auto lock = make_multilock( m_hostDeque.m_popMutex, m_hostDeque.m_backMutex );
212  writeOnDisk( m_hostDeque.back().dataIfContiguous(), id );
213  m_hostDeque.pop_back();
214  }
215  m_hostDeque.m_notFullCond.notify_all();
216  }
217 
223  void diskToHost( int id )
224  {
225  LIFO_MARK_FUNCTION;
226  {
227  auto lock = make_multilock( m_hostDeque.m_emplaceMutex, m_hostDeque.m_backMutex );
228  m_hostDeque.m_notFullCond.wait( lock, [ this ] { return !( m_hostDeque.full() ); } );
229  readOnDisk( const_cast< T * >(m_hostDeque.next_back().dataIfContiguous()), id );
230  m_hostDeque.inc_back();
231  }
232  m_hostDeque.m_notEmptyCond.notify_all();
233  }
/**
 * @brief Check whether a path exists and is a directory.
 * @param dirName path to test
 * @return true only if stat() succeeds on @p dirName and reports a directory;
 *         false for a missing path or for an existing entry that is not a
 *         directory (e.g. a regular file)
 */
bool dirExists( const std::string & dirName )
{
  struct stat info {};
  // stat() alone also succeeds for regular files, so the file type must be checked too.
  return stat( dirName.c_str(), &info ) == 0 && S_ISDIR( info.st_mode );
}
245 
252  void writeOnDisk( const T * d, int id )
253  {
254  LIFO_MARK_FUNCTION;
255  std::string fileName = GEOS_FMT( "{}_{:08}.dat", m_name, id );
256  int lastDirSeparator = fileName.find_last_of( "/\\" );
257  std::string dirName = fileName.substr( 0, lastDirSeparator );
258  if( string::npos != (size_t)lastDirSeparator && !dirExists( dirName ))
259  makeDirsForPath( dirName );
260 
261  std::ofstream wf( fileName, std::ios::out | std::ios::binary );
262  GEOS_ERROR_IF( !wf || wf.fail() || !wf.is_open(),
263  "Could not open file "<< fileName << " for writting" );
264  wf.write( (const char *)d, m_bufferSize );
265  GEOS_ERROR_IF( wf.bad() || wf.fail(),
266  "An error occured while writting "<< fileName );
267  wf.close();
268  }
269 
276  void readOnDisk( T * d, int id )
277  {
278  LIFO_MARK_FUNCTION;
279  std::string fileName = GEOS_FMT( "{}_{:08}.dat", m_name, id );
280  std::ifstream wf( fileName, std::ios::in | std::ios::binary );
281  GEOS_ERROR_IF( !wf,
282  "Could not open file "<< fileName << " for reading" );
283  wf.read( (char *)d, m_bufferSize );
284  wf.close();
285  remove( fileName.c_str() );
286  }
287 
288 
289 private:
295  void wait_and_consume_tasks( int queueId )
296  {
297  LIFO_MARK_FUNCTION;
298  while( m_continue )
299  {
300  std::unique_lock< std::mutex > lock( m_task_queue_mutex[queueId] );
301  {
302  LIFO_MARK_SCOPE( waitForTask );
303  m_task_queue_not_empty_cond[queueId].wait( lock, [ this, &queueId ] { return !( m_task_queue[queueId].empty() && m_continue ); } );
304  }
305  if( m_continue == false ) break;
306  std::packaged_task< void() > task( std::move( m_task_queue[queueId].front() ) );
307  m_task_queue[queueId].pop_front();
308  lock.unlock();
309  m_task_queue_not_empty_cond[queueId].notify_all();
310  {
311  LIFO_MARK_SCOPE( runningTask );
312  task();
313  }
314  }
315  }
316 };
317 }
318 #endif // LIFOSTORAGECOMMON_HPP
#define GEOS_ERROR(msg)
Raise a hard error and terminate the program.
Definition: Logger.hpp:122
#define GEOS_ERROR_IF(EXP, msg)
Conditionally raise a hard error and terminate the program.
Definition: Logger.hpp:107
Associate mutexes with the fixedSizeDeque.
virtual void pushAsync(arrayView1d< T > array)=0
int m_bufferCount
counter of buffer stored in LIFO
virtual void popWait()=0
int m_bufferToDiskCount
counter of buffer pushed to disk
static int computeNumberOfBufferOnHost(int percent, size_t bufferSize, int maxNumberOfBuffers, int numberOfBuffersToStoreOnDevice)
std::string m_name
name used to store data on disk
FixedSizeDequeWithMutexes< T, INDEX_TYPE > m_hostDeque
Queue of data stored on host memory.
void readOnDisk(T *d, int id)
virtual void popAsync(arrayView1d< T > array)=0
std::deque< std::packaged_task< void() > > m_task_queue[2]
queue of task to be executed by m_worker.
std::mutex m_task_queue_mutex[2]
mutex to protect access to m_task_queue.
std::thread m_worker[2]
thread to execute tasks.
std::condition_variable m_task_queue_not_empty_cond[2]
condition used to tell m_worker queue has been filled or processed is stopped.
size_t m_bufferSize
size of one buffer in bytes
virtual void pushWait()=0
LifoStorageCommon(std::string name, size_t elemCnt, int numberOfBuffersToStoreOnHost, int maxNumberOfBuffers)
bool dirExists(const std::string &dirName)
int m_bufferToHostCount
counter of buffer pushed to host
bool m_hasPoppedBefore
marker to detect first pop
bool m_continue
boolean to keep m_worker alive.
int m_maxNumberOfBuffers
number of buffers to be inserted into the LIFO
void writeOnDisk(const T *d, int id)
ArrayView< T, 1 > arrayView1d
Alias for 1D array view.
Definition: DataTypes.hpp:220
auto make_multilock(Mutexes &&... mutexes)
Helper to construct MultiMutexesLock (usage auto lock = make_multilock( mutex1, mutex2,...
std::string string
String type.
Definition: DataTypes.hpp:131
void makeDirsForPath(std::string const &path)
Make directories for path.
static int nodeCommSize()
Compute the number of ranks allocated on the same node.