GluonSubstrate.h
1 /*
2  * This file belongs to the Galois project, a C++ library for exploiting
3  * parallelism. The code is being released under the terms of the 3-Clause BSD
4  * License (a copy is located in LICENSE.txt at the top-level directory).
5  *
6  * Copyright (C) 2019, The University of Texas at Austin. All rights reserved.
7  * UNIVERSITY EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES CONCERNING THIS
8  * SOFTWARE AND DOCUMENTATION, INCLUDING ANY WARRANTIES OF MERCHANTABILITY,
9  * FITNESS FOR ANY PARTICULAR PURPOSE, NON-INFRINGEMENT AND WARRANTIES OF
10  * PERFORMANCE, AND ANY WARRANTY THAT MIGHT OTHERWISE ARISE FROM COURSE OF
11  * DEALING OR USAGE OF TRADE. NO WARRANTY IS EITHER EXPRESS OR IMPLIED WITH
12  * RESPECT TO THE USE OF THE SOFTWARE OR DOCUMENTATION. Under no circumstances
13  * shall University be liable for incidental, special, indirect, direct or
14  * consequential damages or loss of profits, interruption of business, or
15  * related expenses which may arise from use of Software or Documentation,
16  * including but not limited to those resulting from defects in Software and/or
17  * Documentation, or loss or inaccuracy of data of any kind.
18  */
19 
26 #ifndef _GALOIS_GLUONSUB_H_
27 #define _GALOIS_GLUONSUB_H_
28 
29 #include <unordered_map>
30 #include <fstream>
31 
36 #include "galois/DynamicBitset.h"
37 
38 #ifdef GALOIS_ENABLE_GPU
39 #include "galois/cuda/HostDecls.h"
40 #endif
41 
42 #include "galois/runtime/BareMPI.h"
43 
44 // TODO find a better way to do this without globals
45 //! Specifies what format to send metadata in
46 extern DataCommMode enforcedDataMode;
47 
48 #ifdef GALOIS_USE_BARE_MPI
49 extern BareMPI bare_mpi;
50 #endif
51 
52 //! Enumeration of write locations for sync calls
53 enum WriteLocation {
54  //! write at source
55  writeSource,
56  //! write at destination
57  writeDestination,
58  //! write at source and/or destination
59  writeAny
60 };
61 //! Enumeration of read locations for sync calls
62 enum ReadLocation {
63  //! read at source
64  readSource,
65  //! read at destination
66  readDestination,
67  //! read at source and/or destination
68  readAny
69 };
70 
71 namespace galois {
72 namespace graphs {
73 
82 template <typename GraphTy>
 83 class GluonSubstrate : public galois::runtime::GlobalObject {
 84 private:
86  enum SyncType {
87  syncReduce,
88  syncBroadcast
89  };
90 
92  constexpr static const char* const RNAME = "Gluon";
93 
95  GraphTy& userGraph;
96  const unsigned id;
97  bool transposed;
98  bool isVertexCut;
99  std::pair<unsigned, unsigned> cartesianGrid;
100  bool partitionAgnostic;
101  DataCommMode substrateDataMode;
102  const uint32_t
103  numHosts;
104  uint32_t num_run;
105  uint32_t num_round;
106  bool isCartCut;
107 
108  // bitvector status hasn't been maintained
115  BITVECTOR_STATUS* currentBVFlag;
116 
117  // memoization optimization
119  std::vector<std::vector<size_t>> masterNodes;
122  std::vector<std::vector<size_t>>& mirrorNodes;
124  size_t maxSharedSize;
125 
126 #ifdef GALOIS_USE_BARE_MPI
127  std::vector<MPI_Group> mpi_identity_groups;
128 #endif
129  // Used for efficient comms
130  galois::DynamicBitSet syncBitset;
 131  galois::PODResizeableArray<unsigned int> syncOffsets;
 132 
139  void reset_bitset(SyncType syncType,
140  void (*bitset_reset_range)(size_t, size_t)) {
141  size_t numMasters = userGraph.numMasters();
142  if (numMasters > 0) {
143  // note this assumes masters are from 0 -> a number; CuSP should
144  // do this automatically
145  if (syncType == syncBroadcast) { // reset masters
146  bitset_reset_range(0, numMasters - 1);
147  } else {
148  assert(syncType == syncReduce);
149  // mirrors occur after masters
150  if (numMasters < userGraph.size()) {
151  bitset_reset_range(numMasters, userGraph.size() - 1);
152  }
153  }
154  } else { // all things are mirrors
155  // only need to reset if reduce
156  if (syncType == syncReduce) {
157  if (userGraph.size() > 0) {
158  bitset_reset_range(0, userGraph.size() - 1);
159  }
160  }
161  }
162  }
163 
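// A minimal sketch (not part of the original header) of the kind of
// bitset_reset_range callback reset_bitset() expects. It assumes the field's
// dirty bits live in a file-scope galois::DynamicBitSet named bitset_dist
// (a hypothetical name) and clears the inclusive range [begin, end]:
//
// @code
// galois::DynamicBitSet bitset_dist;
//
// void bitset_dist_reset_range(size_t begin, size_t end) {
//   // DynamicBitSet::reset(begin, end) clears the inclusive range
//   bitset_dist.reset(begin, end);
// }
// @endcode
//
// With CuSP's contiguous layout, masters are local ids [0, numMasters) and
// mirrors are [numMasters, size), so the two branches above reset exactly one
// of those ranges.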
 165  void inline incrementEvilPhase() {
 166  ++galois::runtime::evilPhase;
 167  // limit defined by MPI or LCI
 168  if (galois::runtime::evilPhase >=
 169  static_cast<uint32_t>(std::numeric_limits<int16_t>::max())) {
 170  galois::runtime::evilPhase = 1;
 171  }
 172  }
173 
175  // Proxy communication setup
177 
181  void exchangeProxyInfo() {
 182  auto& net = galois::runtime::getSystemNetworkInterface();
 183 
184  // send off the mirror nodes
185  for (unsigned x = 0; x < numHosts; ++x) {
186  if (x == id)
187  continue;
 188 
 189  galois::runtime::SendBuffer b;
190  gSerialize(b, mirrorNodes[x]);
191  net.sendTagged(x, galois::runtime::evilPhase, b);
192  }
193 
194  // receive the mirror nodes
195  for (unsigned x = 0; x < numHosts; ++x) {
196  if (x == id)
197  continue;
198 
199  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
200  do {
201  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
202  } while (!p);
203 
204  galois::runtime::gDeserialize(p->second, masterNodes[p->first]);
205  }
206  incrementEvilPhase();
207  }
208 
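// A minimal sketch (not part of the original header) of the tagged
// all-to-all exchange pattern used by exchangeProxyInfo() and
// sendInfoToHost(): every host serializes a payload for every other host,
// sends it under the current evilPhase tag, then spins on recieveTagged()
// until one message per peer has arrived. The payload type here
// (std::vector<uint64_t>) is illustrative only.
//
// @code
// auto& net = galois::runtime::getSystemNetworkInterface();
// std::vector<uint64_t> payload = {net.ID};           // data to exchange
// std::vector<std::vector<uint64_t>> received(net.Num);
//
// for (unsigned x = 0; x < net.Num; ++x) {
//   if (x == net.ID) continue;
//   galois::runtime::SendBuffer b;
//   galois::runtime::gSerialize(b, payload);
//   net.sendTagged(x, galois::runtime::evilPhase, b);
// }
// for (unsigned x = 0; x < net.Num; ++x) {
//   if (x == net.ID) continue;
//   decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
//   do {
//     p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
//   } while (!p);
//   galois::runtime::gDeserialize(p->second, received[p->first]);
// }
// // all traffic for this phase is done; bump evilPhase before reuse
// @endcode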
213  void sendInfoToHost() {
 214  auto& net = galois::runtime::getSystemNetworkInterface();
 215 
216  uint64_t global_total_mirror_nodes =
217  userGraph.size() - userGraph.numMasters();
218  uint64_t global_total_owned_nodes = userGraph.numMasters();
219 
220  // send info to host
221  for (unsigned x = 0; x < numHosts; ++x) {
222  if (x == id)
223  continue;
 224 
 225  galois::runtime::SendBuffer b;
226  gSerialize(b, global_total_mirror_nodes, global_total_owned_nodes);
227  net.sendTagged(x, galois::runtime::evilPhase, b);
228  }
229 
230  // receive
231  for (unsigned x = 0; x < numHosts; ++x) {
232  if (x == id)
233  continue;
234 
235  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
236  do {
237  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
238  } while (!p);
239 
240  uint64_t total_mirror_nodes_from_others;
241  uint64_t total_owned_nodes_from_others;
242  galois::runtime::gDeserialize(p->second, total_mirror_nodes_from_others,
243  total_owned_nodes_from_others);
244  global_total_mirror_nodes += total_mirror_nodes_from_others;
245  global_total_owned_nodes += total_owned_nodes_from_others;
246  }
247  incrementEvilPhase();
248 
249  assert(userGraph.globalSize() == global_total_owned_nodes);
250  // report stats
251  if (net.ID == 0) {
252  reportProxyStats(global_total_mirror_nodes, global_total_owned_nodes);
253  }
254  }
255 
260  void setupCommunication() {
261  galois::CondStatTimer<MORE_DIST_STATS> Tcomm_setup("CommunicationSetupTime",
262  RNAME);
263 
264  // barrier so that all hosts start the timer together
 265  galois::runtime::getHostBarrier().wait();
 266 
267  Tcomm_setup.start();
268 
269  // Exchange information for memoization optimization.
270  exchangeProxyInfo();
271  // convert the global ids stored in the master/mirror nodes arrays to local
272  // ids
273  // TODO: use 32-bit distinct vectors for masters and mirrors from here on
274  for (uint32_t h = 0; h < masterNodes.size(); ++h) {
 275  galois::do_all(
 276  galois::iterate(size_t{0}, masterNodes[h].size()),
277  [&](size_t n) {
278  masterNodes[h][n] = userGraph.getLID(masterNodes[h][n]);
279  },
280 #if GALOIS_COMM_STATS
281  galois::loopname(get_run_identifier("MasterNodes").c_str()),
282 #endif
283  galois::no_stats());
284  }
285 
286  for (uint32_t h = 0; h < mirrorNodes.size(); ++h) {
 287  galois::do_all(
 288  galois::iterate(size_t{0}, mirrorNodes[h].size()),
289  [&](size_t n) {
290  mirrorNodes[h][n] = userGraph.getLID(mirrorNodes[h][n]);
291  },
292 #if GALOIS_COMM_STATS
293  galois::loopname(get_run_identifier("MirrorNodes").c_str()),
294 #endif
295  galois::no_stats());
296  }
297 
298  Tcomm_setup.stop();
299 
300  maxSharedSize = 0;
301  // report masters/mirrors to/from other hosts as statistics
302  for (auto x = 0U; x < masterNodes.size(); ++x) {
303  if (x == id)
304  continue;
305  std::string master_nodes_str =
306  "MasterNodesFrom_" + std::to_string(id) + "_To_" + std::to_string(x);
307  galois::runtime::reportStatCond_Tsum<MORE_DIST_STATS>(
308  RNAME, master_nodes_str, masterNodes[x].size());
309  if (masterNodes[x].size() > maxSharedSize) {
310  maxSharedSize = masterNodes[x].size();
311  }
312  }
313 
314  for (auto x = 0U; x < mirrorNodes.size(); ++x) {
315  if (x == id)
316  continue;
317  std::string mirror_nodes_str =
318  "MirrorNodesFrom_" + std::to_string(x) + "_To_" + std::to_string(id);
319  galois::runtime::reportStatCond_Tsum<MORE_DIST_STATS>(
320  RNAME, mirror_nodes_str, mirrorNodes[x].size());
321  if (mirrorNodes[x].size() > maxSharedSize) {
322  maxSharedSize = mirrorNodes[x].size();
323  }
324  }
325 
326  sendInfoToHost();
327 
328  // do not track memory usage of partitioning
 329  auto& net = galois::runtime::getSystemNetworkInterface();
 330  net.resetMemUsage();
331  }
332 
341  void reportProxyStats(uint64_t global_total_mirror_nodes,
342  uint64_t GALOIS_UNUSED(global_total_owned_nodes)) {
343  float replication_factor =
344  (float)(global_total_mirror_nodes + userGraph.globalSize()) /
345  (float)userGraph.globalSize();
346  galois::runtime::reportStat_Single(RNAME, "ReplicationFactor",
347  replication_factor);
348 
349  galois::runtime::reportStatCond_Single<MORE_DIST_STATS>(
350  RNAME, "TotalNodes", userGraph.globalSize());
351  galois::runtime::reportStatCond_Single<MORE_DIST_STATS>(
352  RNAME, "TotalGlobalMirrorNodes", global_total_mirror_nodes);
353  }
354 
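// Worked example of the replication factor reported above (an illustration,
// not from the original source): with 1,000,000 global nodes and 2,500,000
// mirror proxies summed over all hosts,
//   ReplicationFactor = (2,500,000 + 1,000,000) / 1,000,000 = 3.5,
// i.e. each global node has 3.5 proxies on average (1 master + 2.5 mirrors).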
356  // Initializers
358 
362  void initBareMPI() {
363 #ifdef GALOIS_USE_BARE_MPI
364  if (bare_mpi == noBareMPI)
365  return;
366 
367 #ifdef GALOIS_USE_LCI
368  // sanity check of ranks
369  int taskRank;
370  MPI_Comm_rank(MPI_COMM_WORLD, &taskRank);
371  if ((unsigned)taskRank != id)
372  GALOIS_DIE("mismatch in MPI rank");
373  int numTasks;
374  MPI_Comm_size(MPI_COMM_WORLD, &numTasks);
375  if ((unsigned)numTasks != numHosts)
 376  GALOIS_DIE("mismatch in number of MPI tasks and Galois hosts");
377 #endif
378  // group setup
379  MPI_Group world_group;
380  MPI_Comm_group(MPI_COMM_WORLD, &world_group);
381  mpi_identity_groups.resize(numHosts);
382 
383  for (unsigned x = 0; x < numHosts; ++x) {
384  const int g[1] = {(int)x};
385  MPI_Group_incl(world_group, 1, g, &mpi_identity_groups[x]);
386  }
387 
388  if (id == 0) {
389  switch (bare_mpi) {
390  case nonBlockingBareMPI:
391  galois::gPrint("Using non-blocking bare MPI\n");
392  break;
393  case oneSidedBareMPI:
394  galois::gPrint("Using one-sided bare MPI\n");
395  break;
396  case noBareMPI:
397  default:
398  GALOIS_DIE("unsupported bare MPI");
399  }
400  }
401 #endif
402  }
403 
404 public:
409  GluonSubstrate() = delete;
410 
 423  GluonSubstrate(
 424  GraphTy& _userGraph, unsigned host, unsigned numHosts, bool _transposed,
425  std::pair<unsigned, unsigned> _cartesianGrid = std::make_pair(0u, 0u),
426  bool _partitionAgnostic = false,
427  DataCommMode _enforcedDataMode = DataCommMode::noData)
428  : galois::runtime::GlobalObject(this), userGraph(_userGraph), id(host),
429  transposed(_transposed), isVertexCut(userGraph.is_vertex_cut()),
430  cartesianGrid(_cartesianGrid), partitionAgnostic(_partitionAgnostic),
431  substrateDataMode(_enforcedDataMode), numHosts(numHosts), num_run(0),
432  num_round(0), currentBVFlag(nullptr),
433  mirrorNodes(userGraph.getMirrorNodes()) {
434  if (cartesianGrid.first != 0 && cartesianGrid.second != 0) {
435  GALOIS_ASSERT(cartesianGrid.first * cartesianGrid.second == numHosts,
436  "Cartesian split doesn't equal number of hosts");
437  if (id == 0) {
438  galois::gInfo("Gluon optimizing communication for 2-D cartesian cut: ",
439  cartesianGrid.first, " x ", cartesianGrid.second);
440  }
441  isCartCut = true;
442  } else {
443  assert(cartesianGrid.first == 0 && cartesianGrid.second == 0);
444  isCartCut = false;
445  }
446 
447  // set this global value for use on GPUs mostly
448  enforcedDataMode = _enforcedDataMode;
449 
450  initBareMPI();
451  // master setup from mirrors done by setupCommunication call
452  masterNodes.resize(numHosts);
453  // setup proxy communication
454  galois::CondStatTimer<MORE_DIST_STATS> Tgraph_construct_comm(
455  "GraphCommSetupTime", RNAME);
456  Tgraph_construct_comm.start();
457  setupCommunication();
458  Tgraph_construct_comm.stop();
459  }
460 
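// A minimal construction sketch (not part of the original header), assuming
// a CuSP-partitioned graph type named Graph and a pointer hg to the local
// partition; Graph, hg, and the isTransposed() accessor are placeholders for
// whatever the application actually uses.
//
// @code
// auto& net = galois::runtime::getSystemNetworkInterface();
// galois::graphs::GluonSubstrate<Graph>* syncSubstrate =
//     new galois::graphs::GluonSubstrate<Graph>(
//         *hg,                 // local partition owned by this host
//         net.ID, net.Num,     // my host id / total number of hosts
//         hg->isTransposed()); // whether CuSP transposed the edges
// @endcode
//
// The optional arguments (cartesian grid, partition-agnostic flag, enforced
// DataCommMode) keep their defaults here; passing a non-zero grid enables the
// 2-D cut communication optimization checked in the constructor body above.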
462  // Data extraction from bitsets
464 
465 private:
479  template <SyncType syncType>
480  void getOffsetsFromBitset(const std::string& loopName,
481  const galois::DynamicBitSet& bitset_comm,
 482  galois::PODResizeableArray<unsigned int>& offsets,
 483  size_t& bit_set_count) const {
484  // timer creation
485  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
486  std::string offsets_timer_str(syncTypeStr + "Offsets_" +
487  get_run_identifier(loopName));
488  galois::CondStatTimer<GALOIS_COMM_STATS> Toffsets(offsets_timer_str.c_str(),
489  RNAME);
490 
491  Toffsets.start();
492 
 492 
 493  auto activeThreads = galois::getActiveThreads();
 494  std::vector<unsigned int> t_prefix_bit_counts(activeThreads);
495 
496  // count how many bits are set on each thread
497  galois::on_each([&](unsigned tid, unsigned nthreads) {
498  // TODO use block_range instead
499  unsigned int block_size = bitset_comm.size() / nthreads;
500  if ((bitset_comm.size() % nthreads) > 0)
501  ++block_size;
502  assert((block_size * nthreads) >= bitset_comm.size());
503 
504  unsigned int start = tid * block_size;
505  unsigned int end = (tid + 1) * block_size;
506  if (end > bitset_comm.size())
507  end = bitset_comm.size();
508 
509  unsigned int count = 0;
510  for (unsigned int i = start; i < end; ++i) {
511  if (bitset_comm.test(i))
512  ++count;
513  }
514 
515  t_prefix_bit_counts[tid] = count;
516  });
517 
518  // calculate prefix sum of bits per thread
519  for (unsigned int i = 1; i < activeThreads; ++i) {
520  t_prefix_bit_counts[i] += t_prefix_bit_counts[i - 1];
521  }
522  // total num of set bits
523  bit_set_count = t_prefix_bit_counts[activeThreads - 1];
524 
525  // calculate the indices of the set bits and save them to the offset
526  // vector
527  if (bit_set_count > 0) {
528  offsets.resize(bit_set_count);
529  galois::on_each([&](unsigned tid, unsigned nthreads) {
530  // TODO use block_range instead
531  // TODO this is same calculation as above; maybe refactor it
532  // into function?
533  unsigned int block_size = bitset_comm.size() / nthreads;
534  if ((bitset_comm.size() % nthreads) > 0)
535  ++block_size;
536  assert((block_size * nthreads) >= bitset_comm.size());
537 
538  unsigned int start = tid * block_size;
539  unsigned int end = (tid + 1) * block_size;
540  if (end > bitset_comm.size())
541  end = bitset_comm.size();
542 
543  unsigned int count = 0;
544  unsigned int t_prefix_bit_count;
545  if (tid == 0) {
546  t_prefix_bit_count = 0;
547  } else {
548  t_prefix_bit_count = t_prefix_bit_counts[tid - 1];
549  }
550 
551  for (unsigned int i = start; i < end; ++i) {
552  if (bitset_comm.test(i)) {
553  offsets[t_prefix_bit_count + count] = i;
554  ++count;
555  }
556  }
557  });
558  }
559  Toffsets.stop();
560  }
561 
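// The function above is a standard two-pass compaction: pass 1 counts set
// bits per thread, a prefix sum gives each thread a disjoint output range,
// and pass 2 writes bit positions into that range. A serial sketch of the
// same idea (illustrative only, std:: containers instead of Galois ones):
//
// @code
// #include <cstddef>
// #include <vector>
//
// std::vector<unsigned> offsetsFromBits(const std::vector<bool>& bits) {
//   std::vector<unsigned> offsets;
//   for (std::size_t i = 0; i < bits.size(); ++i) {
//     if (bits[i]) offsets.push_back(static_cast<unsigned>(i));
//   }
//   return offsets; // offsets[k] = position of the k-th set bit
// }
// @endcode
//
// The parallel version preserves this output order because thread t only
// writes into [t_prefix_bit_counts[t-1], t_prefix_bit_counts[t]).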
584  template <typename FnTy, SyncType syncType>
585  void getBitsetAndOffsets(const std::string& loopName,
586  const std::vector<size_t>& indices,
587  const galois::DynamicBitSet& bitset_compute,
588  galois::DynamicBitSet& bitset_comm,
 589  galois::PODResizeableArray<unsigned int>& offsets,
 590  size_t& bit_set_count,
591  DataCommMode& data_mode) const {
592  if (substrateDataMode != onlyData) {
593  bitset_comm.reset();
594  std::string syncTypeStr =
595  (syncType == syncReduce) ? "Reduce" : "Broadcast";
596  std::string doall_str(syncTypeStr + "Bitset_" + loopName);
597 
598  bitset_comm.reset();
599  // determine which local nodes in the indices array need to be
 600  // synchronized
 601  galois::do_all(
 602  galois::iterate(size_t{0}, indices.size()),
603  [&](size_t n) {
604  // assumes each lid is unique as test is not thread safe
605  size_t lid = indices[n];
606  if (bitset_compute.test(lid)) {
607  bitset_comm.set(n);
608  }
609  },
610 #if GALOIS_COMM_STATS
611  galois::loopname(get_run_identifier(doall_str).c_str()),
612 #endif
613  galois::no_stats());
614 
615  // get the number of set bits and the offsets into the comm bitset
616  getOffsetsFromBitset<syncType>(loopName, bitset_comm, offsets,
617  bit_set_count);
618  }
619 
620  data_mode =
621  get_data_mode<typename FnTy::ValTy>(bit_set_count, indices.size());
622  }
623 
624  template <typename SyncFnTy>
625  size_t getMaxSendBufferSize(uint32_t numShared) {
626  if (substrateDataMode == gidsData) {
627  return sizeof(DataCommMode) + sizeof(size_t) + sizeof(size_t) +
628  (numShared * sizeof(unsigned int)) + sizeof(size_t) +
629  (numShared * sizeof(typename SyncFnTy::ValTy));
630  } else if (substrateDataMode == offsetsData) {
631  return sizeof(DataCommMode) + sizeof(size_t) + sizeof(size_t) +
632  (numShared * sizeof(unsigned int)) + sizeof(size_t) +
633  (numShared * sizeof(typename SyncFnTy::ValTy));
634  } else if (substrateDataMode == bitsetData) {
635  size_t bitset_alloc_size = ((numShared + 63) / 64) * sizeof(uint64_t);
636  return sizeof(DataCommMode) + sizeof(size_t) +
637  sizeof(size_t) // bitset size
638  + sizeof(size_t) // bitset vector size
639  + bitset_alloc_size + sizeof(size_t) +
640  (numShared * sizeof(typename SyncFnTy::ValTy));
641  } else { // onlyData or noData (auto)
642  size_t bitset_alloc_size = ((numShared + 63) / 64) * sizeof(uint64_t);
643  return sizeof(DataCommMode) + sizeof(size_t) +
644  sizeof(size_t) // bitset size
645  + sizeof(size_t) // bitset vector size
646  + bitset_alloc_size + sizeof(size_t) +
647  (numShared * sizeof(typename SyncFnTy::ValTy));
648  }
649  }
650 
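// Worked size example (illustrative only): for numShared = 1000 proxies and
// a 4-byte SyncFnTy::ValTy, the bitsetData upper bound computed above is
//   sizeof(DataCommMode) + 3 * sizeof(size_t)   // mode, count, two sizes
//   + ((1000 + 63) / 64) * 8                    // 128 B of bitset words
//   + sizeof(size_t) + 1000 * 4                 // value count + values
// = 4164 bytes, a bit over 4 KB; reserving this bound up front avoids
// reallocating the send buffer while it is filled.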
652  // Local to global ID conversion
654 
665  template <SyncType syncType>
666  void convertLIDToGID(const std::string& loopName,
667  const std::vector<size_t>& indices,
 668  galois::PODResizeableArray<unsigned int>& offsets) {
 669  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
670  std::string doall_str(syncTypeStr + "_LID2GID_" +
671  get_run_identifier(loopName));
 672  galois::do_all(
 673  galois::iterate(size_t{0}, offsets.size()),
674  [&](size_t n) {
675  offsets[n] =
676  static_cast<uint32_t>(userGraph.getGID(indices[offsets[n]]));
677  },
678 #if GALOIS_COMM_STATS
679  galois::loopname(get_run_identifier(doall_str).c_str()),
680 #endif
681  galois::no_stats());
682  }
683 
692  template <SyncType syncType>
693  void convertGIDToLID(const std::string& loopName,
 694  galois::PODResizeableArray<unsigned int>& offsets) {
 695  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
696  std::string doall_str(syncTypeStr + "_GID2LID_" +
697  get_run_identifier(loopName));
698 
 699  galois::do_all(
 700  galois::iterate(size_t{0}, offsets.size()),
701  [&](size_t n) { offsets[n] = userGraph.getLID(offsets[n]); },
702 #if GALOIS_COMM_STATS
703  galois::loopname(get_run_identifier(doall_str).c_str()),
704 #endif
705  galois::no_stats());
706  }
707 
709  // Message prep functions (buffering, send buffer getting, etc.)
711 
723  template <
724  SyncType syncType, typename SyncFnTy, typename BitsetFnTy, typename VecTy,
725  bool async,
726  typename std::enable_if<!BitsetFnTy::is_vector_bitset()>::type* = nullptr>
727  void getSendBuffer(std::string loopName, unsigned x,
 728  galois::runtime::SendBuffer& b) {
 729  auto& sharedNodes = (syncType == syncReduce) ? mirrorNodes : masterNodes;
730 
731  if (BitsetFnTy::is_valid()) {
732  syncExtract<syncType, SyncFnTy, BitsetFnTy, VecTy, async>(
733  loopName, x, sharedNodes[x], b);
734  } else {
735  syncExtract<syncType, SyncFnTy, VecTy, async>(loopName, x, sharedNodes[x],
736  b);
737  }
738 
739  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
740  std::string statSendBytes_str(syncTypeStr + "SendBytes_" +
741  get_run_identifier(loopName));
742 
743  galois::runtime::reportStat_Tsum(RNAME, statSendBytes_str, b.size());
744  }
745  template <
746  SyncType syncType, typename SyncFnTy, typename BitsetFnTy, typename VecTy,
747  bool async,
748  typename std::enable_if<BitsetFnTy::is_vector_bitset()>::type* = nullptr>
749  void getSendBuffer(std::string loopName, unsigned x,
 750  galois::runtime::SendBuffer& b) {
 751  auto& sharedNodes = (syncType == syncReduce) ? mirrorNodes : masterNodes;
752 
753  syncExtract<syncType, SyncFnTy, BitsetFnTy, VecTy, async>(
754  loopName, x, sharedNodes[x], b);
755 
756  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
757  std::string statSendBytes_str(syncTypeStr + "SendBytesVector_" +
758  get_run_identifier(loopName));
759 
760  galois::runtime::reportStat_Tsum(RNAME, statSendBytes_str, b.size());
761  }
762 
780  template <bool async, SyncType syncType, typename VecType>
781  void serializeMessage(std::string loopName, DataCommMode data_mode,
782  size_t bit_set_count, std::vector<size_t>& indices,
 783  galois::PODResizeableArray<unsigned int>& offsets,
 784  galois::DynamicBitSet& bit_set_comm, VecType& val_vec,
 785  galois::runtime::SendBuffer& b) {
 786  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
 787  std::string serialize_timer_str(syncTypeStr + "SerializeMessage_" +
 788  get_run_identifier(loopName));
 789  galois::CondStatTimer<GALOIS_COMM_STATS> Tserialize(
 790  serialize_timer_str.c_str(), RNAME);
791  if (data_mode == noData) {
792  if (!async) {
793  Tserialize.start();
794  gSerialize(b, data_mode);
795  Tserialize.stop();
796  }
797  } else if (data_mode == gidsData) {
798  offsets.resize(bit_set_count);
799  convertLIDToGID<syncType>(loopName, indices, offsets);
800  val_vec.resize(bit_set_count);
801  Tserialize.start();
802  gSerialize(b, data_mode, bit_set_count, offsets, val_vec);
803  Tserialize.stop();
804  } else if (data_mode == offsetsData) {
805  offsets.resize(bit_set_count);
806  val_vec.resize(bit_set_count);
807  Tserialize.start();
808  gSerialize(b, data_mode, bit_set_count, offsets, val_vec);
809  Tserialize.stop();
810  } else if (data_mode == bitsetData) {
811  val_vec.resize(bit_set_count);
812  Tserialize.start();
813  gSerialize(b, data_mode, bit_set_count, bit_set_comm, val_vec);
814  Tserialize.stop();
815  } else { // onlyData
816  Tserialize.start();
817  gSerialize(b, data_mode, val_vec);
818  Tserialize.stop();
819  }
820  }
821 
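// Wire layout produced above, per data_mode (a summary of the code, not new
// behavior):
//   noData      : [mode]                                  (skipped when async)
//   gidsData    : [mode][bit_set_count][offsets as GIDs][val_vec]
//   offsetsData : [mode][bit_set_count][offsets as LIDs][val_vec]
//   bitsetData  : [mode][bit_set_count][bit_set_comm][val_vec]
//   onlyData    : [mode][val_vec]
// deserializeMessage() below reads these fields back in the same order (plus
// the dataSplit/dataSplitFirst variants produced by other send paths).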
846  template <SyncType syncType, typename VecType>
847  void deserializeMessage(std::string loopName, DataCommMode data_mode,
848  uint32_t num, galois::runtime::RecvBuffer& buf,
849  size_t& bit_set_count,
 850  galois::PODResizeableArray<unsigned int>& offsets,
 851  galois::DynamicBitSet& bit_set_comm,
852  size_t& buf_start, size_t& retval, VecType& val_vec) {
853  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
854  std::string serialize_timer_str(syncTypeStr + "DeserializeMessage_" +
855  get_run_identifier(loopName));
 856  galois::CondStatTimer<GALOIS_COMM_STATS> Tdeserialize(
 857  serialize_timer_str.c_str(), RNAME);
858  Tdeserialize.start();
859 
860  // get other metadata associated with message if mode isn't OnlyData
861  if (data_mode != onlyData) {
862  galois::runtime::gDeserialize(buf, bit_set_count);
863 
864  if (data_mode == gidsData) {
865  galois::runtime::gDeserialize(buf, offsets);
866  convertGIDToLID<syncType>(loopName, offsets);
867  } else if (data_mode == offsetsData) {
868  galois::runtime::gDeserialize(buf, offsets);
869  } else if (data_mode == bitsetData) {
870  bit_set_comm.resize(num);
871  galois::runtime::gDeserialize(buf, bit_set_comm);
872  } else if (data_mode == dataSplit) {
873  galois::runtime::gDeserialize(buf, buf_start);
874  } else if (data_mode == dataSplitFirst) {
875  galois::runtime::gDeserialize(buf, retval);
876  }
877  }
878 
879  // get data itself
880  galois::runtime::gDeserialize(buf, val_vec);
881 
882  Tdeserialize.stop();
883  }
884 
886  // Other helper functions
888 
890  unsigned gridRowID() const { return (id / cartesianGrid.second); }
892  unsigned gridRowID(unsigned hid) const {
893  return (hid / cartesianGrid.second);
894  }
896  unsigned gridColumnID() const { return (id % cartesianGrid.second); }
898  unsigned gridColumnID(unsigned hid) const {
899  return (hid % cartesianGrid.second);
900  }
901 
905  bool isNotCommPartnerCVC(unsigned host, SyncType syncType,
906  WriteLocation writeLocation,
907  ReadLocation readLocation) {
908  assert(cartesianGrid.first != 0);
909  assert(cartesianGrid.second != 0);
910 
911  if (transposed) {
912  if (syncType == syncReduce) {
913  switch (writeLocation) {
914  case writeSource:
915  return (gridColumnID() != gridColumnID(host));
916  case writeDestination:
917  return (gridRowID() != gridRowID(host));
918  case writeAny:
919  assert((gridRowID() == gridRowID(host)) ||
920  (gridColumnID() == gridColumnID(host)));
921  return ((gridRowID() != gridRowID(host)) &&
922  (gridColumnID() != gridColumnID(host))); // false
923  default:
924  GALOIS_DIE("unreachable");
925  }
926  } else { // syncBroadcast
927  switch (readLocation) {
928  case readSource:
929  return (gridColumnID() != gridColumnID(host));
930  case readDestination:
931  return (gridRowID() != gridRowID(host));
932  case readAny:
933  assert((gridRowID() == gridRowID(host)) ||
934  (gridColumnID() == gridColumnID(host)));
935  return ((gridRowID() != gridRowID(host)) &&
936  (gridColumnID() != gridColumnID(host))); // false
937  default:
938  GALOIS_DIE("unreachable");
939  }
940  }
941  } else {
942  if (syncType == syncReduce) {
943  switch (writeLocation) {
944  case writeSource:
945  return (gridRowID() != gridRowID(host));
946  case writeDestination:
947  return (gridColumnID() != gridColumnID(host));
948  case writeAny:
949  assert((gridRowID() == gridRowID(host)) ||
950  (gridColumnID() == gridColumnID(host)));
951  return ((gridRowID() != gridRowID(host)) &&
952  (gridColumnID() != gridColumnID(host))); // false
953  default:
954  GALOIS_DIE("unreachable");
955  }
956  } else { // syncBroadcast, 1
957  switch (readLocation) {
958  case readSource:
959  return (gridRowID() != gridRowID(host));
960  case readDestination:
961  return (gridColumnID() != gridColumnID(host));
962  case readAny:
963  assert((gridRowID() == gridRowID(host)) ||
964  (gridColumnID() == gridColumnID(host)));
965  return ((gridRowID() != gridRowID(host)) &&
966  (gridColumnID() != gridColumnID(host))); // false
967  default:
968  GALOIS_DIE("unreachable");
969  }
970  }
971  return false;
972  }
973  }
974 
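// Worked example (illustrative only): with cartesianGrid = {2, 3} and 6
// hosts, gridRowID(h) = h / 3 and gridColumnID(h) = h % 3, so host 4 sits at
// row 1, column 1. For a non-transposed reduce that writes at the source,
// host 4 only communicates with hosts in the same row (3 and 5); the checks
// above return true ("not a comm partner") for hosts 0, 1, and 2 even if
// proxies for them exist, which is what lets nothingToSend()/nothingToRecv()
// skip those messages under a cartesian vertex cut.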
975  // Requirement: For all X and Y,
976  // On X, nothingToSend(Y) <=> On Y, nothingToRecv(X)
989  bool nothingToSend(unsigned host, SyncType syncType,
990  WriteLocation writeLocation, ReadLocation readLocation) {
991  auto& sharedNodes = (syncType == syncReduce) ? mirrorNodes : masterNodes;
992  // TODO refactor (below)
993  if (!isCartCut) {
994  return (sharedNodes[host].size() == 0);
995  } else {
996  // TODO If CVC, call is not comm partner else use default above
997  if (sharedNodes[host].size() > 0) {
998  return isNotCommPartnerCVC(host, syncType, writeLocation, readLocation);
999  } else {
1000  return true;
1001  }
1002  }
1003  }
1004 
1018  bool nothingToRecv(unsigned host, SyncType syncType,
1019  WriteLocation writeLocation, ReadLocation readLocation) {
1020  auto& sharedNodes = (syncType == syncReduce) ? masterNodes : mirrorNodes;
1021  // TODO refactor (above)
1022  if (!isCartCut) {
1023  return (sharedNodes[host].size() == 0);
1024  } else {
1025  if (sharedNodes[host].size() > 0) {
1026  return isNotCommPartnerCVC(host, syncType, writeLocation, readLocation);
1027  } else {
1028  return true;
1029  }
1030  }
1031  }
1032 
1047  template <typename SyncFnTy>
1048  void reportRedundantSize(std::string loopName, std::string syncTypeStr,
1049  uint32_t totalToSend, size_t bitSetCount,
1050  const galois::DynamicBitSet& bitSetComm) {
1051  size_t redundant_size =
1052  (totalToSend - bitSetCount) * sizeof(typename SyncFnTy::ValTy);
1053  size_t bit_set_size = (bitSetComm.get_vec().size() * sizeof(uint64_t));
1054 
1055  if (redundant_size > bit_set_size) {
1056  std::string statSavedBytes_str(syncTypeStr + "SavedBytes_" +
1057  get_run_identifier(loopName));
1058 
1059  galois::runtime::reportStatCond_Tsum<MORE_DIST_STATS>(
1060  RNAME, statSavedBytes_str, (redundant_size - bit_set_size));
1061  }
1062  }
1063 
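// Worked example (illustrative only): sending 1000 proxies of which only 100
// carry updated 8-byte values gives redundant_size = (1000 - 100) * 8 =
// 7200 B, while the bitset costs ceil(1000 / 64) * 8 = 128 B, so SavedBytes
// records 7072 B for this round; nothing is reported when the bitset would
// cost more than it saves.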
1065  // Extract data from nodes (for reduce and broadcast)
1067 
1079  /* Reduction extract resets the value afterwards */
1080  template <typename FnTy, SyncType syncType>
1081  inline typename FnTy::ValTy extractWrapper(size_t lid) {
1082  if (syncType == syncReduce) {
1083  auto val = FnTy::extract(lid, userGraph.getData(lid));
1084  FnTy::reset(lid, userGraph.getData(lid));
1085  return val;
1086  } else {
1087  return FnTy::extract(lid, userGraph.getData(lid));
1088  }
1089  }
1090 
1105  /* Reduction extract resets the value afterwards */
1106  template <typename FnTy, SyncType syncType>
1107  inline typename FnTy::ValTy extractWrapper(size_t lid, unsigned vecIndex) {
1108  if (syncType == syncReduce) {
1109  auto val = FnTy::extract(lid, userGraph.getData(lid), vecIndex);
1110  FnTy::reset(lid, userGraph.getData(lid), vecIndex);
1111  return val;
1112  } else {
1113  return FnTy::extract(lid, userGraph.getData(lid), vecIndex);
1114  }
1115  }
1116 
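// A hand-rolled sketch (not from SyncStructures.h) of the SyncFnTy interface
// that extractWrapper()/setWrapper() and the *_batch wrappers below rely on,
// written for a "min" reduction over a hypothetical node field dist_current
// in a hypothetical NodeData struct. Real applications normally generate this
// with the GALOIS_SYNC_STRUCTURE_* macros instead.
//
// @code
// struct Reduce_min_dist_current {
//   using ValTy = uint32_t;
//
//   static ValTy extract(uint32_t, const NodeData& node) {
//     return node.dist_current;
//   }
//   // returns true iff the local value changed (so the dirty bitset is set)
//   static bool reduce(uint32_t, NodeData& node, ValTy y) {
//     if (y < node.dist_current) { node.dist_current = y; return true; }
//     return false;
//   }
//   static void reset(uint32_t, NodeData& node) {
//     node.dist_current = std::numeric_limits<ValTy>::max(); // identity of min
//   }
//   static void setVal(uint32_t, NodeData& node, ValTy y) {
//     node.dist_current = y;
//   }
//   // GPU batch hooks; returning false makes Gluon fall back to the
//   // per-node path above (the remaining *_batch overloads used in this
//   // file behave the same way on CPU-only builds)
//   static bool extract_batch(unsigned, uint8_t*) { return false; }
//   static bool extract_reset_batch(unsigned, uint8_t*) { return false; }
//   static bool reduce_batch(unsigned, uint8_t*, DataCommMode) { return false; }
// };
// @endcode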
1138  template <typename FnTy, SyncType syncType, typename VecTy,
1139  bool identity_offsets = false, bool parallelize = true>
1140  void extractSubset(const std::string& loopName,
1141  const std::vector<size_t>& indices, size_t size,
 1142  const galois::PODResizeableArray<unsigned int>& offsets,
 1143  VecTy& val_vec, size_t start = 0) {
1144  if (parallelize) {
1145  std::string syncTypeStr =
1146  (syncType == syncReduce) ? "Reduce" : "Broadcast";
1147  std::string doall_str(syncTypeStr + "ExtractVal_" + loopName);
1148 
 1149  galois::do_all(
 1150  galois::iterate(start, start + size),
1151  [&](unsigned int n) {
1152  unsigned int offset;
1153  if (identity_offsets)
1154  offset = n;
1155  else
1156  offset = offsets[n];
1157  size_t lid = indices[offset];
1158  val_vec[n - start] = extractWrapper<FnTy, syncType>(lid);
1159  },
1160 #if GALOIS_COMM_STATS
1161  galois::loopname(get_run_identifier(doall_str).c_str()),
1162 #endif
1163  galois::no_stats());
1164  } else { // non-parallel version
1165  for (unsigned n = start; n < start + size; ++n) {
1166  unsigned int offset;
1167  if (identity_offsets)
1168  offset = n;
1169  else
1170  offset = offsets[n];
1171 
1172  size_t lid = indices[offset];
1173  val_vec[n - start] = extractWrapper<FnTy, syncType>(lid);
1174  }
1175  }
1176  }
1177 
1204  // TODO find a better way to have this variant without code duplication
1205  template <typename FnTy, SyncType syncType, typename VecTy,
1206  bool identity_offsets = false, bool parallelize = true,
1207  bool vecSync = false,
1208  typename std::enable_if<vecSync>::type* = nullptr>
1209  void extractSubset(const std::string& loopName,
1210  const std::vector<size_t>& indices, size_t size,
 1211  const galois::PODResizeableArray<unsigned int>& offsets,
 1212  VecTy& val_vec, unsigned vecIndex, size_t start = 0) {
1213  val_vec.resize(size); // resize val vec for this vecIndex
1214 
1215  if (parallelize) {
1216  std::string syncTypeStr =
1217  (syncType == syncReduce) ? "Reduce" : "Broadcast";
1218  std::string doall_str(syncTypeStr + "ExtractValVector_" + loopName);
1219 
 1220  galois::do_all(
 1221  galois::iterate(start, start + size),
1222  [&](unsigned int n) {
1223  unsigned int offset;
1224  if (identity_offsets)
1225  offset = n;
1226  else
1227  offset = offsets[n];
1228  size_t lid = indices[offset];
1229  val_vec[n - start] = extractWrapper<FnTy, syncType>(lid, vecIndex);
1230  },
1231 #if GALOIS_COMM_STATS
1232  galois::loopname(get_run_identifier(doall_str).c_str()),
1233 #endif
1234  galois::no_stats());
1235  } else { // non-parallel version
1236  for (unsigned n = start; n < start + size; ++n) {
1237  unsigned int offset;
1238  if (identity_offsets)
1239  offset = n;
1240  else
1241  offset = offsets[n];
1242  size_t lid = indices[offset];
1243  val_vec[n - start] = extractWrapper<FnTy, syncType>(lid, vecIndex);
1244  }
1245  }
1246  }
1247 
1272  template <typename FnTy, typename SeqTy, SyncType syncType,
1273  bool identity_offsets = false, bool parallelize = true>
1274  void extractSubset(const std::string& loopName,
1275  const std::vector<size_t>& indices, size_t size,
 1276  const galois::PODResizeableArray<unsigned int>& offsets,
 1277  galois::runtime::SendBuffer& b, SeqTy lseq,
1278  size_t start = 0) {
1279  if (parallelize) {
1280  std::string syncTypeStr =
1281  (syncType == syncReduce) ? "Reduce" : "Broadcast";
1282  std::string doall_str(syncTypeStr + "ExtractVal_" + loopName);
1283 
 1284  galois::do_all(
 1285  galois::iterate(start, start + size),
1286  [&](unsigned int n) {
1287  unsigned int offset;
1288  if (identity_offsets)
1289  offset = n;
1290  else
1291  offset = offsets[n];
1292 
1293  size_t lid = indices[offset];
1294  gSerializeLazy(b, lseq, n - start,
1295  extractWrapper<FnTy, syncType>(lid));
1296  },
1297 #if GALOIS_COMM_STATS
1298  galois::loopname(get_run_identifier(doall_str).c_str()),
1299 #endif
1300  galois::no_stats());
1301  } else {
1302  for (unsigned int n = start; n < start + size; ++n) {
1303  unsigned int offset;
1304  if (identity_offsets)
1305  offset = n;
1306  else
1307  offset = offsets[n];
1308  size_t lid = indices[offset];
1309  gSerializeLazy(b, lseq, n - start, extractWrapper<FnTy, syncType>(lid));
1310  }
1311  }
1312  }
1313 
1326  template <typename FnTy, SyncType syncType>
1327  inline bool extractBatchWrapper(unsigned x, galois::runtime::SendBuffer& b) {
1328  if (syncType == syncReduce) {
1329  return FnTy::extract_reset_batch(x, b.getVec().data());
1330  } else {
1331  return FnTy::extract_batch(x, b.getVec().data());
1332  }
1333  }
1334 
1353  template <typename FnTy, SyncType syncType>
1354  inline bool extractBatchWrapper(unsigned x, galois::runtime::SendBuffer& b,
1355  size_t& s, DataCommMode& data_mode) {
1356  if (syncType == syncReduce) {
1357  return FnTy::extract_reset_batch(x, b.getVec().data(), &s, &data_mode);
1358  } else {
1359  return FnTy::extract_batch(x, b.getVec().data(), &s, &data_mode);
1360  }
1361  }
1362 
1364  // Reduce/sets on node (for broadcast)
1366 
1378  template <typename FnTy, SyncType syncType, bool async>
1379  inline void setWrapper(size_t lid, typename FnTy::ValTy val,
1380  galois::DynamicBitSet& bit_set_compute) {
1381  if (syncType == syncReduce) {
1382  if (FnTy::reduce(lid, userGraph.getData(lid), val)) {
1383  if (bit_set_compute.size() != 0)
1384  bit_set_compute.set(lid);
1385  }
1386  } else {
1387  if (async)
1388  FnTy::reduce(lid, userGraph.getData(lid), val);
1389  else
1390  FnTy::setVal(lid, userGraph.getData(lid), val);
1391  }
1392  }
1393 
1410  template <typename FnTy, SyncType syncType, bool async>
1411  inline void setWrapper(size_t lid, typename FnTy::ValTy val,
1412  galois::DynamicBitSet& bit_set_compute,
1413  unsigned vecIndex) {
1414  if (syncType == syncReduce) {
1415  if (FnTy::reduce(lid, userGraph.getData(lid), val, vecIndex)) {
1416  if (bit_set_compute.size() != 0)
1417  bit_set_compute.set(lid);
1418  }
1419  } else {
1420  if (async)
1421  FnTy::reduce(lid, userGraph.getData(lid), val, vecIndex);
1422  else
1423  FnTy::setVal(lid, userGraph.getData(lid), val, vecIndex);
1424  }
1425  }
1426 
1450  template <typename IndicesVecTy, typename FnTy, SyncType syncType,
1451  typename VecTy, bool async, bool identity_offsets = false,
1452  bool parallelize = true>
1453  void setSubset(const std::string& loopName, const IndicesVecTy& indices,
1454  size_t size,
 1455  const galois::PODResizeableArray<unsigned int>& offsets,
 1456  VecTy& val_vec, galois::DynamicBitSet& bit_set_compute,
1457  size_t start = 0) {
1458  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1459  std::string doall_str(syncTypeStr + "SetVal_" +
1460  get_run_identifier(loopName));
1461 
1462  if (parallelize) {
 1463  galois::do_all(
 1464  galois::iterate(start, start + size),
1465  [&](unsigned int n) {
1466  unsigned int offset;
1467  if (identity_offsets)
1468  offset = n;
1469  else
1470  offset = offsets[n];
1471  auto lid = indices[offset];
1472  setWrapper<FnTy, syncType, async>(lid, val_vec[n - start],
1473  bit_set_compute);
1474  },
1475 #if GALOIS_COMM_STATS
1476  galois::loopname(get_run_identifier(doall_str).c_str()),
1477 #endif
1478  galois::no_stats());
1479  } else {
1480  for (unsigned int n = start; n < start + size; ++n) {
1481  unsigned int offset;
1482  if (identity_offsets)
1483  offset = n;
1484  else
1485  offset = offsets[n];
1486  auto lid = indices[offset];
1487  setWrapper<FnTy, syncType, async>(lid, val_vec[n - start],
1488  bit_set_compute);
1489  }
1490  }
1491  }
1492 
1524  // TODO find a better way to have this variant without code duplication
1525  template <typename IndicesVecTy, typename FnTy, SyncType syncType,
1526  typename VecTy, bool async, bool identity_offsets = false,
1527  bool parallelize = true, bool vecSync = false,
1528  typename std::enable_if<vecSync>::type* = nullptr>
1529  void setSubset(const std::string& loopName, const IndicesVecTy& indices,
1530  size_t size,
 1531  const galois::PODResizeableArray<unsigned int>& offsets,
 1532  VecTy& val_vec, galois::DynamicBitSet& bit_set_compute,
1533  unsigned vecIndex, size_t start = 0) {
1534  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1535  std::string doall_str(syncTypeStr + "SetValVector_" +
1536  get_run_identifier(loopName));
1537 
1538  if (parallelize) {
 1539  galois::do_all(
 1540  galois::iterate(start, start + size),
1541  [&](unsigned int n) {
1542  unsigned int offset;
1543  if (identity_offsets)
1544  offset = n;
1545  else
1546  offset = offsets[n];
1547  auto lid = indices[offset];
1548  setWrapper<FnTy, syncType, async>(lid, val_vec[n - start],
1549  bit_set_compute, vecIndex);
1550  },
1551 #if GALOIS_COMM_STATS
1552  galois::loopname(get_run_identifier(doall_str).c_str()),
1553 #endif
1554  galois::no_stats());
1555  } else {
1556  for (unsigned int n = start; n < start + size; ++n) {
1557  unsigned int offset;
1558  if (identity_offsets)
1559  offset = n;
1560  else
1561  offset = offsets[n];
1562  auto lid = indices[offset];
1563  setWrapper<FnTy, syncType, async>(lid, val_vec[n - start],
1564  bit_set_compute, vecIndex);
1565  }
1566  }
1567  }
1568 
1580  template <typename FnTy, SyncType syncType, bool async>
1581  inline bool setBatchWrapper(unsigned x, galois::runtime::RecvBuffer& b) {
1582  if (syncType == syncReduce) {
1583  return FnTy::reduce_batch(x, b.getVec().data() + b.getOffset());
1584  } else {
1585  if (async) {
1586  return FnTy::reduce_mirror_batch(x, b.getVec().data() + b.getOffset());
1587  } else {
1588  return FnTy::setVal_batch(x, b.getVec().data() + b.getOffset());
1589  }
1590  }
1591  }
1592 
1609  template <typename FnTy, SyncType syncType, bool async>
1610  inline bool setBatchWrapper(unsigned x, galois::runtime::RecvBuffer& b,
1611  DataCommMode& data_mode) {
1612  if (syncType == syncReduce) {
1613  return FnTy::reduce_batch(x, b.getVec().data() + b.getOffset(),
1614  data_mode);
1615  } else {
1616  if (async) {
1617  return FnTy::reduce_mirror_batch(x, b.getVec().data() + b.getOffset(),
1618  data_mode);
1619  } else {
1620  return FnTy::setVal_batch(x, b.getVec().data() + b.getOffset(),
1621  data_mode);
1622  }
1623  }
1624  }
1625 
1627  // Sends
1629 
1643  template <SyncType syncType, typename SyncFnTy, typename VecTy, bool async,
1644  typename std::enable_if<galois::runtime::is_memory_copyable<
1645  typename SyncFnTy::ValTy>::value>::type* = nullptr>
1646  void syncExtract(std::string loopName, unsigned from_id,
1647  std::vector<size_t>& indices,
 1648  galois::runtime::SendBuffer& b) {
 1649  uint32_t num = indices.size();
1650  static VecTy val_vec; // sometimes wasteful
1651  galois::PODResizeableArray<unsigned int>& offsets = syncOffsets;
1652  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1653  std::string extract_timer_str(syncTypeStr + "Extract_" +
1654  get_run_identifier(loopName));
1655  galois::CondStatTimer<GALOIS_COMM_STATS> Textract(extract_timer_str.c_str(),
1656  RNAME);
1657  std::string extract_batch_timer_str(syncTypeStr + "ExtractBatch_" +
1658  get_run_identifier(loopName));
 1659  galois::CondStatTimer<GALOIS_COMM_STATS> Textractbatch(
 1660  extract_batch_timer_str.c_str(), RNAME);
1661 
1662  DataCommMode data_mode;
1663 
1664  Textract.start();
1665 
1666  if (num > 0) {
1667  data_mode = onlyData;
1668  b.reserve(sizeof(DataCommMode) + sizeof(size_t) +
1669  (num * sizeof(typename SyncFnTy::ValTy)));
1670 
1671  Textractbatch.start();
1672  bool batch_succeeded =
1673  extractBatchWrapper<SyncFnTy, syncType>(from_id, b);
1674  Textractbatch.stop();
1675 
1676  if (!batch_succeeded) {
1677  b.resize(0);
1678  val_vec.reserve(maxSharedSize);
1679  val_vec.resize(num);
1680  gSerialize(b, onlyData);
1681  auto lseq = gSerializeLazySeq(
1682  b, num,
 1683  (galois::PODResizeableArray<typename SyncFnTy::ValTy>*)nullptr);
 1684  extractSubset<SyncFnTy, decltype(lseq), syncType, true, true>(
1685  loopName, indices, num, offsets, b, lseq);
1686  } else {
1687  b.resize(sizeof(DataCommMode) + sizeof(size_t) +
1688  (num * sizeof(typename SyncFnTy::ValTy)));
1689  }
1690  } else {
1691  data_mode = noData;
1692  b.resize(0);
1693  if (!async) {
1694  gSerialize(b, noData);
1695  }
1696  }
1697 
1698  Textract.stop();
1699 
1700  std::string metadata_str(syncTypeStr + "MetadataMode_" +
1701  std::to_string(data_mode) + "_" +
1702  get_run_identifier(loopName));
1703  galois::runtime::reportStatCond_Single<MORE_DIST_STATS>(RNAME, metadata_str,
1704  1);
1705  }
1706 
1724  template <SyncType syncType, typename SyncFnTy, typename VecTy, bool async,
1725  typename std::enable_if<!galois::runtime::is_memory_copyable<
1726  typename SyncFnTy::ValTy>::value>::type* = nullptr>
1727  void syncExtract(std::string loopName, unsigned from_id,
1728  std::vector<size_t>& indices,
 1729  galois::runtime::SendBuffer& b) {
 1730  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1731  std::string extract_timer_str(syncTypeStr + "Extract_" +
1732  get_run_identifier(loopName));
1733  galois::CondStatTimer<GALOIS_COMM_STATS> Textract(extract_timer_str.c_str(),
1734  RNAME);
1735  std::string extract_batch_timer_str(syncTypeStr + "ExtractBatch_" +
1736  get_run_identifier(loopName));
 1737  galois::CondStatTimer<GALOIS_COMM_STATS> Textractbatch(
 1738  extract_batch_timer_str.c_str(), RNAME);
1739 
1740  DataCommMode data_mode;
1741 
1742  uint32_t num = indices.size();
1743  static VecTy val_vec; // sometimes wasteful
1744  static galois::PODResizeableArray<unsigned int> dummyVector;
1745 
1746  Textract.start();
1747 
1748  if (num > 0) {
1749  data_mode = onlyData;
1750  b.reserve(sizeof(DataCommMode) + sizeof(size_t) +
1751  (num * sizeof(typename SyncFnTy::ValTy)));
1752 
1753  Textractbatch.start();
1754  bool batch_succeeded =
1755  extractBatchWrapper<SyncFnTy, syncType>(from_id, b);
1756  Textractbatch.stop();
1757 
1758  if (!batch_succeeded) {
1759  b.resize(0);
1760  val_vec.reserve(maxSharedSize);
1761  val_vec.resize(num);
1762  // get everything (note I pass in "indices" as offsets as it won't
1763  // even get used anyways)
1764  extractSubset<SyncFnTy, syncType, VecTy, true, true>(
1765  loopName, indices, num, dummyVector, val_vec);
1766  gSerialize(b, onlyData, val_vec);
1767  } else {
1768  b.resize(sizeof(DataCommMode) + sizeof(size_t) +
1769  (num * sizeof(typename SyncFnTy::ValTy)));
1770  }
1771 
1772  } else {
1773  b.resize(0);
1774  if (!async) {
1775  data_mode = noData;
1776  gSerialize(b, noData);
1777  }
1778  }
1779 
1780  Textract.stop();
1781 
1782  std::string metadata_str(syncTypeStr + "MetadataMode_" +
1783  std::to_string(data_mode) + "_" +
1784  get_run_identifier(loopName));
1785  galois::runtime::reportStatCond_Single<MORE_DIST_STATS>(RNAME, metadata_str,
1786  1);
1787  }
1788 
1806  template <
1807  SyncType syncType, typename SyncFnTy, typename BitsetFnTy, typename VecTy,
1808  bool async,
1809  typename std::enable_if<!BitsetFnTy::is_vector_bitset()>::type* = nullptr>
1810  void syncExtract(std::string loopName, unsigned from_id,
1811  std::vector<size_t>& indices,
 1812  galois::runtime::SendBuffer& b) {
 1813  uint32_t num = indices.size();
1814  galois::DynamicBitSet& bit_set_comm = syncBitset;
1815  static VecTy val_vec; // sometimes wasteful
1816  galois::PODResizeableArray<unsigned int>& offsets = syncOffsets;
1817 
1818  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1819  std::string extract_timer_str(syncTypeStr + "Extract_" +
1820  get_run_identifier(loopName));
1821  galois::CondStatTimer<GALOIS_COMM_STATS> Textract(extract_timer_str.c_str(),
1822  RNAME);
1823  std::string extract_alloc_timer_str(syncTypeStr + "ExtractAlloc_" +
1824  get_run_identifier(loopName));
 1825  galois::CondStatTimer<GALOIS_COMM_STATS> Textractalloc(
 1826  extract_alloc_timer_str.c_str(), RNAME);
1827  std::string extract_batch_timer_str(syncTypeStr + "ExtractBatch_" +
1828  get_run_identifier(loopName));
 1829  galois::CondStatTimer<GALOIS_COMM_STATS> Textractbatch(
 1830  extract_batch_timer_str.c_str(), RNAME);
1831 
1832  DataCommMode data_mode;
1833 
1834  Textract.start();
1835 
1836  if (num > 0) {
1837  size_t bit_set_count = 0;
1838  Textractalloc.start();
1839  b.reserve(getMaxSendBufferSize<SyncFnTy>(num));
1840  Textractalloc.stop();
1841 
1842  Textractbatch.start();
1843  bool batch_succeeded = extractBatchWrapper<SyncFnTy, syncType>(
1844  from_id, b, bit_set_count, data_mode);
1845  Textractbatch.stop();
1846 
1847  // GPUs have a batch function they can use; CPUs do not; therefore,
 1848  // CPUs always enter this if block
1849  if (!batch_succeeded) {
1850  Textractalloc.start();
1851  b.resize(0);
1852  bit_set_comm.reserve(maxSharedSize);
1853  offsets.reserve(maxSharedSize);
1854  val_vec.reserve(maxSharedSize);
1855  bit_set_comm.resize(num);
1856  offsets.resize(num);
1857  val_vec.resize(num);
1858  Textractalloc.stop();
1859  const galois::DynamicBitSet& bit_set_compute = BitsetFnTy::get();
1860 
1861  getBitsetAndOffsets<SyncFnTy, syncType>(
1862  loopName, indices, bit_set_compute, bit_set_comm, offsets,
1863  bit_set_count, data_mode);
1864 
1865  if (data_mode == onlyData) {
1866  bit_set_count = indices.size();
1867  extractSubset<SyncFnTy, syncType, VecTy, true, true>(
1868  loopName, indices, bit_set_count, offsets, val_vec);
1869  } else if (data_mode !=
1870  noData) { // bitsetData or offsetsData or gidsData
1871  extractSubset<SyncFnTy, syncType, VecTy, false, true>(
1872  loopName, indices, bit_set_count, offsets, val_vec);
1873  }
1874  serializeMessage<async, syncType>(loopName, data_mode, bit_set_count,
1875  indices, offsets, bit_set_comm,
1876  val_vec, b);
1877  } else {
1878  if (data_mode == noData) {
1879  b.resize(0);
1880  if (!async) {
1881  gSerialize(b, data_mode);
1882  }
1883  } else if (data_mode == gidsData) {
1884  b.resize(sizeof(DataCommMode) + sizeof(bit_set_count) +
1885  sizeof(size_t) + (bit_set_count * sizeof(unsigned int)) +
1886  sizeof(size_t) +
1887  (bit_set_count * sizeof(typename SyncFnTy::ValTy)));
1888  } else if (data_mode == offsetsData) {
1889  b.resize(sizeof(DataCommMode) + sizeof(bit_set_count) +
1890  sizeof(size_t) + (bit_set_count * sizeof(unsigned int)) +
1891  sizeof(size_t) +
1892  (bit_set_count * sizeof(typename SyncFnTy::ValTy)));
1893  } else if (data_mode == bitsetData) {
1894  size_t bitset_alloc_size = ((num + 63) / 64) * sizeof(uint64_t);
1895  b.resize(sizeof(DataCommMode) + sizeof(bit_set_count) +
1896  sizeof(size_t) // bitset size
1897  + sizeof(size_t) // bitset vector size
1898  + bitset_alloc_size + sizeof(size_t) +
1899  (bit_set_count * sizeof(typename SyncFnTy::ValTy)));
1900  } else { // onlyData
1901  b.resize(sizeof(DataCommMode) + sizeof(size_t) +
1902  (num * sizeof(typename SyncFnTy::ValTy)));
1903  }
1904  }
1905 
1906  reportRedundantSize<SyncFnTy>(loopName, syncTypeStr, num, bit_set_count,
1907  bit_set_comm);
1908  } else {
1909  data_mode = noData;
1910  b.resize(0);
1911  if (!async) {
1912  gSerialize(b, noData);
1913  }
1914  }
1915 
1916  Textract.stop();
1917 
1918  std::string metadata_str(syncTypeStr + "MetadataMode_" +
1919  std::to_string(data_mode) + "_" +
1920  get_run_identifier(loopName));
1921  galois::runtime::reportStatCond_Single<MORE_DIST_STATS>(RNAME, metadata_str,
1922  1);
1923  }
1924 
1945  template <
1946  SyncType syncType, typename SyncFnTy, typename BitsetFnTy, typename VecTy,
1947  bool async,
1948  typename std::enable_if<BitsetFnTy::is_vector_bitset()>::type* = nullptr>
1949  void syncExtract(std::string loopName, unsigned, std::vector<size_t>& indices,
 1950  galois::runtime::SendBuffer& b) {
 1951  uint32_t num = indices.size();
1952  galois::DynamicBitSet& bit_set_comm = syncBitset;
1953  static VecTy val_vec; // sometimes wasteful
1954  galois::PODResizeableArray<unsigned int>& offsets = syncOffsets;
1955 
1956  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1957  std::string extract_timer_str(syncTypeStr + "ExtractVector_" +
1958  get_run_identifier(loopName));
1959  galois::CondStatTimer<GALOIS_COMM_STATS> Textract(extract_timer_str.c_str(),
1960  RNAME);
1961 
1962  Textract.start();
1963 
1964  if (num > 0) {
1965  bit_set_comm.reserve(maxSharedSize);
1966  offsets.reserve(maxSharedSize);
1967  val_vec.reserve(maxSharedSize);
1968  bit_set_comm.resize(num);
1969  offsets.resize(num);
1970  val_vec.resize(num);
1971  }
1972 
1973  DataCommMode data_mode;
1974  // loop over all bitsets in the vector of bitsets; each one corresponds to
1975  // a different index in the vector field we are synchronizing
1976  for (unsigned i = 0; i < BitsetFnTy::numBitsets(); i++) {
1977  if (num > 0) {
1978  bit_set_comm.reset();
1979 
1980  size_t bit_set_count = 0;
1981 
1982  // No GPU support currently
1983  const galois::DynamicBitSet& bit_set_compute = BitsetFnTy::get(i);
1984 
1985  getBitsetAndOffsets<SyncFnTy, syncType>(
1986  loopName, indices, bit_set_compute, bit_set_comm, offsets,
1987  bit_set_count, data_mode);
1988 
1989  // note the extra template argument which specifies that this is a
1990  // vector extract, i.e. get element i of the vector (i passed in as
1991  // argument as well)
1992  if (data_mode == onlyData) {
1993  // galois::gInfo(id, " node ", i, " has data to send");
1994  bit_set_count = indices.size();
1995  extractSubset<SyncFnTy, syncType, VecTy, true, true, true>(
1996  loopName, indices, bit_set_count, offsets, val_vec, i);
1997  } else if (data_mode !=
1998  noData) { // bitsetData or offsetsData or gidsData
1999  // galois::gInfo(id, " node ", i, " has data to send");
2000  extractSubset<SyncFnTy, syncType, VecTy, false, true, true>(
2001  loopName, indices, bit_set_count, offsets, val_vec, i);
2002  }
2003 
2004  reportRedundantSize<SyncFnTy>(loopName, syncTypeStr, num, bit_set_count,
2005  bit_set_comm);
2006  serializeMessage<async, syncType>(loopName, data_mode, bit_set_count,
2007  indices, offsets, bit_set_comm,
2008  val_vec, b);
2009  } else {
2010  if (!async) { // TODO: is this fine?
2011  // append noData for however many bitsets there are
2012  gSerialize(b, noData);
2013  }
2014  }
2015  }
2016 
2017  Textract.stop();
2018 
2019  // FIXME report metadata mode for the different bitsets?
2020  // std::string metadata_str(syncTypeStr + "_METADATA_MODE" +
2021  // std::to_string(data_mode) +
2022  // get_run_identifier(loopName));
2023  // galois::runtime::reportStat_Single(RNAME, metadata_str, 1);
2024  }
2025 
2026 #ifdef GALOIS_USE_BARE_MPI
2027 
2030  template <WriteLocation writeLocation, ReadLocation readLocation,
2031  SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
2032  typename VecTy, bool async>
2033  void sync_mpi_send(std::string loopName) {
2034  static std::vector<galois::runtime::SendBuffer> b;
2035  static std::vector<MPI_Request> request;
2036  b.resize(numHosts);
2037  request.resize(numHosts, MPI_REQUEST_NULL);
2038 
2039  for (unsigned h = 1; h < numHosts; ++h) {
2040  unsigned x = (id + h) % numHosts;
2041 
2042  if (nothingToSend(x, syncType, writeLocation, readLocation))
2043  continue;
2044 
2045  int ready = 0;
2046  MPI_Test(&request[x], &ready, MPI_STATUS_IGNORE);
2047  if (!ready) {
2048  assert(b[x].size() > 0);
2049  MPI_Wait(&request[x], MPI_STATUS_IGNORE);
2050  }
2051  if (b[x].size() > 0) {
2052  b[x].getVec().clear();
2053  }
2054 
2055  getSendBuffer<syncType, SyncFnTy, BitsetFnTy, VecTy, async>(loopName, x,
2056  b[x]);
2057 
2058  MPI_Isend((uint8_t*)b[x].linearData(), b[x].size(), MPI_BYTE, x, 32767,
2059  MPI_COMM_WORLD, &request[x]);
2060  }
2061 
2062  if (BitsetFnTy::is_valid()) {
2063  reset_bitset(syncType, &BitsetFnTy::reset_range);
2064  }
2065  }
2066 
2070  template <WriteLocation writeLocation, ReadLocation readLocation,
2071  SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
2072  typename VecTy, bool async>
2073  void sync_mpi_put(std::string loopName, const MPI_Group& mpi_access_group,
2074  const std::vector<MPI_Win>& window) {
2075 
2076  MPI_Win_start(mpi_access_group, 0, window[id]);
2077 
2078  std::vector<galois::runtime::SendBuffer> b(numHosts);
2079  std::vector<size_t> size(numHosts);
2080  uint64_t send_buffers_size = 0;
2081 
2082  for (unsigned h = 1; h < numHosts; ++h) {
2083  unsigned x = (id + h) % numHosts;
2084 
2085  if (nothingToSend(x, syncType, writeLocation, readLocation))
2086  continue;
2087 
2088  getSendBuffer<syncType, SyncFnTy, BitsetFnTy, VecTy, async>(loopName, x,
2089  b[x]);
2090 
2091  size[x] = b[x].size();
2092  send_buffers_size += size[x];
2093  MPI_Put((uint8_t*)&size[x], sizeof(size_t), MPI_BYTE, x, 0,
2094  sizeof(size_t), MPI_BYTE, window[id]);
2095  MPI_Put((uint8_t*)b[x].linearData(), size[x], MPI_BYTE, x, sizeof(size_t),
2096  size[x], MPI_BYTE, window[id]);
2097  }
2098 
 2099  auto& net = galois::runtime::getSystemNetworkInterface();
 2100  net.incrementMemUsage(send_buffers_size);
2101 
2102  MPI_Win_complete(window[id]);
2103  net.decrementMemUsage(send_buffers_size);
2104 
2105  if (BitsetFnTy::is_valid()) {
2106  reset_bitset(syncType, &BitsetFnTy::reset_range);
2107  }
2108  }
2109 #endif
2110 
2123  template <WriteLocation writeLocation, ReadLocation readLocation,
2124  SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
2125  typename VecTy, bool async>
2126  void syncNetSend(std::string loopName) {
 2127  static galois::runtime::SendBuffer
 2128  b; // although a static variable, allocation not reused
2129  // due to std::move in net.sendTagged()
2130 
 2131  auto& net = galois::runtime::getSystemNetworkInterface();
 2132  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
2133  std::string statNumMessages_str(syncTypeStr + "NumMessages_" +
2134  get_run_identifier(loopName));
2135 
2136  size_t numMessages = 0;
2137  for (unsigned h = 1; h < numHosts; ++h) {
2138  unsigned x = (id + h) % numHosts;
2139 
2140  if (nothingToSend(x, syncType, writeLocation, readLocation))
2141  continue;
2142 
2143  getSendBuffer<syncType, SyncFnTy, BitsetFnTy, VecTy, async>(loopName, x,
2144  b);
2145 
2146  if ((!async) || (b.size() > 0)) {
2147  size_t syncTypePhase = 0;
2148  if (async && (syncType == syncBroadcast))
2149  syncTypePhase = 1;
2150  net.sendTagged(x, galois::runtime::evilPhase, b, syncTypePhase);
2151  ++numMessages;
2152  }
2153  }
2154  if (!async) {
2155  // Will force all messages to be processed before continuing
2156  net.flush();
2157  }
2158 
2159  if (BitsetFnTy::is_valid()) {
2160  reset_bitset(syncType, &BitsetFnTy::reset_range);
2161  }
2162 
2163  galois::runtime::reportStat_Tsum(RNAME, statNumMessages_str, numMessages);
2164  }
2165 
2178  template <WriteLocation writeLocation, ReadLocation readLocation,
2179  SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
2180  typename VecTy, bool async>
2181  void syncSend(std::string loopName) {
2182  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
 2183  galois::CondStatTimer<GALOIS_COMM_STATS> TSendTime(
 2184  (syncTypeStr + "Send_" + get_run_identifier(loopName)).c_str(), RNAME);
2185 
2186  TSendTime.start();
2187  syncNetSend<writeLocation, readLocation, syncType, SyncFnTy, BitsetFnTy,
2188  VecTy, async>(loopName);
2189  TSendTime.stop();
2190  }
2191 
2193  // Receives
2195 
2211  template <
2212  SyncType syncType, typename SyncFnTy, typename BitsetFnTy, typename VecTy,
2213  bool async,
2214  typename std::enable_if<!BitsetFnTy::is_vector_bitset()>::type* = nullptr>
2215  size_t syncRecvApply(uint32_t from_id, galois::runtime::RecvBuffer& buf,
2216  std::string loopName) {
2217  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
2218  std::string set_timer_str(syncTypeStr + "Set_" +
2219  get_run_identifier(loopName));
2220  galois::CondStatTimer<GALOIS_COMM_STATS> Tset(set_timer_str.c_str(), RNAME);
2221  std::string set_batch_timer_str(syncTypeStr + "SetBatch_" +
2222  get_run_identifier(loopName));
 2223  galois::CondStatTimer<GALOIS_COMM_STATS> Tsetbatch(
 2224  set_batch_timer_str.c_str(), RNAME);
2225 
2226  galois::DynamicBitSet& bit_set_comm = syncBitset;
2227  static VecTy val_vec;
2228  galois::PODResizeableArray<unsigned int>& offsets = syncOffsets;
2229 
2230  auto& sharedNodes = (syncType == syncReduce) ? masterNodes : mirrorNodes;
2231  uint32_t num = sharedNodes[from_id].size();
2232  size_t retval = 0;
2233 
2234  Tset.start();
2235 
2236  if (num > 0) { // only enter if we expect message from that host
2237  DataCommMode data_mode;
2238  // 1st deserialize gets data mode
2239  galois::runtime::gDeserialize(buf, data_mode);
2240 
2241  if (data_mode != noData) {
2242  // GPU update call
2243  Tsetbatch.start();
2244  bool batch_succeeded =
2245  setBatchWrapper<SyncFnTy, syncType, async>(from_id, buf, data_mode);
2246  Tsetbatch.stop();
2247 
2248  // cpu always enters this block
2249  if (!batch_succeeded) {
2250  size_t bit_set_count = num;
2251  size_t buf_start = 0;
2252 
2253  // deserialize the rest of the data in the buffer depending on the
2254  // data mode; arguments passed in here are mostly output vars
2255  deserializeMessage<syncType>(loopName, data_mode, num, buf,
2256  bit_set_count, offsets, bit_set_comm,
2257  buf_start, retval, val_vec);
2258 
2259  bit_set_comm.reserve(maxSharedSize);
2260  offsets.reserve(maxSharedSize);
2261  val_vec.reserve(maxSharedSize);
2262 
2263  galois::DynamicBitSet& bit_set_compute = BitsetFnTy::get();
2264 
2265  if (data_mode == bitsetData) {
2266  size_t bit_set_count2;
2267  getOffsetsFromBitset<syncType>(loopName, bit_set_comm, offsets,
2268  bit_set_count2);
2269  assert(bit_set_count == bit_set_count2);
2270  }
2271 
2272  if (data_mode == onlyData) {
2273  setSubset<decltype(sharedNodes[from_id]), SyncFnTy, syncType, VecTy,
2274  async, true, true>(loopName, sharedNodes[from_id],
2275  bit_set_count, offsets, val_vec,
2276  bit_set_compute);
2277  } else if (data_mode == dataSplit || data_mode == dataSplitFirst) {
2278  setSubset<decltype(sharedNodes[from_id]), SyncFnTy, syncType, VecTy,
2279  async, true, true>(loopName, sharedNodes[from_id],
2280  bit_set_count, offsets, val_vec,
2281  bit_set_compute, buf_start);
2282  } else if (data_mode == gidsData) {
2283  setSubset<decltype(offsets), SyncFnTy, syncType, VecTy, async, true,
2284  true>(loopName, offsets, bit_set_count, offsets, val_vec,
2285  bit_set_compute);
2286  } else { // bitsetData or offsetsData
2287  setSubset<decltype(sharedNodes[from_id]), SyncFnTy, syncType, VecTy,
2288  async, false, true>(loopName, sharedNodes[from_id],
2289  bit_set_count, offsets, val_vec,
2290  bit_set_compute);
2291  }
2292  // TODO: reduce could update the bitset, so it needs to be copied
2293  // back to the device
2294  }
2295  }
2296  }
2297 
2298  Tset.stop();
2299 
2300  return retval;
2301  }
2302 
2323  template <
2324  SyncType syncType, typename SyncFnTy, typename BitsetFnTy, typename VecTy,
2325  bool async,
2326  typename std::enable_if<BitsetFnTy::is_vector_bitset()>::type* = nullptr>
2327  size_t syncRecvApply(uint32_t from_id, galois::runtime::RecvBuffer& buf,
2328  std::string loopName) {
2329  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
2330  std::string set_timer_str(syncTypeStr + "SetVector_" +
2331  get_run_identifier(loopName));
2332  galois::CondStatTimer<GALOIS_COMM_STATS> Tset(set_timer_str.c_str(), RNAME);
2333 
2334  galois::DynamicBitSet& bit_set_comm = syncBitset;
2335  static VecTy val_vec;
2336  galois::PODResizeableArray<unsigned int>& offsets = syncOffsets;
2337 
2338  auto& sharedNodes = (syncType == syncReduce) ? masterNodes : mirrorNodes;
2339  uint32_t num = sharedNodes[from_id].size();
2340  size_t retval = 0;
2341 
2342  Tset.start();
2343 
2344  if (num > 0) { // only enter if we expect message from that host
2345  for (unsigned i = 0; i < BitsetFnTy::numBitsets(); i++) {
2346  DataCommMode data_mode;
2347  // 1st deserialize gets data mode
2348  galois::runtime::gDeserialize(buf, data_mode);
2349 
2350  if (data_mode != noData) {
2351  size_t bit_set_count = num;
2352  size_t buf_start = 0;
2353 
2354  // deserialize the rest of the data in the buffer depending on the
2355  // data mode; arguments passed in here are mostly output vars
2356  deserializeMessage<syncType>(loopName, data_mode, num, buf,
2357  bit_set_count, offsets, bit_set_comm,
2358  buf_start, retval, val_vec);
2359 
2360  galois::DynamicBitSet& bit_set_compute = BitsetFnTy::get(i);
2361 
2362  if (data_mode == bitsetData) {
2363  size_t bit_set_count2;
2364  getOffsetsFromBitset<syncType>(loopName, bit_set_comm, offsets,
2365  bit_set_count2);
2366  assert(bit_set_count == bit_set_count2);
2367  }
2368 
2369  // Note the extra template argument and i argument which cause
2370  // execution to deal with a particular element of the vector field
2371  // we are synchronizing
2372  if (data_mode == onlyData) {
2373  setSubset<decltype(sharedNodes[from_id]), SyncFnTy, syncType, VecTy,
2374  async, true, true, true>(loopName, sharedNodes[from_id],
2375  bit_set_count, offsets, val_vec,
2376  bit_set_compute, i);
2377  } else if (data_mode == dataSplit || data_mode == dataSplitFirst) {
2378  setSubset<decltype(sharedNodes[from_id]), SyncFnTy, syncType, VecTy,
2379  async, true, true, true>(
2380  loopName, sharedNodes[from_id], bit_set_count, offsets, val_vec,
2381  bit_set_compute, i, buf_start);
2382  } else if (data_mode == gidsData) {
2383  setSubset<decltype(offsets), SyncFnTy, syncType, VecTy, async, true,
2384  true, true>(loopName, offsets, bit_set_count, offsets,
2385  val_vec, bit_set_compute, i);
2386  } else { // bitsetData or offsetsData
2387  setSubset<decltype(sharedNodes[from_id]), SyncFnTy, syncType, VecTy,
2388  async, false, true, true>(loopName, sharedNodes[from_id],
2389  bit_set_count, offsets, val_vec,
2390  bit_set_compute, i);
2391  }
2392  }
2393  }
2394  }
2395 
2396  Tset.stop();
2397 
2398  return retval;
2399  }
2400 
2401 #ifdef GALOIS_USE_BARE_MPI
2402 
2405  template <WriteLocation writeLocation, ReadLocation readLocation,
2406  SyncType syncType, typename SyncFnTy, typename BitsetFnTy>
2407  void sync_mpi_recv_post(std::vector<MPI_Request>& request,
2408  const std::vector<std::vector<uint8_t>>& rb) {
2409  for (unsigned h = 1; h < numHosts; ++h) {
2410  unsigned x = (id + numHosts - h) % numHosts;
2411  if (nothingToRecv(x, syncType, writeLocation, readLocation))
2412  continue;
2413 
2414  MPI_Irecv((uint8_t*)rb[x].data(), rb[x].size(), MPI_BYTE, x, 32767,
2415  MPI_COMM_WORLD, &request[x]);
2416  }
2417  }
2418 
2422  template <WriteLocation writeLocation, ReadLocation readLocation,
2423  SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
2424  typename VecTy, bool async>
2425  void sync_mpi_recv_wait(std::string loopName,
2426  std::vector<MPI_Request>& request,
2427  const std::vector<std::vector<uint8_t>>& rb) {
2428  for (unsigned h = 1; h < numHosts; ++h) {
2429  unsigned x = (id + numHosts - h) % numHosts;
2430  if (nothingToRecv(x, syncType, writeLocation, readLocation))
2431  continue;
2432 
2433  MPI_Status status;
2434  MPI_Wait(&request[x], &status);
2435 
2436  int size = 0;
2437  MPI_Get_count(&status, MPI_BYTE, &size);
2438 
2439  galois::runtime::RecvBuffer rbuf(rb[x].begin(), rb[x].begin() + size);
2440 
2441  syncRecvApply<syncType, SyncFnTy, BitsetFnTy, VecTy, async>(x, rbuf,
2442  loopName);
2443  }
2444  }
2445 
2449  template <WriteLocation writeLocation, ReadLocation readLocation,
2450  SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
2451  typename VecTy, bool async>
2452  void sync_mpi_get(std::string loopName, const std::vector<MPI_Win>& window,
2453  const std::vector<std::vector<uint8_t>>& rb) {
2454  for (unsigned h = 1; h < numHosts; ++h) {
2455  unsigned x = (id + numHosts - h) % numHosts;
2456  if (nothingToRecv(x, syncType, writeLocation, readLocation))
2457  continue;
2458 
2459  MPI_Win_wait(window[x]);
2460 
2461  size_t size = 0;
2462  memcpy(&size, rb[x].data(), sizeof(size_t));
2463 
2464  galois::runtime::RecvBuffer rbuf(rb[x].begin() + sizeof(size_t),
2465  rb[x].begin() + sizeof(size_t) + size);
2466 
2467  MPI_Win_post(mpi_identity_groups[x], 0, window[x]);
2468 
2469  syncRecvApply<syncType, SyncFnTy, BitsetFnTy, VecTy, async>(x, rbuf,
2470  loopName);
2471  }
2472  }
2473 #endif
2474 
2487  template <WriteLocation writeLocation, ReadLocation readLocation,
2488  SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
2489  typename VecTy, bool async>
2490  void syncNetRecv(std::string loopName) {
2491  auto& net = galois::runtime::getSystemNetworkInterface();
2492  std::string wait_timer_str("Wait_" + get_run_identifier(loopName));
2493  galois::CondStatTimer<GALOIS_COMM_STATS> Twait(wait_timer_str.c_str(),
2494  RNAME);
2495 
2496  if (async) {
2497  size_t syncTypePhase = 0;
2498  if (syncType == syncBroadcast)
2499  syncTypePhase = 1;
2500  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr,
2501  syncTypePhase)) p;
2502  do {
2503  p = net.recieveTagged(galois::runtime::evilPhase, nullptr,
2504  syncTypePhase);
2505 
2506  if (p) {
2507  syncRecvApply<syncType, SyncFnTy, BitsetFnTy, VecTy, async>(
2508  p->first, p->second, loopName);
2509  }
2510  } while (p);
2511  } else {
2512  for (unsigned x = 0; x < numHosts; ++x) {
2513  if (x == id)
2514  continue;
2515  if (nothingToRecv(x, syncType, writeLocation, readLocation))
2516  continue;
2517 
2518  Twait.start();
2519  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
2520  do {
2521  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
2522  } while (!p);
2523  Twait.stop();
2524 
2525  syncRecvApply<syncType, SyncFnTy, BitsetFnTy, VecTy, async>(
2526  p->first, p->second, loopName);
2527  }
2528  incrementEvilPhase();
2529  }
2530  }
2531 
2544  template <WriteLocation writeLocation, ReadLocation readLocation,
2545  SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
2546  typename VecTy, bool async>
2547  void syncRecv(std::string loopName) {
2548  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
2549  galois::CondStatTimer<GALOIS_COMM_STATS> TRecvTime(
2550  (syncTypeStr + "Recv_" + get_run_identifier(loopName)).c_str(), RNAME);
2551 
2552  TRecvTime.start();
2553  syncNetRecv<writeLocation, readLocation, syncType, SyncFnTy, BitsetFnTy,
2554  VecTy, async>(loopName);
2555  TRecvTime.stop();
2556  }
2557 
2559 // MPI sync variants
2561 #ifdef GALOIS_USE_BARE_MPI
2562 
2565  template <WriteLocation writeLocation, ReadLocation readLocation,
2566  SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
2567  typename VecTy, bool async>
2568  void syncNonblockingMPI(std::string loopName,
2569  bool use_bitset_to_send = true) {
2570  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
2571  galois::CondStatTimer<GALOIS_COMM_STATS> TSendTime(
2572  (syncTypeStr + "Send_" + get_run_identifier(loopName)).c_str(), RNAME);
2573  galois::CondStatTimer<GALOIS_COMM_STATS> TRecvTime(
2574  (syncTypeStr + "Recv_" + get_run_identifier(loopName)).c_str(), RNAME);
2575 
2576  static std::vector<std::vector<uint8_t>> rb;
2577  static std::vector<MPI_Request> request;
2578 
2579  if (rb.size() == 0) { // create the receive buffers
2580  TRecvTime.start();
2581  auto& sharedNodes = (syncType == syncReduce) ? masterNodes : mirrorNodes;
2582  rb.resize(numHosts);
2583  request.resize(numHosts, MPI_REQUEST_NULL);
2584 
2585  for (unsigned h = 1; h < numHosts; ++h) {
2586  unsigned x = (id + numHosts - h) % numHosts;
2587  if (nothingToRecv(x, syncType, writeLocation, readLocation))
2588  continue;
2589 
2590  size_t size = getMaxSendBufferSize<SyncFnTy>(sharedNodes[x].size());
2591  rb[x].resize(size);
2592  }
2593  TRecvTime.stop();
2594  }
2595 
2596  TRecvTime.start();
2597  sync_mpi_recv_post<writeLocation, readLocation, syncType, SyncFnTy,
2598  BitsetFnTy>(request, rb);
2599  TRecvTime.stop();
2600 
2601  TSendTime.start();
2602  if (use_bitset_to_send) {
2603  sync_mpi_send<writeLocation, readLocation, syncType, SyncFnTy, BitsetFnTy,
2604  VecTy, async>(loopName);
2605  } else {
2606  sync_mpi_send<writeLocation, readLocation, syncType, SyncFnTy,
2607  galois::InvalidBitsetFnTy, VecTy, async>(loopName);
2608  }
2609  TSendTime.stop();
2610 
2611  TRecvTime.start();
2612  sync_mpi_recv_wait<writeLocation, readLocation, syncType, SyncFnTy,
2613  BitsetFnTy, VecTy, async>(loopName, request, rb);
2614  TRecvTime.stop();
2615  }
2616 
2620  template <WriteLocation writeLocation, ReadLocation readLocation,
2621  SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
2622  typename VecTy, bool async>
2623  void syncOnesidedMPI(std::string loopName, bool use_bitset_to_send = true) {
2624  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
2625  galois::CondStatTimer<GALOIS_COMM_STATS> TSendTime(
2626  (syncTypeStr + "Send_" + get_run_identifier(loopName)).c_str(), RNAME);
2627  galois::CondStatTimer<GALOIS_COMM_STATS> TRecvTime(
2628  (syncTypeStr + "Recv_" + get_run_identifier(loopName)).c_str(), RNAME);
2629 
2630  static std::vector<MPI_Win> window;
2631  static MPI_Group mpi_access_group;
2632  static std::vector<std::vector<uint8_t>> rb;
2633 
2634  if (window.size() == 0) { // create the windows
2635  TRecvTime.start();
2636  auto& sharedNodes = (syncType == syncReduce) ? masterNodes : mirrorNodes;
2637  window.resize(numHosts);
2638  rb.resize(numHosts);
2639 
2640  uint64_t recv_buffers_size = 0;
2641  for (unsigned x = 0; x < numHosts; ++x) {
2642  size_t size = getMaxSendBufferSize<SyncFnTy>(sharedNodes[x].size());
2643  rb[x].resize(size);
2644  recv_buffers_size += size;
2645 
2646  MPI_Info info;
2647  MPI_Info_create(&info);
2648  MPI_Info_set(info, "no_locks", "true");
2649  MPI_Info_set(info, "same_disp_unit", "true");
2650 
2651  MPI_Win_create(rb[x].data(), size, 1, info, MPI_COMM_WORLD, &window[x]);
2652 
2653  MPI_Info_free(&info);
2654  }
2655  auto& net = galois::runtime::getSystemNetworkInterface();
2656  net.incrementMemUsage(recv_buffers_size);
2657 
2658  for (unsigned h = 1; h < numHosts; ++h) {
2659  unsigned x = (id + numHosts - h) % numHosts;
2660  if (nothingToRecv(x, syncType, writeLocation, readLocation))
2661  continue;
2662  // exposure group of each window is same as identity group of that
2663  // window
2664  MPI_Win_post(mpi_identity_groups[x], 0, window[x]);
2665  }
2666  TRecvTime.stop();
2667 
2668  TSendTime.start();
2669  std::vector<int> access_hosts;
2670  for (unsigned h = 1; h < numHosts; ++h) {
2671  unsigned x = (id + h) % numHosts;
2672 
2673  if (nothingToSend(x, syncType, writeLocation, readLocation))
2674  continue;
2675 
2676  access_hosts.push_back(x);
2677  }
2678  MPI_Group world_group;
2679  MPI_Comm_group(MPI_COMM_WORLD, &world_group);
2680  // access group for only one window since only one window is accessed
2681  MPI_Group_incl(world_group, access_hosts.size(), access_hosts.data(),
2682  &mpi_access_group);
2683  TSendTime.stop();
2684  }
2685 
2686  TSendTime.start();
2687  if (use_bitset_to_send) {
2688  sync_mpi_put<writeLocation, readLocation, syncType, SyncFnTy, BitsetFnTy,
2689  VecTy, async>(loopName, mpi_access_group, window);
2690  } else {
2691  sync_mpi_put<writeLocation, readLocation, syncType, SyncFnTy,
2692  galois::InvalidBitsetFnTy, VecTy, async>(
2693  loopName, mpi_access_group, window);
2694  }
2695  TSendTime.stop();
2696 
2697  TRecvTime.start();
2698  sync_mpi_get<writeLocation, readLocation, syncType, SyncFnTy, BitsetFnTy,
2699  VecTy, async>(loopName, window, rb);
2700  TRecvTime.stop();
2701  }
2702 #endif
2703 
2705  // Higher Level Sync Calls (broadcast/reduce, etc)
2707 
2718  template <WriteLocation writeLocation, ReadLocation readLocation,
2719  typename ReduceFnTy, typename BitsetFnTy, bool async>
2720  inline void reduce(std::string loopName) {
2721  std::string timer_str("Reduce_" + get_run_identifier(loopName));
2722  galois::CondStatTimer<GALOIS_COMM_STATS> TsyncReduce(timer_str.c_str(),
2723  RNAME);
2724 
2725  typedef typename ReduceFnTy::ValTy T;
2726  typedef
2727  typename std::conditional<galois::runtime::is_memory_copyable<T>::value,
2728  galois::PODResizeableArray<T>,
2729  galois::gstl::Vector<T>>::type VecTy;
2730 
2731  TsyncReduce.start();
2732 
2733 #ifdef GALOIS_USE_BARE_MPI
2734  switch (bare_mpi) {
2735  case noBareMPI:
2736 #endif
2737  syncSend<writeLocation, readLocation, syncReduce, ReduceFnTy, BitsetFnTy,
2738  VecTy, async>(loopName);
2739  syncRecv<writeLocation, readLocation, syncReduce, ReduceFnTy, BitsetFnTy,
2740  VecTy, async>(loopName);
2741 #ifdef GALOIS_USE_BARE_MPI
2742  break;
2743  case nonBlockingBareMPI:
2744  syncNonblockingMPI<writeLocation, readLocation, syncReduce, ReduceFnTy,
2745  BitsetFnTy, VecTy, async>(loopName);
2746  break;
2747  case oneSidedBareMPI:
2748  syncOnesidedMPI<writeLocation, readLocation, syncReduce, ReduceFnTy,
2749  BitsetFnTy, VecTy, async>(loopName);
2750  break;
2751  default:
2752  GALOIS_DIE("unsupported bare MPI");
2753  }
2754 #endif
2755 
2756  TsyncReduce.stop();
2757  }
2758 
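The VecTy alias above selects the wire-buffer type from the reduction's value type. A minimal sketch of the same selection for a concrete field type (uint32_t here is purely an illustrative assumption, and the snippet relies on the headers this file already includes):

// Sketch (not part of this header): the same std::conditional selection as
// above, written out for a concrete field type. A trivially copyable type
// such as uint32_t is expected to take the PODResizeableArray fast path,
// which can be serialized as raw bytes; other value types fall back to
// galois::gstl::Vector.
using ExampleVecTy = typename std::conditional<
    galois::runtime::is_memory_copyable<uint32_t>::value,
    galois::PODResizeableArray<uint32_t>,
    galois::gstl::Vector<uint32_t>>::type;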
2769  template <WriteLocation writeLocation, ReadLocation readLocation,
2770  typename BroadcastFnTy, typename BitsetFnTy, bool async>
2771  inline void broadcast(std::string loopName) {
2772  std::string timer_str("Broadcast_" + get_run_identifier(loopName));
2773  galois::CondStatTimer<GALOIS_COMM_STATS> TsyncBroadcast(timer_str.c_str(),
2774  RNAME);
2775 
2776  typedef typename BroadcastFnTy::ValTy T;
2777  typedef
2778  typename std::conditional<galois::runtime::is_memory_copyable<T>::value,
2779  galois::PODResizeableArray<T>,
2780  galois::gstl::Vector<T>>::type VecTy;
2781 
2782  TsyncBroadcast.start();
2783 
2784  bool use_bitset = true;
2785 
2786  if (currentBVFlag != nullptr) {
2787  if (readLocation == readSource &&
2788  galois::runtime::src_invalid(*currentBVFlag)) {
2789  use_bitset = false;
2790  *currentBVFlag = BITVECTOR_STATUS::NONE_INVALID;
2791  currentBVFlag = nullptr;
2792  } else if (readLocation == readDestination &&
2793  galois::runtime::dst_invalid(*currentBVFlag)) {
2794  use_bitset = false;
2795  *currentBVFlag = BITVECTOR_STATUS::NONE_INVALID;
2796  currentBVFlag = nullptr;
2797  } else if (readLocation == readAny &&
2798  *currentBVFlag != BITVECTOR_STATUS::NONE_INVALID) {
2799  // the bitvector flag being non-null means this call came from
2800  // sync on demand; sync on demand will NEVER use readAny
2801  // when one of src or dst is invalid
2802  GALOIS_DIE("readAny + use of bitvector flag without none_invalid "
2803  "should never happen");
2804  }
2805  }
2806 
2807 #ifdef GALOIS_USE_BARE_MPI
2808  switch (bare_mpi) {
2809  case noBareMPI:
2810 #endif
2811  if (use_bitset) {
2812  syncSend<writeLocation, readLocation, syncBroadcast, BroadcastFnTy,
2813  BitsetFnTy, VecTy, async>(loopName);
2814  } else {
2815  syncSend<writeLocation, readLocation, syncBroadcast, BroadcastFnTy,
2816  galois::InvalidBitsetFnTy, VecTy, async>(loopName);
2817  }
2818  syncRecv<writeLocation, readLocation, syncBroadcast, BroadcastFnTy,
2819  BitsetFnTy, VecTy, async>(loopName);
2820 #ifdef GALOIS_USE_BARE_MPI
2821  break;
2822  case nonBlockingBareMPI:
2823  syncNonblockingMPI<writeLocation, readLocation, syncBroadcast,
2824  BroadcastFnTy, BitsetFnTy, VecTy, async>(loopName,
2825  use_bitset);
2826  break;
2827  case oneSidedBareMPI:
2828  syncOnesidedMPI<writeLocation, readLocation, syncBroadcast, BroadcastFnTy,
2829  BitsetFnTy, VecTy, async>(loopName, use_bitset);
2830  break;
2831  default:
2832  GALOIS_DIE("unsupported bare MPI");
2833  }
2834 #endif
2835 
2836  TsyncBroadcast.stop();
2837  }
2838 
2847  template <typename SyncFnTy, typename BitsetFnTy, bool async>
2848  inline void sync_src_to_src(std::string loopName) {
2849  // do nothing for OEC
2850  // reduce and broadcast for IEC, CVC, UVC
2851  if (transposed || isVertexCut) {
2852  reduce<writeSource, readSource, SyncFnTy, BitsetFnTy, async>(loopName);
2853  broadcast<writeSource, readSource, SyncFnTy, BitsetFnTy, async>(loopName);
2854  }
2855  }
2856 
2865  template <typename SyncFnTy, typename BitsetFnTy, bool async>
2866  inline void sync_src_to_dst(std::string loopName) {
2867  // only broadcast for OEC
2868  // only reduce for IEC
2869  // reduce and broadcast for CVC, UVC
2870  if (transposed) {
2871  reduce<writeSource, readDestination, SyncFnTy, BitsetFnTy, async>(
2872  loopName);
2873  if (isVertexCut) {
2874  broadcast<writeSource, readDestination, SyncFnTy, BitsetFnTy, async>(
2875  loopName);
2876  }
2877  } else {
2878  if (isVertexCut) {
2879  reduce<writeSource, readDestination, SyncFnTy, BitsetFnTy, async>(
2880  loopName);
2881  }
2882  broadcast<writeSource, readDestination, SyncFnTy, BitsetFnTy, async>(
2883  loopName);
2884  }
2885  }
2886 
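A compact reading of the policy logic above, using the shorthand from the comments (OEC = outgoing edge-cut, i.e. !transposed && !isVertexCut; IEC = incoming edge-cut, i.e. transposed && !isVertexCut; CVC/UVC = vertex cuts). The per-policy reasoning is a sketch of the usual Gluon proxy placement, not text from this header:

// sync_src_to_dst (write at sources, read at destinations):
//   OEC     -> broadcast only : sources are master copies, so the written
//                               value only needs to reach mirror copies at
//                               the destinations
//   IEC     -> reduce only    : writes at sources land on mirror copies and
//                               are reduced into the masters, which are the
//                               destinations being read
//   CVC/UVC -> reduce + broadcast : both mirror and master copies may be
//                                   written and read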
2895  template <typename SyncFnTy, typename BitsetFnTy, bool async>
2896  inline void sync_src_to_any(std::string loopName) {
2897  // only broadcast for OEC
2898  // reduce and broadcast for IEC, CVC, UVC
2899  if (transposed || isVertexCut) {
2900  reduce<writeSource, readAny, SyncFnTy, BitsetFnTy, async>(loopName);
2901  }
2902  broadcast<writeSource, readAny, SyncFnTy, BitsetFnTy, async>(loopName);
2903  }
2904 
2913  template <typename SyncFnTy, typename BitsetFnTy, bool async>
2914  inline void sync_dst_to_src(std::string loopName) {
2915  // only reduce for OEC
2916  // only broadcast for IEC
2917  // reduce and broadcast for CVC, UVC
2918  if (transposed) {
2919  if (isVertexCut) {
2920  reduce<writeDestination, readSource, SyncFnTy, BitsetFnTy, async>(
2921  loopName);
2922  }
2923  broadcast<writeDestination, readSource, SyncFnTy, BitsetFnTy, async>(
2924  loopName);
2925  } else {
2926  reduce<writeDestination, readSource, SyncFnTy, BitsetFnTy, async>(
2927  loopName);
2928  if (isVertexCut) {
2929  broadcast<writeDestination, readSource, SyncFnTy, BitsetFnTy, async>(
2930  loopName);
2931  }
2932  }
2933  }
2934 
2943  template <typename SyncFnTy, typename BitsetFnTy, bool async>
2944  inline void sync_dst_to_dst(std::string loopName) {
2945  // do nothing for IEC
2946  // reduce and broadcast for OEC, CVC, UVC
2947  if (!transposed || isVertexCut) {
2948  reduce<writeDestination, readDestination, SyncFnTy, BitsetFnTy, async>(
2949  loopName);
2950  broadcast<writeDestination, readDestination, SyncFnTy, BitsetFnTy, async>(
2951  loopName);
2952  }
2953  }
2954 
2963  template <typename SyncFnTy, typename BitsetFnTy, bool async>
2964  inline void sync_dst_to_any(std::string loopName) {
2965  // only broadcast for IEC
2966  // reduce and broadcast for OEC, CVC, UVC
2967  if (!transposed || isVertexCut) {
2968  reduce<writeDestination, readAny, SyncFnTy, BitsetFnTy, async>(loopName);
2969  }
2970  broadcast<writeDestination, readAny, SyncFnTy, BitsetFnTy, async>(loopName);
2971  }
2972 
2981  template <typename SyncFnTy, typename BitsetFnTy, bool async>
2982  inline void sync_any_to_src(std::string loopName) {
2983  // only reduce for OEC
2984  // reduce and broadcast for IEC, CVC, UVC
2985  reduce<writeAny, readSource, SyncFnTy, BitsetFnTy, async>(loopName);
2986  if (transposed || isVertexCut) {
2987  broadcast<writeAny, readSource, SyncFnTy, BitsetFnTy, async>(loopName);
2988  }
2989  }
2990 
2999  template <typename SyncFnTy, typename BitsetFnTy, bool async>
3000  inline void sync_any_to_dst(std::string loopName) {
3001  // only reduce for IEC
3002  // reduce and broadcast for OEC, CVC, UVC
3003  reduce<writeAny, readDestination, SyncFnTy, BitsetFnTy, async>(loopName);
3004 
3005  if (!transposed || isVertexCut) {
3006  broadcast<writeAny, readDestination, SyncFnTy, BitsetFnTy, async>(
3007  loopName);
3008  }
3009  }
3010 
3019  template <typename SyncFnTy, typename BitsetFnTy, bool async>
3020  inline void sync_any_to_any(std::string loopName) {
3021  // reduce and broadcast for OEC, IEC, CVC, UVC
3022  reduce<writeAny, readAny, SyncFnTy, BitsetFnTy, async>(loopName);
3023  broadcast<writeAny, readAny, SyncFnTy, BitsetFnTy, async>(loopName);
3024  }
3025 
3027  // Public interface: sync
3029 
3030 public:
3043  template <WriteLocation writeLocation, ReadLocation readLocation,
3044  typename SyncFnTy, typename BitsetFnTy = galois::InvalidBitsetFnTy,
3045  bool async = false>
3046  inline void sync(std::string loopName) {
3047  std::string timer_str("Sync_" + loopName + "_" + get_run_identifier());
3048  galois::StatTimer Tsync(timer_str.c_str(), RNAME);
3049 
3050  Tsync.start();
3051 
3052  if (partitionAgnostic) {
3053  sync_any_to_any<SyncFnTy, BitsetFnTy, async>(loopName);
3054  } else {
3055  if (writeLocation == writeSource) {
3056  if (readLocation == readSource) {
3057  sync_src_to_src<SyncFnTy, BitsetFnTy, async>(loopName);
3058  } else if (readLocation == readDestination) {
3059  sync_src_to_dst<SyncFnTy, BitsetFnTy, async>(loopName);
3060  } else { // readAny
3061  sync_src_to_any<SyncFnTy, BitsetFnTy, async>(loopName);
3062  }
3063  } else if (writeLocation == writeDestination) {
3064  if (readLocation == readSource) {
3065  sync_dst_to_src<SyncFnTy, BitsetFnTy, async>(loopName);
3066  } else if (readLocation == readDestination) {
3067  sync_dst_to_dst<SyncFnTy, BitsetFnTy, async>(loopName);
3068  } else { // readAny
3069  sync_dst_to_any<SyncFnTy, BitsetFnTy, async>(loopName);
3070  }
3071  } else { // writeAny
3072  if (readLocation == readSource) {
3073  sync_any_to_src<SyncFnTy, BitsetFnTy, async>(loopName);
3074  } else if (readLocation == readDestination) {
3075  sync_any_to_dst<SyncFnTy, BitsetFnTy, async>(loopName);
3076  } else { // readAny
3077  sync_any_to_any<SyncFnTy, BitsetFnTy, async>(loopName);
3078  }
3079  }
3080  }
3081 
3082  Tsync.stop();
3083  }
3084 
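A typical call site for this entry point, in the style of the distributed Galois benchmarks. This is only a sketch: the Graph type, the dist_current field, the substrate object, and the macro-generated Reduce_min_dist_current / Bitset_dist_current structures (assumed to come from the GALOIS_SYNC_STRUCTURE_* macros in SyncStructures.h) are illustrative assumptions, not part of this header:

// Sync structures for an assumed per-node field `uint32_t dist_current`;
// the backing galois::DynamicBitSet bitset_dist_current is assumed to be
// declared elsewhere, as the bitset macro expects.
GALOIS_SYNC_STRUCTURE_REDUCE_MIN(dist_current, uint32_t);
GALOIS_SYNC_STRUCTURE_BITSET(dist_current);

template <typename Graph>
void runRound(galois::graphs::GluonSubstrate<Graph>& syncSubstrate) {
  // ... operator writes dist_current at edge destinations ...

  // Reduce mirror writes into the masters, then broadcast the result so the
  // next round can read the field at either endpoint.
  syncSubstrate.sync<writeDestination, readAny, Reduce_min_dist_current,
                     Bitset_dist_current>("BFS");
}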
3086  // Sync on demand code (unmaintained, may not work)
3088 private:
3093  template <ReadLocation rl, typename SyncFnTy, typename BitsetFnTy>
3094  struct SyncOnDemandHandler {
3095  // note: this call's signature differs from the specialized versions;
3096  // using this unspecialized struct therefore triggers a compile-time
3097  // error (which is what we want)
3098  void call() { GALOIS_DIE("invalid read location for sync on demand"); }
3099  };
3100 
3107  template <typename SyncFnTy, typename BitsetFnTy>
3108  struct SyncOnDemandHandler<readSource, SyncFnTy, BitsetFnTy> {
3120  static inline void call(GluonSubstrate* substrate,
3121  galois::runtime::FieldFlags& fieldFlags,
3122  std::string loopName, const BITVECTOR_STATUS&) {
3123  if (fieldFlags.src_to_src() && fieldFlags.dst_to_src()) {
3124  substrate->sync_any_to_src<SyncFnTy, BitsetFnTy>(loopName);
3125  } else if (fieldFlags.src_to_src()) {
3126  substrate->sync_src_to_src<SyncFnTy, BitsetFnTy>(loopName);
3127  } else if (fieldFlags.dst_to_src()) {
3128  substrate->sync_dst_to_src<SyncFnTy, BitsetFnTy>(loopName);
3129  }
3130 
3131  fieldFlags.clear_read_src();
3132  }
3133  };
3134 
3141  template <typename SyncFnTy, typename BitsetFnTy>
3142  struct SyncOnDemandHandler<readDestination, SyncFnTy, BitsetFnTy> {
3154  static inline void call(GluonSubstrate* substrate,
3155  galois::runtime::FieldFlags& fieldFlags,
3156  std::string loopName, const BITVECTOR_STATUS&) {
3157  if (fieldFlags.src_to_dst() && fieldFlags.dst_to_dst()) {
3158  substrate->sync_any_to_dst<SyncFnTy, BitsetFnTy>(loopName);
3159  } else if (fieldFlags.src_to_dst()) {
3160  substrate->sync_src_to_dst<SyncFnTy, BitsetFnTy>(loopName);
3161  } else if (fieldFlags.dst_to_dst()) {
3162  substrate->sync_dst_to_dst<SyncFnTy, BitsetFnTy>(loopName);
3163  }
3164 
3165  fieldFlags.clear_read_dst();
3166  }
3167  };
3168 
3175  template <typename SyncFnTy, typename BitsetFnTy>
3176  struct SyncOnDemandHandler<readAny, SyncFnTy, BitsetFnTy> {
3188  static inline void call(GluonSubstrate* substrate,
3189  galois::runtime::FieldFlags& fieldFlags,
3190  std::string loopName,
3191  const BITVECTOR_STATUS& bvFlag) {
3192  bool src_write = fieldFlags.src_to_src() || fieldFlags.src_to_dst();
3193  bool dst_write = fieldFlags.dst_to_src() || fieldFlags.dst_to_dst();
3194 
3195  if (!(src_write && dst_write)) {
3196  // src or dst write flags aren't set (potentially both are not set),
3197  // but it's NOT the case that both are set, meaning "any" isn't
3198  // required in the "from"; can work at granularity of just src
3199  // write or dst write
3200 
3201  if (src_write) {
3202  if (fieldFlags.src_to_src() && fieldFlags.src_to_dst()) {
3203  if (bvFlag == BITVECTOR_STATUS::NONE_INVALID) {
3204  substrate->sync_src_to_any<SyncFnTy, BitsetFnTy>(loopName);
3205  } else if (galois::runtime::src_invalid(bvFlag)) {
3206  // src invalid bitset; sync individually so it can be called
3207  // without bitset
3208  substrate->sync_src_to_dst<SyncFnTy, BitsetFnTy>(loopName);
3209  substrate->sync_src_to_src<SyncFnTy, BitsetFnTy>(loopName);
3210  } else if (galois::runtime::dst_invalid(bvFlag)) {
3211  // dst invalid bitset; sync individually so it can be called
3212  // without bitset
3213  substrate->sync_src_to_src<SyncFnTy, BitsetFnTy>(loopName);
3214  substrate->sync_src_to_dst<SyncFnTy, BitsetFnTy>(loopName);
3215  } else {
3216  GALOIS_DIE("invalid bitvector flag setting in syncOnDemand");
3217  }
3218  } else if (fieldFlags.src_to_src()) {
3219  substrate->sync_src_to_src<SyncFnTy, BitsetFnTy>(loopName);
3220  } else { // src to dst is set
3221  substrate->sync_src_to_dst<SyncFnTy, BitsetFnTy>(loopName);
3222  }
3223  } else if (dst_write) {
3224  if (fieldFlags.dst_to_src() && fieldFlags.dst_to_dst()) {
3225  if (bvFlag == BITVECTOR_STATUS::NONE_INVALID) {
3226  substrate->sync_dst_to_any<SyncFnTy, BitsetFnTy>(loopName);
3227  } else if (galois::runtime::src_invalid(bvFlag)) {
3228  substrate->sync_dst_to_dst<SyncFnTy, BitsetFnTy>(loopName);
3229  substrate->sync_dst_to_src<SyncFnTy, BitsetFnTy>(loopName);
3230  } else if (galois::runtime::dst_invalid(bvFlag)) {
3231  substrate->sync_dst_to_src<SyncFnTy, BitsetFnTy>(loopName);
3232  substrate->sync_dst_to_dst<SyncFnTy, BitsetFnTy>(loopName);
3233  } else {
3234  GALOIS_DIE("invalid bitvector flag setting in syncOnDemand");
3235  }
3236  } else if (fieldFlags.dst_to_src()) {
3237  substrate->sync_dst_to_src<SyncFnTy, BitsetFnTy>(loopName);
3238  } else { // dst to dst is set
3239  substrate->sync_dst_to_dst<SyncFnTy, BitsetFnTy>(loopName);
3240  }
3241  }
3242 
3243  // note the "no flags are set" case will enter into this block
3244  // as well, and it is correctly handled by doing nothing since
3245  // both src/dst_write will be false
3246  } else {
3247  // it is the case that both src/dst write flags are set, so "any"
3248  // is required in the "from"; what remains to be determined is
3249  // the use of src, dst, or any for the destination of the sync
3250  bool src_read = fieldFlags.src_to_src() || fieldFlags.dst_to_src();
3251  bool dst_read = fieldFlags.src_to_dst() || fieldFlags.dst_to_dst();
3252 
3253  if (src_read && dst_read) {
3254  if (bvFlag == BITVECTOR_STATUS::NONE_INVALID) {
3255  substrate->sync_any_to_any<SyncFnTy, BitsetFnTy>(loopName);
3256  } else if (galois::runtime::src_invalid(bvFlag)) {
3257  substrate->sync_any_to_dst<SyncFnTy, BitsetFnTy>(loopName);
3258  substrate->sync_any_to_src<SyncFnTy, BitsetFnTy>(loopName);
3259  } else if (galois::runtime::dst_invalid(bvFlag)) {
3260  substrate->sync_any_to_src<SyncFnTy, BitsetFnTy>(loopName);
3261  substrate->sync_any_to_dst<SyncFnTy, BitsetFnTy>(loopName);
3262  } else {
3263  GALOIS_DIE("invalid bitvector flag setting in syncOnDemand");
3264  }
3265  } else if (src_read) {
3266  substrate->sync_any_to_src<SyncFnTy, BitsetFnTy>(loopName);
3267  } else { // dst_read
3268  substrate->sync_any_to_dst<SyncFnTy, BitsetFnTy>(loopName);
3269  }
3270  }
3271 
3272  fieldFlags.clear_read_src();
3273  fieldFlags.clear_read_dst();
3274  }
3275  };
3276 
3278  // GPU marshaling
3280 
3281 #ifdef GALOIS_ENABLE_GPU
3282 private:
3283  using GraphNode = typename GraphTy::GraphNode;
3284  using edge_iterator = typename GraphTy::edge_iterator;
3285  using EdgeTy = typename GraphTy::EdgeType;
3286 
3287  // Code that handles getting the graph onto the GPU
3288  template <bool isVoidType,
3289  typename std::enable_if<isVoidType>::type* = nullptr>
3290  inline void setMarshalEdge(MarshalGraph& GALOIS_UNUSED(m),
3291  const size_t GALOIS_UNUSED(index),
3292  const edge_iterator& GALOIS_UNUSED(e)) {
3293  // do nothing
3294  }
3295 
3296  template <bool isVoidType,
3297  typename std::enable_if<!isVoidType>::type* = nullptr>
3298  inline void setMarshalEdge(MarshalGraph& m, const size_t index,
3299  const edge_iterator& e) {
3300  m.edge_data[index] = userGraph.getEdgeData(e);
3301  }
3302 
3303 public:
3304  void getMarshalGraph(MarshalGraph& m) {
3305  m.nnodes = userGraph.size();
3306  m.nedges = userGraph.sizeEdges();
3307  m.numOwned = userGraph.numMasters();
3308  // Assumption: masters occur at the beginning in a contiguous range
3309  m.beginMaster = 0;
3310  m.numNodesWithEdges = userGraph.getNumNodesWithEdges();
3311  m.id = id;
3312  m.numHosts = numHosts;
3313  m.row_start = (index_type*)calloc(m.nnodes + 1, sizeof(index_type));
3314  m.edge_dst = (index_type*)calloc(m.nedges, sizeof(index_type));
3315  m.node_data = (index_type*)calloc(m.nnodes, sizeof(node_data_type));
3316 
3317  // TODO deal with EdgeTy
3318  if (std::is_void<EdgeTy>::value) {
3319  m.edge_data = NULL;
3320  } else {
3321  if (!std::is_same<EdgeTy, edge_data_type>::value) {
3322  galois::gWarn("Edge data type mismatch between CPU and GPU\n");
3323  }
3324  m.edge_data = (edge_data_type*)calloc(m.nedges, sizeof(edge_data_type));
3325  }
3326 
3327  galois::do_all(
3328  // TODO not using thread ranges, can be optimized if I can iterate
3329  // directly over userGraph
3330  galois::iterate(userGraph.allNodesRange()),
3331  [&](const GraphNode& nodeID) {
3332  // initialize node_data with localID-to-globalID mapping
3333  m.node_data[nodeID] = userGraph.getGID(nodeID);
3334  m.row_start[nodeID] = *(userGraph.edge_begin(nodeID));
3335  for (auto e = userGraph.edge_begin(nodeID);
3336  e != userGraph.edge_end(nodeID); e++) {
3337  auto edgeID = *e;
3338  setMarshalEdge<std::is_void<EdgeTy>::value>(m, edgeID, e);
3339  m.edge_dst[edgeID] = userGraph.getEdgeDst(e);
3340  }
3341  },
3342  galois::steal());
3343 
3344  m.row_start[m.nnodes] = m.nedges;
3345 
3347 
3348  // copy memoization meta-data
3349  m.num_master_nodes =
3350  (unsigned int*)calloc(masterNodes.size(), sizeof(unsigned int));
3351  ;
3352  m.master_nodes =
3353  (unsigned int**)calloc(masterNodes.size(), sizeof(unsigned int*));
3354  ;
3355 
3356  for (uint32_t h = 0; h < masterNodes.size(); ++h) {
3357  m.num_master_nodes[h] = masterNodes[h].size();
3358 
3359  if (masterNodes[h].size() > 0) {
3360  m.master_nodes[h] =
3361  (unsigned int*)calloc(masterNodes[h].size(), sizeof(unsigned int));
3362  ;
3363  std::copy(masterNodes[h].begin(), masterNodes[h].end(),
3364  m.master_nodes[h]);
3365  } else {
3366  m.master_nodes[h] = NULL;
3367  }
3368  }
3369 
3370  m.num_mirror_nodes =
3371  (unsigned int*)calloc(mirrorNodes.size(), sizeof(unsigned int));
3372  ;
3373  m.mirror_nodes =
3374  (unsigned int**)calloc(mirrorNodes.size(), sizeof(unsigned int*));
3375  ;
3376  for (uint32_t h = 0; h < mirrorNodes.size(); ++h) {
3377  m.num_mirror_nodes[h] = mirrorNodes[h].size();
3378 
3379  if (mirrorNodes[h].size() > 0) {
3380  m.mirror_nodes[h] =
3381  (unsigned int*)calloc(mirrorNodes[h].size(), sizeof(unsigned int));
3382  ;
3383  std::copy(mirrorNodes[h].begin(), mirrorNodes[h].end(),
3384  m.mirror_nodes[h]);
3385  } else {
3386  m.mirror_nodes[h] = NULL;
3387  }
3388  }
3389 
3390  // user needs to provide method of freeing up graph (it can do nothing
3391  // if they wish)
3392  userGraph.deallocate();
3393  }
3394 #endif // GALOIS_ENABLE_GPU
3395 
3397  // Public sync interface
3399 
3400 public:
3413  template <ReadLocation readLocation, typename SyncFnTy,
3414  typename BitsetFnTy = galois::InvalidBitsetFnTy>
3415  void syncOnDemand(galois::runtime::FieldFlags& fieldFlags,
3416  std::string loopName) {
3417  std::string timer_str("Sync_" + get_run_identifier(loopName));
3418  galois::StatTimer Tsync(timer_str.c_str(), RNAME);
3419  Tsync.start();
3420 
3421  currentBVFlag = &(fieldFlags.bitvectorStatus);
3422 
3423  // call a template-specialized function depending on the read location
3424  SyncOnDemandHandler<readLocation, SyncFnTy, BitsetFnTy>::call(
3425  this, fieldFlags, loopName, *currentBVFlag);
3426 
3427  currentBVFlag = nullptr;
3428 
3429  Tsync.stop();
3430  }
3431 
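A sketch of how the distributed benchmarks drive this on-demand interface: writes are recorded in a per-field FieldFlags object and the substrate then performs only the reduce/broadcast directions those flags require. The flag-setter name, the Graph type, and the sync structures below are assumptions for illustration:

galois::runtime::FieldFlags Flags_dist_current;

template <typename Graph>
void afterWritePhase(galois::graphs::GluonSubstrate<Graph>& syncSubstrate) {
  // The operator wrote dist_current at edge destinations this round.
  Flags_dist_current.set_write_dst();

  // Before a phase that reads the field at sources, let the substrate pick
  // the minimal communication implied by the recorded flags.
  syncSubstrate.syncOnDemand<readSource, Reduce_min_dist_current,
                             Bitset_dist_current>(Flags_dist_current, "SSSP");
}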
3433  // Metadata settings/getters
3435 
3440  inline void set_num_run(const uint32_t runNum) { num_run = runNum; }
3441 
3447  inline uint32_t get_run_num() const { return num_run; }
3448 
3454  inline void set_num_round(const uint32_t round) { num_round = round; }
3455 
3464  inline std::string get_run_identifier() const {
3465 #if GALOIS_PER_ROUND_STATS
3466  return std::string(std::to_string(num_run) + "_" +
3467  std::to_string(num_round));
3468 #else
3469  return std::string(std::to_string(num_run));
3470 #endif
3471  }
3472 
3480  inline std::string get_run_identifier(std::string loop_name) const {
3481 #if GALOIS_PER_ROUND_STATS
3482  return std::string(std::string(loop_name) + "_" + std::to_string(num_run) +
3483  "_" + std::to_string(num_round));
3484 #else
3485  return std::string(std::string(loop_name) + "_" + std::to_string(num_run));
3486 #endif
3487  }
3488 
3500  inline std::string get_run_identifier(std::string loop_name,
3501  unsigned alterID) const {
3502 #if GALOIS_PER_ROUND_STATS
3503  return std::string(std::string(loop_name) + "_" + std::to_string(alterID) +
3504  "_" + std::to_string(num_run) + "_" +
3505  std::to_string(num_round));
3506 #else
3507  return std::string(std::string(loop_name) + "_" + std::to_string(alterID) +
3508  "_" + std::to_string(num_run));
3509 #endif
3510  }
3511 
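For example, with num_run == 2 and num_round == 7 the three overloads above produce:

// get_run_identifier()         -> "2"       ("2_7" with GALOIS_PER_ROUND_STATS)
// get_run_identifier("BFS")    -> "BFS_2"   ("BFS_2_7" with GALOIS_PER_ROUND_STATS)
// get_run_identifier("BFS", 3) -> "BFS_3_2" ("BFS_3_2_7" with GALOIS_PER_ROUND_STATS)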
3518  template <typename FnTy>
3519  void reset_mirrorField() {
3520  // TODO make sure this is correct still
3521  auto mirrorRanges = userGraph.getMirrorRanges();
3522  for (auto r : mirrorRanges) {
3523  if (r.first == r.second)
3524  continue;
3525  assert(r.first < r.second);
3526 
3527  // GPU call
3528  bool batch_succeeded = FnTy::reset_batch(r.first, r.second - 1);
3529 
3530  // CPU always enters this block
3531  if (!batch_succeeded) {
3532  galois::do_all(
3533  galois::iterate(r.first, r.second),
3534  [&](uint32_t lid) { FnTy::reset(lid, userGraph.getData(lid)); },
3535  galois::no_stats(),
3536  galois::loopname(get_run_identifier("RESET:MIRRORS").c_str()));
3537  }
3538  }
3539  }
3540 
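A minimal field structure accepted by reset_mirrorField, shown only as a sketch; real applications normally pass the macro-generated reduce structures from SyncStructures.h, and the NodeData type, the dist_current field, and the reset value of 0 below are hypothetical:

struct ResetDistCurrent {
  using ValTy = uint32_t;

  // Device path: returning false tells reset_mirrorField that the host must
  // perform the reset itself.
  static bool reset_batch(size_t /*begin*/, size_t /*end*/) { return false; }

  // Host path: reset the mirror's copy of the field to the chosen reset
  // value (0 here, purely illustrative).
  static void reset(uint32_t /*lid*/, NodeData& node) { node.dist_current = 0; }
};

// usage: syncSubstrate.reset_mirrorField<ResetDistCurrent>();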
3542 // Checkpointing code for graph
3544 
3545 // @todo Checkpointing code needs updates to make it work.
3546 #ifdef GALOIS_CHECKPOINT
3547 // * Headers for boost serialization
3549 // */
3550 //#include <boost/archive/binary_oarchive.hpp>
3551 //#include <boost/archive/binary_iarchive.hpp>
3552 //#include <boost/serialization/split_member.hpp>
3553 //#include <boost/serialization/binary_object.hpp>
3554 //#include <boost/serialization/serialization.hpp>
3555 //#include <boost/serialization/vector.hpp>
3556 //#include <boost/serialization/unordered_map.hpp>
3557 //
3558 // public:
3559 // /**
3560 // * Checkpoint the complete structure on the node to disk
3561 // */
3562 // void checkpointSaveNodeData(std::string checkpointFileName = "checkpoint") {
3563 // using namespace boost::archive;
3564 // galois::StatTimer TimerSaveCheckPoint(
3565 // get_run_identifier("TimerSaveCheckpoint").c_str(), RNAME);
3566 //
3567 // TimerSaveCheckPoint.start();
3568 // std::string checkpointFileName_local =
3569 // checkpointFileName + "_" + std::to_string(id);
3570 //
3571 // std::ofstream outputStream(checkpointFileName_local, std::ios::binary);
3572 // if (!outputStream.is_open()) {
3573 // galois::gPrint("ERROR: Could not open ", checkpointFileName_local,
3574 // " to save checkpoint!!!\n");
3575 // }
3576 // galois::gPrint("[", id,
3577 // "] Saving local checkpoint to :", checkpointFileName_local,
3578 // "\n");
3579 //
3580 // boost::archive::binary_oarchive ar(outputStream,
3581 // boost::archive::no_header);
3582 //
3583 // // TODO handle this with CuSP
3584 // userGraph.serializeNodeData(ar);
3585 //
3586 // std::string statSendBytes_str("CheckpointBytesTotal");
3587 // constexpr static const char* const RREGION = "RECOVERY";
3588 // size_t cp_size = outputStream.tellp();
3589 // galois::runtime::reportStat_Tsum(RREGION, statSendBytes_str, cp_size);
3590 //
3591 // outputStream.flush();
3592 // outputStream.close();
3593 // TimerSaveCheckPoint.stop();
3594 // }
3595 //
3596 // /**
3597 // * Load checkpointed data from disk.
3598 // */
3599 // void checkpointApplyNodeData(std::string checkpointFileName = "checkpoint")
3600 // {
3601 // using namespace boost::archive;
3602 // galois::StatTimer TimerApplyCheckPoint(
3603 // get_run_identifier("TimerApplyCheckpoint").c_str(), RNAME);
3604 //
3605 // TimerApplyCheckPoint.start();
3606 // std::string checkpointFileName_local =
3607 // checkpointFileName + "_" + std::to_string(id);
3608 //
3609 // std::ifstream inputStream(checkpointFileName_local, std::ios::binary);
3610 //
3611 // if (!inputStream.is_open()) {
3612 // galois::gPrint("ERROR: Could not open ", checkpointFileName_local,
3613 // " to read checkpoint!!!\n");
3614 // }
3615 // galois::gPrint("[", id, "] reading local checkpoint from: ",
3616 // checkpointFileName_local, "\n");
3617 //
3618 // boost::archive::binary_iarchive ar(inputStream,
3619 // boost::archive::no_header);
3620 //
3621 // // TODO handle this with CuSP
3622 // userGraph.deSerializeNodeData(ar);
3623 //
3624 // inputStream.close();
3625 // TimerApplyCheckPoint.stop();
3626 // }
3627 #endif
3628 };
3629 
3630 template <typename GraphTy>
3631 constexpr const char* const galois::graphs::GluonSubstrate<GraphTy>::RNAME;
3632 } // end namespace graphs
3633 } // end namespace galois
3634 
3635 #endif // header guard