Galois
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
MiningPartitioner.h
Go to the documentation of this file.
1 /*
2  * This file belongs to the Galois project, a C++ library for exploiting
3  * parallelism. The code is being released under the terms of the 3-Clause BSD
4  * License (a copy is located in LICENSE.txt at the top-level directory).
5  *
6  * Copyright (C) 2019, The University of Texas at Austin. All rights reserved.
7  * UNIVERSITY EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES CONCERNING THIS
8  * SOFTWARE AND DOCUMENTATION, INCLUDING ANY WARRANTIES OF MERCHANTABILITY,
9  * FITNESS FOR ANY PARTICULAR PURPOSE, NON-INFRINGEMENT AND WARRANTIES OF
10  * PERFORMANCE, AND ANY WARRANTY THAT MIGHT OTHERWISE ARISE FROM COURSE OF
11  * DEALING OR USAGE OF TRADE. NO WARRANTY IS EITHER EXPRESS OR IMPLIED WITH
12  * RESPECT TO THE USE OF THE SOFTWARE OR DOCUMENTATION. Under no circumstances
13  * shall University be liable for incidental, special, indirect, direct or
14  * consequential damages or loss of profits, interruption of business, or
15  * related expenses which may arise from use of Software or Documentation,
16  * including but not limited to those resulting from defects in Software and/or
17  * Documentation, or loss or inaccuracy of data of any kind.
18  */
19 
30 #ifndef _GALOIS_DIST_MINING_H
31 #define _GALOIS_DIST_MINING_H
32 
34 #include "galois/DReducible.h"
35 
36 namespace galois {
37 namespace graphs {
45 template <typename NodeTy, typename EdgeTy, typename Partitioner>
46 class MiningGraph : public DistGraph<NodeTy, EdgeTy> {
// Size threshold for a per-host edge send buffer: sendEdges() ships a buffer
// as soon as its size exceeds this value (8388608 = 8 * 1024 * 1024).
48  constexpr static unsigned edgePartitionSendBufSize = 8388608;
// Region name handed to the timers, statistics and parameter reports created
// by this class.
49  constexpr static const char* const GRNAME = "dGraph_Mining";
// Partitioning policy object. It is allocated with new in the constructor and
// deleted in the destructor; the class queries it through retrieveMaster()
// and keepEdge().
50  Partitioner* graphPartitioner;
51 
52  uint32_t G2LEdgeCut(uint64_t gid, uint32_t globalOffset) const {
53  assert(isLocal(gid));
54  // optimized for edge cuts
55  if (gid >= globalOffset && gid < globalOffset + base_DistGraph::numOwned)
56  return gid - globalOffset;
57 
58  return base_DistGraph::globalToLocalMap.at(gid);
59  }
60 
/**
 * Empties a container and gives up its storage by swapping it with a
 * default-constructed temporary, which is destroyed at the end of the
 * statement.
 *
 * @tparam V container type; must be default constructible and provide swap()
 * @param vectorToKill container to empty
 */
template <typename V>
void freeVector(V& vectorToKill) {
  V().swap(vectorToKill);
}
69 
// Number of proxy nodes for which another host reported a non-zero edge
// count; computed in inspectOutgoingNodes() and used by receiveEdges() as the
// target value for its received-node counter.
70  uint32_t nodesToReceive;
71 
// Local value of the kept-edge accumulator (keptEdges.read_local()) from
// edgeInspectionRound1(); returned by numOwnedEdges().
72  uint64_t myKeptEdges;
// Result of allEdges.reduce() in edgeInspectionRound1(), where allEdges sums
// the edge counts of the nodes this host inspected.
73  uint64_t myReadEdges;
// Result of keptEdges.reduce() in edgeInspectionRound1(); returned by
// globalEdges().
74  uint64_t globalKeptEdges;
// Reduced sum of sizeEdges(), computed at the end of the constructor and
// reported as "TotalEdgeProxies".
75  uint64_t totalEdgeProxies;
76 
// One vector per host (resized to numHosts in the constructor when setupGluon
// is set); fillMirrorsEdgesAndCreateMirrorMap() appends edge GIDs to the
// entry of the host that retrieveMaster() returns for the edge's source.
77  std::vector<std::vector<size_t>> mirrorEdges;
// Edge GID (as produced by getEdgeGIDFromSD) -> running local edge index;
// queried by getLIDFromMap().
78  std::unordered_map<uint64_t, uint64_t> localEdgeGIDToLID;
79 
80  std::vector<uint64_t> getNodeDegrees(const std::string filename,
81  uint32_t numNodes) {
82  std::vector<uint64_t> nodeDegrees;
83  nodeDegrees.resize(numNodes);
84 
85  // read in prefix sum from GR on disk
86  std::ifstream graphFile(filename.c_str());
87  graphFile.seekg(sizeof(uint64_t) * 4);
88 
89  uint64_t* outIndexBuffer = (uint64_t*)malloc(sizeof(uint64_t) * numNodes);
90  if (outIndexBuffer == nullptr) {
91  GALOIS_DIE("out of memory");
92  }
93  uint64_t numBytesToLoad = numNodes * sizeof(uint64_t);
94  uint64_t bytesRead = 0;
95 
96  while (numBytesToLoad > 0) {
97  graphFile.read(((char*)outIndexBuffer) + bytesRead, numBytesToLoad);
98  size_t numRead = graphFile.gcount();
99  numBytesToLoad -= numRead;
100  bytesRead += numRead;
101  }
102  assert(numBytesToLoad == 0);
103 
105  galois::iterate(0u, numNodes),
106  [&](unsigned n) {
107  if (n != 0) {
108  nodeDegrees[n] = outIndexBuffer[n] - outIndexBuffer[n - 1];
109  } else {
110  nodeDegrees[n] = outIndexBuffer[0];
111  }
112  // galois::gDebug(n, " degree ", nodeDegrees[n]);
113  },
114  galois::loopname("GetNodeDegrees"), galois::no_stats());
115  free(outIndexBuffer);
116 
117 #ifndef NDEBUG
118  if (base_DistGraph::id == 0) {
119  galois::gDebug("Sanity checking node degrees");
120  }
121 
124  galois::iterate(0u, numNodes),
125  [&](unsigned n) { edgeCount += nodeDegrees[n]; },
126  galois::loopname("SanityCheckDegrees"), galois::no_stats());
128 #endif
129 
130  return nodeDegrees;
131  }
132 
133 public:
136 
140  uint64_t numOwnedEdges() const { return myKeptEdges; }
141 
145  uint64_t globalEdges() const { return globalKeptEdges; }
146 
147  std::vector<std::vector<size_t>>& getMirrorEdges() { return mirrorEdges; }
148 
154  unsigned getHostReader(uint64_t gid) const {
155  for (auto i = 0U; i < base_DistGraph::numHosts; ++i) {
156  uint64_t start, end;
157  std::tie(start, end) = base_DistGraph::gid2host[i];
158  if (gid >= start && gid < end) {
159  return i;
160  }
161  }
162  return -1;
163  }
164 
165  virtual unsigned getHostID(uint64_t gid) const {
166  assert(gid < base_DistGraph::numGlobalNodes);
167  return graphPartitioner->retrieveMaster(gid);
168  }
169 
170  virtual bool isOwned(uint64_t gid) const {
171  assert(gid < base_DistGraph::numGlobalNodes);
172  return (graphPartitioner->retrieveMaster(gid) == base_DistGraph::id);
173  }
174 
175  virtual bool isLocal(uint64_t gid) const {
176  assert(gid < base_DistGraph::numGlobalNodes);
177  return (base_DistGraph::globalToLocalMap.find(gid) !=
179  }
180 
185  const std::string& filename, unsigned host, unsigned _numHosts,
186  bool setupGluon = true, bool doSort = false,
188  uint32_t nodeWeight = 0, uint32_t edgeWeight = 0)
189  : base_DistGraph(host, _numHosts) {
190  galois::runtime::reportParam(GRNAME, "MiningGraph", "0");
192  "GraphPartitioningTime", GRNAME);
193  Tgraph_construct.start();
194 
196 
197  galois::graphs::OfflineGraph g(filename);
200  std::vector<unsigned> dummy;
201 
202  // not actually getting masters, but getting assigned readers for nodes
203  base_DistGraph::computeMasters(md, g, dummy, nodeWeight, edgeWeight);
204 
205  std::vector<uint64_t> ndegrees;
206 
207  if (Partitioner::needNodeDegrees()) {
208  if (base_DistGraph::id == 0) {
209  galois::gInfo("Calculating node degrees for partitioner");
210  }
211 
212  galois::runtime::reportParam(GRNAME, "UsingDegreeOrdering", "1");
213  ndegrees = getNodeDegrees(filename, base_DistGraph::numGlobalNodes);
214  }
215 
216  graphPartitioner =
217  new Partitioner(host, _numHosts, base_DistGraph::numGlobalNodes,
219  graphPartitioner->saveGIDToHost(base_DistGraph::gid2host);
220 
222 
223  uint64_t nodeBegin = base_DistGraph::gid2host[base_DistGraph::id].first;
225  g.edge_begin(nodeBegin);
226  uint64_t nodeEnd = base_DistGraph::gid2host[base_DistGraph::id].second;
228  g.edge_begin(nodeEnd);
229 
230  galois::gPrint("[", base_DistGraph::id, "] Starting graph reading.\n");
231  // never read edge data from disk
233  bufGraph.resetReadCounters();
234  galois::StatTimer graphReadTimer("GraphReading", GRNAME);
235  graphReadTimer.start();
236  bufGraph.loadPartialGraph(filename, nodeBegin, nodeEnd, *edgeBegin,
239  graphReadTimer.stop();
240  galois::gPrint("[", base_DistGraph::id, "] Reading graph complete.\n");
241 
243 
244  galois::StatTimer inspectionTimer("EdgeInspection", GRNAME);
245  inspectionTimer.start();
246  bufGraph.resetReadCounters();
247  galois::gstl::Vector<uint64_t> prefixSumOfEdges;
248  base_DistGraph::numOwned = nodeEnd - nodeBegin;
249  prefixSumOfEdges.resize(base_DistGraph::numOwned);
250 
251  // initial pass; set up lid-gid mappings, determine which proxies exist on
252  // this host; prefix sum of edges can be set up as far as the last master
253  // node
254  galois::DynamicBitSet presentProxies =
255  edgeInspectionRound1(bufGraph, prefixSumOfEdges);
256  // set my read nodes on present proxies
257  // TODO parallel?
258  for (uint64_t i = nodeBegin; i < nodeEnd; i++) {
259  presentProxies.set(i);
260  }
261 
262  // vector to store bitsets received from other hosts
263  std::vector<galois::DynamicBitSet> proxiesOnOtherHosts;
264  proxiesOnOtherHosts.resize(_numHosts);
265 
266  // send off mirror proxies that exist on this host to other hosts
267  communicateProxyInfo(presentProxies, proxiesOnOtherHosts);
268 
269  // signifies how many outgoing edges a particular host should expect from
270  // this host
271  std::vector<std::vector<uint64_t>> numOutgoingEdges;
272  numOutgoingEdges.resize(base_DistGraph::numHosts);
273  // edge inspection phase 2: determine how many edges to send to each host
274  // don't actually send yet
275  edgeInspectionRound2(bufGraph, numOutgoingEdges, proxiesOnOtherHosts);
276 
277  // prefix sum finalization
278  finalizePrefixSum(numOutgoingEdges, prefixSumOfEdges);
279 
280  // doubly make sure the data is cleared
281  freeVector(numOutgoingEdges); // should no longer use this variable
282  inspectionTimer.stop();
283 
285 
286  galois::StatTimer allocationTimer("GraphAllocation", GRNAME);
287  allocationTimer.start();
288 
289  // Graph construction related calls
291  // Allocate and construct the graph
295 
296  // edge end fixing
297  auto& base_graph = base_DistGraph::graph;
300  [&](uint64_t n) { base_graph.fixEndEdge(n, prefixSumOfEdges[n]); },
301 #if MORE_DIST_STATS
302  galois::loopname("FixEndEdgeLoop"),
303 #endif
304  galois::no_stats());
305  // get memory from prefix sum back
306  prefixSumOfEdges.clear();
307  freeVector(prefixSumOfEdges); // should no longer use this variable
308 
309  allocationTimer.stop();
310 
312 
313  if (setupGluon) {
314  galois::CondStatTimer<MORE_DIST_STATS> TfillMirrors("FillMirrors",
315  GRNAME);
316 
317  TfillMirrors.start();
318  fillMirrors();
319  TfillMirrors.stop();
320  }
321 
323 
324  loadEdges(base_DistGraph::graph, bufGraph, proxiesOnOtherHosts);
325  // TODO this might be useful to keep around
326  proxiesOnOtherHosts.clear();
327  ndegrees.clear();
328 
329  // SORT EDGES
330  if (doSort) {
332  }
333 
334  if (setupGluon) {
335  galois::CondStatTimer<MORE_DIST_STATS> TfillMirrorsEdges(
336  "FillMirrorsEdges", GRNAME);
337  TfillMirrorsEdges.start();
338  // edges
339  mirrorEdges.resize(base_DistGraph::numHosts);
341  "] Filling mirrors and creating "
342  "mirror map\n");
343  fillMirrorsEdgesAndCreateMirrorMap();
344  TfillMirrorsEdges.stop();
345  }
346 
348 
349  galois::CondStatTimer<MORE_DIST_STATS> Tthread_ranges("ThreadRangesTime",
350  GRNAME);
351 
352  galois::gPrint("[", base_DistGraph::id, "] Determining thread ranges\n");
353 
354  Tthread_ranges.start();
359  Tthread_ranges.stop();
360 
361  Tgraph_construct.stop();
362  galois::gPrint("[", base_DistGraph::id, "] Graph construction complete.\n");
363 
365  accumer.reset();
366  accumer += base_DistGraph::sizeEdges();
367  totalEdgeProxies = accumer.reduce();
368 
369  uint64_t totalNodeProxies;
370  accumer.reset();
371  accumer += base_DistGraph::size();
372  totalNodeProxies = accumer.reduce();
373 
374  // report some statistics
375  if (base_DistGraph::id == 0) {
377  GRNAME, std::string("TotalNodeProxies"), totalNodeProxies);
379  GRNAME, std::string("TotalEdgeProxies"), totalEdgeProxies);
381  std::string("OriginalNumberEdges"),
383  galois::runtime::reportStat_Single(GRNAME, std::string("TotalKeptEdges"),
384  globalKeptEdges);
385  GALOIS_ASSERT(globalKeptEdges * 2 == base_DistGraph::globalSizeEdges());
387  GRNAME, std::string("ReplicationFactorNodes"),
388  (totalNodeProxies) / (double)base_DistGraph::globalSize());
390  GRNAME, std::string("ReplicatonFactorEdges"),
391  (totalEdgeProxies) / (double)globalKeptEdges);
392  }
393  }
394 
398  ~MiningGraph() { delete graphPartitioner; }
399 
400 private:
402  edgeInspectionRound1(galois::graphs::BufferedGraph<void>& bufGraph,
403  galois::gstl::Vector<uint64_t>& prefixSumOfEdges) {
404  galois::DynamicBitSet incomingMirrors;
405  incomingMirrors.resize(base_DistGraph::numGlobalNodes);
406  incomingMirrors.reset();
407 
408  uint32_t myID = base_DistGraph::id;
409  uint64_t globalOffset = base_DistGraph::gid2host[base_DistGraph::id].first;
410 
411  // already set before this is called
413 
415  keptEdges.reset();
416 
418  allEdges.reset();
419 
424  [&](size_t n) {
425  uint64_t edgeCount = 0;
426  auto ii = bufGraph.edgeBegin(n);
427  auto ee = bufGraph.edgeEnd(n);
428  allEdges += std::distance(ii, ee);
429  for (; ii < ee; ++ii) {
430  uint32_t dst = bufGraph.edgeDestination(*ii);
431 
432  if (graphPartitioner->keepEdge(n, dst)) {
433  edgeCount++;
434  keptEdges += 1;
435  // which mirrors do I have
436  if (graphPartitioner->retrieveMaster(dst) != myID) {
437  incomingMirrors.set(dst);
438  }
439  }
440  }
441  prefixSumOfEdges[n - globalOffset] = edgeCount;
442  ltgv[n - globalOffset] = n;
443  },
444 #if MORE_DIST_STATS
445  galois::loopname("EdgeInspectionLoop"),
446 #endif
448 
449  myKeptEdges = keptEdges.read_local();
450  myReadEdges = allEdges.reduce();
451  globalKeptEdges = keptEdges.reduce();
452 
453  // get incoming mirrors ready for creation
454  uint32_t additionalMirrorCount = incomingMirrors.count();
456  base_DistGraph::localToGlobalVector.size() + additionalMirrorCount);
457 
458  // note prefix sum will get finalized in a later step
459  if (base_DistGraph::numOwned > 0) {
460  prefixSumOfEdges.resize(prefixSumOfEdges.size() + additionalMirrorCount,
461  0);
462  } else {
463  prefixSumOfEdges.resize(additionalMirrorCount, 0);
464  }
465 
466  // map creation: lid to gid
467  if (additionalMirrorCount > 0) {
468  uint32_t totalNumNodes = base_DistGraph::numGlobalNodes;
470  std::vector<uint64_t> threadPrefixSums(activeThreads);
471  galois::on_each([&](unsigned tid, unsigned nthreads) {
472  size_t beginNode;
473  size_t endNode;
474  std::tie(beginNode, endNode) =
475  galois::block_range(0u, totalNumNodes, tid, nthreads);
476  uint64_t count = 0;
477  for (size_t i = beginNode; i < endNode; i++) {
478  if (incomingMirrors.test(i))
479  ++count;
480  }
481  threadPrefixSums[tid] = count;
482  });
483  // get prefix sums
484  for (unsigned int i = 1; i < threadPrefixSums.size(); i++) {
485  threadPrefixSums[i] += threadPrefixSums[i - 1];
486  }
487 
488  assert(threadPrefixSums.back() == additionalMirrorCount);
489 
490  uint32_t startingNodeIndex = base_DistGraph::numOwned;
491  // do actual work, second on_each
492  galois::on_each([&](unsigned tid, unsigned nthreads) {
493  size_t beginNode;
494  size_t endNode;
495  std::tie(beginNode, endNode) =
496  galois::block_range(0u, totalNumNodes, tid, nthreads);
497  // start location to start adding things into prefix sums/vectors
498  uint32_t threadStartLocation = 0;
499  if (tid != 0) {
500  threadStartLocation = threadPrefixSums[tid - 1];
501  }
502  uint32_t handledNodes = 0;
503  for (size_t i = beginNode; i < endNode; i++) {
504  if (incomingMirrors.test(i)) {
505  base_DistGraph::localToGlobalVector[startingNodeIndex +
506  threadStartLocation +
507  handledNodes] = i;
508  handledNodes++;
509  }
510  }
511  });
512  }
513 
514  base_DistGraph::numNodes = base_DistGraph::numOwned + additionalMirrorCount;
518 
519  // g2l mapping
521  for (unsigned i = 0; i < base_DistGraph::numNodes; i++) {
522  // global to local map construction
524  i;
525  }
526  assert(base_DistGraph::globalToLocalMap.size() == base_DistGraph::numNodes);
527 
528  return incomingMirrors;
529  }
530 
537  void communicateProxyInfo(
538  galois::DynamicBitSet& presentProxies,
539  std::vector<galois::DynamicBitSet>& proxiesOnOtherHosts) {
541  // Send proxies on this host to other hosts
542  for (unsigned h = 0; h < base_DistGraph::numHosts; ++h) {
543  if (h != base_DistGraph::id) {
544  galois::runtime::SendBuffer bitsetBuffer;
545  galois::runtime::gSerialize(bitsetBuffer, presentProxies);
546  net.sendTagged(h, galois::runtime::evilPhase, bitsetBuffer);
547  }
548  }
549 
550  // receive loop
551  for (unsigned h = 0; h < net.Num - 1; h++) {
552  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
553  do {
554  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
555  } while (!p);
556  uint32_t sendingHost = p->first;
557  // deserialize proxiesOnOtherHosts
559  proxiesOnOtherHosts[sendingHost]);
560  }
561 
563  }
564 
565  void edgeInspectionRound2(
567  std::vector<std::vector<uint64_t>>& numOutgoingEdges,
568  std::vector<galois::DynamicBitSet>& proxiesOnOtherHosts) {
570  // allocate vectors for counting edges that must be sent
571  // number of nodes that this host has read from disk
572  uint32_t numRead = base_DistGraph::gid2host[base_DistGraph::id].second -
574  // allocate space for outgoing edges
575  for (uint32_t i = 0; i < base_DistGraph::numHosts; ++i) {
576  numOutgoingEdges[i].assign(numRead, 0);
577  }
578  uint64_t globalOffset = base_DistGraph::gid2host[base_DistGraph::id].first;
579 
580  galois::DynamicBitSet hostHasOutgoing;
581  hostHasOutgoing.resize(base_DistGraph::numHosts);
582  hostHasOutgoing.reset();
583 
584  // flip loop order, this can be optimized
585  // for each host, loop over my local nodes
589  [&](size_t n) {
590  auto ii = bufGraph.edgeBegin(n);
591  auto ee = bufGraph.edgeEnd(n);
592 
593  for (; ii < ee; ++ii) {
594  uint32_t dst = bufGraph.edgeDestination(*ii);
595  // make sure this edge is going to be kept and not dropped
596  if (graphPartitioner->keepEdge(n, dst)) {
597  for (unsigned h = 0; h < net.Num; h++) {
598  if (h != net.ID) {
599  if (proxiesOnOtherHosts[h].test(n)) {
600  // if kept, make sure destination exists on that host
601  if (proxiesOnOtherHosts[h].test(dst)) {
602  // if it does, this edge must be duplicated on that host;
603  // increment count
604  numOutgoingEdges[h][n - globalOffset] += 1;
605  hostHasOutgoing.set(h);
606  }
607  }
608  }
609  }
610  }
611  }
612  },
613 #if MORE_DIST_STATS
614  galois::loopname("EdgeInspectionRound2Loop"),
615 #endif
617 
618  // send data off, then receive it
619  sendInspectionData(numOutgoingEdges, hostHasOutgoing);
620  recvInspectionData(numOutgoingEdges);
622  }
623 
632  void sendInspectionData(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
633  galois::DynamicBitSet& hostHasOutgoing) {
635 
637  bytesSent.reset();
638 
639  for (unsigned h = 0; h < net.Num; h++) {
640  if (h == net.ID) {
641  continue;
642  }
643 
644  // send outgoing edges data off to comm partner
646 
647  // only send if non-zeros exist
648  if (hostHasOutgoing.test(h)) {
649  galois::runtime::gSerialize(b, 1); // token saying data exists
650  galois::runtime::gSerialize(b, numOutgoingEdges[h]);
651  } else {
652  galois::runtime::gSerialize(b, 0); // token saying no data exists
653  }
654  numOutgoingEdges[h].clear();
655 
656  bytesSent.update(b.size());
657 
658  // send buffer and free memory
659  net.sendTagged(h, galois::runtime::evilPhase, b);
660  b.getVec().clear();
661  }
663  GRNAME, std::string("EdgeInspectionBytesSent"), bytesSent.reduce());
664 
665  galois::gPrint("[", base_DistGraph::id, "] Inspection sends complete.\n");
666  }
667 
675  void
676  recvInspectionData(std::vector<std::vector<uint64_t>>& numOutgoingEdges) {
678 
679  for (unsigned h = 0; h < net.Num - 1; h++) {
680  // expect data from comm partner back
681  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
682  do {
683  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
684  } while (!p);
685 
686  uint32_t sendingHost = p->first;
687 
688  // get outgoing edges; first get status var
689  uint32_t outgoingExists = 2;
690  galois::runtime::gDeserialize(p->second, outgoingExists);
691 
692  if (outgoingExists == 1) {
693  // actual data sent
694  galois::runtime::gDeserialize(p->second, numOutgoingEdges[sendingHost]);
695  } else if (outgoingExists == 0) {
696  // no data sent; just clear again
697  numOutgoingEdges[sendingHost].clear();
698  } else {
699  GALOIS_DIE("unreachable: ", outgoingExists);
700  }
701  }
702 
704  "] Inspection receives complete.\n");
705  }
706 
712  finalizePrefixSum(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
713  galois::gstl::Vector<uint64_t>& prefixSumOfEdges) {
715 
716  inspectOutgoingNodes(numOutgoingEdges, prefixSumOfEdges);
717  finalizeInspection(prefixSumOfEdges);
719  "] To receive this many nodes: ", nodesToReceive);
721  "] Inspection allocation complete.\n");
722  return prefixSumOfEdges;
723  }
724 
729  void
730  inspectOutgoingNodes(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
731  galois::gstl::Vector<uint64_t>& prefixSumOfEdges) {
733  toReceive.reset();
734 
735  uint32_t proxyStart = base_DistGraph::numOwned;
736  uint32_t proxyEnd = base_DistGraph::numNodes;
737  assert(proxyEnd == prefixSumOfEdges.size());
738 
739  galois::GAccumulator<uint64_t> edgesToReceive;
740  edgesToReceive.reset();
741 
742  // loop over proxy nodes, see if edges need to be sent from another host
743  // by looking at results of edge inspection
745  galois::iterate(proxyStart, proxyEnd),
746  [&](uint32_t lid) {
747  uint64_t gid = base_DistGraph::localToGlobalVector[lid];
748  assert(gid < base_DistGraph::numGlobalNodes);
749  unsigned hostReader = getHostReader(gid);
750  assert(hostReader < base_DistGraph::numHosts);
751  assert(hostReader != base_DistGraph::id); // self shouldn't be proxy
752 
753  uint64_t nodeOffset = base_DistGraph::gid2host[hostReader].first;
754  if (numOutgoingEdges[hostReader].size()) {
755  if (numOutgoingEdges[hostReader][gid - nodeOffset]) {
756  // if this host is going to send me edges, note it for future use
757  prefixSumOfEdges[lid] =
758  numOutgoingEdges[hostReader][gid - nodeOffset];
759  edgesToReceive += numOutgoingEdges[hostReader][gid - nodeOffset];
760  toReceive += 1;
761  }
762  }
763  },
764  galois::loopname("OutgoingNodeInspection"), galois::steal(),
765  galois::no_stats());
766 
767  galois::gPrint("[", base_DistGraph::id, "] Need receive ",
768  edgesToReceive.reduce(), " edges; self is ", myKeptEdges,
769  "\n");
770  // get memory back
771  numOutgoingEdges.clear();
772  nodesToReceive = toReceive.reduce();
773  }
774 
778  void finalizeInspection(galois::gstl::Vector<uint64_t>& prefixSumOfEdges) {
779  for (unsigned i = 1; i < base_DistGraph::numNodes; i++) {
780  // finalize prefix sum
781  prefixSumOfEdges[i] += prefixSumOfEdges[i - 1];
782  }
783  if (prefixSumOfEdges.size() != 0) {
784  base_DistGraph::numEdges = prefixSumOfEdges.back();
785  } else {
787  }
788  }
789 
791 public:
798  galois::StatTimer mapConstructTimer("GID2LIDMapConstructTimer", GRNAME);
799  mapConstructTimer.start();
800 
801  localEdgeGIDToLID.reserve(base_DistGraph::sizeEdges());
802 
803  uint64_t count = 0;
804  for (unsigned src = 0; src < base_DistGraph::size(); src++) {
805  for (auto edge = base_DistGraph::edge_begin(src);
806  edge != base_DistGraph::edge_end(src); edge++) {
807  assert((*edge) == count);
808  unsigned dst = base_DistGraph::getEdgeDst(edge);
809  uint64_t localGID = getEdgeGIDFromSD(src, dst);
810  // insert into map
811  localEdgeGIDToLID.insert(std::make_pair(localGID, count));
812  count++;
813  }
814  }
815 
816  GALOIS_ASSERT(localEdgeGIDToLID.size() == base_DistGraph::sizeEdges());
818 
819  mapConstructTimer.stop();
820  }
821 
823  galois::runtime::reportStat_Single(GRNAME, std::string("MapAccessesBefore"),
825  }
826 
827  void reportAccess() {
828  galois::runtime::reportStat_Single(GRNAME, std::string("MapAccesses"),
830  }
831 
838  std::pair<uint64_t, bool> getLIDFromMap(unsigned src, unsigned dst) {
839  lgMapAccesses += 1;
840  // try to find gid in map
841  uint64_t localGID = getEdgeGIDFromSD(src, dst);
842  auto findResult = localEdgeGIDToLID.find(localGID);
843 
844  // return if found, else return a false
845  if (findResult != localEdgeGIDToLID.end()) {
846  return std::make_pair(findResult->second, true);
847  } else {
848  // not found
849  return std::make_pair((uint64_t)-1, false);
850  }
851  }
852 
853  uint64_t getEdgeLID(uint64_t gid) {
854  uint64_t sourceNodeGID = edgeGIDToSource(gid);
855  uint64_t sourceNodeLID = base_DistGraph::getLID(sourceNodeGID);
856  uint64_t destNodeLID = base_DistGraph::getLID(edgeGIDToDest(gid));
857 
858  for (auto edge : base_DistGraph::edges(sourceNodeLID)) {
859  uint64_t edgeDst = base_DistGraph::getEdgeDst(edge);
860  if (edgeDst == destNodeLID) {
861  return *edge;
862  }
863  }
864  GALOIS_DIE("unreachable");
865  return (uint64_t)-1;
866  }
867 
868  uint32_t findSourceFromEdge(uint64_t lid) {
869  // TODO binary search
870  // uint32_t left = 0;
871  // uint32_t right = base_DistGraph::numNodes;
872  // uint32_t mid = (left + right) / 2;
873 
874  for (uint32_t mid = 0; mid < base_DistGraph::numNodes; mid++) {
875  uint64_t edge_left = *(base_DistGraph::edge_begin(mid));
876  uint64_t edge_right = *(base_DistGraph::edge_begin(mid + 1));
877 
878  if (edge_left <= lid && lid < edge_right) {
879  return mid;
880  }
881  }
882 
883  GALOIS_DIE("unreachable");
884  return (uint32_t)-1;
885  }
886 
887  uint64_t getEdgeGID(uint64_t lid) {
888  uint32_t src = base_DistGraph::getGID(findSourceFromEdge(lid));
890  return getEdgeGIDFromSD(src, dst);
891  }
892 
893 private:
894  // https://www.quora.com/
895  // Is-there-a-mathematical-function-that-converts-two-numbers-into-one-so-
896  // that-the-two-numbers-can-always-be-extracted-again
897  // GLOBAL IDS ONLY
898  uint64_t getEdgeGIDFromSD(uint32_t source, uint32_t dest) {
899  return source + (dest % base_DistGraph::numGlobalNodes) *
901  }
902 
903  uint64_t edgeGIDToSource(uint64_t gid) {
904  return gid % base_DistGraph::numGlobalNodes;
905  }
906 
907  uint64_t edgeGIDToDest(uint64_t gid) {
908  // assuming this floors
909  return gid / base_DistGraph::numGlobalNodes;
910  }
911 
916  void fillMirrors() {
917  base_DistGraph::mirrorNodes.reserve(base_DistGraph::numNodes -
919  for (uint32_t i = base_DistGraph::numOwned; i < base_DistGraph::numNodes;
920  i++) {
921  uint32_t globalID = base_DistGraph::localToGlobalVector[i];
922  base_DistGraph::mirrorNodes[graphPartitioner->retrieveMaster(globalID)]
923  .push_back(globalID);
924  }
925  }
926 
927  void fillMirrorsEdgesAndCreateMirrorMap() {
928  for (uint32_t src = base_DistGraph::numOwned;
929  src < base_DistGraph::numNodes; src++) {
930  auto ee = base_DistGraph::edge_begin(src);
931  auto ee_end = base_DistGraph::edge_end(src);
932  uint32_t globalSource = base_DistGraph::getGID(src);
933  unsigned sourceOwner = graphPartitioner->retrieveMaster(globalSource);
934 
935  for (; ee != ee_end; ++ee) {
936  // create mirror array
937  uint64_t edgeGID = getEdgeGIDFromSD(
938  globalSource,
940  mirrorEdges[sourceOwner].push_back(edgeGID);
941  }
942  }
943  }
944 
946 
947  template <typename GraphTy>
948  void loadEdges(GraphTy& graph, galois::graphs::BufferedGraph<void>& bufGraph,
949  std::vector<galois::DynamicBitSet>& proxiesOnOtherHosts) {
950  galois::StatTimer loadEdgeTimer("EdgeLoading", GRNAME);
951  loadEdgeTimer.start();
952 
953  bufGraph.resetReadCounters();
954  std::atomic<uint32_t> receivedNodes;
955  receivedNodes.store(0);
956 
957  // sends data
958  sendEdges(graph, bufGraph, receivedNodes, proxiesOnOtherHosts);
959  // uint64_t bufBytesRead = bufGraph.getBytesRead();
960  // get data from graph back (don't need it after sending things out)
961  bufGraph.resetAndFree();
962 
963  // receives data
965  [&](unsigned GALOIS_UNUSED(tid), unsigned GALOIS_UNUSED(nthreads)) {
966  receiveEdges(graph, receivedNodes);
967  });
969  loadEdgeTimer.stop();
970 
971  galois::gPrint("[", base_DistGraph::id, "] Edge loading time: ",
972  loadEdgeTimer.get_usec() / 1000000.0f, " seconds\n");
973  }
974 
975  // no edge data version
976  template <typename GraphTy>
977  void sendEdges(GraphTy& graph, galois::graphs::BufferedGraph<void>& bufGraph,
978  std::atomic<uint32_t>& receivedNodes,
979  std::vector<galois::DynamicBitSet>& proxiesOnOtherHosts) {
980  using DstVecType = std::vector<std::vector<uint64_t>>;
981  using SendBufferVecTy = std::vector<galois::runtime::SendBuffer>;
982 
984  base_DistGraph::numHosts);
986  base_DistGraph::numHosts);
987 
989  const unsigned& id = this->base_DistGraph::id;
990  const unsigned& numHosts = this->base_DistGraph::numHosts;
991 
992  galois::GAccumulator<uint64_t> messagesSent;
994  galois::GReduceMax<uint64_t> maxBytesSent;
995  messagesSent.reset();
996  bytesSent.reset();
997  maxBytesSent.reset();
998 
999  // Go over assigned nodes and distribute edges.
1003  [&](uint64_t src) {
1004  uint32_t lsrc = 0;
1005  uint64_t curEdge = 0;
1006  if (this->isLocal(src)) {
1007  lsrc = this->G2L(src);
1008  curEdge = *graph.edge_begin(lsrc, galois::MethodFlag::UNPROTECTED);
1009  }
1010 
1011  auto ee = bufGraph.edgeBegin(src);
1012  auto ee_end = bufGraph.edgeEnd(src);
1013  auto& gdst_vec = *gdst_vecs.getLocal();
1014 
1015  for (unsigned i = 0; i < numHosts; ++i) {
1016  gdst_vec[i].clear();
1017  }
1018 
1019  for (; ee != ee_end; ++ee) {
1020  uint32_t gdst = bufGraph.edgeDestination(*ee);
1021  // make sure this edge is going to be kept and not dropped
1022  if (graphPartitioner->keepEdge(src, gdst)) {
1023  assert(this->isLocal(src));
1024  uint32_t ldst = this->G2L(gdst);
1025  graph.constructEdge(curEdge++, ldst);
1026 
1027  for (unsigned h = 0; h < net.Num; h++) {
1028  if (h != net.ID) {
1029  if (proxiesOnOtherHosts[h].test(src)) {
1030  // if kept, make sure destination exists on that host
1031  if (proxiesOnOtherHosts[h].test(gdst)) {
1032  // if it does, this edge must be duplicated on that host;
1033  // increment count
1034  gdst_vec[h].push_back(gdst);
1035  }
1036  }
1037  }
1038  }
1039  }
1040  }
1041 
1042  // make sure all edges accounted for if local
1043  if (this->isLocal(src)) {
1044  assert(curEdge == (*graph.edge_end(lsrc)));
1045  }
1046 
1047  // send
1048  for (uint32_t h = 0; h < numHosts; ++h) {
1049  if (h == id)
1050  continue;
1051 
1052  if (gdst_vec[h].size() > 0) {
1053  auto& b = (*sendBuffers.getLocal())[h];
1054  galois::runtime::gSerialize(b, src);
1055  galois::runtime::gSerialize(b, gdst_vec[h]);
1056 
1057  // send if over limit
1058  if (b.size() > edgePartitionSendBufSize) {
1059  messagesSent += 1;
1060  bytesSent.update(b.size());
1061  maxBytesSent.update(b.size());
1062 
1063  net.sendTagged(h, galois::runtime::evilPhase, b);
1064  b.getVec().clear();
1065  b.getVec().reserve(edgePartitionSendBufSize * 1.25);
1066  }
1067  }
1068  }
1069 
1070  // overlap receives
1071  auto buffer = net.recieveTagged(galois::runtime::evilPhase, nullptr);
1072  this->processReceivedEdgeBuffer(buffer, graph, receivedNodes);
1073  },
1074 #if MORE_DIST_STATS
1075  galois::loopname("EdgeLoading"),
1076 #endif
1078 
1079  // flush buffers
1080  for (unsigned threadNum = 0; threadNum < sendBuffers.size(); ++threadNum) {
1081  auto& sbr = *sendBuffers.getRemote(threadNum);
1082  for (unsigned h = 0; h < this->base_DistGraph::numHosts; ++h) {
1083  if (h == this->base_DistGraph::id)
1084  continue;
1085  auto& sendBuffer = sbr[h];
1086  if (sendBuffer.size() > 0) {
1087  messagesSent += 1;
1088  bytesSent.update(sendBuffer.size());
1089  maxBytesSent.update(sendBuffer.size());
1090 
1091  net.sendTagged(h, galois::runtime::evilPhase, sendBuffer);
1092  sendBuffer.getVec().clear();
1093  }
1094  }
1095  }
1096 
1097  net.flush();
1098 
1100  GRNAME, std::string("EdgeLoadingMessagesSent"), messagesSent.reduce());
1102  GRNAME, std::string("EdgeLoadingBytesSent"), bytesSent.reduce());
1104  GRNAME, std::string("EdgeLoadingMaxBytesSent"), maxBytesSent.reduce());
1105  }
1106 
1108  template <typename GraphTy>
1109  void processReceivedEdgeBuffer(
1110  std::optional<std::pair<uint32_t, galois::runtime::RecvBuffer>>& buffer,
1111  GraphTy& graph, std::atomic<uint32_t>& receivedNodes) {
1112  if (buffer) {
1113  auto& rb = buffer->second;
1114  while (rb.r_size() > 0) {
1115  uint64_t n;
1116  std::vector<uint64_t> gdst_vec;
1118  galois::runtime::gDeserialize(rb, gdst_vec);
1119  assert(isLocal(n));
1120  uint32_t lsrc = this->G2L(n);
1121  uint64_t cur = *graph.edge_begin(lsrc, galois::MethodFlag::UNPROTECTED);
1122  uint64_t cur_end = *graph.edge_end(lsrc);
1123  assert((cur_end - cur) == gdst_vec.size());
1124  deserializeEdges(graph, gdst_vec, cur, cur_end);
1125  ++receivedNodes;
1126  }
1127  }
1128  }
1129 
1134  template <typename GraphTy>
1135  void receiveEdges(GraphTy& graph, std::atomic<uint32_t>& receivedNodes) {
1137 
1138  // receive edges for all mirror nodes
1139  while (receivedNodes < nodesToReceive) {
1140  decltype(net.recieveTagged(galois::runtime::evilPhase, nullptr)) p;
1141  p = net.recieveTagged(galois::runtime::evilPhase, nullptr);
1142  processReceivedEdgeBuffer(p, graph, receivedNodes);
1143  }
1144  }
1145 
1146  template <typename GraphTy>
1147  void deserializeEdges(GraphTy& graph, std::vector<uint64_t>& gdst_vec,
1148  uint64_t& cur, uint64_t& cur_end) {
1149  uint64_t i = 0;
1150  while (cur < cur_end) {
1151  uint64_t gdst = gdst_vec[i++];
1152  uint32_t ldst = this->G2L(gdst);
1153  graph.constructEdge(cur++, ldst);
1154  }
1155  }
1156 
1157  virtual bool is_vertex_cut() const { return false; }
1158 };
1159 
1160 // make GRNAME visible to public
1161 template <typename NodeTy, typename EdgeTy, typename Partitioner>
1162 constexpr const char* const
1164 
1165 } // end namespace graphs
1166 } // end namespace galois
1167 #endif
size_t sizeEdges() const
Gets number of edges on this (local) graph.
Definition: DistributedGraph.h:653
std::vector< std::vector< size_t > > & getMirrorEdges()
Definition: MiningPartitioner.h:147
Definition: Traits.h:247
EdgeIterator edgeEnd(uint64_t globalNodeID)
Get the index to the first edge of the node after the provided node.
Definition: BufferedGraph.h:414
Distributed sum-reducer for getting the sum of some value across multiple hosts.
Definition: DReducible.h:45
galois::GAccumulator< uint64_t > lgMapAccesses
Definition: MiningPartitioner.h:792
uint64_t numGlobalEdges
Total edges in the global unpartitioned graph.
Definition: DistributedGraph.h:80
void gInfo(Args &&...args)
Prints an info string from a sequence of things.
Definition: gIO.h:55
uint64_t getEdgeLID(uint64_t gid)
Definition: MiningPartitioner.h:853
void increment_evilPhase()
Increments evilPhase, a phase counter used by communication.
Definition: DistributedGraph.h:133
unsigned int getActiveThreads() noexcept
Returns the number of threads in use.
Definition: Threads.cpp:37
std::pair< IterTy, IterTy > block_range(IterTy b, IterTy e, unsigned id, unsigned num)
Returns a continuous block from the range based on the number of divisions and the id of the block re...
Definition: gstl.h:244
void reset()
Definition: Reduction.h:113
uint64_t getGID(const uint32_t nodeID) const
Converts a local node id into a global node id.
Definition: DistributedGraph.h:562
Class that loads a portion of a Galois graph from disk directly into memory buffers for access...
Definition: BufferedGraph.h:49
uint64_t numGlobalNodes
Total nodes in the global unpartitioned graph.
Definition: DistributedGraph.h:79
const uint32_t numHosts
Total number of machines.
Definition: DistributedGraph.h:85
uint32_t getLID(const uint64_t nodeID) const
Converts a global node id into a local node id.
Definition: DistributedGraph.h:570
EdgeIterator edgeBegin(uint64_t globalNodeID)
Get the index to the first edge of the provided node THAT THIS GRAPH HAS LOADED (not necessarily the fi...
Definition: BufferedGraph.h:386
void reportStat_Tmax(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:556
virtual unsigned getHostID(uint64_t gid) const
Determines which host has the master for a particular node.
Definition: MiningPartitioner.h:165
Definition: MiningPartitioner.h:46
Concurrent dynamically allocated bitset.
Definition: libgalois/include/galois/DynamicBitset.h:47
void reportParam(const S1 &region, const S2 &category, const V &value)
Definition: Statistics.h:616
Buffer for serialization of data.
Definition: Serialize.h:56
void constructLocalEdgeGIDMap()
Construct a map from local edge GIDs to LID.
Definition: MiningPartitioner.h:796
uint64_t globalEdges() const
Returns # edges kept in all graphs.
Definition: MiningPartitioner.h:145
GraphNode getEdgeDst(edge_iterator ni)
Gets edge destination of edge ni.
Definition: DistributedGraph.h:606
~MiningGraph()
Free the graph partitioner.
Definition: MiningPartitioner.h:398
void gDebug(Args &&...GALOIS_USED_ONLY_IN_DEBUG(args))
Prints a debug string from a sequence of things; prints nothing if NDEBUG is defined.
Definition: gIO.h:72
void reportAccessBefore()
Definition: MiningPartitioner.h:822
void sortEdgesByDestination()
Sort the underlying LC_CSR_Graph by ID (destinations). It sorts edges of the nodes by destination...
Definition: DistributedGraph.h:850
size_t size() const
Gets number of nodes on this (local) graph.
Definition: DistributedGraph.h:646
void gDeserialize(DeSerializeBuffer &buf, T1 &&t1, Args &&...args)
Deserialize data in a buffer into a series of objects.
Definition: Serialize.h:1032
Ty reset()
Reset the entire accumulator.
Definition: DReducible.h:153
const char * loopname
Definition: Executor_ParaMeter.h:145
#define GALOIS_DIE(...)
Definition: gIO.h:96
uint64_t numEdges
Num edges in this graph in total.
Definition: DistributedGraph.h:82
void resetReadCounters()
Reset reading counters.
Definition: BufferedGraph.h:496
void reportStat_Tsum(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:562
vTy & getVec()
Returns vector of data stored in this serialize buffer.
Definition: Serialize.h:115
bool test(size_t index) const
Check a bit to see if it is currently set.
Definition: libgalois/include/galois/DynamicBitset.h:192
uint32_t G2L(uint64_t gid) const
Definition: DistributedGraph.h:482
size_t globalSizeEdges() const
Gets number of edges on the global unpartitioned graph.
Definition: DistributedGraph.h:683
void determineThreadRanges()
Uses a pre-computed prefix sum to determine division of nodes among threads.
Definition: DistributedGraph.h:725
void reserve(size_t n)
Definition: PODResizeableArray.h:129
edge_iterator edge_begin(GraphNode N)
Definition: OfflineGraph.h:278
edge_iterator edge_begin(GraphNode N)
Gets the first edge of some node.
Definition: DistributedGraph.h:614
Accumulator for T where accumulation is max.
Definition: Reduction.h:189
void loadPartialGraph(const std::string &filename, uint64_t nodeStart, uint64_t nodeEnd, uint64_t edgeStart, uint64_t edgeEnd, uint64_t numGlobalNodes, uint64_t numGlobalEdges)
Given a node/edge range to load, loads the specified portion of the graph into memory buffers using r...
Definition: BufferedGraph.h:348
#define GALOIS_ASSERT(cond,...)
Like assert but unconditionally executed.
Definition: gIO.h:102
uint64_t count() const
Count how many bits are set in the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:320
std::vector< std::vector< size_t > > mirrorNodes
Mirror nodes from different hosts. For reduce.
Definition: DistributedGraph.h:100
void resize(uint64_t n)
Resizes the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:78
std::vector< std::pair< uint64_t, uint64_t > > gid2host
Information that converts host to range of nodes that host reads.
Definition: DistributedGraph.h:98
std::vector< T, Pow2Alloc< T >> Vector
[STL vector using Pow_2_VarSizeAlloc]
Definition: gstl.h:52
unsigned getHostReader(uint64_t gid) const
Return the reader of a particular node.
Definition: MiningPartitioner.h:154
virtual bool isOwned(uint64_t gid) const
Determine if a node has a master on this host.
Definition: MiningPartitioner.h:170
void reportStat_Single(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:544
Definition: PerThreadStorage.h:88
galois::runtime::iterable< galois::NoDerefIterator< edge_iterator > > edges(GraphNode N)
Returns an iterable object over the edges of a particular node in the graph.
Definition: DistributedGraph.h:636
void reset()
Gets the space taken by the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:110
Definition: OfflineGraph.h:63
size_type size() const
Returns the size of the serialize buffer.
Definition: Serialize.h:125
void constructNodes()
Definition: LC_CSR_Graph.h:570
MASTERS_DISTRIBUTION
Enums specifying how masters are to be distributed among hosts.
Definition: DistributedGraph.h:48
void determineThreadRangesMaster()
Determines the thread ranges for master nodes only and saves them to the object.
Definition: DistributedGraph.h:736
void gPrint(Args &&...args)
Prints a sequence of things.
Definition: gIO.h:47
edge_iterator edge_end(GraphNode N)
Gets the end edge boundary of some node.
Definition: DistributedGraph.h:625
unsigned int activeThreads
Definition: Threads.cpp:26
Definition: Traits.h:206
boost::counting_iterator< uint64_t > edge_iterator
Definition: OfflineGraph.h:194
NetworkInterface & getSystemNetworkInterface()
Get the network interface.
Definition: Network.cpp:131
MiningGraph(const std::string &filename, unsigned host, unsigned _numHosts, bool setupGluon=true, bool doSort=false, galois::graphs::MASTERS_DISTRIBUTION md=BALANCED_EDGES_OF_MASTERS, uint32_t nodeWeight=0, uint32_t edgeWeight=0)
Constructor.
Definition: MiningPartitioner.h:184
size_t globalSize() const
Gets number of nodes on the global unpartitioned graph.
Definition: DistributedGraph.h:676
uint32_t numNodes
Num nodes in this graph in total.
Definition: DistributedGraph.h:81
void clear()
Definition: PODResizeableArray.h:147
Contains the implementation for DistGraph.
uint32_t numOwned
Number of nodes owned (masters) by this host.
Definition: DistributedGraph.h:89
void do_all(const RangeFunc &rangeMaker, FunctionTy &&fn, const Args &...args)
Standard do-all loop.
Definition: Loops.h:71
bool set(size_t index)
Set a bit in the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:206
void reportAccess()
Definition: MiningPartitioner.h:827
uint32_t numNodesWithEdges
Number of nodes (masters + mirrors) that have outgoing edges.
Definition: DistributedGraph.h:94
uint64_t numOwnedEdges() const
Returns edges owned by this graph (i.e.
Definition: MiningPartitioner.h:140
void start()
Definition: Timer.cpp:82
void on_each(FunctionTy &&fn, const Args &...args)
Low-level parallel loop.
Definition: Loops.h:86
uint64_t computeMasters(MASTERS_DISTRIBUTION masters_distribution, galois::graphs::OfflineGraph &g, const std::vector< unsigned > &scalefactor, uint32_t nodeWeight=0, uint32_t edgeWeight=0, unsigned DecomposeFactor=1)
Wrapper call that will call into more specific compute masters functions that compute masters based o...
Definition: DistributedGraph.h:364
virtual bool isLocal(uint64_t gid) const
Determine if a node has a proxy on this host.
Definition: MiningPartitioner.h:175
void initializeSpecificRanges()
Initializes the 3 range objects that a user can access to iterate over the graph in different ways...
Definition: DistributedGraph.h:785
Ty read_local()
Read local accumulated value.
Definition: DReducible.h:133
T & reduce()
Returns the final reduction value.
Definition: Reduction.h:102
size_t size() const
Definition: OfflineGraph.h:271
std::unordered_map< uint64_t, uint32_t > globalToLocalMap
LID = globalToLocalMap[GID].
Definition: DistributedGraph.h:105
Ty reduce(std::string runID=std::string())
Reduce data across all hosts, saves the value, and returns the reduced value.
Definition: DReducible.h:169
std::pair< uint64_t, bool > getLIDFromMap(unsigned src, unsigned dst)
checks map constructed above to see which local id corresponds to a node/edge (if it exists) ...
Definition: MiningPartitioner.h:838
uint32_t evilPhase
Variable that keeps track of which network send/recv phase a program is currently on...
Definition: Network.cpp:36
uint64_t getEdgeGID(uint64_t lid)
Definition: MiningPartitioner.h:887
Base DistGraph class that all distributed graphs extend from.
Definition: DistributedGraph.h:64
auto iterate(C &cont)
Definition: Range.h:323
void update(T &&rhs)
Updates the thread local value by applying the reduction operator to current and newly provided value...
Definition: Reduction.h:90
void resetAndFree()
Free all of the in memory buffers in this object and reset graph status.
Definition: BufferedGraph.h:516
std::vector< uint64_t > localToGlobalVector
GID = localToGlobalVector[LID].
Definition: DistributedGraph.h:103
const unsigned id
ID of the machine.
Definition: DistributedGraph.h:84
void allocateFrom(const FileGraph &graph)
Definition: LC_CSR_Graph.h:513
size_t sizeEdges() const
Definition: OfflineGraph.h:272
Definition: Timer.h:88
uint64_t edgeDestination(uint64_t globalEdgeID)
Get the global node id of the destination of the provided edge.
Definition: BufferedGraph.h:437
void stop()
Definition: Timer.cpp:87
Implements distributed reducible objects for easy reduction of values across a distributed system...
Galois Timer that automatically reports stats upon destruction. Provides statistic interface around ti...
Definition: Timer.h:63
uint32_t findSourceFromEdge(uint64_t lid)
Definition: MiningPartitioner.h:868
GraphTy graph
The internal graph used by DistGraph to represent the graph.
Definition: DistributedGraph.h:73
void determineThreadRangesWithEdges()
Determines the thread ranges for nodes with edges only and saves them to the object.
Definition: DistributedGraph.h:762
uint32_t beginMaster
Local id of the beginning of master nodes.
Definition: DistributedGraph.h:91
balance edges
Definition: DistributedGraph.h:52