Galois
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
NewGeneric.h
Go to the documentation of this file.
1 /*
2  * This file belongs to the Galois project, a C++ library for exploiting
3  * parallelism. The code is being released under the terms of the 3-Clause BSD
4  * License (a copy is located in LICENSE.txt at the top-level directory).
5  *
6  * Copyright (C) 2020, The University of Texas at Austin. All rights reserved.
7  * UNIVERSITY EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES CONCERNING THIS
8  * SOFTWARE AND DOCUMENTATION, INCLUDING ANY WARRANTIES OF MERCHANTABILITY,
9  * FITNESS FOR ANY PARTICULAR PURPOSE, NON-INFRINGEMENT AND WARRANTIES OF
10  * PERFORMANCE, AND ANY WARRANTY THAT MIGHT OTHERWISE ARISE FROM COURSE OF
11  * DEALING OR USAGE OF TRADE. NO WARRANTY IS EITHER EXPRESS OR IMPLIED WITH
12  * RESPECT TO THE USE OF THE SOFTWARE OR DOCUMENTATION. Under no circumstances
13  * shall University be liable for incidental, special, indirect, direct or
14  * consequential damages or loss of profits, interruption of business, or
15  * related expenses which may arise from use of Software or Documentation,
16  * including but not limited to those resulting from defects in Software and/or
17  * Documentation, or loss or inaccuracy of data of any kind.
18  */
19 
27 #ifndef _GALOIS_DIST_NEWGENERIC_H
28 #define _GALOIS_DIST_NEWGENERIC_H
29 
31 #include "galois/DReducible.h"
32 #include <optional>
33 #include <sstream>
34 
35 #define CUSP_PT_TIMER 0
36 
37 namespace galois {
38 namespace graphs {
46 template <typename NodeTy, typename EdgeTy, typename Partitioner>
47 class NewDistGraphGeneric : public DistGraph<NodeTy, EdgeTy> {
49  constexpr static unsigned edgePartitionSendBufSize = 8388608;
50  constexpr static const char* const GRNAME = "dGraph_Generic";
51  Partitioner* graphPartitioner;
52 
54  uint32_t _edgeStateRounds;
55  std::vector<galois::DGAccumulator<uint64_t>> hostLoads;
56  std::vector<uint64_t> old_hostLoads;
57 
58  uint32_t G2LEdgeCut(uint64_t gid, uint32_t globalOffset) const {
59  assert(isLocal(gid));
60  // optimized for edge cuts
61  if (gid >= globalOffset && gid < globalOffset + base_DistGraph::numOwned)
62  return gid - globalOffset;
63 
64  return base_DistGraph::globalToLocalMap.at(gid);
65  }
66 
/**
 * Release all memory held by a container, not only its elements.
 *
 * clear() may keep the allocated capacity. Swapping with a freshly
 * default-constructed temporary hands the old storage to that temporary,
 * which is destroyed at the end of the statement.
 *
 * @tparam V container type that provides a member swap()
 * @param vectorToKill container to empty; afterwards it holds the state of
 *        a default-constructed V
 */
template <typename V>
void freeVector(V& vectorToKill) {
  V().swap(vectorToKill);
}
75 
76  uint32_t nodesToReceive;
77 
78 public:
81 
82  virtual unsigned getHostID(uint64_t gid) const {
83  assert(gid < base_DistGraph::numGlobalNodes);
84  return graphPartitioner->retrieveMaster(gid);
85  }
86 
87  virtual bool isOwned(uint64_t gid) const {
88  assert(gid < base_DistGraph::numGlobalNodes);
89  return (graphPartitioner->retrieveMaster(gid) == base_DistGraph::id);
90  }
91 
92  virtual bool isLocal(uint64_t gid) const {
93  assert(gid < base_DistGraph::numGlobalNodes);
94  return (base_DistGraph::globalToLocalMap.find(gid) !=
96  }
97 
98  // TODO current uses graph partitioner
99  // TODO make it so user doens't have to specify; can be done by tracking
100  // if an outgoing mirror is marked as having an incoming edge on any
101  // host
102  virtual bool is_vertex_cut() const { return graphPartitioner->isVertexCut(); }
103  virtual std::pair<unsigned, unsigned> cartesianGrid() const {
104  return graphPartitioner->cartesianGrid();
105  }
106 
110  void resetEdgeLoad() {
111  if (_edgeStateRounds > 1) {
112  if (!graphPartitioner->noCommunication()) {
113  for (unsigned i = 0; i < base_DistGraph::numHosts; i++) {
114  hostLoads[i].reset();
115  old_hostLoads[i] = 0;
116  }
117  }
118  }
119  }
120 
124  void syncEdgeLoad() {
125  if (_edgeStateRounds > 1) {
126  if (!graphPartitioner->noCommunication()) {
127  for (unsigned i = 0; i < base_DistGraph::numHosts; i++) {
128  old_hostLoads[i] += hostLoads[i].reduce();
129  hostLoads[i].reset();
130  }
131  }
132  }
133  }
134 
138  void printEdgeLoad() {
139  if (_edgeStateRounds > 1) {
140  if (!graphPartitioner->noCommunication()) {
141  if (base_DistGraph::id == 0) {
142  for (unsigned i = 0; i < base_DistGraph::numHosts; i++) {
143  galois::gDebug("[", base_DistGraph::id, "] ", i, " ",
144  old_hostLoads[i], "\n");
145  }
146  }
147  }
148  }
149  }
150 
// Constructor: builds this host's portion of the partitioned graph.
// NOTE(review): this listing omits the line that opens the signature (the
// constructor name) and several parameter/statement lines, e.g. the
// parameter that supplies `md`, the declarations of Tgraph_construct,
// edgeBegin/edgeEnd and bufGraph, and the graph allocation calls. The
// comments below describe only what is visible.
// Visible outline:
//  - if readFromFile: read the local graph from localGraphFileName, return;
//  - otherwise open `filename`, obtain this host's reader node range
//    (computeMasters or readersFromFile) and create the partitioner;
//  - read that node range, optionally run phase 0 (master assignment), then
//    inspect edges to decide which nodes/edges this host will hold;
//  - fix each node's edge-end index from prefixSumOfEdges, fill mirrors,
//    load edges, optionally transpose, and report statistics.
// @param filename on-disk graph to partition
// @param host ID of this host; @param _numHosts total number of hosts
// @param cuspAsync forwarded to phase0
// @param stateRounds forwarded to phase0; host 0 reports it as
//        "CuSPStateRounds"
// @param transpose if true, the (partly missing) transpose step runs after
//        edges are loaded
// @param nodeWeight, edgeWeight forwarded to computeMasters
// @param masterBlockFile if non-empty, reader assignment is read from this
//        file instead of being computed
// @param readFromFile skip partitioning and read the local graph from file
// @param localGraphFileName file read when readFromFile is true
// @param edgeStateRounds edge-state rounds; hostLoads/old_hostLoads are only
//        sized when this is > 1 and the partitioner needs communication
// NOTE(review): the readFromFile path returns before graphPartitioner is
// assigned, yet the destructor deletes it; confirm the member is
// initialized (e.g. to nullptr) so that delete is well-defined.
155  const std::string& filename, unsigned host, unsigned _numHosts,
156  bool cuspAsync = true, uint32_t stateRounds = 100, bool transpose = false,
158  uint32_t nodeWeight = 0, uint32_t edgeWeight = 0,
159  std::string masterBlockFile = "", bool readFromFile = false,
160  std::string localGraphFileName = "local_graph",
161  uint32_t edgeStateRounds = 1)
162  : base_DistGraph(host, _numHosts), _edgeStateRounds(edgeStateRounds) {
163  galois::runtime::reportParam("dGraph", "GenericPartitioner", "0");
165  "GraphPartitioningTime", GRNAME);
166  Tgraph_construct.start();
167 
// Fast path: load an already-partitioned local graph and skip everything
// below.
168  if (readFromFile) {
170  "] Reading local graph from file ", localGraphFileName,
171  "\n");
172  base_DistGraph::read_local_graph_from_file(localGraphFileName);
173  Tgraph_construct.stop();
174  return;
175  }
176 
// Partitioning path starts here.
177  galois::graphs::OfflineGraph g(filename);
180  std::vector<unsigned> dummy;
181  // not actually getting masters, but getting assigned readers for nodes
182  if (masterBlockFile == "") {
183  base_DistGraph::computeMasters(md, g, dummy, nodeWeight, edgeWeight);
184  } else {
185  galois::gInfo("Getting reader assignment from file");
186  base_DistGraph::readersFromFile(g, masterBlockFile);
187  }
188 
// Owned by this object; released in the destructor.
189  graphPartitioner =
190  new Partitioner(host, _numHosts, base_DistGraph::numGlobalNodes,
192  // TODO abstract this away somehow
193  graphPartitioner->saveGIDToHost(base_DistGraph::gid2host);
194 
// [nodeBegin, nodeEnd) is the contiguous GID range this host reads.
195  uint64_t nodeBegin = base_DistGraph::gid2host[base_DistGraph::id].first;
197  g.edge_begin(nodeBegin);
198  uint64_t nodeEnd = base_DistGraph::gid2host[base_DistGraph::id].second;
200  g.edge_begin(nodeEnd);
201 
202  // signifies how many outgoing edges a particular host should expect from
203  // this host
204  std::vector<std::vector<uint64_t>> numOutgoingEdges;
205  // signifies if a host should create a node because it has an incoming edge
206  std::vector<galois::DynamicBitSet> hasIncomingEdge;
207 
208  // only need to use for things that need communication
209  if (!graphPartitioner->noCommunication()) {
210  if (_edgeStateRounds > 1) {
211  hostLoads.resize(base_DistGraph::numHosts);
212  old_hostLoads.resize(base_DistGraph::numHosts);
213  resetEdgeLoad();
214  }
215  numOutgoingEdges.resize(base_DistGraph::numHosts);
216  hasIncomingEdge.resize(base_DistGraph::numHosts);
217  }
218 
219  // phase 0
220 
221  galois::gPrint("[", base_DistGraph::id, "] Starting graph reading.\n");
223  bufGraph.resetReadCounters();
224  galois::StatTimer graphReadTimer("GraphReading", GRNAME);
225  graphReadTimer.start();
226  bufGraph.loadPartialGraph(filename, nodeBegin, nodeEnd, *edgeBegin,
229  graphReadTimer.stop();
230  galois::gPrint("[", base_DistGraph::id, "] Reading graph complete.\n");
231 
232  if (graphPartitioner->masterAssignPhase()) {
233  // loop over all nodes, determine where neighbors are, assign masters
234  galois::StatTimer phase0Timer("Phase0", GRNAME);
236  "] Starting master assignment.\n");
237  phase0Timer.start();
238  phase0(bufGraph, cuspAsync, stateRounds);
239  phase0Timer.stop();
241  "] Master assignment complete.\n");
242  }
243 
244  galois::StatTimer inspectionTimer("EdgeInspection", GRNAME);
245  inspectionTimer.start();
246  bufGraph.resetReadCounters();
247  galois::gstl::Vector<uint64_t> prefixSumOfEdges;
248 
249  // assign edges to other nodes
250  if (!graphPartitioner->noCommunication()) {
251  edgeInspection(bufGraph, numOutgoingEdges, hasIncomingEdge,
252  inspectionTimer);
253  galois::DynamicBitSet& finalIncoming =
254  hasIncomingEdge[base_DistGraph::id];
255 
256  galois::StatTimer mapTimer("NodeMapping", GRNAME);
257  mapTimer.start();
258  nodeMapping(numOutgoingEdges, finalIncoming, prefixSumOfEdges);
259  mapTimer.stop();
260 
// This host's incoming-edge bitset has been consumed by nodeMapping.
261  finalIncoming.resize(0);
262  } else {
// No-communication path: every node read by this host is owned by it.
263  base_DistGraph::numOwned = nodeEnd - nodeBegin;
264  uint64_t edgeOffset = *bufGraph.edgeBegin(nodeBegin);
265  // edge prefix sum, no comm required
266  edgeCutInspection(bufGraph, inspectionTimer, edgeOffset,
267  prefixSumOfEdges);
268  }
// NOTE(review): in the no-communication branch the timer is passed to
// edgeCutInspection instead; presumably it is stopped there — confirm.
269  // inspection timer is stopped in edgeInspection function
270 
271  // flip partitioners that have a master assignment phase to stage 2
272  // (meaning all nodes and masters that will be on this host are present in
273  // the partitioner's metadata)
274  if (graphPartitioner->masterAssignPhase()) {
275  graphPartitioner->enterStage2();
276  }
277 
278  // get memory back from inspection metadata
279  numOutgoingEdges.clear();
280  hasIncomingEdge.clear();
281  // doubly make sure the data is cleared
282  freeVector(numOutgoingEdges); // should no longer use this variable
283  freeVector(hasIncomingEdge); // should no longer use this variable
284 
285  // Graph construction related calls
286 
288  // Allocate and construct the graph
292 
// Set each local node's edge-end index from the edge prefix sum computed
// during inspection (the loop's opening lines are missing from this listing).
293  // edge end fixing
294  auto& base_graph = base_DistGraph::graph;
297  [&](uint64_t n) { base_graph.fixEndEdge(n, prefixSumOfEdges[n]); },
298 #if MORE_DIST_STATS
299  galois::loopname("FixEndEdgeLoop"),
300 #endif
301  galois::no_stats());
302  // get memory from prefix sum back
303  prefixSumOfEdges.clear();
304  freeVector(prefixSumOfEdges); // should no longer use this variable
305  galois::CondStatTimer<MORE_DIST_STATS> TfillMirrors("FillMirrors", GRNAME);
306 
307  TfillMirrors.start();
308  fillMirrors();
309  TfillMirrors.stop();
310 
311  if (_edgeStateRounds > 1) {
312  // reset edge load since we need exact same answers again
313  resetEdgeLoad();
314  }
315 
316  // Edge loading
317  if (!graphPartitioner->noCommunication()) {
318  loadEdges(base_DistGraph::graph, bufGraph);
319  } else {
320  // Edge cut construction
321  edgeCutLoad(base_DistGraph::graph, bufGraph);
322  bufGraph.resetAndFree();
323  }
324 
325  // Finalization
326 
327  // TODO this is a hack; fix it somehow
328  // if vertex cut but not a cart cut is the condition
// NOTE(review): the statement executed in this branch is missing from this
// listing.
329  if (graphPartitioner->isVertexCut() &&
330  graphPartitioner->cartesianGrid().first == 0) {
332  }
333 
// NOTE(review): most statements of the transpose step are missing from
// this listing; only the guard on numNodes is visible.
334  if (transpose) {
337  if (base_DistGraph::numNodes > 0) {
338  // consider all nodes to have outgoing edges (TODO better way to do
339  // this?) for now it's fine I guess
341  }
342  }
343 
344  galois::CondStatTimer<MORE_DIST_STATS> Tthread_ranges("ThreadRangesTime",
345  GRNAME);
346 
347  Tthread_ranges.start();
349  Tthread_ranges.stop();
350 
354 
355  Tgraph_construct.stop();
356  galois::gPrint("[", base_DistGraph::id, "] Graph construction complete.\n");
357 
358  // report state rounds
359  if (base_DistGraph::id == 0) {
360  galois::runtime::reportStat_Single(GRNAME, "CuSPStateRounds",
361  (uint32_t)stateRounds);
362  }
363  }
364 
368  ~NewDistGraphGeneric() { delete graphPartitioner; }
369 
370 private:
// Builds a per-thread work range over nodes [startNode, endNode) whose split
// points come from an inclusive prefix sum of per-node out-edge counts, so
// threads receive roughly balanced edge work.
// NOTE(review): this listing omits the return-type line preceding the name,
// the declaration of edgePrefixSum, the line opening the parallel loop, and
// the line that assigns assignedThreadRanges from the prefix sum.
// @param bufGraph partially loaded graph used to read edge counts
// @param assignedThreadRanges receives activeThreads + 1 split points, which
//        are shifted by startNode below; its data() pointer is handed to
//        makeSpecificRange, so presumably it must outlive the returned range
// @param startNode first node ID of the range
// @param endNode one past the last node ID of the range
// @returns the range produced by galois::runtime::makeSpecificRange
372  getSpecificThreadRange(galois::graphs::BufferedGraph<EdgeTy>& bufGraph,
373  std::vector<uint32_t>& assignedThreadRanges,
374  uint64_t startNode, uint64_t endNode) {
// NOTE(review): unlike the other timers in this class, this one is not given
// the GRNAME region.
375  galois::StatTimer threadRangeTime("Phase0ThreadRangeTime");
376  threadRangeTime.start();
377  uint64_t numLocalNodes = endNode - startNode;
379  edgePrefixSum.resize(numLocalNodes);
380 
381  // get thread ranges with a prefix sum
383  galois::iterate(startNode, endNode),
// NOTE(review): n is `unsigned` while the iterated bounds are uint64_t; node
// IDs >= 2^32 would be truncated.
384  [&](unsigned n) {
385  uint64_t offset = n - startNode;
386  edgePrefixSum[offset] = bufGraph.edgeEnd(n) - bufGraph.edgeBegin(n);
387  },
388  galois::no_stats());
389 
// Serial inclusive prefix sum over the per-node edge counts.
390  for (unsigned i = 1; i < numLocalNodes; i++) {
391  edgePrefixSum[i] += edgePrefixSum[i - 1];
392  }
393 
395  galois::runtime::activeThreads, edgePrefixSum);
396 
// Shift split points from offsets relative to startNode to node IDs.
// NOTE(review): the elements are uint32_t, so node IDs >= 2^32 would wrap.
397  for (unsigned i = 0; i < galois::runtime::activeThreads + 1; i++) {
398  assignedThreadRanges[i] += startNode;
399  }
400 
401  auto toReturn = galois::runtime::makeSpecificRange(
402  boost::counting_iterator<size_t>(startNode),
403  boost::counting_iterator<size_t>(startNode + numLocalNodes),
404  assignedThreadRanges.data());
405 
406  threadRangeTime.stop();
407  return toReturn;
408  }
409 
419  // steps 1 and 2 of neighbor location setup: memory allocation, bitset setting
// Step 1 sizes `ghosts` to bufGraph.size() bits and clears it. Step 2 walks
// every out-edge of the nodes this host read ([start, end) from gid2host)
// and sets the bit of each destination outside that range.
// NOTE(review): this listing omits the line declaring `work` (which receives
// the getSpecificThreadRange result), the line opening the parallel loop, and
// the loop's final argument line. Presumably bufGraph.size() is the global
// node count, so ghosts can be indexed by GID — confirm.
// @param bufGraph partially loaded graph holding this host's nodes' edges
// @param ghosts output bitset; set bits mark remote destinations of local edges
420  void phase0BitsetSetup(galois::graphs::BufferedGraph<EdgeTy>& bufGraph,
421  galois::DynamicBitSet& ghosts) {
422  galois::StatTimer bitsetSetupTimer("Phase0BitsetSetup", GRNAME);
423  bitsetSetupTimer.start();
424 
425  ghosts.resize(bufGraph.size());
426  ghosts.reset();
427 
// Split points for the edge-balanced thread range. Its data is passed to
// makeSpecificRange, so presumably it must stay alive until the loop ends.
428  std::vector<uint32_t> rangeVector;
429  auto start = base_DistGraph::gid2host[base_DistGraph::id].first;
430  auto end = base_DistGraph::gid2host[base_DistGraph::id].second;
431 
433  getSpecificThreadRange(bufGraph, rangeVector, start, end);
434 
435  // Step 2: loop over all local nodes, determine neighbor locations
437  galois::iterate(work),
438  // galois::iterate(base_DistGraph::gid2host[base_DistGraph::id].first,
439  // base_DistGraph::gid2host[base_DistGraph::id].second),
440  [&](unsigned n) {
441  // ptt.start();
442  // galois::gPrint("[", base_DistGraph::id, " ",
443  // galois::substrate::getThreadPool().getTID(), "] ", n, "\n");
444  auto ii = bufGraph.edgeBegin(n);
445  auto ee = bufGraph.edgeEnd(n);
446  for (; ii < ee; ++ii) {
// NOTE(review): dst is held in uint32_t; IDs >= 2^32 would be truncated if
// edgeDestination returns a wider type.
447  uint32_t dst = bufGraph.edgeDestination(*ii);
448  if ((dst < start) || (dst >= end)) { // not owned by this host
449  // set on bitset
450  ghosts.set(dst);
451  }
452  }
453  // ptt.stop();
454  },
455  galois::loopname("Phase0BitsetSetup_DetermineNeighborLocations"),
457 
458  bitsetSetupTimer.stop();
459  }
460 
461  // sets up the gid to lid mapping for phase 0
// Each set bit in `ghosts` (a remote GID) receives a local ID. IDs start
// right after this host's numLocal nodes and increase by host, then by GID;
// the mapping is stored in gid2offsets. For each other host h, the ghost's
// offset within h's range (gid - start) is appended to syncNodes[h].
// NOTE(review): this listing omits the syncNodes parameter line and the
// continuation of the numLocal expression (presumably `...id].first;`).
// @param ghosts bitset of remote GIDs; resized to 0 before returning
// @param gid2offsets output map from ghost GID to local ID
// @returns the number of ghosts mapped (asserted equal to ghosts.count())
473  uint64_t phase0MapSetup(
474  galois::DynamicBitSet& ghosts,
475  std::unordered_map<uint64_t, uint32_t>& gid2offsets,
477  galois::StatTimer mapSetupTimer("Phase0MapSetup", GRNAME);
478  mapSetupTimer.start();
479 
480  uint32_t numLocal = base_DistGraph::gid2host[base_DistGraph::id].second -
482  uint32_t lid = numLocal;
483 
484  uint64_t numToReserve = ghosts.count();
485  gid2offsets.reserve(numToReserve);
486 
487  // TODO: parallelize using prefix sum?
488  for (unsigned h = 0; h < base_DistGraph::numHosts; ++h) {
489  if (h == base_DistGraph::id)
490  continue;
491  auto start = base_DistGraph::gid2host[h].first;
492  auto end = base_DistGraph::gid2host[h].second;
493  for (uint64_t gid = start; gid < end; ++gid) {
494  if (ghosts.test(gid)) {
495  gid2offsets[gid] = lid;
496  syncNodes[h].push_back(gid - start);
497  lid++;
498  }
499  }
// Debug comparison of encoding sizes in 64-bit words. NOTE(review): the
// `/ 2` on the vector size assumes 32-bit elements in syncNodes — confirm.
500  galois::gDebug("[", base_DistGraph::id, " -> ", h, "] bitset size ",
501  (end - start) / 64, " vs. vector size ",
502  syncNodes[h].size() / 2);
503  }
// From here on lid is the count of ghosts assigned, not a next local ID.
504  lid -= numLocal;
505 
506  assert(lid == numToReserve);
507  galois::gDebug("[", base_DistGraph::id, "] total bitset size ",
508  (ghosts.size() - numLocal) / 64, " vs. total vector size ",
509  numToReserve / 2);
510 
511  // TODO: should not be used after this - refactor to make this clean
512  ghosts.resize(0);
513 
514  mapSetupTimer.stop();
515 
516  return lid;
517  }
518 
519  // steps 4 and 5 of neighbor location setup
// Steps 4 and 5 of neighbor location setup: exchange syncNodes with every
// other host.
// Step 4 serializes syncNodes[h] and sends it to every other host h.
// Step 5 blocks until one message has arrived from each of the other
// net.Num - 1 hosts, and deserializes it into syncNodes[sendingHost].
// Presumably gDeserialize replaces the previous contents, so afterwards
// syncNodes[h] lists which of this host's local nodes host h needs updates
// for — confirm.
// NOTE(review): this listing omits the parameter line(s) (presumably
// syncNodes), the line obtaining `net`, the opening line of the bytes-sent
// stat report, and line 565.
529  void phase0SendRecv(
532  galois::StatTimer p0BitsetCommTimer("Phase0SendRecvBitsets", GRNAME);
533  p0BitsetCommTimer.start();
534  uint64_t bytesSent = 0;
535 
536  // Step 4: send bitset to other hosts
// NOTE(review): a SendBuffer is constructed on every iteration, including
// h == id where it is unused.
537  for (unsigned h = 0; h < base_DistGraph::numHosts; h++) {
538  galois::runtime::SendBuffer bitsetBuffer;
539 
540  if (h != base_DistGraph::id) {
541  galois::runtime::gSerialize(bitsetBuffer, syncNodes[h]);
542  bytesSent += bitsetBuffer.size();
543  net.sendTagged(h, galois::runtime::evilPhase, bitsetBuffer);
544  }
545  }
546 
547  // Step 5: recv bitset to other hosts; this indicates which local nodes each
548  // other host needs to be informed of updates of
549  for (unsigned h = 0; h < net.Num - 1; h++) {
550  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
// Poll until a message is available.
551  do {
552  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
553  } while (!p);
554  uint32_t sendingHost = p->first;
555  // deserialize into neighbor bitsets
556  galois::runtime::gDeserialize(p->second, syncNodes[sendingHost]);
557  }
558 
559  p0BitsetCommTimer.stop();
560 
562  GRNAME, std::string("Phase0SendRecvBitsetsBytesSent"), bytesSent);
563 
564  // comm phase complete
566  }
567 
577  void syncLoad(std::vector<uint64_t>& loads,
578  std::vector<galois::CopyableAtomic<uint64_t>>& accums) {
579  assert(loads.size() == accums.size());
580  // use DG accumulator to force barrier on all hosts to sync this data
582  // sync accum for each host one by one
583  for (unsigned i = 0; i < loads.size(); i++) {
584  syncer.reset();
585  syncer += (accums[i].load());
586  accums[i].store(0);
587  uint64_t accumulation = syncer.reduce();
588  loads[i] += accumulation;
589  }
590  }
591 
599  template <typename VType>
600  void
601  extractAtomicToPODArray(std::vector<galois::CopyableAtomic<VType>>& atomic,
603  nonAtomic.resize(atomic.size());
604 
606  galois::iterate((size_t)0, atomic.size()),
607  [&](size_t i) {
608  nonAtomic[i] = atomic[i].load();
609  atomic[i].store(0);
610  },
611  galois::no_stats());
612  }
613 
621  void asyncSendLoad(galois::PODResizeableArray<uint64_t>& nodeAccum,
624 
625  unsigned bytesSent = 0;
626  galois::StatTimer sendTimer("Phase0AsyncSendLoadTime", GRNAME);
627 
628  sendTimer.start();
629  for (unsigned h = 0; h < base_DistGraph::numHosts; h++) {
630  if (h != base_DistGraph::id) {
631  // serialize node and edge accumulations with tag 4 (to avoid
632  // conflict with other tags being used) and send
634 
635  galois::runtime::gSerialize(b, 4);
636  galois::runtime::gSerialize(b, nodeAccum);
637  galois::runtime::gSerialize(b, edgeAccum);
638  bytesSent += b.size();
639 
640  // note the +1 on evil phase; load messages send using a different
641  // phase to avoid conflicts
642  net.sendTagged(h, base_DistGraph::evilPhasePlus1(), b);
643  }
644  }
645  sendTimer.stop();
646 
647  galois::runtime::reportStat_Tsum(GRNAME, "Phase0AsyncSendLoadBytesSent",
648  bytesSent);
649  }
650 
658  void asyncRecvLoad(std::vector<uint64_t>& nodeLoads,
659  std::vector<uint64_t>& edgeLoads,
660  galois::DynamicBitSet& loadsClear) {
662  decltype(net.recieveTagged(base_DistGraph::evilPhasePlus1(), nullptr)) p;
663 
664  galois::StatTimer recvTimer("Phase0AsyncRecvLoadTime", GRNAME);
665  recvTimer.start();
666  do {
667  // note the +1
668  p = net.recieveTagged(base_DistGraph::evilPhasePlus1(), nullptr);
669 
670  if (p) {
671  unsigned messageType = (unsigned)-1;
672  // deserialize message type
673  galois::runtime::gDeserialize(p->second, messageType);
674 
675  if (messageType == 4) {
678  // loads to add
679  galois::runtime::gDeserialize(p->second, recvNodeAccum);
680  galois::runtime::gDeserialize(p->second, recvEdgeAccum);
681 
682  assert(recvNodeAccum.size() == recvEdgeAccum.size());
683  assert(recvNodeAccum.size() == nodeLoads.size());
684  assert(recvEdgeAccum.size() == edgeLoads.size());
685 
687  galois::iterate((size_t)0, recvNodeAccum.size()),
688  [&](size_t i) {
689  nodeLoads[i] += recvNodeAccum[i];
690  edgeLoads[i] += recvEdgeAccum[i];
691  },
692  galois::no_stats());
693  } else if (messageType == 3) {
694  // all clear message from host
695  uint32_t sendingHost = p->first;
696  assert(!loadsClear.test(sendingHost));
697  loadsClear.set(sendingHost);
698  } else {
699  GALOIS_DIE("unexpected message type in async load synchronization: ",
700  messageType);
701  }
702  }
703  } while (p);
704 
705  recvTimer.stop();
706  }
707 
722  void asyncSyncLoad(std::vector<uint64_t>& nodeLoads,
723  std::vector<galois::CopyableAtomic<uint64_t>>& nodeAccum,
724  std::vector<uint64_t>& edgeLoads,
725  std::vector<galois::CopyableAtomic<uint64_t>>& edgeAccum,
726  galois::DynamicBitSet& loadsClear) {
727  assert(nodeLoads.size() == base_DistGraph::numHosts);
728  assert(nodeAccum.size() == base_DistGraph::numHosts);
729  assert(edgeLoads.size() == base_DistGraph::numHosts);
730  assert(edgeAccum.size() == base_DistGraph::numHosts);
731 
732  galois::StatTimer syncTimer("Phase0AsyncSyncLoadTime", GRNAME);
733  syncTimer.start();
734 
735  // extract out data to send
736  galois::PODResizeableArray<uint64_t> nonAtomicNodeAccum;
737  galois::PODResizeableArray<uint64_t> nonAtomicEdgeAccum;
738  extractAtomicToPODArray(nodeAccum, nonAtomicNodeAccum);
739  extractAtomicToPODArray(edgeAccum, nonAtomicEdgeAccum);
740 
741  assert(nonAtomicNodeAccum.size() == base_DistGraph::numHosts);
742  assert(nonAtomicEdgeAccum.size() == base_DistGraph::numHosts);
743 
744  // apply loads to self
746  galois::iterate((uint32_t)0, base_DistGraph::numHosts),
747  [&](size_t i) {
748  nodeLoads[i] += nonAtomicNodeAccum[i];
749  edgeLoads[i] += nonAtomicEdgeAccum[i];
750  },
751  galois::no_stats());
752 
753 #ifndef NDEBUG
754  for (unsigned i = 0; i < nodeAccum.size(); i++) {
755  assert(nodeAccum[i].load() == 0);
756  assert(edgeAccum[i].load() == 0);
757  }
758 #endif
759 
760  // send both nodes and edges accumulation at once
761  asyncSendLoad(nonAtomicNodeAccum, nonAtomicEdgeAccum);
762  asyncRecvLoad(nodeLoads, edgeLoads, loadsClear);
763 
764  syncTimer.stop();
765  }
766 
774  void printLoad(std::vector<uint64_t>& loads,
775  std::vector<galois::CopyableAtomic<uint64_t>>& accums) {
776  assert(loads.size() == accums.size());
777  for (unsigned i = 0; i < loads.size(); i++) {
778  galois::gDebug("[", base_DistGraph::id, "] ", i, " total ", loads[i],
779  " accum ", accums[i].load());
780  }
781  }
782 
793  template <typename T>
794  std::vector<T> getDataFromOffsets(std::vector<uint32_t>& offsetVector,
795  const std::vector<T>& dataVector) {
796  std::vector<T> toReturn;
797  toReturn.resize(offsetVector.size());
798 
800  galois::iterate((size_t)0, offsetVector.size()),
801  [&](unsigned i) { toReturn[i] = dataVector[offsetVector[i]]; },
802  galois::no_stats());
803 
804  return toReturn;
805  }
806 
817  void sendOffsets(unsigned targetHost, galois::DynamicBitSet& toSync,
818  std::vector<uint32_t>& dataVector,
819  std::string timerName = std::string()) {
821  std::string statString = std::string("Phase0SendOffsets_") + timerName;
822  uint64_t bytesSent = 0;
823 
824  galois::StatTimer sendOffsetsTimer(statString.c_str(), GRNAME);
825 
826  sendOffsetsTimer.start();
827 
828  // this means there are updates to send
829  if (toSync.count()) {
830  std::vector<uint32_t> offsetVector = toSync.getOffsets();
831  // get masters to send into a vector
832  std::vector<uint32_t> mastersToSend =
833  getDataFromOffsets(offsetVector, dataVector);
834 
835  assert(mastersToSend.size());
836 
837  size_t num_selected = toSync.count();
838  size_t num_total = toSync.size();
839  // figure out how to send (most efficient method; either bitset
840  // and data or offsets + data)
841  size_t bitset_alloc_size =
842  ((num_total + 63) / 64) * sizeof(uint64_t) + (2 * sizeof(size_t));
843  size_t bitsetDataSize = (num_selected * sizeof(uint32_t)) +
844  bitset_alloc_size + sizeof(num_selected);
845  size_t offsetsDataSize = (num_selected * sizeof(uint32_t)) +
846  (num_selected * sizeof(unsigned int)) +
847  sizeof(uint32_t) + sizeof(num_selected);
848 
850  // tag with send method and do send
851  if (bitsetDataSize < offsetsDataSize) {
852  // send bitset, tag 1
853  galois::runtime::gSerialize(b, 1u);
854  galois::runtime::gSerialize(b, toSync);
855  galois::runtime::gSerialize(b, mastersToSend);
856  } else {
857  // send offsets, tag 2
858  galois::runtime::gSerialize(b, 2u);
859  galois::runtime::gSerialize(b, offsetVector);
860  galois::runtime::gSerialize(b, mastersToSend);
861  }
862  bytesSent += b.size();
863  net.sendTagged(targetHost, galois::runtime::evilPhase, b);
864  } else {
865  // send empty no-op message, tag 0
867  galois::runtime::gSerialize(b, 0u);
868  bytesSent += b.size();
869  net.sendTagged(targetHost, galois::runtime::evilPhase, b);
870  }
871  sendOffsetsTimer.stop();
872 
873  galois::runtime::reportStat_Tsum(GRNAME, statString + "BytesSent",
874  bytesSent);
875  }
876 
// Send master assignments for local nodes in [begin, end) to every other host
// that listed those nodes in syncNodes.
// For each other host h, the entries of syncNodes[h] that fall in
// [begin, end) are marked in a bitset of numLocalNodes bits, and sendOffsets
// sends the matching localNodeToMaster values. One message (possibly the
// tag-0 no-op) goes to every other host, matching the numHosts - 1 receives
// in syncAssignmentReceives.
// NOTE(review): this listing omits the syncNodes parameter line and the line
// opening the parallel loop.
888  void syncAssignmentSends(
889  uint32_t begin, uint32_t end, uint32_t numLocalNodes,
890  std::vector<uint32_t>& localNodeToMaster,
892  galois::StatTimer p0assignSendTime("Phase0AssignmentSendTime", GRNAME);
893  p0assignSendTime.start();
894 
895  galois::DynamicBitSet toSync;
896  toSync.resize(numLocalNodes);
897 
898  // send loop
899  for (unsigned h = 0; h < base_DistGraph::numHosts; h++) {
900  if (h != base_DistGraph::id) {
901  toSync.reset();
902  // send if in [start,end) and present in syncNodes[h]
904  galois::iterate(syncNodes[h]),
905  [&](uint32_t lid) {
906  if ((lid >= begin) && (lid < end)) {
907  toSync.set(lid);
908  }
909  },
910  galois::no_stats());
911  // do actual send based on sync bitset
912  sendOffsets(h, toSync, localNodeToMaster, "NewAssignments");
913  }
914  }
915 
916  p0assignSendTime.stop();
917  }
918 
924  void sendAllClears(unsigned phase = 0) {
925  unsigned bytesSent = 0;
927  galois::StatTimer allClearTimer("Phase0SendAllClearTime", GRNAME);
928  allClearTimer.start();
929 
930  // send loop
931  for (unsigned h = 0; h < base_DistGraph::numHosts; h++) {
932  if (h != base_DistGraph::id) {
934  galois::runtime::gSerialize(b, 3u);
935  bytesSent += b.size();
936  // assumes phase is 0 or 1
937  if (phase == 1) {
938  net.sendTagged(h, base_DistGraph::evilPhasePlus1(), b);
939  } else if (phase == 0) {
940  net.sendTagged(h, galois::runtime::evilPhase, b);
941  } else {
942  GALOIS_DIE("unexpected phase: ", phase);
943  }
944  }
945  }
946  allClearTimer.stop();
947 
948  galois::runtime::reportStat_Tsum(GRNAME, "Phase0SendAllClearBytesSent",
949  bytesSent);
950  }
951 
952  void saveReceivedMappings(std::vector<uint32_t>& localNodeToMaster,
953  std::unordered_map<uint64_t, uint32_t>& gid2offsets,
954  unsigned sendingHost,
955  std::vector<uint32_t>& receivedOffsets,
956  std::vector<uint32_t>& receivedMasters) {
957  uint64_t hostOffset = base_DistGraph::gid2host[sendingHost].first;
958  galois::gDebug("[", base_DistGraph::id, "] host ", sendingHost, " offset ",
959  hostOffset);
960 
961  // if execution gets here, messageType was 1 or 2
962  assert(receivedMasters.size() == receivedOffsets.size());
963 
965  galois::iterate((size_t)0, receivedMasters.size()),
966  [&](size_t i) {
967  uint64_t curGID = hostOffset + receivedOffsets[i];
968  uint32_t indexIntoMap = gid2offsets[curGID];
969  galois::gDebug("[", base_DistGraph::id, "] gid ", curGID, " offset ",
970  indexIntoMap);
971  localNodeToMaster[indexIntoMap] = receivedMasters[i];
972  },
973  galois::no_stats());
974  }
975 
984  std::pair<unsigned, unsigned>
985  recvOffsetsAndMasters(std::vector<uint32_t>& receivedOffsets,
986  std::vector<uint32_t>& receivedMasters) {
988 
989  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
990  do {
991  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
992  } while (!p);
993 
994  uint32_t sendingHost = p->first;
995  unsigned messageType = (unsigned)-1;
996 
997  // deserialize message type
998  galois::runtime::gDeserialize(p->second, messageType);
999 
1000  if (messageType == 1) {
1001  // bitset; deserialize, then get offsets
1002  galois::DynamicBitSet receivedSet;
1003  galois::runtime::gDeserialize(p->second, receivedSet);
1004  receivedOffsets = receivedSet.getOffsets();
1005  galois::runtime::gDeserialize(p->second, receivedMasters);
1006  } else if (messageType == 2) {
1007  // offsets
1008  galois::runtime::gDeserialize(p->second, receivedOffsets);
1009  galois::runtime::gDeserialize(p->second, receivedMasters);
1010  } else if (messageType != 0) {
1011  GALOIS_DIE("invalid message type for sync of master assignments: ",
1012  messageType);
1013  }
1014 
1015  galois::gDebug("[", base_DistGraph::id, "] host ", sendingHost,
1016  " send message type ", messageType);
1017 
1018  return std::make_pair(sendingHost, messageType);
1019  }
1020 
1029  void recvOffsetsAndMastersAsync(
1030  std::vector<uint32_t>& localNodeToMaster,
1031  std::unordered_map<uint64_t, uint32_t>& gid2offsets,
1032  galois::DynamicBitSet& hostFinished) {
1034  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
1035 
1036  // repeat loop until no message
1037  do {
1038  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
1039  if (p) {
1040  uint32_t sendingHost = p->first;
1041  unsigned messageType = (unsigned)-1;
1042 
1043  std::vector<uint32_t> receivedOffsets;
1044  std::vector<uint32_t> receivedMasters;
1045 
1046  // deserialize message type
1047  galois::runtime::gDeserialize(p->second, messageType);
1048 
1049  if (messageType == 1) {
1050  // bitset; deserialize, then get offsets
1051  galois::DynamicBitSet receivedSet;
1052  galois::runtime::gDeserialize(p->second, receivedSet);
1053  receivedOffsets = receivedSet.getOffsets();
1054  galois::runtime::gDeserialize(p->second, receivedMasters);
1055  saveReceivedMappings(localNodeToMaster, gid2offsets, sendingHost,
1056  receivedOffsets, receivedMasters);
1057  } else if (messageType == 2) {
1058  // offsets
1059  galois::runtime::gDeserialize(p->second, receivedOffsets);
1060  galois::runtime::gDeserialize(p->second, receivedMasters);
1061  saveReceivedMappings(localNodeToMaster, gid2offsets, sendingHost,
1062  receivedOffsets, receivedMasters);
1063  } else if (messageType == 3) {
1064  // host indicating that it is done with all assignments from its
1065  // end; mark as such in bitset
1066  assert(!hostFinished.test(sendingHost));
1067  hostFinished.set(sendingHost);
1068  } else if (messageType != 0) {
1069  GALOIS_DIE("invalid message type for sync of master assignments: ",
1070  messageType);
1071  }
1072 
1073  galois::gDebug("[", base_DistGraph::id, "] host ", sendingHost,
1074  " send message type ", messageType);
1075  }
1076  } while (p);
1077  }
1078 
1088  void
1089  syncAssignmentReceives(std::vector<uint32_t>& localNodeToMaster,
1090  std::unordered_map<uint64_t, uint32_t>& gid2offsets) {
1091  galois::StatTimer p0assignReceiveTime("Phase0AssignmentReceiveTime",
1092  GRNAME);
1093  p0assignReceiveTime.start();
1094 
1095  // receive loop
1096  for (unsigned h = 0; h < base_DistGraph::numHosts - 1; h++) {
1097  unsigned sendingHost;
1098  unsigned messageType;
1099  std::vector<uint32_t> receivedOffsets;
1100  std::vector<uint32_t> receivedMasters;
1101 
1102  std::tie(sendingHost, messageType) =
1103  recvOffsetsAndMasters(receivedOffsets, receivedMasters);
1104 
1105  if (messageType == 1 || messageType == 2) {
1106  saveReceivedMappings(localNodeToMaster, gid2offsets, sendingHost,
1107  receivedOffsets, receivedMasters);
1108  }
1109  }
1110 
1111  p0assignReceiveTime.stop();
1112  }
1113 
1114  void syncAssignmentReceivesAsync(
1115  std::vector<uint32_t>& localNodeToMaster,
1116  std::unordered_map<uint64_t, uint32_t>& gid2offsets,
1117  galois::DynamicBitSet& hostFinished) {
1118  galois::StatTimer p0assignReceiveTime("Phase0AssignmentReceiveTimeAsync",
1119  GRNAME);
1120  p0assignReceiveTime.start();
1121 
1122  recvOffsetsAndMastersAsync(localNodeToMaster, gid2offsets, hostFinished);
1123 
1124  p0assignReceiveTime.stop();
1125  }
1126 
// Blocking round of master-assignment exchange. First sends assignments for
// local nodes in [begin, end) via syncAssignmentSends. Then waits for one
// message from each other host and records the received assignments via
// syncAssignmentReceives.
// NOTE(review): the syncNodes parameter line is missing from this listing.
1139  void syncAssignment(
1140  uint32_t begin, uint32_t end, uint32_t numLocalNodes,
1141  std::vector<uint32_t>& localNodeToMaster,
1143  std::unordered_map<uint64_t, uint32_t>& gid2offsets) {
1144  galois::StatTimer syncAssignmentTimer("Phase0SyncAssignmentTime", GRNAME);
1145  syncAssignmentTimer.start();
1146 
1147  syncAssignmentSends(begin, end, numLocalNodes, localNodeToMaster,
1148  syncNodes);
1149  syncAssignmentReceives(localNodeToMaster, gid2offsets);
1150 
1151  syncAssignmentTimer.stop();
1152  }
1153 
// Non-blocking variant of the master-assignment exchange. Sends are the same
// as in the blocking version (syncAssignmentSends). Receives only process
// messages that are already available (syncAssignmentReceivesAsync), and
// hosts that sent an all-clear are marked in hostFinished.
// NOTE(review): the syncNodes parameter line is missing from this listing.
1154  void syncAssignmentAsync(
1155  uint32_t begin, uint32_t end, uint32_t numLocalNodes,
1156  std::vector<uint32_t>& localNodeToMaster,
1158  std::unordered_map<uint64_t, uint32_t>& gid2offsets,
1159  galois::DynamicBitSet& hostFinished) {
1160  galois::StatTimer syncAssignmentTimer("Phase0SyncAssignmentAsyncTime",
1161  GRNAME);
1162  syncAssignmentTimer.start();
1163 
1164  syncAssignmentSends(begin, end, numLocalNodes, localNodeToMaster,
1165  syncNodes);
1166  syncAssignmentReceivesAsync(localNodeToMaster, gid2offsets, hostFinished);
1167 
1168  syncAssignmentTimer.stop();
1169  }
1170 
1179  void sendMastersToOwners(
1180  std::vector<uint32_t>& localNodeToMaster,
1182  uint32_t begin = base_DistGraph::gid2host[base_DistGraph::id].first;
1183  uint32_t end = base_DistGraph::gid2host[base_DistGraph::id].second;
1184  // for each host, determine which master assignments still need to be sent
1185  // (if a host is a master of a node, but that node is not present as a
1186  // neighbor on the host, then this host needs to send the master assignment)
1187  galois::DynamicBitSet toSend;
1188  toSend.resize(end - begin);
1189 
1190  for (unsigned h = 0; h < base_DistGraph::numHosts; ++h) {
1191  if (h != base_DistGraph::id) {
1192  toSend.reset();
1193  // send if present in localNodeToMaster but not present in syncNodes
1195  galois::iterate((uint32_t)0, end - begin),
1196  [&](uint32_t lid) {
1197  if (localNodeToMaster[lid] == h) {
1198  toSend.set(lid);
1199  }
1200  },
1201  galois::no_stats());
1203  galois::iterate(syncNodes[h]),
1204  [&](uint32_t lid) { toSend.reset(lid); }, galois::no_stats());
1205 
1206  sendOffsets(h, toSend, localNodeToMaster, "MastersToOwners");
1207  }
1208  }
1209  }
1210 
1215  void recvMastersToOwners() {
1216  for (unsigned h = 0; h < base_DistGraph::numHosts - 1; h++) {
1217  unsigned sendingHost;
1218  unsigned messageType;
1219  std::vector<uint32_t> receivedOffsets;
1220  std::vector<uint32_t> receivedMasters;
1221 
1222  std::tie(sendingHost, messageType) =
1223  recvOffsetsAndMasters(receivedOffsets, receivedMasters);
1224 
1225  if (messageType == 1 || messageType == 2) {
1226  assert(receivedMasters.size() == receivedOffsets.size());
1227  uint64_t hostOffset = base_DistGraph::gid2host[sendingHost].first;
1228 
1229  // must be single threaded as map updating isn't thread-safe
1230  for (unsigned i = 0; i < receivedMasters.size(); i++) {
1231  uint64_t gidToMap = hostOffset + receivedOffsets[i];
1232 #ifndef NDEBUG
1233  bool newMapped =
1234 #endif
1235  graphPartitioner->addMasterMapping(gidToMap, receivedMasters[i]);
1236  assert(newMapped);
1237  }
1238  }
1239  }
1240  }
1241 
1250  void phase0(galois::graphs::BufferedGraph<EdgeTy>& bufGraph, bool async,
1251  const uint32_t stateRounds) {
1252  galois::DynamicBitSet ghosts;
1254  syncNodes; // masterNodes
1255  syncNodes.resize(base_DistGraph::numHosts);
1256 
1257  // determine on which hosts that this host's read nodes havs neighbors on
1258  phase0BitsetSetup(bufGraph, ghosts);
1259  // gid to vector offset setup
1260  std::unordered_map<uint64_t, uint32_t> gid2offsets;
1261  uint64_t neighborCount = phase0MapSetup(ghosts, gid2offsets, syncNodes);
1262  galois::gDebug("[", base_DistGraph::id, "] num neighbors found is ",
1263  neighborCount);
1264  // send off neighbor metadata
1265  phase0SendRecv(syncNodes);
1266 
1267  galois::StatTimer p0allocTimer("Phase0AllocationTime", GRNAME);
1268 
1269  p0allocTimer.start();
1270 
1271  // setup other partitioning metadata: nodes on each host, edges on each
1272  // host (as determined by edge cut)
1273  std::vector<uint64_t> nodeLoads;
1274  std::vector<uint64_t> edgeLoads;
1275  std::vector<galois::CopyableAtomic<uint64_t>> nodeAccum;
1276  std::vector<galois::CopyableAtomic<uint64_t>> edgeAccum;
1277  nodeLoads.assign(base_DistGraph::numHosts, 0);
1278  edgeLoads.assign(base_DistGraph::numHosts, 0);
1279  nodeAccum.assign(base_DistGraph::numHosts, 0);
1280  edgeAccum.assign(base_DistGraph::numHosts, 0);
1281 
1282  uint32_t numLocalNodes =
1285 
1286  std::vector<uint32_t> localNodeToMaster;
1287  localNodeToMaster.assign(numLocalNodes + neighborCount, (uint32_t)-1);
1288 
1289  // bitsets tracking termination of assignments and partitioning loads
1290  galois::DynamicBitSet hostFinished;
1291  galois::DynamicBitSet loadsClear;
1292 
1293  if (async) {
1294  if (base_DistGraph::id == 0) {
1295  galois::gPrint("Using asynchronous master determination sends.\n");
1296  }
1297 
1298  hostFinished.resize(base_DistGraph::numHosts);
1299  loadsClear.resize(base_DistGraph::numHosts);
1300  }
1301 
1302  p0allocTimer.stop();
1303 
1304  uint64_t globalOffset = base_DistGraph::gid2host[base_DistGraph::id].first;
1305 
1306 #ifndef NDEBUG
1307  for (uint32_t i : localNodeToMaster) {
1308  assert(i == (uint32_t)-1);
1309  }
1310 #endif
1311 
1312  if (base_DistGraph::id == 0) {
1313  galois::gPrint("Number of BSP sync rounds in master assignment: ",
1314  stateRounds, "\n");
1315  }
1316 
1317  // galois::PerThreadTimer<CUSP_PT_TIMER> ptt(
1318  // GRNAME, "Phase0DetermineMaster_" + std::string(base_DistGraph::id)
1319  //);
1320  for (unsigned syncRound = 0; syncRound < stateRounds; syncRound++) {
1321  uint32_t beginNode;
1322  uint32_t endNode;
1323  std::tie(beginNode, endNode) = galois::block_range(
1324  globalOffset, base_DistGraph::gid2host[base_DistGraph::id].second,
1325  syncRound, stateRounds);
1326 
1327  // create specific range for this block
1328  std::vector<uint32_t> rangeVec;
1329  auto work =
1330  getSpecificThreadRange(bufGraph, rangeVec, beginNode, endNode);
1331 
1332  // debug print
1333  // galois::on_each([&] (unsigned i, unsigned j) {
1334  // galois::gDebug("[", base_DistGraph::id, " ", i, "] sync round ",
1335  // syncRound, " local range ",
1336  // *work.local_begin(), " ", *work.local_end());
1337  //});
1338 
1340  // iterate over my read nodes
1341  galois::iterate(work),
1342  // galois::iterate(beginNode, endNode),
1343  [&](uint32_t node) {
1344  // ptt.start();
1345  // determine master function takes source node, iterator of
1346  // neighbors
1347  uint32_t assignedHost = graphPartitioner->getMaster(
1348  node, bufGraph, localNodeToMaster, gid2offsets, nodeLoads,
1349  nodeAccum, edgeLoads, edgeAccum);
1350  // != -1 means it was assigned a host
1351  assert(assignedHost != (uint32_t)-1);
1352  // update mapping; this is a local node, so can get position
1353  // on map with subtraction
1354  localNodeToMaster[node - globalOffset] = assignedHost;
1355 
1356  // galois::gDebug("[", base_DistGraph::id, "] state round ",
1357  // syncRound,
1358  // " set ", node, " ", node - globalOffset);
1359 
1360  // ptt.stop();
1361  },
1362  galois::loopname("Phase0DetermineMasters"), galois::steal(),
1363  galois::no_stats());
1364 
1365  // do synchronization of master assignment of neighbors
1366  if (!async) {
1367  syncAssignment(beginNode - globalOffset, endNode - globalOffset,
1368  numLocalNodes, localNodeToMaster, syncNodes,
1369  gid2offsets);
1370  } else {
1371  // don't need to send anything if there is nothing to send unlike sync
1372  if (beginNode != endNode) {
1373  syncAssignmentAsync(beginNode - globalOffset, endNode - globalOffset,
1374  numLocalNodes, localNodeToMaster, syncNodes,
1375  gid2offsets, hostFinished);
1376  }
1377  }
1378 
1379  // sync node/edge loads
1380  galois::StatTimer loadSyncTimer("Phase0LoadSyncTime", GRNAME);
1381 
1382  loadSyncTimer.start();
1383  if (!async) {
1384  syncLoad(nodeLoads, nodeAccum);
1385  syncLoad(edgeLoads, edgeAccum);
1386  } else {
1387  asyncSyncLoad(nodeLoads, nodeAccum, edgeLoads, edgeAccum, loadsClear);
1388  }
1389  loadSyncTimer.stop();
1390 
1391 #ifndef NDEBUG
1392  if (async) {
1393  galois::gDebug("[", base_DistGraph::id, "] host count ",
1394  hostFinished.count());
1395  }
1396 #endif
1397  }
1398 
1399  // if asynchronous, don't move on until everything is done
1400  if (async) {
1401  galois::StatTimer waitTime("Phase0AsyncWaitTime", GRNAME);
1402  // assignment clears
1403  sendAllClears();
1404  // load clears
1405  sendAllClears(1);
1406 
1407  hostFinished.set(base_DistGraph::id);
1408  loadsClear.set(base_DistGraph::id);
1409 
1410  waitTime.start();
1411  while (hostFinished.count() != base_DistGraph::numHosts ||
1412  loadsClear.count() != base_DistGraph::numHosts) {
1413  //#ifndef NDEBUG
1414  // galois::gDebug("[", base_DistGraph::id, "] waiting for all hosts to
1415  // finish, ",
1416  // hostFinished.count());
1417  // galois::gDebug("[", base_DistGraph::id, "] waiting for all hosts
1418  // loads "
1419  // "syncs to finish, ", loadsClear.count());
1420  //#endif
1421  // make sure all assignments are done and all loads are done
1422  syncAssignmentReceivesAsync(localNodeToMaster, gid2offsets,
1423  hostFinished);
1424  asyncRecvLoad(nodeLoads, edgeLoads, loadsClear);
1425  }
1426  waitTime.stop();
1427  }
1428 
1429 #ifndef NDEBUG
1430  printLoad(nodeLoads, nodeAccum);
1431  printLoad(edgeLoads, edgeAccum);
1432 #endif
1433 
1434  // sanity check for correctness (all should be assigned)
1435  for (uint32_t i = 0; i < localNodeToMaster.size(); i++) {
1436  if (localNodeToMaster[i] == (uint32_t)-1) {
1437  // galois::gDebug("[", base_DistGraph::id, "] bad index ", i);
1438  assert(localNodeToMaster[i] != (uint32_t)-1);
1439  }
1440  }
1441 
1443  // increment twice if async is used as async uses 2 phases
1444  if (async) {
1446  }
1447 
1449  "] Local master assignment "
1450  "complete.\n");
1451 
1452  // one more step: let masters know of nodes they own (if they don't
1453  // have the node locally then this is the only way they will learn about
1454  // it)
1455  galois::StatTimer p0master2ownerTimer("Phase0MastersToOwners", GRNAME);
1456 
1457  p0master2ownerTimer.start();
1458  sendMastersToOwners(localNodeToMaster, syncNodes);
1459  recvMastersToOwners();
1460  p0master2ownerTimer.stop();
1461 
1462  galois::gPrint("[", base_DistGraph::id, "] Received my master mappings.\n");
1463 
1465 
1466  graphPartitioner->saveGID2HostInfo(gid2offsets, localNodeToMaster,
1467  bufGraph.getNodeOffset());
1468  }
1469 
1470  void edgeCutInspection(galois::graphs::BufferedGraph<EdgeTy>& bufGraph,
1471  galois::StatTimer& inspectionTimer,
1472  uint64_t edgeOffset,
1473  galois::gstl::Vector<uint64_t>& prefixSumOfEdges) {
1474  galois::DynamicBitSet incomingMirrors;
1475  incomingMirrors.resize(base_DistGraph::numGlobalNodes);
1476  incomingMirrors.reset();
1477  uint32_t myID = base_DistGraph::id;
1478  uint64_t globalOffset = base_DistGraph::gid2host[base_DistGraph::id].first;
1479 
1480  // already set before this is called
1482  prefixSumOfEdges.resize(base_DistGraph::numOwned);
1483 
1488  [&](size_t n) {
1489  auto ii = bufGraph.edgeBegin(n);
1490  auto ee = bufGraph.edgeEnd(n);
1491  for (; ii < ee; ++ii) {
1492  uint32_t dst = bufGraph.edgeDestination(*ii);
1493  if (graphPartitioner->retrieveMaster(dst) != myID) {
1494  incomingMirrors.set(dst);
1495  }
1496  }
1497  prefixSumOfEdges[n - globalOffset] = (*ee) - edgeOffset;
1498  ltgv[n - globalOffset] = n;
1499  },
1500 #if MORE_DIST_STATS
1501  galois::loopname("EdgeInspectionLoop"),
1502 #endif
1504  inspectionTimer.stop();
1505 
1506  uint64_t allBytesRead = bufGraph.getBytesRead();
1508  "[", base_DistGraph::id,
1509  "] Edge inspection time: ", inspectionTimer.get_usec() / 1000000.0f,
1510  " seconds to read ", allBytesRead, " bytes (",
1511  allBytesRead / (float)inspectionTimer.get_usec(), " MBPS)\n");
1512 
1513  // get incoming mirrors ready for creation
1514  uint32_t additionalMirrorCount = incomingMirrors.count();
1516  base_DistGraph::localToGlobalVector.size() + additionalMirrorCount);
1517  if (base_DistGraph::numOwned > 0) {
1518  // fill prefix sum with last number (incomings have no edges)
1519  prefixSumOfEdges.resize(prefixSumOfEdges.size() + additionalMirrorCount,
1520  prefixSumOfEdges.back());
1521  } else {
1522  prefixSumOfEdges.resize(additionalMirrorCount);
1523  }
1524 
1525  if (additionalMirrorCount > 0) {
1526  // TODO move this part below into separate function
1527  uint32_t totalNumNodes = base_DistGraph::numGlobalNodes;
1529  std::vector<uint64_t> threadPrefixSums(activeThreads);
1530  galois::on_each([&](unsigned tid, unsigned nthreads) {
1531  size_t beginNode;
1532  size_t endNode;
1533  std::tie(beginNode, endNode) =
1534  galois::block_range(0u, totalNumNodes, tid, nthreads);
1535  uint64_t count = 0;
1536  for (size_t i = beginNode; i < endNode; i++) {
1537  if (incomingMirrors.test(i))
1538  ++count;
1539  }
1540  threadPrefixSums[tid] = count;
1541  });
1542  // get prefix sums
1543  for (unsigned int i = 1; i < threadPrefixSums.size(); i++) {
1544  threadPrefixSums[i] += threadPrefixSums[i - 1];
1545  }
1546 
1547  assert(threadPrefixSums.back() == additionalMirrorCount);
1548 
1549  uint32_t startingNodeIndex = base_DistGraph::numOwned;
1550  // do actual work, second on_each
1551  galois::on_each([&](unsigned tid, unsigned nthreads) {
1552  size_t beginNode;
1553  size_t endNode;
1554  std::tie(beginNode, endNode) =
1555  galois::block_range(0u, totalNumNodes, tid, nthreads);
1556  // start location to start adding things into prefix sums/vectors
1557  uint32_t threadStartLocation = 0;
1558  if (tid != 0) {
1559  threadStartLocation = threadPrefixSums[tid - 1];
1560  }
1561  uint32_t handledNodes = 0;
1562  for (size_t i = beginNode; i < endNode; i++) {
1563  if (incomingMirrors.test(i)) {
1564  base_DistGraph::localToGlobalVector[startingNodeIndex +
1565  threadStartLocation +
1566  handledNodes] = i;
1567  handledNodes++;
1568  }
1569  }
1570  });
1571  }
1572 
1573  base_DistGraph::numNodes = base_DistGraph::numOwned + additionalMirrorCount;
1574  if (prefixSumOfEdges.size() != 0) {
1575  base_DistGraph::numEdges = prefixSumOfEdges.back();
1576  } else {
1578  }
1581  assert(prefixSumOfEdges.size() == base_DistGraph::numNodes);
1582 
1583  // g2l mapping
1585  for (unsigned i = 0; i < base_DistGraph::numNodes; i++) {
1586  // global to local map construction
1588  i;
1589  }
1590  assert(base_DistGraph::globalToLocalMap.size() == base_DistGraph::numNodes);
1591 
1593  }
1594 
1604  template <typename GraphTy,
1605  typename std::enable_if<!std::is_void<
1606  typename GraphTy::edge_data_type>::value>::type* = nullptr>
1607  void edgeCutLoad(GraphTy& graph,
1609  if (base_DistGraph::id == 0) {
1610  galois::gPrint("Loading edge-data while creating edges\n");
1611  }
1612 
1613  uint64_t globalOffset = base_DistGraph::gid2host[base_DistGraph::id].first;
1614  bGraph.resetReadCounters();
1615  galois::StatTimer timer("EdgeLoading", GRNAME);
1616  timer.start();
1617 
1621  [&](size_t n) {
1622  auto ii = bGraph.edgeBegin(n);
1623  auto ee = bGraph.edgeEnd(n);
1624  uint32_t lsrc = this->G2LEdgeCut(n, globalOffset);
1625  uint64_t cur =
1626  *graph.edge_begin(lsrc, galois::MethodFlag::UNPROTECTED);
1627  for (; ii < ee; ++ii) {
1628  auto gdst = bGraph.edgeDestination(*ii);
1629  decltype(gdst) ldst = this->G2LEdgeCut(gdst, globalOffset);
1630  auto gdata = bGraph.edgeData(*ii);
1631  graph.constructEdge(cur++, ldst, gdata);
1632  }
1633  assert(cur == (*graph.edge_end(lsrc)));
1634  },
1635 #if MORE_DIST_STATS
1636  galois::loopname("EdgeLoadingLoop"),
1637 #endif
1639 
1640  timer.stop();
1642  "] Edge loading time: ", timer.get_usec() / 1000000.0f,
1643  " seconds to read ", bGraph.getBytesRead(), " bytes (",
1644  bGraph.getBytesRead() / (float)timer.get_usec(), " MBPS)\n");
1645  }
1646 
1656  template <typename GraphTy,
1657  typename std::enable_if<std::is_void<
1658  typename GraphTy::edge_data_type>::value>::type* = nullptr>
1659  void edgeCutLoad(GraphTy& graph,
1661  if (base_DistGraph::id == 0) {
1662  galois::gPrint("Loading edge-data while creating edges\n");
1663  }
1664 
1665  uint64_t globalOffset = base_DistGraph::gid2host[base_DistGraph::id].first;
1666  bGraph.resetReadCounters();
1667  galois::StatTimer timer("EdgeLoading", GRNAME);
1668  timer.start();
1669 
1673  [&](size_t n) {
1674  auto ii = bGraph.edgeBegin(n);
1675  auto ee = bGraph.edgeEnd(n);
1676  uint32_t lsrc = this->G2LEdgeCut(n, globalOffset);
1677  uint64_t cur =
1678  *graph.edge_begin(lsrc, galois::MethodFlag::UNPROTECTED);
1679  for (; ii < ee; ++ii) {
1680  auto gdst = bGraph.edgeDestination(*ii);
1681  decltype(gdst) ldst = this->G2LEdgeCut(gdst, globalOffset);
1682  graph.constructEdge(cur++, ldst);
1683  }
1684  assert(cur == (*graph.edge_end(lsrc)));
1685  },
1686 #if MORE_DIST_STATS
1687  galois::loopname("EdgeLoadingLoop"),
1688 #endif
1690 
1691  timer.stop();
1693  "] Edge loading time: ", timer.get_usec() / 1000000.0f,
1694  " seconds to read ", bGraph.getBytesRead(), " bytes (",
1695  bGraph.getBytesRead() / (float)timer.get_usec(), " MBPS)\n");
1696  }
1697 
1707  void edgeInspection(galois::graphs::BufferedGraph<EdgeTy>& bufGraph,
1708  std::vector<std::vector<uint64_t>>& numOutgoingEdges,
1709  std::vector<galois::DynamicBitSet>& hasIncomingEdge,
1710  galois::StatTimer& inspectionTimer) {
1711  // number of nodes that this host has read from disk
1712  uint32_t numRead = base_DistGraph::gid2host[base_DistGraph::id].second -
1714 
1715  // allocate space for outgoing edges
1716  for (uint32_t i = 0; i < base_DistGraph::numHosts; ++i) {
1717  numOutgoingEdges[i].assign(numRead, 0);
1718  }
1719 
1720  galois::DynamicBitSet hostHasOutgoing;
1721  hostHasOutgoing.resize(base_DistGraph::numHosts);
1722  hostHasOutgoing.reset();
1723  assignEdges(bufGraph, numOutgoingEdges, hasIncomingEdge, hostHasOutgoing);
1724 
1725  inspectionTimer.stop();
1726  // report edge inspection time
1727  uint64_t allBytesRead = bufGraph.getBytesRead();
1729  "[", base_DistGraph::id,
1730  "] Edge inspection time: ", inspectionTimer.get_usec() / 1000000.0f,
1731  " seconds to read ", allBytesRead, " bytes (",
1732  allBytesRead / (float)inspectionTimer.get_usec(), " MBPS)\n");
1733 
1734  // old inspection barrier
1735  // galois::runtime::getHostBarrier().wait();
1736 
1737  sendInspectionData(numOutgoingEdges, hasIncomingEdge, hostHasOutgoing);
1738 
1739  // setup a single hasIncomingEdge bitvector
1740 
1741  uint32_t myHostID = base_DistGraph::id;
1742  if (hasIncomingEdge[myHostID].size() == 0) {
1743  hasIncomingEdge[myHostID].resize(base_DistGraph::numGlobalNodes);
1744  hasIncomingEdge[myHostID].reset();
1745  }
1746  recvInspectionData(numOutgoingEdges, hasIncomingEdge[myHostID]);
1748  }
1749 
1762  void assignEdges(galois::graphs::BufferedGraph<EdgeTy>& bufGraph,
1763  std::vector<std::vector<uint64_t>>& numOutgoingEdges,
1764  std::vector<galois::DynamicBitSet>& hasIncomingEdge,
1765  galois::DynamicBitSet& hostHasOutgoing) {
1766  std::vector<galois::CopyableAtomic<char>> indicatorVars(
1767  base_DistGraph::numHosts);
1768  // initialize indicators of initialized bitsets to 0
1769  for (unsigned i = 0; i < base_DistGraph::numHosts; i++) {
1770  indicatorVars[i] = 0;
1771  }
1772 
1773  // global offset into my read nodes
1774  uint64_t globalOffset = base_DistGraph::gid2host[base_DistGraph::id].first;
1775  uint32_t globalNodes = base_DistGraph::numGlobalNodes;
1776 
1777  for (unsigned syncRound = 0; syncRound < _edgeStateRounds; syncRound++) {
1778  uint32_t beginNode;
1779  uint32_t endNode;
1780  std::tie(beginNode, endNode) = galois::block_range(
1781  globalOffset, base_DistGraph::gid2host[base_DistGraph::id].second,
1782  syncRound, _edgeStateRounds);
1783  // TODO maybe edge range this?
1784 
1786  // iterate over my read nodes
1787  galois::iterate(beginNode, endNode),
1788  [&](size_t src) {
1789  auto ee = bufGraph.edgeBegin(src);
1790  auto ee_end = bufGraph.edgeEnd(src);
1791  uint64_t numEdgesL = std::distance(ee, ee_end);
1792 
1793  for (; ee != ee_end; ee++) {
1794  uint32_t dst = bufGraph.edgeDestination(*ee);
1795  uint32_t hostBelongs = -1;
1796  hostBelongs = graphPartitioner->getEdgeOwner(src, dst, numEdgesL);
1797  if (_edgeStateRounds > 1) {
1798  hostLoads[hostBelongs] += 1;
1799  }
1800 
1801  numOutgoingEdges[hostBelongs][src - globalOffset] += 1;
1802  hostHasOutgoing.set(hostBelongs);
1803  bool hostIsMasterOfDest =
1804  (hostBelongs == graphPartitioner->retrieveMaster(dst));
1805 
1806  // this means a mirror must be created for destination node on
1807  // that host since it will not be created otherwise
1808  if (!hostIsMasterOfDest) {
1809  auto& bitsetStatus = indicatorVars[hostBelongs];
1810 
1811  // initialize the bitset if necessary
1812  if (bitsetStatus == 0) {
1813  char expected = 0;
1814  bool result =
1815  bitsetStatus.compare_exchange_strong(expected, 1);
1816  // i swapped successfully, therefore do allocation
1817  if (result) {
1818  hasIncomingEdge[hostBelongs].resize(globalNodes);
1819  hasIncomingEdge[hostBelongs].reset();
1820  bitsetStatus = 2;
1821  }
1822  }
1823  // until initialized, loop
1824  while (indicatorVars[hostBelongs] != 2)
1825  ;
1826  hasIncomingEdge[hostBelongs].set(dst);
1827  }
1828  }
1829  },
1830 #if MORE_DIST_STATS
1831  galois::loopname("AssignEdges"),
1832 #endif
1834  syncEdgeLoad();
1835  }
1836  }
1837 
1848  void
1849  serializeOutgoingMasterMap(galois::runtime::SendBuffer& b,
1850  const std::vector<uint64_t>& hostOutgoingEdges) {
1851  // 2 phase: one phase determines amount of work each thread does,
1852  // second has threads actually do copies
1853  uint32_t activeThreads = galois::getActiveThreads();
1854  std::vector<uint64_t> threadPrefixSums(activeThreads);
1855  size_t hostSize = base_DistGraph::gid2host[base_DistGraph::id].second -
1857  assert(hostSize == hostOutgoingEdges.size());
1858 
1859  // for each thread, figure out how many items it will work with
1860  // (non-zero outgoing edges)
1861  galois::on_each([&](unsigned tid, unsigned nthreads) {
1862  size_t beginNode;
1863  size_t endNode;
1864  std::tie(beginNode, endNode) =
1865  galois::block_range((size_t)0, hostSize, tid, nthreads);
1866  uint64_t count = 0;
1867  for (size_t i = beginNode; i < endNode; i++) {
1868  if (hostOutgoingEdges[i] > 0) {
1869  count++;
1870  }
1871  }
1872  threadPrefixSums[tid] = count;
1873  });
1874 
1875  // get prefix sums
1876  for (unsigned int i = 1; i < threadPrefixSums.size(); i++) {
1877  threadPrefixSums[i] += threadPrefixSums[i - 1];
1878  }
1879 
1880  uint32_t numNonZero = threadPrefixSums[activeThreads - 1];
1881  std::vector<uint32_t> masterLocation;
1882  masterLocation.resize(numNonZero, (uint32_t)-1);
1883  // should only be in here if there's something to send in first place
1884  assert(numNonZero > 0);
1885 
1886  uint64_t startNode = base_DistGraph::gid2host[base_DistGraph::id].first;
1887 
1888  // do actual work, second on_each; find non-zeros again, get master
1889  // corresponding to that non-zero and send to other end
1890  galois::on_each([&](unsigned tid, unsigned nthreads) {
1891  size_t beginNode;
1892  size_t endNode;
1893  std::tie(beginNode, endNode) =
1894  galois::block_range((size_t)0, hostSize, tid, nthreads);
1895  // start location to start adding things into prefix sums/vectors
1896  uint32_t threadStartLocation = 0;
1897  if (tid != 0) {
1898  threadStartLocation = threadPrefixSums[tid - 1];
1899  }
1900 
1901  uint32_t handledNodes = 0;
1902  for (size_t i = beginNode; i < endNode; i++) {
1903  if (hostOutgoingEdges[i] > 0) {
1904  // get master of i
1905  masterLocation[threadStartLocation + handledNodes] =
1906  graphPartitioner->retrieveMaster(i + startNode);
1907  handledNodes++;
1908  }
1909  }
1910  });
1911 
1912 #ifndef NDEBUG
1913  for (uint32_t i : masterLocation) {
1914  assert(i != (uint32_t)-1);
1915  }
1916 #endif
1917 
1918  // serialize into buffer; since this is sent along with vector receiver end
1919  // will know how to deal with it
1920  galois::runtime::gSerialize(b, masterLocation);
1921  }
1922 
1923  void
1924  serializeIncomingMasterMap(galois::runtime::SendBuffer& b,
1925  const galois::DynamicBitSet& hostIncomingEdges) {
1926  size_t numOfNodes = hostIncomingEdges.count();
1927  std::vector<uint32_t> masterMap;
1928  masterMap.resize(numOfNodes, (uint32_t)-1);
1929 
1930  std::vector<uint32_t> bitsetOffsets = hostIncomingEdges.getOffsets();
1931 
1932  // size_t firstBound = base_DistGraph::gid2host[h].first;
1933  // size_t secondBound = base_DistGraph::gid2host[h].second;
1934 
1935  // galois::do_all(
1936  // galois::iterate((size_t)0, firstBound),
1937  // [&] (size_t offset) {
1938  // masterMap[offset] =
1939  // graphPartitioner->retrieveMaster(bitsetOffsets[offset]);
1940  // },
1941  // galois::no_stats()
1942  //);
1943 
1945  // galois::iterate((size_t)secondBound, numOfNodes),
1946  galois::iterate((size_t)0, numOfNodes),
1947  [&](size_t offset) {
1948  masterMap[offset] =
1949  graphPartitioner->retrieveMaster(bitsetOffsets[offset]);
1950  },
1951  galois::no_stats());
1952 
1953 #ifndef NDEBUG
1954  for (uint32_t i : masterMap) {
1955  assert(i != (uint32_t)-1);
1956  assert(i < base_DistGraph::numHosts);
1957  }
1958 #endif
1959 
1960  // serialize into buffer; since this is sent along with vector receiver end
1961  // will know how to deal with it
1962  galois::runtime::gSerialize(b, masterMap);
1963  }
1964 
1965  void deserializeOutgoingMasterMap(
1966  uint32_t senderHost, const std::vector<uint64_t>& hostOutgoingEdges,
1967  const std::vector<uint32_t>& recvMasterLocations) {
1968  uint64_t hostOffset = base_DistGraph::gid2host[senderHost].first;
1969  size_t hostSize = base_DistGraph::gid2host[senderHost].second -
1970  base_DistGraph::gid2host[senderHost].first;
1971  assert(hostSize == hostOutgoingEdges.size());
1972  galois::DynamicBitSet offsetsToConsider;
1973  offsetsToConsider.resize(hostSize);
1974  offsetsToConsider.reset();
1975 
1976  // step 1: figure out offsets that need to be handled (i.e. non-zero): only
1977  // handle if not already in map
1979  galois::iterate((size_t)0, hostOutgoingEdges.size()),
1980  [&](size_t offset) {
1981  if (hostOutgoingEdges[offset] > 0) {
1982  offsetsToConsider.set(offset);
1983  }
1984  },
1986  assert(offsetsToConsider.count() == recvMasterLocations.size());
1987 
1988  // step 2: using bitset that tells which offsets are set, add
1989  // to already master map in partitioner (this is single threaded
1990  // since map is not a concurrent data structure)
1991  size_t curCount = 0;
1992  // size_t actuallySet = 0;
1993  for (uint32_t offset : offsetsToConsider.getOffsets()) {
1994  // galois::gDebug("[", base_DistGraph::id, "] ", " setting ",
1995  // offset + hostOffset, " from host ", senderHost,
1996  // " to ", recvMasterLocations[curCount]);
1997  graphPartitioner->addMasterMapping(offset + hostOffset,
1998  recvMasterLocations[curCount]);
1999  // bool set = graphPartitioner->addMasterMapping(offset + hostOffset,
2000  // recvMasterLocations[curCount]);
2001  // if (set) { actuallySet++; }
2002  curCount++;
2003  }
2004 
2005  // galois::gDebug("[", base_DistGraph::id, "] host ", senderHost, ": set ",
2006  // actuallySet, " out of ", recvMasterLocations.size());
2007  }
2008 
2016  void deserializeIncomingMasterMap(
2017  const std::vector<uint32_t>& gids,
2018  const std::vector<uint32_t>& recvMasterLocations) {
2019  assert(gids.size() == recvMasterLocations.size());
2020  size_t curCount = 0;
2021  for (uint64_t gid : gids) {
2022  assert(gid < base_DistGraph::numGlobalNodes);
2023  // galois::gDebug("[", base_DistGraph::id, "] ", " in-setting ", gid, " to
2024  // ",
2025  // recvMasterLocations[curCount]);
2026  graphPartitioner->addMasterMapping(gid, recvMasterLocations[curCount]);
2027  curCount++;
2028  }
2029  }
2030 
2041  void sendInspectionData(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
2042  std::vector<galois::DynamicBitSet>& hasIncomingEdge,
2043  galois::DynamicBitSet& hostHasOutgoing) {
2045 
2047  bytesSent.reset();
2048 
2049  for (unsigned h = 0; h < net.Num; h++) {
2050  if (h == net.ID) {
2051  // i have no outgoing edges i will keep; go ahead and clear
2052  if (!hostHasOutgoing.test(h)) {
2053  numOutgoingEdges[h].clear();
2054  }
2055  continue;
2056  }
2057  // send outgoing edges data off to comm partner
2059 
2060  // only send if non-zeros exist
2061  if (hostHasOutgoing.test(h)) {
2062  galois::runtime::gSerialize(b, 1); // token saying data exists
2063  galois::runtime::gSerialize(b, numOutgoingEdges[h]);
2064  if (graphPartitioner->masterAssignPhase()) {
2065  serializeOutgoingMasterMap(b, numOutgoingEdges[h]);
2066  }
2067  } else {
2068  galois::runtime::gSerialize(b, 0); // token saying no data exists
2069  }
2070  numOutgoingEdges[h].clear();
2071 
2072  // determine form to send bitset in
2073  galois::DynamicBitSet& curBitset = hasIncomingEdge[h];
2074  uint64_t bitsetSize = curBitset.size(); // num bits
2075  uint64_t onlyOffsetsSize = curBitset.count() * 32;
2076  if (bitsetSize == 0) {
2077  // there was nothing there to send in first place
2078  galois::runtime::gSerialize(b, 0);
2079  } else if (onlyOffsetsSize <= bitsetSize) {
2080  // send only offsets
2081  std::vector<uint32_t> offsets = curBitset.getOffsets();
2082  galois::runtime::gSerialize(b, 2); // 2 = only offsets
2083  galois::runtime::gSerialize(b, offsets);
2084 
2085  if (graphPartitioner->masterAssignPhase()) {
2086  // galois::gDebug("incoming master map serialization");
2087  // serializeIncomingMasterMap(b, curBitset, h);
2088  serializeIncomingMasterMap(b, curBitset);
2089  }
2090  } else {
2091  // send entire bitset
2092  galois::runtime::gSerialize(b, 1);
2093  galois::runtime::gSerialize(b, curBitset);
2094  if (graphPartitioner->masterAssignPhase()) {
2095  // galois::gDebug("incoming master map serialization");
2096  // serializeIncomingMasterMap(b, curBitset, h);
2097  serializeIncomingMasterMap(b, curBitset);
2098  }
2099  }
2100  // get memory from bitset back
2101  curBitset.resize(0);
2102 
2103  bytesSent.update(b.size());
2104 
2105  // send buffer and free memory
2106  net.sendTagged(h, galois::runtime::evilPhase, b);
2107  b.getVec().clear();
2108  }
2109 
2111  GRNAME, std::string("EdgeInspectionBytesSent"), bytesSent.reduce());
2112 
2113  galois::gPrint("[", base_DistGraph::id, "] Inspection sends complete.\n");
2114  }
2115 
// Receives the inspection message from every other host (the loop runs
// net.Num - 1 times) and merges it into local state:
//  - numOutgoingEdges[sendingHost] receives the per-node outgoing edge counts
//    that host sent, or is cleared if it sent none;
//  - hasIncomingEdge has the incoming-edge information that host sent set in it
//    (bitwise OR for the bitset encoding, individual sets for the offset list).
// When graphPartitioner->masterAssignPhase() is true, each section is followed
// by a vector of master locations, which is forwarded to
// deserializeOutgoingMasterMap / deserializeIncomingMasterMap.
// NOTE(review): this listing omits source lines 2127 (presumably where `net` is
// obtained) and 2193 (the start of the final print statement); these comments
// describe only the visible code.
2125  void recvInspectionData(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
2126  galois::DynamicBitSet& hasIncomingEdge) {
2128 
2129  for (unsigned h = 0; h < net.Num - 1; h++) {
2130  // expect data from comm partner back
2131  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
// Busy-poll until a message tagged with the current evilPhase arrives.
2132  do {
2133  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
2134  } while (!p);
2135 
2136  uint32_t sendingHost = p->first;
2137 
2138  // get outgoing edges; first get status var
// Status word: 1 = outgoing edge counts follow, 0 = nothing was sent. It is
// initialized to an invalid value (2), and anything other than 0 or 1 is
// treated as a protocol error.
2139  uint32_t outgoingExists = 2;
2140  galois::runtime::gDeserialize(p->second, outgoingExists);
2141 
2142  if (outgoingExists == 1) {
2143  // actual data sent
2144  galois::runtime::gDeserialize(p->second, numOutgoingEdges[sendingHost]);
2145 
2146  if (graphPartitioner->masterAssignPhase()) {
2147  std::vector<uint32_t> recvMasterLocations;
2148  galois::runtime::gDeserialize(p->second, recvMasterLocations);
2149  deserializeOutgoingMasterMap(
2150  sendingHost, numOutgoingEdges[sendingHost], recvMasterLocations);
2151  }
2152  } else if (outgoingExists == 0) {
2153  // no data sent; just clear again
2154  numOutgoingEdges[sendingHost].clear();
2155  } else {
2156  GALOIS_DIE("invalid recv inspection data metadata mode, outgoing");
2157  }
2158 
// Incoming-edge encoding: 1 = serialized DynamicBitSet, 2 = vector of set
// offsets, 0 = nothing sent; any other value is a protocol error.
2159  uint32_t bitsetMetaMode = 3; // initialize to invalid mode
2160  galois::runtime::gDeserialize(p->second, bitsetMetaMode);
2161  if (bitsetMetaMode == 1) {
2162  // sent as bitset; deserialize then or with main bitset
2163  galois::DynamicBitSet recvSet;
2164  galois::runtime::gDeserialize(p->second, recvSet);
2165  hasIncomingEdge.bitwise_or(recvSet);
2166 
2167  if (graphPartitioner->masterAssignPhase()) {
2168  std::vector<uint32_t> recvMasterLocations;
2169  galois::runtime::gDeserialize(p->second, recvMasterLocations);
2170  deserializeIncomingMasterMap(recvSet.getOffsets(),
2171  recvMasterLocations);
2172  }
2173  } else if (bitsetMetaMode == 2) {
2174  // sent as vector of offsets
2175  std::vector<uint32_t> recvOffsets;
2176  galois::runtime::gDeserialize(p->second, recvOffsets);
2177  for (uint32_t offset : recvOffsets) {
2178  hasIncomingEdge.set(offset);
2179  }
2180 
2181  if (graphPartitioner->masterAssignPhase()) {
2182  std::vector<uint32_t> recvMasterLocations;
2183  galois::runtime::gDeserialize(p->second, recvMasterLocations);
2184  deserializeIncomingMasterMap(recvOffsets, recvMasterLocations);
2185  }
2186  } else if (bitsetMetaMode == 0) {
2187  // do nothing; there was nothing to receive
2188  } else {
2189  GALOIS_DIE("invalid recv inspection data metadata mode");
2190  }
2191  }
2192 
2194  "] Inspection receives complete.\n");
2195  }
2196 
// Builds the local node list and the per-node outgoing edge counts, which are
// later turned into a prefix sum. Nodes are added in this order:
//  1. masters (inspectMasterNodes);
//  2. nodes with outgoing edges reported by hosts (inspectOutgoingNodes);
//  3. nodes that only have incoming edges (inspectIncomingNodes).
// createIntermediateMetadata runs before the third step, and
// finalizeInspection runs after it. Returns prefixSumOfEdges.
// numNodes and nodesToReceive are reset to 0 first. The reserve calls use
// numGlobalNodes / numHosts * 1.15 as a size estimate; the integer division
// happens before the 1.15 scaling.
// NOTE(review): source lines 2201 (presumably the return type), 2206, 2212 and
// 2221 are not shown in this listing.
2202  nodeMapping(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
2203  galois::DynamicBitSet& hasIncomingEdge,
2204  galois::gstl::Vector<uint64_t>& prefixSumOfEdges) {
2205  base_DistGraph::numNodes = 0;
2207  nodesToReceive = 0;
2208 
2209  // reserve overestimation of nodes
2210  prefixSumOfEdges.reserve(base_DistGraph::numGlobalNodes /
2211  base_DistGraph::numHosts * 1.15);
2213  base_DistGraph::numGlobalNodes / base_DistGraph::numHosts * 1.15);
2214 
2215  inspectMasterNodes(numOutgoingEdges, prefixSumOfEdges);
2216  inspectOutgoingNodes(numOutgoingEdges, prefixSumOfEdges);
// hasIncomingEdge.count() is an upper bound on the nodes inspectIncomingNodes
// can add, since it only adds nodes whose bit is set.
2217  createIntermediateMetadata(prefixSumOfEdges, hasIncomingEdge.count());
2218  inspectIncomingNodes(hasIncomingEdge, prefixSumOfEdges);
2219  finalizeInspection(prefixSumOfEdges);
2220 
2222  "] To receive this many nodes: ", nodesToReceive);
2223 
2224  galois::gPrint("[", base_DistGraph::id, "] Inspection mapping complete.\n");
2225  return prefixSumOfEdges;
2226  }
2227 
// For each host h, appends to the local node list every node in h's read range
// [gid2host[h].first, gid2host[h].second) whose master is this host, according
// to graphPartitioner->retrieveMaster. Nodes are appended in global-ID order
// within each host's range.
// Each host is processed in two parallel passes:
//  - the first pass counts matching nodes per thread, and the counts are
//    turned into an inclusive prefix sum;
//  - the second pass has each thread write into its own disjoint slice,
//    starting at threadPrefixSums[tid - 1] (or 0 for thread 0).
// A master's outgoing edge count is moved from numOutgoingEdges[h] into
// prefixSumOfEdges and zeroed there, so inspectOutgoingNodes skips the node.
// Masters with edges reported by another host (h != myHID, count > 0) are
// counted toward nodesToReceive.
// NOTE(review): source lines 2236 (presumably the declaration of toReceive),
// 2276 and 2335 are not shown in this listing.
2232  void inspectMasterNodes(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
2233  galois::gstl::Vector<uint64_t>& prefixSumOfEdges) {
2234  uint32_t myHID = base_DistGraph::id;
2235 
2237  toReceive.reset();
2238 
2239  for (unsigned h = 0; h < base_DistGraph::numHosts; ++h) {
2240  uint32_t activeThreads = galois::getActiveThreads();
2241  std::vector<uint64_t> threadPrefixSums(activeThreads);
2242  uint64_t startNode = base_DistGraph::gid2host[h].first;
2243  uint64_t lastNode = base_DistGraph::gid2host[h].second;
2244  size_t hostSize = lastNode - startNode;
2245 
// numOutgoingEdges[h] is either empty (nothing received from h) or holds
// exactly one count per node in h's range.
2246  if (numOutgoingEdges[h].size() != 0) {
2247  assert(hostSize == numOutgoingEdges[h].size());
2248  }
2249 
2250  // for each thread, figure out how many items it will work with (only
2251  // owned nodes)
2252  galois::on_each([&](unsigned tid, unsigned nthreads) {
2253  size_t beginNode;
2254  size_t endNode;
2255  // loop over all nodes that host h has read
2256  std::tie(beginNode, endNode) =
2257  galois::block_range((size_t)0, hostSize, tid, nthreads);
2258  uint64_t count = 0;
2259  for (size_t i = beginNode; i < endNode; i++) {
2260  // galois::gDebug("[", base_DistGraph::id, "] ", i + startNode,
2261  // " mapped to ",
2262  // graphPartitioner->retrieveMaster(i+startNode));
2263  if (graphPartitioner->retrieveMaster(i + startNode) == myHID) {
2264  count++;
2265  }
2266  }
2267  threadPrefixSums[tid] = count;
2268  });
2269 
2270  // get prefix sums
2271  for (unsigned int i = 1; i < threadPrefixSums.size(); i++) {
2272  threadPrefixSums[i] += threadPrefixSums[i - 1];
2273  }
2274 
2275  assert(prefixSumOfEdges.size() == base_DistGraph::numNodes);
2277  base_DistGraph::numNodes);
2278 
// The last inclusive prefix-sum entry is the number of masters in h's range.
2279  uint32_t newMasterNodes = threadPrefixSums[activeThreads - 1];
2280  galois::gDebug("[", base_DistGraph::id, "] This many masters from host ",
2281  h, ": ", newMasterNodes);
2282  uint32_t startingNodeIndex = base_DistGraph::numNodes;
2283  // increase size of prefix sum + mapping vector
2284  prefixSumOfEdges.resize(base_DistGraph::numNodes + newMasterNodes);
2285  base_DistGraph::localToGlobalVector.resize(base_DistGraph::numNodes +
2286  newMasterNodes);
2287 
2288  if (newMasterNodes > 0) {
2289  // do actual work, second on_each
2290  galois::on_each([&](unsigned tid, unsigned nthreads) {
2291  size_t beginNode;
2292  size_t endNode;
2293  std::tie(beginNode, endNode) =
2294  galois::block_range((size_t)0, hostSize, tid, nthreads);
2295 
2296  // start location to start adding things into prefix sums/vectors
2297  uint32_t threadStartLocation = 0;
2298  if (tid != 0) {
2299  threadStartLocation = threadPrefixSums[tid - 1];
2300  }
2301 
2302  uint32_t handledNodes = 0;
2303  for (size_t i = beginNode; i < endNode; i++) {
// NOTE(review): startNode is uint64_t, but the global ID is stored in 32 bits
// here, while the counting pass used the 64-bit value (i + startNode). This
// assumes every global node ID fits in uint32_t.
2304  uint32_t globalID = startNode + i;
2305  // if this node is master, get outgoing edges + save mapping
2306  if (graphPartitioner->retrieveMaster(globalID) == myHID) {
2307  // check size
2308  if (numOutgoingEdges[h].size() > 0) {
2309  uint64_t myEdges = numOutgoingEdges[h][i];
2310  numOutgoingEdges[h][i] = 0; // set to 0; does not need to be
2311  // handled later
2312  prefixSumOfEdges[startingNodeIndex + threadStartLocation +
2313  handledNodes] = myEdges;
// Edges reported by another host: receiveEdges waits until nodesToReceive
// nodes' edges have been received.
2314  if (myEdges > 0 && h != myHID) {
2315  toReceive += 1;
2316  }
2317  } else {
2318  prefixSumOfEdges[startingNodeIndex + threadStartLocation +
2319  handledNodes] = 0;
2320  }
2321 
2322  base_DistGraph::localToGlobalVector[startingNodeIndex +
2323  threadStartLocation +
2324  handledNodes] = globalID;
2325  handledNodes++;
2326  }
2327  }
2328  });
2329  base_DistGraph::numNodes += newMasterNodes;
2330  }
2331  }
2332 
2333  nodesToReceive += toReceive.reduce();
2334  // masters have been handled
2336  }
2337 
// For each host h that sent outgoing-edge counts (numOutgoingEdges[h] is not
// empty), appends every node in h's read range whose remaining count is
// nonzero. Masters were already added by inspectMasterNodes, which zeroed their
// counts, so they are skipped here.
// Uses the same two-pass count/fill scheme as inspectMasterNodes. Nodes
// reported by other hosts are counted toward nodesToReceive.
// NOTE(review): source lines 2347 (presumably the declaration of toReceive),
// 2382 and 2433 are not shown in this listing.
2342  void
2343  inspectOutgoingNodes(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
2344  galois::gstl::Vector<uint64_t>& prefixSumOfEdges) {
2345  uint32_t myHID = base_DistGraph::id;
2346 
2348  toReceive.reset();
2349 
2350  for (unsigned h = 0; h < base_DistGraph::numHosts; ++h) {
2351  size_t hostSize = numOutgoingEdges[h].size();
2352  // if i got no outgoing info from this host, safely continue to next one
2353  if (hostSize == 0) {
2354  continue;
2355  }
2356 
2357  uint32_t activeThreads = galois::getActiveThreads();
2358  std::vector<uint64_t> threadPrefixSums(activeThreads);
2359 
2360  // for each thread, figure out how many items it will work with (only
2361  // owned nodes)
2362  galois::on_each([&](unsigned tid, unsigned nthreads) {
2363  size_t beginNode;
2364  size_t endNode;
2365  std::tie(beginNode, endNode) =
2366  galois::block_range((size_t)0, hostSize, tid, nthreads);
2367  uint64_t count = 0;
2368  for (size_t i = beginNode; i < endNode; i++) {
2369  if (numOutgoingEdges[h][i] > 0) {
2370  count++;
2371  }
2372  }
2373  threadPrefixSums[tid] = count;
2374  });
2375 
2376  // get prefix sums
2377  for (unsigned int i = 1; i < threadPrefixSums.size(); i++) {
2378  threadPrefixSums[i] += threadPrefixSums[i - 1];
2379  }
2380 
2381  assert(prefixSumOfEdges.size() == base_DistGraph::numNodes);
2383  base_DistGraph::numNodes);
2384 
2385  uint32_t newOutgoingNodes = threadPrefixSums[activeThreads - 1];
2386  // increase size of prefix sum + mapping vector
2387  prefixSumOfEdges.resize(base_DistGraph::numNodes + newOutgoingNodes);
2388  base_DistGraph::localToGlobalVector.resize(base_DistGraph::numNodes +
2389  newOutgoingNodes);
2390 
2391  uint64_t startNode = base_DistGraph::gid2host[h].first;
2392  uint32_t startingNodeIndex = base_DistGraph::numNodes;
2393 
2394  if (newOutgoingNodes > 0) {
2395  // do actual work, second on_each
2396  galois::on_each([&](unsigned tid, unsigned nthreads) {
2397  size_t beginNode;
2398  size_t endNode;
2399  std::tie(beginNode, endNode) =
2400  galois::block_range((size_t)0, hostSize, tid, nthreads);
2401 
2402  // start location to start adding things into prefix sums/vectors
2403  uint32_t threadStartLocation = 0;
2404  if (tid != 0) {
2405  threadStartLocation = threadPrefixSums[tid - 1];
2406  }
2407 
2408  uint32_t handledNodes = 0;
2409 
2410  for (size_t i = beginNode; i < endNode; i++) {
2411  uint64_t myEdges = numOutgoingEdges[h][i];
2412  if (myEdges > 0) {
2413  prefixSumOfEdges[startingNodeIndex + threadStartLocation +
2414  handledNodes] = myEdges;
2415  base_DistGraph::localToGlobalVector[startingNodeIndex +
2416  threadStartLocation +
2417  handledNodes] = startNode + i;
2418  handledNodes++;
2419 
// NOTE(review): `myEdges > 0` is already guaranteed by the enclosing if;
// only `h != myHID` affects the result here.
2420  if (myEdges > 0 && h != myHID) {
2421  toReceive += 1;
2422  }
2423  }
2424  }
2425  });
2426  base_DistGraph::numNodes += newOutgoingNodes;
2427  }
2428  // don't need anymore after this point; get memory back
// NOTE(review): std::vector::clear() leaves capacity unchanged, so this does
// not actually return the memory. shrink_to_fit() or swapping with an empty
// vector would release it.
2429  numOutgoingEdges[h].clear();
2430  }
2431 
2432  nodesToReceive += toReceive.reduce();
2434  }
2435 
// Turns the first numNodesWithEdges entries of prefixSumOfEdges into an
// inclusive prefix sum. It also records globalToLocalMap entries for local IDs
// 1 .. numNodesWithEdges - 1. Returns immediately if there are no local nodes.
// NOTE(review): source lines 2449 and 2451 are not shown in this listing. The
// visible fragments suggest a reserve sized with incomingEstimate and the
// mapping of local node 0 (the loop below starts at 1); confirm against the
// actual source.
2443  void
2444  createIntermediateMetadata(galois::gstl::Vector<uint64_t>& prefixSumOfEdges,
2445  const uint64_t incomingEstimate) {
2446  if (base_DistGraph::numNodes == 0) {
2447  return;
2448  }
2450  incomingEstimate);
2452  0;
2453  // global to local map construction using num nodes with edges
2454  for (unsigned i = 1; i < base_DistGraph::numNodesWithEdges; i++) {
2455  prefixSumOfEdges[i] += prefixSumOfEdges[i - 1];
2456  base_DistGraph::globalToLocalMap[base_DistGraph::localToGlobalVector[i]] =
2457  i;
2458  }
2459  }
2460 
// Appends every global node whose bit is set in hasIncomingEdge and that is
// not yet a local node. The second half of that filter is on source lines
// 2482/2522, which this listing omits; the in-code comment describes it.
// These nodes get 0 outgoing edges in prefixSumOfEdges.
// Uses the same two-pass count/fill scheme as inspectMasterNodes, over the
// whole global ID range [0, numGlobalNodes).
// NOTE(review): totalNumNodes narrows the 64-bit numGlobalNodes to 32 bits.
2465  void inspectIncomingNodes(galois::DynamicBitSet& hasIncomingEdge,
2466  galois::gstl::Vector<uint64_t>& prefixSumOfEdges) {
2467  uint32_t totalNumNodes = base_DistGraph::numGlobalNodes;
2468 
2469  uint32_t activeThreads = galois::getActiveThreads();
2470  std::vector<uint64_t> threadPrefixSums(activeThreads);
2471 
2472  galois::on_each([&](unsigned tid, unsigned nthreads) {
2473  size_t beginNode;
2474  size_t endNode;
2475  std::tie(beginNode, endNode) =
2476  galois::block_range(0u, totalNumNodes, tid, nthreads);
2477  uint64_t count = 0;
2478  for (size_t i = beginNode; i < endNode; i++) {
2479  // only count if doesn't exist in global/local map + is incoming
2480  // edge
2481  if (hasIncomingEdge.test(i) &&
2483  ++count;
2484  }
2485  threadPrefixSums[tid] = count;
2486  });
2487  // get prefix sums
2488  for (unsigned int i = 1; i < threadPrefixSums.size(); i++) {
2489  threadPrefixSums[i] += threadPrefixSums[i - 1];
2490  }
2491 
2492  assert(prefixSumOfEdges.size() == base_DistGraph::numNodes);
2493  assert(base_DistGraph::localToGlobalVector.size() ==
2494  base_DistGraph::numNodes);
2495 
2496  uint32_t newIncomingNodes = threadPrefixSums[activeThreads - 1];
2497  // increase size of prefix sum + mapping vector
2498  prefixSumOfEdges.resize(base_DistGraph::numNodes + newIncomingNodes);
2499  base_DistGraph::localToGlobalVector.resize(base_DistGraph::numNodes +
2500  newIncomingNodes);
2501 
2502  uint32_t startingNodeIndex = base_DistGraph::numNodes;
2503 
2504  if (newIncomingNodes > 0) {
2505  // do actual work, second on_each
2506  galois::on_each([&](unsigned tid, unsigned nthreads) {
2507  size_t beginNode;
2508  size_t endNode;
2509  std::tie(beginNode, endNode) =
2510  galois::block_range(0u, totalNumNodes, tid, nthreads);
2511 
2512  // start location to start adding things into prefix sums/vectors
2513  uint32_t threadStartLocation = 0;
2514  if (tid != 0) {
2515  threadStartLocation = threadPrefixSums[tid - 1];
2516  }
2517 
2518  uint32_t handledNodes = 0;
2519 
2520  for (size_t i = beginNode; i < endNode; i++) {
2521  if (hasIncomingEdge.test(i) &&
2523  prefixSumOfEdges[startingNodeIndex + threadStartLocation +
2524  handledNodes] = 0;
2525  base_DistGraph::localToGlobalVector[startingNodeIndex +
2526  threadStartLocation +
2527  handledNodes] = i;
2528  handledNodes++;
2529  }
2530  }
2531  });
2532  base_DistGraph::numNodes += newIncomingNodes;
2533  }
2534  }
2535 
// Completes the inclusive prefix sum and globalToLocalMap for local IDs
// [numNodesWithEdges, numNodes). It then sets numEdges to the last prefix-sum
// entry, which is the total number of local edges.
// NOTE(review): source line 2553 (the body of the empty-vector branch) is not
// shown in this listing.
// NOTE(review): if numNodesWithEdges were 0 while numNodes > 0, the first
// iteration would read prefixSumOfEdges[i - 1] with i == 0, and the unsigned
// index would wrap around. Confirm that callers rule this case out.
2539  void finalizeInspection(galois::gstl::Vector<uint64_t>& prefixSumOfEdges) {
2540  // reserve rest of memory needed
2541  base_DistGraph::globalToLocalMap.reserve(base_DistGraph::numNodes);
2542  for (unsigned i = base_DistGraph::numNodesWithEdges;
2543  i < base_DistGraph::numNodes; i++) {
2544  // finalize prefix sum
2545  prefixSumOfEdges[i] += prefixSumOfEdges[i - 1];
2546  // global to local map construction
2547  base_DistGraph::globalToLocalMap[base_DistGraph::localToGlobalVector[i]] =
2548  i;
2549  }
2550  if (prefixSumOfEdges.size() != 0) {
2551  base_DistGraph::numEdges = prefixSumOfEdges.back();
2552  } else {
2554  }
2555  }
2556 
2558 
// Records every non-owned local node (local IDs [numOwned, numNodes)) as a
// mirror. Its global ID is appended to mirrorNodes[h], where h is the node's
// master host according to graphPartitioner->retrieveMaster.
// NOTE(review): the reserve argument continues on source line 2565, which this
// listing omits. mirrorNodes is indexed by host, so reserving a node count on
// the outer vector only changes its capacity, not the capacity of the per-host
// inner vectors. Confirm this is intended.
2563  void fillMirrors() {
2564  base_DistGraph::mirrorNodes.reserve(base_DistGraph::numNodes -
2566  for (uint32_t i = base_DistGraph::numOwned; i < base_DistGraph::numNodes;
2567  i++) {
2568  uint32_t globalID = base_DistGraph::localToGlobalVector[i];
2569  base_DistGraph::mirrorNodes[graphPartitioner->retrieveMaster(globalID)]
2570  .push_back(globalID);
2571  }
2572  }
2573 
2575 
// Constructs the edges of the local graph in four steps:
//  1. host 0 reports on stderr whether the edge data is void;
//  2. sendEdges walks the edges this host read into bufGraph, constructing
//     the edges it owns and sending the rest to their owners;
//  3. bufGraph is freed, after its read byte count is saved for reporting;
//  4. a per-thread lambda calls receiveEdges until all expected nodes' edges
//     have arrived.
// Prints the elapsed time and throughput. Bytes divided by microseconds gives
// 10^6 bytes per second, which is what is reported as "MBPS".
// NOTE(review): source lines 2578 (presumably the bufGraph parameter), 2602
// and 2604 are not shown in this listing. If get_usec() returned 0, the MBPS
// expression would divide by a float zero and print inf or nan.
2576  template <typename GraphTy>
2577  void loadEdges(GraphTy& graph,
2579  if (base_DistGraph::id == 0) {
2580  if (std::is_void<typename GraphTy::edge_data_type>::value) {
2581  fprintf(stderr, "Loading void edge-data while creating edges.\n");
2582  } else {
2583  fprintf(stderr, "Loading edge-data while creating edges.\n");
2584  }
2585  }
2586 
2587  bufGraph.resetReadCounters();
2588 
2589  std::atomic<uint32_t> receivedNodes;
2590  receivedNodes.store(0);
2591 
2592  galois::StatTimer loadEdgeTimer("EdgeLoading", GRNAME);
2593  loadEdgeTimer.start();
2594 
2595  // sends data
2596  sendEdges(graph, bufGraph, receivedNodes);
2597  uint64_t bufBytesRead = bufGraph.getBytesRead();
2598  // get data from graph back (don't need it after sending things out)
2599  bufGraph.resetAndFree();
2600 
2601  // receives data
2603  [&](unsigned, unsigned) { receiveEdges(graph, receivedNodes); });
2605 
2606  loadEdgeTimer.stop();
2607 
2608  galois::gPrint("[", base_DistGraph::id, "] Edge loading time: ",
2609  loadEdgeTimer.get_usec() / 1000000.0f, " seconds to read ",
2610  bufBytesRead, " bytes (",
2611  bufBytesRead / (float)loadEdgeTimer.get_usec(), " MBPS)\n");
2612  }
2613 
2614  // Edge type is not void. (i.e. edge data exists)
// Distributes the edges this host read into bufGraph, for nodes
// [gid2host[id].first, gid2host[id].second), split into _edgeStateRounds
// rounds. graphPartitioner->getEdgeOwner(src, dst, degree) assigns each edge
// an owner host:
//  - edges owned by this host are constructed directly in `graph`;
//  - other edges are batched per owner and serialized as
//    (src, destination vector, edge-data vector) into per-thread send buffers.
//    A buffer is sent once its size exceeds edgePartitionSendBufSize.
// After each source node, the thread polls once for an incoming message and
// processes it, which overlaps receives with sends.
// With more than one round, hostLoads is incremented per edge and
// syncEdgeLoad() runs after each round.
// Finally, all non-empty per-thread buffers are flushed and send statistics
// are reported.
// NOTE(review): source lines 2619, 2626-2631, 2633, 2638, 2648, 2653, 2738 and
// 2763-2767 are not shown in this listing.
2615  template <typename GraphTy,
2616  typename std::enable_if<!std::is_void<
2617  typename GraphTy::edge_data_type>::value>::type* = nullptr>
2618  void sendEdges(GraphTy& graph,
2620  std::atomic<uint32_t>& receivedNodes) {
2621  using DstVecType = std::vector<std::vector<uint64_t>>;
2622  using DataVecType =
2623  std::vector<std::vector<typename GraphTy::edge_data_type>>;
2624  using SendBufferVecTy = std::vector<galois::runtime::SendBuffer>;
2625 
2627  base_DistGraph::numHosts);
2629  base_DistGraph::numHosts);
2631  base_DistGraph::numHosts);
2632 
2634  const unsigned& id = this->base_DistGraph::id;
2635  const unsigned& numHosts = this->base_DistGraph::numHosts;
2636 
2637  galois::GAccumulator<uint64_t> messagesSent;
2639  galois::GReduceMax<uint64_t> maxBytesSent;
2640  messagesSent.reset();
2641  bytesSent.reset();
2642  maxBytesSent.reset();
2643 
2644  for (unsigned syncRound = 0; syncRound < _edgeStateRounds; syncRound++) {
// NOTE(review): the void-edge overload uses uint64_t for beginNode/endNode,
// and gid2host stores uint64_t bounds.
2645  uint32_t beginNode;
2646  uint32_t endNode;
2647  std::tie(beginNode, endNode) = galois::block_range(
2649  base_DistGraph::gid2host[base_DistGraph::id].second, syncRound,
2650  _edgeStateRounds);
2651 
2652  // Go over assigned nodes and distribute edges.
2654  galois::iterate(beginNode, endNode),
2655  [&](uint64_t src) {
2656  uint32_t lsrc = 0;
2657  uint64_t curEdge = 0;
2658  if (this->isLocal(src)) {
2659  lsrc = this->G2L(src);
2660  curEdge =
2661  *graph.edge_begin(lsrc, galois::MethodFlag::UNPROTECTED);
2662  }
2663 
2664  auto ee = bufGraph.edgeBegin(src);
2665  auto ee_end = bufGraph.edgeEnd(src);
2666  uint64_t numEdgesL = std::distance(ee, ee_end);
2667  auto& gdst_vec = *gdst_vecs.getLocal();
2668  auto& gdata_vec = *gdata_vecs.getLocal();
2669 
// NOTE(review): this reserves numEdgesL entries for every host on each source
// node, which can over-allocate when there are many hosts. The void-edge
// overload has this reserve commented out.
2670  for (unsigned i = 0; i < numHosts; ++i) {
2671  gdst_vec[i].clear();
2672  gdata_vec[i].clear();
2673  gdst_vec[i].reserve(numEdgesL);
2674  // gdata_vec[i].reserve(numEdgesL);
2675  }
2676 
2677  for (; ee != ee_end; ++ee) {
// NOTE(review): the destination global ID is narrowed to 32 bits.
2678  uint32_t gdst = bufGraph.edgeDestination(*ee);
2679  auto gdata = bufGraph.edgeData(*ee);
2680 
2681  uint32_t hostBelongs =
2682  graphPartitioner->getEdgeOwner(src, gdst, numEdgesL);
2683  if (_edgeStateRounds > 1) {
2684  hostLoads[hostBelongs] += 1;
2685  }
2686 
2687  if (hostBelongs == id) {
2688  // edge belongs here, construct on self
2689  assert(this->isLocal(src));
2690  uint32_t ldst = this->G2L(gdst);
2691  graph.constructEdge(curEdge++, ldst, gdata);
2692  // TODO
2693  // if ldst is an outgoing mirror, this is vertex cut
2694  } else {
2695  // add to host vector to send out later
2696  gdst_vec[hostBelongs].push_back(gdst);
2697  gdata_vec[hostBelongs].push_back(gdata);
2698  }
2699  }
2700 
2701  // make sure all edges accounted for if local
2702  if (this->isLocal(src)) {
2703  assert(curEdge == (*graph.edge_end(lsrc)));
2704  }
2705 
2706  // send
2707  for (uint32_t h = 0; h < numHosts; ++h) {
2708  if (h == id)
2709  continue;
2710 
2711  if (gdst_vec[h].size() > 0) {
2712  auto& b = (*sendBuffers.getLocal())[h];
2713  galois::runtime::gSerialize(b, src);
2714  galois::runtime::gSerialize(b, gdst_vec[h]);
2715  galois::runtime::gSerialize(b, gdata_vec[h]);
2716 
2717  // send if over limit
2718  if (b.size() > edgePartitionSendBufSize) {
2719  messagesSent += 1;
2720  bytesSent.update(b.size());
2721  maxBytesSent.update(b.size());
2722 
2723  net.sendTagged(h, galois::runtime::evilPhase, b);
2724  b.getVec().clear();
2725  b.getVec().reserve(edgePartitionSendBufSize * 1.25);
2726  }
2727  }
2728  }
2729 
2730  // overlap receives
2731  auto buffer =
2732  net.recieveTagged(galois::runtime::evilPhase, nullptr);
2733  this->processReceivedEdgeBuffer(buffer, graph, receivedNodes);
2734  },
2735 #if MORE_DIST_STATS
2736  galois::loopname("EdgeLoadingLoop"),
2737 #endif
2739  syncEdgeLoad();
2740  // printEdgeLoad();
2741  }
2742 
2743  // flush buffers
2744  for (unsigned threadNum = 0; threadNum < sendBuffers.size(); ++threadNum) {
2745  auto& sbr = *sendBuffers.getRemote(threadNum);
2746  for (unsigned h = 0; h < this->base_DistGraph::numHosts; ++h) {
2747  if (h == this->base_DistGraph::id)
2748  continue;
2749  auto& sendBuffer = sbr[h];
2750  if (sendBuffer.size() > 0) {
2751  messagesSent += 1;
2752  bytesSent.update(sendBuffer.size());
2753  maxBytesSent.update(sendBuffer.size());
2754 
2755  net.sendTagged(h, galois::runtime::evilPhase, sendBuffer);
2756  sendBuffer.getVec().clear();
2757  }
2758  }
2759  }
2760 
2761  net.flush();
2762 
2764  GRNAME, std::string("EdgeLoadingMessagesSent"), messagesSent.reduce());
2766  GRNAME, std::string("EdgeLoadingBytesSent"), bytesSent.reduce());
2768  GRNAME, std::string("EdgeLoadingMaxBytesSent"), maxBytesSent.reduce());
2769  }
2770 
2771  // no edge data version
// Void-edge-data counterpart of the overload above. It distributes the edges
// this host read into bufGraph, for nodes [gid2host[id].first,
// gid2host[id].second), split into _edgeStateRounds rounds.
//  - Edges whose owner (graphPartitioner->getEdgeOwner) is this host are
//    constructed directly.
//  - Other edges are batched per owner and serialized as
//    (src, destination vector) into per-thread send buffers. A buffer is sent
//    once its size exceeds edgePartitionSendBufSize.
// Each source node is followed by one poll for an incoming message. With more
// than one round, hostLoads is updated per edge and syncEdgeLoad() runs after
// each round. Remaining buffers are flushed at the end and statistics are
// reported.
// NOTE(review): source lines 2776, 2781-2783, 2786, 2791, 2801, 2806, 2884 and
// 2909-2913 are not shown in this listing. The loop name here
// ("EdgeLoading") differs from the other overload's ("EdgeLoadingLoop").
2772  template <typename GraphTy,
2773  typename std::enable_if<std::is_void<
2774  typename GraphTy::edge_data_type>::value>::type* = nullptr>
2775  void sendEdges(GraphTy& graph,
2777  std::atomic<uint32_t>& receivedNodes) {
2778  using DstVecType = std::vector<std::vector<uint64_t>>;
2779  using SendBufferVecTy = std::vector<galois::runtime::SendBuffer>;
2780 
2782  base_DistGraph::numHosts);
2784  base_DistGraph::numHosts);
2785 
2787  const unsigned& id = this->base_DistGraph::id;
2788  const unsigned& numHosts = this->base_DistGraph::numHosts;
2789 
2790  galois::GAccumulator<uint64_t> messagesSent;
2792  galois::GReduceMax<uint64_t> maxBytesSent;
2793  messagesSent.reset();
2794  bytesSent.reset();
2795  maxBytesSent.reset();
2796 
2797  for (unsigned syncRound = 0; syncRound < _edgeStateRounds; syncRound++) {
2798  uint64_t beginNode;
2799  uint64_t endNode;
2800  std::tie(beginNode, endNode) = galois::block_range(
2802  base_DistGraph::gid2host[base_DistGraph::id].second, syncRound,
2803  _edgeStateRounds);
2804 
2805  // Go over assigned nodes and distribute edges.
2807  galois::iterate(beginNode, endNode),
2808  [&](uint64_t src) {
2809  uint32_t lsrc = 0;
2810  uint64_t curEdge = 0;
2811  if (this->isLocal(src)) {
2812  lsrc = this->G2L(src);
2813  curEdge =
2814  *graph.edge_begin(lsrc, galois::MethodFlag::UNPROTECTED);
2815  }
2816 
2817  auto ee = bufGraph.edgeBegin(src);
2818  auto ee_end = bufGraph.edgeEnd(src);
2819  uint64_t numEdgesL = std::distance(ee, ee_end);
2820  auto& gdst_vec = *gdst_vecs.getLocal();
2821 
2822  for (unsigned i = 0; i < numHosts; ++i) {
2823  gdst_vec[i].clear();
2824  // gdst_vec[i].reserve(numEdgesL);
2825  }
2826 
2827  for (; ee != ee_end; ++ee) {
// NOTE(review): the destination global ID is narrowed to 32 bits.
2828  uint32_t gdst = bufGraph.edgeDestination(*ee);
2829  uint32_t hostBelongs =
2830  graphPartitioner->getEdgeOwner(src, gdst, numEdgesL);
2831  if (_edgeStateRounds > 1) {
2832  hostLoads[hostBelongs] += 1;
2833  }
2834 
2835  if (hostBelongs == id) {
2836  // edge belongs here, construct on self
2837  assert(this->isLocal(src));
2838  uint32_t ldst = this->G2L(gdst);
2839  graph.constructEdge(curEdge++, ldst);
2840  // TODO
2841  // if ldst is an outgoing mirror, this is vertex cut
2842  } else {
2843  // add to host vector to send out later
2844  gdst_vec[hostBelongs].push_back(gdst);
2845  }
2846  }
2847 
2848  // make sure all edges accounted for if local
2849  if (this->isLocal(src)) {
2850  assert(curEdge == (*graph.edge_end(lsrc)));
2851  }
2852 
2853  // send
2854  for (uint32_t h = 0; h < numHosts; ++h) {
2855  if (h == id)
2856  continue;
2857 
2858  if (gdst_vec[h].size() > 0) {
2859  auto& b = (*sendBuffers.getLocal())[h];
2860  galois::runtime::gSerialize(b, src);
2861  galois::runtime::gSerialize(b, gdst_vec[h]);
2862 
2863  // send if over limit
2864  if (b.size() > edgePartitionSendBufSize) {
2865  messagesSent += 1;
2866  bytesSent.update(b.size());
2867  maxBytesSent.update(b.size());
2868 
2869  net.sendTagged(h, galois::runtime::evilPhase, b);
2870  b.getVec().clear();
2871  b.getVec().reserve(edgePartitionSendBufSize * 1.25);
2872  }
2873  }
2874  }
2875 
2876  // overlap receives
2877  auto buffer =
2878  net.recieveTagged(galois::runtime::evilPhase, nullptr);
2879  this->processReceivedEdgeBuffer(buffer, graph, receivedNodes);
2880  },
2881 #if MORE_DIST_STATS
2882  galois::loopname("EdgeLoading"),
2883 #endif
2885  syncEdgeLoad();
2886  // printEdgeLoad();
2887  }
2888 
2889  // flush buffers
2890  for (unsigned threadNum = 0; threadNum < sendBuffers.size(); ++threadNum) {
2891  auto& sbr = *sendBuffers.getRemote(threadNum);
2892  for (unsigned h = 0; h < this->base_DistGraph::numHosts; ++h) {
2893  if (h == this->base_DistGraph::id)
2894  continue;
2895  auto& sendBuffer = sbr[h];
2896  if (sendBuffer.size() > 0) {
2897  messagesSent += 1;
2898  bytesSent.update(sendBuffer.size());
2899  maxBytesSent.update(sendBuffer.size());
2900 
2901  net.sendTagged(h, galois::runtime::evilPhase, sendBuffer);
2902  sendBuffer.getVec().clear();
2903  }
2904  }
2905  }
2906 
2907  net.flush();
2908 
2910  GRNAME, std::string("EdgeLoadingMessagesSent"), messagesSent.reduce());
2912  GRNAME, std::string("EdgeLoadingBytesSent"), bytesSent.reduce());
2914  GRNAME, std::string("EdgeLoadingMaxBytesSent"), maxBytesSent.reduce());
2915  }
2916 
// If `buffer` holds a message, decodes records until the receive buffer is
// exhausted. Each record contains:
//  - a source node ID (which must be local);
//  - its destination vector;
//  - for non-void edge data, its edge-data vector (read by deserializeEdges).
// The source's edges are constructed over [edge_begin(lsrc), edge_end(lsrc)).
// receivedNodes is incremented once per record. An empty optional is a no-op.
// NOTE(review): source line 2927 (presumably deserializing `n`) is not shown
// in this listing. Unlike the rest of the file, isLocal is called without
// `this->` here. If isLocal were declared only in the dependent base class,
// unqualified lookup would not find it in assert-enabled builds; confirm it is
// declared in this class.
2918  template <typename GraphTy>
2919  void processReceivedEdgeBuffer(
2920  std::optional<std::pair<uint32_t, galois::runtime::RecvBuffer>>& buffer,
2921  GraphTy& graph, std::atomic<uint32_t>& receivedNodes) {
2922  if (buffer) {
2923  auto& rb = buffer->second;
2924  while (rb.r_size() > 0) {
2925  uint64_t n;
2926  std::vector<uint64_t> gdst_vec;
2928  galois::runtime::gDeserialize(rb, gdst_vec);
2929  assert(isLocal(n));
2930  uint32_t lsrc = this->G2L(n);
2931  uint64_t cur = *graph.edge_begin(lsrc, galois::MethodFlag::UNPROTECTED);
2932  uint64_t cur_end = *graph.edge_end(lsrc);
2933  assert((cur_end - cur) == gdst_vec.size());
2934  deserializeEdges(graph, rb, gdst_vec, cur, cur_end);
2935  ++receivedNodes;
2936  }
2937  }
2938  }
2939 
// Keeps polling for tagged messages and processing them with
// processReceivedEdgeBuffer until receivedNodes reaches nodesToReceive. A poll
// may return no message; that is passed through as a no-op.
// loadEdges calls this from a per-thread lambda. receivedNodes is a
// std::atomic, so concurrent increments are safe.
// NOTE(review): source line 2946 (presumably obtaining `net`) is not shown in
// this listing.
2944  template <typename GraphTy>
2945  void receiveEdges(GraphTy& graph, std::atomic<uint32_t>& receivedNodes) {
2947 
2948  // receive edges for all mirror nodes
2949  while (receivedNodes < nodesToReceive) {
2950  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
2951  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
2952  processReceivedEdgeBuffer(p, graph, receivedNodes);
2953  }
2954  }
2955 
// Non-void edge-data overload. Reads the edge-data vector that follows the
// destinations in `b`, then constructs edges cur .. cur_end - 1 in order. Each
// edge gets destination G2L(gdst_vec[i]) and data gdata_vec[i]. `cur` is
// advanced (by reference) to `cur_end`.
// No bounds checks are done here. This assumes gdst_vec and the decoded
// gdata_vec each hold at least cur_end - cur entries; the caller asserts this
// for gdst_vec.
2956  template <typename GraphTy,
2957  typename std::enable_if<!std::is_void<
2958  typename GraphTy::edge_data_type>::value>::type* = nullptr>
2959  void deserializeEdges(GraphTy& graph, galois::runtime::RecvBuffer& b,
2960  std::vector<uint64_t>& gdst_vec, uint64_t& cur,
2961  uint64_t& cur_end) {
2962  std::vector<typename GraphTy::edge_data_type> gdata_vec;
2963  galois::runtime::gDeserialize(b, gdata_vec);
2964  uint64_t i = 0;
2965  while (cur < cur_end) {
2966  auto gdata = gdata_vec[i];
2967  uint64_t gdst = gdst_vec[i++];
2968  uint32_t ldst = this->G2L(gdst);
2969  graph.constructEdge(cur++, ldst, gdata);
2970  // TODO if ldst is an outgoing mirror, this is vertex cut
2971  }
2972  }
2973 
// Void edge-data overload. The buffer carries no edge data, so it is not read.
// Constructs edges cur .. cur_end - 1 in order, with destination
// G2L(gdst_vec[i]), and advances `cur` (by reference) to `cur_end`.
// No bounds checks are done here. This assumes gdst_vec holds at least
// cur_end - cur entries, as the caller asserts.
2974  template <typename GraphTy,
2975  typename std::enable_if<std::is_void<
2976  typename GraphTy::edge_data_type>::value>::type* = nullptr>
2977  void deserializeEdges(GraphTy& graph, galois::runtime::RecvBuffer&,
2978  std::vector<uint64_t>& gdst_vec, uint64_t& cur,
2979  uint64_t& cur_end) {
2980  uint64_t i = 0;
2981  while (cur < cur_end) {
2982  uint64_t gdst = gdst_vec[i++];
2983  uint32_t ldst = this->G2L(gdst);
2984  graph.constructEdge(cur++, ldst);
2985  // TODO if ldst is an outgoing mirror, this is vertex cut
2986  }
2987  }
2988 };
2989 
2990 // make GRNAME visible to public
// This is the out-of-class definition of the static constexpr member GRNAME;
// the declarator continues on source line 2993, which this listing omits.
// Since C++17, static constexpr data members are implicitly inline, so this
// definition is redundant (and deprecated) but harmless. It is needed for
// ODR-use of GRNAME in pre-C++17 builds.
2991 template <typename NodeTy, typename EdgeTy, typename Partitioner>
2992 constexpr const char* const
2995 } // end namespace graphs
2996 } // end namespace galois
2997 #endif
Definition: Traits.h:247
EdgeIterator edgeEnd(uint64_t globalNodeID)
Get the index to the first edge of the node after the provided node.
Definition: BufferedGraph.h:414
Distributed sum-reducer for getting the sum of some value across multiple hosts.
Definition: DReducible.h:45
void printEdgeLoad()
Debug function: prints host loads.
Definition: NewGeneric.h:138
uint64_t numGlobalEdges
Total edges in the global unpartitioned graph.
Definition: DistributedGraph.h:80
void gInfo(Args &&...args)
Prints an info string from a sequence of things.
Definition: gIO.h:55
void increment_evilPhase()
Increments evilPhase, a phase counter used by communication.
Definition: DistributedGraph.h:133
unsigned int getActiveThreads() noexcept
Returns the number of threads in use.
Definition: Threads.cpp:37
std::pair< IterTy, IterTy > block_range(IterTy b, IterTy e, unsigned id, unsigned num)
Returns a contiguous block from the range based on the number of divisions and the id of the block re...
Definition: gstl.h:244
void reset()
Definition: Reduction.h:113
Class that loads a portion of a Galois graph from disk directly into memory buffers for access...
Definition: BufferedGraph.h:49
uint64_t numGlobalNodes
Total nodes in the global unpartitioned graph.
Definition: DistributedGraph.h:79
void bitwise_or(const DynamicBitSet &other)
Definition: libgalois/include/galois/DynamicBitset.h:241
const uint32_t numHosts
Total number of machines.
Definition: DistributedGraph.h:85
EdgeIterator edgeBegin(uint64_t globalNodeID)
Get the index to the first edge of the provided node THAT THIS GRAPH HAS LOADED (not necessarily the fi...
Definition: BufferedGraph.h:386
void reportStat_Tmax(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:556
EdgeDataType edgeData(uint64_t globalEdgeID)
Get the edge data of some edge.
Definition: BufferedGraph.h:462
std::vector< uint32_t > getOffsets() const
Returns a vector containing the set bits in this bitset in order from left to right.
Definition: libgalois/include/galois/DynamicBitset.h:347
void resize(size_t n)
Definition: PODResizeableArray.h:142
Concurrent dynamically allocated bitset.
Definition: libgalois/include/galois/DynamicBitset.h:47
void reportParam(const S1 &region, const S2 &category, const V &value)
Definition: Statistics.h:616
Buffer for serialization of data.
Definition: Serialize.h:56
void resetEdgeLoad()
Reset load balance on host reducibles.
Definition: NewGeneric.h:110
void gDebug(Args &&...GALOIS_USED_ONLY_IN_DEBUG(args))
Prints a debug string from a sequence of things; prints nothing if NDEBUG is defined.
Definition: gIO.h:72
virtual std::pair< unsigned, unsigned > cartesianGrid() const
Returns the Cartesian split if it exists; otherwise returns a pair of 0s.
Definition: NewGeneric.h:103
size_t size() const
Gets number of nodes on this (local) graph.
Definition: DistributedGraph.h:646
void gDeserialize(DeSerializeBuffer &buf, T1 &&t1, Args &&...args)
Deserialize data in a buffer into a series of objects.
Definition: Serialize.h:1032
uint32_t size() const
Gets the number of global nodes in the graph.
Definition: BufferedGraph.h:294
Ty reset()
Reset the entire accumulator.
Definition: DReducible.h:153
SpecificRange< IterTy > makeSpecificRange(IterTy begin, IterTy end, const uint32_t *thread_ranges)
Creates a SpecificRange object.
Definition: Range.h:219
void transpose(const char *regionName=NULL)
Perform an in-memory transpose of the graph, replacing the original CSR with CSC.
Definition: LC_CSR_Graph.h:615
const char * loopname
Definition: Executor_ParaMeter.h:145
#define GALOIS_DIE(...)
Definition: gIO.h:96
void read_local_graph_from_file(std::string)
Read the local LC_CSR graph from the file on a disk.
Definition: DistributedGraph.h:834
uint64_t numEdges
Num edges in this graph in total.
Definition: DistributedGraph.h:82
void resetReadCounters()
Reset reading counters.
Definition: BufferedGraph.h:496
void reportStat_Tsum(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:562
vTy & getVec()
Returns vector of data stored in this serialize buffer.
Definition: Serialize.h:115
bool test(size_t index) const
Check a bit to see if it is currently set.
Definition: libgalois/include/galois/DynamicBitset.h:192
Class that inherits from std::atomic to make it copyable by defining a copy constructor.
Definition: AtomicWrapper.h:40
uint32_t G2L(uint64_t gid) const
Definition: DistributedGraph.h:482
void determineThreadRanges()
Uses a pre-computed prefix sum to determine division of nodes among threads.
Definition: DistributedGraph.h:725
void reserve(size_t n)
Definition: PODResizeableArray.h:129
edge_iterator edge_begin(GraphNode N)
Definition: OfflineGraph.h:278
Accumulator for T where accumulation is max.
Definition: Reduction.h:189
void loadPartialGraph(const std::string &filename, uint64_t nodeStart, uint64_t nodeEnd, uint64_t edgeStart, uint64_t edgeEnd, uint64_t numGlobalNodes, uint64_t numGlobalEdges)
Given a node/edge range to load, loads the specified portion of the graph into memory buffers using r...
Definition: BufferedGraph.h:348
uint64_t count() const
Count how many bits are set in the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:320
std::vector< std::vector< size_t > > mirrorNodes
Mirror nodes from different hosts. For reduce.
Definition: DistributedGraph.h:100
void resize(uint64_t n)
Resizes the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:78
std::vector< std::pair< uint64_t, uint64_t > > gid2host
Information that converts host to range of nodes that host reads.
Definition: DistributedGraph.h:98
std::vector< T, Pow2Alloc< T >> Vector
[STL vector using Pow_2_VarSizeAlloc]
Definition: gstl.h:52
~NewDistGraphGeneric()
Free the graph partitioner.
Definition: NewGeneric.h:368
virtual unsigned getHostID(uint64_t gid) const
Determines which host has the master for a particular node.
Definition: NewGeneric.h:82
void reportStat_Single(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:544
Definition: PerThreadStorage.h:88
void reset()
Gets the space taken by the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:110
virtual bool isOwned(uint64_t gid) const
Determine if a node has a master on this host.
Definition: NewGeneric.h:87
Definition: OfflineGraph.h:63
size_type size() const
Returns the size of the serialize buffer.
Definition: Serialize.h:125
void constructNodes()
Definition: LC_CSR_Graph.h:570
MASTERS_DISTRIBUTION
Enums specifying how masters are to be distributed among hosts.
Definition: DistributedGraph.h:48
bool transposed
Marks if the graph is transposed or not.
Definition: DistributedGraph.h:76
void determineThreadRangesMaster()
Determines the thread ranges for master nodes only and saves them to the object.
Definition: DistributedGraph.h:736
void gPrint(Args &&...args)
Prints a sequence of things.
Definition: gIO.h:47
edge_iterator edge_end(GraphNode N)
Gets the end edge boundary of some node.
Definition: DistributedGraph.h:625
unsigned evilPhasePlus1()
Returns evilPhase + 1, handling loop around as necessary.
Definition: DistributedGraph.h:144
unsigned int activeThreads
Definition: Threads.cpp:26
Definition: Traits.h:206
boost::counting_iterator< uint64_t > edge_iterator
Definition: OfflineGraph.h:194
uint64_t getNodeOffset() const
Definition: BufferedGraph.h:304
NetworkInterface & getSystemNetworkInterface()
Get the network interface.
Definition: Network.cpp:131
size_t size() const
Gets the size of the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:99
void syncEdgeLoad()
Sync load balance on hosts using reducibles.
Definition: NewGeneric.h:124
size_type size() const
Definition: PODResizeableArray.h:125
SpecificRange is a range type where a thread's range is specified by an int array that tells you wh...
Definition: Range.h:122
uint32_t numNodes
Num nodes in this graph in total.
Definition: DistributedGraph.h:81
void clear()
Definition: PODResizeableArray.h:147
Contains the implementation for DistGraph.
uint32_t numOwned
Number of nodes owned (masters) by this host.
Definition: DistributedGraph.h:89
void do_all(const RangeFunc &rangeMaker, FunctionTy &&fn, const Args &...args)
Standard do-all loop.
Definition: Loops.h:71
Definition: NewGeneric.h:47
bool set(size_t index)
Set a bit in the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:206
uint64_t getBytesRead()
Returns the total number of bytes read from this graph so far.
Definition: BufferedGraph.h:508
uint32_t numNodesWithEdges
Number of nodes (masters + mirrors) that have outgoing edges.
Definition: DistributedGraph.h:94
void start()
Definition: Timer.cpp:82
void on_each(FunctionTy &&fn, const Args &...args)
Low-level parallel loop.
Definition: Loops.h:86
virtual bool is_vertex_cut() const
Returns true if current partition is a vertex cut.
Definition: NewGeneric.h:102
uint64_t computeMasters(MASTERS_DISTRIBUTION masters_distribution, galois::graphs::OfflineGraph &g, const std::vector< unsigned > &scalefactor, uint32_t nodeWeight=0, uint32_t edgeWeight=0, unsigned DecomposeFactor=1)
Wrapper call that will call into more specific compute masters functions that compute masters based o...
Definition: DistributedGraph.h:364
void initializeSpecificRanges()
Initializes the 3 range objects that a user can access to iterate over the graph in different ways...
Definition: DistributedGraph.h:785
uint64_t get_usec() const
Definition: Timer.cpp:92
NewDistGraphGeneric(const std::string &filename, unsigned host, unsigned _numHosts, bool cuspAsync=true, uint32_t stateRounds=100, bool transpose=false, galois::graphs::MASTERS_DISTRIBUTION md=BALANCED_EDGES_OF_MASTERS, uint32_t nodeWeight=0, uint32_t edgeWeight=0, std::string masterBlockFile="", bool readFromFile=false, std::string localGraphFileName="local_graph", uint32_t edgeStateRounds=1)
Constructor.
Definition: NewGeneric.h:154
T & reduce()
Returns the final reduction value.
Definition: Reduction.h:102
size_t size() const
Definition: OfflineGraph.h:271
void readersFromFile(galois::graphs::OfflineGraph &g, std::string filename)
Reader assignment from a file corresponds to master assignment if using an edge cut ...
Definition: DistributedGraph.h:404
std::unordered_map< uint64_t, uint32_t > globalToLocalMap
LID = globalToLocalMap[GID].
Definition: DistributedGraph.h:105
uint32_t evilPhase
Variable that keeps track of which network send/recv phase a program is currently on...
Definition: Network.cpp:36
Base DistGraph class that all distributed graphs extend from.
Definition: DistributedGraph.h:64
Buffer for deserialization of data.
Definition: Serialize.h:147
auto iterate(C &cont)
Definition: Range.h:323
This is a container that encapsulates a resizeable array of plain-old-datatype (POD) elements...
Definition: PODResizeableArray.h:40
void update(T &&rhs)
Updates the thread local value by applying the reduction operator to current and newly provided value...
Definition: Reduction.h:90
void resetAndFree()
Free all of the in memory buffers in this object and reset graph status.
Definition: BufferedGraph.h:516
std::vector< uint64_t > localToGlobalVector
GID = localToGlobalVector[LID].
Definition: DistributedGraph.h:103
const unsigned id
ID of the machine.
Definition: DistributedGraph.h:84
void allocateFrom(const FileGraph &graph)
Definition: LC_CSR_Graph.h:513
size_t sizeEdges() const
Definition: OfflineGraph.h:272
Definition: Timer.h:88
uint64_t edgeDestination(uint64_t globalEdgeID)
Get the global node id of the destination of the provided edge.
Definition: BufferedGraph.h:437
EdgeTy edge_data_type
Definition: LC_CSR_Graph.h:147
std::vector< uint32_t > determineUnitRangesFromPrefixSum(uint32_t unitsToSplit, VectorTy &edgePrefixSum, uint32_t nodeAlpha=0)
Uses the divideByNode function (which is binary search based) to divide nodes among units using a pro...
Definition: GraphHelpers.h:484
virtual bool isLocal(uint64_t gid) const
Determine if a node has a proxy on this host.
Definition: NewGeneric.h:92
void stop()
Definition: Timer.cpp:87
Implements distributed reducible objects for easy reduction of values across a distributed system...
Galois Timer that automatically reports stats upon destruction. Provides statistic interface around ti...
Definition: Timer.h:63
GraphTy graph
The internal graph used by DistGraph to represent the graph.
Definition: DistributedGraph.h:73
void determineThreadRangesWithEdges()
Determines the thread ranges for nodes with edges only and saves them to the object.
Definition: DistributedGraph.h:762
uint32_t beginMaster
Local id of the beginning of master nodes.
Definition: DistributedGraph.h:91
Balance edges
Definition: DistributedGraph.h:52