GEOS
LifoStorageCuda.hpp
1 /*
2  * ------------------------------------------------------------------------------------------------------------
3  * SPDX-License-Identifier: LGPL-2.1-only
4  *
5  * Copyright (c) 2016-2024 Lawrence Livermore National Security LLC
6  * Copyright (c) 2018-2024 Total, S.A
7  * Copyright (c) 2018-2024 The Board of Trustees of the Leland Stanford Junior University
8  * Copyright (c) 2023-2024 Chevron
9  * Copyright (c) 2019- GEOS/GEOSX Contributors
10  * All rights reserved
11  *
12  * See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details.
13  * ------------------------------------------------------------------------------------------------------------
14  */
15 
16 #ifndef LIFOSTORAGECUDA_HPP
17 #define LIFOSTORAGECUDA_HPP
18 
19 #include <deque>
20 #include <future>
21 #include <mutex>
22 #include <condition_variable>
23 #include <camp/camp.hpp>
24 #include <sys/stat.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <algorithm>
28 
29 #include "common/FixedSizeDeque.hpp"
30 #include "common/GEOS_RAJA_Interface.hpp"
31 #include "common/TimingMacros.hpp"
32 #include "common/FixedSizeDequeWithMutexes.hpp"
33 #include "common/MultiMutexesLock.hpp"
34 #include "common/LifoStorageCommon.hpp"
35 
36 namespace geos
37 {
42 template< typename T, typename INDEX_TYPE >
43 class LifoStorageCuda : public LifoStorageCommon< T, INDEX_TYPE >
44 {
46 public:
47 
48 
59  LifoStorageCuda( std::string name, size_t elemCnt, int numberOfBuffersToStoreOnDevice, int numberOfBuffersToStoreOnHost, int maxNumberOfBuffers ):
60  LifoStorageCommon< T, INDEX_TYPE >( name, elemCnt, numberOfBuffersToStoreOnHost, maxNumberOfBuffers ),
61  m_deviceDeque( numberOfBuffersToStoreOnDevice, elemCnt, LvArray::MemorySpace::cuda ),
62  m_pushToDeviceEvents( maxNumberOfBuffers ),
63  m_popFromDeviceEvents( maxNumberOfBuffers )
64  {}
65 
71  void pushAsync( arrayView1d< T > array ) override final
72  {
73  //To be sure 2 pushes are not mixed
74  pushWait();
75  int id = baseLifo::m_bufferCount++;
76  GEOS_ERROR_IF( baseLifo::m_hostDeque.capacity() == 0 && m_deviceDeque.capacity() < baseLifo::m_maxNumberOfBuffers,
77  "Cannot save on a Lifo without host storage (please set lifoSize, lifoOnDevice and lifoOnHost in xml file)" );
78 
79  m_pushToDeviceEvents[id] = m_deviceDeque.emplaceFront( array );
80 
81  if( baseLifo::m_maxNumberOfBuffers - id > (int)m_deviceDeque.capacity() )
82  {
83  LIFO_MARK_SCOPE( geos::lifoStorage::pushAddTasks );
84  // This buffer will go to host memory, and maybe on disk
85  std::packaged_task< void() > task( std::bind( &LifoStorageCuda< T, INDEX_TYPE >::deviceToHost, this, baseLifo::m_bufferToHostCount++ ) );
86  {
87  std::unique_lock< std::mutex > lock( baseLifo::m_task_queue_mutex[0] );
88  baseLifo::m_task_queue[0].emplace_back( std::move( task ) );
89  }
91  }
92  }
93 
97  void pushWait() override final
98  {
99  if( baseLifo::m_bufferCount > 0 )
100  {
101  m_pushToDeviceEvents[baseLifo::m_bufferCount-1].wait();
102  }
103  }
104 
110  void popAsync( arrayView1d< T > array ) override final
111  {
112  LIFO_MARK_FUNCTION;
113  int id = --baseLifo::m_bufferCount;
114 
116  {
117  LIFO_MARK_SCOPE( geos::LifoStorageCuda::popAddTasks );
118  // Trigger pull one buffer from host, and maybe from disk
119  std::packaged_task< void() > task( std::bind( &LifoStorageCuda< T, INDEX_TYPE >::hostToDevice, this, --baseLifo::m_bufferToHostCount, id ) );
120  {
121  std::unique_lock< std::mutex > lock( baseLifo::m_task_queue_mutex[0] );
122  baseLifo::m_task_queue[0].emplace_back( std::move( task ) );
123  }
125  }
126  m_popFromDeviceEvents[id] = m_deviceDeque.popFront( array );
127  }
128 
132  void popWait() override final
133  {
135  {
136  int bufferCount = baseLifo::m_bufferCount;
137  m_popFromDeviceEvents[bufferCount].wait();
138  }
139  }
140 
148  static int computeNumberOfBufferOnDevice( int percent, size_t bufferSize, int maxNumberOfBuffers )
149  {
150  GEOS_ERROR_IF( percent > 100, "Error, percentage of memory should be smaller than 100, check lifoOnDevice (should be greater than -100)" );
151  size_t free, total;
152  GEOS_ERROR_IF( cudaSuccess != cudaMemGetInfo( &free, &total ), "Error getting CUDA device available memory" );
153  double freeGB = ( ( double ) free ) / ( 1024.0 * 1024.0 * 1024.0 );
154  LIFO_LOG_RANK( " LIFO : available memory on device " << freeGB << " GB" );
155  return std::min( ( int )( 0.01 * percent * free / bufferSize ), maxNumberOfBuffers );
156  }
157 
158 private:
159 
165  void deviceToHost( int id )
166  {
167  LIFO_MARK_FUNCTION;
168  // The copy to host will only start when the data is copied on device buffer
169  baseLifo::m_hostDeque.getStream().wait_for( const_cast< camp::resources::Event * >( &m_pushToDeviceEvents[id] ) );
170  baseLifo::m_hostDeque.emplaceFrontFromBack( m_deviceDeque );
171 
172  if( baseLifo::m_maxNumberOfBuffers - id > (int)(m_deviceDeque.capacity() + baseLifo::m_hostDeque.capacity()) )
173  {
174  // This buffer will go to host then maybe to disk
175  std::packaged_task< void() > task( std::bind( &LifoStorageCuda< T, INDEX_TYPE >::hostToDisk, this, baseLifo::m_bufferToDiskCount++ ) );
176  {
177  std::unique_lock< std::mutex > lock( baseLifo::m_task_queue_mutex[1] );
178  baseLifo::m_task_queue[1].emplace_back( std::move( task ) );
179  }
181  }
182  }
183 
190  void hostToDevice( int id, int id_pop )
191  {
192  LIFO_MARK_FUNCTION;
193  // enqueue diskToHost on worker #2 if needed
195  {
196  // This buffer will go to host then to disk
197  std::packaged_task< void() > task( std::bind( &LifoStorageCuda< T, INDEX_TYPE >::diskToHost, this, --baseLifo::m_bufferToDiskCount ) );
198  {
199  std::unique_lock< std::mutex > lock( baseLifo::m_task_queue_mutex[1] );
200  baseLifo::m_task_queue[1].emplace_back( std::move( task ) );
201  }
203  }
204 
205  m_deviceDeque.emplaceBackFromFront( baseLifo::m_hostDeque );
206  }
207 
209  FixedSizeDequeWithMutexes< T, INDEX_TYPE > m_deviceDeque;
210  // Events associated to ith copies to device buffer
211  std::vector< camp::resources::Event > m_pushToDeviceEvents;
212  // Events associated to ith copies from device buffer
213  std::vector< camp::resources::Event > m_popFromDeviceEvents;
214 };
215 }
216 #endif // LIFOSTORAGECUDA_HPP
#define GEOS_ERROR_IF(EXP, msg)
Conditionally raise a hard error and terminate the program.
Definition: Logger.hpp:142
int m_bufferCount
Counter of buffers stored in the LIFO.
int m_bufferToDiskCount
Counter of buffers pushed to disk.
FixedSizeDequeWithMutexes< T, INDEX_TYPE > m_hostDeque
Queue of data stored on host memory.
std::deque< std::packaged_task< void() > > m_task_queue[2]
Queue of tasks to be executed by m_worker.
std::mutex m_task_queue_mutex[2]
mutex to protect access to m_task_queue.
std::condition_variable m_task_queue_not_empty_cond[2]
Condition used to tell m_worker that the queue has been filled or that processing has stopped.
int m_bufferToHostCount
Counter of buffers pushed to host.
int m_maxNumberOfBuffers
number of buffers to be inserted into the LIFO
void pushWait() override final
void popAsync(arrayView1d< T > array) override final
LifoStorageCuda(std::string name, size_t elemCnt, int numberOfBuffersToStoreOnDevice, int numberOfBuffersToStoreOnHost, int maxNumberOfBuffers)
void popWait() override final
static int computeNumberOfBufferOnDevice(int percent, size_t bufferSize, int maxNumberOfBuffers)
void pushAsync(arrayView1d< T > array) override final
ArrayView< T, 1 > arrayView1d
Alias for 1D array view.
Definition: DataTypes.hpp:180
std::string string
String type.
Definition: DataTypes.hpp:91