00001
00025 #ifndef GALOIS_RUNTIME_DETERMINISTICWORK_H
00026 #define GALOIS_RUNTIME_DETERMINISTICWORK_H
00027
00028 #include "Galois/config.h"
00029
00030
00031
00032
00033
00034 #include "Galois/config.h"
00035 #include "Galois/Threads.h"
00036
00037 #include "Galois/ParallelSTL/ParallelSTL.h"
00038 #include "Galois/TwoLevelIterator.h"
00039 #include "Galois/Runtime/ll/gio.h"
00040 #include "Galois/Runtime/ll/EnvCheck.h"
00041
00042 #include <boost/iterator/iterator_facade.hpp>
00043 #include <boost/iterator/transform_iterator.hpp>
00044 #include <boost/iterator/counting_iterator.hpp>
00045
00046 #include GALOIS_CXX11_STD_HEADER(type_traits)
00047 #include <deque>
00048 #include <queue>
00049
00050 namespace Galois {
00051 namespace Runtime {
00053 namespace DeterministicImpl {
00054
/**
 * A unit of work handed to the deterministic executor.
 *
 * Couples the user's value with the id the executor assigned to it.
 * DeterministicContext::subAcquire compares these ids when two iterations
 * contend for the same lock: the smaller id keeps the lock.
 */
template<typename T>
struct DItem {
  T val;              // the user's work item
  unsigned long id;   // ordering key used for conflict resolution
  void *localState;   // opaque saved state; null until a state manager stores one

  DItem(const T& value, unsigned long itemId)
    : val(value),
      id(itemId),
      localState(NULL)
  { }
};
00063
00064 template<typename T, typename OptionsTy>
00065 class DeterministicContext:
00066 public SimpleRuntimeContext,
00067 public StrictObject<typename boost::mpl::if_c<OptionsTy::useInOrderCommit, unsigned long, void>::type>
00068 {
00069 template<bool useInOrderCommit = OptionsTy::useInOrderCommit>
00070 void updateAborted(unsigned long id, typename std::enable_if<useInOrderCommit>::type* = 0) {
00071 this->get() = std::min(this->get(), id);
00072 }
00073
00074 template<bool useInOrderCommit = OptionsTy::useInOrderCommit>
00075 void updateAborted(unsigned long id, typename std::enable_if<!useInOrderCommit>::type* = 0) { }
00076
00077 public:
00078 DItem<T> item;
00079 bool not_ready;
00080
00081 DeterministicContext(const DItem<T>& _item):
00082 SimpleRuntimeContext(true),
00083 item(_item),
00084 not_ready(false)
00085 { }
00086
00087 bool notReady() const {
00088 return not_ready;
00089 }
00090
00091 virtual void subAcquire(Lockable* lockable) {
00092 if (getPending() == COMMITTING)
00093 return;
00094
00095 if (this->tryLock(lockable)) {
00096 this->addToNhood(lockable);
00097 }
00098
00099 DeterministicContext* other;
00100 do {
00101 other = static_cast<DeterministicContext*>(this->getOwner(lockable));
00102 if (other == this)
00103 return;
00104 if (other) {
00105 bool conflict = other->item.id < this->item.id;
00106 if (conflict) {
00107
00108 not_ready = true;
00109 updateAborted(this->item.id);
00110 return;
00111 }
00112 }
00113 } while (!this->stealByCAS(lockable, other));
00114
00115
00116 if (other) {
00117 other->not_ready = true;
00118 updateAborted(other->item.id);
00119 }
00120
00121 return;
00122 }
00123 };
00124
00125 namespace {
00126
00127 template<typename T, typename Function1Ty, typename Function2Ty>
00128 struct Options {
00129 typedef T value_type;
00130 typedef Function1Ty function1_type;
00131 typedef Function2Ty function2_type;
00132
00133 static const bool needsStats = ForEachTraits<Function1Ty>::NeedsStats || ForEachTraits<Function2Ty>::NeedsStats;
00134 static const bool needsPush = ForEachTraits<Function1Ty>::NeedsPush || ForEachTraits<Function2Ty>::NeedsPush;
00135 static const bool needsBreak = ForEachTraits<Function1Ty>::NeedsBreak || ForEachTraits<Function2Ty>::NeedsBreak;
00136 static const bool hasBreak = has_deterministic_parallel_break<Function1Ty>::value;
00137 static const bool hasId = has_deterministic_id<Function1Ty>::value;
00138 static const bool useLocalState = has_deterministic_local_state<Function1Ty>::value;
00139 #ifdef GALOIS_USE_DET_FIXED_WINDOW
00140 static const bool hasFixedWindow = true;
00141 #else
00142 static const bool hasFixedWindow = false;
00143 #endif
00144 #ifdef GALOIS_USE_DET_INORDER
00145 static const bool useInOrderCommit = true;
00146 #else
00147 static const bool useInOrderCommit = false;
00148 #endif
00149
00150 Function1Ty fn1;
00151 Function2Ty fn2;
00152 int defaultDelta;
00153
00154 Options(const Function1Ty& fn1, const Function2Ty& fn2): fn1(fn1), fn2(fn2) {
00155 if (!LL::EnvCheck("GALOIS_FIXED_DET_WINDOW_SIZE", defaultDelta))
00156 defaultDelta = 256;
00157 }
00158 };
00159
00160 template<typename OptionsTy, bool Enable>
00161 struct StateManagerBase {
00162 typedef typename OptionsTy::value_type value_type;
00163 typedef typename OptionsTy::function1_type function_type;
00164 void alloc(UserContextAccess<value_type>&, function_type& self) { }
00165 void dealloc(UserContextAccess<value_type>&) { }
00166 void save(UserContextAccess<value_type>&, void*&) { }
00167 void restore(UserContextAccess<value_type>&, void*) { }
00168 void reuseItem(DItem<value_type>& item) { }
00169
00170 template<typename LWL, typename GWL>
00171 typename GWL::value_type* emplaceContext(LWL& lwl, GWL& gwl, const DItem<value_type>& item) const {
00172 return gwl.emplace(item);
00173 }
00174
00175 template<typename LWL, typename GWL>
00176 typename GWL::value_type* peekContext(LWL& lwl, GWL& gwl) const {
00177 return gwl.peek();
00178 }
00179
00180 template<typename LWL, typename GWL>
00181 void popContext(LWL& lwl, GWL& gwl) const {
00182 gwl.pop_peeked();
00183 }
00184 };
00185
00186 template<typename OptionsTy>
00187 struct StateManagerBase<OptionsTy, true> {
00188 typedef typename OptionsTy::value_type value_type;
00189 typedef typename OptionsTy::function1_type function_type;
00190 typedef typename function_type::GaloisDeterministicLocalState LocalState;
00191
00192 void alloc(UserContextAccess<value_type>& c, function_type& self) {
00193 void *p = c.data().getPerIterAlloc().allocate(sizeof(LocalState));
00194 new (p) LocalState(self, c.data().getPerIterAlloc());
00195 c.setLocalState(p, false);
00196 }
00197
00198 void dealloc(UserContextAccess<value_type>& c) {
00199 bool dummy;
00200 LocalState *p = reinterpret_cast<LocalState*>(c.data().getLocalState(dummy));
00201 if (p)
00202 p->~LocalState();
00203 }
00204
00205 void save(UserContextAccess<value_type>& c, void*& localState) {
00206 bool dummy;
00207 localState = c.data().getLocalState(dummy);
00208 }
00209
00210 void restore(UserContextAccess<value_type>& c, void* localState) {
00211 c.setLocalState(localState, true);
00212 }
00213
00214 template<typename LWL, typename GWL>
00215 typename LWL::value_type* emplaceContext(LWL& lwl, GWL& gwl, const DItem<value_type>& item) const {
00216 return lwl.emplace(item);
00217 }
00218
00219 template<typename LWL, typename GWL>
00220 typename LWL::value_type* peekContext(LWL& lwl, GWL& gwl) const {
00221 return lwl.peek();
00222 }
00223
00224 template<typename LWL, typename GWL>
00225 void popContext(LWL& lwl, GWL& gwl) const {
00226 lwl.pop_peeked();
00227 }
00228
00229 void reuseItem(DItem<value_type>& item) { item.localState = NULL; }
00230 };
00231
00232 template<typename OptionsTy>
00233 struct StateManager: public StateManagerBase<OptionsTy, OptionsTy::useLocalState> { };
00234
/// Break manager for operators without a parallel-break hook: never breaks.
template<typename OptionsTy, bool Enable>
struct BreakManagerBase {
  bool checkBreak(typename OptionsTy::function1_type&) {
    return false;
  }
};
00239
00240 template<typename OptionsTy>
00241 class BreakManagerBase<OptionsTy, true> {
00242 Barrier& barrier;
00243 LL::CacheLineStorage<volatile long> done;
00244
00245 public:
00246 BreakManagerBase(): barrier(getSystemBarrier()) { }
00247
00248 bool checkBreak(typename OptionsTy::function1_type& fn) {
00249 if (LL::getTID() == 0)
00250 done.data = fn.galoisDeterministicParallelBreak();
00251 barrier.wait();
00252 return done.data;
00253 }
00254 };
00255
00256 template<typename OptionsTy>
00257 struct BreakManager: public BreakManagerBase<OptionsTy, OptionsTy::hasBreak> { };
00258
/**
 * New-work policy for operators without galoisDeterministicId().
 *
 * Pushed items are tagged with the pushing iteration's id (`parent`) and the
 * push's sequence number (`count`).
 */
template<typename OptionsTy, bool Enable>
struct NewWorkManagerBase {
  static const bool value = false;
  static const int ChunkSize = 32;
  static const int MinDelta = ChunkSize * 40;

  // No user id function: every item has id 0.
  template<typename Arg>
  static uintptr_t id(const typename OptionsTy::function1_type&, Arg) {
    return 0;
  }

  template<typename WL, typename T>
  void pushNew(const OptionsTy&, WL& wl, const T& val, unsigned long parent, unsigned count) const {
    typedef typename WL::value_type NewWorkItem;
    wl.push(NewWorkItem(val, parent, count));
  }
};
00274
00275 template<typename OptionsTy>
00276 struct NewWorkManagerBase<OptionsTy, true> {
00277 static const bool value = true;
00278 static const int ChunkSize = 32;
00279 static const int MinDelta = ChunkSize * 40;
00280
00281 template<typename Arg>
00282 static uintptr_t id(const typename OptionsTy::function1_type& fn, Arg arg) {
00283 return fn.galoisDeterministicId(std::forward<Arg>(arg));
00284 }
00285
00286 template<typename WL, typename T>
00287 void pushNew(const OptionsTy& options, WL& wl, const T& val, unsigned long parent, unsigned count) const {
00288 typedef typename WL::value_type value_type;
00289 wl.push(value_type(val, id(options.fn1, val), 1));
00290 }
00291 };
00292
00293 template<typename OptionsTy>
00294 struct NewWorkManager: public NewWorkManagerBase<OptionsTy, OptionsTy::hasId> { };
00295
00296 template<typename OptionsTy, bool Enable>
00297 struct InOrderManagerBase {
00298 typedef DeterministicContext<typename OptionsTy::value_type, OptionsTy> Context;
00299
00300 void initializeContext(Context* ctx) { }
00301 void updateAborted(const Context* ctx) { }
00302 void resetAborted() { }
00303 bool shouldCommit(const Context* ctx) { return true; }
00304 void allReduceAborted() { }
00305 };
00306
00307 template<typename OptionsTy>
00308 struct InOrderManagerBase<OptionsTy, true> {
00309 typedef DeterministicContext<typename OptionsTy::value_type, OptionsTy> Context;
00310
00311 PerThreadStorage<unsigned long> data;
00312 Barrier& barrier;
00313 InOrderManagerBase(): barrier(getSystemBarrier()) { }
00314
00315 void initializeContext(Context* ctx) { ctx->get() = std::numeric_limits<unsigned long>::max(); }
00316
00317 void updateAborted(const Context* ctx) {
00318 unsigned long& r = *data.getLocal();
00319 r = std::min(r, ctx->get());
00320 }
00321 void resetAborted() { *data.getLocal() = std::numeric_limits<unsigned long>::max(); }
00322 bool shouldCommit(const Context* ctx) {
00323 return ctx->item.id < *data.getLocal();
00324 }
00325 void allReduceAborted() {
00326 unsigned long r = std::numeric_limits<unsigned long>::max();
00327 for (unsigned i = 0; i < activeThreads; ++i)
00328 r = std::min(r, *data.getRemote(i));
00329 barrier.wait();
00330 *data.getLocal() = r;
00331 }
00332 };
00333
00334 template<typename OptionsTy>
00335 struct InOrderManager: public InOrderManagerBase<OptionsTy, OptionsTy::useInOrderCommit> { };
00336
/**
 * A work item pushed during a round, keyed by (parent, count).
 *
 * Ordering and equality look only at the key, never at `val`, so sorting new
 * items yields the same order regardless of which thread pushed them.
 */
template<typename T>
struct DNewItem {
  T val;
  unsigned long parent;   // primary sort key
  unsigned count;         // secondary sort key

  DNewItem(const T& _val, unsigned long _parent, unsigned _count): val(_val), parent(_parent), count(_count) { }

  // Lexicographic order on (parent, count).
  bool operator<(const DNewItem<T>& o) const {
    return parent < o.parent || (parent == o.parent && count < o.count);
  }

  bool operator==(const DNewItem<T>& o) const {
    return parent == o.parent && count == o.count;
  }

  bool operator!=(const DNewItem<T>& o) const {
    return !(*this == o);
  }

  // Projection to the wrapped value (e.g. for boost::transform_iterator).
  // Publishes argument_type/result_type directly rather than deriving from
  // std::unary_function, which is deprecated in C++11 and removed in C++17.
  struct GetFirst {
    typedef DNewItem<T> argument_type;
    typedef const T& result_type;

    const T& operator()(const DNewItem<T>& x) const {
      return x.val;
    }
  };
};
00368
/**
 * Advance `it` by `d` positions, clamped so the running position never
 * passes `dist`.
 *
 * @param it    iterator to move
 * @param d     requested step
 * @param cur   running position; increased by the step actually taken
 * @param dist  total length of the range
 */
template<typename InputIteratorTy>
void safe_advance(InputIteratorTy& it, size_t d, size_t& cur, size_t dist) {
  const size_t step = (d + cur >= dist) ? dist - cur : d;
  std::advance(it, step);
  cur += step;
}
00377
00379 template<int chunksize,typename T>
00380 struct FIFO {
00381 WorkList::ChunkedFIFO<chunksize,T,false> m_data;
00382 WorkList::ChunkedLIFO<16,T,false> m_buffer;
00383 size_t m_size;
00384
00385 FIFO(): m_size(0) { }
00386
00387 ~FIFO() {
00388 Galois::optional<T> p;
00389 while ((p = m_buffer.pop()))
00390 ;
00391 while ((p = m_data.pop()))
00392 ;
00393 }
00394
00395 Galois::optional<T> pop() {
00396 Galois::optional<T> p;
00397 if ((p = m_buffer.pop()) || (p = m_data.pop())) {
00398 --m_size;
00399 }
00400 return p;
00401 }
00402
00403 Galois::optional<T> peek() {
00404 Galois::optional<T> p;
00405 if ((p = m_buffer.pop())) {
00406 m_buffer.push(*p);
00407 } else if ((p = m_data.pop())) {
00408 m_buffer.push(*p);
00409 }
00410 return p;
00411 }
00412
00413 void push(const T& val) {
00414 m_data.push(val);
00415 ++m_size;
00416 }
00417
00418 size_t size() const {
00419 return m_size;
00420 }
00421
00422 bool empty() const {
00423 return m_size == 0;
00424 }
00425 };
00426
// Forward declarations so DMergeLocal can befriend the merge managers, which
// read and update its private per-thread state.
template<typename> class DMergeManagerBase;
template<typename> class DMergeManager;
00429
00431 template<typename OptionsTy>
00432 class DMergeLocal: private boost::noncopyable {
00433 template<typename> friend class DMergeManagerBase;
00434 template<typename> friend class DMergeManager;
00435
00436 typedef typename OptionsTy::value_type value_type;
00437
00438 typedef DItem<value_type> Item;
00439 typedef DNewItem<value_type> NewItem;
00440 typedef std::vector<NewItem, typename PerIterAllocTy::rebind<NewItem>::other> NewItemsTy;
00441 typedef FIFO<1024,Item> ReserveTy;
00442
00443 IterAllocBaseTy heap;
00444 PerIterAllocTy alloc;
00445 ReserveTy reserve;
00446 size_t window;
00447 size_t delta;
00448 size_t committed;
00449 size_t iterations;
00450 size_t aborted;
00451
00452 size_t minId;
00453 size_t maxId;
00454
00455 size_t size;
00456
00457 public:
00458 NewItemsTy newItems;
00459
00460 DMergeLocal(): alloc(&heap), newItems(alloc) {
00461 resetStats();
00462 }
00463
00464 private:
00466 template<typename BiIteratorTy>
00467 void initialLimits(BiIteratorTy ii, BiIteratorTy ei) {
00468 minId = std::numeric_limits<size_t>::max();
00469 maxId = std::numeric_limits<size_t>::min();
00470 size = std::distance(ii, ei);
00471
00472 if (ii != ei) {
00473 if (ii + 1 == ei) {
00474 minId = maxId = ii->parent;
00475 } else {
00476 minId = ii->parent;
00477 maxId = (ei-1)->parent;
00478 }
00479 }
00480 }
00481
00482 template<typename InputIteratorTy,typename WL>
00483 void copyIn(InputIteratorTy ii, InputIteratorTy ei, size_t dist, WL* wl, unsigned numActive) {
00484 unsigned int tid = LL::getTID();
00485 size_t cur = 0;
00486 size_t k = 0;
00487 safe_advance(ii, tid, cur, dist);
00488 while (ii != ei) {
00489 unsigned long id = k * numActive + tid;
00490 if (id < window)
00491 wl->push(Item(*ii, id));
00492 else
00493 break;
00494 ++k;
00495 safe_advance(ii, numActive, cur, dist);
00496 }
00497
00498 while (ii != ei) {
00499 unsigned long id = k * numActive + tid;
00500 reserve.push(Item(*ii, id));
00501 ++k;
00502 safe_advance(ii, numActive, cur, dist);
00503 }
00504 }
00505
00506 size_t fixedWindowSize(const OptionsTy& options) {
00507 if (!OptionsTy::hasFixedWindow) return 0;
00508 if (OptionsTy::hasId)
00509 return options.defaultDelta + minId;
00510 else
00511 return options.defaultDelta;
00512 }
00513 void initialWindow(const OptionsTy& options, size_t dist) {
00514 size_t w = fixedWindowSize(options);
00515 if (!w) {
00516 if (OptionsTy::hasId)
00517 w = std::max((maxId - minId) / 100, (size_t) NewWorkManager<OptionsTy>::MinDelta) + minId;
00518 else
00519 w = std::max(dist / 100, (size_t) NewWorkManager<OptionsTy>::MinDelta);
00520 }
00521 window = delta = w;
00522 }
00523
00524 void receiveLimits(const DMergeLocal<OptionsTy>& other) {
00525 minId = other.minId;
00526 maxId = other.maxId;
00527 size = other.size;
00528 }
00529
00530 void reduceLimits(const DMergeLocal<OptionsTy>& other) {
00531 minId = std::min(other.minId, minId);
00532 maxId = std::max(other.maxId, maxId);
00533 size += other.size;
00534 }
00535
00536 public:
00537 void clear() { heap.clear(); }
00538 void incrementIterations() { ++iterations; }
00539 void incrementCommitted() { ++committed; }
00540 void resetStats() { committed = iterations = aborted = 0; }
00541 bool emptyReserve() { return reserve.empty(); }
00542
00543 template<typename WL>
00544 void nextWindow(WL* wl) {
00545 window += delta;
00546 Galois::optional<Item> p;
00547 while ((p = reserve.peek())) {
00548 if (p->id >= window)
00549 break;
00550 wl->push(*p);
00551 reserve.pop();
00552 }
00553 }
00554 };
00555
00556 template<typename OptionsTy>
00557 class DMergeManagerBase {
00558 protected:
00559 typedef typename OptionsTy::value_type value_type;
00560 typedef DItem<value_type> Item;
00561 typedef DNewItem<value_type> NewItem;
00562 typedef WorkList::dChunkedFIFO<NewWorkManager<OptionsTy>::ChunkSize,NewItem> NewWork;
00563 typedef DMergeLocal<OptionsTy> MergeLocal;
00564 typedef typename MergeLocal::NewItemsTy NewItemsTy;
00565 typedef typename NewItemsTy::iterator NewItemsIterator;
00566
00567 IterAllocBaseTy heap;
00568 PerIterAllocTy alloc;
00569 PerThreadStorage<MergeLocal> data;
00570
00571 NewWork new_;
00572 unsigned numActive;
00573
00574 void broadcastLimits(MergeLocal& mlocal, unsigned int tid) {
00575 for (unsigned i = 0; i < this->numActive; ++i) {
00576 if (i == tid) continue;
00577 MergeLocal& mother = *this->data.getRemote(i);
00578 mother.receiveLimits(mlocal);
00579 }
00580 }
00581
00582 void reduceLimits(MergeLocal& mlocal, unsigned int tid) {
00583 for (unsigned i = 0; i < this->numActive; ++i) {
00584 if (i == tid) continue;
00585 MergeLocal& mother = *this->data.getRemote(i);
00586 mlocal.reduceLimits(mother);
00587 }
00588 }
00589
00590 public:
00591 DMergeManagerBase(): alloc(&heap) {
00592 numActive = getActiveThreads();
00593 }
00594
00595 MergeLocal& get() {
00596 return *data.getLocal();
00597 }
00598
00599 void calculateWindow(const OptionsTy& options, bool inner) {
00600 MergeLocal& mlocal = *data.getLocal();
00601
00602
00603 size_t allcommitted = 0;
00604 size_t alliterations = 0;
00605 for (unsigned i = 0; i < numActive; ++i) {
00606 MergeLocal& mlocal = *data.getRemote(i);
00607 allcommitted += mlocal.committed;
00608 alliterations += mlocal.iterations;
00609 }
00610
00611 float commitRatio = alliterations > 0 ? allcommitted / (float) alliterations : 0.0;
00612 if (OptionsTy::hasFixedWindow) {
00613 if (!inner || allcommitted == alliterations) {
00614 mlocal.delta = mlocal.fixedWindowSize(options);
00615 } else {
00616 mlocal.delta = 0;
00617 }
00618 } else {
00619 const float target = 0.95;
00620
00621 if (commitRatio >= target)
00622 mlocal.delta += mlocal.delta;
00623 else if (allcommitted == 0)
00624 mlocal.delta += mlocal.delta;
00625 else
00626 mlocal.delta = commitRatio / target * mlocal.delta;
00627
00628 if (!inner) {
00629 mlocal.delta = std::max(mlocal.delta, (size_t) NewWorkManager<OptionsTy>::MinDelta);
00630 } else if (mlocal.delta < (size_t) NewWorkManager<OptionsTy>::MinDelta) {
00631
00632 mlocal.delta = 0;
00633 }
00634 }
00635
00636
00637 if (false) {
00638 if (LL::getTID() == 0) {
00639 char buf[1024];
00640 snprintf(buf, 1024, "%d %.3f (%zu/%zu) window: %zu delta: %zu\n",
00641 inner, commitRatio, allcommitted, alliterations, mlocal.window, mlocal.delta);
00642 LL::gPrint(buf);
00643 }
00644 }
00645 }
00646 };
00647
00649 template<typename OptionsTy>
00650 class DMergeManager: public DMergeManagerBase<OptionsTy> {
00651 typedef DMergeManagerBase<OptionsTy> Base;
00652 typedef typename Base::value_type value_type;
00653 typedef typename Base::Item Item;
00654 typedef typename Base::NewItem NewItem;
00655 typedef typename Base::MergeLocal MergeLocal;
00656 typedef typename Base::NewItemsTy NewItemsTy;
00657 typedef typename Base::NewItemsIterator NewItemsIterator;
00658
00659 struct GetNewItem: public std::unary_function<int,NewItemsTy&> {
00660 PerThreadStorage<MergeLocal>* base;
00661 GetNewItem() { }
00662 GetNewItem(PerThreadStorage<MergeLocal>* b): base(b) { }
00663 NewItemsTy& operator()(int i) const { return base->getRemote(i)->newItems; }
00664 };
00665
00666 typedef boost::transform_iterator<GetNewItem, boost::counting_iterator<int> > MergeOuterIt;
00667 typedef typename ChooseStlTwoLevelIterator<MergeOuterIt, typename NewItemsTy::iterator>::type MergeIt;
00668 typedef std::vector<NewItem, typename PerIterAllocTy::rebind<NewItem>::other> MergeBuf;
00669 typedef std::vector<value_type, typename PerIterAllocTy::rebind<value_type>::other> DistributeBuf;
00670
00671 const OptionsTy& options;
00672 MergeBuf mergeBuf;
00673 DistributeBuf distributeBuf;
00674 NewWorkManager<OptionsTy> newWorkManager;
00675 Barrier& barrier;
00676
00677 bool merge(int begin, int end) {
00678 if (begin == end)
00679 return false;
00680 else if (begin + 1 == end)
00681 return !this->data.getRemote(begin)->newItems.empty();
00682
00683 bool retval = false;
00684 int mid = (end - begin) / 2 + begin;
00685 retval |= merge(begin, mid);
00686 retval |= merge(mid, end);
00687
00688 MergeOuterIt bbegin(boost::make_counting_iterator(begin), GetNewItem(&this->data));
00689 MergeOuterIt mmid(boost::make_counting_iterator(mid), GetNewItem(&this->data));
00690 MergeOuterIt eend(boost::make_counting_iterator(end), GetNewItem(&this->data));
00691
00692
00693
00694 MergeIt aa = stl_two_level_begin(bbegin, mmid);
00695 MergeIt ea = stl_two_level_end(bbegin, mmid);
00696 MergeIt bb = stl_two_level_begin(mmid, eend);
00697 MergeIt eb = stl_two_level_end(mmid, eend);
00698 MergeIt cc = stl_two_level_begin(bbegin, eend);
00699 MergeIt ec = stl_two_level_end(bbegin, eend);
00700
00701 while (aa != ea && bb != eb) {
00702 if (*aa < *bb)
00703 mergeBuf.push_back(*aa++);
00704 else
00705 mergeBuf.push_back(*bb++);
00706 }
00707
00708 for (; aa != ea; ++aa)
00709 mergeBuf.push_back(*aa);
00710
00711 for (; bb != eb; ++bb)
00712 mergeBuf.push_back(*bb);
00713
00714 for (NewItemsIterator ii = mergeBuf.begin(), ei = mergeBuf.end(); ii != ei; ++ii)
00715 *cc++ = *ii;
00716
00717 mergeBuf.clear();
00718
00719 assert(cc == ec);
00720
00721 return retval;
00722 }
00723
00725 template<typename InputIteratorTy>
00726 void redistribute(InputIteratorTy ii, InputIteratorTy ei, size_t dist) {
00727 unsigned int tid = LL::getTID();
00728
00729
00730
00731 MergeLocal& mlocal = *this->data.getLocal();
00732
00733 size_t blockSize = mlocal.delta;
00734 size_t numBlocks = dist / blockSize;
00735
00736 size_t cur = 0;
00737 safe_advance(ii, tid, cur, dist);
00738 while (ii != ei) {
00739 unsigned long id;
00740 if (cur < blockSize * numBlocks)
00741
00742 id = (cur % numBlocks) * blockSize + (cur / numBlocks);
00743 else
00744 id = cur;
00745 distributeBuf[id] = *ii;
00746 safe_advance(ii, this->numActive, cur, dist);
00747 }
00748 }
00749
00750 template<typename InputIteratorTy,typename WL>
00751 void distribute(InputIteratorTy ii, InputIteratorTy ei, size_t dist, WL* wl) {
00752 unsigned int tid = LL::getTID();
00753 MergeLocal& mlocal = *this->data.getLocal();
00754 mlocal.initialWindow(options, dist);
00755 if (true) {
00756
00757 if (tid == 0) {
00758 distributeBuf.resize(dist);
00759 }
00760 barrier.wait();
00761 redistribute(ii, ei, dist);
00762 barrier.wait();
00763 mlocal.copyIn(distributeBuf.begin(), distributeBuf.end(), dist, wl, this->numActive);
00764 } else {
00765 mlocal.copyIn(ii, ei, dist, wl, this->numActive);
00766 }
00767 }
00768
00769 template<typename WL>
00770 void parallelSort(WL* wl) {
00771 MergeLocal& mlocal = *this->data.getLocal();
00772
00773 mlocal.newItems.clear();
00774 Galois::optional<NewItem> p;
00775 while ((p = this->new_.pop())) {
00776 mlocal.newItems.push_back(*p);
00777 }
00778
00779 std::sort(mlocal.newItems.begin(), mlocal.newItems.end());
00780 mlocal.initialLimits(mlocal.newItems.begin(), mlocal.newItems.end());
00781
00782 barrier.wait();
00783
00784 unsigned tid = LL::getTID();
00785 if (tid == 0) {
00786 this->reduceLimits(mlocal, tid);
00787 mergeBuf.reserve(mlocal.size);
00788 this->broadcastLimits(mlocal, tid);
00789 merge(0, this->numActive);
00790 }
00791
00792 barrier.wait();
00793
00794 MergeOuterIt bbegin(boost::make_counting_iterator(0), GetNewItem(&this->data));
00795 MergeOuterIt eend(boost::make_counting_iterator((int) this->numActive), GetNewItem(&this->data));
00796 MergeIt ii = stl_two_level_begin(bbegin, eend);
00797 MergeIt ei = stl_two_level_end(eend, eend);
00798
00799 distribute(boost::make_transform_iterator(ii, typename NewItem::GetFirst()),
00800 boost::make_transform_iterator(ei, typename NewItem::GetFirst()),
00801 mlocal.size, wl);
00802 }
00803
00804 public:
00805 DMergeManager(const OptionsTy& o):
00806 options(o), mergeBuf(this->alloc), distributeBuf(this->alloc), barrier(getSystemBarrier())
00807 { }
00808
00809 template<typename InputIteratorTy>
00810 void presort(const OptionsTy& options, InputIteratorTy ii, InputIteratorTy ei) {
00811 if (!OptionsTy::hasId)
00812 return;
00813
00814 size_t dist = std::distance(ii, ei);
00815 mergeBuf.reserve(dist);
00816 for (; ii != ei; ++ii)
00817 mergeBuf.push_back(NewItem(*ii, newWorkManager.id(options.fn1, *ii), 1));
00818 ParallelSTL::sort(mergeBuf.begin(), mergeBuf.end());
00819
00820 MergeLocal& mlocal = *this->data.getLocal();
00821 this->broadcastLimits(mlocal, 0);
00822 }
00823
00824 template<typename InputIteratorTy, typename WL>
00825 void addInitialWork(InputIteratorTy b, InputIteratorTy e, WL* wl) {
00826 if (OptionsTy::hasId) {
00827 distribute(
00828 boost::make_transform_iterator(mergeBuf.begin(), typename NewItem::GetFirst()),
00829 boost::make_transform_iterator(mergeBuf.end(), typename NewItem::GetFirst()),
00830 std::distance(mergeBuf.begin(), mergeBuf.end()), wl);
00831 mergeBuf.clear();
00832 } else {
00833 distribute(b, e, std::distance(b, e), wl);
00834 }
00835 }
00836
00837 template<typename WL>
00838 void pushNew(const OptionsTy& options, const value_type& val, unsigned long parent, unsigned count,
00839 WL* wl, bool& hasNewWork) {
00840 newWorkManager.pushNew(options, this->new_, val, parent, count);
00841 hasNewWork = true;
00842 }
00843
00844 template<typename WL>
00845 bool distributeNewWork(WL* wl) {
00846 parallelSort(wl);
00847 return false;
00848 }
00849 };
00850
00851 template<typename OptionsTy>
00852 class Executor {
00853 typedef typename OptionsTy::value_type value_type;
00854 typedef DItem<value_type> Item;
00855 typedef DNewItem<value_type> NewItem;
00856 typedef DMergeManager<OptionsTy> MergeManager;
00857 typedef DMergeLocal<OptionsTy> MergeLocal;
00858 typedef DeterministicContext<value_type, OptionsTy> Context;
00859
00860 typedef WorkList::dChunkedFIFO<NewWorkManager<OptionsTy>::ChunkSize,Item> WL;
00861 typedef WorkList::dChunkedFIFO<NewWorkManager<OptionsTy>::ChunkSize,Context> PendingWork;
00862 typedef WorkList::ChunkedFIFO<NewWorkManager<OptionsTy>::ChunkSize,Context,false> LocalPendingWork;
00863
00864
00865 struct ThreadLocalData: private boost::noncopyable {
00866 OptionsTy options;
00867 LocalPendingWork localPending;
00868 UserContextAccess<value_type> facing;
00869 LoopStatistics<OptionsTy::needsStats> stat;
00870 WL* wlcur;
00871 WL* wlnext;
00872 size_t rounds;
00873 size_t outerRounds;
00874 bool hasNewWork;
00875 ThreadLocalData(const OptionsTy& o, const char* loopname): options(o), stat(loopname), rounds(0), outerRounds(0) { }
00876 };
00877
00878 PendingWork pending;
00879 MergeManager mergeManager;
00880 BreakManager<OptionsTy> breakManager;
00881 StateManager<OptionsTy> stateManager;
00882 InOrderManager<OptionsTy> inOrderManager;
00883 WL worklists[2];
00884 const OptionsTy& options;
00885 Barrier& barrier;
00886 const char* loopname;
00887 LL::CacheLineStorage<volatile long> innerDone;
00888 LL::CacheLineStorage<volatile long> outerDone;
00889 LL::CacheLineStorage<volatile long> hasNewWork;
00890 int numActive;
00891
00892 bool pendingLoop(ThreadLocalData& tld);
00893 bool commitLoop(ThreadLocalData& tld);
00894 void go();
00895
00896 public:
00897 Executor(const OptionsTy& o, const char* ln):
00898 mergeManager(o), options(o), barrier(getSystemBarrier()), loopname(ln)
00899 {
00900 static_assert(!OptionsTy::needsBreak || OptionsTy::hasBreak,
00901 "need to use break function to break loop");
00902 }
00903
00904 template<typename RangeTy>
00905 void AddInitialWork(RangeTy range) {
00906 mergeManager.addInitialWork(range.begin(), range.end(), &worklists[1]);
00907 }
00908
00909 template<typename IterTy>
00910 void presort(IterTy ii, IterTy ei) {
00911 mergeManager.presort(options, ii, ei);
00912 }
00913
00914 void operator()() {
00915 go();
00916 }
00917 };
00918
00919 template<typename OptionsTy>
00920 void Executor<OptionsTy>::go() {
00921 ThreadLocalData tld(options, loopname);
00922 MergeLocal& mlocal = mergeManager.get();
00923 tld.wlcur = &worklists[0];
00924 tld.wlnext = &worklists[1];
00925
00926 tld.hasNewWork = false;
00927
00928 while (true) {
00929 ++tld.outerRounds;
00930
00931 while (true) {
00932 ++tld.rounds;
00933
00934 inOrderManager.resetAborted();
00935
00936 std::swap(tld.wlcur, tld.wlnext);
00937 setPending(PENDING);
00938 bool nextPending = pendingLoop(tld);
00939 innerDone.data = true;
00940
00941 barrier.wait();
00942
00943 inOrderManager.allReduceAborted();
00944
00945 setPending(COMMITTING);
00946 bool nextCommit = commitLoop(tld);
00947 outerDone.data = true;
00948 if (nextPending || nextCommit)
00949 innerDone.data = false;
00950
00951 barrier.wait();
00952
00953 if (innerDone.data)
00954 break;
00955
00956 mergeManager.calculateWindow(tld.options, true);
00957
00958 barrier.wait();
00959
00960 mlocal.nextWindow(tld.wlnext);
00961 mlocal.resetStats();
00962 }
00963
00964 if (!mlocal.emptyReserve())
00965 outerDone.data = false;
00966
00967 if (tld.hasNewWork)
00968 hasNewWork.data = true;
00969
00970 if (breakManager.checkBreak(tld.options.fn1))
00971 break;
00972
00973 mergeManager.calculateWindow(tld.options, false);
00974
00975 barrier.wait();
00976
00977 if (outerDone.data) {
00978 if (!OptionsTy::needsPush)
00979 break;
00980 if (!hasNewWork.data)
00981 break;
00982 tld.hasNewWork = mergeManager.distributeNewWork(tld.wlnext);
00983
00984 hasNewWork.data = false;
00985 } else {
00986 mlocal.nextWindow(tld.wlnext);
00987 }
00988
00989 mlocal.resetStats();
00990 }
00991
00992 setPending(NON_DET);
00993
00994 mlocal.clear();
00995
00996 if (OptionsTy::needsStats) {
00997 if (LL::getTID() == 0) {
00998 reportStat(loopname, "RoundsExecuted", tld.rounds);
00999 reportStat(loopname, "OuterRoundsExecuted", tld.outerRounds);
01000 if (OptionsTy::hasFixedWindow)
01001 reportStat(loopname, "FixedWindowSize", options.defaultDelta);
01002 if (OptionsTy::useInOrderCommit)
01003 reportStat(loopname, "InOrderCommit", 1);
01004 }
01005 }
01006 }
01007
01008 template<typename OptionsTy>
01009 bool Executor<OptionsTy>::pendingLoop(ThreadLocalData& tld)
01010 {
01011 MergeLocal& mlocal = mergeManager.get();
01012 bool retval = false;
01013 Galois::optional<Item> p;
01014 while ((p = tld.wlcur->pop())) {
01015
01016
01017
01018 Context* ctx = stateManager.emplaceContext(tld.localPending, pending, *p);
01019 inOrderManager.initializeContext(ctx);
01020
01021 assert(ctx != NULL);
01022
01023 mlocal.incrementIterations();
01024 bool commit = true;
01025
01026 ctx->startIteration();
01027 tld.stat.inc_iterations();
01028 setThreadContext(ctx);
01029
01030 stateManager.alloc(tld.facing, tld.options.fn1);
01031 int result = 0;
01032 #ifdef GALOIS_USE_LONGJMP
01033 if ((result = setjmp(hackjmp)) == 0) {
01034 #else
01035 try {
01036 #endif
01037 tld.options.fn1(ctx->item.val, tld.facing.data());
01038 #ifdef GALOIS_USE_LONGJMP
01039 } else { clearConflictLock(); }
01040 #else
01041 } catch (const ConflictFlag& flag) { clearConflictLock(); result = flag; }
01042 #endif
01043 clearReleasable();
01044 switch (result) {
01045 case 0:
01046 case REACHED_FAILSAFE: break;
01047 case CONFLICT: commit = false; break;
01048 default: assert(0 && "Unknown conflict flag"); abort(); break;
01049 }
01050
01051 if (ForEachTraits<typename OptionsTy::function1_type>::NeedsPIA && !OptionsTy::useLocalState)
01052 tld.facing.resetAlloc();
01053
01054 inOrderManager.updateAborted(ctx);
01055
01056 if (commit) {
01057 stateManager.save(tld.facing, ctx->item.localState);
01058 } else {
01059 retval = true;
01060 }
01061 }
01062
01063 return retval;
01064 }
01065
01066 template<typename OptionsTy>
01067 bool Executor<OptionsTy>::commitLoop(ThreadLocalData& tld)
01068 {
01069 bool retval = false;
01070 MergeLocal& mlocal = mergeManager.get();
01071
01072 size_t ncommits = 0;
01073 size_t niter = 0;
01074
01075 Context* ctx;
01076 while ((ctx = stateManager.peekContext(tld.localPending, pending))) {
01077 ++niter;
01078 bool commit = true;
01079
01080
01081 if (ctx->notReady())
01082 commit = false;
01083 else if (!inOrderManager.shouldCommit(ctx))
01084 commit = false;
01085
01086 setThreadContext(ctx);
01087 if (commit) {
01088 stateManager.restore(tld.facing, ctx->item.localState);
01089 int result = 0;
01090 #ifdef GALOIS_USE_LONGJMP
01091 if ((result = setjmp(hackjmp)) == 0) {
01092 #else
01093 try {
01094 #endif
01095 tld.options.fn2(ctx->item.val, tld.facing.data());
01096 #ifdef GALOIS_USE_LONGJMP
01097 } else { clearConflictLock(); }
01098 #else
01099 } catch (const ConflictFlag& flag) { clearConflictLock(); result = flag; }
01100 #endif
01101 clearReleasable();
01102 switch (result) {
01103 case 0: break;
01104 case CONFLICT: commit = false; break;
01105 default: assert(0 && "Unknown conflict flag"); abort(); break;
01106 }
01107 }
01108
01109 stateManager.dealloc(tld.facing);
01110
01111 if (commit) {
01112 ++ncommits;
01113 mlocal.incrementCommitted();
01114 if (ForEachTraits<typename OptionsTy::function2_type>::NeedsPush) {
01115 unsigned long parent = ctx->item.id;
01116 typedef typename UserContextAccess<value_type>::PushBufferTy::iterator iterator;
01117 unsigned count = 0;
01118 for (iterator ii = tld.facing.getPushBuffer().begin(),
01119 ei = tld.facing.getPushBuffer().end(); ii != ei; ++ii) {
01120 mergeManager.pushNew(tld.options, *ii, parent, ++count, tld.wlnext, tld.hasNewWork);
01121 if (count == 0) {
01122 assert(0 && "Counter overflow");
01123 abort();
01124 }
01125 }
01126 }
01127 assert(ForEachTraits<typename OptionsTy::function2_type>::NeedsPush
01128 || tld.facing.getPushBuffer().begin() == tld.facing.getPushBuffer().end());
01129 } else {
01130 stateManager.reuseItem(ctx->item);
01131 tld.wlnext->push(ctx->item);
01132 tld.stat.inc_conflicts();
01133 retval = true;
01134 }
01135
01136 if (commit) {
01137 ctx->commitIteration();
01138 } else {
01139 ctx->cancelIteration();
01140 }
01141
01142 if (ForEachTraits<typename OptionsTy::function2_type>::NeedsPIA && !OptionsTy::useLocalState)
01143 tld.facing.resetAlloc();
01144
01145 tld.facing.resetPushBuffer();
01146 stateManager.popContext(tld.localPending, pending);
01147 }
01148
01149 if (ForEachTraits<typename OptionsTy::function2_type>::NeedsPIA && OptionsTy::useLocalState)
01150 tld.facing.resetAlloc();
01151
01152 setThreadContext(0);
01153
01154 return retval;
01155 }
01156
01157 }
01158 }
01159
01160 template<typename RangeTy, typename WorkTy>
01161 static inline void for_each_det_impl(const RangeTy& range, WorkTy& W) {
01162 W.presort(range.begin(), range.end());
01163
01164 assert(!inGaloisForEach);
01165
01166 inGaloisForEach = true;
01167 RunCommand init(std::bind(&WorkTy::template AddInitialWork<RangeTy>, std::ref(W), std::ref(range)));
01168 RunCommand w[4] = {std::ref(init),
01169 std::ref(getSystemBarrier()),
01170 std::ref(W),
01171 std::ref(getSystemBarrier())};
01172 getSystemThreadPool().run(&w[0], &w[4], activeThreads);
01173 inGaloisForEach = false;
01174 }
01175
01176
// NOTE(review): this ordered two-phase entry point is compiled out with #if 0.
// It references DeterministicImpl::OrderedOptions, which is not defined in
// this part of the file; confirm that type still exists before re-enabling.
#if 0

/**
 * Ordered variant of the deterministic two-phase loop over [b, e): f1 is the
 * prefix operator, f2 the commit operator, and comp orders the items.
 * Builds the options/executor pair and delegates to for_each_det_impl.
 */
template<typename IterTy, typename ComparatorTy, typename NhFunc, typename OpFunc>
static inline void for_each_ordered_2p(IterTy b, IterTy e, ComparatorTy comp, NhFunc f1, OpFunc f2, const char* loopname) {
typedef Runtime::StandardRange<IterTy> Range;
typedef typename Range::value_type T;
typedef Runtime::DeterministicImpl::OrderedOptions<T,NhFunc,OpFunc,ComparatorTy> OptionsTy;
typedef Runtime::DeterministicImpl::Executor<OptionsTy> WorkTy;

OptionsTy options(f1, f2, comp);
WorkTy W(options, loopname);
for_each_det_impl(makeStandardRange(b,e), W);
}
#endif
01197
01198 }
01199 }
01200
01201 namespace Galois {
01202
01215 template<typename IterTy, typename Function1Ty, typename Function2Ty>
01216 static inline void for_each_det(IterTy b, IterTy e, Function1Ty prefix, Function2Ty fn, const char* loopname = 0) {
01217 typedef Runtime::StandardRange<IterTy> Range;
01218 typedef typename Range::value_type T;
01219 typedef Runtime::DeterministicImpl::Options<T,Function1Ty,Function2Ty> OptionsTy;
01220 typedef Runtime::DeterministicImpl::Executor<OptionsTy> WorkTy;
01221
01222 OptionsTy options(prefix, fn);
01223 WorkTy W(options, loopname);
01224 Runtime::for_each_det_impl(Runtime::makeStandardRange(b, e), W);
01225 }
01226
/**
 * Deterministic parallel for-each seeded with a single initial item, using
 * separate prefix and commit operators.
 *
 * @param i        the only initial work item
 * @param prefix   first-phase operator
 * @param fn       commit-phase operator
 * @param loopname statistics name; may be null
 */
template<typename T, typename Function1Ty, typename Function2Ty>
static inline void for_each_det(T i, Function1Ty prefix, Function2Ty fn, const char* loopname = 0) {
  // Wrap the item in a one-element array so it can be passed as a [first, last) range.
  T seed[1] = { i };
  T* first = seed;
  T* last = seed + 1;
  for_each_det(first, last, prefix, fn, loopname);
}
01243
/**
 * Deterministic parallel for-each over [b, e) where one operator serves as
 * both the prefix and the commit operator.
 *
 * @param b        first element of the initial work
 * @param e        one past the last element of the initial work
 * @param fn       operator used for both phases
 * @param loopname statistics name; may be null
 */
template<typename IterTy, typename FunctionTy>
static inline void for_each_det(IterTy b, IterTy e, FunctionTy fn, const char* loopname = 0)
{
  // Same functor for both roles: prefix (fn1) and commit (fn2).
  for_each_det(b, e, fn, fn, loopname);
}
01260
/**
 * Deterministic parallel for-each seeded with a single initial item, where
 * one operator serves as both the prefix and the commit operator.
 *
 * @param i        the only initial work item
 * @param fn       operator used for both phases
 * @param loopname statistics name; may be null
 */
template<typename T, typename FunctionTy>
static inline void for_each_det(T i, FunctionTy fn, const char* loopname = 0) {
  // One-element array turns the item into a [first, last) range.
  T seed[1] = { i };
  T* first = seed;
  T* last = seed + 1;
  for_each_det(first, last, fn, fn, loopname);
}
01277
01278 }
01279
01280
01281
01282 #endif