NAMD
SynchronousCollectives.C
Go to the documentation of this file.
1 #include "CudaRecord.h"
2 #include "CudaUtils.h"
3 #include "NamdTypes.h"
4 
5 #include "SynchronousCollectives.decl.h"
7 #include "Node.h"
8 #include "SimParameters.h"
9 #include "NamdEventsProfiling.h"
10 #include "Priorities.h"
11 
12 #include <cstring> // std::memcpy
13 
14 #include "charm++.h"
15 
16 #if defined(NAMD_CUDA) || defined(NAMD_HIP)
17 
18 /*
19  * PUP (Pack-UnPack) various types that will communicated via Charm++
20  */
21 #if !(defined(__NVCC__) || defined(__HIPCC__))
22 #include <pup.h>
24 #endif // !((__NVCC__) || (__HIPCC__))
25 
// Constructor body. NOTE(review): the signature line (original line 26) was
// dropped by the documentation extractor — presumably the default
// SynchronousCollectives() constructor; confirm against the repository.
// Registers this object as the per-PE singleton instance and aborts via
// NAMD_bug if a second instance is created on the same processor.
27 {
28  if (CkpvAccess(SynchronousCollectives_instance) == NULL) {
29  CkpvAccess(SynchronousCollectives_instance) = this;
30  } else {
31  NAMD_bug("SynchronousCollectives instanced twice on same processor!");
32  }
33 }
34 
36 
// NOTE(review): the signature line (original line 37) was dropped by the
// documentation extractor; this body initializes the group. It caches the
// group proxy from the BOC class-group registry and sizes the barrier
// counter vectors: one slot per PE for the all-PEs barrier, and a single
// slot for the single-scope barrier.
38  allPes_ = CkpvAccess(BOCclass_group).synchronousCollectives;
39  currentBarrierAll_ = std::vector<int>(CkNumPes(), 0);
40  currentBarrierSingle_ = std::vector<int>(1, 0);
41 }
42 
// Configures the master-PE scope: records this PE's master/device identity,
// sizes the master-scope barrier (one slot per device), and builds two
// section proxies over the master PEs — a plain section (masterPes_) and a
// CkMulticast-delegated section (masterPesMulticast_) used only where the
// multicast library's explicit messages are required (section reductions).
43 void SynchronousCollectives::initMasterScope(const int isMasterPe, const int isMasterDevice,
44  const int numDevices, const int deviceIndex, const std::vector<int>& masterPeList) {
45 
46  isMasterPe_ = isMasterPe;
47  numDevices_ = numDevices;
48  deviceIndex_ = deviceIndex;
49  masterPeList_ = masterPeList;
50 
51  currentBarrierMasterPe_ = std::vector<int>(numDevices_, 0);
52  forceBarrierAll(); // Make sure all PEs have set expectedBarrierMasterPe
53 
54  masterPes_ = CProxySection_SynchronousCollectives(allPes_.ckGetGroupID(),
55  masterPeList_.data(), masterPeList_.size());
56  masterPesMulticast_ = CProxySection_SynchronousCollectives(allPes_.ckGetGroupID(),
57  masterPeList_.data(), masterPeList_.size());
58 
59  //
60  // For section broadcasts, we must use the multi-cast library; however, it requires explicitly
61  // defined messages, so the multi-cast section will not be used for the non-reduction sections
62  //
63  CProxy_CkMulticastMgr mcastProxy = CkpvAccess(BOCclass_group).multicastMgr;
64  CkMulticastMgr *mcastPtr = CProxy_CkMulticastMgr(mcastProxy).ckLocalBranch();
65  masterPesMulticast_.ckSectionDelegate(mcastPtr);
66 
67  if (isMasterDevice && isMasterPe_) {
// NOTE(review): original line 68, which constructs `msg` (presumably a new
// SynchronousCollectivesMulticastMsg), was dropped by the documentation
// extractor — confirm against the repository.
69  setThread(CthSelf());
70  masterPesMulticast_.setupMulticastSection(msg);
71  suspendAndCheck(SynchronousCollectiveScope::single);
72  } else if (isMasterPe_) {
73  suspendAndCheck(SynchronousCollectiveScope::single);
74  }
75 
76  forceBarrierAll(); // Make sure all PEs have set expectedBarrierMasterPe
77 }
78 
79 void SynchronousCollectives::incrementCount(const SynchronousCollectiveScope scope, const int index) {
80  auto& currentBarrier = getBarrier(scope);
81  if (currentBarrier.size() <= index) {
82  NAMD_bug("SynchronousCollectives currentBarrier not large enough");
83  }
84  currentBarrier[index]++;
85 }
86 
87 void SynchronousCollectives::suspendAndCheck(const SynchronousCollectiveScope scope) {
88  auto& currentBarrier = getBarrier(scope);
89  bool done = true;
90  do {
91  CthYield();
92  done = true;
93  for (size_t i = 0; i < currentBarrier.size(); i++) {
94  done = (done && currentBarrier[i]);
95  }
96  } while (!done);
97 
98  for (size_t i = 0; i < currentBarrier.size(); i++) {
99  currentBarrier[i]--;
100  }
101 }
102 
// NOTE(review): the signature line (original line 103) was dropped by the
// documentation extractor — from the CkIndex_ reference below this is
// SynchronousCollectives::wait(). Two-phase quiescence-detection wait:
// phase 0 arms QD (PE 0 only) with this method as the callback; the phase-1
// re-entry marks completion, resets the phase counter, and awakens the
// thread registered via setThread().
104  int finished = false;
105  switch (waitPhase_) {
106  case 0:
107  break;
108  case 1:
109  finished = true;
110  break;
111  }
112 
113  waitPhase_++;
// Only PE 0 starts quiescence detection; the QD callback broadcasts wait()
// back to the whole group.
114  if (!CkMyPe()) {
115  if (!finished) {
116  CkStartQD(CkCallback(CkIndex_SynchronousCollectives::wait(), thisgroup));
117  }
118  }
119 
120  if (finished) {
121  waitPhase_ = 0;
122  CthAwaken(self_awaken_thread_);
123  }
124 }
125 
// NOTE(review): the signature line (original line 126) was dropped by the
// documentation extractor. Registers the current thread, kicks off the
// quiescence-detection wait(), and suspends this thread until wait()'s
// completion phase calls CthAwaken on it.
127  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_WAITANDAWAKEN);
128  setThread(CthSelf());
129  wait();
130  CthSuspend();
131  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_WAITANDAWAKEN);
132 }
133 
// NOTE(review): the signature line (original line 134) was dropped by the
// documentation extractor — per the declaration index this is
// void SynchronousCollectives::recvBarrierAll(const int PE).
// Entry method: records PE's arrival in the all-PEs barrier.
135  incrementCount(SynchronousCollectiveScope::all, PE);
136 }
137 
138 void SynchronousCollectives::recvBarrierMasterPe(const int deviceIndex) {
139  incrementCount(SynchronousCollectiveScope::master, deviceIndex);
140 }
141 
// NOTE(review): the signature line (original line 142) was dropped by the
// documentation extractor; this is the forceBarrierAll() body. Broadcasts
// this PE's arrival to the whole group, then yields until every PE has
// arrived — a true all-PEs barrier regardless of master-scope setup.
143  allPes_.recvBarrierAll(CkMyPe());
144  suspendAndCheck(SynchronousCollectiveScope::all);
145 }
146 
// NOTE(review): the signature line (original line 147) was dropped by the
// documentation extractor — per the declaration index this is
// void SynchronousCollectives::barrier(const SynchronousCollectiveScope scope).
// Scoped barrier: `all` uses a node barrier (single node), a forced
// all-PEs barrier (master scope not yet initialized), or a combined
// master-PE + node barrier; `master` synchronizes only the master PEs.
148  if (scope == SynchronousCollectiveScope::single) {
149  NAMD_bug("SynchronousCollectives::barrier does not support single scope");
150  }
151 
152  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_BARRIER);
153  if (scope == SynchronousCollectiveScope::all) {
154  if (CkNumNodes() == 1) {
155  CmiNodeBarrier();
156  } else if (currentBarrierMasterPe_.size() == 0) {
157  // If expectedBarrierMasterPe is not set, then we need to
158  // default back to a true all synchronization
159  forceBarrierAll();
160  } else {
161  if (isMasterPe_) {
// NOTE(review): original line 162, the body of this isMasterPe_ branch
// (presumably the master-PE barrier step), was dropped by the documentation
// extractor — confirm against the repository.
163  }
164  CmiNodeBarrier();
165  }
166  } else if (isMasterPe_) {
167  masterPes_.recvBarrierMasterPe(deviceIndex_);
168  suspendAndCheck(SynchronousCollectiveScope::master);
169  }
170  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_BARRIER);
171 }
172 
// NOTE(review): the signature line (original line 173) was dropped by the
// documentation extractor — per the declaration index this is
// void SynchronousCollectives::handleReductionAll(CkReductionMsg *msg).
// Reduction-client callback for all-scope allReduce: rebroadcasts the
// reduced bytes to every PE in the group.
174  allPes_.broadcastReductionResult(msg->getSize(), (char*) msg->getData());
175  // TODO: Should this message be deleted here
176 }
177 
// NOTE(review): the signature line (original line 178) was dropped by the
// documentation extractor — per the declaration index this is
// void SynchronousCollectives::handleReductionMaster(CkReductionMsg *msg).
// Reduction-client callback for master-scope allReduce: rebroadcasts the
// reduced bytes to the master-PE section.
179  masterPes_.broadcastReductionResult(msg->getSize(), (char*) msg->getData());
180  // TODO: Should this message be deleted here
181 }
182 
// NOTE(review): the signature line (original line 183) was dropped by the
// documentation extractor — per the declaration index this is
// void SynchronousCollectives::broadcastReductionResult(int n, char *data).
// Entry method: copies the n reduced bytes into the staging buffer that
// allReduce() pointed reductionPtr_ at, then releases the waiting thread
// via the single-scope barrier slot.
184  std::memcpy(reductionPtr_, (void*) data, n);
185  incrementCount(SynchronousCollectiveScope::single, 0);
186 }
187 
188 template<typename T>
189 std::vector<T> SynchronousCollectives::allReduce(std::vector<T>& data, CkReduction::reducerType type,
191  if (scope == SynchronousCollectiveScope::single) {
192  NAMD_bug("SynchronousCollectives::allreduce does not support single scope");
193  }
194  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_ALLREDUCE);
195 
196  std::vector<T> out;
197  reductionTemp_ = std::vector<T>(data.size());
198  reductionPtr_ = (void*) std::any_cast<std::vector<T>&>(reductionTemp_).data();
199 
200  setThread(CthSelf());
201  if (scope == SynchronousCollectiveScope::all) {
202  CkCallback cb(CkReductionTarget(SynchronousCollectives, handleReductionAll),
203  thisProxy[thisIndex]);
204 
205  contribute(data.size() * sizeof(T), data.data(), type, cb);
206  suspendAndCheck(SynchronousCollectiveScope::single);
207  } else if (isMasterPe_) {
208  CkCallback cb(CkReductionTarget(SynchronousCollectives, handleReductionMaster),
209  thisProxy[thisIndex]);
210 
211  CProxy_CkMulticastMgr mcastProxy = CkpvAccess(BOCclass_group).multicastMgr;
212  CkMulticastMgr *mcastPtr = CProxy_CkMulticastMgr(mcastProxy).ckLocalBranch();
213  mcastPtr->contribute(data.size() * sizeof(T), data.data(), type, reductionCookie_, cb);
214  suspendAndCheck(SynchronousCollectiveScope::single);
215  }
216  out = std::move(std::any_cast<std::vector<T>&>(reductionTemp_));
217  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_ALLREDUCE);
218  return out;
219 }
220 
// NOTE(review): the signature line (original line 221) was dropped by the
// documentation extractor — per the declaration index this is
// void SynchronousCollectives::setupMulticastSection(SynchronousCollectivesMulticastMsg *msg).
// Entry method: captures the multicast section cookie needed for later
// section reductions, frees the setup message, and releases the waiting
// thread via the single-scope barrier slot.
222  CkGetSectionInfo(reductionCookie_, msg);
223  delete msg;
224  incrementCount(SynchronousCollectiveScope::single, 0);
225 }
226 
227 template<typename T>
228 void SynchronousCollectives::sendAllGather(const T& data, const SynchronousCollectiveScope scope, const unsigned int key) {
229  if (scope == SynchronousCollectiveScope::all) {
230  allPes_.recvIndexData(CkMyPe(), data, scope, key);
231  } else if (isMasterPe_) {
232  masterPes_.recvIndexData(deviceIndex_, data, scope, key);
233  }
234 }
235 
236 template<typename T>
237 void SynchronousCollectives::recvIndexData(const int index, const T& data, const SynchronousCollectiveScope scope, const unsigned int key) {
238  const int tempSize = (scope == SynchronousCollectiveScope::all) ? CkNumPes() : numDevices_;
239  auto res = tempData_.try_emplace(key, std::in_place_type<std::vector<T>>, tempSize);
240 
241  std::vector<T>& tempVec = std::any_cast<std::vector<T>&>(res.first->second);
242  if (index >= tempVec.size()) {
243  NAMD_die("SynchronousCollectives::recvIndexData: temp array not large enough");
244  }
245 
246  tempVec[index] = std::move(data);
247  incrementCount(scope, index);
248 }
249 
250 template<typename T>
251 T SynchronousCollectives::retrieveTemp(const unsigned int key) {
252  auto outIter = tempData_.find(key);
253  if (outIter == tempData_.end()) {
254  NAMD_die("SynchronousCollectives::retrieveTemp: could not find data");
255  }
256  auto out = std::move(std::any_cast<T&>(outIter->second));
257  tempData_.erase(key);
258  return out;
259 }
260 
261 template<typename T>
262 std::vector<T> SynchronousCollectives::allGather(const T& data, const SynchronousCollectiveScope scope) {
263  if (scope == SynchronousCollectiveScope::single) {
264  NAMD_bug("SynchronousCollectives::allgather does not support single scope");
265  }
266  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_ALLGATHER);
267 
268  std::vector<T> out;
269  if (scope == SynchronousCollectiveScope::all || isMasterPe_) {
270  const unsigned int key = getKey(scope);
271  sendAllGather<T>(data, scope, key);
272  suspendAndCheck(scope);
273 
274  out = retrieveTemp<std::vector<T>>(key);
275  }
276  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_ALLGATHER);
277  return out;
278 }
279 
280 template<typename T>
281 void SynchronousCollectives::sendAlltoallv(const std::vector<T>& data,
282  const SynchronousCollectiveScope scope, const unsigned int key) {
283 
284  CProxy_SynchronousCollectives cp(thisgroup);
285  if (scope == SynchronousCollectiveScope::all) {
286  for (size_t i = 0; i < CkNumPes(); i++) {
287  cp[i].recvIndexData<T>(CkMyPe(), data[i], scope, key);
288  }
289  } else if (isMasterPe_) {
290  for (size_t i = 0; i < numDevices_; i++) {
291  const int PE = masterPeList_[i];
292  cp[PE].recvIndexData<T>(deviceIndex_, data[i], scope, key);
293  }
294  }
295 }
296 
297 template<typename T>
298 std::vector<T> SynchronousCollectives::alltoallv(const std::vector<T>& data, const SynchronousCollectiveScope scope) {
299  if (scope == SynchronousCollectiveScope::single) {
300  NAMD_bug("SynchronousCollectives::alltoallv does not support single scope");
301  }
302  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_ALLTOALL);
303 
304  std::vector<T> out;
305 
306  if (scope == SynchronousCollectiveScope::all || isMasterPe_) {
307  const unsigned int key = getKey(scope);
308  sendAlltoallv<T>(data, scope, key);
309  suspendAndCheck(scope);
310  out = retrieveTemp<std::vector<T>>(key);
311  }
312 
313  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_ALLTOALL);
314  return out;
315 }
316 
317 template<typename T>
318 void SynchronousCollectives::sendBroadcast(const T& data, const SynchronousCollectiveScope scope, const unsigned int key) {
319  if (scope == SynchronousCollectiveScope::all) {
320  allPes_.recvBroadcast<T>(data, key);
321  } else if (isMasterPe_) {
322  masterPes_.recvBroadcast<T>(data, key);
323  }
324 }
325 
326 template<typename T>
327 void SynchronousCollectives::recvBroadcast(const T& data, const unsigned int key) {
328  // Since we are only expecting one message, the key should not exist in the map
329  tempData_.insert({key, std::move(data)});
330  incrementCount(SynchronousCollectiveScope::single, 0);
331 }
332 
333 template<typename T>
334 T SynchronousCollectives::broadcast(const T& data, const bool isRoot, const SynchronousCollectiveScope scope) {
335  if (scope == SynchronousCollectiveScope::single) {
336  NAMD_bug("SynchronousCollectives::broadcast does not support single scope");
337  }
338  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_BROADCAST);
339 
340  T out = data; // If we are not participating in the broadcast, just return the input
341  if (scope == SynchronousCollectiveScope::all || isMasterPe_) {
342  const unsigned int key = getKey(scope);
343  if (isRoot) {
344  sendBroadcast(data, scope, key);
345  }
346  suspendAndCheck(SynchronousCollectiveScope::single);
347 
348  out = retrieveTemp<T>(key);
349  }
350  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_BROADCAST);
351  return out;
352 }
353 
// Explicit instantiations of the collective templates for the element
// types NAMD uses, so the definitions in this translation unit link from
// other files.
// NOTE(review): the extracted listing dropped original lines 358 and 373
// (one INSTANTIATE_ALLGATHER and one INSTANTIATE_ALLTOALLV line) — confirm
// the full instantiation set against the repository.
354 #define INSTANTIATE_ALLGATHER(type) \
355 template std::vector<type> \
356 SynchronousCollectives::allGather<type>(const type&, SynchronousCollectiveScope);
357 
359 INSTANTIATE_ALLGATHER(unsigned long long);
360 #if defined(NAMD_CUDA) || defined(NAMD_HIP)
361 INSTANTIATE_ALLGATHER(cudaIpcMemHandle_t);
362 #endif // NAMD_CUDA || NAMD_HIP
363 INSTANTIATE_ALLGATHER(std::vector<CudaLocalRecord>);
364 INSTANTIATE_ALLGATHER(std::vector<int>);
365 
366 #undef INSTANTIATE_ALLGATHER
367 
368 #define INSTANTIATE_ALLTOALLV(type) \
369 template std::vector<type> \
370 SynchronousCollectives::alltoallv<type>(const std::vector<type>&, SynchronousCollectiveScope);
371 
372 INSTANTIATE_ALLTOALLV(std::vector<int>);
374 
375 #undef INSTANTIATE_ALLTOALLV
376 
377 #define INSTANTIATE_ALLREDUCE(type) \
378 template std::vector<type> \
379 SynchronousCollectives::allReduce<type>(std::vector<type>&, \
380  CkReduction::reducerType, SynchronousCollectiveScope)
381 
382 INSTANTIATE_ALLREDUCE(unsigned int);
383 INSTANTIATE_ALLREDUCE(size_t);
384 INSTANTIATE_ALLREDUCE(double);
385 
386 #undef INSTANTIATE_ALLREDUCE
387 
388 #endif /* NAMD_CUDA || NAMD_HIP */
389 
390 #include "SynchronousCollectives.def.h"
391 
void barrier(const SynchronousCollectiveScope scope)
#define NAMD_EVENT_STOP(eon, id)
std::vector< T > allGather(const T &data, const SynchronousCollectiveScope scope)
void handleReductionMaster(CkReductionMsg *msg)
std::vector< T > allReduce(std::vector< T > &data, CkReduction::reducerType type, const SynchronousCollectiveScope scope)
#define INSTANTIATE_ALLGATHER(type)
void initMasterScope(const int isMasterPe, const int isMasterDevice, const int numDevices, const int deviceIndex, const std::vector< int > &masterPeList)
int masterPeList[MAX_NUM_DEVICES]
Definition: DeviceCUDA.C:95
T broadcast(const T &data, const bool isRoot, const SynchronousCollectiveScope scope)
#define NAMD_EVENT_START(eon, id)
void recvBarrierAll(const int PE)
void NAMD_bug(const char *err_msg)
Definition: common.C:196
void recvBarrierMasterPe(const int deviceIndex)
void broadcastReductionResult(int n, char *data)
void NAMD_die(const char *err_msg)
Definition: common.C:148
SynchronousCollectiveScope
void recvIndexData(const int index, const T &data, const SynchronousCollectiveScope scope, const unsigned int key)
#define INSTANTIATE_ALLREDUCE(type)
void setupMulticastSection(SynchronousCollectivesMulticastMsg *msg)
void handleReductionAll(CkReductionMsg *msg)
PUPbytes(CudaLocalRecord)
#define INSTANTIATE_ALLTOALLV(type)
std::vector< T > alltoallv(const std::vector< T > &data, const SynchronousCollectiveScope scope)
void recvBroadcast(const T &data, const unsigned int key)