GEOS
LifoStorageHost.hpp
1 /*
2  * ------------------------------------------------------------------------------------------------------------
3  * SPDX-License-Identifier: LGPL-2.1-only
4  *
5  * Copyright (c) 2016-2024 Lawrence Livermore National Security LLC
6  * Copyright (c) 2018-2024 TotalEnergies
7  * Copyright (c) 2018-2024 The Board of Trustees of the Leland Stanford Junior University
8  * Copyright (c) 2023-2024 Chevron
9  * Copyright (c) 2019- GEOS/GEOSX Contributors
10  * All rights reserved
11  *
12  * See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details.
13  * ------------------------------------------------------------------------------------------------------------
14  */
15 
16 #ifndef LIFOSTORAGEHOST_HPP
17 #define LIFOSTORAGEHOST_HPP
18 
19 #include <deque>
20 #include <future>
21 #include <mutex>
22 #include <condition_variable>
23 #include <camp/camp.hpp>
24 #include <sys/stat.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <algorithm>
28 
29 #include "common/GEOS_RAJA_Interface.hpp"
30 #include "common/TimingMacros.hpp"
31 #include "common/LifoStorageCommon.hpp"
32 
33 namespace geos
34 {
38 template< typename T, typename INDEX_TYPE >
39 class LifoStorageHost : public LifoStorageCommon< T, INDEX_TYPE >
40 {
42 
43 public:
44 
45 
55  LifoStorageHost( std::string name, size_t elemCnt, int numberOfBuffersToStoreOnHost, int maxNumberOfBuffers ):
56  LifoStorageCommon< T, INDEX_TYPE >( name, elemCnt, numberOfBuffersToStoreOnHost, maxNumberOfBuffers ),
57  m_pushToHostFutures( maxNumberOfBuffers ),
58  m_popFromHostFutures( maxNumberOfBuffers )
59  {}
60 
66  void pushAsync( arrayView1d< T > array ) override final
67  {
68  //To be sure 2 pushes are not mixed
69  pushWait();
70  int id = baseLifo::m_bufferCount++;
71  GEOS_ERROR_IF( baseLifo::m_hostDeque.capacity() == 0,
72  "Cannot save on a Lifo without host storage (please set lifoSize, lifoOnDevice and lifoOnHost in xml file)" );
73 
74  std::packaged_task< void() > task( std::bind( [ this ] ( int pushId, arrayView1d< T > pushedArray ) {
75  baseLifo::m_hostDeque.emplaceFront( pushedArray );
76 
77  if( baseLifo::m_maxNumberOfBuffers - pushId > (int)baseLifo::m_hostDeque.capacity() )
78  {
79  LIFO_MARK_SCOPE( geos::lifoStorage::pushAddTasks );
80  // This buffer will go to host memory, and maybe on disk
81  std::packaged_task< void() > t2( std::bind( &LifoStorageHost< T, INDEX_TYPE >::hostToDisk, this, baseLifo::m_bufferToDiskCount++ ) );
82  {
83  std::unique_lock< std::mutex > l2( baseLifo::m_task_queue_mutex[1] );
84  baseLifo::m_task_queue[1].emplace_back( std::move( t2 ) );
85  }
87  }
88  }, id, array ) );
89  m_pushToHostFutures[id] = task.get_future();
90  {
91  std::unique_lock< std::mutex > lock( baseLifo::m_task_queue_mutex[0] );
92  baseLifo::m_task_queue[0].emplace_back( std::move( task ) );
93  }
95  }
96 
100  void pushWait() override final
101  {
102  if( baseLifo::m_bufferCount > 0 )
103  {
104  m_pushToHostFutures[baseLifo::m_bufferCount-1].wait();
105  }
106  }
107 
113  void popAsync( arrayView1d< T > array ) override final
114  {
115  int id = --baseLifo::m_bufferCount;
116 
117  std::packaged_task< void() > task( std::bind ( [ this ] ( arrayView1d< T > poppedArray ) {
118  baseLifo::m_hostDeque.popFront( poppedArray );
119 
121  {
122  LIFO_MARK_SCOPE( geos::LifoStorageHost::popAddTasks );
123  // Trigger pull one buffer from host, and maybe from disk
124  std::packaged_task< void() > task2( std::bind( &LifoStorageHost< T, INDEX_TYPE >::diskToHost, this, --baseLifo::m_bufferToDiskCount ) );
125  {
126  std::unique_lock< std::mutex > lock2( baseLifo::m_task_queue_mutex[1] );
127  baseLifo::m_task_queue[1].emplace_back( std::move( task2 ) );
128  }
130  }
131  }, array ) );
132  m_popFromHostFutures[id] = task.get_future();
133  {
134  std::unique_lock< std::mutex > lock( baseLifo::m_task_queue_mutex[0] );
135  baseLifo::m_task_queue[0].emplace_back( std::move( task ) );
136  }
138  }
139 
143  void popWait() override final
144  {
146  {
147  m_popFromHostFutures[baseLifo::m_bufferCount].wait();
148  }
149  }
150 
158  static int computeNumberOfBufferOnDevice( int percent, size_t bufferSize, int maxNumberOfBuffers )
159  {
160  GEOS_UNUSED_VAR( percent, bufferSize, maxNumberOfBuffers );
161  return 0;
162  }
163 
164 private:
165 
166  // Futures associated to push to host in case we have no device buffers
167  std::vector< std::future< void > > m_pushToHostFutures;
168  // Futures associated to pop from host in case we have no device buffers
169  std::vector< std::future< void > > m_popFromHostFutures;
170 };
171 }
172 #endif // LIFOSTORAGEHOST_HPP
#define GEOS_UNUSED_VAR(...)
Mark an unused variable and silence compiler warnings.
Definition: GeosxMacros.hpp:84
#define GEOS_ERROR_IF(EXP, msg)
Conditionally raise a hard error and terminate the program.
Definition: Logger.hpp:142
int m_bufferCount
Counter of buffers currently stored in the LIFO.
int m_bufferToDiskCount
Counter of buffers pushed to disk.
FixedSizeDequeWithMutexes< T, INDEX_TYPE > m_hostDeque
Queue of data stored on host memory.
std::deque< std::packaged_task< void() > > m_task_queue[2]
Queues of tasks to be executed by m_worker.
std::mutex m_task_queue_mutex[2]
mutex to protect access to m_task_queue.
std::condition_variable m_task_queue_not_empty_cond[2]
Condition variables used to tell m_worker that a queue has been filled or that processing has stopped.
int m_maxNumberOfBuffers
number of buffers to be inserted into the LIFO
static int computeNumberOfBufferOnDevice(int percent, size_t bufferSize, int maxNumberOfBuffers)
LifoStorageHost(std::string name, size_t elemCnt, int numberOfBuffersToStoreOnHost, int maxNumberOfBuffers)
void popAsync(arrayView1d< T > array) override final
void pushAsync(arrayView1d< T > array) override final
void pushWait() override final
void popWait() override final
ArrayView< T, 1 > arrayView1d
Alias for 1D array view.
Definition: DataTypes.hpp:180
std::string string
String type.
Definition: DataTypes.hpp:91