GEOSX
LifoStorageCuda.hpp
1 /*
2  * ------------------------------------------------------------------------------------------------------------
3  * SPDX-License-Identifier: LGPL-2.1-only
4  *
5  * Copyright (c) 2018-2020 Lawrence Livermore National Security LLC
6  * Copyright (c) 2018-2020 The Board of Trustees of the Leland Stanford Junior University
7  * Copyright (c) 2018-2020 TotalEnergies
8  * Copyright (c) 2019- GEOSX Contributors
9  * All rights reserved
10  *
11  * See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details.
12  * ------------------------------------------------------------------------------------------------------------
13  */
14 #ifndef LIFOSTORAGECUDA_HPP
15 #define LIFOSTORAGECUDA_HPP
16 
17 #include <deque>
18 #include <future>
19 #include <mutex>
20 #include <condition_variable>
21 #include <camp/camp.hpp>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <unistd.h>
25 #include <algorithm>
26 
27 #include "common/FixedSizeDeque.hpp"
28 #include "common/GEOS_RAJA_Interface.hpp"
29 #include "common/TimingMacros.hpp"
30 #include "common/FixedSizeDequeWithMutexes.hpp"
31 #include "common/MultiMutexesLock.hpp"
32 #include "common/LifoStorageCommon.hpp"
33 
34 namespace geos
35 {
40 template< typename T, typename INDEX_TYPE >
41 class LifoStorageCuda : public LifoStorageCommon< T, INDEX_TYPE >
42 {
44 public:
45 
46 
57  LifoStorageCuda( std::string name, size_t elemCnt, int numberOfBuffersToStoreOnDevice, int numberOfBuffersToStoreOnHost, int maxNumberOfBuffers ):
58  LifoStorageCommon< T, INDEX_TYPE >( name, elemCnt, numberOfBuffersToStoreOnHost, maxNumberOfBuffers ),
59  m_deviceDeque( numberOfBuffersToStoreOnDevice, elemCnt, LvArray::MemorySpace::cuda ),
60  m_pushToDeviceEvents( maxNumberOfBuffers ),
61  m_popFromDeviceEvents( maxNumberOfBuffers )
62  {}
63 
69  void pushAsync( arrayView1d< T > array ) override final
70  {
71  //To be sure 2 pushes are not mixed
72  pushWait();
73  int id = baseLifo::m_bufferCount++;
74  GEOS_ERROR_IF( baseLifo::m_hostDeque.capacity() == 0 && m_deviceDeque.capacity() < baseLifo::m_maxNumberOfBuffers,
75  "Cannot save on a Lifo without host storage (please set lifoSize, lifoOnDevice and lifoOnHost in xml file)" );
76 
77  m_pushToDeviceEvents[id] = m_deviceDeque.emplaceFront( array );
78 
79  if( baseLifo::m_maxNumberOfBuffers - id > (int)m_deviceDeque.capacity() )
80  {
81  LIFO_MARK_SCOPE( geosx::lifoStorage::pushAddTasks );
82  // This buffer will go to host memory, and maybe on disk
83  std::packaged_task< void() > task( std::bind( &LifoStorageCuda< T, INDEX_TYPE >::deviceToHost, this, baseLifo::m_bufferToHostCount++ ) );
84  {
85  std::unique_lock< std::mutex > lock( baseLifo::m_task_queue_mutex[0] );
86  baseLifo::m_task_queue[0].emplace_back( std::move( task ) );
87  }
89  }
90  }
91 
95  void pushWait() override final
96  {
97  if( baseLifo::m_bufferCount > 0 )
98  {
99  m_pushToDeviceEvents[baseLifo::m_bufferCount-1].wait();
100  }
101  }
102 
108  void popAsync( arrayView1d< T > array ) override final
109  {
110  LIFO_MARK_FUNCTION;
111  int id = --baseLifo::m_bufferCount;
112 
114  {
115  LIFO_MARK_SCOPE( geosx::LifoStorageCuda::popAddTasks );
116  // Trigger pull one buffer from host, and maybe from disk
117  std::packaged_task< void() > task( std::bind( &LifoStorageCuda< T, INDEX_TYPE >::hostToDevice, this, --baseLifo::m_bufferToHostCount, id ) );
118  {
119  std::unique_lock< std::mutex > lock( baseLifo::m_task_queue_mutex[0] );
120  baseLifo::m_task_queue[0].emplace_back( std::move( task ) );
121  }
123  }
124  m_popFromDeviceEvents[id] = m_deviceDeque.popFront( array );
125  }
126 
130  void popWait() override final
131  {
133  {
134  int bufferCount = baseLifo::m_bufferCount;
135  auto cuda_event = m_popFromDeviceEvents[bufferCount].try_get< camp::resources::CudaEvent >();
136  if( cuda_event ) cudaEventSynchronize( cuda_event->getCudaEvent_t() );
137  }
138  }
139 
147  static int computeNumberOfBufferOnDevice( int percent, size_t bufferSize, int maxNumberOfBuffers )
148  {
149  GEOS_ERROR_IF( percent > 100, "Error, percentage of memory should be smaller than 100, check lifoOnDevice (should be greater than -100)" );
150  size_t free, total;
151  GEOS_ERROR_IF( cudaSuccess != cudaMemGetInfo( &free, &total ), "Error getting CUDA device available memory" );
152  double freeGB = ( ( double ) free ) / ( 1024.0 * 1024.0 * 1024.0 );
153  LIFO_LOG_RANK( " LIFO : available memory on device " << freeGB << " GB" );
154  return std::min( ( int )( 0.01 * percent * free / bufferSize ), maxNumberOfBuffers );
155  }
156 
157 private:
158 
164  void deviceToHost( int id )
165  {
166  LIFO_MARK_FUNCTION;
167  // The copy to host will only start when the data is copied on device buffer
168  baseLifo::m_hostDeque.getStream().wait_for( const_cast< camp::resources::Event * >( &m_pushToDeviceEvents[id] ) );
169  baseLifo::m_hostDeque.emplaceFrontFromBack( m_deviceDeque );
170 
171  if( baseLifo::m_maxNumberOfBuffers - id > (int)(m_deviceDeque.capacity() + baseLifo::m_hostDeque.capacity()) )
172  {
173  // This buffer will go to host then maybe to disk
174  std::packaged_task< void() > task( std::bind( &LifoStorageCuda< T, INDEX_TYPE >::hostToDisk, this, baseLifo::m_bufferToDiskCount++ ) );
175  {
176  std::unique_lock< std::mutex > lock( baseLifo::m_task_queue_mutex[1] );
177  baseLifo::m_task_queue[1].emplace_back( std::move( task ) );
178  }
180  }
181  }
182 
189  void hostToDevice( int id, int id_pop )
190  {
191  LIFO_MARK_FUNCTION;
192  // enqueue diskToHost on worker #2 if needed
194  {
195  // This buffer will go to host then to disk
196  std::packaged_task< void() > task( std::bind( &LifoStorageCuda< T, INDEX_TYPE >::diskToHost, this, --baseLifo::m_bufferToDiskCount ) );
197  {
198  std::unique_lock< std::mutex > lock( baseLifo::m_task_queue_mutex[1] );
199  baseLifo::m_task_queue[1].emplace_back( std::move( task ) );
200  }
202  }
203 
204  m_deviceDeque.emplaceBackFromFront( baseLifo::m_hostDeque );
205  }
206 
208  FixedSizeDequeWithMutexes< T, INDEX_TYPE > m_deviceDeque;
209  // Events associated to ith copies to device buffer
210  std::vector< camp::resources::Event > m_pushToDeviceEvents;
211  // Events associated to ith copies from device buffer
212  std::vector< camp::resources::Event > m_popFromDeviceEvents;
213 };
214 }
215 #endif // LIFOSTORAGECUDA_HPP
#define GEOS_ERROR_IF(EXP, msg)
Conditionally raise a hard error and terminate the program.
Definition: Logger.hpp:107
int m_bufferCount
counter of buffer stored in LIFO
int m_bufferToDiskCount
counter of buffer pushed to disk
FixedSizeDequeWithMutexes< T, INDEX_TYPE > m_hostDeque
Queue of data stored on host memory.
std::deque< std::packaged_task< void() > > m_task_queue[2]
queue of task to be executed by m_worker.
std::mutex m_task_queue_mutex[2]
mutex to protect access to m_task_queue.
std::condition_variable m_task_queue_not_empty_cond[2]
condition used to tell m_worker queue has been filled or processed is stopped.
int m_bufferToHostCount
counter of buffer pushed to host
int m_maxNumberOfBuffers
number of buffers to be inserted into the LIFO
void pushWait() override final
void popAsync(arrayView1d< T > array) override final
LifoStorageCuda(std::string name, size_t elemCnt, int numberOfBuffersToStoreOnDevice, int numberOfBuffersToStoreOnHost, int maxNumberOfBuffers)
void popWait() override final
static int computeNumberOfBufferOnDevice(int percent, size_t bufferSize, int maxNumberOfBuffers)
void pushAsync(arrayView1d< T > array) override final
ArrayView< T, 1 > arrayView1d
Alias for 1D array view.
Definition: DataTypes.hpp:220
std::string string
String type.
Definition: DataTypes.hpp:131