20 #ifndef GALOIS_RUNTIME_EXECUTOR_DETERMINISTIC_H
21 #define GALOIS_RUNTIME_EXECUTOR_DETERMINISTIC_H
25 #include <type_traits>
27 #include <boost/iterator/counting_iterator.hpp>
28 #include <boost/iterator/iterator_facade.hpp>
29 #include <boost/iterator/transform_iterator.hpp>
32 #include "galois/config.h"
64 template <
typename T,
bool UseLocalState>
70 DItemBase(
const T& _val,
unsigned long _id) : val(_val), id(_id) {}
71 void* getLocalState()
const {
return nullptr; }
72 void setLocalState(
void*) {}
76 class DItemBase<T, true> {
86 DItemBase(
const T& _val,
unsigned long _id)
87 : val(_val), localState(nullptr), id(_id) {}
88 void* getLocalState()
const {
return localState; }
89 void setLocalState(
void* ptr) { localState = ptr; }
92 template <
typename OptionsTy>
94 DItemBase<typename OptionsTy::value_type, OptionsTy::useLocalState>;
96 class FirstPassBase :
public SimpleRuntimeContext {
101 explicit FirstPassBase(
bool f =
true)
104 bool isFirstPass(
void)
const {
return firstPassFlag; }
106 void setFirstPass(
void) { firstPassFlag =
true; }
108 void resetFirstPass(
void) { firstPassFlag =
false; }
114 alwaysAcquire(lockable, f);
119 template <
typename OptionsTy,
bool HasFixedNeighborhood,
bool HasIntentToRead>
120 class DeterministicContextBase :
public FirstPassBase {
122 typedef DItem<OptionsTy> Item;
129 DeterministicContextBase(
const Item& _item)
130 : FirstPassBase(true), item(_item), notReady(false) {}
134 bool isReady() {
return !notReady; }
141 DeterministicContextBase* other;
143 other =
static_cast<DeterministicContextBase*
>(this->
getOwner(lockable));
147 bool conflict = other->item.id < this->item.id;
159 other->notReady =
true;
163 static void initialize() {}
166 class HasIntentToReadContext :
public FirstPassBase {
172 HasIntentToReadContext(
unsigned long id,
bool w)
173 : FirstPassBase(true), id(id), notReady(false), isWriter(w) {}
175 bool isReady() {
return !notReady; }
179 public HasIntentToReadContext {
180 template <
typename,
bool,
bool>
181 friend class DeterministicContextBase;
184 ReaderContext(
unsigned long id)
185 : galois::
UnionFindNode<ReaderContext>(const_cast<ReaderContext*>(this)),
186 HasIntentToReadContext(id, false) {}
191 ReaderContext* r = this->
find();
196 bool propagate() {
return this->
find()->isReady(); }
203 template <
typename OptionsTy>
204 class DeterministicContextBase<OptionsTy, false, true>
205 :
public HasIntentToReadContext {
207 typedef DItem<OptionsTy> Item;
211 ReaderContext readerCtx;
213 void acquireRead(Lockable* lockable) {
214 HasIntentToReadContext* other;
216 other =
static_cast<HasIntentToReadContext*
>(this->
getOwner(lockable));
217 if (other ==
this || other == &readerCtx)
220 bool conflict = other->id < this->id;
223 readerCtx.notReady =
true;
225 readerCtx.merge(static_cast<ReaderContext*>(other));
229 }
while (!readerCtx.stealByCAS(lockable, other));
233 if (other->isWriter) {
235 other->notReady =
true;
237 static_cast<ReaderContext*
>(other)->merge(&readerCtx);
242 void acquireWrite(Lockable* lockable) {
243 HasIntentToReadContext* other;
245 other =
static_cast<HasIntentToReadContext*
>(this->
getOwner(lockable));
246 if (other ==
this || other == &readerCtx)
249 bool conflict = other->id < this->id;
252 this->notReady =
true;
261 other->notReady =
true;
266 DeterministicContextBase(
const Item& i)
267 : HasIntentToReadContext(i.id, true), item(i), readerCtx(i.id) {}
271 void build() { readerCtx.build(); }
274 if (this->isReady() && !readerCtx.propagate())
275 this->notReady =
true;
285 acquireRead(lockable);
288 acquireWrite(lockable);
292 static void initialize() {}
295 template <
typename OptionsTy>
296 class DeterministicContextBase<OptionsTy, true, false> :
public FirstPassBase {
298 typedef DItem<OptionsTy> Item;
303 std::atomic<int> preds;
305 struct ContextPtrLessThan {
306 bool operator()(
const DeterministicContextBase* a,
307 const DeterministicContextBase* b)
const {
310 if (a->item.id == b->item.id)
312 return a->item.id < b->item.id;
317 DeterministicContextBase(
const Item& _item)
318 : FirstPassBase(true), item(_item), preds(0) {}
324 edges.clear(*dagListHeap);
325 succs.clear(*dagListHeap);
328 void addEdge(DeterministicContextBase* o) {
329 succs.push_front(*dagListHeap, o);
333 bool isReady() {
return false; }
338 DeterministicContextBase* owner =
339 static_cast<DeterministicContextBase*
>(this->
getOwner(lockable));
346 owner =
static_cast<DeterministicContextBase*
>(this->
getOwner(lockable));
349 if (std::find(edges.begin(), edges.end(), owner) != edges.end())
351 edges.push_front(*dagListHeap, owner);
354 static void initialize() {
357 sizeof(
typename ContextList::block_type));
361 template <
typename OptionsTy>
362 class DeterministicContextBase<OptionsTy, true, true> {
366 template <
typename OptionsTy>
367 using DeterministicContext =
368 DeterministicContextBase<OptionsTy, OptionsTy::hasFixedNeighborhood,
369 OptionsTy::hasIntentToRead>;
371 template <
typename T>
374 unsigned long parent;
377 DNewItem(
const T& _val,
unsigned long _parent,
unsigned _count)
378 : val(_val), parent(_parent), count(_count) {}
380 bool operator<(const DNewItem<T>& o)
const {
381 if (parent < o.parent)
383 else if (parent == o.parent)
384 return count < o.count;
390 return parent == o.parent && count == o.count;
393 bool operator!=(
const DNewItem<T>& o)
const {
return !(*
this == o); }
395 struct GetValue :
public std::unary_function<DNewItem<T>, const T&> {
396 const T&
operator()(
const DNewItem<T>& x)
const {
return x.val; }
400 template <
typename InputIteratorTy>
401 void safe_advance(InputIteratorTy& it,
size_t d,
size_t& cur,
size_t dist) {
402 if (d + cur >= dist) {
411 template <
int ChunkSize,
typename T>
413 worklists::ChunkFIFO<ChunkSize, T, false> m_data;
414 worklists::ChunkLIFO<16, T, false> m_buffer;
417 FIFO() : m_size(0) {}
421 while ((p = m_buffer.pop()))
423 while ((p = m_data.pop()))
429 if ((p = m_buffer.pop()) || (p = m_data.pop())) {
437 if ((p = m_buffer.pop())) {
439 }
else if ((p = m_data.pop())) {
445 void push(
const T& val) {
450 size_t size()
const {
return m_size; }
452 bool empty()
const {
return m_size == 0; }
455 template <
typename T,
typename FunctionTy,
typename ArgsTy>
456 struct OptionsCommon {
458 typedef FunctionTy function2_type;
459 typedef ArgsTy args_type;
461 constexpr
static bool needStats = galois::internal::NeedStats<ArgsTy>::value;
462 constexpr
static bool needsPush = !has_trait<no_pushes_tag, ArgsTy>();
463 constexpr
static bool needsAborts = !has_trait<no_conflicts_tag, ArgsTy>();
464 constexpr
static bool needsPia = has_trait<per_iter_alloc_tag, ArgsTy>();
465 constexpr
static bool needsBreak = has_trait<parallel_break_tag, ArgsTy>();
467 constexpr
static bool hasBreak = has_trait<det_parallel_break_tag, ArgsTy>();
468 constexpr
static bool hasId = has_trait<det_id_tag, ArgsTy>();
470 constexpr
static bool useLocalState = has_trait<local_state_tag, ArgsTy>();
471 constexpr
static bool hasFixedNeighborhood =
472 has_trait<fixed_neighborhood_tag, ArgsTy>();
473 constexpr
static bool hasIntentToRead =
474 has_trait<intent_to_read_tag, ArgsTy>();
476 static const int ChunkSize = 32;
477 static const unsigned InitialNumRounds = 100;
478 static const size_t MinDelta = ChunkSize * 40;
481 !hasFixedNeighborhood || (hasFixedNeighborhood && hasId),
482 "Please provide id function when operator has fixed neighborhood");
487 OptionsCommon(function2_type f, ArgsTy a) : fn2(f), args(a) {}
490 template <
typename T,
typename FunctionTy,
typename ArgsTy,
bool Enable>
491 struct OptionsBase :
public OptionsCommon<T, FunctionTy, ArgsTy> {
492 typedef OptionsCommon<T, FunctionTy, ArgsTy> SuperTy;
493 typedef FunctionTy function1_type;
497 OptionsBase(function1_type f, ArgsTy a) : SuperTy(f, a), fn1(f) {}
500 template <
typename T,
typename FunctionTy,
typename ArgsTy>
501 struct OptionsBase<T, FunctionTy, ArgsTy, true>
502 :
public OptionsCommon<T, FunctionTy, ArgsTy> {
503 typedef OptionsCommon<T, FunctionTy, ArgsTy> SuperTy;
504 typedef typename get_trait_type<neighborhood_visitor_tag, ArgsTy>::type::type
509 OptionsBase(FunctionTy f, ArgsTy a)
510 : SuperTy(f, a), fn1(
get_trait_value<neighborhood_visitor_tag>(a).value) {
514 template <
typename T,
typename FunctionTy,
typename ArgsTy>
515 using Options = OptionsBase<T, FunctionTy, ArgsTy,
516 has_trait<neighborhood_visitor_tag, ArgsTy>()>;
518 template <
typename OptionsTy,
bool Enable>
519 class DAGManagerBase {
520 typedef DeterministicContext<OptionsTy> Context;
523 void destroyDAGManager() {}
524 void pushDAGTask(Context*) {}
525 bool buildDAG() {
return false; }
526 template <
typename Executor,
typename ExecutorTLD>
527 bool executeDAG(Executor&, ExecutorTLD&) {
532 template <
typename OptionsTy>
533 class DAGManagerBase<OptionsTy, true> {
534 typedef DeterministicContext<OptionsTy> Context;
535 typedef worklists::PerSocketChunkFIFO<OptionsTy::ChunkSize * 2, Context*> WL1;
536 typedef worklists::PerThreadChunkLIFO<OptionsTy::ChunkSize * 2, Context*> WL2;
537 typedef worklists::PerSocketChunkFIFO<32, Context*> WL3;
539 struct ThreadLocalData :
private boost::noncopyable {
540 typedef std::vector<Context*,
541 typename PerIterAllocTy::rebind<Context*>::other>
546 ThreadLocalData() :
alloc(&heap), sortBuf(
alloc) {}
549 substrate::PerThreadStorage<ThreadLocalData> data;
553 substrate::TerminationDetection& term;
554 substrate::Barrier& barrier;
561 void destroyDAGManager() { data.getLocal()->heap.clear(); }
563 void pushDAGTask(Context* ctx) { taskList.push(ctx); }
566 ThreadLocalData& tld = *data.getLocal();
568 while ((p = taskList.pop())) {
571 std::copy(ctx->edges.begin(), ctx->edges.end(),
572 std::back_inserter(tld.sortBuf));
573 std::sort(tld.sortBuf.begin(), tld.sortBuf.end(),
574 typename Context::ContextPtrLessThan());
576 if (!tld.sortBuf.empty()) {
577 Context* last = tld.sortBuf.front();
578 for (
auto ii = tld.sortBuf.begin() + 1, ei = tld.sortBuf.end();
581 if (last != cur && cur != ctx)
592 template <
typename Executor,
typename ExecutorTLD>
593 bool executeDAG(Executor& e, ExecutorTLD& etld) {
594 auto& local = e.getLocalWindowManager();
599 while ((p = taskList2.pop())) {
601 if (ctx->preds.load(std::memory_order_relaxed) == 0)
602 sourceList.push(ctx);
605 term.initializeThread();
609 size_t oldCommitted = 0;
610 size_t committed = 0;
613 while ((p = sourceList.pop())) {
615 assert(ctx->preds == 0);
617 commit = e.executeTask(etld, ctx);
618 local.incrementCommitted();
621 e.deallocLocalState(etld.facing);
623 if (OptionsTy::needsPia && !OptionsTy::useLocalState)
624 etld.facing.resetAlloc();
626 etld.facing.resetPushBuffer();
629 for (
auto& succ : ctx->succs) {
630 int v = --succ->preds;
633 sourceList.push(succ);
637 term.localTermination(oldCommitted != committed);
638 oldCommitted = committed;
639 substrate::asmPause();
640 }
while (!term.globalTermination());
642 if (OptionsTy::needsPia && OptionsTy::useLocalState)
643 etld.facing.resetAlloc();
651 template <
typename OptionsTy>
652 using DAGManager = DAGManagerBase<OptionsTy, OptionsTy::hasFixedNeighborhood>;
654 template <
typename OptionsTy,
bool Enable>
655 struct StateManagerBase {
657 typedef typename OptionsTy::function2_type function_type;
658 void allocLocalState(UserContextAccess<value_type>&, function_type&) {}
659 void deallocLocalState(UserContextAccess<value_type>&) {}
660 void saveLocalState(UserContextAccess<value_type>&, DItem<OptionsTy>&) {}
661 void restoreLocalState(UserContextAccess<value_type>&,
662 const DItem<OptionsTy>&) {}
663 void reuseItem(DItem<OptionsTy>&) {}
665 template <
typename LWL,
typename GWL>
667 const DItem<OptionsTy>& item)
const {
668 return gwl.emplace(item);
671 template <
typename LWL,
typename GWL>
676 template <
typename LWL,
typename GWL>
677 void popContext(LWL&, GWL& gwl)
const {
682 template <
typename OptionsTy>
683 struct StateManagerBase<OptionsTy, true> {
685 typedef typename OptionsTy::function2_type function_type;
686 typedef typename get_trait_type<
687 local_state_tag,
typename OptionsTy::args_type>::type::type LocalState;
689 void allocLocalState(UserContextAccess<value_type>& c, function_type&) {
690 void* p = c.data().getPerIterAlloc().allocate(
sizeof(LocalState));
695 void deallocLocalState(UserContextAccess<value_type>& c) {
696 LocalState* p = c.data().template getLocalState<LocalState>();
701 void saveLocalState(UserContextAccess<value_type>& c,
702 DItem<OptionsTy>& item) {
703 item.setLocalState(c.data().template getLocalState<LocalState>());
706 void restoreLocalState(UserContextAccess<value_type>& c,
707 const DItem<OptionsTy>& item) {
708 c.setLocalState(item.getLocalState());
711 template <
typename LWL,
typename GWL>
713 const DItem<OptionsTy>& item)
const {
714 return lwl.emplace(item);
717 template <
typename LWL,
typename GWL>
722 template <
typename LWL,
typename GWL>
723 void popContext(LWL& lwl, GWL&)
const {
727 void reuseItem(DItem<OptionsTy>& item) { item.setLocalState(
nullptr); }
730 template <
typename OptionsTy>
731 using StateManager = StateManagerBase<OptionsTy, OptionsTy::useLocalState>;
733 template <
typename OptionsTy,
bool Enable>
734 class BreakManagerBase {
736 bool checkBreak() {
return false; }
737 BreakManagerBase(
const OptionsTy&) {}
740 template <
typename OptionsTy>
741 class BreakManagerBase<OptionsTy, true> {
742 typedef typename get_trait_type<det_parallel_break_tag,
743 typename OptionsTy::args_type>::type::type
746 substrate::Barrier& barrier;
747 substrate::CacheLineStorage<volatile long> done;
750 BreakManagerBase(
const OptionsTy& o)
756 done.get() = breakFn();
762 template <
typename OptionsTy>
763 using BreakManager = BreakManagerBase<OptionsTy, OptionsTy::hasBreak>;
765 template <
typename OptionsTy,
bool Enable>
766 class IntentToReadManagerBase {
767 typedef DeterministicContext<OptionsTy> Context;
770 void pushIntentToReadTask(Context*) {}
771 bool buildIntentToRead() {
return false; }
774 template <
typename OptionsTy>
775 class IntentToReadManagerBase<OptionsTy, true> {
776 typedef DeterministicContext<OptionsTy> Context;
778 substrate::PerThreadStorage<WL> pending;
779 substrate::Barrier& barrier;
784 void pushIntentToReadTask(Context* ctx) {
785 pending.getLocal()->push_back(ctx);
790 bool buildIntentToRead() {
791 for (Context* ctx : *pending.getLocal())
794 for (Context* ctx : *pending.getLocal())
796 pending.getLocal()->clear();
801 template <
typename OptionsTy>
802 using IntentToReadManager =
803 IntentToReadManagerBase<OptionsTy, OptionsTy::hasIntentToRead>;
805 template <
typename OptionsTy,
bool Enable>
806 class WindowManagerBase {
808 class ThreadLocalData {
809 template <
typename,
bool>
810 friend class WindowManagerBase;
817 size_t nextWindow(
bool first =
false) {
822 committed = iterations = 0;
826 void incrementIterations() { ++iterations; }
827 void incrementCommitted() { ++committed; }
831 substrate::PerThreadStorage<ThreadLocalData> data;
837 ThreadLocalData& getLocalWindowManager() {
return *data.getLocal(); }
839 size_t nextWindow(
size_t dist,
size_t atleast,
size_t base = 0) {
843 ThreadLocalData& local = *data.getLocal();
844 return local.nextWindow(
true);
846 return initialWindow(dist, atleast, base);
850 size_t initialWindow(
size_t dist,
size_t atleast,
size_t base = 0) {
851 ThreadLocalData& local = *data.getLocal();
852 size_t w =
std::max(dist / OptionsTy::InitialNumRounds, atleast) + base;
853 local.window = local.delta = w;
857 void calculateWindow(
bool inner) {
858 ThreadLocalData& local = *data.getLocal();
861 size_t allcommitted = 0;
862 size_t alliterations = 0;
863 for (
unsigned i = 0; i < numActive; ++i) {
864 ThreadLocalData& r = *data.getRemote(i);
865 allcommitted += r.committed;
866 alliterations += r.iterations;
870 alliterations > 0 ? allcommitted / (float)alliterations : 0.0;
871 const float target = 0.95;
873 if (commitRatio >= target)
874 local.delta += local.delta;
875 else if (allcommitted == 0) {
876 assert((alliterations == 0) &&
"someone should have committed");
877 local.delta += local.delta;
879 local.delta = commitRatio / target * local.delta;
882 if (local.delta < OptionsTy::MinDelta)
883 local.delta = OptionsTy::MinDelta;
884 }
else if (local.delta < OptionsTy::MinDelta) {
893 snprintf(buf, 1024,
"%d %.3f (%zu/%zu) window: %zu delta: %zu\n", inner,
894 commitRatio, allcommitted, alliterations, local.window,
902 template <
typename OptionsTy>
903 class WindowManagerBase<OptionsTy, true> {
905 class ThreadLocalData {
909 void incrementIterations() {}
910 void incrementCommitted() {}
914 ThreadLocalData data;
917 ThreadLocalData& getLocalWindowManager() {
return data; }
919 size_t nextWindow(
size_t,
size_t,
size_t = 0) {
return data.nextWindow(); }
921 size_t initialWindow(
size_t,
size_t,
size_t = 0) {
925 void calculateWindow(
bool) {}
928 template <
typename OptionsTy>
929 using WindowManager =
930 WindowManagerBase<OptionsTy, OptionsTy::hasFixedNeighborhood>;
932 template <
typename OptionsTy,
bool Enable>
933 struct IdManagerBase {
935 IdManagerBase(
const OptionsTy&) {}
939 template <
typename OptionsTy>
940 class IdManagerBase<OptionsTy, true> {
943 typename get_trait_type<det_id_tag,
944 typename OptionsTy::args_type>::type::type IdFn;
948 IdManagerBase(
const OptionsTy& o)
950 uintptr_t id(
const value_type& x) {
return idFn(x); }
953 template <
typename OptionsTy>
954 using IdManager = IdManagerBase<OptionsTy, OptionsTy::hasId>;
956 template <
typename OptionsTy>
957 class NewWorkManager :
public IdManager<OptionsTy> {
959 typedef DItem<OptionsTy> Item;
960 typedef DNewItem<value_type> NewItem;
961 typedef std::vector<NewItem, typename PerIterAllocTy::rebind<NewItem>::other>
963 typedef typename NewItemsTy::iterator NewItemsIterator;
964 typedef FIFO<1024, Item> ReserveTy;
965 typedef worklists::PerSocketChunkFIFO<OptionsTy::ChunkSize, NewItem> NewWork;
967 struct GetNewItem :
public std::unary_function<int, NewItemsTy&> {
968 NewWorkManager*
self;
969 GetNewItem(NewWorkManager* s = 0) : self(s) {}
971 return self->data.getRemote(i)->newItems;
975 typedef boost::transform_iterator<GetNewItem, boost::counting_iterator<int>>
977 typedef std::vector<NewItem, typename PerIterAllocTy::rebind<NewItem>::other>
980 typename PerIterAllocTy::rebind<value_type>::other>
983 struct ThreadLocalData {
992 ThreadLocalData() :
alloc(&heap), newItems(
alloc) {}
997 substrate::PerThreadStorage<ThreadLocalData> data;
1000 DistributeBuf distributeBuf;
1001 substrate::Barrier& barrier;
1004 bool merge(
int begin,
int end) {
1007 else if (begin + 1 == end)
1008 return !data.getRemote(begin)->newItems.empty();
1010 bool retval =
false;
1011 int mid = (end - begin) / 2 + begin;
1012 retval |= merge(begin, mid);
1013 retval |= merge(mid, end);
1015 GetNewItem fn(
this);
1017 MergeOuterIt bbegin(boost::make_counting_iterator(begin), fn);
1018 MergeOuterIt mmid(boost::make_counting_iterator(mid), fn);
1019 MergeOuterIt eend(boost::make_counting_iterator(end), fn);
1021 typename NewItemsTy::iterator, GetBegin,
1022 GetEnd>(bbegin, mmid);
1024 typename NewItemsTy::iterator, GetBegin,
1025 GetEnd>(mmid, eend);
1027 typename NewItemsTy::iterator, GetBegin,
1028 GetEnd>(bbegin, eend);
1030 while (aa.first != aa.second && bb.first != bb.second) {
1031 if (*aa.first < *bb.first)
1032 mergeBuf.push_back(*aa.first++);
1034 mergeBuf.push_back(*bb.first++);
1037 for (; aa.first != aa.second; ++aa.first)
1038 mergeBuf.push_back(*aa.first);
1040 for (; bb.first != bb.second; ++bb.first)
1041 mergeBuf.push_back(*bb.first);
1043 for (NewItemsIterator ii = mergeBuf.begin(), ei = mergeBuf.end(); ii != ei;
1049 assert(cc.first == cc.second);
1064 template <
typename InputIteratorTy>
1065 void redistribute(InputIteratorTy ii, InputIteratorTy ei,
size_t dist,
1066 size_t window,
unsigned tid) {
1068 size_t blockSize = window;
1069 size_t numBlocks = dist / blockSize;
1075 if (cur < blockSize * numBlocks)
1076 id = (cur % numBlocks) * blockSize + (cur / numBlocks);
1079 distributeBuf[id] = *ii;
1084 template <
typename InputIteratorTy,
typename WL>
1085 void copyMine(InputIteratorTy ii, InputIteratorTy ei,
size_t dist, WL*
wl,
1086 size_t window,
unsigned tid) {
1087 ThreadLocalData& local = *data.getLocal();
1092 unsigned long id = k * numActive + tid;
1094 wl->push(Item(*ii,
id));
1102 unsigned long id = k * numActive + tid;
1103 local.reserve.push(Item(*ii,
id));
1109 template <
typename InputIteratorTy,
typename WL>
1110 void copyAllWithIds(InputIteratorTy ii, InputIteratorTy ei, WL* wl,
1112 ThreadLocalData& local = *data.getLocal();
1113 for (; ii != ei; ++ii) {
1114 unsigned long id = ii->parent;
1116 wl->push(Item(ii->val,
id));
1121 for (; ii != ei; ++ii) {
1122 unsigned long id = ii->parent;
1123 local.reserve.push(Item(ii->val,
id));
1127 template <
typename InputIteratorTy,
typename WL>
1128 void copyMineAfterRedistribute(InputIteratorTy ii, InputIteratorTy ei,
1129 size_t dist, WL* wl,
size_t window,
1132 distributeBuf.resize(dist);
1135 redistribute(ii, ei, dist, window, tid);
1137 copyMine(distributeBuf.begin(), distributeBuf.end(), dist,
wl, window, tid);
1140 template <
typename WL>
1141 void parallelSort(WindowManager<OptionsTy>& wm, WL* wl,
unsigned tid) {
1142 ThreadLocalData& local = *data.getLocal();
1144 local.newItems.clear();
1146 while ((p = this->new_.pop())) {
1147 local.newItems.push_back(*p);
1150 NewItemsIterator ii = local.newItems.begin();
1151 NewItemsIterator ei = local.newItems.end();
1153 initialLimits(ii, ei);
1154 local.size = local.newItems.size();
1159 receiveLimits(local);
1160 broadcastLimits(local);
1161 if (!OptionsTy::hasId) {
1162 mergeBuf.reserve(local.size);
1163 merge(0, numActive);
1169 if (OptionsTy::hasId) {
1170 size_t window = wm.nextWindow(local.maxId - local.minId,
1171 OptionsTy::MinDelta, local.minId);
1172 copyAllWithIds(ii, ei, wl, window);
1174 GetNewItem fn(
this);
1175 MergeOuterIt bbegin(boost::make_counting_iterator(0), fn);
1176 MergeOuterIt eend(boost::make_counting_iterator((
int)numActive), fn);
1178 typename NewItemsTy::iterator, GetBegin,
1179 GetEnd>(bbegin, eend);
1181 size_t window = wm.nextWindow(local.size, OptionsTy::MinDelta);
1182 copyMineAfterRedistribute(boost::make_transform_iterator(
1183 ii.first,
typename NewItem::GetValue()),
1184 boost::make_transform_iterator(
1185 ii.second,
typename NewItem::GetValue()),
1186 local.size, wl, window, tid);
1190 void broadcastLimits(ThreadLocalData& local) {
1191 for (
unsigned i = 1; i < numActive; ++i) {
1192 ThreadLocalData& other = *data.getRemote(i);
1193 other.minId = local.minId;
1194 other.maxId = local.maxId;
1195 other.size = local.size;
1199 void receiveLimits(ThreadLocalData& local) {
1200 for (
unsigned i = 1; i < numActive; ++i) {
1201 ThreadLocalData& other = *data.getRemote(i);
1202 local.minId =
std::min(other.minId, local.minId);
1203 local.maxId =
std::max(other.maxId, local.maxId);
1204 local.size += other.size;
1209 template <
typename BiIteratorTy>
1210 void initialLimits(BiIteratorTy ii, BiIteratorTy ei) {
1211 ThreadLocalData& local = *data.getLocal();
1215 local.size = std::distance(ii, ei);
1219 local.minId = local.maxId = ii->parent;
1221 local.minId = ii->parent;
1222 local.maxId = (ei - 1)->parent;
1227 template <
typename InputIteratorTy>
1228 void sortInitialWorkDispatch(InputIteratorTy, InputIteratorTy, ...) {}
1230 template <
typename InputIteratorTy,
bool HasId = OptionsTy::hasId,
1231 bool HasFixed = OptionsTy::hasFixedNeighborhood>
1232 auto sortInitialWorkDispatch(InputIteratorTy ii, InputIteratorTy ei,
int) ->
1233 typename std::enable_if<HasId && !HasFixed, void>::type {
1234 ThreadLocalData& local = *data.getLocal();
1235 size_t dist = std::distance(ii, ei);
1237 mergeBuf.reserve(dist);
1238 for (; ii != ei; ++ii)
1239 mergeBuf.emplace_back(*ii, this->id(*ii), 1);
1243 initialLimits(mergeBuf.begin(), mergeBuf.end());
1244 broadcastLimits(local);
1248 NewWorkManager(
const OptionsTy& o)
1249 : IdManager<OptionsTy>(o),
alloc(&heap), mergeBuf(
alloc),
1254 bool emptyReserve() {
return data.getLocal()->reserve.empty(); }
1256 template <
typename WL>
1257 void pushNextWindow(WL* wl,
size_t window) {
1258 ThreadLocalData& local = *data.getLocal();
1260 while ((p = local.reserve.peek())) {
1261 if (p->id >= window)
1264 local.reserve.pop();
1268 void clearNewWork() { data.getLocal()->heap.clear(); }
1270 template <
typename InputIteratorTy>
1271 void sortInitialWork(InputIteratorTy ii, InputIteratorTy ei) {
1272 return sortInitialWorkDispatch(ii, ei, 0);
1275 template <
typename InputIteratorTy,
typename WL>
1276 void addInitialWork(WindowManager<OptionsTy>& wm, InputIteratorTy b,
1277 InputIteratorTy e, WL* wl) {
1278 size_t dist = std::distance(b, e);
1279 if (OptionsTy::hasId) {
1280 ThreadLocalData& local = *data.getLocal();
1281 size_t window = wm.initialWindow(dist, OptionsTy::MinDelta, local.minId);
1282 if (OptionsTy::hasFixedNeighborhood) {
1285 copyMine(boost::make_transform_iterator(mergeBuf.begin(),
1286 typename NewItem::GetValue()),
1287 boost::make_transform_iterator(mergeBuf.end(),
1288 typename NewItem::GetValue()),
1292 size_t window = wm.initialWindow(dist, OptionsTy::MinDelta);
1293 copyMineAfterRedistribute(b, e, dist, wl, window,
1298 template <
bool HasId = OptionsTy::hasId>
1299 auto pushNew(
const value_type& val,
unsigned long,
unsigned) ->
1300 typename std::enable_if<HasId, void>::type {
1301 new_.push(NewItem(val, this->
id(val), 1));
1304 template <
bool HasId = OptionsTy::hasId>
1305 auto pushNew(
const value_type& val,
unsigned long parent,
unsigned count) ->
1306 typename std::enable_if<!HasId, void>::type {
1307 new_.push(NewItem(val, parent, count));
1310 template <
typename WL>
1311 void distributeNewWork(WindowManager<OptionsTy>& wm, WL* wl) {
1316 template <
typename OptionsTy>
1317 class Executor :
public BreakManager<OptionsTy>,
1318 public StateManager<OptionsTy>,
1319 public NewWorkManager<OptionsTy>,
1320 public WindowManager<OptionsTy>,
1321 public DAGManager<OptionsTy>,
1322 public IntentToReadManager<OptionsTy> {
1324 typedef DItem<OptionsTy> Item;
1325 typedef DeterministicContext<OptionsTy> Context;
1327 typedef worklists::PerSocketChunkFIFO<OptionsTy::ChunkSize, Item> WL;
1328 typedef worklists::PerSocketChunkFIFO<OptionsTy::ChunkSize, Context>
1330 typedef worklists::ChunkFIFO<OptionsTy::ChunkSize, Context, false>
1334 using LoopStat = LoopStatistics<OptionsTy::needStats>;
1335 struct ThreadLocalData :
public LoopStat {
1337 typename OptionsTy::function1_type fn1;
1338 typename OptionsTy::function2_type fn2;
1339 LocalPendingWork localPending;
1340 UserContextAccess<value_type> facing;
1347 ThreadLocalData(
const OptionsTy& o,
const char*
loopname)
1348 : LoopStat(loopname), fn1(o.fn1), fn2(o.fn2), rounds(0),
1353 substrate::Barrier& barrier;
1355 PendingWork pending;
1357 substrate::CacheLineStorage<volatile long> innerDone;
1358 substrate::CacheLineStorage<volatile long> outerDone;
1359 substrate::CacheLineStorage<volatile long> hasNewWork;
1361 int runFunction(ThreadLocalData& tld, Context* ctx);
1363 bool pendingLoop(ThreadLocalData& tld);
1364 bool commitLoop(ThreadLocalData& tld);
1367 void drainPending(ThreadLocalData& tld) {
1369 while ((ctx = this->peekContext(tld.localPending, pending))) {
1371 this->popContext(tld.localPending, pending);
1376 Executor(
const OptionsTy& o)
1377 : BreakManager<OptionsTy>(o), NewWorkManager<OptionsTy>(o), options(o),
1379 loopname(galois::internal::getLoopName(o.args)) {
1380 static_assert(!OptionsTy::needsBreak || OptionsTy::hasBreak,
1381 "need to use break function to break loop");
1384 bool executeTask(ThreadLocalData& tld, Context* ctx);
1386 template <
typename RangeTy>
1388 Context::initialize();
1389 this->addInitialWork(*
this, range.begin(), range.end(), &worklists[1]);
1392 template <
typename RangeTy>
1393 void init(
const RangeTy& range) {
1394 this->sortInitialWork(range.begin(), range.end());
1400 template <
typename OptionsTy>
1401 void Executor<OptionsTy>::go() {
1402 ThreadLocalData tld(options,
loopname);
1403 auto& local = this->getLocalWindowManager();
1404 tld.wlcur = &worklists[0];
1405 tld.wlnext = &worklists[1];
1407 tld.hasNewWork =
false;
1415 std::swap(tld.wlcur, tld.wlnext);
1416 bool nextPending = pendingLoop(tld);
1417 innerDone.get() =
true;
1421 if (this->buildDAG())
1424 if (this->buildIntentToRead())
1427 bool nextCommit =
false;
1428 outerDone.get() =
true;
1430 if (this->executeDAG(*
this, tld)) {
1431 if (OptionsTy::needsBreak)
1437 nextCommit = commitLoop(tld);
1439 if (nextPending || nextCommit)
1440 innerDone.get() =
false;
1444 if (innerDone.get())
1447 this->calculateWindow(
true);
1451 this->pushNextWindow(tld.wlnext, local.nextWindow());
1454 if (!this->emptyReserve())
1455 outerDone.get() =
false;
1458 hasNewWork.get() =
true;
1460 if (this->checkBreak())
1465 if (outerDone.get()) {
1466 if (!OptionsTy::needsPush)
1468 if (!hasNewWork.get())
1470 this->distributeNewWork(*
this, tld.wlnext);
1471 tld.hasNewWork =
false;
1474 hasNewWork.get() =
false;
1476 this->calculateWindow(
false);
1478 this->pushNextWindow(tld.wlnext, local.nextWindow());
1482 this->destroyDAGManager();
1483 this->clearNewWork();
1485 if (OptionsTy::needStats) {
1493 template <
typename OptionsTy>
1494 int Executor<OptionsTy>::runFunction(ThreadLocalData& tld, Context* ctx) {
1496 #ifdef GALOIS_USE_LONGJMP_ABORT
1497 if ((result = setjmp(
execFrame)) == 0) {
1498 #elif defined(GALOIS_USE_EXCEPTION_ABORT)
1501 tld.fn1(ctx->item.val, tld.facing.data());
1502 #ifdef GALOIS_USE_LONGJMP_ABORT
1504 clearConflictLock();
1506 #elif defined(GALOIS_USE_EXCEPTION_ABORT)
1508 clearConflictLock();
1515 template <
typename OptionsTy>
1516 bool Executor<OptionsTy>::pendingLoop(ThreadLocalData& tld) {
1517 auto& local = this->getLocalWindowManager();
1518 bool retval =
false;
1520 while ((p = tld.wlcur->pop())) {
1523 Context* ctx = this->emplaceContext(tld.localPending, pending, *p);
1524 this->pushDAGTask(ctx);
1525 local.incrementIterations();
1528 ctx->startIteration();
1529 ctx->setFirstPass();
1530 tld.inc_iterations();
1531 tld.facing.setFirstPass();
1534 this->allocLocalState(tld.facing, tld.fn2);
1535 int result = runFunction(tld, ctx);
1537 tld.facing.resetFirstPass();
1538 ctx->resetFirstPass();
1552 if (OptionsTy::needsPia && !OptionsTy::useLocalState)
1553 tld.facing.resetAlloc();
1555 if (commit || OptionsTy::hasFixedNeighborhood) {
1556 this->saveLocalState(tld.facing, ctx->item);
1565 template <
typename OptionsTy>
1566 bool Executor<OptionsTy>::executeTask(ThreadLocalData& tld, Context* ctx) {
1568 this->restoreLocalState(tld.facing, ctx->item);
1569 tld.facing.resetFirstPass();
1570 ctx->resetFirstPass();
1572 #ifdef GALOIS_USE_LONGJMP_ABORT
1573 if ((result = setjmp(
execFrame)) == 0) {
1574 #elif defined(GALOIS_USE_EXCEPTION_ABORT)
1577 tld.fn2(ctx->item.val, tld.facing.data());
1578 #ifdef GALOIS_USE_LONGJMP_ABORT
1580 clearConflictLock();
1582 #elif defined(GALOIS_USE_EXCEPTION_ABORT)
1584 clearConflictLock();
1600 if (OptionsTy::needsPush) {
1601 unsigned long parent = ctx->item.id;
1605 for (
auto& item : tld.facing.getPushBuffer()) {
1606 this->pushNew(item, parent, ++count);
1612 tld.hasNewWork =
true;
1614 assert(OptionsTy::needsPush || tld.facing.getPushBuffer().begin() ==
1615 tld.facing.getPushBuffer().end());
1620 template <
typename OptionsTy>
1621 bool Executor<OptionsTy>::commitLoop(ThreadLocalData& tld) {
1622 bool retval =
false;
1623 auto& local = this->getLocalWindowManager();
1626 while ((ctx = this->peekContext(tld.localPending, pending))) {
1627 bool commit =
false;
1629 commit = executeTask(tld, ctx);
1632 ctx->commitIteration();
1633 local.incrementCommitted();
1635 this->reuseItem(ctx->item);
1636 tld.wlnext->push(ctx->item);
1637 tld.inc_conflicts();
1639 ctx->cancelIteration();
1642 this->deallocLocalState(tld.facing);
1644 if (OptionsTy::needsPia && !OptionsTy::useLocalState)
1645 tld.facing.resetAlloc();
1647 tld.facing.resetPushBuffer();
1649 this->popContext(tld.localPending, pending);
1652 if (OptionsTy::needsPia && OptionsTy::useLocalState)
1653 tld.facing.resetAlloc();
1663 namespace worklists {
1668 template <
typename T =
int>
1670 template <
bool _concurrent>
1673 template <
typename _T>
1683 template <
class T,
class FunctionTy,
class ArgsTy>
1685 :
public internal::Executor<internal::Options<T, FunctionTy, ArgsTy>> {
Deterministic execution.
Definition: Executor_Deterministic.h:1669
static SimpleRuntimeContext * getOwner(Lockable *lockable)
Definition: libgalois/include/galois/runtime/Context.h:143
void setOwner(Lockable *lockable)
Definition: libgalois/include/galois/runtime/Context.h:112
galois::runtime::ExternalHeapAllocator< char, IterAllocBaseTy > PerIterAllocTy
Per-iteration allocator that conforms to STL allocator interface.
Definition: Mem.h:36
Intrusive union-find implementation.
Definition: UnionFind.h:33
UnionFindNode(ReaderContext *s)
Definition: UnionFind.h:49
galois::runtime::BumpWithMallocHeap< galois::runtime::FreeListHeap< galois::runtime::SystemHeap > > IterAllocBaseTy
[PerIterAllocTy example] Base allocator for per-iteration allocator
Definition: Mem.h:32
void setThreadContext(SimpleRuntimeContext *n)
used by the parallel code to set up conflict detection per thread
Definition: Context.cpp:31
unsigned int getActiveThreads() noexcept
Returns the number of threads in use.
Definition: Threads.cpp:37
static SizedHeap * getHeapForSize(const size_t)
[FixedSizeAllocator example]
Definition: Mem.cpp:36
thread_local std::jmp_buf execFrame
Definition: Context.cpp:29
substrate::Barrier & getBarrier(unsigned activeThreads)
Have a pre-instantiated barrier available for use.
Definition: Substrate.cpp:24
void init(const RangeTy &range)
Definition: Executor_ParaMeter.h:409
T value_type
Definition: Executor_Deterministic.h:1676
void initThread(const RangeTy &) const
Definition: Executor_ParaMeter.h:415
const char * loopname
Definition: Executor_ParaMeter.h:145
#define GALOIS_DIE(...)
Definition: gIO.h:96
TerminationDetection & getSystemTermination(unsigned activeThreads)
Definition: Termination.cpp:35
void addToNhood(Lockable *lockable)
Definition: libgalois/include/galois/runtime/Context.h:150
bool operator!=(const flat_map< _Key, _Tp, _Compare, _Alloc > &__x, const flat_map< _Key, _Tp, _Compare, _Alloc > &__y)
Based on operator==.
Definition: FlatMap.h:344
void sort(RandomAccessIterator first, RandomAccessIterator last, Compare comp)
Definition: ParallelSTL.h:247
internal::Executor< OptionsTy > SuperTy
Definition: Executor_Deterministic.h:1687
Galois version of boost::optional.
Definition: optional.h:34
const ReaderContext * find() const
Definition: UnionFind.h:60
s_wl< T, Args...> wl(Args &&...args)
Definition: Traits.h:219
SimpleRuntimeContext(bool child=false)
Definition: libgalois/include/galois/runtime/Context.h:172
void reportStat_Single(const S1 ®ion, const S2 &category, const T &value)
Definition: Statistics.h:544
const Ty max(std::atomic< Ty > &a, const Ty &b)
Definition: AtomicHelpers.h:40
static unsigned getTID()
Definition: ThreadPool.h:204
virtual void subAcquire(Lockable *lockable, galois::MethodFlag m)
Definition: Context.cpp:96
bool stealByCAS(Lockable *lockable, LockManagerBase *other)
Definition: libgalois/include/galois/runtime/Context.h:102
void gPrint(Args &&...args)
Prints a sequence of things.
Definition: gIO.h:47
unsigned int activeThreads
Definition: Threads.cpp:26
std::pair< TwoLevelIteratorA< OuterIter, InnerIter, CategoryOrTraversal, InnerBeginFn, InnerEndFn >, TwoLevelIteratorA< OuterIter, InnerIter, CategoryOrTraversal, InnerBeginFn, InnerEndFn > > make_two_level_iterator(OuterIter outer_begin, OuterIter outer_end)
Definition: TwoLevelIteratorA.h:420
ConflictFlag
Definition: libgalois/include/galois/runtime/Context.h:41
MethodFlag
What should the runtime do when executing a method.
Definition: MethodFlags.h:34
void * alloc()
Definition: PerThreadStorage.cpp:42
void operator()(void)
Definition: Executor_ParaMeter.h:417
internal::Options< T, FunctionTy, ArgsTy > OptionsTy
Definition: Executor_Deterministic.h:1686
const Ty min(std::atomic< Ty > &a, const Ty &b)
Definition: AtomicHelpers.h:70
static bool tryLock(Lockable *lockable)
Definition: libgalois/include/galois/runtime/Context.h:124
Definition: Executor_ForEach.h:150
ThreadPrivateHeap< FreeListHeap< BumpHeap< SystemHeap > > > SizedHeap
[FixedSizeAllocator example]
Definition: runtime/Mem.h:609
IterTy safe_advance(IterTy b, IterTy e, Distance n)
Like std::advance but returns end if end is closer than the advance amount.
Definition: gstl.h:222
bool operator==(const flat_map< _Key, _Tp, _Compare, _Alloc > &__x, const flat_map< _Key, _Tp, _Compare, _Alloc > &__y)
Definition: FlatMap.h:331
Definition: libgalois/include/galois/runtime/Context.h:42
ForEachExecutor(FunctionTy f, const ArgsTy &args)
Definition: Executor_Deterministic.h:1688
Wrapper< T, std::deque< T >, false > FIFO
Definition: Simple.h:83
constexpr auto get_trait_value(Tuple tpl)
Returns the value associated with the given trait T in a tuple.
Definition: Traits.h:112
Like std::deque but use Galois memory management functionality.
Definition: gdeque.h:43
T value_type
Definition: Executor_ParaMeter.h:111
Definition: libgalois/include/galois/runtime/Context.h:44
unsigned commitIteration()
Definition: Context.cpp:77