GluonEdgeSubstrate.h
1 /*
2  * This file belongs to the Galois project, a C++ library for exploiting
3  * parallelism. The code is being released under the terms of the 3-Clause BSD
4  * License (a copy is located in LICENSE.txt at the top-level directory).
5  *
6  * Copyright (C) 2019, The University of Texas at Austin. All rights reserved.
7  * UNIVERSITY EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES CONCERNING THIS
8  * SOFTWARE AND DOCUMENTATION, INCLUDING ANY WARRANTIES OF MERCHANTABILITY,
9  * FITNESS FOR ANY PARTICULAR PURPOSE, NON-INFRINGEMENT AND WARRANTIES OF
10  * PERFORMANCE, AND ANY WARRANTY THAT MIGHT OTHERWISE ARISE FROM COURSE OF
11  * DEALING OR USAGE OF TRADE. NO WARRANTY IS EITHER EXPRESS OR IMPLIED WITH
12  * RESPECT TO THE USE OF THE SOFTWARE OR DOCUMENTATION. Under no circumstances
13  * shall University be liable for incidental, special, indirect, direct or
14  * consequential damages or loss of profits, interruption of business, or
15  * related expenses which may arise from use of Software or Documentation,
16  * including but not limited to those resulting from defects in Software and/or
17  * Documentation, or loss or inaccuracy of data of any kind.
18  */
19 
26 // TODO merge with GluonSubstrate; way too much code duplication
27 
28 #ifndef _GALOIS_GLUONEDGESUB_H_
29 #define _GALOIS_GLUONEDGESUB_H_
30 
31 #include <unordered_map>
32 #include <fstream>
33 
38 #include "galois/DynamicBitset.h"
39 
40 #ifdef GALOIS_ENABLE_GPU
42 #endif
43 
44 #include "galois/runtime/BareMPI.h"
45 
46 // TODO make not global
48 extern DataCommMode enforcedDataMode;
49 
50 #ifdef GALOIS_USE_BARE_MPI
51 extern BareMPI bare_mpi;
53 #endif
54 
55 namespace galois {
56 namespace graphs {
57 
66 template <typename GraphTy>
67 class GluonEdgeSubstrate : public galois::runtime::GlobalObject {
68 private:
70  enum SyncType {
71  syncReduce,
72  syncBroadcast
73  };
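 // Note on direction: a syncReduce round sends mirror-edge values to the
 // owning (master) host and combines them there, while a syncBroadcast round
 // sends the reduced master values back out to the mirrors; see
 // getSendBuffer()/nothingToSend() below for how mirrorEdges/masterEdges are
 // selected per sync type.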
74 
76  constexpr static const char* const RNAME = "GluonEdges";
77 
79  GraphTy& userGraph;
80  const unsigned id;
81  DataCommMode substrateDataMode;
82  const uint32_t
83  numHosts;
84  uint32_t num_run;
85  uint32_t num_round;
86 
87  // memoization optimization
89  std::vector<std::vector<size_t>> masterEdges;
92  std::vector<std::vector<size_t>>& mirrorEdges;
94  size_t maxSharedSize;
95 
96 #ifdef GALOIS_USE_BARE_MPI
97  std::vector<MPI_Group> mpi_identity_groups;
98 #endif
99  // Used for efficient comms
100  galois::DynamicBitSet syncBitset;
101  galois::PODResizeableArray<unsigned int> syncOffsets;
102 
103  void reset_bitset(void (*bitset_reset_range)(size_t, size_t)) {
104  if (userGraph.sizeEdges() > 0) {
105  bitset_reset_range(0, userGraph.sizeEdges() - 1);
106  }
107  }
108 
110  void inline incrementEvilPhase() {
111  ++galois::runtime::evilPhase;
112  // limit defined by MPI or LCI
113  if (galois::runtime::evilPhase >=
114  uint32_t{std::numeric_limits<int16_t>::max()}) {
115  galois::runtime::evilPhase = 1;
116  }
117  }
118 
120  // Proxy communication setup
122 
126  void exchangeProxyInfo() {
127  auto& net = galois::runtime::getSystemNetworkInterface();
128 
129  // send off the mirror edges
130  for (unsigned x = 0; x < numHosts; ++x) {
131  if (x == id)
132  continue;
133 
134  galois::runtime::SendBuffer b;
135  gSerialize(b, mirrorEdges[x]);
136  net.sendTagged(x, galois::runtime::evilPhase, b);
137  }
138 
139  // receive the mirror edges
140  for (unsigned x = 0; x < numHosts; ++x) {
141  if (x == id)
142  continue;
143 
144  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
145  do {
146  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
147  } while (!p);
148 
149  galois::runtime::gDeserialize(p->second, masterEdges[p->first]);
150  }
151  incrementEvilPhase();
152  }
153 
158  void sendInfoToHost() {
159  auto& net = galois::runtime::getSystemNetworkInterface();
160 
161  uint64_t totalMirrorEdges =
162  userGraph.sizeEdges() - userGraph.numOwnedEdges();
163  uint64_t totalOwnedEdges = userGraph.numOwnedEdges();
164 
165  // send info to host
166  for (unsigned x = 0; x < numHosts; ++x) {
167  if (x == id)
168  continue;
169 
170  galois::runtime::SendBuffer b;
171  gSerialize(b, totalMirrorEdges, totalOwnedEdges);
172  net.sendTagged(x, galois::runtime::evilPhase, b);
173  }
174 
175  // receive
176  for (unsigned x = 0; x < numHosts; ++x) {
177  if (x == id)
178  continue;
179 
180  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
181  do {
182  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
183  } while (!p);
184 
185  uint64_t totalMirrorFromOther;
186  uint64_t totalOwnedFromOther;
187  galois::runtime::gDeserialize(p->second, totalMirrorFromOther,
188  totalOwnedFromOther);
189  totalMirrorEdges += totalMirrorFromOther;
190  totalOwnedEdges += totalOwnedFromOther;
191  }
192  incrementEvilPhase();
193 
194  assert(userGraph.globalEdges() == totalOwnedEdges);
195 
196  // report stats
197  if (net.ID == 0) {
198  reportProxyStats(totalMirrorEdges);
199  }
200  }
201 
206  void setupCommunication() {
207  galois::CondStatTimer<MORE_DIST_STATS> Tcomm_setup("CommunicationSetupTime",
208  RNAME);
209 
210  // barrier so that all hosts start the timer together
211  galois::runtime::getHostBarrier().wait();
212 
213  Tcomm_setup.start();
214 
215  // Exchange information for memoization optimization.
216  exchangeProxyInfo();
217  // convert the global ids stored in the master/mirror edges arrays to local
218  // ids
219  // TODO: use 32-bit distinct vectors for masters and mirrors from here on
220  for (uint32_t h = 0; h < masterEdges.size(); ++h) {
221  galois::do_all(
222  galois::iterate(size_t{0}, masterEdges[h].size()),
223  [&](size_t n) {
224  masterEdges[h][n] = userGraph.getEdgeLID(masterEdges[h][n]);
225  },
226 #if GALOIS_COMM_STATS
227  galois::loopname(get_run_identifier("MasterEdges").c_str()),
228 #endif
229  galois::no_stats());
230  }
231 
232  for (uint32_t h = 0; h < mirrorEdges.size(); ++h) {
233  galois::do_all(
234  galois::iterate(size_t{0}, mirrorEdges[h].size()),
235  [&](size_t n) {
236  mirrorEdges[h][n] = userGraph.getEdgeLID(mirrorEdges[h][n]);
237  },
238 #if GALOIS_COMM_STATS
239  galois::loopname(get_run_identifier("MirrorEdges").c_str()),
240 #endif
241  galois::no_stats());
242  }
243 
244  Tcomm_setup.stop();
245 
246  maxSharedSize = 0;
247  // report masters/mirrors to/from other hosts as statistics
248  for (auto x = 0U; x < masterEdges.size(); ++x) {
249  if (x == id)
250  continue;
251  std::string master_edges_str =
252  "MasterEdgesFrom_" + std::to_string(id) + "_To_" + std::to_string(x);
253  galois::runtime::reportStatCond_Tsum<MORE_DIST_STATS>(
254  RNAME, master_edges_str, masterEdges[x].size());
255  if (masterEdges[x].size() > maxSharedSize) {
256  maxSharedSize = masterEdges[x].size();
257  }
258  }
259 
260  for (auto x = 0U; x < mirrorEdges.size(); ++x) {
261  if (x == id)
262  continue;
263  std::string mirror_edges_str =
264  "MirrorEdgesFrom_" + std::to_string(x) + "_To_" + std::to_string(id);
265  galois::runtime::reportStatCond_Tsum<MORE_DIST_STATS>(
266  RNAME, mirror_edges_str, mirrorEdges[x].size());
267  if (mirrorEdges[x].size() > maxSharedSize) {
268  maxSharedSize = mirrorEdges[x].size();
269  }
270  }
271 
272  sendInfoToHost();
273 
274  // do not track memory usage of partitioning
275  auto& net = galois::runtime::getSystemNetworkInterface();
276  net.resetMemUsage();
277  }
278 
286  void reportProxyStats(uint64_t totalMirrorEdges) {
287  float replication_factor =
288  (float)(totalMirrorEdges + userGraph.globalEdges()) /
289  (float)userGraph.globalEdges();
290  galois::runtime::reportStat_Single(RNAME, "ReplicationFactorEdges",
291  replication_factor);
292  galois::runtime::reportStatCond_Single<MORE_DIST_STATS>(
293  RNAME, "TotalGlobalMirrorEdges", totalMirrorEdges);
294  }
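 // Worked example of the statistic above: with 2,000,000 owned (global)
 // edges and 500,000 mirror edges summed over all hosts, the reported
 // ReplicationFactorEdges is (500,000 + 2,000,000) / 2,000,000 = 1.25.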
295 
297  // Initializers
299 
303  void initBareMPI() {
304 #ifdef GALOIS_USE_BARE_MPI
305  if (bare_mpi == noBareMPI)
306  return;
307 
308 #ifdef GALOIS_USE_LCI
309  // sanity check of ranks
310  int taskRank;
311  MPI_Comm_rank(MPI_COMM_WORLD, &taskRank);
312  if ((unsigned)taskRank != id)
313  GALOIS_DIE("mismatch in MPI rank");
314  int numTasks;
315  MPI_Comm_size(MPI_COMM_WORLD, &numTasks);
316  if ((unsigned)numTasks != numHosts)
317  GALOIS_DIE("mismatch in MPI rank");
318 #endif
319  // group setup
320  MPI_Group world_group;
321  MPI_Comm_group(MPI_COMM_WORLD, &world_group);
322  mpi_identity_groups.resize(numHosts);
323 
324  for (unsigned x = 0; x < numHosts; ++x) {
325  const int g[1] = {(int)x};
326  MPI_Group_incl(world_group, 1, g, &mpi_identity_groups[x]);
327  }
328 
329  if (id == 0) {
330  switch (bare_mpi) {
331  case nonBlockingBareMPI:
332  galois::gPrint("Using non-blocking bare MPI\n");
333  break;
334  case oneSidedBareMPI:
335  galois::gPrint("Using one-sided bare MPI\n");
336  break;
337  case noBareMPI:
338  default:
339  GALOIS_DIE("unsupported bare MPI");
340  }
341  }
342 #endif
343  }
344 
345 public:
350  GluonEdgeSubstrate() = delete;
351 
361  GluonEdgeSubstrate(GraphTy& _userGraph, unsigned host, unsigned numHosts,
362  bool doNothing = false,
363  DataCommMode _substrateDataMode = DataCommMode::noData)
364  : galois::runtime::GlobalObject(this), userGraph(_userGraph), id(host),
365  substrateDataMode(_substrateDataMode), numHosts(numHosts), num_run(0),
366  num_round(0), mirrorEdges(userGraph.getMirrorEdges()) {
367  if (!doNothing) {
368  galois::StatTimer edgeSubstrateSetupTimer(
369  "GluonEdgeSubstrateConstructTime", RNAME);
370  edgeSubstrateSetupTimer.start();
371 
372  // set global
373  enforcedDataMode = _substrateDataMode;
374 
375  initBareMPI();
376  // master setup from mirrors done by setupCommunication call
377  masterEdges.resize(numHosts);
378 
379  // setup proxy communication
380  galois::CondStatTimer<MORE_DIST_STATS> Tgraph_construct_comm(
381  "GraphCommSetupTime", RNAME);
382  Tgraph_construct_comm.start();
383  setupCommunication();
384  Tgraph_construct_comm.stop();
385 
386  edgeSubstrateSetupTimer.stop();
387  }
388  }
389 
391  // Data extraction from bitsets
393 
394 private:
408  template <SyncType syncType>
409  void getOffsetsFromBitset(const std::string& loopName,
410  const galois::DynamicBitSet& bitset_comm,
411  galois::PODResizeableArray<unsigned int>& offsets,
412  size_t& bit_set_count) const {
413  // timer creation
414  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
415  std::string offsets_timer_str(syncTypeStr + "Offsets_" +
416  get_run_identifier(loopName));
417  galois::CondStatTimer<GALOIS_COMM_STATS> Toffsets(offsets_timer_str.c_str(),
418  RNAME);
419 
420  Toffsets.start();
421 
422  unsigned int activeThreads = galois::getActiveThreads();
423  std::vector<unsigned int> t_prefix_bit_counts(activeThreads);
424 
425  // count how many bits are set on each thread
426  galois::on_each([&](unsigned tid, unsigned nthreads) {
427  // TODO use block_range instead
428  unsigned int block_size = bitset_comm.size() / nthreads;
429  if ((bitset_comm.size() % nthreads) > 0)
430  ++block_size;
431  assert((block_size * nthreads) >= bitset_comm.size());
432 
433  unsigned int start = tid * block_size;
434  unsigned int end = (tid + 1) * block_size;
435  if (end > bitset_comm.size())
436  end = bitset_comm.size();
437 
438  unsigned int count = 0;
439  for (unsigned int i = start; i < end; ++i) {
440  if (bitset_comm.test(i))
441  ++count;
442  }
443 
444  t_prefix_bit_counts[tid] = count;
445  });
446 
447  // calculate prefix sum of bits per thread
448  for (unsigned int i = 1; i < activeThreads; ++i) {
449  t_prefix_bit_counts[i] += t_prefix_bit_counts[i - 1];
450  }
451  // total num of set bits
452  bit_set_count = t_prefix_bit_counts[activeThreads - 1];
453 
454  // calculate the indices of the set bits and save them to the offset
455  // vector
456  if (bit_set_count > 0) {
457  offsets.resize(bit_set_count);
458  galois::on_each([&](unsigned tid, unsigned nthreads) {
459  // TODO use block_range instead
460  // TODO this is same calculation as above; maybe refactor it
461  // into function?
462  unsigned int block_size = bitset_comm.size() / nthreads;
463  if ((bitset_comm.size() % nthreads) > 0)
464  ++block_size;
465  assert((block_size * nthreads) >= bitset_comm.size());
466 
467  unsigned int start = tid * block_size;
468  unsigned int end = (tid + 1) * block_size;
469  if (end > bitset_comm.size())
470  end = bitset_comm.size();
471 
472  unsigned int count = 0;
473  unsigned int t_prefix_bit_count;
474  if (tid == 0) {
475  t_prefix_bit_count = 0;
476  } else {
477  t_prefix_bit_count = t_prefix_bit_counts[tid - 1];
478  }
479 
480  for (unsigned int i = start; i < end; ++i) {
481  if (bitset_comm.test(i)) {
482  offsets[t_prefix_bit_count + count] = i;
483  ++count;
484  }
485  }
486  });
487  }
488  Toffsets.stop();
489  }
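 // Small example of what this produces: for a comm bitset with bits
 // {1, 4, 5} set, bit_set_count becomes 3 and offsets becomes [1, 4, 5];
 // the per-thread counts plus the prefix sum let each thread write its
 // portion of offsets without synchronization.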
490 
513  template <typename FnTy, SyncType syncType>
514  void getBitsetAndOffsets(const std::string& loopName,
515  const std::vector<size_t>& indices,
516  const galois::DynamicBitSet& bitset_compute,
517  galois::DynamicBitSet& bitset_comm,
518  galois::PODResizeableArray<unsigned int>& offsets,
519  size_t& bit_set_count,
520  DataCommMode& data_mode) const {
521  if (substrateDataMode != onlyData) {
522  bitset_comm.reset();
523  std::string syncTypeStr =
524  (syncType == syncReduce) ? "Reduce" : "Broadcast";
525  std::string doall_str(syncTypeStr + "Bitset_" + loopName);
526 
527  bitset_comm.reset();
528  // determine which local edges in the indices array need to be
529  // synchronized
530  galois::do_all(
531  galois::iterate(size_t{0}, indices.size()),
532  [&](size_t n) {
533  // assumes each lid is unique as test is not thread safe
534  size_t lid = indices[n];
535  if (bitset_compute.test(lid)) {
536  bitset_comm.set(n);
537  }
538  },
539 #if GALOIS_COMM_STATS
540  galois::loopname(get_run_identifier(doall_str).c_str()),
541 #endif
542  galois::no_stats());
543 
544  // get the number of set bits and the offsets into the comm bitset
545  getOffsetsFromBitset<syncType>(loopName, bitset_comm, offsets,
546  bit_set_count);
547  }
548 
549  data_mode =
550  get_data_mode<typename FnTy::ValTy>(bit_set_count, indices.size());
551  }
552 
554  // Local to global ID conversion
556 
567  template <SyncType syncType>
568  void convertLIDToGID(const std::string& loopName,
569  const std::vector<size_t>& indices,
570  galois::PODResizeableArray<unsigned int>& offsets) {
571  galois::gWarn("LID to GID edge conversion is extremely inefficient at the "
572  "moment!");
573  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
574  std::string doall_str(syncTypeStr + "_LID2GID_" +
575  get_run_identifier(loopName));
576  galois::do_all(
577  galois::iterate(size_t{0}, offsets.size()),
578  [&](size_t n) {
579  offsets[n] =
580  static_cast<uint32_t>(userGraph.getEdgeGID(indices[offsets[n]]));
581  },
582 #if GALOIS_COMM_STATS
583  galois::loopname(get_run_identifier(doall_str).c_str()),
584 #endif
585  galois::no_stats());
586  }
587 
596  template <SyncType syncType>
597  void convertGIDToLID(const std::string& loopName,
598  galois::PODResizeableArray<unsigned int>& offsets) {
599  galois::gWarn("convert GID to LID used in sync call (not optimized)");
600  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
601  std::string doall_str(syncTypeStr + "_GID2LID_" +
602  get_run_identifier(loopName));
603 
604  galois::do_all(
605  galois::iterate(size_t{0}, offsets.size()),
606  [&](size_t n) { offsets[n] = userGraph.getEdgeLID(offsets[n]); },
607 #if GALOIS_COMM_STATS
608  galois::loopname(get_run_identifier(doall_str).c_str()),
609 #endif
610  galois::no_stats());
611  }
612 
614  // Message prep functions (buffering, send buffer getting, etc.)
616 
628  template <
629  SyncType syncType, typename SyncFnTy, typename BitsetFnTy, bool async,
630  typename std::enable_if<!BitsetFnTy::is_vector_bitset()>::type* = nullptr>
631  void getSendBuffer(std::string loopName, unsigned x,
632  galois::runtime::SendBuffer& b) {
633  auto& sharedEdges = (syncType == syncReduce) ? mirrorEdges : masterEdges;
634 
635  if (BitsetFnTy::is_valid()) {
636  syncExtract<syncType, SyncFnTy, BitsetFnTy, async>(loopName, x,
637  sharedEdges[x], b);
638  } else {
639  syncExtract<syncType, SyncFnTy, async>(loopName, x, sharedEdges[x], b);
640  }
641 
642  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
643  std::string statSendBytes_str(syncTypeStr + "SendBytes_" +
644  get_run_identifier(loopName));
645 
646  galois::runtime::reportStat_Tsum(RNAME, statSendBytes_str, b.size());
647  }
648 
666  template <bool async, SyncType syncType, typename VecType>
667  void serializeMessage(std::string loopName, DataCommMode data_mode,
668  size_t bit_set_count, std::vector<size_t>& indices,
669  galois::PODResizeableArray<unsigned int>& offsets,
670  galois::DynamicBitSet& bit_set_comm, VecType& val_vec,
671  galois::runtime::SendBuffer& b) {
672  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
673  std::string serialize_timer_str(syncTypeStr + "SerializeMessage_" +
674  get_run_identifier(loopName));
675  galois::CondStatTimer<GALOIS_COMM_STATS> Tserialize(
676  serialize_timer_str.c_str(), RNAME);
677  if (data_mode == noData) {
678  if (!async) {
679  Tserialize.start();
680  gSerialize(b, data_mode);
681  Tserialize.stop();
682  }
683  } else if (data_mode == gidsData) {
684  offsets.resize(bit_set_count);
685  convertLIDToGID<syncType>(loopName, indices, offsets);
686  val_vec.resize(bit_set_count);
687  Tserialize.start();
688  gSerialize(b, data_mode, bit_set_count, offsets, val_vec);
689  Tserialize.stop();
690  } else if (data_mode == offsetsData) {
691  offsets.resize(bit_set_count);
692  val_vec.resize(bit_set_count);
693  Tserialize.start();
694  gSerialize(b, data_mode, bit_set_count, offsets, val_vec);
695  Tserialize.stop();
696  } else if (data_mode == bitsetData) {
697  val_vec.resize(bit_set_count);
698  Tserialize.start();
699  gSerialize(b, data_mode, bit_set_count, bit_set_comm, val_vec);
700  Tserialize.stop();
701  } else { // onlyData
702  Tserialize.start();
703  gSerialize(b, data_mode, val_vec);
704  Tserialize.stop();
705  }
706  }
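 // Buffer layouts produced above (and consumed by deserializeMessage):
 //   noData      -> [data_mode]                          (skipped if async)
 //   gidsData    -> [data_mode, bit_set_count, GID offsets, values]
 //   offsetsData -> [data_mode, bit_set_count, offsets, values]
 //   bitsetData  -> [data_mode, bit_set_count, bitset, values]
 //   onlyData    -> [data_mode, values]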
707 
732  template <SyncType syncType, typename VecType>
733  void deserializeMessage(std::string loopName, DataCommMode data_mode,
734  uint32_t num, galois::runtime::RecvBuffer& buf,
735  size_t& bit_set_count,
736  galois::PODResizeableArray<unsigned int>& offsets,
737  galois::DynamicBitSet& bit_set_comm,
738  size_t& buf_start, size_t& retval, VecType& val_vec) {
739  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
740  std::string serialize_timer_str(syncTypeStr + "DeserializeMessage_" +
741  get_run_identifier(loopName));
742  galois::CondStatTimer<GALOIS_COMM_STATS> Tdeserialize(
743  serialize_timer_str.c_str(), RNAME);
744  Tdeserialize.start();
745 
746  // get other metadata associated with message if mode isn't OnlyData
747  if (data_mode != onlyData) {
748  galois::runtime::gDeserialize(buf, bit_set_count);
749 
750  if (data_mode == gidsData) {
751  galois::runtime::gDeserialize(buf, offsets);
752  convertGIDToLID<syncType>(loopName, offsets);
753  } else if (data_mode == offsetsData) {
754  galois::runtime::gDeserialize(buf, offsets);
755  } else if (data_mode == bitsetData) {
756  bit_set_comm.resize(num);
757  galois::runtime::gDeserialize(buf, bit_set_comm);
758  } else if (data_mode == dataSplit) {
759  galois::runtime::gDeserialize(buf, buf_start);
760  } else if (data_mode == dataSplitFirst) {
761  galois::runtime::gDeserialize(buf, retval);
762  }
763  }
764 
765  // get data itself
766  galois::runtime::gDeserialize(buf, val_vec);
767 
768  Tdeserialize.stop();
769  }
770 
772  // Other helper functions
774  // Requirement: For all X and Y,
775  // On X, nothingToSend(Y) <=> On Y, nothingToRecv(X)
784  bool nothingToSend(unsigned host, SyncType syncType) {
785  auto& sharedEdges = (syncType == syncReduce) ? mirrorEdges : masterEdges;
786  return (sharedEdges[host].size() == 0);
787  }
788 
798  bool nothingToRecv(unsigned host, SyncType syncType) {
799  auto& sharedEdges = (syncType == syncReduce) ? masterEdges : mirrorEdges;
800  return (sharedEdges[host].size() == 0);
801  }
802 
817  template <typename SyncFnTy>
818  void reportRedundantSize(std::string loopName, std::string syncTypeStr,
819  uint32_t totalToSend, size_t bitSetCount,
820  const galois::DynamicBitSet& bitSetComm) {
821  size_t redundant_size =
822  (totalToSend - bitSetCount) * sizeof(typename SyncFnTy::ValTy);
823  size_t bit_set_size = (bitSetComm.get_vec().size() * sizeof(uint64_t));
824 
825  if (redundant_size > bit_set_size) {
826  std::string statSavedBytes_str(syncTypeStr + "SavedBytes_" +
827  get_run_identifier(loopName));
828 
829  galois::runtime::reportStatCond_Tsum<MORE_DIST_STATS>(
830  RNAME, statSavedBytes_str, (redundant_size - bit_set_size));
831  }
832  }
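 // Example: if 1000 shared edges with 8-byte values are candidates but only
 // 100 are actually sent, the redundant payload avoided is 900 * 8 = 7200
 // bytes; with a 1000-bit comm bitset (16 uint64_t words = 128 bytes), the
 // statistic reports 7200 - 128 = 7072 saved bytes.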
833 
835  // Extract data from edges (for reduce and broadcast)
837 
849  /* Reduction extract resets the value afterwards */
850  template <typename FnTy, SyncType syncType>
851  inline typename FnTy::ValTy extractWrapper(size_t lid) {
852  if (syncType == syncReduce) {
853  auto val = FnTy::extract(lid, userGraph.getEdgeData(lid));
854  FnTy::reset(lid, userGraph.getEdgeData(lid));
855  return val;
856  } else {
857  return FnTy::extract(lid, userGraph.getEdgeData(lid));
858  }
859  }
860 
875  /* Reduction extract resets the value afterwards */
876  template <typename FnTy, SyncType syncType>
877  inline typename FnTy::ValTy extractWrapper(size_t lid, unsigned vecIndex) {
878  if (syncType == syncReduce) {
879  auto val = FnTy::extract(lid, userGraph.getEdgeData(lid), vecIndex);
880  FnTy::reset(lid, userGraph.getEdgeData(lid), vecIndex);
881  return val;
882  } else {
883  return FnTy::extract(lid, userGraph.getEdgeData(lid), vecIndex);
884  }
885  }
886 
908  template <typename FnTy, SyncType syncType, bool identity_offsets = false,
909  bool parallelize = true>
910  void extractSubset(const std::string& loopName,
911  const std::vector<size_t>& indices, size_t size,
912  const galois::PODResizeableArray<unsigned int>& offsets,
913  galois::PODResizeableArray<typename FnTy::ValTy>& val_vec,
914  size_t start = 0) {
915  if (parallelize) {
916  std::string syncTypeStr =
917  (syncType == syncReduce) ? "Reduce" : "Broadcast";
918  std::string doall_str(syncTypeStr + "ExtractVal_" + loopName);
919 
920  galois::do_all(
921  galois::iterate(start, start + size),
922  [&](unsigned int n) {
923  unsigned int offset;
924  if (identity_offsets)
925  offset = n;
926  else
927  offset = offsets[n];
928  size_t lid = indices[offset];
929  val_vec[n - start] = extractWrapper<FnTy, syncType>(lid);
930  },
931 #if GALOIS_COMM_STATS
932  galois::loopname(get_run_identifier(doall_str).c_str()),
933 #endif
934  galois::no_stats());
935  } else { // non-parallel version
936  for (unsigned n = start; n < start + size; ++n) {
937  unsigned int offset;
938  if (identity_offsets)
939  offset = n;
940  else
941  offset = offsets[n];
942 
943  size_t lid = indices[offset];
944  val_vec[n - start] = extractWrapper<FnTy, syncType>(lid);
945  }
946  }
947  }
948 
975  // TODO find a better way to have this variant without code duplication
976  template <typename FnTy, SyncType syncType, bool identity_offsets = false,
977  bool parallelize = true, bool vecSync = false,
978  typename std::enable_if<vecSync>::type* = nullptr>
979  void extractSubset(const std::string& loopName,
980  const std::vector<size_t>& indices, size_t size,
983  unsigned vecIndex, size_t start = 0) {
984  val_vec.resize(size); // resize val vec for this vecIndex
985 
986  if (parallelize) {
987  std::string syncTypeStr =
988  (syncType == syncReduce) ? "Reduce" : "Broadcast";
989  std::string doall_str(syncTypeStr + "ExtractValVector_" + loopName);
990 
991  galois::do_all(
992  galois::iterate(start, start + size),
993  [&](unsigned int n) {
994  unsigned int offset;
995  if (identity_offsets)
996  offset = n;
997  else
998  offset = offsets[n];
999  size_t lid = indices[offset];
1000  val_vec[n - start] = extractWrapper<FnTy, syncType>(lid, vecIndex);
1001  },
1002 #if GALOIS_COMM_STATS
1003  galois::loopname(get_run_identifier(doall_str).c_str()),
1004 #endif
1005  galois::no_stats());
1006  } else { // non-parallel version
1007  for (unsigned n = start; n < start + size; ++n) {
1008  unsigned int offset;
1009  if (identity_offsets)
1010  offset = n;
1011  else
1012  offset = offsets[n];
1013  size_t lid = indices[offset];
1014  val_vec[n - start] = extractWrapper<FnTy, syncType>(lid, vecIndex);
1015  }
1016  }
1017  }
1018 
1043  template <typename FnTy, typename SeqTy, SyncType syncType,
1044  bool identity_offsets = false, bool parallelize = true>
1045  void extractSubset(const std::string& loopName,
1046  const std::vector<size_t>& indices, size_t size,
1047  const galois::PODResizeableArray<unsigned int>& offsets,
1048  galois::runtime::SendBuffer& b, SeqTy lseq,
1049  size_t start = 0) {
1050  if (parallelize) {
1051  std::string syncTypeStr =
1052  (syncType == syncReduce) ? "Reduce" : "Broadcast";
1053  std::string doall_str(syncTypeStr + "ExtractVal_" + loopName);
1054 
1055  galois::do_all(
1056  galois::iterate(start, start + size),
1057  [&](unsigned int n) {
1058  unsigned int offset;
1059  if (identity_offsets)
1060  offset = n;
1061  else
1062  offset = offsets[n];
1063 
1064  size_t lid = indices[offset];
1065  gSerializeLazy(b, lseq, n - start,
1066  extractWrapper<FnTy, syncType>(lid));
1067  },
1068 #if GALOIS_COMM_STATS
1069  galois::loopname(get_run_identifier(doall_str).c_str()),
1070 #endif
1071  galois::no_stats());
1072  } else {
1073  for (unsigned int n = start; n < start + size; ++n) {
1074  unsigned int offset;
1075  if (identity_offsets)
1076  offset = n;
1077  else
1078  offset = offsets[n];
1079  size_t lid = indices[offset];
1080  gSerializeLazy(b, lseq, n - start, extractWrapper<FnTy, syncType>(lid));
1081  }
1082  }
1083  }
1084 
1097  template <typename FnTy, SyncType syncType>
1098  inline bool extractBatchWrapper(unsigned x, galois::runtime::SendBuffer& b) {
1099  if (syncType == syncReduce) {
1100  return FnTy::extract_reset_batch(x, b.getVec().data());
1101  } else {
1102  return FnTy::extract_batch(x, b.getVec().data());
1103  }
1104  }
1105 
1124  template <typename FnTy, SyncType syncType>
1125  inline bool extractBatchWrapper(unsigned x, galois::runtime::SendBuffer& b,
1126  size_t& s, DataCommMode& data_mode) {
1127  if (syncType == syncReduce) {
1128  return FnTy::extract_reset_batch(x, b.getVec().data(), &s, &data_mode);
1129  } else {
1130  return FnTy::extract_batch(x, b.getVec().data(), &s, &data_mode);
1131  }
1132  }
1133 
1135  // Reduce/sets on node (for broadcast)
1137 
1149  template <typename FnTy, SyncType syncType, bool async>
1150  inline void setWrapper(size_t lid, typename FnTy::ValTy val,
1151  galois::DynamicBitSet& bit_set_compute) {
1152  if (syncType == syncReduce) {
1153  if (FnTy::reduce(lid, userGraph.getEdgeData(lid), val)) {
1154  if (bit_set_compute.size() != 0) {
1155  bit_set_compute.set(lid);
1156  }
1157  }
1158  } else {
1159  if (async) {
1160  FnTy::reduce(lid, userGraph.getEdgeData(lid), val);
1161  } else {
1162  FnTy::setVal(lid, userGraph.getEdgeData(lid), val);
1163  assert(FnTy::extract(lid, userGraph.getEdgeData(lid)) == val);
1164  }
1165  }
1166  }
1167 
1191  template <typename VecTy, typename FnTy, SyncType syncType, bool async,
1192  bool identity_offsets = false, bool parallelize = true>
1193  void setSubset(const std::string& loopName, const VecTy& indices, size_t size,
1194  const galois::PODResizeableArray<unsigned int>& offsets,
1195  galois::PODResizeableArray<typename FnTy::ValTy>& val_vec,
1196  galois::DynamicBitSet& bit_set_compute, size_t start = 0) {
1197  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1198  std::string doall_str(syncTypeStr + "SetVal_" +
1199  get_run_identifier(loopName));
1200 
1201  if (parallelize) {
1202  galois::do_all(
1203  galois::iterate(start, start + size),
1204  [&](unsigned int n) {
1205  unsigned int offset;
1206  if (identity_offsets)
1207  offset = n;
1208  else
1209  offset = offsets[n];
1210  auto lid = indices[offset];
1211  setWrapper<FnTy, syncType, async>(lid, val_vec[n - start],
1212  bit_set_compute);
1213  },
1214 #if GALOIS_COMM_STATS
1215  galois::loopname(get_run_identifier(doall_str).c_str()),
1216 #endif
1217  galois::no_stats());
1218  } else {
1219  for (unsigned int n = start; n < start + size; ++n) {
1220  unsigned int offset;
1221  if (identity_offsets)
1222  offset = n;
1223  else
1224  offset = offsets[n];
1225  auto lid = indices[offset];
1226  setWrapper<FnTy, syncType, async>(lid, val_vec[n - start],
1227  bit_set_compute);
1228  }
1229  }
1230  }
1231 
1243  template <typename FnTy, SyncType syncType, bool async>
1244  inline bool setBatchWrapper(unsigned x, galois::runtime::RecvBuffer& b) {
1245  if (syncType == syncReduce) {
1246  return FnTy::reduce_batch(x, b.getVec().data() + b.getOffset());
1247  } else {
1248  if (async) {
1249  return FnTy::reduce_mirror_batch(x, b.getVec().data() + b.getOffset());
1250  } else {
1251  return FnTy::setVal_batch(x, b.getVec().data() + b.getOffset());
1252  }
1253  }
1254  }
1255 
1272  template <typename FnTy, SyncType syncType, bool async>
1273  inline bool setBatchWrapper(unsigned x, galois::runtime::RecvBuffer& b,
1274  DataCommMode& data_mode) {
1275  if (syncType == syncReduce) {
1276  return FnTy::reduce_batch(x, b.getVec().data() + b.getOffset(),
1277  data_mode);
1278  } else {
1279  if (async) {
1280  return FnTy::reduce_mirror_batch(x, b.getVec().data() + b.getOffset(),
1281  data_mode);
1282  } else {
1283  return FnTy::setVal_batch(x, b.getVec().data() + b.getOffset(),
1284  data_mode);
1285  }
1286  }
1287  }
1288 
1290  // Sends
1292 
1306  template <SyncType syncType, typename SyncFnTy, bool async,
1307  typename std::enable_if<galois::runtime::is_memory_copyable<
1308  typename SyncFnTy::ValTy>::value>::type* = nullptr>
1309  void syncExtract(std::string loopName, unsigned from_id,
1310  std::vector<size_t>& indices,
1311  galois::runtime::SendBuffer& b) {
1312  uint32_t num = indices.size();
1313  static galois::PODResizeableArray<typename SyncFnTy::ValTy>
1314  val_vec; // sometimes wasteful
1315  galois::PODResizeableArray<unsigned int>& offsets = syncOffsets;
1316  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1317  std::string extract_timer_str(syncTypeStr + "Extract_" +
1318  get_run_identifier(loopName));
1319  galois::CondStatTimer<GALOIS_COMM_STATS> Textract(extract_timer_str.c_str(),
1320  RNAME);
1321  std::string extract_batch_timer_str(syncTypeStr + "ExtractBatch_" +
1322  get_run_identifier(loopName));
1323  galois::CondStatTimer<GALOIS_COMM_STATS> Textractbatch(
1324  extract_batch_timer_str.c_str(), RNAME);
1325 
1326  DataCommMode data_mode;
1327 
1328  Textract.start();
1329 
1330  if (num > 0) {
1331  data_mode = onlyData;
1332  b.reserve(sizeof(DataCommMode) + sizeof(size_t) +
1333  (num * sizeof(typename SyncFnTy::ValTy)));
1334 
1335  Textractbatch.start();
1336  bool batch_succeeded =
1337  extractBatchWrapper<SyncFnTy, syncType>(from_id, b);
1338  Textractbatch.stop();
1339 
1340  if (!batch_succeeded) {
1341  b.resize(0);
1342  val_vec.reserve(maxSharedSize);
1343  val_vec.resize(num);
1344  gSerialize(b, onlyData);
1345  auto lseq = gSerializeLazySeq(
1346  b, num,
1347  (galois::PODResizeableArray<typename SyncFnTy::ValTy>*)nullptr);
1348  extractSubset<SyncFnTy, decltype(lseq), syncType, true, true>(
1349  loopName, indices, num, offsets, b, lseq);
1350  } else {
1351  b.resize(sizeof(DataCommMode) + sizeof(size_t) +
1352  (num * sizeof(typename SyncFnTy::ValTy)));
1353  }
1354  } else {
1355  b.resize(0);
1356  if (!async) {
1357  data_mode = noData;
1358  gSerialize(b, noData);
1359  }
1360  }
1361 
1362  Textract.stop();
1363 
1364  std::string metadata_str(syncTypeStr + "MetadataMode_" +
1365  std::to_string(data_mode) + "_" +
1366  get_run_identifier(loopName));
1367  galois::runtime::reportStatCond_Single<MORE_DIST_STATS>(RNAME, metadata_str,
1368  1);
1369  }
1370 
1388  template <SyncType syncType, typename SyncFnTy, bool async,
1389  typename std::enable_if<!galois::runtime::is_memory_copyable<
1390  typename SyncFnTy::ValTy>::value>::type* = nullptr>
1391  void syncExtract(std::string loopName, unsigned from_id,
1392  std::vector<size_t>& indices,
1393  galois::runtime::SendBuffer& b) {
1394  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1395  std::string extract_timer_str(syncTypeStr + "Extract_" +
1396  get_run_identifier(loopName));
1397  galois::CondStatTimer<GALOIS_COMM_STATS> Textract(extract_timer_str.c_str(),
1398  RNAME);
1399  std::string extract_batch_timer_str(syncTypeStr + "ExtractBatch_" +
1400  get_run_identifier(loopName));
1401  galois::CondStatTimer<GALOIS_COMM_STATS> Textractbatch(
1402  extract_batch_timer_str.c_str(), RNAME);
1403 
1404  DataCommMode data_mode;
1405 
1406  uint32_t num = indices.size();
1408  static galois::PODResizeableArray<unsigned int> dummyVector;
1409 
1410  Textract.start();
1411 
1412  if (num > 0) {
1413  data_mode = onlyData;
1414  b.reserve(sizeof(DataCommMode) + sizeof(size_t) +
1415  (num * sizeof(typename SyncFnTy::ValTy)));
1416 
1417  Textractbatch.start();
1418  bool batch_succeeded =
1419  extractBatchWrapper<SyncFnTy, syncType>(from_id, b);
1420  Textractbatch.stop();
1421 
1422  if (!batch_succeeded) {
1423  b.resize(0);
1424  val_vec.reserve(maxSharedSize);
1425  val_vec.resize(num);
1426  // get everything (note I pass in "indices" as offsets as it won't
1427  // even get used anyways)
1428  extractSubset<SyncFnTy, syncType, true, true>(loopName, indices, num,
1429  dummyVector, val_vec);
1430  gSerialize(b, onlyData, val_vec);
1431  } else {
1432  b.resize(sizeof(DataCommMode) + sizeof(size_t) +
1433  (num * sizeof(typename SyncFnTy::ValTy)));
1434  }
1435 
1436  } else {
1437  b.resize(0);
1438  if (!async) {
1439  data_mode = noData;
1440  gSerialize(b, noData);
1441  }
1442  }
1443 
1444  Textract.stop();
1445 
1446  std::string metadata_str(syncTypeStr + "MetadataMode_" +
1447  std::to_string(data_mode) + "_" +
1448  get_run_identifier(loopName));
1449  galois::runtime::reportStatCond_Single<MORE_DIST_STATS>(RNAME, metadata_str,
1450  1);
1451  }
1452 
1470  template <
1471  SyncType syncType, typename SyncFnTy, typename BitsetFnTy, bool async,
1472  typename std::enable_if<!BitsetFnTy::is_vector_bitset()>::type* = nullptr>
1473  void syncExtract(std::string loopName, unsigned from_id,
1474  std::vector<size_t>& indices,
1475  galois::runtime::SendBuffer& b) {
1476  const galois::DynamicBitSet& bit_set_compute = BitsetFnTy::get();
1477  uint64_t manualBitsetCount = bit_set_compute.count();
1478  uint32_t num = indices.size();
1479  galois::DynamicBitSet& bit_set_comm = syncBitset;
1480  static galois::PODResizeableArray<typename SyncFnTy::ValTy> val_vec;
1481  galois::PODResizeableArray<unsigned int>& offsets = syncOffsets;
1482 
1483  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1484  std::string extract_timer_str(syncTypeStr + "Extract_" +
1485  get_run_identifier(loopName));
1486  galois::CondStatTimer<GALOIS_COMM_STATS> Textract(extract_timer_str.c_str(),
1487  RNAME);
1488  std::string extract_alloc_timer_str(syncTypeStr + "ExtractAlloc_" +
1489  get_run_identifier(loopName));
1490  galois::CondStatTimer<GALOIS_COMM_STATS> Textractalloc(
1491  extract_alloc_timer_str.c_str(), RNAME);
1492  std::string extract_batch_timer_str(syncTypeStr + "ExtractBatch_" +
1493  get_run_identifier(loopName));
1494  galois::CondStatTimer<GALOIS_COMM_STATS> Textractbatch(
1495  extract_batch_timer_str.c_str(), RNAME);
1496 
1497  DataCommMode data_mode;
1498 
1499  Textract.start();
1500 
1501  if (num > 0 && manualBitsetCount > 0) {
1502  // if (num > 0) {
1503  size_t bit_set_count = 0;
1504  Textractalloc.start();
1505  if (substrateDataMode == gidsData) {
1506  b.reserve(sizeof(DataCommMode) + sizeof(bit_set_count) +
1507  sizeof(size_t) + (num * sizeof(unsigned int)) +
1508  sizeof(size_t) + (num * sizeof(typename SyncFnTy::ValTy)));
1509  } else if (substrateDataMode == offsetsData) {
1510  b.reserve(sizeof(DataCommMode) + sizeof(bit_set_count) +
1511  sizeof(size_t) + (num * sizeof(unsigned int)) +
1512  sizeof(size_t) + (num * sizeof(typename SyncFnTy::ValTy)));
1513  } else if (substrateDataMode == bitsetData) {
1514  size_t bitset_alloc_size = ((num + 63) / 64) * sizeof(uint64_t);
1515  b.reserve(sizeof(DataCommMode) + sizeof(bit_set_count) +
1516  sizeof(size_t) // bitset size
1517  + sizeof(size_t) // bitset vector size
1518  + bitset_alloc_size + sizeof(size_t) +
1519  (num * sizeof(typename SyncFnTy::ValTy)));
1520  } else { // onlyData or noData (auto)
1521  size_t bitset_alloc_size = ((num + 63) / 64) * sizeof(uint64_t);
1522  b.reserve(sizeof(DataCommMode) + sizeof(bit_set_count) +
1523  sizeof(size_t) // bitset size
1524  + sizeof(size_t) // bitset vector size
1525  + bitset_alloc_size + sizeof(size_t) +
1526  (num * sizeof(typename SyncFnTy::ValTy)));
1527  }
1528  Textractalloc.stop();
1529 
1530  Textractbatch.start();
1531  bool batch_succeeded = extractBatchWrapper<SyncFnTy, syncType>(
1532  from_id, b, bit_set_count, data_mode);
1533  Textractbatch.stop();
1534 
1535  // GPUs have a batch function they can use; CPUs do not; therefore,
1536  // CPUs always enter this if block
1537  if (!batch_succeeded) {
1538  Textractalloc.start();
1539  b.resize(0);
1540  bit_set_comm.reserve(maxSharedSize);
1541  offsets.reserve(maxSharedSize);
1542  val_vec.reserve(maxSharedSize);
1543  bit_set_comm.resize(num);
1544  offsets.resize(num);
1545  val_vec.resize(num);
1546  Textractalloc.stop();
1547 
1548  getBitsetAndOffsets<SyncFnTy, syncType>(
1549  loopName, indices, bit_set_compute, bit_set_comm, offsets,
1550  bit_set_count, data_mode);
1551 
1552  if (data_mode == onlyData) {
1553  bit_set_count = indices.size();
1554  extractSubset<SyncFnTy, syncType, true, true>(
1555  loopName, indices, bit_set_count, offsets, val_vec);
1556  } else if (data_mode !=
1557  noData) { // bitsetData or offsetsData or gidsData
1558  extractSubset<SyncFnTy, syncType, false, true>(
1559  loopName, indices, bit_set_count, offsets, val_vec);
1560  }
1561  serializeMessage<async, syncType>(loopName, data_mode, bit_set_count,
1562  indices, offsets, bit_set_comm,
1563  val_vec, b);
1564  } else {
1565  if (data_mode == noData) {
1566  b.resize(0);
1567  if (!async) {
1568  gSerialize(b, data_mode);
1569  }
1570  } else if (data_mode == gidsData) {
1571  b.resize(sizeof(DataCommMode) + sizeof(bit_set_count) +
1572  sizeof(size_t) + (bit_set_count * sizeof(unsigned int)) +
1573  sizeof(size_t) +
1574  (bit_set_count * sizeof(typename SyncFnTy::ValTy)));
1575  } else if (data_mode == offsetsData) {
1576  b.resize(sizeof(DataCommMode) + sizeof(bit_set_count) +
1577  sizeof(size_t) + (bit_set_count * sizeof(unsigned int)) +
1578  sizeof(size_t) +
1579  (bit_set_count * sizeof(typename SyncFnTy::ValTy)));
1580  } else if (data_mode == bitsetData) {
1581  size_t bitset_alloc_size = ((num + 63) / 64) * sizeof(uint64_t);
1582  b.resize(sizeof(DataCommMode) + sizeof(bit_set_count) +
1583  sizeof(size_t) // bitset size
1584  + sizeof(size_t) // bitset vector size
1585  + bitset_alloc_size + sizeof(size_t) +
1586  (bit_set_count * sizeof(typename SyncFnTy::ValTy)));
1587  } else { // onlyData
1588  b.resize(sizeof(DataCommMode) + sizeof(size_t) +
1589  (num * sizeof(typename SyncFnTy::ValTy)));
1590  }
1591  }
1592 
1593  reportRedundantSize<SyncFnTy>(loopName, syncTypeStr, num, bit_set_count,
1594  bit_set_comm);
1595  } else {
1596  b.resize(0);
1597  if (!async) {
1598  data_mode = noData;
1599  gSerialize(b, noData);
1600  }
1601  }
1602 
1603  Textract.stop();
1604 
1605  std::string metadata_str(syncTypeStr + "MetadataMode_" +
1606  std::to_string(data_mode) + "_" +
1607  get_run_identifier(loopName));
1608  galois::runtime::reportStatCond_Single<MORE_DIST_STATS>(RNAME, metadata_str,
1609  1);
1610  }
1611 
1612 #ifdef GALOIS_USE_BARE_MPI
1613 
1616  template <SyncType syncType, typename SyncFnTy, typename BitsetFnTy>
1617  void sync_mpi_send(std::string loopName) {
1618  static std::vector<galois::runtime::SendBuffer> b;
1619  static std::vector<MPI_Request> request;
1620  b.resize(numHosts);
1621  request.resize(numHosts, MPI_REQUEST_NULL);
1622 
1623  for (unsigned h = 1; h < numHosts; ++h) {
1624  unsigned x = (id + h) % numHosts;
1625 
1626  if (nothingToSend(x, syncType))
1627  continue;
1628 
1629  int ready = 0;
1630  MPI_Test(&request[x], &ready, MPI_STATUS_IGNORE);
1631  if (!ready) {
1632  assert(b[x].size() > 0);
1633  MPI_Wait(&request[x], MPI_STATUS_IGNORE);
1634  }
1635  if (b[x].size() > 0) {
1636  b[x].getVec().clear();
1637  }
1638 
1639  getSendBuffer<syncType, SyncFnTy, BitsetFnTy>(loopName, x, b[x]);
1640 
1641  MPI_Isend((uint8_t*)b[x].linearData(), b[x].size(), MPI_BYTE, x, 32767,
1642  MPI_COMM_WORLD, &request[x]);
1643  }
1644 
1645  if (BitsetFnTy::is_valid() && syncType == syncBroadcast) {
1646  reset_bitset(&BitsetFnTy::reset_range);
1647  }
1648  }
1649 
1653  template <SyncType syncType, typename SyncFnTy, typename BitsetFnTy>
1654  void sync_mpi_put(std::string loopName, const MPI_Group& mpi_access_group,
1655  const std::vector<MPI_Win>& window) {
1656 
1657  MPI_Win_start(mpi_access_group, 0, window[id]);
1658 
1659  std::vector<galois::runtime::SendBuffer> b(numHosts);
1660  std::vector<size_t> size(numHosts);
1661  uint64_t send_buffers_size = 0;
1662 
1663  for (unsigned h = 1; h < numHosts; ++h) {
1664  unsigned x = (id + h) % numHosts;
1665 
1666  if (nothingToSend(x, syncType))
1667  continue;
1668 
1669  getSendBuffer<syncType, SyncFnTy, BitsetFnTy>(loopName, x, b[x]);
1670 
1671  size[x] = b[x].size();
1672  send_buffers_size += size[x];
1673  MPI_Put((uint8_t*)&size[x], sizeof(size_t), MPI_BYTE, x, 0,
1674  sizeof(size_t), MPI_BYTE, window[id]);
1675  MPI_Put((uint8_t*)b[x].linearData(), size[x], MPI_BYTE, x, sizeof(size_t),
1676  size[x], MPI_BYTE, window[id]);
1677  }
1678 
1679  auto& net = galois::runtime::getSystemNetworkInterface();
1680  net.incrementMemUsage(send_buffers_size);
1681 
1682  MPI_Win_complete(window[id]);
1683  net.decrementMemUsage(send_buffers_size);
1684 
1685  if (BitsetFnTy::is_valid() && syncType == syncBroadcast) {
1686  reset_bitset(&BitsetFnTy::reset_range);
1687  }
1688  }
1689 #endif
1690 
1701  template <SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
1702  bool async>
1703  void syncNetSend(std::string loopName) {
1704  static galois::runtime::SendBuffer
1705  b; // although a static variable, allocation not reused
1706  // due to std::move in net.sendTagged()
1707 
1708  auto& net = galois::runtime::getSystemNetworkInterface();
1709  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1710  std::string statNumMessages_str(syncTypeStr + "NumMessages_" +
1711  get_run_identifier(loopName));
1712 
1713  size_t numMessages = 0;
1714  for (unsigned h = 1; h < numHosts; ++h) {
1715  unsigned x = (id + h) % numHosts;
1716 
1717  if (nothingToSend(x, syncType))
1718  continue;
1719 
1720  getSendBuffer<syncType, SyncFnTy, BitsetFnTy, async>(loopName, x, b);
1721 
1722  if ((!async) || (b.size() > 0)) {
1723  size_t syncTypePhase = 0;
1724  if (async && (syncType == syncBroadcast))
1725  syncTypePhase = 1;
1726  net.sendTagged(x, galois::runtime::evilPhase, b, syncTypePhase);
1727  ++numMessages;
1728  }
1729  }
1730  if (!async) {
1731  // Will force all messages to be processed before continuing
1732  net.flush();
1733  }
1734 
1735  if (BitsetFnTy::is_valid() && syncType == syncBroadcast) {
1736  reset_bitset(&BitsetFnTy::reset_range);
1737  }
1738 
1739  galois::runtime::reportStat_Tsum(RNAME, statNumMessages_str, numMessages);
1740  }
1741 
1752  template <SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
1753  bool async>
1754  void syncSend(std::string loopName) {
1755  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1756  galois::CondStatTimer<GALOIS_COMM_STATS> TSendTime(
1757  (syncTypeStr + "Send_" + get_run_identifier(loopName)).c_str(), RNAME);
1758 
1759  TSendTime.start();
1760  syncNetSend<syncType, SyncFnTy, BitsetFnTy, async>(loopName);
1761  TSendTime.stop();
1762  }
1763 
1765  // Receives
1767 
1783  template <
1784  SyncType syncType, typename SyncFnTy, typename BitsetFnTy, bool async,
1785  typename std::enable_if<!BitsetFnTy::is_vector_bitset()>::type* = nullptr>
1786  size_t syncRecvApply(uint32_t from_id, galois::runtime::RecvBuffer& buf,
1787  std::string loopName) {
1788  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
1789  std::string set_timer_str(syncTypeStr + "Set_" +
1790  get_run_identifier(loopName));
1791  galois::CondStatTimer<GALOIS_COMM_STATS> Tset(set_timer_str.c_str(), RNAME);
1792  std::string set_batch_timer_str(syncTypeStr + "SetBatch_" +
1793  get_run_identifier(loopName));
1794  galois::CondStatTimer<GALOIS_COMM_STATS> Tsetbatch(
1795  set_batch_timer_str.c_str(), RNAME);
1796 
1797  galois::DynamicBitSet& bit_set_comm = syncBitset;
1798  static galois::PODResizeableArray<typename SyncFnTy::ValTy> val_vec;
1799  galois::PODResizeableArray<unsigned int>& offsets = syncOffsets;
1800 
1801  auto& sharedEdges = (syncType == syncReduce) ? masterEdges : mirrorEdges;
1802  uint32_t num = sharedEdges[from_id].size();
1803  size_t retval = 0;
1804  Tset.start();
1805 
1806  if (num > 0) { // only enter if we expect message from that host
1807  DataCommMode data_mode;
1808  // 1st deserialize gets data mode
1809  galois::runtime::gDeserialize(buf, data_mode);
1810 
1811  if (data_mode != noData) {
1812  // GPU update call
1813  Tsetbatch.start();
1814  bool batch_succeeded =
1815  setBatchWrapper<SyncFnTy, syncType, async>(from_id, buf, data_mode);
1816  Tsetbatch.stop();
1817 
1818  // cpu always enters this block
1819  if (!batch_succeeded) {
1820  size_t bit_set_count = num;
1821  size_t buf_start = 0;
1822 
1823  // deserialize the rest of the data in the buffer depending on the
1824  // data mode; arguments passed in here are mostly output vars
1825  deserializeMessage<syncType>(loopName, data_mode, num, buf,
1826  bit_set_count, offsets, bit_set_comm,
1827  buf_start, retval, val_vec);
1828 
1829  bit_set_comm.reserve(maxSharedSize);
1830  offsets.reserve(maxSharedSize);
1831  val_vec.reserve(maxSharedSize);
1832 
1833  galois::DynamicBitSet& bit_set_compute = BitsetFnTy::get();
1834 
1835  if (data_mode == bitsetData) {
1836  size_t bit_set_count2;
1837  getOffsetsFromBitset<syncType>(loopName, bit_set_comm, offsets,
1838  bit_set_count2);
1839  assert(bit_set_count == bit_set_count2);
1840  }
1841 
1842  if (data_mode == onlyData) {
1843  setSubset<decltype(sharedEdges[from_id]), SyncFnTy, syncType, async,
1844  true, true>(loopName, sharedEdges[from_id], bit_set_count,
1845  offsets, val_vec, bit_set_compute);
1846  } else if (data_mode == dataSplit || data_mode == dataSplitFirst) {
1847  setSubset<decltype(sharedEdges[from_id]), SyncFnTy, syncType, async,
1848  true, true>(loopName, sharedEdges[from_id], bit_set_count,
1849  offsets, val_vec, bit_set_compute, buf_start);
1850  } else if (data_mode == gidsData) {
1851  setSubset<decltype(offsets), SyncFnTy, syncType, async, true, true>(
1852  loopName, offsets, bit_set_count, offsets, val_vec,
1853  bit_set_compute);
1854  } else { // bitsetData or offsetsData
1855  setSubset<decltype(sharedEdges[from_id]), SyncFnTy, syncType, async,
1856  false, true>(loopName, sharedEdges[from_id],
1857  bit_set_count, offsets, val_vec,
1858  bit_set_compute);
1859  }
1860  // TODO: reduce could update the bitset, so it needs to be copied
1861  // back to the device
1862  }
1863  }
1864  }
1865 
1866  Tset.stop();
1867 
1868  return retval;
1869  }
1870 
1871 #ifdef GALOIS_USE_BARE_MPI
1872 
1875  template <SyncType syncType, typename SyncFnTy, typename BitsetFnTy>
1876  void sync_mpi_recv_post(std::string loopName,
1877  std::vector<MPI_Request>& request,
1878  const std::vector<std::vector<uint8_t>>& rb) {
1879  for (unsigned h = 1; h < numHosts; ++h) {
1880  unsigned x = (id + numHosts - h) % numHosts;
1881  if (nothingToRecv(x, syncType))
1882  continue;
1883 
1884  MPI_Irecv((uint8_t*)rb[x].data(), rb[x].size(), MPI_BYTE, x, 32767,
1885  MPI_COMM_WORLD, &request[x]);
1886  }
1887  }
1888 
1892  template <SyncType syncType, typename SyncFnTy, typename BitsetFnTy>
1893  void sync_mpi_recv_wait(std::string loopName,
1894  std::vector<MPI_Request>& request,
1895  const std::vector<std::vector<uint8_t>>& rb) {
1896  for (unsigned h = 1; h < numHosts; ++h) {
1897  unsigned x = (id + numHosts - h) % numHosts;
1898  if (nothingToRecv(x, syncType))
1899  continue;
1900 
1901  MPI_Status status;
1902  MPI_Wait(&request[x], &status);
1903 
1904  int size = 0;
1905  MPI_Get_count(&status, MPI_BYTE, &size);
1906 
1907  galois::runtime::RecvBuffer rbuf(rb[x].begin(), rb[x].begin() + size);
1908 
1909  syncRecvApply<syncType, SyncFnTy, BitsetFnTy>(x, rbuf, loopName);
1910  }
1911  }
1912 
1916  template <SyncType syncType, typename SyncFnTy, typename BitsetFnTy>
1917  void sync_mpi_get(std::string loopName, const std::vector<MPI_Win>& window,
1918  const std::vector<std::vector<uint8_t>>& rb) {
1919  for (unsigned h = 1; h < numHosts; ++h) {
1920  unsigned x = (id + numHosts - h) % numHosts;
1921  if (nothingToRecv(x, syncType))
1922  continue;
1923 
1924  MPI_Win_wait(window[x]);
1925 
1926  size_t size = 0;
1927  memcpy(&size, rb[x].data(), sizeof(size_t));
1928 
1929  galois::runtime::RecvBuffer rbuf(rb[x].begin() + sizeof(size_t),
1930  rb[x].begin() + sizeof(size_t) + size);
1931 
1932  MPI_Win_post(mpi_identity_groups[x], 0, window[x]);
1933 
1934  syncRecvApply<syncType, SyncFnTy, BitsetFnTy>(x, rbuf, loopName);
1935  }
1936  }
1937 #endif
1938 
1949  template <SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
1950  bool async>
1951  void syncNetRecv(std::string loopName) {
1952  auto& net = galois::runtime::getSystemNetworkInterface();
1953  std::string wait_timer_str("Wait_" + get_run_identifier(loopName));
1954  galois::CondStatTimer<GALOIS_COMM_STATS> Twait(wait_timer_str.c_str(),
1955  RNAME);
1956 
1957  if (async) {
1958  size_t syncTypePhase = 0;
1959  if (syncType == syncBroadcast)
1960  syncTypePhase = 1;
1961  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr,
1962  syncTypePhase)) p;
1963  do {
1964  p = net.recieveTagged(galois::runtime::evilPhase, nullptr,
1965  syncTypePhase);
1966 
1967  if (p) {
1968  syncRecvApply<syncType, SyncFnTy, BitsetFnTy, async>(
1969  p->first, p->second, loopName);
1970  }
1971  } while (p);
1972  } else {
1973  for (unsigned x = 0; x < numHosts; ++x) {
1974  if (x == id)
1975  continue;
1976  if (nothingToRecv(x, syncType))
1977  continue;
1978 
1979  Twait.start();
1980  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
1981  do {
1982  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
1983  } while (!p);
1984  Twait.stop();
1985 
1986  syncRecvApply<syncType, SyncFnTy, BitsetFnTy, async>(
1987  p->first, p->second, loopName);
1988  }
1989  incrementEvilPhase();
1990  }
1991  }
1992 
2003  template <SyncType syncType, typename SyncFnTy, typename BitsetFnTy,
2004  bool async>
2005  void syncRecv(std::string loopName) {
2006  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
2007  galois::CondStatTimer<GALOIS_COMM_STATS> TRecvTime(
2008  (syncTypeStr + "Recv_" + get_run_identifier(loopName)).c_str(), RNAME);
2009 
2010  TRecvTime.start();
2011  syncNetRecv<syncType, SyncFnTy, BitsetFnTy, async>(loopName);
2012  TRecvTime.stop();
2013  }
2014 
2016 // MPI sync variants
2018 #ifdef GALOIS_USE_BARE_MPI
2019 
2022  template <SyncType syncType, typename SyncFnTy, typename BitsetFnTy>
2023  void syncNonblockingMPI(std::string loopName,
2024  bool use_bitset_to_send = true) {
2025  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
2026  galois::CondStatTimer<GALOIS_COMM_STATS> TSendTime(
2027  (syncTypeStr + "Send_" + get_run_identifier(loopName)).c_str(), RNAME);
2028  galois::CondStatTimer<GALOIS_COMM_STATS> TRecvTime(
2029  (syncTypeStr + "Recv_" + get_run_identifier(loopName)).c_str(), RNAME);
2030 
2031  static std::vector<std::vector<uint8_t>> rb;
2032  static std::vector<MPI_Request> request;
2033 
2034  if (rb.size() == 0) { // create the receive buffers
2035  TRecvTime.start();
2036  auto& sharedEdges = (syncType == syncReduce) ? masterEdges : mirrorEdges;
2037  rb.resize(numHosts);
2038  request.resize(numHosts, MPI_REQUEST_NULL);
2039 
2040  for (unsigned h = 1; h < numHosts; ++h) {
2041  unsigned x = (id + numHosts - h) % numHosts;
2042  if (nothingToRecv(x, syncType))
2043  continue;
2044 
2045  size_t size =
2046  (sharedEdges[x].size() * sizeof(typename SyncFnTy::ValTy));
2047  size += sizeof(size_t); // vector size
2048  size += sizeof(DataCommMode); // data mode
2049 
2050  rb[x].resize(size);
2051  }
2052  TRecvTime.stop();
2053  }
2054 
2055  TRecvTime.start();
2056  sync_mpi_recv_post<syncType, SyncFnTy, BitsetFnTy>(loopName, request, rb);
2057  TRecvTime.stop();
2058 
2059  TSendTime.start();
2060  if (use_bitset_to_send) {
2061  sync_mpi_send<syncType, SyncFnTy, BitsetFnTy>(loopName);
2062  } else {
2063  sync_mpi_send<syncType, SyncFnTy, galois::InvalidBitsetFnTy>(loopName);
2064  }
2065  TSendTime.stop();
2066 
2067  TRecvTime.start();
2068  sync_mpi_recv_wait<syncType, SyncFnTy, BitsetFnTy>(loopName, request, rb);
2069  TRecvTime.stop();
2070  }
2071 
2075  template <SyncType syncType, typename SyncFnTy, typename BitsetFnTy>
2076  void syncOnesidedMPI(std::string loopName, bool use_bitset_to_send = true) {
2077  std::string syncTypeStr = (syncType == syncReduce) ? "Reduce" : "Broadcast";
2078  galois::CondStatTimer<GALOIS_COMM_STATS> TSendTime(
2079  (syncTypeStr + "Send_" + get_run_identifier(loopName)).c_str(), RNAME);
2080  galois::CondStatTimer<GALOIS_COMM_STATS> TRecvTime(
2081  (syncTypeStr + "Recv_" + get_run_identifier(loopName)).c_str(), RNAME);
2082 
2083  static std::vector<MPI_Win> window;
2084  static MPI_Group mpi_access_group;
2085  static std::vector<std::vector<uint8_t>> rb;
2086 
2087  if (window.size() == 0) { // create the windows
2088  TRecvTime.start();
2089  auto& sharedEdges = (syncType == syncReduce) ? masterEdges : mirrorEdges;
2090  window.resize(numHosts);
2091  rb.resize(numHosts);
2092 
2093  uint64_t recv_buffers_size = 0;
2094  for (unsigned x = 0; x < numHosts; ++x) {
2095  size_t size = sharedEdges[x].size() * sizeof(typename SyncFnTy::ValTy);
2096  size += sizeof(size_t); // vector size
2097  size += sizeof(DataCommMode); // data mode
2098  size += sizeof(size_t); // buffer size
2099  recv_buffers_size += size;
2100 
2101  rb[x].resize(size);
2102 
2103  MPI_Info info;
2104  MPI_Info_create(&info);
2105  MPI_Info_set(info, "no_locks", "true");
2106  MPI_Info_set(info, "same_disp_unit", "true");
2107 
2108  MPI_Win_create(rb[x].data(), size, 1, info, MPI_COMM_WORLD, &window[x]);
2109 
2110  MPI_Info_free(&info);
2111  }
2112  auto& net = galois::runtime::getSystemNetworkInterface();
2113  net.incrementMemUsage(recv_buffers_size);
2114 
2115  for (unsigned h = 1; h < numHosts; ++h) {
2116  unsigned x = (id + numHosts - h) % numHosts;
2117  if (nothingToRecv(x, syncType))
2118  continue;
2119  // exposure group of each window is same as identity group of that
2120  // window
2121  MPI_Win_post(mpi_identity_groups[x], 0, window[x]);
2122  }
2123  TRecvTime.stop();
2124 
2125  TSendTime.start();
2126  std::vector<int> access_hosts;
2127  for (unsigned h = 1; h < numHosts; ++h) {
2128  unsigned x = (id + h) % numHosts;
2129 
2130  if (nothingToSend(x, syncType))
2131  continue;
2132 
2133  access_hosts.push_back(x);
2134  }
2135  MPI_Group world_group;
2136  MPI_Comm_group(MPI_COMM_WORLD, &world_group);
2137  // access group for only one window since only one window is accessed
2138  MPI_Group_incl(world_group, access_hosts.size(), access_hosts.data(),
2139  &mpi_access_group);
2140  TSendTime.stop();
2141  }
2142 
2143  TSendTime.start();
2144  if (use_bitset_to_send) {
2145  sync_mpi_put<syncType, SyncFnTy, BitsetFnTy>(loopName, mpi_access_group,
2146  window);
2147  } else {
2148  sync_mpi_put<syncType, SyncFnTy, galois::InvalidBitsetFnTy>(
2149  loopName, mpi_access_group, window);
2150  }
2151  TSendTime.stop();
2152 
2153  TRecvTime.start();
2154  sync_mpi_get<syncType, SyncFnTy, BitsetFnTy>(loopName, window, rb);
2155  TRecvTime.stop();
2156  }
2157 #endif
2158 
2160  // Higher Level Sync Calls (broadcast/reduce, etc)
2162 
2171  template <typename ReduceFnTy, typename BitsetFnTy, bool async>
2172  inline void reduce(std::string loopName) {
2173  std::string timer_str("Reduce_" + get_run_identifier(loopName));
2174  galois::CondStatTimer<GALOIS_COMM_STATS> TsyncReduce(timer_str.c_str(),
2175  RNAME);
2176  TsyncReduce.start();
2177 
2178 #ifdef GALOIS_USE_BARE_MPI
2179  switch (bare_mpi) {
2180  case noBareMPI:
2181 #endif
2182  syncSend<syncReduce, ReduceFnTy, BitsetFnTy, async>(loopName);
2183  syncRecv<syncReduce, ReduceFnTy, BitsetFnTy, async>(loopName);
2184 #ifdef GALOIS_USE_BARE_MPI
2185  break;
2186  case nonBlockingBareMPI:
2187  syncNonblockingMPI<syncReduce, ReduceFnTy, BitsetFnTy>(loopName);
2188  break;
2189  case oneSidedBareMPI:
2190  syncOnesidedMPI<syncReduce, ReduceFnTy, BitsetFnTy>(loopName);
2191  break;
2192  default:
2193  GALOIS_DIE("unsupported bare MPI");
2194  }
2195 #endif
2196 
2197  TsyncReduce.stop();
2198  }
2199 
2208  template <typename BroadcastFnTy, typename BitsetFnTy, bool async>
2209  inline void broadcast(std::string loopName) {
2210  std::string timer_str("Broadcast_" + get_run_identifier(loopName));
2211  galois::CondStatTimer<GALOIS_COMM_STATS> TsyncBroadcast(timer_str.c_str(),
2212  RNAME);
2213 
2214  TsyncBroadcast.start();
2215 
2216  bool use_bitset = true;
2217 
2218 #ifdef GALOIS_USE_BARE_MPI
2219  switch (bare_mpi) {
2220  case noBareMPI:
2221 #endif
2222  if (use_bitset) {
2223  syncSend<syncBroadcast, BroadcastFnTy, BitsetFnTy, async>(loopName);
2224  } else {
2225  syncSend<syncBroadcast, BroadcastFnTy, galois::InvalidBitsetFnTy,
2226  async>(loopName);
2227  }
2228  syncRecv<syncBroadcast, BroadcastFnTy, BitsetFnTy, async>(loopName);
2229 #ifdef GALOIS_USE_BARE_MPI
2230  break;
2231  case nonBlockingBareMPI:
2232  syncNonblockingMPI<syncBroadcast, BroadcastFnTy, BitsetFnTy>(loopName,
2233  use_bitset);
2234  break;
2235  case oneSidedBareMPI:
2236  syncOnesidedMPI<syncBroadcast, BroadcastFnTy, BitsetFnTy>(loopName,
2237  use_bitset);
2238  break;
2239  default:
2240  GALOIS_DIE("unsupported bare MPI");
2241  }
2242 #endif
2243 
2244  TsyncBroadcast.stop();
2245  }
2246 
2255  template <typename SyncFnTy, typename BitsetFnTy, bool async>
2256  inline void sync_any_to_any(std::string loopName) {
2257  // reduce and broadcast for OEC, IEC, CVC, UVC
2258  reduce<SyncFnTy, BitsetFnTy, async>(loopName);
2259  broadcast<SyncFnTy, BitsetFnTy, async>(loopName);
2260  }
2261 
2263  // Public interface: sync
2265 
2266 public:
2276  template <typename SyncFnTy, typename BitsetFnTy = galois::InvalidBitsetFnTy,
2277  bool async = false>
2278  inline void sync(std::string loopName) {
2279  std::string timer_str("Sync_" + loopName + "_" + get_run_identifier());
2280  galois::StatTimer Tsync(timer_str.c_str(), RNAME);
2281 
2282  Tsync.start();
2283  sync_any_to_any<SyncFnTy, BitsetFnTy, async>(loopName);
2284  Tsync.stop();
2285  }
2286 
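  // Illustrative usage (not part of this header; identifiers below are
  // hypothetical): callers normally pair the substrate with a reduction
  // structure and bitset functor generated by the GALOIS_SYNC_STRUCTURE_*
  // macros in galois/runtime/SyncStructures.h for a particular edge field,
  // then invoke sync once per round, e.g.
  //
  //   syncSubstrate->sync<Reduce_edge_flow, Bitset_edge_flow>("MyEdgeLoop");
  //
  // With the default BitsetFnTy (galois::InvalidBitsetFnTy) every shared edge
  // is exchanged; supplying a real bitset functor limits communication to the
  // edges whose bits are set.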
2288  // GPU marshaling
2290 
2291 #ifdef GALOIS_ENABLE_GPU
2292 private:
2293  using GraphNode = typename GraphTy::GraphNode;
2294  using edge_iterator = typename GraphTy::edge_iterator;
2295  using EdgeTy = typename GraphTy::EdgeType;
2296 
2297  // Code that handles getting the graph onto the GPU
2298  template <bool isVoidType,
2299  typename std::enable_if<isVoidType>::type* = nullptr>
2300  inline void setMarshalEdge(EdgeMarshalGraph& GALOIS_UNUSED(m),
2301  const size_t GALOIS_UNUSED(index),
2302  const edge_iterator& GALOIS_UNUSED(e)) {
2303  // do nothing
2304  }
2305 
2306  template <bool isVoidType,
2307  typename std::enable_if<!isVoidType>::type* = nullptr>
2308  inline void setMarshalEdge(EdgeMarshalGraph& m, const size_t index,
2309  const edge_iterator& e) {
2310  m.edge_data[index] = userGraph.getEdgeData(e);
2311  }
2312 
2313 public:
2314  void getEdgeMarshalGraph(EdgeMarshalGraph& m, bool loadProxyEdges = true) {
2315  m.nnodes = userGraph.size();
2316  m.nedges = userGraph.sizeEdges();
2317  m.numOwned = userGraph.numMasters();
2319  m.beginMaster = 0;
2320  m.numNodesWithEdges = userGraph.getNumNodesWithEdges();
2321  m.id = id;
2322  m.numHosts = numHosts;
2323  m.row_start = (index_type*)calloc(m.nnodes + 1, sizeof(index_type));
2324  m.edge_dst = (index_type*)calloc(m.nedges, sizeof(index_type));
2325  m.node_data = (index_type*)calloc(m.nnodes, sizeof(node_data_type));
2326 
2328  if (std::is_void<EdgeTy>::value) {
2329  m.edge_data = NULL;
2330  } else {
2331  if (!std::is_same<EdgeTy, edge_data_type>::value) {
2332  galois::gWarn("Edge data type mismatch between CPU and GPU\n");
2333  }
2334  m.edge_data = (edge_data_type*)calloc(m.nedges, sizeof(edge_data_type));
2335  }
2336 
 2337  galois::do_all(
 2338  // TODO not using thread ranges, can be optimized if I can iterate
2339  // directly over userGraph
2340  galois::iterate(userGraph.allNodesRange()),
2341  [&](const GraphNode& nodeID) {
2342  // initialize node_data with localID-to-globalID mapping
2343  m.node_data[nodeID] =
2344  userGraph.getGID(nodeID); // this may not be required.
2345  m.row_start[nodeID] = *(userGraph.edge_begin(nodeID));
2346  for (auto e = userGraph.edge_begin(nodeID);
2347  e != userGraph.edge_end(nodeID); e++) {
2348  auto edgeID = *e;
2349  setMarshalEdge<std::is_void<EdgeTy>::value>(m, edgeID, e);
2350  m.edge_dst[edgeID] = userGraph.getEdgeDst(e);
2351  }
2352  },
2353  galois::steal());
2354 
2355  m.row_start[m.nnodes] = m.nedges;
2356 
2357  // TODO?
2358  // copy memoization meta-data
2359  if (loadProxyEdges) {
2360  m.num_master_edges =
2361  (unsigned int*)calloc(masterEdges.size(), sizeof(unsigned int));
2362  ;
2363  m.master_edges =
2364  (unsigned int**)calloc(masterEdges.size(), sizeof(unsigned int*));
2365  ;
2366 
2367  for (uint32_t h = 0; h < masterEdges.size(); ++h) {
2368  m.num_master_edges[h] = masterEdges[h].size();
2369 
2370  if (masterEdges[h].size() > 0) {
2371  m.master_edges[h] = (unsigned int*)calloc(masterEdges[h].size(),
2372  sizeof(unsigned int));
2373  ;
2374  std::copy(masterEdges[h].begin(), masterEdges[h].end(),
2375  m.master_edges[h]);
2376  } else {
2377  m.master_edges[h] = NULL;
2378  }
2379  }
2380 
2381  m.num_mirror_edges =
2382  (unsigned int*)calloc(mirrorEdges.size(), sizeof(unsigned int));
2383  ;
2384  m.mirror_edges =
2385  (unsigned int**)calloc(mirrorEdges.size(), sizeof(unsigned int*));
2386  ;
2387  for (uint32_t h = 0; h < mirrorEdges.size(); ++h) {
2388  m.num_mirror_edges[h] = mirrorEdges[h].size();
2389 
2390  if (mirrorEdges[h].size() > 0) {
2391  m.mirror_edges[h] = (unsigned int*)calloc(mirrorEdges[h].size(),
2392  sizeof(unsigned int));
2393  ;
2394  std::copy(mirrorEdges[h].begin(), mirrorEdges[h].end(),
2395  m.mirror_edges[h]);
2396  } else {
2397  m.mirror_edges[h] = NULL;
2398  }
2399  }
2400  }
2401 
2404  // userGraph.deallocate();
2405  }
 2406 #endif // GALOIS_ENABLE_GPU
2407 
2408 public:
2410  // Metadata settings/getters
2412 
2417  inline void set_num_run(const uint32_t runNum) { num_run = runNum; }
2418 
2424  inline uint32_t get_run_num() const { return num_run; }
2425 
2431  inline void set_num_round(const uint32_t round) { num_round = round; }
2432 
2441  inline std::string get_run_identifier() const {
2442 #if GALOIS_PER_ROUND_STATS
2443  return std::string(std::to_string(num_run) + "_" +
2444  std::to_string(num_round));
2445 #else
2446  return std::string(std::to_string(num_run));
2447 #endif
2448  }
2449 
2457  inline std::string get_run_identifier(std::string loop_name) const {
2458 #if GALOIS_PER_ROUND_STATS
2459  return std::string(std::string(loop_name) + "_" + std::to_string(num_run) +
2460  "_" + std::to_string(num_round));
2461 #else
2462  return std::string(std::string(loop_name) + "_" + std::to_string(num_run));
2463 #endif
2464  }
2465 
2477  inline std::string get_run_identifier(std::string loop_name,
2478  unsigned alterID) const {
2479 #if GALOIS_PER_ROUND_STATS
2480  return std::string(std::string(loop_name) + "_" + std::to_string(alterID) +
2481  "_" + std::to_string(num_run) + "_" +
2482  std::to_string(num_round));
2483 #else
2484  return std::string(std::string(loop_name) + "_" + std::to_string(alterID) +
2485  "_" + std::to_string(num_run));
2486 #endif
2487  }
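  // Worked example (not part of this header): with num_run == 2 and
  // num_round == 5, get_run_identifier("PageRank") returns "PageRank_2_5"
  // when GALOIS_PER_ROUND_STATS is enabled and "PageRank_2" otherwise;
  // get_run_identifier("PageRank", 7) returns "PageRank_7_2_5" (or
  // "PageRank_7_2" without per-round stats).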
2488 };
2489 
2490 template <typename GraphTy>
2491 constexpr const char* const galois::graphs::GluonEdgeSubstrate<GraphTy>::RNAME;
2492 } // end namespace graphs
2493 } // end namespace galois
2494 
2495 #endif // header guard
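A minimal end-to-end sketch, assuming application-supplied types: EdgeGraph stands for the partitioned edge graph produced by the application's partitioner, and ReduceFnTy / BitsetFnTy stand for the sync and bitset structures the application defines for one of its edge fields (e.g. via the sync-structure macros). None of these names are defined by this header; the driver below only illustrates the intended call sequence of construction, per-run and per-round bookkeeping, and synchronization.

    #include <cstdint>
    #include "galois/graphs/GluonEdgeSubstrate.h"

    // Hypothetical driver: EdgeGraph, ReduceFnTy, and BitsetFnTy come from the
    // application (partitioner output and sync-structure instantiations).
    template <typename EdgeGraph, typename ReduceFnTy, typename BitsetFnTy>
    void runEdgeRounds(EdgeGraph& graph, unsigned hostID, unsigned numHosts,
                       uint32_t numRounds) {
      // Construction exchanges master/mirror edge metadata among all hosts.
      galois::graphs::GluonEdgeSubstrate<EdgeGraph> substrate(graph, hostID,
                                                              numHosts);
      substrate.set_num_run(0);
      for (uint32_t round = 0; round < numRounds; ++round) {
        substrate.set_num_round(round);
        // ... run the local edge operator here, marking every updated edge in
        // the bitset exposed by BitsetFnTy ...
        substrate.sync<ReduceFnTy, BitsetFnTy>("EdgeOperator");
      }
    }

The run and round counters only affect the identifiers attached to reported statistics; the sync call itself performs the reduce-then-broadcast exchange over master and mirror edges.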