GEOSX
LifoStorageHost.hpp
1 /*
2  * ------------------------------------------------------------------------------------------------------------
3  * SPDX-License-Identifier: LGPL-2.1-only
4  *
5  * Copyright (c) 2018-2020 Lawrence Livermore National Security LLC
6  * Copyright (c) 2018-2020 The Board of Trustees of the Leland Stanford Junior University
7  * Copyright (c) 2018-2020 TotalEnergies
8  * Copyright (c) 2019- GEOSX Contributors
9  * All rights reserved
10  *
11  * See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details.
12  * ------------------------------------------------------------------------------------------------------------
13  */
14 #ifndef LIFOSTORAGEHOST_HPP
15 #define LIFOSTORAGEHOST_HPP
16 
17 #include <deque>
18 #include <future>
19 #include <mutex>
20 #include <condition_variable>
21 #include <camp/camp.hpp>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <unistd.h>
25 #include <algorithm>
26 
27 #include "common/GEOS_RAJA_Interface.hpp"
28 #include "common/TimingMacros.hpp"
29 #include "common/LifoStorageCommon.hpp"
30 
31 namespace geos
32 {
36 template< typename T, typename INDEX_TYPE >
37 class LifoStorageHost : public LifoStorageCommon< T, INDEX_TYPE >
38 {
40 
41 public:
42 
43 
53  LifoStorageHost( std::string name, size_t elemCnt, int numberOfBuffersToStoreOnHost, int maxNumberOfBuffers ):
54  LifoStorageCommon< T, INDEX_TYPE >( name, elemCnt, numberOfBuffersToStoreOnHost, maxNumberOfBuffers ),
55  m_pushToHostFutures( maxNumberOfBuffers ),
56  m_popFromHostFutures( maxNumberOfBuffers )
57  {}
58 
64  void pushAsync( arrayView1d< T > array ) override final
65  {
66  //To be sure 2 pushes are not mixed
67  pushWait();
68  int id = baseLifo::m_bufferCount++;
69  GEOS_ERROR_IF( baseLifo::m_hostDeque.capacity() == 0,
70  "Cannot save on a Lifo without host storage (please set lifoSize, lifoOnDevice and lifoOnHost in xml file)" );
71 
72  std::packaged_task< void() > task( std::bind( [ this ] ( int pushId, arrayView1d< T > pushedArray ) {
73  baseLifo::m_hostDeque.emplaceFront( pushedArray );
74 
75  if( baseLifo::m_maxNumberOfBuffers - pushId > (int)baseLifo::m_hostDeque.capacity() )
76  {
77  LIFO_MARK_SCOPE( geosx::lifoStorage::pushAddTasks );
78  // This buffer will go to host memory, and maybe on disk
79  std::packaged_task< void() > t2( std::bind( &LifoStorageHost< T, INDEX_TYPE >::hostToDisk, this, baseLifo::m_bufferToDiskCount++ ) );
80  {
81  std::unique_lock< std::mutex > l2( baseLifo::m_task_queue_mutex[1] );
82  baseLifo::m_task_queue[1].emplace_back( std::move( t2 ) );
83  }
85  }
86  }, id, array ) );
87  m_pushToHostFutures[id] = task.get_future();
88  {
89  std::unique_lock< std::mutex > lock( baseLifo::m_task_queue_mutex[0] );
90  baseLifo::m_task_queue[0].emplace_back( std::move( task ) );
91  }
93  }
94 
98  void pushWait() override final
99  {
100  if( baseLifo::m_bufferCount > 0 )
101  {
102  m_pushToHostFutures[baseLifo::m_bufferCount-1].wait();
103  }
104  }
105 
111  void popAsync( arrayView1d< T > array ) override final
112  {
113  int id = --baseLifo::m_bufferCount;
114 
115  std::packaged_task< void() > task( std::bind ( [ this ] ( arrayView1d< T > poppedArray ) {
116  baseLifo::m_hostDeque.popFront( poppedArray );
117 
119  {
120  LIFO_MARK_SCOPE( geosx::LifoStorageHost::popAddTasks );
121  // Trigger pull one buffer from host, and maybe from disk
122  std::packaged_task< void() > task2( std::bind( &LifoStorageHost< T, INDEX_TYPE >::diskToHost, this, --baseLifo::m_bufferToDiskCount ) );
123  {
124  std::unique_lock< std::mutex > lock2( baseLifo::m_task_queue_mutex[1] );
125  baseLifo::m_task_queue[1].emplace_back( std::move( task2 ) );
126  }
128  }
129  }, array ) );
130  m_popFromHostFutures[id] = task.get_future();
131  {
132  std::unique_lock< std::mutex > lock( baseLifo::m_task_queue_mutex[0] );
133  baseLifo::m_task_queue[0].emplace_back( std::move( task ) );
134  }
136  }
137 
141  void popWait() override final
142  {
144  {
145  m_popFromHostFutures[baseLifo::m_bufferCount].wait();
146  }
147  }
148 
156  static int computeNumberOfBufferOnDevice( int percent, size_t bufferSize, int maxNumberOfBuffers )
157  {
158  GEOS_UNUSED_VAR( percent, bufferSize, maxNumberOfBuffers );
159  return 0;
160  }
161 
162 private:
163 
164  // Futures associated to push to host in case we have no device buffers
165  std::vector< std::future< void > > m_pushToHostFutures;
166  // Futures associated to pop from host in case we have no device buffers
167  std::vector< std::future< void > > m_popFromHostFutures;
168 };
169 }
170 #endif // LIFOSTORAGEHOST_HPP
#define GEOS_UNUSED_VAR(...)
Mark an unused variable and silence compiler warnings.
Definition: GeosxMacros.hpp:83
#define GEOS_ERROR_IF(EXP, msg)
Conditionally raise a hard error and terminate the program.
Definition: Logger.hpp:107
int m_bufferCount
counter of buffer stored in LIFO
int m_bufferToDiskCount
counter of buffer pushed to disk
FixedSizeDequeWithMutexes< T, INDEX_TYPE > m_hostDeque
Queue of data stored on host memory.
std::deque< std::packaged_task< void() > > m_task_queue[2]
queue of task to be executed by m_worker.
std::mutex m_task_queue_mutex[2]
mutex to protect access to m_task_queue.
std::condition_variable m_task_queue_not_empty_cond[2]
condition used to tell m_worker queue has been filled or processed is stopped.
int m_maxNumberOfBuffers
number of buffers to be inserted into the LIFO
static int computeNumberOfBufferOnDevice(int percent, size_t bufferSize, int maxNumberOfBuffers)
LifoStorageHost(std::string name, size_t elemCnt, int numberOfBuffersToStoreOnHost, int maxNumberOfBuffers)
void popAsync(arrayView1d< T > array) override final
void pushAsync(arrayView1d< T > array) override final
void pushWait() override final
void popWait() override final
ArrayView< T, 1 > arrayView1d
Alias for 1D array view.
Definition: DataTypes.hpp:220
std::string string
String type.
Definition: DataTypes.hpp:131