00001
00028 #ifndef GALOIS_RUNTIME_PARALLELWORK_H
00029 #define GALOIS_RUNTIME_PARALLELWORK_H
00030
00031 #include "Galois/Mem.h"
00032 #include "Galois/Statistic.h"
00033 #include "Galois/Runtime/Barrier.h"
00034 #include "Galois/Runtime/Context.h"
00035 #include "Galois/Runtime/ForEachTraits.h"
00036 #include "Galois/Runtime/Range.h"
00037 #include "Galois/Runtime/Support.h"
00038 #include "Galois/Runtime/Termination.h"
00039 #include "Galois/Runtime/ThreadPool.h"
00040 #include "Galois/Runtime/UserContextAccess.h"
00041 #include "Galois/WorkList/GFifo.h"
00042
00043 #include <algorithm>
00044 #include <functional>
00045
00046 #ifdef GALOIS_USE_HTM
00047 #include <speculation.h>
00048 #endif
00049
00050 namespace Galois {
00052 namespace Runtime {
00053 namespace {
00054
00055 template<bool Enabled>
00056 class LoopStatistics {
00057 unsigned long conflicts;
00058 unsigned long iterations;
00059 const char* loopname;
00060
00061 #ifdef GALOIS_USE_HTM
00062 TmReport_s start;
00063 void init() {
00064 if (LL::getTID()) return;
00065
00066
00067
00068 #pragma tm_atomic
00069 {
00070 conflicts = 0;
00071 }
00072
00073 tm_get_all_stats(&start);
00074 }
00075
00076 void report() {
00077 if (LL::getTID()) return;
00078 TmReport_s stop;
00079 tm_get_all_stats(&stop);
00080 reportStat(loopname, "HTMTransactions",
00081 stop.totalTransactions - start.totalTransactions);
00082 reportStat(loopname, "HTMRollbacks",
00083 stop.totalRollbacks - start.totalRollbacks);
00084 reportStat(loopname, "HTMSerializedJMV",
00085 stop.totalSerializedJMV - start.totalSerializedJMV);
00086 reportStat(loopname, "HTMSerializedMAXRB",
00087 stop.totalSerializedMAXRB - start.totalSerializedMAXRB);
00088 reportStat(loopname, "HTMSerializedOTHER",
00089 stop.totalSerializedOTHER - start.totalSerializedOTHER);
00090 tm_print_stats();
00091 }
00092 #else
00093 void init() { }
00094 void report() { }
00095 #endif
00096
00097 public:
00098 explicit LoopStatistics(const char* ln) :conflicts(0), iterations(0), loopname(ln) { init(); }
00099 ~LoopStatistics() {
00100 reportStat(loopname, "Conflicts", conflicts);
00101 reportStat(loopname, "Iterations", iterations);
00102 report();
00103 }
00104 inline void inc_iterations(int amount = 1) {
00105 iterations += amount;
00106 }
00107 inline void inc_conflicts() {
00108 ++conflicts;
00109 }
00110 };
00111
00112
00113 template <>
00114 class LoopStatistics<false> {
00115 public:
00116 explicit LoopStatistics(const char* ln) {}
00117 inline void inc_iterations(int amount = 1) const { }
00118 inline void inc_conflicts() const { }
00119 };
00120
00121 template<typename value_type>
00122 class AbortHandler {
00123 struct Item { value_type val; int retries; };
00124
00125 typedef WorkList::GFIFO<Item> AbortedList;
00126 PerThreadStorage<AbortedList> queues;
00127 bool useBasicPolicy;
00128
00132 void basicPolicy(const Item& item) {
00133 unsigned tid = LL::getTID();
00134 unsigned package = LL::getPackageForSelf(tid);
00135 queues.getRemote(LL::getLeaderForPackage(package / 2))->push(item);
00136 }
00137
00142 void doublePolicy(const Item& item) {
00143 int retries = item.retries - 1;
00144 if ((retries & 1) == 1) {
00145 queues.getLocal()->push(item);
00146 return;
00147 }
00148
00149 unsigned tid = LL::getTID();
00150 unsigned package = LL::getPackageForSelf(tid);
00151 unsigned leader = LL::getLeaderForPackage(package);
00152 if (tid != leader) {
00153 unsigned next = leader + (tid - leader) / 2;
00154 queues.getRemote(next)->push(item);
00155 } else {
00156 queues.getRemote(LL::getLeaderForPackage(package / 2))->push(item);
00157 }
00158 }
00159
00164 void boundedPolicy(const Item& item) {
00165 int retries = item.retries - 1;
00166 if (retries < 2) {
00167 queues.getLocal()->push(item);
00168 return;
00169 }
00170
00171 unsigned tid = LL::getTID();
00172 unsigned package = LL::getPackageForSelf(tid);
00173 unsigned leader = LL::getLeaderForPackage(package);
00174 if (retries < 5 && tid != leader) {
00175 unsigned next = leader + (tid - leader) / 2;
00176 queues.getRemote(next)->push(item);
00177 } else {
00178 queues.getRemote(LL::getLeaderForPackage(package / 2))->push(item);
00179 }
00180 }
00181
00185 void eagerPolicy(const Item& item) {
00186 queues.getLocal()->push(item);
00187 }
00188
00189 public:
00190 AbortHandler() {
00191
00192 useBasicPolicy = LL::getMaxPackages() > 2;
00193 }
00194
00195 value_type& value(Item& item) const { return item.val; }
00196 value_type& value(value_type& val) const { return val; }
00197
00198 void push(const value_type& val) {
00199 Item item = { val, 1 };
00200 queues.getLocal()->push(item);
00201 }
00202
00203 void push(const Item& item) {
00204 Item newitem = { item.val, item.retries + 1 };
00205 if (useBasicPolicy)
00206 basicPolicy(newitem);
00207 else
00208 doublePolicy(newitem);
00209 }
00210
00211 AbortedList* getQueue() { return queues.getLocal(); }
00212 };
00213
00214 template<class WorkListTy, class T, class FunctionTy>
00215 class ForEachWork {
00216 protected:
00217 typedef T value_type;
00218 typedef typename WorkListTy::template retype<value_type>::type WLTy;
00219
00220 struct ThreadLocalData {
00221 FunctionTy function;
00222 UserContextAccess<value_type> facing;
00223 SimpleRuntimeContext ctx;
00224 LoopStatistics<ForEachTraits<FunctionTy>::NeedsStats> stat;
00225 ThreadLocalData(const FunctionTy& fn, const char* ln): function(fn), stat(ln) {}
00226 };
00227
00228
00229
00230
00231 AbortHandler<value_type> aborted;
00232 TerminationDetection& term;
00233
00234 WLTy wl;
00235 FunctionTy& origFunction;
00236 const char* loopname;
00237 bool broke;
00238
00239 inline void commitIteration(ThreadLocalData& tld) {
00240 if (ForEachTraits<FunctionTy>::NeedsPush) {
00241 auto ii = tld.facing.getPushBuffer().begin();
00242 auto ee = tld.facing.getPushBuffer().end();
00243 if (ii != ee) {
00244 wl.push(ii, ee);
00245 tld.facing.resetPushBuffer();
00246 }
00247 }
00248 if (ForEachTraits<FunctionTy>::NeedsPIA)
00249 tld.facing.resetAlloc();
00250 if (ForEachTraits<FunctionTy>::NeedsAborts)
00251 tld.ctx.commitIteration();
00252 }
00253
00254 template<typename Item>
00255 GALOIS_ATTRIBUTE_NOINLINE
00256 void abortIteration(const Item& item, ThreadLocalData& tld) {
00257 assert(ForEachTraits<FunctionTy>::NeedsAborts);
00258 tld.ctx.cancelIteration();
00259 tld.stat.inc_conflicts();
00260 aborted.push(item);
00261
00262 if (ForEachTraits<FunctionTy>::NeedsPush)
00263 tld.facing.resetPushBuffer();
00264
00265 if (ForEachTraits<FunctionTy>::NeedsPIA)
00266 tld.facing.resetAlloc();
00267 }
00268
00269 #ifdef GALOIS_USE_HTM
00270 # ifndef GALOIS_USE_LONGJMP
00271 # error "HTM must be used with GALOIS_USE_LONGJMP"
00272 # endif
00273 #endif
00274
00275 inline void doProcess(value_type& val, ThreadLocalData& tld) {
00276 tld.stat.inc_iterations();
00277 if (ForEachTraits<FunctionTy>::NeedsAborts)
00278 tld.ctx.startIteration();
00279
00280 #ifdef GALOIS_USE_HTM
00281 # ifndef GALOIS_USE_LONGJMP
00282 # error "HTM must be used with GALOIS_USE_LONGJMP"
00283 # endif
00284 #pragma tm_atomic
00285 {
00286 #endif
00287 tld.function(val, tld.facing.data());
00288 #ifdef GALOIS_USE_HTM
00289 }
00290 #endif
00291
00292 clearReleasable();
00293 commitIteration(tld);
00294 }
00295
00296 bool runQueueSimple(ThreadLocalData& tld) {
00297 bool workHappened = false;
00298 Galois::optional<value_type> p = wl.pop();
00299 if (p)
00300 workHappened = true;
00301 while (p) {
00302 doProcess(*p, tld);
00303 p = wl.pop();
00304 }
00305 return workHappened;
00306 }
00307
00308 template<int limit, typename WL>
00309 bool runQueue(ThreadLocalData& tld, WL& lwl) {
00310 bool workHappened = false;
00311 Galois::optional<typename WL::value_type> p = lwl.pop();
00312 unsigned num = 0;
00313 int result = 0;
00314 if (p)
00315 workHappened = true;
00316 #ifdef GALOIS_USE_LONGJMP
00317 if ((result = setjmp(hackjmp)) == 0) {
00318 #else
00319 try {
00320 #endif
00321 while (p) {
00322 doProcess(aborted.value(*p), tld);
00323 if (limit) {
00324 ++num;
00325 if (num == limit)
00326 break;
00327 }
00328 p = lwl.pop();
00329 }
00330 #ifdef GALOIS_USE_LONGJMP
00331 } else {
00332 clearReleasable();
00333 clearConflictLock();
00334 }
00335 #else
00336 } catch (ConflictFlag const& flag) {
00337 clearReleasable();
00338 clearConflictLock();
00339 result = flag;
00340 }
00341 #endif
00342 switch (result) {
00343 case 0:
00344 break;
00345 case CONFLICT:
00346 abortIteration(*p, tld);
00347 break;
00348 default:
00349 GALOIS_DIE("unknown conflict type");
00350 }
00351 return workHappened;
00352 }
00353
00354 GALOIS_ATTRIBUTE_NOINLINE
00355 bool handleAborts(ThreadLocalData& tld) {
00356 return runQueue<0>(tld, *aborted.getQueue());
00357 }
00358
00359 void fastPushBack(typename UserContextAccess<value_type>::PushBufferTy& x) {
00360 wl.push(x.begin(), x.end());
00361 x.clear();
00362 }
00363
00364 template<bool couldAbort, bool isLeader>
00365 void go() {
00366
00367 ThreadLocalData tld(origFunction, loopname);
00368 tld.facing.setBreakFlag(&broke);
00369 if (couldAbort)
00370 setThreadContext(&tld.ctx);
00371 if (ForEachTraits<FunctionTy>::NeedsPush && !couldAbort)
00372 tld.facing.setFastPushBack(
00373 std::bind(&ForEachWork::fastPushBack, std::ref(*this), std::placeholders::_1));
00374 bool didWork;
00375 do {
00376 didWork = false;
00377
00378 if (couldAbort || ForEachTraits<FunctionTy>::NeedsBreak) {
00379 if (isLeader)
00380 didWork = runQueue<32>(tld, wl);
00381 else
00382 didWork = runQueue<ForEachTraits<FunctionTy>::NeedsBreak ? 32 : 0>(tld, wl);
00383
00384 if (couldAbort)
00385 didWork |= handleAborts(tld);
00386 } else {
00387 didWork = runQueueSimple(tld);
00388 }
00389
00390 term.localTermination(didWork);
00391 } while (!term.globalTermination() && (!ForEachTraits<FunctionTy>::NeedsBreak || !broke));
00392
00393 if (couldAbort)
00394 setThreadContext(0);
00395 }
00396
00397 public:
00398 ForEachWork(FunctionTy& f, const char* l): term(getSystemTermination()), origFunction(f), loopname(l), broke(false) { }
00399
00400 template<typename W>
00401 ForEachWork(W& w, FunctionTy& f, const char* l): term(getSystemTermination()), wl(w), origFunction(f), loopname(l), broke(false) { }
00402
00403 template<typename RangeTy>
00404 void AddInitialWork(const RangeTy& range) {
00405 wl.push_initial(range);
00406 }
00407
00408 void initThread(void) {
00409 term.initializeThread();
00410 }
00411
00412 void operator()() {
00413 bool isLeader = LL::isPackageLeaderForSelf(LL::getTID());
00414 bool couldAbort = ForEachTraits<FunctionTy>::NeedsAborts && activeThreads > 1;
00415 #ifdef GALOIS_USE_HTM
00416 couldAbort = false;
00417 #endif
00418 if (couldAbort && isLeader)
00419 go<true, true>();
00420 else if (couldAbort && !isLeader)
00421 go<true, false>();
00422 else if (!couldAbort && isLeader)
00423 go<false, true>();
00424 else
00425 go<false, false>();
00426 }
00427 };
00428
00429 template<typename WLTy, typename RangeTy, typename FunctionTy>
00430 void for_each_impl(const RangeTy& range, FunctionTy f, const char* loopname) {
00431 if (inGaloisForEach)
00432 GALOIS_DIE("Nested for_each not supported");
00433
00434 StatTimer LoopTimer("LoopTime", loopname);
00435 if (ForEachTraits<FunctionTy>::NeedsStats)
00436 LoopTimer.start();
00437
00438 inGaloisForEach = true;
00439
00440 typedef typename RangeTy::value_type T;
00441 typedef ForEachWork<WLTy, T, FunctionTy> WorkTy;
00442
00443
00444
00445 Barrier& barrier = getSystemBarrier();
00446
00447 WorkTy W(f, loopname);
00448 RunCommand w[5] = {
00449 std::bind(&WorkTy::initThread, std::ref(W)),
00450 std::bind(&WorkTy::template AddInitialWork<RangeTy>, std::ref(W), range),
00451 std::ref(barrier),
00452 std::ref(W),
00453 std::ref(barrier)
00454 };
00455 getSystemThreadPool().run(&w[0], &w[5], activeThreads);
00456 if (ForEachTraits<FunctionTy>::NeedsStats)
00457 LoopTimer.stop();
00458 inGaloisForEach = false;
00459 }
00460
00461 template<typename FunctionTy>
00462 struct WOnEach {
00463 FunctionTy& origFunction;
00464 WOnEach(FunctionTy& f): origFunction(f) { }
00465 void operator()(void) {
00466 FunctionTy fn(origFunction);
00467 fn(LL::getTID(), activeThreads);
00468 }
00469 };
00470
00471 template<typename FunctionTy>
00472 void on_each_impl(FunctionTy fn, const char* loopname = 0) {
00473 if (inGaloisForEach)
00474 GALOIS_DIE("Nested for_each not supported");
00475
00476 inGaloisForEach = true;
00477 RunCommand w[2] = {WOnEach<FunctionTy>(fn),
00478 std::ref(getSystemBarrier())};
00479 getSystemThreadPool().run(&w[0], &w[2], activeThreads);
00480 inGaloisForEach = false;
00481 }
00482
00484 template<typename FunctionTy>
00485 void on_each_simple_impl(FunctionTy fn, const char* loopname = 0) {
00486 if (inGaloisForEach)
00487 GALOIS_DIE("Nested for_each not supported");
00488
00489 inGaloisForEach = true;
00490 Barrier* b = createSimpleBarrier();
00491 b->reinit(activeThreads);
00492 RunCommand w[2] = {WOnEach<FunctionTy>(fn),
00493 std::ref(*b)};
00494 getSystemThreadPool().run(&w[0], &w[2], activeThreads);
00495 delete b;
00496 inGaloisForEach = false;
00497 }
00498
00499 }
00500
00501 void preAlloc_impl(int num);
00502
00503 }
00504 }
00505
00506 #endif
00507