00001 #ifndef GALOIS_GRAPHLABEXECUTOR_H
00002 #define GALOIS_GRAPHLABEXECUTOR_H
00003
00004 #include "Galois/Bag.h"
00005
00006 #include <boost/mpl/has_xxx.hpp>
00007
00008 namespace Galois {
00010 namespace GraphLab {
00011
00012 BOOST_MPL_HAS_XXX_TRAIT_DEF(tt_needs_gather_in_edges)
00013 template<typename T>
00014 struct needs_gather_in_edges: public has_tt_needs_gather_in_edges<T> {};
00015
00016 BOOST_MPL_HAS_XXX_TRAIT_DEF(tt_needs_gather_out_edges)
00017 template<typename T>
00018 struct needs_gather_out_edges: public has_tt_needs_gather_out_edges<T> {};
00019
00020 BOOST_MPL_HAS_XXX_TRAIT_DEF(tt_needs_scatter_in_edges)
00021 template<typename T>
00022 struct needs_scatter_in_edges: public has_tt_needs_scatter_in_edges<T> {};
00023
00024 BOOST_MPL_HAS_XXX_TRAIT_DEF(tt_needs_scatter_out_edges)
00025 template<typename T>
00026 struct needs_scatter_out_edges: public has_tt_needs_scatter_out_edges<T> {};
00027
/**
 * Placeholder message type for vertex programs that exchange no data.
 * SyncEngine compares Operator::message_type against this type to decide
 * whether per-node message storage is needed at all.
 */
struct EmptyMessage {
  /** Combining two empty messages changes nothing; returns the left operand. */
  EmptyMessage& operator+=(const EmptyMessage& /* ignored */) {
    return *this;
  }
};
00031
00032 template<typename Graph, typename Operator>
00033 struct Context {
00034 typedef typename Graph::GraphNode GNode;
00035 typedef typename Operator::message_type message_type;
00036 typedef std::pair<GNode,message_type> WorkItem;
00037
00038 private:
00039 template<typename,typename> friend class AsyncEngine;
00040 template<typename,typename> friend class SyncEngine;
00041
00042 typedef std::pair<int,message_type> Message;
00043 typedef std::deque<Message> MyMessages;
00044 typedef Galois::Runtime::PerPackageStorage<MyMessages> Messages;
00045
00046 Galois::UserContext<WorkItem>* ctx;
00047 Graph* graph;
00048 Galois::LargeArray<int>* scoreboard;
00049 Galois::InsertBag<GNode>* next;
00050 Messages* messages;
00051
00052 Context(Galois::UserContext<WorkItem>* c): ctx(c) { }
00053
00054 #if defined(__IBMCPP__) && __IBMCPP__ <= 1210
00055 public:
00056 #endif
00057 Context(Graph* g, Galois::LargeArray<int>* s, Galois::InsertBag<GNode>* n, Messages* m):
00058 graph(g), scoreboard(s), next(n), messages(m) { }
00059
00060 public:
00061
00062 void push(GNode node, const message_type& message) {
00063 if (ctx) {
00064 ctx->push(WorkItem(node, message));
00065 } else {
00066 size_t id = graph->idFromNode(node);
00067 {
00068 int val = (*scoreboard)[id];
00069 if (val == 0 && __sync_bool_compare_and_swap(&(*scoreboard)[id], 0, 1)) {
00070 next->push(node);
00071 }
00072 }
00073
00074 if (messages) {
00075 MyMessages& m = *messages->getLocal();
00076 int val;
00077 while (true) {
00078 val = m[id].first;
00079 if (val == 0 && __sync_bool_compare_and_swap(&m[id].first, 0, 1)) {
00080 m[id].second += message;
00081 m[id].first = 0;
00082 return;
00083 }
00084 }
00085 }
00086 }
00087 }
00088 };
00089
00090 template<typename Graph, typename Operator>
00091 class AsyncEngine {
00092 typedef typename Operator::message_type message_type;
00093 typedef typename Operator::gather_type gather_type;
00094 typedef typename Graph::GraphNode GNode;
00095 typedef typename Graph::in_edge_iterator in_edge_iterator;
00096 typedef typename Graph::edge_iterator edge_iterator;
00097
00098 typedef typename Context<Graph,Operator>::WorkItem WorkItem;
00099
00100 struct Initialize {
00101 AsyncEngine* self;
00102 Galois::InsertBag<WorkItem>& bag;
00103
00104 Initialize(AsyncEngine* s, Galois::InsertBag<WorkItem>& b): self(s), bag(b) { }
00105
00106 void operator()(GNode n) {
00107 bag.push(WorkItem(n, message_type()));
00108 }
00109 };
00110
00111 struct Process {
00112 AsyncEngine* self;
00113 Process(AsyncEngine* s): self(s) { }
00114
00115 void operator()(const WorkItem& item, Galois::UserContext<WorkItem>& ctx) {
00116 Operator op(self->origOp);
00117
00118 GNode node = item.first;
00119 message_type msg = item.second;
00120
00121 if (needs_gather_in_edges<Operator>::value || needs_scatter_in_edges<Operator>::value) {
00122 self->graph.in_edge_begin(node, Galois::MethodFlag::ALL);
00123 }
00124
00125 if (needs_gather_out_edges<Operator>::value || needs_scatter_out_edges<Operator>::value) {
00126 self->graph.edge_begin(node, Galois::MethodFlag::ALL);
00127 }
00128
00129 op.init(self->graph, node, msg);
00130
00131 gather_type sum;
00132 if (needs_gather_in_edges<Operator>::value) {
00133 for (in_edge_iterator ii = self->graph.in_edge_begin(node, Galois::MethodFlag::NONE),
00134 ei = self->graph.in_edge_end(node, Galois::MethodFlag::NONE); ii != ei; ++ii) {
00135 op.gather(self->graph, node, self->graph.getInEdgeDst(ii), node, sum, self->graph.getInEdgeData(ii));
00136 }
00137 }
00138 if (needs_gather_out_edges<Operator>::value) {
00139 for (edge_iterator ii = self->graph.edge_begin(node, Galois::MethodFlag::NONE),
00140 ei = self->graph.edge_end(node, Galois::MethodFlag::NONE); ii != ei; ++ii) {
00141 op.gather(self->graph, node, node, self->graph.getEdgeDst(ii), sum, self->graph.getEdgeData(ii));
00142 }
00143 }
00144
00145 op.apply(self->graph, node, sum);
00146
00147 if (!op.needsScatter(self->graph, node))
00148 return;
00149
00150 Context<Graph,Operator> context(&ctx);
00151
00152 if (needs_scatter_in_edges<Operator>::value) {
00153 for (in_edge_iterator ii = self->graph.in_edge_begin(node, Galois::MethodFlag::NONE),
00154 ei = self->graph.in_edge_end(node, Galois::MethodFlag::NONE); ii != ei; ++ii) {
00155 op.scatter(self->graph, node, self->graph.getInEdgeDst(ii), node, context, self->graph.getInEdgeData(ii));
00156 }
00157 }
00158 if (needs_scatter_out_edges<Operator>::value) {
00159 for (edge_iterator ii = self->graph.edge_begin(node, Galois::MethodFlag::NONE),
00160 ei = self->graph.edge_end(node, Galois::MethodFlag::NONE); ii != ei; ++ii) {
00161 op.scatter(self->graph, node, node, self->graph.getEdgeDst(ii), context, self->graph.getEdgeData(ii));
00162 }
00163 }
00164 }
00165 };
00166
00167 Graph& graph;
00168 Operator origOp;
00169
00170 public:
00171 AsyncEngine(Graph& g, Operator o): graph(g), origOp(o) { }
00172
00173 void execute() {
00174 typedef typename Context<Graph,Operator>::WorkItem WorkItem;
00175 typedef Galois::WorkList::dChunkedFIFO<256> WL;
00176
00177 Galois::InsertBag<WorkItem> bag;
00178 Galois::do_all_local(graph, Initialize(this, bag));
00179 Galois::for_each_local(bag, Process(this), Galois::wl<WL>());
00180 }
00181 };
00182
00183 template<typename Graph, typename Operator>
00184 class SyncEngine {
00185 typedef typename Operator::message_type message_type;
00186 typedef typename Operator::gather_type gather_type;
00187 typedef typename Graph::GraphNode GNode;
00188 typedef typename Graph::in_edge_iterator in_edge_iterator;
00189 typedef typename Graph::edge_iterator edge_iterator;
00190 static const bool NeedMessages = !std::is_same<EmptyMessage,message_type>::value;
00191 typedef Galois::WorkList::dChunkedFIFO<256> WL;
00192 typedef std::pair<int,message_type> Message;
00193 typedef std::deque<Message> MyMessages;
00194 typedef Galois::Runtime::PerPackageStorage<MyMessages> Messages;
00195
00196 Graph& graph;
00197 Operator origOp;
00198 Galois::LargeArray<Operator> ops;
00199 Messages messages;
00200 Galois::LargeArray<int> scoreboard;
00201 Galois::InsertBag<GNode> wls[2];
00202 Galois::Runtime::LL::SimpleLock<true> lock;
00203
00204 struct Gather {
00205 SyncEngine* self;
00206 typedef int tt_does_not_need_push;
00207 typedef int tt_does_not_need_aborts;
00208
00209 Gather(SyncEngine* s): self(s) { }
00210 void operator()(GNode node, Galois::UserContext<GNode>&) {
00211 size_t id = self->graph.idFromNode(node);
00212 Operator& op = self->ops[id];
00213 gather_type sum;
00214
00215 if (needs_gather_in_edges<Operator>::value) {
00216 for (in_edge_iterator ii = self->graph.in_edge_begin(node, Galois::MethodFlag::NONE),
00217 ei = self->graph.in_edge_end(node, Galois::MethodFlag::NONE); ii != ei; ++ii) {
00218 op.gather(self->graph, node, self->graph.getInEdgeDst(ii), node, sum, self->graph.getInEdgeData(ii));
00219 }
00220 }
00221
00222 if (needs_gather_out_edges<Operator>::value) {
00223 for (edge_iterator ii = self->graph.edge_begin(node, Galois::MethodFlag::NONE),
00224 ei = self->graph.edge_end(node, Galois::MethodFlag::NONE); ii != ei; ++ii) {
00225 op.gather(self->graph, node, node, self->graph.getEdgeDst(ii), sum, self->graph.getEdgeData(ii));
00226 }
00227 }
00228
00229 op.apply(self->graph, node, sum);
00230 }
00231 };
00232
00233 template<typename Container>
00234 struct Scatter {
00235 typedef int tt_does_not_need_push;
00236 typedef int tt_does_not_need_aborts;
00237
00238 SyncEngine* self;
00239 Context<Graph,Operator> context;
00240
00241 Scatter(SyncEngine* s, Container& next):
00242 self(s),
00243 context(&self->graph, &self->scoreboard, &next, NeedMessages ? &self->messages : 0)
00244 { }
00245
00246 void operator()(GNode node, Galois::UserContext<GNode>&) {
00247 size_t id = self->graph.idFromNode(node);
00248
00249 Operator& op = self->ops[id];
00250
00251 if (!op.needsScatter(self->graph, node))
00252 return;
00253
00254 if (needs_scatter_in_edges<Operator>::value) {
00255 for (in_edge_iterator ii = self->graph.in_edge_begin(node, Galois::MethodFlag::NONE),
00256 ei = self->graph.in_edge_end(node, Galois::MethodFlag::NONE); ii != ei; ++ii) {
00257 op.scatter(self->graph, node, self->graph.getInEdgeDst(ii), node, context, self->graph.getInEdgeData(ii));
00258 }
00259 }
00260 if (needs_scatter_out_edges<Operator>::value) {
00261 for (edge_iterator ii = self->graph.edge_begin(node, Galois::MethodFlag::NONE),
00262 ei = self->graph.edge_end(node, Galois::MethodFlag::NONE); ii != ei; ++ii) {
00263 op.scatter(self->graph, node, node, self->graph.getEdgeDst(ii), context, self->graph.getEdgeData(ii));
00264 }
00265 }
00266 }
00267 };
00268
00269 template<bool IsFirst>
00270 struct Initialize {
00271 typedef int tt_does_not_need_push;
00272 typedef int tt_does_not_need_aborts;
00273
00274 SyncEngine* self;
00275 Initialize(SyncEngine* s): self(s) { }
00276
00277 void allocateMessages() {
00278 unsigned tid = Galois::Runtime::LL::getTID();
00279 if (!Galois::Runtime::LL::isPackageLeader(tid) || tid == 0)
00280 return;
00281 MyMessages& m = *self->messages.getLocal();
00282 self->lock.lock();
00283 m.resize(self->graph.size());
00284 self->lock.unlock();
00285 }
00286
00287 message_type getMessage(size_t id) {
00288 message_type ret;
00289 if (NeedMessages) {
00290 for (unsigned int i = 0; i < self->messages.size(); ++i) {
00291 if (!Galois::Runtime::LL::isPackageLeader(i))
00292 continue;
00293 MyMessages& m = *self->messages.getRemote(i);
00294 if (m.empty())
00295 continue;
00296 ret += m[id].second;
00297 m[id] = std::make_pair(0, message_type());
00298
00299 if (IsFirst)
00300 break;
00301 }
00302 }
00303 return ret;
00304 }
00305
00306 void operator()(GNode n, Galois::UserContext<GNode>&) {
00307 size_t id = self->graph.idFromNode(n);
00308 if (IsFirst && NeedMessages) {
00309 allocateMessages();
00310 } else if (!IsFirst) {
00311 self->scoreboard[id] = 0;
00312 }
00313
00314 Operator& op = self->ops[id];
00315 op = self->origOp;
00316 op.init(self->graph, n, getMessage(id));
00317
00318
00319 if (needs_gather_in_edges<Operator>::value || needs_gather_out_edges<Operator>::value)
00320 return;
00321
00322 gather_type sum;
00323 op.apply(self->graph, n, sum);
00324
00325 if (needs_scatter_in_edges<Operator>::value || needs_scatter_out_edges<Operator>::value)
00326 return;
00327 }
00328 };
00329
00330 template<bool IsFirst,typename Container1, typename Container2>
00331 void executeStep(Container1& cur, Container2& next) {
00332 Galois::for_each_local(cur, Initialize<IsFirst>(this), Galois::wl<WL>());
00333
00334 if (needs_gather_in_edges<Operator>::value || needs_gather_out_edges<Operator>::value) {
00335 Galois::for_each_local(cur, Gather(this), Galois::wl<WL>());
00336 }
00337
00338 if (needs_scatter_in_edges<Operator>::value || needs_scatter_out_edges<Operator>::value) {
00339 Galois::for_each_local(cur, Scatter<Container2>(this, next), Galois::wl<WL>());
00340 }
00341 }
00342
00343 public:
00344 SyncEngine(Graph& g, Operator op): graph(g), origOp(op) {
00345 ops.create(graph.size());
00346 scoreboard.create(graph.size());
00347 if (NeedMessages)
00348 messages.getLocal()->resize(graph.size());
00349 }
00350
00351 void signal(GNode node, const message_type& msg) {
00352 if (NeedMessages) {
00353 MyMessages& m = *messages.getLocal();
00354 m[graph.idFromNode(node)].second = msg;
00355 }
00356 }
00357
00358 void execute() {
00359 Galois::Statistic rounds("GraphLabRounds");
00360 Galois::InsertBag<GNode>* next = &wls[0];
00361 Galois::InsertBag<GNode>* cur = &wls[1];
00362
00363 executeStep<true>(graph, *next);
00364 rounds += 1;
00365 while (!next->empty()) {
00366 std::swap(cur, next);
00367 executeStep<false>(*cur, *next);
00368 rounds += 1;
00369 cur->clear();
00370 }
00371 }
00372 };
00373
00374 }
00375 }
00376 #endif