27 #ifndef _GALOIS_DIST_NEWGENERIC_H
28 #define _GALOIS_DIST_NEWGENERIC_H
35 #define CUSP_PT_TIMER 0
46 template <
typename NodeTy,
typename EdgeTy,
typename Partitioner>
49 constexpr
static unsigned edgePartitionSendBufSize = 8388608;
50 constexpr
static const char*
const GRNAME =
"dGraph_Generic";
51 Partitioner* graphPartitioner;
54 uint32_t _edgeStateRounds;
55 std::vector<galois::DGAccumulator<uint64_t>> hostLoads;
56 std::vector<uint64_t> old_hostLoads;
// Converts a global node ID to a local ID for the edge-cut path by
// subtracting the caller-supplied global offset.
// The 64-bit difference is implicitly narrowed to the uint32_t return type.
58 uint32_t G2LEdgeCut(uint64_t gid, uint32_t globalOffset)
const {
62 return gid - globalOffset;
71 void freeVector(V& vectorToKill) {
73 vectorToKill.swap(dummyVector);
76 uint32_t nodesToReceive;
84 return graphPartitioner->retrieveMaster(gid);
87 virtual bool isOwned(uint64_t gid)
const {
92 virtual bool isLocal(uint64_t gid)
const {
// Reports whether the partitioning policy is a vertex cut. The answer is
// delegated entirely to the partitioner object.
102 virtual bool is_vertex_cut()
const {
return graphPartitioner->isVertexCut(); }
104 return graphPartitioner->cartesianGrid();
111 if (_edgeStateRounds > 1) {
112 if (!graphPartitioner->noCommunication()) {
114 hostLoads[i].reset();
115 old_hostLoads[i] = 0;
125 if (_edgeStateRounds > 1) {
126 if (!graphPartitioner->noCommunication()) {
128 old_hostLoads[i] += hostLoads[i].reduce();
129 hostLoads[i].reset();
139 if (_edgeStateRounds > 1) {
140 if (!graphPartitioner->noCommunication()) {
144 old_hostLoads[i],
"\n");
155 const std::string& filename,
unsigned host,
unsigned _numHosts,
156 bool cuspAsync =
true, uint32_t stateRounds = 100,
bool transpose =
false,
158 uint32_t nodeWeight = 0, uint32_t edgeWeight = 0,
159 std::string masterBlockFile =
"",
bool readFromFile =
false,
160 std::string localGraphFileName =
"local_graph",
161 uint32_t edgeStateRounds = 1)
162 :
base_DistGraph(host, _numHosts), _edgeStateRounds(edgeStateRounds) {
165 "GraphPartitioningTime", GRNAME);
166 Tgraph_construct.
start();
170 "] Reading local graph from file ", localGraphFileName,
173 Tgraph_construct.
stop();
180 std::vector<unsigned> dummy;
182 if (masterBlockFile ==
"") {
204 std::vector<std::vector<uint64_t>> numOutgoingEdges;
206 std::vector<galois::DynamicBitSet> hasIncomingEdge;
209 if (!graphPartitioner->noCommunication()) {
210 if (_edgeStateRounds > 1) {
225 graphReadTimer.
start();
229 graphReadTimer.
stop();
232 if (graphPartitioner->masterAssignPhase()) {
236 "] Starting master assignment.\n");
238 phase0(bufGraph, cuspAsync, stateRounds);
241 "] Master assignment complete.\n");
245 inspectionTimer.
start();
250 if (!graphPartitioner->noCommunication()) {
251 edgeInspection(bufGraph, numOutgoingEdges, hasIncomingEdge,
258 nodeMapping(numOutgoingEdges, finalIncoming, prefixSumOfEdges);
264 uint64_t edgeOffset = *bufGraph.
edgeBegin(nodeBegin);
266 edgeCutInspection(bufGraph, inspectionTimer, edgeOffset,
274 if (graphPartitioner->masterAssignPhase()) {
275 graphPartitioner->enterStage2();
279 numOutgoingEdges.clear();
280 hasIncomingEdge.clear();
282 freeVector(numOutgoingEdges);
283 freeVector(hasIncomingEdge);
297 [&](uint64_t n) { base_graph.fixEndEdge(n, prefixSumOfEdges[n]); },
303 prefixSumOfEdges.clear();
304 freeVector(prefixSumOfEdges);
307 TfillMirrors.
start();
311 if (_edgeStateRounds > 1) {
317 if (!graphPartitioner->noCommunication()) {
329 if (graphPartitioner->isVertexCut() &&
330 graphPartitioner->cartesianGrid().first == 0) {
347 Tthread_ranges.
start();
349 Tthread_ranges.
stop();
355 Tgraph_construct.
stop();
361 (uint32_t)stateRounds);
373 std::vector<uint32_t>& assignedThreadRanges,
374 uint64_t startNode, uint64_t endNode) {
376 threadRangeTime.start();
377 uint64_t numLocalNodes = endNode - startNode;
379 edgePrefixSum.
resize(numLocalNodes);
385 uint64_t offset = n - startNode;
390 for (
unsigned i = 1; i < numLocalNodes; i++) {
391 edgePrefixSum[i] += edgePrefixSum[i - 1];
398 assignedThreadRanges[i] += startNode;
402 boost::counting_iterator<size_t>(startNode),
403 boost::counting_iterator<size_t>(startNode + numLocalNodes),
404 assignedThreadRanges.data());
406 threadRangeTime.stop();
423 bitsetSetupTimer.start();
428 std::vector<uint32_t> rangeVector;
433 getSpecificThreadRange(bufGraph, rangeVector, start, end);
446 for (; ii < ee; ++ii) {
448 if ((dst < start) || (dst >= end)) {
458 bitsetSetupTimer.stop();
473 uint64_t phase0MapSetup(
475 std::unordered_map<uint64_t, uint32_t>& gid2offsets,
478 mapSetupTimer.start();
482 uint32_t lid = numLocal;
484 uint64_t numToReserve = ghosts.
count();
485 gid2offsets.reserve(numToReserve);
493 for (uint64_t gid = start; gid < end; ++gid) {
494 if (ghosts.
test(gid)) {
495 gid2offsets[gid] = lid;
496 syncNodes[h].push_back(gid - start);
501 (end - start) / 64,
" vs. vector size ",
502 syncNodes[h].
size() / 2);
506 assert(lid == numToReserve);
508 (ghosts.
size() - numLocal) / 64,
" vs. total vector size ",
514 mapSetupTimer.stop();
533 p0BitsetCommTimer.start();
534 uint64_t bytesSent = 0;
541 galois::runtime::gSerialize(bitsetBuffer, syncNodes[h]);
542 bytesSent += bitsetBuffer.
size();
549 for (
unsigned h = 0; h < net.Num - 1; h++) {
554 uint32_t sendingHost = p->first;
559 p0BitsetCommTimer.stop();
562 GRNAME, std::string(
"Phase0SendRecvBitsetsBytesSent"), bytesSent);
// Folds the per-host accumulators into the running load totals.
// `loads` and `accums` must have the same length (asserted); index i of each
// refers to the same host slot.
577 void syncLoad(std::vector<uint64_t>& loads,
579 assert(loads.size() == accums.size());
583 for (
unsigned i = 0; i < loads.size(); i++) {
// Contribute this host's accumulated value for slot i to the reducer.
585 syncer += (accums[i].load());
// NOTE(review): reduce() presumably combines the contributions of all
// hosts -- confirm against the declaration of `syncer`.
587 uint64_t accumulation = syncer.reduce();
588 loads[i] += accumulation;
599 template <
typename VType>
603 nonAtomic.
resize(atomic.size());
608 nonAtomic[i] = atomic[i].load();
625 unsigned bytesSent = 0;
635 galois::runtime::gSerialize(b, 4);
636 galois::runtime::gSerialize(b, nodeAccum);
637 galois::runtime::gSerialize(b, edgeAccum);
638 bytesSent += b.
size();
// Handles incoming load-synchronization messages.
// Message type 4 carries node/edge accumulations that are added into
// `nodeLoads` / `edgeLoads`; message type 3 marks the sending host in
// `loadsClear`. Any other type aborts through GALOIS_DIE.
658 void asyncRecvLoad(std::vector<uint64_t>& nodeLoads,
659 std::vector<uint64_t>& edgeLoads,
// Initialised to an invalid tag (all bits set).
671 unsigned messageType = (unsigned)-1;
675 if (messageType == 4) {
// The received vectors must match each other and the local load vectors.
682 assert(recvNodeAccum.
size() == recvEdgeAccum.
size());
683 assert(recvNodeAccum.
size() == nodeLoads.size());
684 assert(recvEdgeAccum.
size() == edgeLoads.size());
689 nodeLoads[i] += recvNodeAccum[i];
690 edgeLoads[i] += recvEdgeAccum[i];
693 }
else if (messageType == 3) {
695 uint32_t sendingHost = p->first;
// A host is expected to be marked in loadsClear at most once.
696 assert(!loadsClear.
test(sendingHost));
697 loadsClear.
set(sendingHost);
699 GALOIS_DIE(
"unexpected message type in async load synchronization: ",
// Asynchronous load synchronization: copies the atomic accumulators into
// plain vectors, applies them to the local load totals, sends them out with
// asyncSendLoad, and then processes incoming messages via asyncRecvLoad.
722 void asyncSyncLoad(std::vector<uint64_t>& nodeLoads,
724 std::vector<uint64_t>& edgeLoads,
738 extractAtomicToPODArray(nodeAccum, nonAtomicNodeAccum);
739 extractAtomicToPODArray(edgeAccum, nonAtomicEdgeAccum);
// Apply this host's own accumulated values locally.
748 nodeLoads[i] += nonAtomicNodeAccum[i];
749 edgeLoads[i] += nonAtomicEdgeAccum[i];
// Sanity check: the atomic accumulators are expected to be zero by now.
// NOTE(review): presumably zeroed during extraction -- confirm.
754 for (
unsigned i = 0; i < nodeAccum.
size(); i++) {
755 assert(nodeAccum[i].load() == 0);
756 assert(edgeAccum[i].load() == 0);
761 asyncSendLoad(nonAtomicNodeAccum, nonAtomicEdgeAccum);
762 asyncRecvLoad(nodeLoads, edgeLoads, loadsClear);
// Debug helper: for every slot, prints information that includes the current
// accumulator value. `loads` and `accums` must have equal length (asserted).
774 void printLoad(std::vector<uint64_t>& loads,
776 assert(loads.size() == accums.size());
777 for (
unsigned i = 0; i < loads.size(); i++) {
779 " accum ", accums[i].load());
// Gathers elements of `dataVector` at the positions listed in `offsetVector`.
// The result has offsetVector.size() elements, with
// result[i] == dataVector[offsetVector[i]].
// No bounds check is visible on the offsets; they must be valid indices into
// dataVector.
793 template <
typename T>
794 std::vector<T> getDataFromOffsets(std::vector<uint32_t>& offsetVector,
795 const std::vector<T>& dataVector) {
796 std::vector<T> toReturn;
// Pre-size the output so each index can be written independently.
797 toReturn.resize(offsetVector.size());
801 [&](
unsigned i) { toReturn[i] = dataVector[offsetVector[i]]; },
818 std::vector<uint32_t>& dataVector,
819 std::string timerName = std::string()) {
821 std::string statString = std::string(
"Phase0SendOffsets_") + timerName;
822 uint64_t bytesSent = 0;
826 sendOffsetsTimer.
start();
829 if (toSync.
count()) {
830 std::vector<uint32_t> offsetVector = toSync.
getOffsets();
832 std::vector<uint32_t> mastersToSend =
833 getDataFromOffsets(offsetVector, dataVector);
835 assert(mastersToSend.size());
837 size_t num_selected = toSync.
count();
838 size_t num_total = toSync.
size();
841 size_t bitset_alloc_size =
842 ((num_total + 63) / 64) *
sizeof(uint64_t) + (2 *
sizeof(
size_t));
843 size_t bitsetDataSize = (num_selected *
sizeof(uint32_t)) +
844 bitset_alloc_size +
sizeof(num_selected);
845 size_t offsetsDataSize = (num_selected *
sizeof(uint32_t)) +
846 (num_selected *
sizeof(
unsigned int)) +
847 sizeof(uint32_t) +
sizeof(num_selected);
851 if (bitsetDataSize < offsetsDataSize) {
853 galois::runtime::gSerialize(b, 1u);
854 galois::runtime::gSerialize(b, toSync);
855 galois::runtime::gSerialize(b, mastersToSend);
858 galois::runtime::gSerialize(b, 2u);
859 galois::runtime::gSerialize(b, offsetVector);
860 galois::runtime::gSerialize(b, mastersToSend);
862 bytesSent += b.
size();
867 galois::runtime::gSerialize(b, 0u);
868 bytesSent += b.
size();
871 sendOffsetsTimer.stop();
// Send half of the master-assignment sync. Builds a `toSync` bitset sized to
// numLocalNodes, considers local IDs in the half-open range [begin, end),
// and ships the selected entries of localNodeToMaster to host h through
// sendOffsets. The whole operation is timed by p0assignSendTime.
888 void syncAssignmentSends(
889 uint32_t begin, uint32_t end, uint32_t numLocalNodes,
890 std::vector<uint32_t>& localNodeToMaster,
893 p0assignSendTime.start();
896 toSync.
resize(numLocalNodes);
// Only nodes whose local ID falls in [begin, end) are considered.
906 if ((lid >= begin) && (lid < end)) {
// The string becomes part of the statistic name built inside sendOffsets.
912 sendOffsets(h, toSync, localNodeToMaster,
"NewAssignments");
916 p0assignSendTime.stop();
// Sends an "all clear" notification, encoded as message tag 3u, and tracks
// the number of bytes serialized. `phase` (default 0) selects between
// branches.
// NOTE(review): the meaning of each phase value is not visible here --
// confirm against callers.
924 void sendAllClears(
unsigned phase = 0) {
925 unsigned bytesSent = 0;
928 allClearTimer.start();
// Tag 3 is the value the async receivers treat as "host finished/clear".
934 galois::runtime::gSerialize(b, 3u);
935 bytesSent += b.
size();
939 }
else if (phase == 0) {
946 allClearTimer.stop();
// Records master assignments received from another host.
// receivedOffsets[i] is an offset relative to `hostOffset`; adding the two
// gives a global ID, which is translated through gid2offsets into an index of
// localNodeToMaster. That index is then set to receivedMasters[i].
// The two received vectors must have equal length (asserted).
952 void saveReceivedMappings(std::vector<uint32_t>& localNodeToMaster,
953 std::unordered_map<uint64_t, uint32_t>& gid2offsets,
954 unsigned sendingHost,
955 std::vector<uint32_t>& receivedOffsets,
956 std::vector<uint32_t>& receivedMasters) {
962 assert(receivedMasters.size() == receivedOffsets.size());
967 uint64_t curGID = hostOffset + receivedOffsets[i];
// NOTE(review): operator[] default-inserts 0 when curGID is absent, which
// would silently redirect the write to index 0 -- confirm every received
// GID is guaranteed to be present in the map.
968 uint32_t indexIntoMap = gid2offsets[curGID];
971 localNodeToMaster[indexIntoMap] = receivedMasters[i];
// Receives one master-assignment message and fills the two output vectors.
// Returns the pair (sending host, message type).
// Types 1 and 2 are handled in separate branches; type 0 is accepted without
// further processing here; any other value aborts via GALOIS_DIE.
984 std::pair<unsigned, unsigned>
985 recvOffsetsAndMasters(std::vector<uint32_t>& receivedOffsets,
986 std::vector<uint32_t>& receivedMasters) {
994 uint32_t sendingHost = p->first;
// Initialised to an invalid tag (all bits set).
995 unsigned messageType = (unsigned)-1;
1000 if (messageType == 1) {
1006 }
else if (messageType == 2) {
1010 }
else if (messageType != 0) {
1011 GALOIS_DIE(
"invalid message type for sync of master assignments: ",
1016 " send message type ", messageType);
1018 return std::make_pair(sendingHost, messageType);
// Asynchronous receive for master assignments.
// Message types 1 and 2 both end in saveReceivedMappings, which writes the
// received masters into localNodeToMaster. Type 3 marks the sender in
// hostFinished. Type 0 is ignored here; anything else aborts.
1029 void recvOffsetsAndMastersAsync(
1030 std::vector<uint32_t>& localNodeToMaster,
1031 std::unordered_map<uint64_t, uint32_t>& gid2offsets,
1040 uint32_t sendingHost = p->first;
// Initialised to an invalid tag (all bits set).
1041 unsigned messageType = (unsigned)-1;
1043 std::vector<uint32_t> receivedOffsets;
1044 std::vector<uint32_t> receivedMasters;
1049 if (messageType == 1) {
1055 saveReceivedMappings(localNodeToMaster, gid2offsets, sendingHost,
1056 receivedOffsets, receivedMasters);
1057 }
else if (messageType == 2) {
1061 saveReceivedMappings(localNodeToMaster, gid2offsets, sendingHost,
1062 receivedOffsets, receivedMasters);
1063 }
else if (messageType == 3) {
// A host is expected to be marked finished at most once.
1066 assert(!hostFinished.
test(sendingHost));
1067 hostFinished.
set(sendingHost);
1068 }
else if (messageType != 0) {
1069 GALOIS_DIE(
"invalid message type for sync of master assignments: ",
1074 " send message type ", messageType);
// Blocking receive half of the master-assignment sync. Calls
// recvOffsetsAndMasters numHosts - 1 times and stores the mappings of every
// message that actually carried data (type 1 or 2). Timed by
// p0assignReceiveTime.
1089 syncAssignmentReceives(std::vector<uint32_t>& localNodeToMaster,
1090 std::unordered_map<uint64_t, uint32_t>& gid2offsets) {
1093 p0assignReceiveTime.start();
1096 for (
unsigned h = 0; h < base_DistGraph::numHosts - 1; h++) {
1097 unsigned sendingHost;
1098 unsigned messageType;
1099 std::vector<uint32_t> receivedOffsets;
1100 std::vector<uint32_t> receivedMasters;
1102 std::tie(sendingHost, messageType) =
1103 recvOffsetsAndMasters(receivedOffsets, receivedMasters);
// Other message types carry no mappings to store.
1105 if (messageType == 1 || messageType == 2) {
1106 saveReceivedMappings(localNodeToMaster, gid2offsets, sendingHost,
1107 receivedOffsets, receivedMasters);
1111 p0assignReceiveTime.stop();
// Timed wrapper around recvOffsetsAndMastersAsync: starts
// p0assignReceiveTime, forwards all arguments, then stops the timer.
1114 void syncAssignmentReceivesAsync(
1115 std::vector<uint32_t>& localNodeToMaster,
1116 std::unordered_map<uint64_t, uint32_t>& gid2offsets,
1120 p0assignReceiveTime.start();
1122 recvOffsetsAndMastersAsync(localNodeToMaster, gid2offsets, hostFinished);
1124 p0assignReceiveTime.stop();
// Synchronous master-assignment exchange: runs the send step
// (syncAssignmentSends) followed by the blocking receive step
// (syncAssignmentReceives), all timed by syncAssignmentTimer.
// [begin, end) is the local-ID range forwarded to the send step.
1139 void syncAssignment(
1140 uint32_t begin, uint32_t end, uint32_t numLocalNodes,
1141 std::vector<uint32_t>& localNodeToMaster,
1143 std::unordered_map<uint64_t, uint32_t>& gid2offsets) {
1145 syncAssignmentTimer.start();
1147 syncAssignmentSends(begin, end, numLocalNodes, localNodeToMaster,
1149 syncAssignmentReceives(localNodeToMaster, gid2offsets);
1151 syncAssignmentTimer.stop();
// Asynchronous master-assignment exchange: same send step as syncAssignment,
// but the receive step is syncAssignmentReceivesAsync, which also updates
// `hostFinished`. Timed by syncAssignmentTimer.
1154 void syncAssignmentAsync(
1155 uint32_t begin, uint32_t end, uint32_t numLocalNodes,
1156 std::vector<uint32_t>& localNodeToMaster,
1158 std::unordered_map<uint64_t, uint32_t>& gid2offsets,
1162 syncAssignmentTimer.start();
1164 syncAssignmentSends(begin, end, numLocalNodes, localNodeToMaster,
1166 syncAssignmentReceivesAsync(localNodeToMaster, gid2offsets, hostFinished);
1168 syncAssignmentTimer.stop();
// Sends master assignments to other hosts. Builds a `toSend` bitset of size
// (end - begin), tests local nodes whose assigned master equals host h, and
// ships the result to h through sendOffsets.
// NOTE(review): how `begin`/`end` are derived is not visible here -- confirm.
1179 void sendMastersToOwners(
1180 std::vector<uint32_t>& localNodeToMaster,
1188 toSend.
resize(end - begin);
// Select nodes that were assigned to host h.
1197 if (localNodeToMaster[lid] == h) {
// The string becomes part of the statistic name built inside sendOffsets.
1206 sendOffsets(h, toSend, localNodeToMaster,
"MastersToOwners");
// Receives master assignments from the other hosts (numHosts - 1 receives).
// For every message that carried data (type 1 or 2), each received offset is
// rebased with `hostOffset` to form a global ID, and the (global ID, master)
// pair is registered with the partitioner.
1215 void recvMastersToOwners() {
1216 for (
unsigned h = 0; h < base_DistGraph::numHosts - 1; h++) {
1217 unsigned sendingHost;
1218 unsigned messageType;
1219 std::vector<uint32_t> receivedOffsets;
1220 std::vector<uint32_t> receivedMasters;
1222 std::tie(sendingHost, messageType) =
1223 recvOffsetsAndMasters(receivedOffsets, receivedMasters);
1225 if (messageType == 1 || messageType == 2) {
1226 assert(receivedMasters.size() == receivedOffsets.size());
1230 for (
unsigned i = 0; i < receivedMasters.size(); i++) {
1231 uint64_t gidToMap = hostOffset + receivedOffsets[i];
1235 graphPartitioner->addMasterMapping(gidToMap, receivedMasters[i]);
1251 const uint32_t stateRounds) {
1255 syncNodes.resize(base_DistGraph::numHosts);
1258 phase0BitsetSetup(bufGraph, ghosts);
1260 std::unordered_map<uint64_t, uint32_t> gid2offsets;
1261 uint64_t neighborCount = phase0MapSetup(ghosts, gid2offsets, syncNodes);
1265 phase0SendRecv(syncNodes);
1269 p0allocTimer.start();
1273 std::vector<uint64_t> nodeLoads;
1274 std::vector<uint64_t> edgeLoads;
1275 std::vector<galois::CopyableAtomic<uint64_t>> nodeAccum;
1276 std::vector<galois::CopyableAtomic<uint64_t>> edgeAccum;
1277 nodeLoads.assign(base_DistGraph::numHosts, 0);
1278 edgeLoads.assign(base_DistGraph::numHosts, 0);
1279 nodeAccum.assign(base_DistGraph::numHosts, 0);
1280 edgeAccum.assign(base_DistGraph::numHosts, 0);
1282 uint32_t numLocalNodes =
1286 std::vector<uint32_t> localNodeToMaster;
1287 localNodeToMaster.assign(numLocalNodes + neighborCount, (uint32_t)-1);
1295 galois::gPrint(
"Using asynchronous master determination sends.\n");
1298 hostFinished.
resize(base_DistGraph::numHosts);
1299 loadsClear.
resize(base_DistGraph::numHosts);
1302 p0allocTimer.stop();
1307 for (uint32_t i : localNodeToMaster) {
1308 assert(i == (uint32_t)-1);
1313 galois::gPrint(
"Number of BSP sync rounds in master assignment: ",
1320 for (
unsigned syncRound = 0; syncRound < stateRounds; syncRound++) {
1325 syncRound, stateRounds);
1328 std::vector<uint32_t> rangeVec;
1330 getSpecificThreadRange(bufGraph, rangeVec, beginNode, endNode);
1343 [&](uint32_t node) {
1347 uint32_t assignedHost = graphPartitioner->getMaster(
1348 node, bufGraph, localNodeToMaster, gid2offsets, nodeLoads,
1349 nodeAccum, edgeLoads, edgeAccum);
1351 assert(assignedHost != (uint32_t)-1);
1354 localNodeToMaster[node - globalOffset] = assignedHost;
1367 syncAssignment(beginNode - globalOffset, endNode - globalOffset,
1368 numLocalNodes, localNodeToMaster, syncNodes,
1372 if (beginNode != endNode) {
1373 syncAssignmentAsync(beginNode - globalOffset, endNode - globalOffset,
1374 numLocalNodes, localNodeToMaster, syncNodes,
1375 gid2offsets, hostFinished);
1382 loadSyncTimer.start();
1384 syncLoad(nodeLoads, nodeAccum);
1385 syncLoad(edgeLoads, edgeAccum);
1387 asyncSyncLoad(nodeLoads, nodeAccum, edgeLoads, edgeAccum, loadsClear);
1389 loadSyncTimer.stop();
1394 hostFinished.
count());
1411 while (hostFinished.
count() != base_DistGraph::numHosts ||
1422 syncAssignmentReceivesAsync(localNodeToMaster, gid2offsets,
1424 asyncRecvLoad(nodeLoads, edgeLoads, loadsClear);
1430 printLoad(nodeLoads, nodeAccum);
1431 printLoad(edgeLoads, edgeAccum);
1435 for (uint32_t i = 0; i < localNodeToMaster.size(); i++) {
1436 if (localNodeToMaster[i] == (uint32_t)-1) {
1438 assert(localNodeToMaster[i] != (uint32_t)-1);
1449 "] Local master assignment "
1457 p0master2ownerTimer.start();
1458 sendMastersToOwners(localNodeToMaster, syncNodes);
1459 recvMastersToOwners();
1460 p0master2ownerTimer.stop();
1466 graphPartitioner->saveGID2HostInfo(gid2offsets, localNodeToMaster,
1472 uint64_t edgeOffset,
1476 incomingMirrors.
reset();
1490 auto ee = bufGraph.
edgeEnd(n);
1491 for (; ii < ee; ++ii) {
1493 if (graphPartitioner->retrieveMaster(dst) != myID) {
1494 incomingMirrors.
set(dst);
1497 prefixSumOfEdges[n - globalOffset] = (*ee) - edgeOffset;
1498 ltgv[n - globalOffset] = n;
1504 inspectionTimer.
stop();
1509 "] Edge inspection time: ", inspectionTimer.
get_usec() / 1000000.0f,
1510 " seconds to read ", allBytesRead,
" bytes (",
1511 allBytesRead / (float)inspectionTimer.
get_usec(),
" MBPS)\n");
1514 uint32_t additionalMirrorCount = incomingMirrors.
count();
1519 prefixSumOfEdges.resize(prefixSumOfEdges.size() + additionalMirrorCount,
1520 prefixSumOfEdges.back());
1522 prefixSumOfEdges.resize(additionalMirrorCount);
1525 if (additionalMirrorCount > 0) {
1529 std::vector<uint64_t> threadPrefixSums(activeThreads);
1533 std::tie(beginNode, endNode) =
1536 for (
size_t i = beginNode; i < endNode; i++) {
1537 if (incomingMirrors.
test(i))
1540 threadPrefixSums[tid] = count;
1543 for (
unsigned int i = 1; i < threadPrefixSums.size(); i++) {
1544 threadPrefixSums[i] += threadPrefixSums[i - 1];
1547 assert(threadPrefixSums.back() == additionalMirrorCount);
1554 std::tie(beginNode, endNode) =
1557 uint32_t threadStartLocation = 0;
1559 threadStartLocation = threadPrefixSums[tid - 1];
1561 uint32_t handledNodes = 0;
1562 for (
size_t i = beginNode; i < endNode; i++) {
1563 if (incomingMirrors.
test(i)) {
1565 threadStartLocation +
1574 if (prefixSumOfEdges.size() != 0) {
1604 template <
typename GraphTy,
1605 typename std::enable_if<!std::is_void<
1607 void edgeCutLoad(GraphTy&
graph,
1624 uint32_t lsrc = this->G2LEdgeCut(n, globalOffset);
1627 for (; ii < ee; ++ii) {
1629 decltype(gdst) ldst = this->G2LEdgeCut(gdst, globalOffset);
1630 auto gdata = bGraph.edgeData(*ii);
1631 graph.constructEdge(cur++, ldst, gdata);
1633 assert(cur == (*graph.
edge_end(lsrc)));
1642 "] Edge loading time: ", timer.get_usec() / 1000000.0f,
1643 " seconds to read ", bGraph.
getBytesRead(),
" bytes (",
1644 bGraph.
getBytesRead() / (float)timer.get_usec(),
" MBPS)\n");
1656 template <
typename GraphTy,
1657 typename std::enable_if<std::is_void<
1659 void edgeCutLoad(GraphTy& graph,
1676 uint32_t lsrc = this->G2LEdgeCut(n, globalOffset);
1679 for (; ii < ee; ++ii) {
1681 decltype(gdst) ldst = this->G2LEdgeCut(gdst, globalOffset);
1682 graph.constructEdge(cur++, ldst);
1684 assert(cur == (*graph.
edge_end(lsrc)));
1693 "] Edge loading time: ", timer.get_usec() / 1000000.0f,
1694 " seconds to read ", bGraph.
getBytesRead(),
" bytes (",
1695 bGraph.
getBytesRead() / (float)timer.get_usec(),
" MBPS)\n");
1708 std::vector<std::vector<uint64_t>>& numOutgoingEdges,
1709 std::vector<galois::DynamicBitSet>& hasIncomingEdge,
1717 numOutgoingEdges[i].assign(numRead, 0);
1721 hostHasOutgoing.
resize(base_DistGraph::numHosts);
1722 hostHasOutgoing.
reset();
1723 assignEdges(bufGraph, numOutgoingEdges, hasIncomingEdge, hostHasOutgoing);
1725 inspectionTimer.
stop();
1730 "] Edge inspection time: ", inspectionTimer.
get_usec() / 1000000.0f,
1731 " seconds to read ", allBytesRead,
" bytes (",
1732 allBytesRead / (float)inspectionTimer.
get_usec(),
" MBPS)\n");
1737 sendInspectionData(numOutgoingEdges, hasIncomingEdge, hostHasOutgoing);
1742 if (hasIncomingEdge[myHostID].
size() == 0) {
1744 hasIncomingEdge[myHostID].reset();
1746 recvInspectionData(numOutgoingEdges, hasIncomingEdge[myHostID]);
1763 std::vector<std::vector<uint64_t>>& numOutgoingEdges,
1764 std::vector<galois::DynamicBitSet>& hasIncomingEdge,
1766 std::vector<galois::CopyableAtomic<char>> indicatorVars(
1767 base_DistGraph::numHosts);
1770 indicatorVars[i] = 0;
1777 for (
unsigned syncRound = 0; syncRound < _edgeStateRounds; syncRound++) {
1782 syncRound, _edgeStateRounds);
1790 auto ee_end = bufGraph.
edgeEnd(src);
1791 uint64_t numEdgesL = std::distance(ee, ee_end);
1793 for (; ee != ee_end; ee++) {
1795 uint32_t hostBelongs = -1;
1796 hostBelongs = graphPartitioner->getEdgeOwner(src, dst, numEdgesL);
1797 if (_edgeStateRounds > 1) {
1798 hostLoads[hostBelongs] += 1;
1801 numOutgoingEdges[hostBelongs][src - globalOffset] += 1;
1802 hostHasOutgoing.
set(hostBelongs);
1803 bool hostIsMasterOfDest =
1804 (hostBelongs == graphPartitioner->retrieveMaster(dst));
1808 if (!hostIsMasterOfDest) {
1809 auto& bitsetStatus = indicatorVars[hostBelongs];
1812 if (bitsetStatus == 0) {
1815 bitsetStatus.compare_exchange_strong(expected, 1);
1818 hasIncomingEdge[hostBelongs].resize(globalNodes);
1819 hasIncomingEdge[hostBelongs].reset();
1824 while (indicatorVars[hostBelongs] != 2)
1826 hasIncomingEdge[hostBelongs].set(dst);
1850 const std::vector<uint64_t>& hostOutgoingEdges) {
1854 std::vector<uint64_t> threadPrefixSums(activeThreads);
1857 assert(hostSize == hostOutgoingEdges.size());
1864 std::tie(beginNode, endNode) =
1867 for (
size_t i = beginNode; i < endNode; i++) {
1868 if (hostOutgoingEdges[i] > 0) {
1872 threadPrefixSums[tid] = count;
1876 for (
unsigned int i = 1; i < threadPrefixSums.size(); i++) {
1877 threadPrefixSums[i] += threadPrefixSums[i - 1];
1880 uint32_t numNonZero = threadPrefixSums[activeThreads - 1];
1881 std::vector<uint32_t> masterLocation;
1882 masterLocation.resize(numNonZero, (uint32_t)-1);
1884 assert(numNonZero > 0);
1893 std::tie(beginNode, endNode) =
1896 uint32_t threadStartLocation = 0;
1898 threadStartLocation = threadPrefixSums[tid - 1];
1901 uint32_t handledNodes = 0;
1902 for (
size_t i = beginNode; i < endNode; i++) {
1903 if (hostOutgoingEdges[i] > 0) {
1905 masterLocation[threadStartLocation + handledNodes] =
1906 graphPartitioner->retrieveMaster(i + startNode);
1913 for (uint32_t i : masterLocation) {
1914 assert(i != (uint32_t)-1);
1920 galois::runtime::gSerialize(b, masterLocation);
1926 size_t numOfNodes = hostIncomingEdges.
count();
1927 std::vector<uint32_t> masterMap;
1928 masterMap.resize(numOfNodes, (uint32_t)-1);
1930 std::vector<uint32_t> bitsetOffsets = hostIncomingEdges.
getOffsets();
1947 [&](
size_t offset) {
1949 graphPartitioner->retrieveMaster(bitsetOffsets[offset]);
1954 for (uint32_t i : masterMap) {
1955 assert(i != (uint32_t)-1);
1956 assert(i < base_DistGraph::numHosts);
1962 galois::runtime::gSerialize(b, masterMap);
// Registers master locations received alongside a host's outgoing-edge
// counts. Offsets with a non-zero entry in hostOutgoingEdges are collected in
// a bitset; the number of such offsets must equal recvMasterLocations.size()
// (asserted). Each selected offset, rebased by `hostOffset`, is then mapped
// to an entry of recvMasterLocations.
1965 void deserializeOutgoingMasterMap(
1966 uint32_t senderHost,
const std::vector<uint64_t>& hostOutgoingEdges,
1967 const std::vector<uint32_t>& recvMasterLocations) {
1971 assert(hostSize == hostOutgoingEdges.size());
1973 offsetsToConsider.
resize(hostSize);
1974 offsetsToConsider.reset();
// Mark every offset that has at least one outgoing edge.
1980 [&](
size_t offset) {
1981 if (hostOutgoingEdges[offset] > 0) {
1982 offsetsToConsider.set(offset);
1986 assert(offsetsToConsider.count() == recvMasterLocations.size());
// Running index into recvMasterLocations for the loop below.
1991 size_t curCount = 0;
1993 for (uint32_t offset : offsetsToConsider.getOffsets()) {
1997 graphPartitioner->addMasterMapping(offset + hostOffset,
1998 recvMasterLocations[curCount]);
// Registers (global ID -> master) pairs with the partitioner, one per entry
// of `gids`. `gids` and `recvMasterLocations` must have equal length
// (asserted).
// Note that `gids` holds 32-bit values that are widened to uint64_t in the
// loop.
2016 void deserializeIncomingMasterMap(
2017 const std::vector<uint32_t>& gids,
2018 const std::vector<uint32_t>& recvMasterLocations) {
2019 assert(gids.size() == recvMasterLocations.size());
// Running index into recvMasterLocations for the loop below.
2020 size_t curCount = 0;
2021 for (uint64_t gid : gids) {
2026 graphPartitioner->addMasterMapping(gid, recvMasterLocations[curCount]);
// Serializes edge-inspection results for each host h in [0, net.Num).
// Outgoing part: tag 1 followed by numOutgoingEdges[h] when hostHasOutgoing
// has bit h set, otherwise tag 0. Incoming part: tag 0 for an empty bitset,
// tag 2 followed by a list of set offsets, or tag 1 followed by the bitset
// itself. When the partitioner reports a master-assignment phase, the
// corresponding master map is appended after the data.
2041 void sendInspectionData(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
2042 std::vector<galois::DynamicBitSet>& hasIncomingEdge,
2049 for (
unsigned h = 0; h < net.Num; h++) {
// Nothing to send to h: drop the per-host counts.
2052 if (!hostHasOutgoing.
test(h)) {
2053 numOutgoingEdges[h].clear();
2061 if (hostHasOutgoing.
test(h)) {
2062 galois::runtime::gSerialize(b, 1);
2063 galois::runtime::gSerialize(b, numOutgoingEdges[h]);
2064 if (graphPartitioner->masterAssignPhase()) {
2065 serializeOutgoingMasterMap(b, numOutgoingEdges[h]);
2068 galois::runtime::gSerialize(b, 0);
// Counts for h are no longer needed once serialized.
2070 numOutgoingEdges[h].clear();
// Size heuristic: count() * 32 is compared against size() to choose between
// sending an offset list and sending the raw bitset.
// NOTE(review): presumably 32 bits per uint32_t offset versus one bit per
// bitset entry -- confirm.
2074 uint64_t bitsetSize = curBitset.
size();
2075 uint64_t onlyOffsetsSize = curBitset.
count() * 32;
2076 if (bitsetSize == 0) {
2078 galois::runtime::gSerialize(b, 0);
2079 }
else if (onlyOffsetsSize <= bitsetSize) {
2081 std::vector<uint32_t> offsets = curBitset.
getOffsets();
2082 galois::runtime::gSerialize(b, 2);
2083 galois::runtime::gSerialize(b, offsets);
2085 if (graphPartitioner->masterAssignPhase()) {
2088 serializeIncomingMasterMap(b, curBitset);
2092 galois::runtime::gSerialize(b, 1);
2093 galois::runtime::gSerialize(b, curBitset);
2094 if (graphPartitioner->masterAssignPhase()) {
2097 serializeIncomingMasterMap(b, curBitset);
// Report the reduced byte count under the "EdgeInspectionBytesSent" stat.
2111 GRNAME, std::string(
"EdgeInspectionBytesSent"), bytesSent.
reduce());
// Receives edge-inspection data, looping net.Num - 1 times.
// Each message has two tagged sections. Outgoing tag: 1 = counts present
// (plus a master map during the master-assignment phase), 0 = no counts;
// anything else aborts. Incoming tag: 1 = a bitset was sent, 2 = a list of
// offsets was sent, 0 = nothing; anything else aborts.
2125 void recvInspectionData(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
2129 for (
unsigned h = 0; h < net.Num - 1; h++) {
2136 uint32_t sendingHost = p->first;
// 2 is not a valid outgoing tag; it is only the initial value.
2139 uint32_t outgoingExists = 2;
2142 if (outgoingExists == 1) {
2146 if (graphPartitioner->masterAssignPhase()) {
2147 std::vector<uint32_t> recvMasterLocations;
2149 deserializeOutgoingMasterMap(
2150 sendingHost, numOutgoingEdges[sendingHost], recvMasterLocations);
2152 }
else if (outgoingExists == 0) {
// The sender had no outgoing edges for this host.
2154 numOutgoingEdges[sendingHost].clear();
2156 GALOIS_DIE(
"invalid recv inspection data metadata mode, outgoing");
// 3 is not a valid bitset tag; it is only the initial value.
2159 uint32_t bitsetMetaMode = 3;
2161 if (bitsetMetaMode == 1) {
2167 if (graphPartitioner->masterAssignPhase()) {
2168 std::vector<uint32_t> recvMasterLocations;
2170 deserializeIncomingMasterMap(recvSet.
getOffsets(),
2171 recvMasterLocations);
2173 }
else if (bitsetMetaMode == 2) {
2175 std::vector<uint32_t> recvOffsets;
// Set each received offset in the local incoming-edge bitset.
2177 for (uint32_t offset : recvOffsets) {
2178 hasIncomingEdge.
set(offset);
2181 if (graphPartitioner->masterAssignPhase()) {
2182 std::vector<uint32_t> recvMasterLocations;
2184 deserializeIncomingMasterMap(recvOffsets, recvMasterLocations);
2186 }
else if (bitsetMetaMode == 0) {
2189 GALOIS_DIE(
"invalid recv inspection data metadata mode");
2194 "] Inspection receives complete.\n");
2202 nodeMapping(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
2205 base_DistGraph::numNodes = 0;
2211 base_DistGraph::numHosts * 1.15);
2215 inspectMasterNodes(numOutgoingEdges, prefixSumOfEdges);
2216 inspectOutgoingNodes(numOutgoingEdges, prefixSumOfEdges);
2217 createIntermediateMetadata(prefixSumOfEdges, hasIncomingEdge.
count());
2218 inspectIncomingNodes(hasIncomingEdge, prefixSumOfEdges);
2219 finalizeInspection(prefixSumOfEdges);
2222 "] To receive this many nodes: ", nodesToReceive);
2225 return prefixSumOfEdges;
2232 void inspectMasterNodes(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
2241 std::vector<uint64_t> threadPrefixSums(activeThreads);
2244 size_t hostSize = lastNode - startNode;
2246 if (numOutgoingEdges[h].
size() != 0) {
2247 assert(hostSize == numOutgoingEdges[h].
size());
2256 std::tie(beginNode, endNode) =
2259 for (
size_t i = beginNode; i < endNode; i++) {
2263 if (graphPartitioner->retrieveMaster(i + startNode) == myHID) {
2267 threadPrefixSums[tid] = count;
2271 for (
unsigned int i = 1; i < threadPrefixSums.size(); i++) {
2272 threadPrefixSums[i] += threadPrefixSums[i - 1];
2277 base_DistGraph::numNodes);
2279 uint32_t newMasterNodes = threadPrefixSums[activeThreads - 1];
2281 h,
": ", newMasterNodes);
2284 prefixSumOfEdges.resize(base_DistGraph::numNodes + newMasterNodes);
2288 if (newMasterNodes > 0) {
2293 std::tie(beginNode, endNode) =
2297 uint32_t threadStartLocation = 0;
2299 threadStartLocation = threadPrefixSums[tid - 1];
2302 uint32_t handledNodes = 0;
2303 for (
size_t i = beginNode; i < endNode; i++) {
2304 uint32_t globalID = startNode + i;
2306 if (graphPartitioner->retrieveMaster(globalID) == myHID) {
2308 if (numOutgoingEdges[h].
size() > 0) {
2309 uint64_t myEdges = numOutgoingEdges[h][i];
2310 numOutgoingEdges[h][i] = 0;
2312 prefixSumOfEdges[startingNodeIndex + threadStartLocation +
2313 handledNodes] = myEdges;
2314 if (myEdges > 0 && h != myHID) {
2318 prefixSumOfEdges[startingNodeIndex + threadStartLocation +
2323 threadStartLocation +
2324 handledNodes] = globalID;
2329 base_DistGraph::numNodes += newMasterNodes;
2333 nodesToReceive += toReceive.
reduce();
2343 inspectOutgoingNodes(std::vector<std::vector<uint64_t>>& numOutgoingEdges,
2351 size_t hostSize = numOutgoingEdges[h].size();
2353 if (hostSize == 0) {
2358 std::vector<uint64_t> threadPrefixSums(activeThreads);
2365 std::tie(beginNode, endNode) =
2368 for (
size_t i = beginNode; i < endNode; i++) {
2369 if (numOutgoingEdges[h][i] > 0) {
2373 threadPrefixSums[tid] = count;
2377 for (
unsigned int i = 1; i < threadPrefixSums.size(); i++) {
2378 threadPrefixSums[i] += threadPrefixSums[i - 1];
2383 base_DistGraph::numNodes);
2385 uint32_t newOutgoingNodes = threadPrefixSums[activeThreads - 1];
2387 prefixSumOfEdges.resize(base_DistGraph::numNodes + newOutgoingNodes);
2394 if (newOutgoingNodes > 0) {
2399 std::tie(beginNode, endNode) =
2403 uint32_t threadStartLocation = 0;
2405 threadStartLocation = threadPrefixSums[tid - 1];
2408 uint32_t handledNodes = 0;
2410 for (
size_t i = beginNode; i < endNode; i++) {
2411 uint64_t myEdges = numOutgoingEdges[h][i];
2413 prefixSumOfEdges[startingNodeIndex + threadStartLocation +
2414 handledNodes] = myEdges;
2416 threadStartLocation +
2417 handledNodes] = startNode + i;
2420 if (myEdges > 0 && h != myHID) {
2426 base_DistGraph::numNodes += newOutgoingNodes;
2429 numOutgoingEdges[h].clear();
2432 nodesToReceive += toReceive.
reduce();
2445 const uint64_t incomingEstimate) {
2446 if (base_DistGraph::numNodes == 0) {
2455 prefixSumOfEdges[i] += prefixSumOfEdges[i - 1];
2470 std::vector<uint64_t> threadPrefixSums(activeThreads);
2475 std::tie(beginNode, endNode) =
2478 for (
size_t i = beginNode; i < endNode; i++) {
2481 if (hasIncomingEdge.
test(i) &&
2485 threadPrefixSums[tid] = count;
2488 for (
unsigned int i = 1; i < threadPrefixSums.size(); i++) {
2489 threadPrefixSums[i] += threadPrefixSums[i - 1];
2493 assert(base_DistGraph::localToGlobalVector.
size() ==
2494 base_DistGraph::numNodes);
2496 uint32_t newIncomingNodes = threadPrefixSums[activeThreads - 1];
2498 prefixSumOfEdges.resize(base_DistGraph::numNodes + newIncomingNodes);
2499 base_DistGraph::localToGlobalVector.resize(base_DistGraph::numNodes +
2504 if (newIncomingNodes > 0) {
2509 std::tie(beginNode, endNode) =
2513 uint32_t threadStartLocation = 0;
2515 threadStartLocation = threadPrefixSums[tid - 1];
2518 uint32_t handledNodes = 0;
2520 for (
size_t i = beginNode; i < endNode; i++) {
2521 if (hasIncomingEdge.
test(i) &&
2523 prefixSumOfEdges[startingNodeIndex + threadStartLocation +
2525 base_DistGraph::localToGlobalVector[startingNodeIndex +
2526 threadStartLocation +
2532 base_DistGraph::numNodes += newIncomingNodes;
2545 prefixSumOfEdges[i] += prefixSumOfEdges[i - 1];
2550 if (prefixSumOfEdges.size() != 0) {
// Populates mirror-node lists: the global ID of local node i is looked up in
// localToGlobalVector and appended to a container.
// NOTE(review): presumably a per-host mirror list keyed by the node's
// master, and i presumably ranges over non-master nodes only -- confirm.
2563 void fillMirrors() {
2568 uint32_t globalID = base_DistGraph::localToGlobalVector[i];
2570 .push_back(globalID);
// Loads edges into `graph`. Prints to stderr whether the edge data type is
// void, then calls sendEdges and, through a callable taking two unsigned
// arguments, receiveEdges. Both share an atomic counter of received nodes
// that starts at zero. The whole step is timed and a throughput line is
// printed.
2576 template <
typename GraphTy>
2577 void loadEdges(GraphTy& graph,
2580 if (std::is_void<typename GraphTy::edge_data_type>::value) {
2581 fprintf(stderr,
"Loading void edge-data while creating edges.\n");
2583 fprintf(stderr,
"Loading edge-data while creating edges.\n");
// Shared counter passed to both the send and the receive side.
2589 std::atomic<uint32_t> receivedNodes;
2590 receivedNodes.store(0);
2593 loadEdgeTimer.start();
2596 sendEdges(graph, bufGraph, receivedNodes);
2603 [&](
unsigned,
unsigned) { receiveEdges(graph, receivedNodes); });
2606 loadEdgeTimer.stop();
// Microseconds / 1e6 gives seconds; bytes per microsecond equals 10^6 bytes
// per second, hence the "MBPS" label.
2609 loadEdgeTimer.get_usec() / 1000000.0f,
" seconds to read ",
2610 bufBytesRead,
" bytes (",
2611 bufBytesRead / (float)loadEdgeTimer.get_usec(),
" MBPS)\n");
// sendEdges overload constrained with enable_if on !is_void (the condition is
// cut off in this listing; the body handles edge data, so it is presumably the
// graph's edge data type).
//
// For each source node it asks graphPartitioner which host owns every outgoing
// edge. Edges owned by this host (hostBelongs == id) are constructed directly
// in `graph`; the rest are grouped per destination host and serialized as
// (src, destinations, edge data) into that host's send buffer.
//
// graph:         graph in which locally owned edges are constructed.
// receivedNodes: counter forwarded to processReceivedEdgeBuffer.
// A middle parameter is elided; the body reads edges from `bufGraph`.
// Many lines of the body are elided; comments describe only what is visible.
2615 template <
typename GraphTy,
2616 typename std::enable_if<!std::is_void<
2618 void sendEdges(GraphTy& graph,
2620 std::atomic<uint32_t>& receivedNodes) {
// Helper aliases; each holds one entry per host (see the numHosts arguments
// below).
2621 using DstVecType = std::vector<std::vector<uint64_t>>;
2623 std::vector<std::vector<typename GraphTy::edge_data_type>>;
2624 using SendBufferVecTy = std::vector<galois::runtime::SendBuffer>;
2627 base_DistGraph::numHosts);
2629 base_DistGraph::numHosts);
2631 base_DistGraph::numHosts);
// Start the statistics accumulators from a clean state.
2640 messagesSent.
reset();
2642 maxBytesSent.
reset();
// One pass per edge-state round.
2644 for (
unsigned syncRound = 0; syncRound < _edgeStateRounds; syncRound++) {
// Index of the next local edge slot to construct. Initialised to 0 here; an
// elided line presumably repositions it for nodes that have a local proxy,
// given the assert against edge_end(lsrc) below -- TODO confirm.
2657 uint64_t curEdge = 0;
2659 lsrc = this->
G2L(src);
2665 auto ee_end = bufGraph.
edgeEnd(src);
// Number of edges of `src` in the buffered graph.
2666 uint64_t numEdgesL = std::distance(ee, ee_end);
// getLocal() suggests per-thread scratch vectors -- TODO confirm.
2667 auto& gdst_vec = *gdst_vecs.getLocal();
2668 auto& gdata_vec = *gdata_vecs.getLocal();
// Empty the per-host scratch lists before processing this source node.
2670 for (
unsigned i = 0; i <
numHosts; ++i) {
2671 gdst_vec[i].clear();
2672 gdata_vec[i].clear();
2673 gdst_vec[i].reserve(numEdgesL);
// Classify every edge of `src` by owning host.
2677 for (; ee != ee_end; ++ee) {
2679 auto gdata = bufGraph.
edgeData(*ee);
2681 uint32_t hostBelongs =
2682 graphPartitioner->getEdgeOwner(src, gdst, numEdgesL);
// Per-host load counters are only maintained when more than one round is used.
2683 if (_edgeStateRounds > 1) {
2684 hostLoads[hostBelongs] += 1;
// Edge owned by this host: construct it locally right away.
2687 if (hostBelongs ==
id) {
2690 uint32_t ldst = this->
G2L(gdst);
2691 graph.constructEdge(curEdge++, ldst, gdata);
// Otherwise (presumably the else branch, which is elided) queue destination
// and data for the owning host.
2696 gdst_vec[hostBelongs].push_back(gdst);
2697 gdata_vec[hostBelongs].push_back(gdata);
// Sanity check: the local edge cursor must end at lsrc's edge end.
2703 assert(curEdge == (*graph.edge_end(lsrc)));
// Serialize the queued edges into each destination host's buffer.
2707 for (uint32_t h = 0; h <
numHosts; ++h) {
2711 if (gdst_vec[h].
size() > 0) {
2712 auto& b = (*sendBuffers.getLocal())[h];
// Record layout: source ID, destination list, edge-data list.
2713 galois::runtime::gSerialize(b, src);
2714 galois::runtime::gSerialize(b, gdst_vec[h]);
2715 galois::runtime::gSerialize(b, gdata_vec[h]);
// Buffer grew past the size threshold; the handling is elided (presumably it
// is sent and emptied) -- TODO confirm.
2718 if (b.
size() > edgePartitionSendBufSize) {
// Process an incoming buffer from inside the send phase; how `buffer` is
// obtained is elided.
2733 this->processReceivedEdgeBuffer(buffer, graph, receivedNodes);
// Final sweep over every thread's send buffers for whatever is still pending.
2744 for (
unsigned threadNum = 0; threadNum < sendBuffers.size(); ++threadNum) {
2745 auto& sbr = *sendBuffers.getRemote(threadNum);
2749 auto& sendBuffer = sbr[h];
2750 if (sendBuffer.size() > 0) {
// Account for the bytes, then empty the buffer. The send call itself is
// elided from this listing.
2752 bytesSent.
update(sendBuffer.size());
2753 maxBytesSent.
update(sendBuffer.size());
2756 sendBuffer.getVec().clear();
// Arguments of the (elided) statistics-reporting calls, all tagged with GRNAME.
2764 GRNAME, std::string(
"EdgeLoadingMessagesSent"), messagesSent.
reduce());
2766 GRNAME, std::string(
"EdgeLoadingBytesSent"), bytesSent.
reduce());
2768 GRNAME, std::string(
"EdgeLoadingMaxBytesSent"), maxBytesSent.
reduce());
// sendEdges overload constrained with enable_if on is_void (the condition is
// cut off in this listing; the body carries no edge data, so it is presumably
// the graph's edge data type).
//
// For each source node it asks graphPartitioner which host owns every outgoing
// edge. Edges owned by this host (hostBelongs == id) are constructed directly
// in `graph` without data; the rest are grouped per destination host and
// serialized as (src, destinations) into that host's send buffer.
//
// graph:         graph in which locally owned edges are constructed.
// receivedNodes: counter forwarded to processReceivedEdgeBuffer.
// A middle parameter is elided; the body reads edges from `bufGraph`.
// Many lines of the body are elided; comments describe only what is visible.
2772 template <
typename GraphTy,
2773 typename std::enable_if<std::is_void<
2775 void sendEdges(GraphTy& graph,
2777 std::atomic<uint32_t>& receivedNodes) {
// Helper aliases; each holds one entry per host (see the numHosts arguments
// below).
2778 using DstVecType = std::vector<std::vector<uint64_t>>;
2779 using SendBufferVecTy = std::vector<galois::runtime::SendBuffer>;
2782 base_DistGraph::numHosts);
2784 base_DistGraph::numHosts);
// Start the statistics accumulators from a clean state.
2793 messagesSent.
reset();
2795 maxBytesSent.
reset();
// One pass per edge-state round.
2797 for (
unsigned syncRound = 0; syncRound < _edgeStateRounds; syncRound++) {
// Index of the next local edge slot to construct. Initialised to 0 here; an
// elided line presumably repositions it for nodes that have a local proxy,
// given the assert against edge_end(lsrc) below -- TODO confirm.
2810 uint64_t curEdge = 0;
2812 lsrc = this->
G2L(src);
2818 auto ee_end = bufGraph.
edgeEnd(src);
// Number of edges of `src` in the buffered graph.
2819 uint64_t numEdgesL = std::distance(ee, ee_end);
// getLocal() suggests a per-thread scratch vector -- TODO confirm.
2820 auto& gdst_vec = *gdst_vecs.getLocal();
// Empty the per-host scratch lists before processing this source node.
2822 for (
unsigned i = 0; i <
numHosts; ++i) {
2823 gdst_vec[i].clear();
// Classify every edge of `src` by owning host.
2827 for (; ee != ee_end; ++ee) {
2829 uint32_t hostBelongs =
2830 graphPartitioner->getEdgeOwner(src, gdst, numEdgesL);
// Per-host load counters are only maintained when more than one round is used.
2831 if (_edgeStateRounds > 1) {
2832 hostLoads[hostBelongs] += 1;
// Edge owned by this host: construct it locally right away.
2835 if (hostBelongs ==
id) {
2838 uint32_t ldst = this->
G2L(gdst);
2839 graph.constructEdge(curEdge++, ldst);
// Otherwise (presumably the else branch, which is elided) queue the
// destination for the owning host.
2844 gdst_vec[hostBelongs].push_back(gdst);
// Sanity check: the local edge cursor must end at lsrc's edge end.
2850 assert(curEdge == (*graph.edge_end(lsrc)));
// Serialize the queued destinations into each destination host's buffer.
2854 for (uint32_t h = 0; h <
numHosts; ++h) {
2858 if (gdst_vec[h].
size() > 0) {
2859 auto& b = (*sendBuffers.getLocal())[h];
// Record layout: source ID, destination list.
2860 galois::runtime::gSerialize(b, src);
2861 galois::runtime::gSerialize(b, gdst_vec[h]);
// Buffer grew past the size threshold; the handling is elided (presumably it
// is sent and emptied) -- TODO confirm.
2864 if (b.
size() > edgePartitionSendBufSize) {
// Process an incoming buffer from inside the send phase; how `buffer` is
// obtained is elided.
2879 this->processReceivedEdgeBuffer(buffer, graph, receivedNodes);
// Final sweep over every thread's send buffers for whatever is still pending.
2890 for (
unsigned threadNum = 0; threadNum < sendBuffers.size(); ++threadNum) {
2891 auto& sbr = *sendBuffers.getRemote(threadNum);
2895 auto& sendBuffer = sbr[h];
2896 if (sendBuffer.size() > 0) {
// Account for the bytes, then empty the buffer. The send call itself is
// elided from this listing.
2898 bytesSent.
update(sendBuffer.size());
2899 maxBytesSent.
update(sendBuffer.size());
2902 sendBuffer.getVec().clear();
// Arguments of the (elided) statistics-reporting calls, all tagged with GRNAME.
2910 GRNAME, std::string(
"EdgeLoadingMessagesSent"), messagesSent.
reduce());
2912 GRNAME, std::string(
"EdgeLoadingBytesSent"), bytesSent.
reduce());
2914 GRNAME, std::string(
"EdgeLoadingMaxBytesSent"), maxBytesSent.
reduce());
// Consumes one received message and constructs the edges it describes.
//
// buffer:        optional (uint32_t, RecvBuffer) pair; only the RecvBuffer
//                (`second`) is used in the visible lines. Whether the optional
//                is checked for a value before the dereference is elided --
//                TODO confirm.
// graph:         graph in which the edges are constructed.
// receivedNodes: atomic counter; not touched in the visible lines
//                (presumably incremented on an elided line).
2918 template <
typename GraphTy>
2919 void processReceivedEdgeBuffer(
2920 std::optional<std::pair<uint32_t, galois::runtime::RecvBuffer>>& buffer,
2921 GraphTy& graph, std::atomic<uint32_t>& receivedNodes) {
2923 auto& rb = buffer->second;
// Keep decoding records until the receive buffer is exhausted.
2924 while (rb.r_size() > 0) {
// Destination global IDs of the current record; `n` (source ID) and `cur`
// (start of the edge range) are set up on elided lines.
2926 std::vector<uint64_t> gdst_vec;
2930 uint32_t lsrc = this->
G2L(n);
2932 uint64_t cur_end = *graph.edge_end(lsrc);
// The edge range [cur, cur_end) must match the number of destinations received.
2933 assert((cur_end - cur) == gdst_vec.size());
// Constructs the edges in that range.
2934 deserializeEdges(graph, rb, gdst_vec, cur, cur_end);
// Receive loop: keeps handing incoming buffers to processReceivedEdgeBuffer
// until receivedNodes reaches nodesToReceive.
//
// graph:         graph in which received edges are constructed.
// receivedNodes: progress counter compared against nodesToReceive.
2944 template <
typename GraphTy>
2945 void receiveEdges(GraphTy& graph, std::atomic<uint32_t>& receivedNodes) {
2949 while (receivedNodes < nodesToReceive) {
// `p` is obtained on an elided line (presumably a network receive) -- TODO
// confirm.
2952 processReceivedEdgeBuffer(p, graph, receivedNodes);
// Overload constrained with enable_if on !is_void (condition cut off in this
// listing); presumably the deserializeEdges overload for graphs with edge data
// (the function name line is elided).
//
// Constructs edges at indices [cur, cur_end): the i-th destination is paired
// with the i-th edge datum.
//
// gdst_vec: destination global IDs, one per edge.
// cur:      first edge index to fill; incremented once per constructed edge.
// cur_end:  one past the last edge index to fill.
2956 template <
typename GraphTy,
2957 typename std::enable_if<!std::is_void<
2960 std::vector<uint64_t>& gdst_vec, uint64_t& cur,
2961 uint64_t& cur_end) {
// Edge data for this record; presumably filled from the receive buffer on an
// elided line -- TODO confirm.
2962 std::vector<typename GraphTy::edge_data_type> gdata_vec;
// `i` is declared on an elided line and indexes both vectors in step.
2965 while (cur < cur_end) {
2966 auto gdata = gdata_vec[i];
2967 uint64_t gdst = gdst_vec[i++];
// Convert the destination's global ID to a local ID before constructing.
2968 uint32_t ldst = this->
G2L(gdst);
2969 graph.constructEdge(cur++, ldst, gdata);
// Overload constrained with enable_if on is_void (condition cut off in this
// listing); presumably the deserializeEdges overload for graphs without edge
// data (the function name line is elided).
//
// Constructs data-less edges at indices [cur, cur_end), one per destination.
//
// gdst_vec: destination global IDs, one per edge.
// cur:      first edge index to fill; incremented once per constructed edge.
// cur_end:  one past the last edge index to fill.
2974 template <
typename GraphTy,
2975 typename std::enable_if<std::is_void<
2978 std::vector<uint64_t>& gdst_vec, uint64_t& cur,
2979 uint64_t& cur_end) {
// `i` is declared on an elided line and walks gdst_vec.
2981 while (cur < cur_end) {
2982 uint64_t gdst = gdst_vec[i++];
// Convert the destination's global ID to a local ID before constructing.
2983 uint32_t ldst = this->
G2L(gdst);
2984 graph.constructEdge(cur++, ldst);
2991 template <
typename NodeTy,
typename EdgeTy,
typename Partitioner>
2992 constexpr
const char*
const
EdgeIterator edgeEnd(uint64_t globalNodeID)
Get the index to the first edge of the node after the provided node.
Definition: BufferedGraph.h:414
Distributed sum-reducer for getting the sum of some value across multiple hosts.
Definition: DReducible.h:45
void printEdgeLoad()
Debug function: prints host loads.
Definition: NewGeneric.h:138
uint64_t numGlobalEdges
Total edges in the global unpartitioned graph.
Definition: DistributedGraph.h:80
void gInfo(Args &&...args)
Prints an info string from a sequence of things.
Definition: gIO.h:55
void increment_evilPhase()
Increments evilPhase, a phase counter used by communication.
Definition: DistributedGraph.h:133
unsigned int getActiveThreads() noexcept
Returns the number of threads in use.
Definition: Threads.cpp:37
std::pair< IterTy, IterTy > block_range(IterTy b, IterTy e, unsigned id, unsigned num)
Returns a continuous block from the range based on the number of divisions and the id of the block re...
Definition: gstl.h:244
void reset()
Definition: Reduction.h:113
Class that loads a portion of a Galois graph from disk directly into memory buffers for access...
Definition: BufferedGraph.h:49
uint64_t numGlobalNodes
Total nodes in the global unpartitioned graph.
Definition: DistributedGraph.h:79
void bitwise_or(const DynamicBitSet &other)
Definition: libgalois/include/galois/DynamicBitset.h:241
const uint32_t numHosts
Total number of machines.
Definition: DistributedGraph.h:85
EdgeIterator edgeBegin(uint64_t globalNodeID)
Get the index to the first edge of the provided node THAT THIS GRAPH HAS LOADED (not necessarily the fi...
Definition: BufferedGraph.h:386
void reportStat_Tmax(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:556
EdgeDataType edgeData(uint64_t globalEdgeID)
Get the edge data of some edge.
Definition: BufferedGraph.h:462
std::vector< uint32_t > getOffsets() const
Returns a vector containing the set bits in this bitset in order from left to right.
Definition: libgalois/include/galois/DynamicBitset.h:347
void resize(size_t n)
Definition: PODResizeableArray.h:142
Concurrent dynamically allocated bitset.
Definition: libgalois/include/galois/DynamicBitset.h:47
void reportParam(const S1 &region, const S2 &category, const V &value)
Definition: Statistics.h:616
Buffer for serialization of data.
Definition: Serialize.h:56
void resetEdgeLoad()
Reset load balance on host reducibles.
Definition: NewGeneric.h:110
void gDebug(Args &&...GALOIS_USED_ONLY_IN_DEBUG(args))
Prints a debug string from a sequence of things; prints nothing if NDEBUG is defined.
Definition: gIO.h:72
virtual std::pair< unsigned, unsigned > cartesianGrid() const
Returns Cartesian split (if it exists, else returns pair of 0s).
Definition: NewGeneric.h:103
size_t size() const
Gets number of nodes on this (local) graph.
Definition: DistributedGraph.h:646
void gDeserialize(DeSerializeBuffer &buf, T1 &&t1, Args &&...args)
Deserialize data in a buffer into a series of objects.
Definition: Serialize.h:1032
uint32_t size() const
Gets the number of global nodes in the graph.
Definition: BufferedGraph.h:294
Ty reset()
Reset the entire accumulator.
Definition: DReducible.h:153
SpecificRange< IterTy > makeSpecificRange(IterTy begin, IterTy end, const uint32_t *thread_ranges)
Creates a SpecificRange object.
Definition: Range.h:219
void transpose(const char *regionName=NULL)
Perform an in-memory transpose of the graph, replacing the original CSR to CSC.
Definition: LC_CSR_Graph.h:615
const char * loopname
Definition: Executor_ParaMeter.h:145
#define GALOIS_DIE(...)
Definition: gIO.h:96
void read_local_graph_from_file(std::string)
Read the local LC_CSR graph from the file on a disk.
Definition: DistributedGraph.h:834
uint64_t numEdges
Num edges in this graph in total.
Definition: DistributedGraph.h:82
void resetReadCounters()
Reset reading counters.
Definition: BufferedGraph.h:496
void reportStat_Tsum(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:562
vTy & getVec()
Returns vector of data stored in this serialize buffer.
Definition: Serialize.h:115
bool test(size_t index) const
Check a bit to see if it is currently set.
Definition: libgalois/include/galois/DynamicBitset.h:192
Class that inherits from std::atomic to make it copyable by defining a copy constructor.
Definition: AtomicWrapper.h:40
uint32_t G2L(uint64_t gid) const
Definition: DistributedGraph.h:482
void determineThreadRanges()
Uses a pre-computed prefix sum to determine division of nodes among threads.
Definition: DistributedGraph.h:725
void reserve(size_t n)
Definition: PODResizeableArray.h:129
edge_iterator edge_begin(GraphNode N)
Definition: OfflineGraph.h:278
Accumulator for T where accumulation is max.
Definition: Reduction.h:189
void loadPartialGraph(const std::string &filename, uint64_t nodeStart, uint64_t nodeEnd, uint64_t edgeStart, uint64_t edgeEnd, uint64_t numGlobalNodes, uint64_t numGlobalEdges)
Given a node/edge range to load, loads the specified portion of the graph into memory buffers using r...
Definition: BufferedGraph.h:348
uint64_t count() const
Count how many bits are set in the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:320
std::vector< std::vector< size_t > > mirrorNodes
Mirror nodes from different hosts. For reduce.
Definition: DistributedGraph.h:100
void resize(uint64_t n)
Resizes the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:78
std::vector< std::pair< uint64_t, uint64_t > > gid2host
Information that converts host to range of nodes that host reads.
Definition: DistributedGraph.h:98
std::vector< T, Pow2Alloc< T >> Vector
[STL vector using Pow_2_VarSizeAlloc]
Definition: gstl.h:52
~NewDistGraphGeneric()
Free the graph partitioner.
Definition: NewGeneric.h:368
virtual unsigned getHostID(uint64_t gid) const
Determines which host has the master for a particular node.
Definition: NewGeneric.h:82
void reportStat_Single(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:544
Definition: PerThreadStorage.h:88
void reset()
Gets the space taken by the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:110
virtual bool isOwned(uint64_t gid) const
Determine if a node has a master on this host.
Definition: NewGeneric.h:87
Definition: OfflineGraph.h:63
size_type size() const
Returns the size of the serialize buffer.
Definition: Serialize.h:125
void constructNodes()
Definition: LC_CSR_Graph.h:570
MASTERS_DISTRIBUTION
Enums specifying how masters are to be distributed among hosts.
Definition: DistributedGraph.h:48
bool transposed
Marks if the graph is transposed or not.
Definition: DistributedGraph.h:76
void determineThreadRangesMaster()
Determines the thread ranges for master nodes only and saves them to the object.
Definition: DistributedGraph.h:736
void gPrint(Args &&...args)
Prints a sequence of things.
Definition: gIO.h:47
edge_iterator edge_end(GraphNode N)
Gets the end edge boundary of some node.
Definition: DistributedGraph.h:625
unsigned evilPhasePlus1()
Returns evilPhase + 1, handling loop around as necessary.
Definition: DistributedGraph.h:144
unsigned int activeThreads
Definition: Threads.cpp:26
boost::counting_iterator< uint64_t > edge_iterator
Definition: OfflineGraph.h:194
uint64_t getNodeOffset() const
Definition: BufferedGraph.h:304
NetworkInterface & getSystemNetworkInterface()
Get the network interface.
Definition: Network.cpp:131
size_t size() const
Gets the size of the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:99
void syncEdgeLoad()
Sync load balance on hosts using reducibles.
Definition: NewGeneric.h:124
size_type size() const
Definition: PODResizeableArray.h:125
SpecificRange is a range type where a threads range is specified by an int array that tells you wh...
Definition: Range.h:122
uint32_t numNodes
Num nodes in this graph in total.
Definition: DistributedGraph.h:81
void clear()
Definition: PODResizeableArray.h:147
Contains the implementation for DistGraph.
uint32_t numOwned
Number of nodes owned (masters) by this host.
Definition: DistributedGraph.h:89
void do_all(const RangeFunc &rangeMaker, FunctionTy &&fn, const Args &...args)
Standard do-all loop.
Definition: Loops.h:71
Definition: NewGeneric.h:47
bool set(size_t index)
Set a bit in the bitset.
Definition: libgalois/include/galois/DynamicBitset.h:206
uint64_t getBytesRead()
Returns the total number of bytes read from this graph so far.
Definition: BufferedGraph.h:508
uint32_t numNodesWithEdges
Number of nodes (masters + mirrors) that have outgoing edges.
Definition: DistributedGraph.h:94
void start()
Definition: Timer.cpp:82
void on_each(FunctionTy &&fn, const Args &...args)
Low-level parallel loop.
Definition: Loops.h:86
virtual bool is_vertex_cut() const
Returns true if current partition is a vertex cut.
Definition: NewGeneric.h:102
uint64_t computeMasters(MASTERS_DISTRIBUTION masters_distribution, galois::graphs::OfflineGraph &g, const std::vector< unsigned > &scalefactor, uint32_t nodeWeight=0, uint32_t edgeWeight=0, unsigned DecomposeFactor=1)
Wrapper call that will call into more specific compute masters functions that compute masters based o...
Definition: DistributedGraph.h:364
void initializeSpecificRanges()
Initializes the 3 range objects that a user can access to iterate over the graph in different ways...
Definition: DistributedGraph.h:785
uint64_t get_usec() const
Definition: Timer.cpp:92
NewDistGraphGeneric(const std::string &filename, unsigned host, unsigned _numHosts, bool cuspAsync=true, uint32_t stateRounds=100, bool transpose=false, galois::graphs::MASTERS_DISTRIBUTION md=BALANCED_EDGES_OF_MASTERS, uint32_t nodeWeight=0, uint32_t edgeWeight=0, std::string masterBlockFile="", bool readFromFile=false, std::string localGraphFileName="local_graph", uint32_t edgeStateRounds=1)
Constructor.
Definition: NewGeneric.h:154
T & reduce()
Returns the final reduction value.
Definition: Reduction.h:102
size_t size() const
Definition: OfflineGraph.h:271
void readersFromFile(galois::graphs::OfflineGraph &g, std::string filename)
reader assignment from a file corresponds to master assignment if using an edge cut ...
Definition: DistributedGraph.h:404
std::unordered_map< uint64_t, uint32_t > globalToLocalMap
LID = globalToLocalMap[GID].
Definition: DistributedGraph.h:105
uint32_t evilPhase
Variable that keeps track of which network send/recv phase a program is currently on...
Definition: Network.cpp:36
Base DistGraph class that all distributed graphs extend from.
Definition: DistributedGraph.h:64
Buffer for deserialization of data.
Definition: Serialize.h:147
auto iterate(C &cont)
Definition: Range.h:323
This is a container that encapsulates a resizeable array of plain-old-datatype (POD) elements...
Definition: PODResizeableArray.h:40
void update(T &&rhs)
Updates the thread local value by applying the reduction operator to current and newly provided value...
Definition: Reduction.h:90
void resetAndFree()
Free all of the in memory buffers in this object and reset graph status.
Definition: BufferedGraph.h:516
std::vector< uint64_t > localToGlobalVector
GID = localToGlobalVector[LID].
Definition: DistributedGraph.h:103
const unsigned id
ID of the machine.
Definition: DistributedGraph.h:84
void allocateFrom(const FileGraph &graph)
Definition: LC_CSR_Graph.h:513
size_t sizeEdges() const
Definition: OfflineGraph.h:272
uint64_t edgeDestination(uint64_t globalEdgeID)
Get the global node id of the destination of the provided edge.
Definition: BufferedGraph.h:437
EdgeTy edge_data_type
Definition: LC_CSR_Graph.h:147
std::vector< uint32_t > determineUnitRangesFromPrefixSum(uint32_t unitsToSplit, VectorTy &edgePrefixSum, uint32_t nodeAlpha=0)
Uses the divideByNode function (which is binary search based) to divide nodes among units using a pro...
Definition: GraphHelpers.h:484
virtual bool isLocal(uint64_t gid) const
Determine if a node has a proxy on this host.
Definition: NewGeneric.h:92
void stop()
Definition: Timer.cpp:87
Implements distributed reducible objects for easy reduction of values across a distributed system...
Galois Timer that automatically reports stats upon destruction. Provides statistic interface around ti...
Definition: Timer.h:63
GraphTy graph
The internal graph used by DistGraph to represent the graph.
Definition: DistributedGraph.h:73
void determineThreadRangesWithEdges()
Determines the thread ranges for nodes with edges only and saves them to the object.
Definition: DistributedGraph.h:762
uint32_t beginMaster
Local id of the beginning of master nodes.
Definition: DistributedGraph.h:91
balance edges
Definition: DistributedGraph.h:52