27 #ifndef _GALOIS_DIST_NEWGENERIC_H 
   28 #define _GALOIS_DIST_NEWGENERIC_H 
   35 #define CUSP_PT_TIMER 0 
   46 template <
typename NodeTy, 
typename EdgeTy, 
typename Partitioner>
 
   // Compile-time constant with value 8388608 (= 8 * 1024 * 1024).
   // NOTE(review): the name suggests a send-buffer size, presumably in bytes,
   // used while partitioning edges -- confirm against its use sites.
   49   constexpr 
static unsigned edgePartitionSendBufSize = 8388608;
 
   // Identifier string ("dGraph_Generic") passed alongside statistic and
   // timer labels in this class, e.g. "GraphPartitioningTime" and
   // "EdgeInspectionBytesSent".
   50   constexpr 
static const char* 
const GRNAME          = 
"dGraph_Generic";
 
   // Partitioning policy object; its type is the Partitioner template
   // parameter. This class queries it for decisions such as retrieveMaster(),
   // isVertexCut(), noCommunication(), masterAssignPhase() and getEdgeOwner().
   // NOTE(review): raw pointer -- confirm where it is allocated and released.
   51   Partitioner* graphPartitioner;
 
   // Upper bound of the syncRound loop used while assigning edges; initialized
   // from the constructor's edgeStateRounds argument (default 1). The uses of
   // hostLoads/old_hostLoads in this class are guarded by this being > 1.
   54   uint32_t _edgeStateRounds;
 
   // Accumulators indexed by host; incremented by 1 for the host an edge is
   // assigned to (hostLoads[hostBelongs] += 1), then reduced and reset.
   55   std::vector<galois::DGAccumulator<uint64_t>> hostLoads;
 
   // Running totals indexed like hostLoads: the reduced value of hostLoads[i]
   // is added into old_hostLoads[i] before hostLoads[i] is reset.
   56   std::vector<uint64_t> old_hostLoads;
 
   58   uint32_t G2LEdgeCut(uint64_t gid, uint32_t globalOffset)
 const {
 
   62       return gid - globalOffset;
 
   71   void freeVector(V& vectorToKill) {
 
   73     vectorToKill.swap(dummyVector);
 
   // NOTE(review): presumably the number of nodes this host still expects to
   // receive from other hosts during graph construction -- confirm against
   // its use sites.
   76   uint32_t nodesToReceive;
 
   84     return graphPartitioner->retrieveMaster(gid);
 
   87   virtual bool isOwned(uint64_t gid)
 const {
 
   92   virtual bool isLocal(uint64_t gid)
 const {
 
  // Returns the result of graphPartitioner->isVertexCut(), i.e. whether the
  // partitioning policy reports itself as a vertex cut. Does not modify this
  // object (const member).
  102   virtual bool is_vertex_cut()
 const { 
return graphPartitioner->isVertexCut(); }
 
  104     return graphPartitioner->cartesianGrid();
 
  111     if (_edgeStateRounds > 1) {
 
  112       if (!graphPartitioner->noCommunication()) {
 
  114           hostLoads[i].reset();
 
  115           old_hostLoads[i] = 0;
 
  125     if (_edgeStateRounds > 1) {
 
  126       if (!graphPartitioner->noCommunication()) {
 
  128           old_hostLoads[i] += hostLoads[i].reduce();
 
  129           hostLoads[i].reset();
 
  139     if (_edgeStateRounds > 1) {
 
  140       if (!graphPartitioner->noCommunication()) {
 
  144                            old_hostLoads[i], 
"\n");
 
  155       const std::string& filename, 
unsigned host, 
unsigned _numHosts,
 
  156       bool cuspAsync = 
true, uint32_t stateRounds = 100, 
bool transpose = 
false,
 
  158       uint32_t nodeWeight = 0, uint32_t edgeWeight = 0,
 
  159       std::string masterBlockFile = 
"", 
bool readFromFile = 
false,
 
  160       std::string localGraphFileName = 
"local_graph",
 
  161       uint32_t edgeStateRounds       = 1)
 
  162       : 
base_DistGraph(host, _numHosts), _edgeStateRounds(edgeStateRounds) {
 
  165         "GraphPartitioningTime", GRNAME);
 
  166     Tgraph_construct.
start();
 
  170                      "] Reading local graph from file ", localGraphFileName,
 
  173       Tgraph_construct.
stop();
 
  180     std::vector<unsigned> dummy;
 
  182     if (masterBlockFile == 
"") {
 
  204     std::vector<std::vector<uint64_t>> numOutgoingEdges;
 
  206     std::vector<galois::DynamicBitSet> hasIncomingEdge;
 
  209     if (!graphPartitioner->noCommunication()) {
 
  210       if (_edgeStateRounds > 1) {
 
  225     graphReadTimer.
start();
 
  229     graphReadTimer.
stop();
 
  232     if (graphPartitioner->masterAssignPhase()) {
 
  236                      "] Starting master assignment.\n");
 
  238       phase0(bufGraph, cuspAsync, stateRounds);
 
  241                      "] Master assignment complete.\n");
 
  245     inspectionTimer.
start();
 
  250     if (!graphPartitioner->noCommunication()) {
 
  251       edgeInspection(bufGraph, numOutgoingEdges, hasIncomingEdge,
 
  258       nodeMapping(numOutgoingEdges, finalIncoming, prefixSumOfEdges);
 
  264       uint64_t edgeOffset      = *bufGraph.
edgeBegin(nodeBegin);
 
  266       edgeCutInspection(bufGraph, inspectionTimer, edgeOffset,
 
  274     if (graphPartitioner->masterAssignPhase()) {
 
  275       graphPartitioner->enterStage2();
 
  279     numOutgoingEdges.clear();
 
  280     hasIncomingEdge.clear();
 
  282     freeVector(numOutgoingEdges); 
 
  283     freeVector(hasIncomingEdge);  
 
  297         [&](uint64_t n) { base_graph.fixEndEdge(n, prefixSumOfEdges[n]); },
 
  303     prefixSumOfEdges.clear();
 
  304     freeVector(prefixSumOfEdges); 
 
  307     TfillMirrors.
start();
 
  311     if (_edgeStateRounds > 1) {
 
  317     if (!graphPartitioner->noCommunication()) {
 
  329     if (graphPartitioner->isVertexCut() &&
 
  330         graphPartitioner->cartesianGrid().first == 0) {
 
  347     Tthread_ranges.
start();
 
  349     Tthread_ranges.
stop();
 
  355     Tgraph_construct.
stop();
 
  361                                          (uint32_t)stateRounds);
 
  373                          std::vector<uint32_t>& assignedThreadRanges,
 
  374                          uint64_t startNode, uint64_t endNode) {
 
  376     threadRangeTime.start();
 
  377     uint64_t numLocalNodes = endNode - startNode;
 
  379     edgePrefixSum.
resize(numLocalNodes);
 
  385           uint64_t offset       = n - startNode;
 
  390     for (
unsigned i = 1; i < numLocalNodes; i++) {
 
  391       edgePrefixSum[i] += edgePrefixSum[i - 1];
 
  398       assignedThreadRanges[i] += startNode;
 
  402         boost::counting_iterator<size_t>(startNode),
 
  403         boost::counting_iterator<size_t>(startNode + numLocalNodes),
 
  404         assignedThreadRanges.data());
 
  406     threadRangeTime.stop();
 
  423     bitsetSetupTimer.start();
 
  428     std::vector<uint32_t> rangeVector;
 
  433         getSpecificThreadRange(bufGraph, rangeVector, start, end);
 
  446           for (; ii < ee; ++ii) {
 
  448             if ((dst < start) || (dst >= end)) { 
 
  458     bitsetSetupTimer.stop();
 
  473   uint64_t phase0MapSetup(
 
  475       std::unordered_map<uint64_t, uint32_t>& gid2offsets,
 
  478     mapSetupTimer.start();
 
  482     uint32_t lid = numLocal;
 
  484     uint64_t numToReserve = ghosts.
count();
 
  485     gid2offsets.reserve(numToReserve);
 
  493       for (uint64_t gid = start; gid < end; ++gid) {
 
  494         if (ghosts.
test(gid)) {
 
  495           gid2offsets[gid] = lid;
 
  496           syncNodes[h].push_back(gid - start);
 
  501                      (end - start) / 64, 
" vs. vector size ",
 
  502                      syncNodes[h].
size() / 2);
 
  506     assert(lid == numToReserve);
 
  508                    (ghosts.
size() - numLocal) / 64, 
" vs. total vector size ",
 
  514     mapSetupTimer.stop();
 
  533     p0BitsetCommTimer.start();
 
  534     uint64_t bytesSent = 0;
 
  541         galois::runtime::gSerialize(bitsetBuffer, syncNodes[h]);
 
  542         bytesSent += bitsetBuffer.
size();
 
  549     for (
unsigned h = 0; h < net.Num - 1; h++) {
 
  554       uint32_t sendingHost = p->first;
 
  559     p0BitsetCommTimer.stop();
 
  562         GRNAME, std::string(
"Phase0SendRecvBitsetsBytesSent"), bytesSent);
 
  577   void syncLoad(std::vector<uint64_t>& loads,
 
  579     assert(loads.size() == accums.size());
 
  583     for (
unsigned i = 0; i < loads.size(); i++) {
 
  585       syncer += (accums[i].load());
 
  587       uint64_t accumulation = syncer.reduce();
 
  588       loads[i] += accumulation;
 
  599   template <
typename VType>
 
  603     nonAtomic.
resize(atomic.size());
 
  608           nonAtomic[i] = atomic[i].load();
 
  625     unsigned bytesSent = 0;
 
  635         galois::runtime::gSerialize(b, 4);
 
  636         galois::runtime::gSerialize(b, nodeAccum);
 
  637         galois::runtime::gSerialize(b, edgeAccum);
 
  638         bytesSent += b.
size();
 
  658   void asyncRecvLoad(std::vector<uint64_t>& nodeLoads,
 
  659                      std::vector<uint64_t>& edgeLoads,
 
  671         unsigned messageType = (unsigned)-1;
 
  675         if (messageType == 4) {
 
  682           assert(recvNodeAccum.
size() == recvEdgeAccum.
size());
 
  683           assert(recvNodeAccum.
size() == nodeLoads.size());
 
  684           assert(recvEdgeAccum.
size() == edgeLoads.size());
 
  689                 nodeLoads[i] += recvNodeAccum[i];
 
  690                 edgeLoads[i] += recvEdgeAccum[i];
 
  693         } 
else if (messageType == 3) {
 
  695           uint32_t sendingHost = p->first;
 
  696           assert(!loadsClear.
test(sendingHost));
 
  697           loadsClear.
set(sendingHost);
 
  699           GALOIS_DIE(
"unexpected message type in async load synchronization: ",
 
  722   void asyncSyncLoad(std::vector<uint64_t>& nodeLoads,
 
  724                      std::vector<uint64_t>& edgeLoads,
 
  738     extractAtomicToPODArray(nodeAccum, nonAtomicNodeAccum);
 
  739     extractAtomicToPODArray(edgeAccum, nonAtomicEdgeAccum);
 
  748           nodeLoads[i] += nonAtomicNodeAccum[i];
 
  749           edgeLoads[i] += nonAtomicEdgeAccum[i];
 
  754     for (
unsigned i = 0; i < nodeAccum.
size(); i++) {
 
  755       assert(nodeAccum[i].load() == 0);
 
  756       assert(edgeAccum[i].load() == 0);
 
  761     asyncSendLoad(nonAtomicNodeAccum, nonAtomicEdgeAccum);
 
  762     asyncRecvLoad(nodeLoads, edgeLoads, loadsClear);
 
  774   void printLoad(std::vector<uint64_t>& loads,
 
  776     assert(loads.size() == accums.size());
 
  777     for (
unsigned i = 0; i < loads.size(); i++) {
 
  779                      " accum ", accums[i].load());
 
  793   template <
typename T>
 
  794   std::vector<T> getDataFromOffsets(std::vector<uint32_t>& offsetVector,
 
  795                                     const std::vector<T>& dataVector) {
 
  796     std::vector<T> toReturn;
 
  797     toReturn.resize(offsetVector.size());
 
  801         [&](
unsigned i) { toReturn[i] = dataVector[offsetVector[i]]; },
 
  818                    std::vector<uint32_t>& dataVector,
 
  819                    std::string timerName = std::string()) {
 
  821     std::string statString = std::string(
"Phase0SendOffsets_") + timerName;
 
  822     uint64_t bytesSent     = 0;
 
  826     sendOffsetsTimer.
start();
 
  829     if (toSync.
count()) {
 
  830       std::vector<uint32_t> offsetVector = toSync.
getOffsets();
 
  832       std::vector<uint32_t> mastersToSend =
 
  833           getDataFromOffsets(offsetVector, dataVector);
 
  835       assert(mastersToSend.size());
 
  837       size_t num_selected = toSync.
count();
 
  838       size_t num_total    = toSync.
size();
 
  841       size_t bitset_alloc_size =
 
  842           ((num_total + 63) / 64) * 
sizeof(uint64_t) + (2 * 
sizeof(
size_t));
 
  843       size_t bitsetDataSize = (num_selected * 
sizeof(uint32_t)) +
 
  844                               bitset_alloc_size + 
sizeof(num_selected);
 
  845       size_t offsetsDataSize = (num_selected * 
sizeof(uint32_t)) +
 
  846                                (num_selected * 
sizeof(
unsigned int)) +
 
  847                                sizeof(uint32_t) + 
sizeof(num_selected);
 
  851       if (bitsetDataSize < offsetsDataSize) {
 
  853         galois::runtime::gSerialize(b, 1u);
 
  854         galois::runtime::gSerialize(b, toSync);
 
  855         galois::runtime::gSerialize(b, mastersToSend);
 
  858         galois::runtime::gSerialize(b, 2u);
 
  859         galois::runtime::gSerialize(b, offsetVector);
 
  860         galois::runtime::gSerialize(b, mastersToSend);
 
  862       bytesSent += b.
size();
 
  867       galois::runtime::gSerialize(b, 0u);
 
  868       bytesSent += b.
size();
 
  871     sendOffsetsTimer.stop();
 
  888   void syncAssignmentSends(
 
  889       uint32_t begin, uint32_t end, uint32_t numLocalNodes,
 
  890       std::vector<uint32_t>& localNodeToMaster,
 
  893     p0assignSendTime.start();
 
  896     toSync.
resize(numLocalNodes);
 
  906               if ((lid >= begin) && (lid < end)) {
 
  912         sendOffsets(h, toSync, localNodeToMaster, 
"NewAssignments");
 
  916     p0assignSendTime.stop();
 
  924   void sendAllClears(
unsigned phase = 0) {
 
  925     unsigned bytesSent = 0;
 
  928     allClearTimer.start();
 
  934         galois::runtime::gSerialize(b, 3u);
 
  935         bytesSent += b.
size();
 
  939         } 
else if (phase == 0) {
 
  946     allClearTimer.stop();
 
  952   void saveReceivedMappings(std::vector<uint32_t>& localNodeToMaster,
 
  953                             std::unordered_map<uint64_t, uint32_t>& gid2offsets,
 
  954                             unsigned sendingHost,
 
  955                             std::vector<uint32_t>& receivedOffsets,
 
  956                             std::vector<uint32_t>& receivedMasters) {
 
  962     assert(receivedMasters.size() == receivedOffsets.size());
 
  967           uint64_t curGID       = hostOffset + receivedOffsets[i];
 
  968           uint32_t indexIntoMap = gid2offsets[curGID];
 
  971           localNodeToMaster[indexIntoMap] = receivedMasters[i];
 
  984   std::pair<unsigned, unsigned>
 
  985   recvOffsetsAndMasters(std::vector<uint32_t>& receivedOffsets,
 
  986                         std::vector<uint32_t>& receivedMasters) {
 
  994     uint32_t sendingHost = p->first;
 
  995     unsigned messageType = (unsigned)-1;
 
 1000     if (messageType == 1) {
 
 1006     } 
else if (messageType == 2) {
 
 1010     } 
else if (messageType != 0) {
 
 1011       GALOIS_DIE(
"invalid message type for sync of master assignments: ",
 
 1016                    " send message type ", messageType);
 
 1018     return std::make_pair(sendingHost, messageType);
 
 1029   void recvOffsetsAndMastersAsync(
 
 1030       std::vector<uint32_t>& localNodeToMaster,
 
 1031       std::unordered_map<uint64_t, uint32_t>& gid2offsets,
 
 1040         uint32_t sendingHost = p->first;
 
 1041         unsigned messageType = (unsigned)-1;
 
 1043         std::vector<uint32_t> receivedOffsets;
 
 1044         std::vector<uint32_t> receivedMasters;
 
 1049         if (messageType == 1) {
 
 1055           saveReceivedMappings(localNodeToMaster, gid2offsets, sendingHost,
 
 1056                                receivedOffsets, receivedMasters);
 
 1057         } 
else if (messageType == 2) {
 
 1061           saveReceivedMappings(localNodeToMaster, gid2offsets, sendingHost,
 
 1062                                receivedOffsets, receivedMasters);
 
 1063         } 
else if (messageType == 3) {
 
 1066           assert(!hostFinished.
test(sendingHost));
 
 1067           hostFinished.
set(sendingHost);
 
 1068         } 
else if (messageType != 0) {
 
 1069           GALOIS_DIE(
"invalid message type for sync of master assignments: ",
 
 1074                        " send message type ", messageType);
 
 1089   syncAssignmentReceives(std::vector<uint32_t>& localNodeToMaster,
 
 1090                          std::unordered_map<uint64_t, uint32_t>& gid2offsets) {
 
 1093     p0assignReceiveTime.start();
 
 1096     for (
unsigned h = 0; h < base_DistGraph::numHosts - 1; h++) {
 
 1097       unsigned sendingHost;
 
 1098       unsigned messageType;
 
 1099       std::vector<uint32_t> receivedOffsets;
 
 1100       std::vector<uint32_t> receivedMasters;
 
 1102       std::tie(sendingHost, messageType) =
 
 1103           recvOffsetsAndMasters(receivedOffsets, receivedMasters);
 
 1105       if (messageType == 1 || messageType == 2) {
 
 1106         saveReceivedMappings(localNodeToMaster, gid2offsets, sendingHost,
 
 1107                              receivedOffsets, receivedMasters);
 
 1111     p0assignReceiveTime.stop();
 
 1114   void syncAssignmentReceivesAsync(
 
 1115       std::vector<uint32_t>& localNodeToMaster,
 
 1116       std::unordered_map<uint64_t, uint32_t>& gid2offsets,
 
 1120     p0assignReceiveTime.start();
 
 1122     recvOffsetsAndMastersAsync(localNodeToMaster, gid2offsets, hostFinished);
 
 1124     p0assignReceiveTime.stop();
 
 1139   void syncAssignment(
 
 1140       uint32_t begin, uint32_t end, uint32_t numLocalNodes,
 
 1141       std::vector<uint32_t>& localNodeToMaster,
 
 1143       std::unordered_map<uint64_t, uint32_t>& gid2offsets) {
 
 1145     syncAssignmentTimer.start();
 
 1147     syncAssignmentSends(begin, end, numLocalNodes, localNodeToMaster,
 
 1149     syncAssignmentReceives(localNodeToMaster, gid2offsets);
 
 1151     syncAssignmentTimer.stop();
 
 1154   void syncAssignmentAsync(
 
 1155       uint32_t begin, uint32_t end, uint32_t numLocalNodes,
 
 1156       std::vector<uint32_t>& localNodeToMaster,
 
 1158       std::unordered_map<uint64_t, uint32_t>& gid2offsets,
 
 1162     syncAssignmentTimer.start();
 
 1164     syncAssignmentSends(begin, end, numLocalNodes, localNodeToMaster,
 
 1166     syncAssignmentReceivesAsync(localNodeToMaster, gid2offsets, hostFinished);
 
 1168     syncAssignmentTimer.stop();
 
 1179   void sendMastersToOwners(
 
 1180       std::vector<uint32_t>& localNodeToMaster,
 
 1188     toSend.
resize(end - begin);
 
 1197               if (localNodeToMaster[lid] == h) {
 
 1206         sendOffsets(h, toSend, localNodeToMaster, 
"MastersToOwners");
 
 1215   void recvMastersToOwners() {
 
 1216     for (
unsigned h = 0; h < base_DistGraph::numHosts - 1; h++) {
 
 1217       unsigned sendingHost;
 
 1218       unsigned messageType;
 
 1219       std::vector<uint32_t> receivedOffsets;
 
 1220       std::vector<uint32_t> receivedMasters;
 
 1222       std::tie(sendingHost, messageType) =
 
 1223           recvOffsetsAndMasters(receivedOffsets, receivedMasters);
 
 1225       if (messageType == 1 || messageType == 2) {
 
 1226         assert(receivedMasters.size() == receivedOffsets.size());
 
 1230         for (
unsigned i = 0; i < receivedMasters.size(); i++) {
 
 1231           uint64_t gidToMap = hostOffset + receivedOffsets[i];
 
 1235               graphPartitioner->addMasterMapping(gidToMap, receivedMasters[i]);
 
 1251               const uint32_t stateRounds) {
 
 1255     syncNodes.resize(base_DistGraph::numHosts);
 
 1258     phase0BitsetSetup(bufGraph, ghosts);
 
 1260     std::unordered_map<uint64_t, uint32_t> gid2offsets;
 
 1261     uint64_t neighborCount = phase0MapSetup(ghosts, gid2offsets, syncNodes);
 
 1265     phase0SendRecv(syncNodes);
 
 1269     p0allocTimer.start();
 
 1273     std::vector<uint64_t> nodeLoads;
 
 1274     std::vector<uint64_t> edgeLoads;
 
 1275     std::vector<galois::CopyableAtomic<uint64_t>> nodeAccum;
 
 1276     std::vector<galois::CopyableAtomic<uint64_t>> edgeAccum;
 
 1277     nodeLoads.assign(base_DistGraph::numHosts, 0);
 
 1278     edgeLoads.assign(base_DistGraph::numHosts, 0);
 
 1279     nodeAccum.assign(base_DistGraph::numHosts, 0);
 
 1280     edgeAccum.assign(base_DistGraph::numHosts, 0);
 
 1282     uint32_t numLocalNodes =
 
 1286     std::vector<uint32_t> localNodeToMaster;
 
 1287     localNodeToMaster.assign(numLocalNodes + neighborCount, (uint32_t)-1);
 
 1295         galois::gPrint(
"Using asynchronous master determination sends.\n");
 
 1298       hostFinished.
resize(base_DistGraph::numHosts);
 
 1299       loadsClear.
resize(base_DistGraph::numHosts);
 
 1302     p0allocTimer.stop();
 
 1307     for (uint32_t i : localNodeToMaster) {
 
 1308       assert(i == (uint32_t)-1);
 
 1313       galois::gPrint(
"Number of BSP sync rounds in master assignment: ",
 
 1320     for (
unsigned syncRound = 0; syncRound < stateRounds; syncRound++) {
 
 1325           syncRound, stateRounds);
 
 1328       std::vector<uint32_t> rangeVec;
 
 1330           getSpecificThreadRange(bufGraph, rangeVec, beginNode, endNode);
 
 1343           [&](uint32_t node) {
 
 1347             uint32_t assignedHost = graphPartitioner->getMaster(
 
 1348                 node, bufGraph, localNodeToMaster, gid2offsets, nodeLoads,
 
 1349                 nodeAccum, edgeLoads, edgeAccum);
 
 1351             assert(assignedHost != (uint32_t)-1);
 
 1354             localNodeToMaster[node - globalOffset] = assignedHost;
 
 1367         syncAssignment(beginNode - globalOffset, endNode - globalOffset,
 
 1368                        numLocalNodes, localNodeToMaster, syncNodes,
 
 1372         if (beginNode != endNode) {
 
 1373           syncAssignmentAsync(beginNode - globalOffset, endNode - globalOffset,
 
 1374                               numLocalNodes, localNodeToMaster, syncNodes,
 
 1375                               gid2offsets, hostFinished);
 
 1382       loadSyncTimer.start();
 
 1384         syncLoad(nodeLoads, nodeAccum);
 
 1385         syncLoad(edgeLoads, edgeAccum);
 
 1387         asyncSyncLoad(nodeLoads, nodeAccum, edgeLoads, edgeAccum, loadsClear);
 
 1389       loadSyncTimer.stop();
 
 1394                        hostFinished.
count());
 
 1411       while (hostFinished.
count() != base_DistGraph::numHosts ||
 
 1422         syncAssignmentReceivesAsync(localNodeToMaster, gid2offsets,
 
 1424         asyncRecvLoad(nodeLoads, edgeLoads, loadsClear);
 
 1430     printLoad(nodeLoads, nodeAccum);
 
 1431     printLoad(edgeLoads, edgeAccum);
 
 1435     for (uint32_t i = 0; i < localNodeToMaster.size(); i++) {
 
 1436       if (localNodeToMaster[i] == (uint32_t)-1) {
 
 1438         assert(localNodeToMaster[i] != (uint32_t)-1);
 
 1449                    "] Local master assignment " 
 1457     p0master2ownerTimer.start();
 
 1458     sendMastersToOwners(localNodeToMaster, syncNodes);
 
 1459     recvMastersToOwners();
 
 1460     p0master2ownerTimer.stop();
 
 1466     graphPartitioner->saveGID2HostInfo(gid2offsets, localNodeToMaster,
 
 1472                          uint64_t edgeOffset,
 
 1476     incomingMirrors.
reset();
 
 1490           auto ee = bufGraph.
edgeEnd(n);
 
 1491           for (; ii < ee; ++ii) {
 
 1493             if (graphPartitioner->retrieveMaster(dst) != myID) {
 
 1494               incomingMirrors.
set(dst);
 
 1497           prefixSumOfEdges[n - globalOffset] = (*ee) - edgeOffset;
 
 1498           ltgv[n - globalOffset]             = n;
 
 1504     inspectionTimer.
stop();
 
 1509         "] Edge inspection time: ", inspectionTimer.
get_usec() / 1000000.0f,
 
 1510         " seconds to read ", allBytesRead, 
" bytes (",
 
 1511         allBytesRead / (float)inspectionTimer.
get_usec(), 
" MBPS)\n");
 
 1514     uint32_t additionalMirrorCount = incomingMirrors.
count();
 
 1519       prefixSumOfEdges.resize(prefixSumOfEdges.size() + additionalMirrorCount,
 
 1520                               prefixSumOfEdges.back());
 
 1522       prefixSumOfEdges.resize(additionalMirrorCount);
 
 1525     if (additionalMirrorCount > 0) {
 
 1529       std::vector<uint64_t> threadPrefixSums(activeThreads);
 
 1533         std::tie(beginNode, endNode) =
 
 1536         for (
size_t i = beginNode; i < endNode; i++) {
 
 1537           if (incomingMirrors.
test(i))
 
 1540         threadPrefixSums[tid] = count;
 
 1543       for (
unsigned int i = 1; i < threadPrefixSums.size(); i++) {
 
 1544         threadPrefixSums[i] += threadPrefixSums[i - 1];
 
 1547       assert(threadPrefixSums.back() == additionalMirrorCount);
 
 1554         std::tie(beginNode, endNode) =
 
 1557         uint32_t threadStartLocation = 0;
 
 1559           threadStartLocation = threadPrefixSums[tid - 1];
 
 1561         uint32_t handledNodes = 0;
 
 1562         for (
size_t i = beginNode; i < endNode; i++) {
 
 1563           if (incomingMirrors.
test(i)) {
 
 1565                                                 threadStartLocation +
 
 1574     if (prefixSumOfEdges.size() != 0) {
 
 1604   template <
typename GraphTy,
 
 1605             typename std::enable_if<!std::is_void<
 
 1607   void edgeCutLoad(GraphTy& 
graph,
 
 1624           uint32_t lsrc = this->G2LEdgeCut(n, globalOffset);
 
 1627           for (; ii < ee; ++ii) {
 
 1629             decltype(gdst) ldst = this->G2LEdgeCut(gdst, globalOffset);
 
 1630             auto gdata          = bGraph.edgeData(*ii);
 
 1631             graph.constructEdge(cur++, ldst, gdata);
 
 1633           assert(cur == (*graph.
edge_end(lsrc)));
 
 1642                    "] Edge loading time: ", timer.get_usec() / 1000000.0f,
 
 1643                    " seconds to read ", bGraph.
getBytesRead(), 
" bytes (",
 
 1644                    bGraph.
getBytesRead() / (float)timer.get_usec(), 
" MBPS)\n");
 
 1656   template <
typename GraphTy,
 
 1657             typename std::enable_if<std::is_void<
 
 1659   void edgeCutLoad(GraphTy& graph,
 
 1676           uint32_t lsrc = this->G2LEdgeCut(n, globalOffset);
 
 1679           for (; ii < ee; ++ii) {
 
 1681             decltype(gdst) ldst = this->G2LEdgeCut(gdst, globalOffset);
 
 1682             graph.constructEdge(cur++, ldst);
 
 1684           assert(cur == (*graph.
edge_end(lsrc)));
 
 1693                    "] Edge loading time: ", timer.get_usec() / 1000000.0f,
 
 1694                    " seconds to read ", bGraph.
getBytesRead(), 
" bytes (",
 
 1695                    bGraph.
getBytesRead() / (float)timer.get_usec(), 
" MBPS)\n");
 
 1708                       std::vector<std::vector<uint64_t>>& numOutgoingEdges,
 
 1709                       std::vector<galois::DynamicBitSet>& hasIncomingEdge,
 
 1717       numOutgoingEdges[i].assign(numRead, 0);
 
 1721     hostHasOutgoing.
resize(base_DistGraph::numHosts);
 
 1722     hostHasOutgoing.
reset();
 
 1723     assignEdges(bufGraph, numOutgoingEdges, hasIncomingEdge, hostHasOutgoing);
 
 1725     inspectionTimer.
stop();
 
 1730         "] Edge inspection time: ", inspectionTimer.
get_usec() / 1000000.0f,
 
 1731         " seconds to read ", allBytesRead, 
" bytes (",
 
 1732         allBytesRead / (float)inspectionTimer.
get_usec(), 
" MBPS)\n");
 
 1737     sendInspectionData(numOutgoingEdges, hasIncomingEdge, hostHasOutgoing);
 
 1742     if (hasIncomingEdge[myHostID].
size() == 0) {
 
 1744       hasIncomingEdge[myHostID].reset();
 
 1746     recvInspectionData(numOutgoingEdges, hasIncomingEdge[myHostID]);
 
 1763                    std::vector<std::vector<uint64_t>>& numOutgoingEdges,
 
 1764                    std::vector<galois::DynamicBitSet>& hasIncomingEdge,
 
 1766     std::vector<galois::CopyableAtomic<char>> indicatorVars(
 
 1767         base_DistGraph::numHosts);
 
 1770       indicatorVars[i] = 0;
 
 1777     for (
unsigned syncRound = 0; syncRound < _edgeStateRounds; syncRound++) {
 
 1782           syncRound, _edgeStateRounds);
 
 1790             auto ee_end        = bufGraph.
edgeEnd(src);
 
 1791             uint64_t numEdgesL = std::distance(ee, ee_end);
 
 1793             for (; ee != ee_end; ee++) {
 
 1795               uint32_t hostBelongs = -1;
 
 1796               hostBelongs = graphPartitioner->getEdgeOwner(src, dst, numEdgesL);
 
 1797               if (_edgeStateRounds > 1) {
 
 1798                 hostLoads[hostBelongs] += 1;
 
 1801               numOutgoingEdges[hostBelongs][src - globalOffset] += 1;
 
 1802               hostHasOutgoing.
set(hostBelongs);
 
 1803               bool hostIsMasterOfDest =
 
 1804                   (hostBelongs == graphPartitioner->retrieveMaster(dst));
 
 1808               if (!hostIsMasterOfDest) {
 
 1809                 auto& bitsetStatus = indicatorVars[hostBelongs];
 
 1812                 if (bitsetStatus == 0) {
 
 1815                       bitsetStatus.compare_exchange_strong(expected, 1);
 
 1818                     hasIncomingEdge[hostBelongs].resize(globalNodes);
 
 1819                     hasIncomingEdge[hostBelongs].reset();
 
 1824                 while (indicatorVars[hostBelongs] != 2)
 
 1826                 hasIncomingEdge[hostBelongs].set(dst);
 
 1850                              const std::vector<uint64_t>& hostOutgoingEdges) {
 
 1854     std::vector<uint64_t> threadPrefixSums(activeThreads);
 
 1857     assert(hostSize == hostOutgoingEdges.size());
 
 1864       std::tie(beginNode, endNode) =
 
 1867       for (
size_t i = beginNode; i < endNode; i++) {
 
 1868         if (hostOutgoingEdges[i] > 0) {
 
 1872       threadPrefixSums[tid] = count;
 
 1876     for (
unsigned int i = 1; i < threadPrefixSums.size(); i++) {
 
 1877       threadPrefixSums[i] += threadPrefixSums[i - 1];
 
 1880     uint32_t numNonZero = threadPrefixSums[activeThreads - 1];
 
 1881     std::vector<uint32_t> masterLocation;
 
 1882     masterLocation.resize(numNonZero, (uint32_t)-1);
 
 1884     assert(numNonZero > 0);
 
 1893       std::tie(beginNode, endNode) =
 
 1896       uint32_t threadStartLocation = 0;
 
 1898         threadStartLocation = threadPrefixSums[tid - 1];
 
 1901       uint32_t handledNodes = 0;
 
 1902       for (
size_t i = beginNode; i < endNode; i++) {
 
 1903         if (hostOutgoingEdges[i] > 0) {
 
 1905           masterLocation[threadStartLocation + handledNodes] =
 
 1906               graphPartitioner->retrieveMaster(i + startNode);
 
 1913     for (uint32_t i : masterLocation) {
 
 1914       assert(i != (uint32_t)-1);
 
 1920     galois::runtime::gSerialize(b, masterLocation);
 
 1926     size_t numOfNodes = hostIncomingEdges.
count();
 
 1927     std::vector<uint32_t> masterMap;
 
 1928     masterMap.resize(numOfNodes, (uint32_t)-1);
 
 1930     std::vector<uint32_t> bitsetOffsets = hostIncomingEdges.
getOffsets();
 
 1947         [&](
size_t offset) {
 
 1949               graphPartitioner->retrieveMaster(bitsetOffsets[offset]);
 
 1954     for (uint32_t i : masterMap) {
 
 1955       assert(i != (uint32_t)-1);
 
 1956       assert(i < base_DistGraph::numHosts);
 
 1962     galois::runtime::gSerialize(b, masterMap);
 
 1965   void deserializeOutgoingMasterMap(
 
 1966       uint32_t senderHost, 
const std::vector<uint64_t>& hostOutgoingEdges,
 
 1967       const std::vector<uint32_t>& recvMasterLocations) {
 
 1971     assert(hostSize == hostOutgoingEdges.size());
 
 1973     offsetsToConsider.
resize(hostSize);
 
 1974     offsetsToConsider.reset();
 
 1980         [&](
size_t offset) {
 
 1981           if (hostOutgoingEdges[offset] > 0) {
 
 1982             offsetsToConsider.set(offset);
 
 1986     assert(offsetsToConsider.count() == recvMasterLocations.size());
 
 1991     size_t curCount = 0;
 
 1993     for (uint32_t offset : offsetsToConsider.getOffsets()) {
 
 1997       graphPartitioner->addMasterMapping(offset + hostOffset,
 
 1998                                          recvMasterLocations[curCount]);
 
 2016   void deserializeIncomingMasterMap(
 
 2017       const std::vector<uint32_t>& gids,
 
 2018       const std::vector<uint32_t>& recvMasterLocations) {
 
 2019     assert(gids.size() == recvMasterLocations.size());
 
 2020     size_t curCount = 0;
 
 2021     for (uint64_t gid : gids) {
 
 2026       graphPartitioner->addMasterMapping(gid, recvMasterLocations[curCount]);
 
 2041   void sendInspectionData(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
 
 2042                           std::vector<galois::DynamicBitSet>& hasIncomingEdge,
 
 2049     for (
unsigned h = 0; h < net.Num; h++) {
 
 2052         if (!hostHasOutgoing.
test(h)) {
 
 2053           numOutgoingEdges[h].clear();
 
 2061       if (hostHasOutgoing.
test(h)) {
 
 2062         galois::runtime::gSerialize(b, 1); 
 
 2063         galois::runtime::gSerialize(b, numOutgoingEdges[h]);
 
 2064         if (graphPartitioner->masterAssignPhase()) {
 
 2065           serializeOutgoingMasterMap(b, numOutgoingEdges[h]);
 
 2068         galois::runtime::gSerialize(b, 0); 
 
 2070       numOutgoingEdges[h].clear();
 
 2074       uint64_t bitsetSize              = curBitset.
size(); 
 
 2075       uint64_t onlyOffsetsSize         = curBitset.
count() * 32;
 
 2076       if (bitsetSize == 0) {
 
 2078         galois::runtime::gSerialize(b, 0);
 
 2079       } 
else if (onlyOffsetsSize <= bitsetSize) {
 
 2081         std::vector<uint32_t> offsets = curBitset.
getOffsets();
 
 2082         galois::runtime::gSerialize(b, 2); 
 
 2083         galois::runtime::gSerialize(b, offsets);
 
 2085         if (graphPartitioner->masterAssignPhase()) {
 
 2088           serializeIncomingMasterMap(b, curBitset);
 
 2092         galois::runtime::gSerialize(b, 1);
 
 2093         galois::runtime::gSerialize(b, curBitset);
 
 2094         if (graphPartitioner->masterAssignPhase()) {
 
 2097           serializeIncomingMasterMap(b, curBitset);
 
 2111         GRNAME, std::string(
"EdgeInspectionBytesSent"), bytesSent.
reduce());
 
 2125   void recvInspectionData(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
 
 2129     for (
unsigned h = 0; h < net.Num - 1; h++) {
 
 2136       uint32_t sendingHost = p->first;
 
 2139       uint32_t outgoingExists = 2;
 
 2142       if (outgoingExists == 1) {
 
 2146         if (graphPartitioner->masterAssignPhase()) {
 
 2147           std::vector<uint32_t> recvMasterLocations;
 
 2149           deserializeOutgoingMasterMap(
 
 2150               sendingHost, numOutgoingEdges[sendingHost], recvMasterLocations);
 
 2152       } 
else if (outgoingExists == 0) {
 
 2154         numOutgoingEdges[sendingHost].clear();
 
 2156         GALOIS_DIE(
"invalid recv inspection data metadata mode, outgoing");
 
 2159       uint32_t bitsetMetaMode = 3; 
 
 2161       if (bitsetMetaMode == 1) {
 
 2167         if (graphPartitioner->masterAssignPhase()) {
 
 2168           std::vector<uint32_t> recvMasterLocations;
 
 2170           deserializeIncomingMasterMap(recvSet.
getOffsets(),
 
 2171                                        recvMasterLocations);
 
 2173       } 
else if (bitsetMetaMode == 2) {
 
 2175         std::vector<uint32_t> recvOffsets;
 
 2177         for (uint32_t offset : recvOffsets) {
 
 2178           hasIncomingEdge.
set(offset);
 
 2181         if (graphPartitioner->masterAssignPhase()) {
 
 2182           std::vector<uint32_t> recvMasterLocations;
 
 2184           deserializeIncomingMasterMap(recvOffsets, recvMasterLocations);
 
 2186       } 
else if (bitsetMetaMode == 0) {
 
 2189         GALOIS_DIE(
"invalid recv inspection data metadata mode");
 
 2194                    "] Inspection receives complete.\n");
 
 2202   nodeMapping(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
 
 2205     base_DistGraph::numNodes = 0;
 
 2211                              base_DistGraph::numHosts * 1.15);
 
 2215     inspectMasterNodes(numOutgoingEdges, prefixSumOfEdges);
 
 2216     inspectOutgoingNodes(numOutgoingEdges, prefixSumOfEdges);
 
 2217     createIntermediateMetadata(prefixSumOfEdges, hasIncomingEdge.
count());
 
 2218     inspectIncomingNodes(hasIncomingEdge, prefixSumOfEdges);
 
 2219     finalizeInspection(prefixSumOfEdges);
 
 2222                    "] To receive this many nodes: ", nodesToReceive);
 
 2225     return prefixSumOfEdges;
 
 2232   void inspectMasterNodes(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
 
 2241       std::vector<uint64_t> threadPrefixSums(activeThreads);
 
 2244       size_t hostSize    = lastNode - startNode;
 
 2246       if (numOutgoingEdges[h].
size() != 0) {
 
 2247         assert(hostSize == numOutgoingEdges[h].
size());
 
 2256         std::tie(beginNode, endNode) =
 
 2259         for (
size_t i = beginNode; i < endNode; i++) {
 
 2263           if (graphPartitioner->retrieveMaster(i + startNode) == myHID) {
 
 2267         threadPrefixSums[tid] = count;
 
 2271       for (
unsigned int i = 1; i < threadPrefixSums.size(); i++) {
 
 2272         threadPrefixSums[i] += threadPrefixSums[i - 1];
 
 2277              base_DistGraph::numNodes);
 
 2279       uint32_t newMasterNodes = threadPrefixSums[activeThreads - 1];
 
 2281                      h, 
": ", newMasterNodes);
 
 2284       prefixSumOfEdges.resize(base_DistGraph::numNodes + newMasterNodes);
 
 2288       if (newMasterNodes > 0) {
 
 2293           std::tie(beginNode, endNode) =
 
 2297           uint32_t threadStartLocation = 0;
 
 2299             threadStartLocation = threadPrefixSums[tid - 1];
 
 2302           uint32_t handledNodes = 0;
 
 2303           for (
size_t i = beginNode; i < endNode; i++) {
 
 2304             uint32_t globalID = startNode + i;
 
 2306             if (graphPartitioner->retrieveMaster(globalID) == myHID) {
 
 2308               if (numOutgoingEdges[h].
size() > 0) {
 
 2309                 uint64_t myEdges       = numOutgoingEdges[h][i];
 
 2310                 numOutgoingEdges[h][i] = 0; 
 
 2312                 prefixSumOfEdges[startingNodeIndex + threadStartLocation +
 
 2313                                  handledNodes] = myEdges;
 
 2314                 if (myEdges > 0 && h != myHID) {
 
 2318                 prefixSumOfEdges[startingNodeIndex + threadStartLocation +
 
 2323                                                   threadStartLocation +
 
 2324                                                   handledNodes] = globalID;
 
 2329         base_DistGraph::numNodes += newMasterNodes;
 
 2333     nodesToReceive += toReceive.
reduce();
 
 2343   inspectOutgoingNodes(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
 
 2351       size_t hostSize = numOutgoingEdges[h].size();
 
 2353       if (hostSize == 0) {
 
 2358       std::vector<uint64_t> threadPrefixSums(activeThreads);
 
 2365         std::tie(beginNode, endNode) =
 
 2368         for (
size_t i = beginNode; i < endNode; i++) {
 
 2369           if (numOutgoingEdges[h][i] > 0) {
 
 2373         threadPrefixSums[tid] = count;
 
 2377       for (
unsigned int i = 1; i < threadPrefixSums.size(); i++) {
 
 2378         threadPrefixSums[i] += threadPrefixSums[i - 1];
 
 2383              base_DistGraph::numNodes);
 
 2385       uint32_t newOutgoingNodes = threadPrefixSums[activeThreads - 1];
 
 2387       prefixSumOfEdges.resize(base_DistGraph::numNodes + newOutgoingNodes);
 
 2394       if (newOutgoingNodes > 0) {
 
 2399           std::tie(beginNode, endNode) =
 
 2403           uint32_t threadStartLocation = 0;
 
 2405             threadStartLocation = threadPrefixSums[tid - 1];
 
 2408           uint32_t handledNodes = 0;
 
 2410           for (
size_t i = beginNode; i < endNode; i++) {
 
 2411             uint64_t myEdges = numOutgoingEdges[h][i];
 
 2413               prefixSumOfEdges[startingNodeIndex + threadStartLocation +
 
 2414                                handledNodes]                    = myEdges;
 
 2416                                                   threadStartLocation +
 
 2417                                                   handledNodes] = startNode + i;
 
 2420               if (myEdges > 0 && h != myHID) {
 
 2426         base_DistGraph::numNodes += newOutgoingNodes;
 
 2429       numOutgoingEdges[h].clear();
 
 2432     nodesToReceive += toReceive.
reduce();
 
 2445                              const uint64_t incomingEstimate) {
 
 2446     if (base_DistGraph::numNodes == 0) {
 
 2455       prefixSumOfEdges[i] += prefixSumOfEdges[i - 1];
 
 2470     std::vector<uint64_t> threadPrefixSums(activeThreads);
 
 2475       std::tie(beginNode, endNode) =
 
 2478       for (
size_t i = beginNode; i < endNode; i++) {
 
 2481         if (hasIncomingEdge.
test(i) &&
 
 2485       threadPrefixSums[tid] = count;
 
 2488     for (
unsigned int i = 1; i < threadPrefixSums.size(); i++) {
 
 2489       threadPrefixSums[i] += threadPrefixSums[i - 1];
 
 2493     assert(base_DistGraph::localToGlobalVector.
size() ==
 
 2494            base_DistGraph::numNodes);
 
 2496     uint32_t newIncomingNodes = threadPrefixSums[activeThreads - 1];
 
 2498     prefixSumOfEdges.resize(base_DistGraph::numNodes + newIncomingNodes);
 
 2499     base_DistGraph::localToGlobalVector.resize(base_DistGraph::numNodes +
 
 2504     if (newIncomingNodes > 0) {
 
 2509         std::tie(beginNode, endNode) =
 
 2513         uint32_t threadStartLocation = 0;
 
 2515           threadStartLocation = threadPrefixSums[tid - 1];
 
 2518         uint32_t handledNodes = 0;
 
 2520         for (
size_t i = beginNode; i < endNode; i++) {
 
 2521           if (hasIncomingEdge.
test(i) &&
 
 2523             prefixSumOfEdges[startingNodeIndex + threadStartLocation +
 
 2525             base_DistGraph::localToGlobalVector[startingNodeIndex +
 
 2526                                                 threadStartLocation +
 
 2532       base_DistGraph::numNodes += newIncomingNodes;
 
 2545       prefixSumOfEdges[i] += prefixSumOfEdges[i - 1];
 
 2550     if (prefixSumOfEdges.size() != 0) {
 
 2563   void fillMirrors() {
 
 2568       uint32_t globalID = base_DistGraph::localToGlobalVector[i];
 
 2570           .push_back(globalID);
 
 2576   template <
typename GraphTy>
 
 2577   void loadEdges(GraphTy& graph,
 
 2580       if (std::is_void<typename GraphTy::edge_data_type>::value) {
 
 2581         fprintf(stderr, 
"Loading void edge-data while creating edges.\n");
 
 2583         fprintf(stderr, 
"Loading edge-data while creating edges.\n");
 
 2589     std::atomic<uint32_t> receivedNodes;
 
 2590     receivedNodes.store(0);
 
 2593     loadEdgeTimer.start();
 
 2596     sendEdges(graph, bufGraph, receivedNodes);
 
 2603         [&](
unsigned, 
unsigned) { receiveEdges(graph, receivedNodes); });
 
 2606     loadEdgeTimer.stop();
 
 2609                    loadEdgeTimer.get_usec() / 1000000.0f, 
" seconds to read ",
 
 2610                    bufBytesRead, 
" bytes (",
 
 2611                    bufBytesRead / (float)loadEdgeTimer.get_usec(), 
" MBPS)\n");
 
 2615   template <
typename GraphTy,
 
 2616             typename std::enable_if<!std::is_void<
 
 2618   void sendEdges(GraphTy& graph,
 
 2620                  std::atomic<uint32_t>& receivedNodes) {
 
 2621     using DstVecType = std::vector<std::vector<uint64_t>>;
 
 2623         std::vector<std::vector<typename GraphTy::edge_data_type>>;
 
 2624     using SendBufferVecTy = std::vector<galois::runtime::SendBuffer>;
 
 2627         base_DistGraph::numHosts);
 
 2629         base_DistGraph::numHosts);
 
 2631         base_DistGraph::numHosts);
 
 2640     messagesSent.
reset();
 
 2642     maxBytesSent.
reset();
 
 2644     for (
unsigned syncRound = 0; syncRound < _edgeStateRounds; syncRound++) {
 
 2657             uint64_t curEdge = 0;
 
 2659               lsrc = this->
G2L(src);
 
 2665             auto ee_end        = bufGraph.
edgeEnd(src);
 
 2666             uint64_t numEdgesL = std::distance(ee, ee_end);
 
 2667             auto& gdst_vec     = *gdst_vecs.getLocal();
 
 2668             auto& gdata_vec    = *gdata_vecs.getLocal();
 
 2670             for (
unsigned i = 0; i < 
numHosts; ++i) {
 
 2671               gdst_vec[i].clear();
 
 2672               gdata_vec[i].clear();
 
 2673               gdst_vec[i].reserve(numEdgesL);
 
 2677             for (; ee != ee_end; ++ee) {
 
 2679               auto gdata    = bufGraph.
edgeData(*ee);
 
 2681               uint32_t hostBelongs =
 
 2682                   graphPartitioner->getEdgeOwner(src, gdst, numEdgesL);
 
 2683               if (_edgeStateRounds > 1) {
 
 2684                 hostLoads[hostBelongs] += 1;
 
 2687               if (hostBelongs == 
id) {
 
 2690                 uint32_t ldst = this->
G2L(gdst);
 
 2691                 graph.constructEdge(curEdge++, ldst, gdata);
 
 2696                 gdst_vec[hostBelongs].push_back(gdst);
 
 2697                 gdata_vec[hostBelongs].push_back(gdata);
 
 2703               assert(curEdge == (*graph.edge_end(lsrc)));
 
 2707             for (uint32_t h = 0; h < 
numHosts; ++h) {
 
 2711               if (gdst_vec[h].
size() > 0) {
 
 2712                 auto& b = (*sendBuffers.getLocal())[h];
 
 2713                 galois::runtime::gSerialize(b, src);
 
 2714                 galois::runtime::gSerialize(b, gdst_vec[h]);
 
 2715                 galois::runtime::gSerialize(b, gdata_vec[h]);
 
 2718                 if (b.
size() > edgePartitionSendBufSize) {
 
 2733             this->processReceivedEdgeBuffer(buffer, graph, receivedNodes);
 
 2744     for (
unsigned threadNum = 0; threadNum < sendBuffers.size(); ++threadNum) {
 
 2745       auto& sbr = *sendBuffers.getRemote(threadNum);
 
 2749         auto& sendBuffer = sbr[h];
 
 2750         if (sendBuffer.size() > 0) {
 
 2752           bytesSent.
update(sendBuffer.size());
 
 2753           maxBytesSent.
update(sendBuffer.size());
 
 2756           sendBuffer.getVec().clear();
 
 2764         GRNAME, std::string(
"EdgeLoadingMessagesSent"), messagesSent.
reduce());
 
 2766         GRNAME, std::string(
"EdgeLoadingBytesSent"), bytesSent.
reduce());
 
 2768         GRNAME, std::string(
"EdgeLoadingMaxBytesSent"), maxBytesSent.
reduce());
 
 2772   template <
typename GraphTy,
 
 2773             typename std::enable_if<std::is_void<
 
 2775   void sendEdges(GraphTy& graph,
 
 2777                  std::atomic<uint32_t>& receivedNodes) {
 
 2778     using DstVecType      = std::vector<std::vector<uint64_t>>;
 
 2779     using SendBufferVecTy = std::vector<galois::runtime::SendBuffer>;
 
 2782         base_DistGraph::numHosts);
 
 2784         base_DistGraph::numHosts);
 
 2793     messagesSent.
reset();
 
 2795     maxBytesSent.
reset();
 
 2797     for (
unsigned syncRound = 0; syncRound < _edgeStateRounds; syncRound++) {
 
 2810             uint64_t curEdge = 0;
 
 2812               lsrc = this->
G2L(src);
 
 2818             auto ee_end        = bufGraph.
edgeEnd(src);
 
 2819             uint64_t numEdgesL = std::distance(ee, ee_end);
 
 2820             auto& gdst_vec     = *gdst_vecs.getLocal();
 
 2822             for (
unsigned i = 0; i < 
numHosts; ++i) {
 
 2823               gdst_vec[i].clear();
 
 2827             for (; ee != ee_end; ++ee) {
 
 2829               uint32_t hostBelongs =
 
 2830                   graphPartitioner->getEdgeOwner(src, gdst, numEdgesL);
 
 2831               if (_edgeStateRounds > 1) {
 
 2832                 hostLoads[hostBelongs] += 1;
 
 2835               if (hostBelongs == 
id) {
 
 2838                 uint32_t ldst = this->
G2L(gdst);
 
 2839                 graph.constructEdge(curEdge++, ldst);
 
 2844                 gdst_vec[hostBelongs].push_back(gdst);
 
 2850               assert(curEdge == (*graph.edge_end(lsrc)));
 
 2854             for (uint32_t h = 0; h < 
numHosts; ++h) {
 
 2858               if (gdst_vec[h].
size() > 0) {
 
 2859                 auto& b = (*sendBuffers.getLocal())[h];
 
 2860                 galois::runtime::gSerialize(b, src);
 
 2861                 galois::runtime::gSerialize(b, gdst_vec[h]);
 
 2864                 if (b.
size() > edgePartitionSendBufSize) {
 
 2879             this->processReceivedEdgeBuffer(buffer, graph, receivedNodes);
 
 2890     for (
unsigned threadNum = 0; threadNum < sendBuffers.size(); ++threadNum) {
 
 2891       auto& sbr = *sendBuffers.getRemote(threadNum);
 
 2895         auto& sendBuffer = sbr[h];
 
 2896         if (sendBuffer.size() > 0) {
 
 2898           bytesSent.
update(sendBuffer.size());
 
 2899           maxBytesSent.
update(sendBuffer.size());
 
 2902           sendBuffer.getVec().clear();
 
 2910         GRNAME, std::string(
"EdgeLoadingMessagesSent"), messagesSent.
reduce());
 
 2912         GRNAME, std::string(
"EdgeLoadingBytesSent"), bytesSent.
reduce());
 
 2914         GRNAME, std::string(
"EdgeLoadingMaxBytesSent"), maxBytesSent.
reduce());
 
 2918   template <
typename GraphTy>
 
 2919   void processReceivedEdgeBuffer(
 
 2920       std::optional<std::pair<uint32_t, galois::runtime::RecvBuffer>>& buffer,
 
 2921       GraphTy& graph, std::atomic<uint32_t>& receivedNodes) {
 
 2923       auto& rb = buffer->second;
 
 2924       while (rb.r_size() > 0) {
 
 2926         std::vector<uint64_t> gdst_vec;
 
 2930         uint32_t lsrc = this->
G2L(n);
 
 2932         uint64_t cur_end = *graph.edge_end(lsrc);
 
 2933         assert((cur_end - cur) == gdst_vec.size());
 
 2934         deserializeEdges(graph, rb, gdst_vec, cur, cur_end);
 
 2944   template <
typename GraphTy>
 
 2945   void receiveEdges(GraphTy& graph, std::atomic<uint32_t>& receivedNodes) {
 
 2949     while (receivedNodes < nodesToReceive) {
 
 2952       processReceivedEdgeBuffer(p, graph, receivedNodes);
 
 2956   template <
typename GraphTy,
 
 2957             typename std::enable_if<!std::is_void<
 
 2960                         std::vector<uint64_t>& gdst_vec, uint64_t& cur,
 
 2961                         uint64_t& cur_end) {
 
 2962     std::vector<typename GraphTy::edge_data_type> gdata_vec;
 
 2965     while (cur < cur_end) {
 
 2966       auto gdata    = gdata_vec[i];
 
 2967       uint64_t gdst = gdst_vec[i++];
 
 2968       uint32_t ldst = this->
G2L(gdst);
 
 2969       graph.constructEdge(cur++, ldst, gdata);
 
 2974   template <
typename GraphTy,
 
 2975             typename std::enable_if<std::is_void<
 
 2978                         std::vector<uint64_t>& gdst_vec, uint64_t& cur,
 
 2979                         uint64_t& cur_end) {
 
 2981     while (cur < cur_end) {
 
 2982       uint64_t gdst = gdst_vec[i++];
 
 2983       uint32_t ldst = this->
G2L(gdst);
 
 2984       graph.constructEdge(cur++, ldst);
 
 2991 template <
typename NodeTy, 
typename EdgeTy, 
typename Partitioner>
 
 2992 constexpr 
const char* 
const 
EdgeIterator edgeEnd(uint64_t globalNodeID)
Get the index to the first edge of the node after the provided node. 
Definition: BufferedGraph.h:414
Distributed sum-reducer for getting the sum of some value across multiple hosts. 
Definition: DReducible.h:45
void printEdgeLoad()
Debug function: prints host loads. 
Definition: NewGeneric.h:138
uint64_t numGlobalEdges
Total edges in the global unpartitioned graph. 
Definition: DistributedGraph.h:80
void gInfo(Args &&...args)
Prints an info string from a sequence of things. 
Definition: gIO.h:55
void increment_evilPhase()
Increments evilPhase, a phase counter used by communication. 
Definition: DistributedGraph.h:133
unsigned int getActiveThreads() noexcept
Returns the number of threads in use. 
Definition: Threads.cpp:37
std::pair< IterTy, IterTy > block_range(IterTy b, IterTy e, unsigned id, unsigned num)
Returns a continuous block from the range based on the number of divisions and the id of the block re...
Definition: gstl.h:244
void reset()
Definition: Reduction.h:113
Class that loads a portion of a Galois graph from disk directly into memory buffers for access...
Definition: BufferedGraph.h:49
uint64_t numGlobalNodes
Total nodes in the global unpartitioned graph. 
Definition: DistributedGraph.h:79
void bitwise_or(const DynamicBitSet &other)
Definition: libgalois/include/galois/DynamicBitset.h:241
const uint32_t numHosts
Total number of machines. 
Definition: DistributedGraph.h:85
EdgeIterator edgeBegin(uint64_t globalNodeID)
Get the index to the first edge of the provided node THAT THIS GRAPH HAS LOADED (not necessarily the fi...
Definition: BufferedGraph.h:386
void reportStat_Tmax(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:556
EdgeDataType edgeData(uint64_t globalEdgeID)
Get the edge data of some edge. 
Definition: BufferedGraph.h:462
std::vector< uint32_t > getOffsets() const 
Returns a vector containing the set bits in this bitset in order from left to right. 
Definition: libgalois/include/galois/DynamicBitset.h:347
void resize(size_t n)
Definition: PODResizeableArray.h:142
Concurrent dynamically allocated bitset. 
Definition: libgalois/include/galois/DynamicBitset.h:47
void reportParam(const S1 &region, const S2 &category, const V &value)
Definition: Statistics.h:616
Buffer for serialization of data. 
Definition: Serialize.h:56
void resetEdgeLoad()
Reset load balance on host reducibles. 
Definition: NewGeneric.h:110
void gDebug(Args &&...GALOIS_USED_ONLY_IN_DEBUG(args))
Prints a debug string from a sequence of things; prints nothing if NDEBUG is defined. 
Definition: gIO.h:72
virtual std::pair< unsigned, unsigned > cartesianGrid() const 
Returns Cartesian split (if it exists, else returns pair of 0s).
Definition: NewGeneric.h:103
size_t size() const 
Gets number of nodes on this (local) graph. 
Definition: DistributedGraph.h:646
void gDeserialize(DeSerializeBuffer &buf, T1 &&t1, Args &&...args)
Deserialize data in a buffer into a series of objects. 
Definition: Serialize.h:1032
uint32_t size() const 
Gets the number of global nodes in the graph. 
Definition: BufferedGraph.h:294
Ty reset()
Reset the entire accumulator. 
Definition: DReducible.h:153
SpecificRange< IterTy > makeSpecificRange(IterTy begin, IterTy end, const uint32_t *thread_ranges)
Creates a SpecificRange object. 
Definition: Range.h:219
void transpose(const char *regionName=NULL)
Perform an in-memory transpose of the graph, replacing the original CSR to CSC. 
Definition: LC_CSR_Graph.h:615
const char * loopname
Definition: Executor_ParaMeter.h:145
#define GALOIS_DIE(...)
Definition: gIO.h:96
void read_local_graph_from_file(std::string)
Read the local LC_CSR graph from the file on a disk. 
Definition: DistributedGraph.h:834
uint64_t numEdges
Num edges in this graph in total. 
Definition: DistributedGraph.h:82
void resetReadCounters()
Reset reading counters. 
Definition: BufferedGraph.h:496
void reportStat_Tsum(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:562
vTy & getVec()
Returns vector of data stored in this serialize buffer. 
Definition: Serialize.h:115
bool test(size_t index) const 
Check a bit to see if it is currently set. 
Definition: libgalois/include/galois/DynamicBitset.h:192
Class that inherits from std::atomic to make it copyable by defining a copy constructor. 
Definition: AtomicWrapper.h:40
uint32_t G2L(uint64_t gid) const 
Definition: DistributedGraph.h:482
void determineThreadRanges()
Uses a pre-computed prefix sum to determine division of nodes among threads. 
Definition: DistributedGraph.h:725
void reserve(size_t n)
Definition: PODResizeableArray.h:129
edge_iterator edge_begin(GraphNode N)
Definition: OfflineGraph.h:278
Accumulator for T where accumulation is max. 
Definition: Reduction.h:189
void loadPartialGraph(const std::string &filename, uint64_t nodeStart, uint64_t nodeEnd, uint64_t edgeStart, uint64_t edgeEnd, uint64_t numGlobalNodes, uint64_t numGlobalEdges)
Given a node/edge range to load, loads the specified portion of the graph into memory buffers using r...
Definition: BufferedGraph.h:348
uint64_t count() const 
Count how many bits are set in the bitset. 
Definition: libgalois/include/galois/DynamicBitset.h:320
std::vector< std::vector< size_t > > mirrorNodes
Mirror nodes from different hosts. For reduce. 
Definition: DistributedGraph.h:100
void resize(uint64_t n)
Resizes the bitset. 
Definition: libgalois/include/galois/DynamicBitset.h:78
std::vector< std::pair< uint64_t, uint64_t > > gid2host
Information that converts host to range of nodes that host reads. 
Definition: DistributedGraph.h:98
std::vector< T, Pow2Alloc< T >> Vector
[STL vector using Pow_2_VarSizeAlloc] 
Definition: gstl.h:52
~NewDistGraphGeneric()
Free the graph partitioner. 
Definition: NewGeneric.h:368
virtual unsigned getHostID(uint64_t gid) const 
Determines which host has the master for a particular node. 
Definition: NewGeneric.h:82
void reportStat_Single(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:544
Definition: PerThreadStorage.h:88
void reset()
Gets the space taken by the bitset. 
Definition: libgalois/include/galois/DynamicBitset.h:110
virtual bool isOwned(uint64_t gid) const 
Determine if a node has a master on this host. 
Definition: NewGeneric.h:87
Definition: OfflineGraph.h:63
size_type size() const 
Returns the size of the serialize buffer. 
Definition: Serialize.h:125
void constructNodes()
Definition: LC_CSR_Graph.h:570
MASTERS_DISTRIBUTION
Enums specifying how masters are to be distributed among hosts. 
Definition: DistributedGraph.h:48
bool transposed
Marks if the graph is transposed or not. 
Definition: DistributedGraph.h:76
void determineThreadRangesMaster()
Determines the thread ranges for master nodes only and saves them to the object. 
Definition: DistributedGraph.h:736
void gPrint(Args &&...args)
Prints a sequence of things. 
Definition: gIO.h:47
edge_iterator edge_end(GraphNode N)
Gets the end edge boundary of some node. 
Definition: DistributedGraph.h:625
unsigned evilPhasePlus1()
Returns evilPhase + 1, handling loop around as necessary. 
Definition: DistributedGraph.h:144
unsigned int activeThreads
Definition: Threads.cpp:26
boost::counting_iterator< uint64_t > edge_iterator
Definition: OfflineGraph.h:194
uint64_t getNodeOffset() const 
Definition: BufferedGraph.h:304
NetworkInterface & getSystemNetworkInterface()
Get the network interface. 
Definition: Network.cpp:131
size_t size() const 
Gets the size of the bitset. 
Definition: libgalois/include/galois/DynamicBitset.h:99
void syncEdgeLoad()
Sync load balance on hosts using reducibles. 
Definition: NewGeneric.h:124
size_type size() const 
Definition: PODResizeableArray.h:125
SpecificRange is a range type where a thread's range is specified by an int array that tells you wh...
Definition: Range.h:122
uint32_t numNodes
Num nodes in this graph in total. 
Definition: DistributedGraph.h:81
void clear()
Definition: PODResizeableArray.h:147
Contains the implementation for DistGraph. 
uint32_t numOwned
Number of nodes owned (masters) by this host. 
Definition: DistributedGraph.h:89
void do_all(const RangeFunc &rangeMaker, FunctionTy &&fn, const Args &...args)
Standard do-all loop. 
Definition: Loops.h:71
Definition: NewGeneric.h:47
bool set(size_t index)
Set a bit in the bitset. 
Definition: libgalois/include/galois/DynamicBitset.h:206
uint64_t getBytesRead()
Returns the total number of bytes read from this graph so far. 
Definition: BufferedGraph.h:508
uint32_t numNodesWithEdges
Number of nodes (masters + mirrors) that have outgoing edges. 
Definition: DistributedGraph.h:94
void start()
Definition: Timer.cpp:82
void on_each(FunctionTy &&fn, const Args &...args)
Low-level parallel loop. 
Definition: Loops.h:86
virtual bool is_vertex_cut() const 
Returns true if current partition is a vertex cut. 
Definition: NewGeneric.h:102
uint64_t computeMasters(MASTERS_DISTRIBUTION masters_distribution, galois::graphs::OfflineGraph &g, const std::vector< unsigned > &scalefactor, uint32_t nodeWeight=0, uint32_t edgeWeight=0, unsigned DecomposeFactor=1)
Wrapper call that will call into more specific compute masters functions that compute masters based o...
Definition: DistributedGraph.h:364
void initializeSpecificRanges()
Initializes the 3 range objects that a user can access to iterate over the graph in different ways...
Definition: DistributedGraph.h:785
uint64_t get_usec() const 
Definition: Timer.cpp:92
NewDistGraphGeneric(const std::string &filename, unsigned host, unsigned _numHosts, bool cuspAsync=true, uint32_t stateRounds=100, bool transpose=false, galois::graphs::MASTERS_DISTRIBUTION md=BALANCED_EDGES_OF_MASTERS, uint32_t nodeWeight=0, uint32_t edgeWeight=0, std::string masterBlockFile="", bool readFromFile=false, std::string localGraphFileName="local_graph", uint32_t edgeStateRounds=1)
Constructor. 
Definition: NewGeneric.h:154
T & reduce()
Returns the final reduction value. 
Definition: Reduction.h:102
size_t size() const 
Definition: OfflineGraph.h:271
void readersFromFile(galois::graphs::OfflineGraph &g, std::string filename)
reader assignment from a file corresponds to master assignment if using an edge cut ...
Definition: DistributedGraph.h:404
std::unordered_map< uint64_t, uint32_t > globalToLocalMap
LID = globalToLocalMap[GID]. 
Definition: DistributedGraph.h:105
uint32_t evilPhase
Variable that keeps track of which network send/recv phase a program is currently on...
Definition: Network.cpp:36
Base DistGraph class that all distributed graphs extend from. 
Definition: DistributedGraph.h:64
Buffer for deserialization of data. 
Definition: Serialize.h:147
auto iterate(C &cont)
Definition: Range.h:323
This is a container that encapsulates a resizeable array of plain-old-datatype (POD) elements...
Definition: PODResizeableArray.h:40
void update(T &&rhs)
Updates the thread local value by applying the reduction operator to current and newly provided value...
Definition: Reduction.h:90
void resetAndFree()
Free all of the in memory buffers in this object and reset graph status. 
Definition: BufferedGraph.h:516
std::vector< uint64_t > localToGlobalVector
GID = localToGlobalVector[LID]. 
Definition: DistributedGraph.h:103
const unsigned id
ID of the machine. 
Definition: DistributedGraph.h:84
void allocateFrom(const FileGraph &graph)
Definition: LC_CSR_Graph.h:513
size_t sizeEdges() const 
Definition: OfflineGraph.h:272
uint64_t edgeDestination(uint64_t globalEdgeID)
Get the global node id of the destination of the provided edge. 
Definition: BufferedGraph.h:437
EdgeTy edge_data_type
Definition: LC_CSR_Graph.h:147
std::vector< uint32_t > determineUnitRangesFromPrefixSum(uint32_t unitsToSplit, VectorTy &edgePrefixSum, uint32_t nodeAlpha=0)
Uses the divideByNode function (which is binary search based) to divide nodes among units using a pro...
Definition: GraphHelpers.h:484
virtual bool isLocal(uint64_t gid) const 
Determine if a node has a proxy on this host. 
Definition: NewGeneric.h:92
void stop()
Definition: Timer.cpp:87
Implements distributed reducible objects for easy reduction of values across a distributed system...
Galois Timer that automatically reports stats upon destruction. Provides statistic interface around ti...
Definition: Timer.h:63
GraphTy graph
The internal graph used by DistGraph to represent the graph. 
Definition: DistributedGraph.h:73
void determineThreadRangesWithEdges()
Determines the thread ranges for nodes with edges only and saves them to the object. 
Definition: DistributedGraph.h:762
uint32_t beginMaster
Local id of the beginning of master nodes. 
Definition: DistributedGraph.h:91
balance edges 
Definition: DistributedGraph.h:52