00001
00028 #ifndef GALOIS_RUNTIME_PARALLELWORK_H
00029 #define GALOIS_RUNTIME_PARALLELWORK_H
00030 #include <algorithm>
00031 #include <numeric>
00032 #include <sstream>
00033 #include <math.h>
00034 #include "Galois/TypeTraits.h"
00035 #include "Galois/Executable.h"
00036 #include "Galois/Mem.h"
00037
00038 #include "Galois/Runtime/Support.h"
00039 #include "Galois/Runtime/Context.h"
00040 #include "Galois/Runtime/Threads.h"
00041 #include "Galois/Runtime/PerCPU.h"
00042 #include "Galois/Runtime/WorkList.h"
00043 #include "Galois/Runtime/DebugWorkList.h"
00044 #include "Galois/Runtime/Termination.h"
00045
00046 #ifdef GALOIS_VTUNE
00047 #include "ittnotify.h"
00048 #endif
00049
00050 namespace GaloisRuntime {
00051
00052
00053 template<bool SRC_ACTIVE>
00054 class SimpleRuntimeContextHandler;
00055
00056 template<>
00057 class SimpleRuntimeContextHandler<true> {
00058 SimpleRuntimeContext src;
00059 public:
00060 void start_iteration() {
00061 src.start_iteration();
00062 }
00063 void cancel_iteration() {
00064 src.cancel_iteration();
00065 }
00066 void commit_iteration() {
00067 src.commit_iteration();
00068 }
00069 void start_parallel_region() {
00070 setThreadContext(&src);
00071 }
00072 void end_parallel_region() {
00073 setThreadContext(0);
00074 }
00075 };
00076
00077 template<>
00078 class SimpleRuntimeContextHandler<false> {
00079 public:
00080 void start_iteration() {}
00081 void cancel_iteration() {}
00082 void commit_iteration() {}
00083 void start_parallel_region() {}
00084 void end_parallel_region() {}
00085 };
00086
00087
00088 template<bool STAT_ACTIVE>
00089 class StatisticHandler;
00090
00091 template<>
00092 class StatisticHandler<true> {
00093 unsigned long conflicts;
00094 unsigned long iterations;
00095 public:
00096 StatisticHandler() :conflicts(0), iterations(0) {}
00097 void inc_iterations() {
00098 ++iterations;
00099 }
00100 void inc_conflicts() {
00101 ++conflicts;
00102 }
00103 void report_stat(const char* loopname) const {
00104 reportStatSum("Conflicts", conflicts, loopname);
00105 reportStatSum("Iterations", iterations, loopname);
00106 reportStatAvg("ConflictsDistribution", conflicts, loopname);
00107 reportStatAvg("IterationsDistribution", iterations, loopname);
00108 }
00109 void done() {
00110 GaloisRuntime::statDone();
00111 }
00112 int num() {
00113 return GaloisRuntime::getSystemThreadPool().getActiveThreads();
00114 }
00115 };
00116
00117 template<>
00118 class StatisticHandler<false> {
00119 public:
00120 inline void inc_iterations() const {}
00121 inline void inc_conflicts() const {}
00122 inline void report_stat(const char*) const {}
00123 inline void done() const {}
00124 inline int num() const { return 0; }
00125 };
00126
00127
00128
// User-facing half of the break feature (defined per specialization).
template<bool BREAK_ACTIVE>
class API_Break;

// Runtime half of the break feature (defined per specialization).
template<bool BREAK_ACTIVE>
class BreakImpl;

// Active variant: remembers that a break was requested and, once it has
// been, empties whatever worklist is handed to check().
template<>
class BreakImpl<true> {
  volatile bool break_;  // set by handleBreak(), read by check()

public:
  BreakImpl() : break_(false) {}

  // Records the break request.
  void handleBreak() {
    break_ = true;
  }

  // If a break was requested, pops from `wl` until pop() reports failure
  // (first == false), discarding every popped value. Otherwise does nothing.
  template<typename WLTy>
  void check(WLTy* wl) {
    typedef typename WLTy::value_type value_type;

    if (!break_)
      return;

    bool exhausted;
    do {
      const std::pair<bool, value_type> item = wl->pop();
      exhausted = !item.first;
    } while (!exhausted);
  }
};
00156
00157 template<>
00158 class BreakImpl<false> {
00159 public:
00160 template<typename WLTy>
00161 void check(WLTy*) {}
00162 };
00163
00164 template<>
00165 class API_Break<true> {
00166 BreakImpl<true>* p_;
00167 protected:
00168 void init_break(BreakImpl<true>* p) {
00169 p_ = p;
00170 }
00171 public:
00172 void breakLoop() {
00173 p_->handleBreak();
00174 }
00175 };
00176 template<>
00177 class API_Break<false> {
00178 protected:
00179 void init_break(BreakImpl<false>* p) { }
00180 };
00181
00182
00183 template<bool PIA_ACTIVE>
00184 class API_PerIter;
00185
00186 template<>
00187 class API_PerIter<true>
00188 {
00189 Galois::ItAllocBaseTy IterationAllocatorBase;
00190 Galois::PerIterAllocTy PerIterationAllocator;
00191
00192 protected:
00193 void __resetAlloc() {
00194 IterationAllocatorBase.clear();
00195 }
00196
00197 public:
00198 API_PerIter()
00199 :IterationAllocatorBase(),
00200 PerIterationAllocator(&IterationAllocatorBase)
00201 {}
00202
00203 virtual ~API_PerIter() {
00204 IterationAllocatorBase.clear();
00205 }
00206
00207 Galois::PerIterAllocTy& getPerIterAlloc() {
00208 return PerIterationAllocator;
00209 }
00210 };
00211
00212 template<>
00213 class API_PerIter<false>
00214 {
00215 protected:
00216 void __resetAlloc() {}
00217 };
00218
00219
00220
// Compile-time switch for the push API. Only the partial specializations
// for true/false are defined.
template<bool PUSH_ACTIVE, typename WLT>
class API_Push;

// Active variant: forwards push() to the worklist registered through
// init_wl().
template<typename WLT>
class API_Push<true, WLT> {
  typedef typename WLT::value_type value_type;
  WLT* wl;  // 0 until init_wl() is called

protected:
  // Registers the worklist that push() will forward to.
  void init_wl(WLT* _wl) {
    wl = _wl;
  }

public:
  // Start from a null pointer instead of an indeterminate value so that a
  // push() issued before init_wl() fails deterministically.
  API_Push() : wl(0) {}

  // Pushes `v` onto the registered worklist. init_wl() must have been
  // called first.
  void push(const value_type& v) {
    wl->push(v);
  }
};
00237
00238 template<typename WLT>
00239 class API_Push<false, WLT> {
00240 protected:
00241 void init_wl(WLT* _wl) {}
00242 };
00243
00244
00245 template<typename Function>
00246 struct Configurator {
00247 enum {
00248 CollectStats = !Galois::does_not_need_stats<Function>::value,
00249 NeedsBreak = Galois::needs_parallel_break<Function>::value,
00250 NeedsPush = !Galois::does_not_need_parallel_push<Function>::value,
00251 NeedsContext = !Galois::does_not_need_context<Function>::value,
00252 NeedsPIA = Galois::needs_per_iter_alloc<Function>::value
00253 };
00254 };
00255
00256 template<typename Function, class WorkListTy>
00257 class ParallelThreadContext
00258 : public SimpleRuntimeContextHandler<Configurator<Function>::NeedsContext>,
00259 public StatisticHandler<Configurator<Function>::CollectStats>
00260 {
00261 typedef typename WorkListTy::value_type value_type;
00262 public:
00263 class UserAPI
00264 :public API_PerIter<Configurator<Function>::NeedsPIA>,
00265 public API_Push<Configurator<Function>::NeedsPush, WorkListTy>,
00266 public API_Break<Configurator<Function>::NeedsBreak>
00267 {
00268 friend class ParallelThreadContext;
00269 };
00270
00271 private:
00272
00273 UserAPI facing;
00274 TerminationDetection::tokenHolder* lterm;
00275 bool leader;
00276
00277 public:
00278 ParallelThreadContext() {}
00279
00280 virtual ~ParallelThreadContext() {}
00281
00282 void initialize(TerminationDetection::tokenHolder* t,
00283 bool _leader,
00284 WorkListTy* wl,
00285 BreakImpl<Configurator<Function>::NeedsBreak>* p) {
00286 lterm = t;
00287 leader = _leader;
00288 facing.init_wl(wl);
00289 facing.init_break(p);
00290 }
00291
00292 void workHappened() {
00293 lterm->workHappened();
00294 }
00295
00296 bool is_leader() const {
00297 return leader;
00298 }
00299
00300 UserAPI& userFacing() {
00301 return facing;
00302 }
00303
00304 void resetAlloc() {
00305 facing.__resetAlloc();
00306 }
00307
00308 };
00309
00310 template<class WorkListTy, class Function>
00311 class ForEachWork : public Galois::Executable {
00312 typedef typename WorkListTy::value_type value_type;
00313 typedef GaloisRuntime::WorkList::MP_SC_FIFO<value_type> AbortedListTy;
00314 typedef ParallelThreadContext<Function, WorkListTy> PCTy;
00315
00316 WorkListTy global_wl;
00317 BreakImpl<Configurator<Function>::NeedsBreak> breaker;
00318 Function& f;
00319 const char* loopname;
00320
00321 PerCPU<PCTy> tdata;
00322 TerminationDetection term;
00323 AbortedListTy aborted;
00324 volatile long abort_happened;
00325
00326 bool drainAborted() {
00327 bool retval = false;
00328 abort_happened = 0;
00329 std::pair<bool, value_type> p = aborted.pop();
00330 while (p.first) {
00331 retval = true;
00332 global_wl.push(p.second);
00333 p = aborted.pop();
00334 }
00335 return retval;
00336 }
00337
00338 void doAborted(value_type val) {
00339 aborted.push(val);
00340 abort_happened = 1;
00341 }
00342
00343 void doProcess(value_type val, PCTy& tld) {
00344 tld.inc_iterations();
00345 tld.start_iteration();
00346 try {
00347 f(val, tld.userFacing());
00348 } catch (int a) {
00349 tld.cancel_iteration();
00350 tld.inc_conflicts();
00351 doAborted(val);
00352 return;
00353 }
00354 tld.commit_iteration();
00355 tld.resetAlloc();
00356 }
00357
00358 public:
00359 template<typename IterTy>
00360 ForEachWork(IterTy b, IterTy e, Function& _f, const char* _loopname)
00361 :f(_f), loopname(_loopname), abort_happened(0) {
00362 global_wl.fill_initial(b, e);
00363 }
00364
00365 ~ForEachWork() {
00366 for (int i = 0; i < tdata.get(0).num(); ++i)
00367 tdata.get(i).report_stat(loopname);
00368 tdata.get(0).done();
00369 assert(global_wl.empty());
00370 }
00371
00372 virtual void operator()() {
00373 PCTy& tld = tdata.get();
00374 tld.initialize(term.getLocalTokenHolder(),
00375 tdata.myEffectiveID() == 0,
00376 &global_wl,
00377 &breaker);
00378
00379 tld.start_parallel_region();
00380 do {
00381 std::pair<bool, value_type> p = global_wl.pop();
00382 if (p.first) {
00383 tld.workHappened();
00384 doProcess(p.second, tld);
00385 do {
00386 if (tld.is_leader() && abort_happened) {
00387 drainAborted();
00388 }
00389 breaker.check(&global_wl);
00390 p = global_wl.pop();
00391 if (p.first) {
00392 doProcess(p.second, tld);
00393 } else {
00394 break;
00395 }
00396 } while(true);
00397 }
00398 if (tld.is_leader() && drainAborted())
00399 continue;
00400
00401 breaker.check(&global_wl);
00402
00403 term.localTermination();
00404 } while (!term.globalTermination());
00405
00406 tld.end_parallel_region();
00407 }
00408 };
00409
00410 template<typename WLTy, typename IterTy, typename Function>
00411 void for_each_impl(IterTy b, IterTy e, Function f, const char* loopname) {
00412 #ifdef GALOIS_VTUNE
00413 __itt_resume();
00414 #endif
00415
00416 typedef typename WLTy::template retype<typename std::iterator_traits<IterTy>::value_type>::WL aWLTy;
00417
00418 ForEachWork<aWLTy, Function> GW(b, e, f, loopname);
00419 ThreadPool& PTP = getSystemThreadPool();
00420
00421 PTP.run(&GW);
00422
00423 #ifdef GALOIS_VTUNE
00424 __itt_pause();
00425 #endif
00426 }
00427
00428 }
00429
00430 #endif