Galois
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Executor_Deterministic.h
Go to the documentation of this file.
1 /*
2  * This file belongs to the Galois project, a C++ library for exploiting
3  * parallelism. The code is being released under the terms of the 3-Clause BSD
4  * License (a copy is located in LICENSE.txt at the top-level directory).
5  *
6  * Copyright (C) 2018, The University of Texas at Austin. All rights reserved.
7  * UNIVERSITY EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES CONCERNING THIS
8  * SOFTWARE AND DOCUMENTATION, INCLUDING ANY WARRANTIES OF MERCHANTABILITY,
9  * FITNESS FOR ANY PARTICULAR PURPOSE, NON-INFRINGEMENT AND WARRANTIES OF
10  * PERFORMANCE, AND ANY WARRANTY THAT MIGHT OTHERWISE ARISE FROM COURSE OF
11  * DEALING OR USAGE OF TRADE. NO WARRANTY IS EITHER EXPRESS OR IMPLIED WITH
12  * RESPECT TO THE USE OF THE SOFTWARE OR DOCUMENTATION. Under no circumstances
13  * shall University be liable for incidental, special, indirect, direct or
14  * consequential damages or loss of profits, interruption of business, or
15  * related expenses which may arise from use of Software or Documentation,
16  * including but not limited to those resulting from defects in Software and/or
17  * Documentation, or loss or inaccuracy of data of any kind.
18  */
19 
20 #ifndef GALOIS_RUNTIME_EXECUTOR_DETERMINISTIC_H
21 #define GALOIS_RUNTIME_EXECUTOR_DETERMINISTIC_H
22 
23 #include <deque>
24 #include <queue>
25 #include <type_traits>
26 
27 #include <boost/iterator/counting_iterator.hpp>
28 #include <boost/iterator/iterator_facade.hpp>
29 #include <boost/iterator/transform_iterator.hpp>
30 
31 #include "galois/Bag.h"
32 #include "galois/config.h"
33 #include "galois/gIO.h"
34 #include "galois/gslist.h"
35 #include "galois/ParallelSTL.h"
38 #include "galois/runtime/Mem.h"
39 #include "galois/runtime/Range.h"
45 #include "galois/Threads.h"
47 #include "galois/UnionFind.h"
49 
50 // TODO deterministic hash
51 // TODO deterministic hash: only give ids to window
52 // TODO detect and fail if using releasable objects
53 // TODO fixed neighborhood: cyclic scheduling
54 // TODO fixed neighborhood: reduce list contention
55 // TODO fixed neighborhood: profile, reuse graph
56 // TODO fixed neighborhood: still ~2X slower than implicit version on bfs
57 namespace galois {
58 namespace runtime {
60 namespace internal {
61 
// Per-thread heap from which fixed-neighborhood contexts allocate the blocks
// of their `edges`/`succs` lists; it is created lazily (only if still null) by
// the fixed-neighborhood context's static initialize().
62 extern thread_local SizedHeapFactory::SizedHeap* dagListHeap;
63 
/// Work item handled by the deterministic executor: the user's value plus the
/// id that contexts compare when resolving conflicts.
///
/// Primary template (no local state): the local-state accessors are inert --
/// the getter always yields nullptr and the setter discards its argument.
template <typename T, bool UseLocalState>
class DItemBase {
public:
  T val;            ///< user-supplied work item
  unsigned long id; ///< ordering key compared between contexts

  DItemBase(const T& value, unsigned long ident) : val(value), id(ident) {}

  void* getLocalState() const { return static_cast<void*>(nullptr); }
  void setLocalState(void* /*ignored*/) {}
};

/// Specialization for operators with local state: additionally carries an
/// opaque pointer to the item's saved local state (nullptr until set).
template <typename T>
class DItemBase<T, true> {
public:
  T val;

private:
  void* localState;

public:
  unsigned long id;

  DItemBase(const T& value, unsigned long ident)
      : val(value), localState(nullptr), id(ident) {}

  void* getLocalState() const { return localState; }
  void setLocalState(void* state) { localState = state; }
};

/// Item type for an executor options type; OptionsTy::useLocalState selects
/// which DItemBase variant is used.
template <typename OptionsTy>
using DItem = DItemBase<typename OptionsTy::value_type,
                        OptionsTy::useLocalState>;
95 
96 class FirstPassBase : public SimpleRuntimeContext {
97 protected:
98  bool firstPassFlag;
99 
100 public:
101  explicit FirstPassBase(bool f = true)
102  : SimpleRuntimeContext(true), firstPassFlag(f) {}
103 
104  bool isFirstPass(void) const { return firstPassFlag; }
105 
106  void setFirstPass(void) { firstPassFlag = true; }
107 
108  void resetFirstPass(void) { firstPassFlag = false; }
109 
110  virtual void alwaysAcquire(Lockable*, galois::MethodFlag) = 0;
111 
112  virtual void subAcquire(Lockable* lockable, galois::MethodFlag f) {
113  if (isFirstPass()) {
114  alwaysAcquire(lockable, f);
115  }
116  }
117 };
118 
119 template <typename OptionsTy, bool HasFixedNeighborhood, bool HasIntentToRead>
120 class DeterministicContextBase : public FirstPassBase {
121 public:
122  typedef DItem<OptionsTy> Item;
123  Item item;
124 
125 private:
126  bool notReady;
127 
128 public:
129  DeterministicContextBase(const Item& _item)
130  : FirstPassBase(true), item(_item), notReady(false) {}
131 
132  void clear() {}
133 
134  bool isReady() { return !notReady; }
135 
136  virtual void alwaysAcquire(Lockable* lockable, galois::MethodFlag) {
137 
138  if (this->tryLock(lockable))
139  this->addToNhood(lockable);
140 
141  DeterministicContextBase* other;
142  do {
143  other = static_cast<DeterministicContextBase*>(this->getOwner(lockable));
144  if (other == this)
145  return;
146  if (other) {
147  bool conflict = other->item.id < this->item.id;
148  if (conflict) {
149  // A lock that I want but can't get
150  notReady = true;
151  return;
152  }
153  }
154  } while (!this->stealByCAS(lockable, other));
155 
156  // Disable loser
157  if (other) {
158  // Only need atomic write
159  other->notReady = true;
160  }
161  }
162 
163  static void initialize() {}
164 };
165 
166 class HasIntentToReadContext : public FirstPassBase {
167 public:
168  unsigned long id;
169  bool notReady;
170  bool isWriter;
171 
172  HasIntentToReadContext(unsigned long id, bool w)
173  : FirstPassBase(true), id(id), notReady(false), isWriter(w) {}
174 
175  bool isReady() { return !notReady; }
176 };
177 
178 class ReaderContext : public galois::UnionFindNode<ReaderContext>,
179  public HasIntentToReadContext {
180  template <typename, bool, bool>
181  friend class DeterministicContextBase;
182 
183 public:
184  ReaderContext(unsigned long id)
185  : galois::UnionFindNode<ReaderContext>(const_cast<ReaderContext*>(this)),
186  HasIntentToReadContext(id, false) {}
187 
188  void build() {
189  if (this->isReady())
190  return;
191  ReaderContext* r = this->find();
192  if (r->isReady())
193  r->notReady = true;
194  }
195 
196  bool propagate() { return this->find()->isReady(); }
197 
198  virtual void alwaysAcquire(Lockable*, galois::MethodFlag) {
199  GALOIS_DIE("unreachable");
200  }
201 };
202 
203 template <typename OptionsTy>
204 class DeterministicContextBase<OptionsTy, false, true>
205  : public HasIntentToReadContext {
206 public:
207  typedef DItem<OptionsTy> Item;
208  Item item;
209 
210 private:
211  ReaderContext readerCtx;
212 
213  void acquireRead(Lockable* lockable) {
214  HasIntentToReadContext* other;
215  do {
216  other = static_cast<HasIntentToReadContext*>(this->getOwner(lockable));
217  if (other == this || other == &readerCtx)
218  return;
219  if (other) {
220  bool conflict = other->id < this->id;
221  if (conflict) {
222  if (other->isWriter)
223  readerCtx.notReady = true;
224  else
225  readerCtx.merge(static_cast<ReaderContext*>(other));
226  return;
227  }
228  }
229  } while (!readerCtx.stealByCAS(lockable, other));
230 
231  // Disable loser
232  if (other) {
233  if (other->isWriter) {
234  // Only need atomic write
235  other->notReady = true;
236  } else {
237  static_cast<ReaderContext*>(other)->merge(&readerCtx);
238  }
239  }
240  }
241 
242  void acquireWrite(Lockable* lockable) {
243  HasIntentToReadContext* other;
244  do {
245  other = static_cast<HasIntentToReadContext*>(this->getOwner(lockable));
246  if (other == this || other == &readerCtx)
247  return;
248  if (other) {
249  bool conflict = other->id < this->id;
250  if (conflict) {
251  // A lock that I want but can't get
252  this->notReady = true;
253  return;
254  }
255  }
256  } while (!this->stealByCAS(lockable, other));
257 
258  // Disable loser
259  if (other) {
260  // Only need atomic write
261  other->notReady = true;
262  }
263  }
264 
265 public:
266  DeterministicContextBase(const Item& i)
267  : HasIntentToReadContext(i.id, true), item(i), readerCtx(i.id) {}
268 
269  void clear() {}
270 
271  void build() { readerCtx.build(); }
272 
273  void propagate() {
274  if (this->isReady() && !readerCtx.propagate())
275  this->notReady = true;
276  }
277 
278  virtual void alwaysAcquire(Lockable* lockable, galois::MethodFlag m) {
279  assert(m == MethodFlag::READ || m == MethodFlag::WRITE);
280 
281  if (this->tryLock(lockable))
282  this->addToNhood(lockable);
283 
284  if (m == MethodFlag::READ) {
285  acquireRead(lockable);
286  } else {
287  assert(m == MethodFlag::WRITE);
288  acquireWrite(lockable);
289  }
290  }
291 
292  static void initialize() {}
293 };
294 
295 template <typename OptionsTy>
296 class DeterministicContextBase<OptionsTy, true, false> : public FirstPassBase {
297 public:
298  typedef DItem<OptionsTy> Item;
300  Item item;
301  ContextList edges;
302  ContextList succs;
303  std::atomic<int> preds;
304 
305  struct ContextPtrLessThan {
306  bool operator()(const DeterministicContextBase* a,
307  const DeterministicContextBase* b) const {
308  // XXX non-deterministic behavior when we have multiple items with the
309  // same id
310  if (a->item.id == b->item.id)
311  return a < b;
312  return a->item.id < b->item.id;
313  }
314  };
315 
316 public:
317  DeterministicContextBase(const Item& _item)
318  : FirstPassBase(true), item(_item), preds(0) {}
319 
320  void clear() {
321  assert(preds == 0);
322  this->commitIteration();
323  // TODO replace with bulk heap
324  edges.clear(*dagListHeap);
325  succs.clear(*dagListHeap);
326  }
327 
328  void addEdge(DeterministicContextBase* o) {
329  succs.push_front(*dagListHeap, o);
330  o->preds += 1;
331  }
332 
333  bool isReady() { return false; }
334 
335  virtual void alwaysAcquire(Lockable* lockable, galois::MethodFlag) {
336 
337  // First to lock becomes representative
338  DeterministicContextBase* owner =
339  static_cast<DeterministicContextBase*>(this->getOwner(lockable));
340  while (!owner) {
341  if (this->tryLock(lockable)) {
342  this->setOwner(lockable);
343  this->addToNhood(lockable);
344  }
345 
346  owner = static_cast<DeterministicContextBase*>(this->getOwner(lockable));
347  }
348 
349  if (std::find(edges.begin(), edges.end(), owner) != edges.end())
350  return;
351  edges.push_front(*dagListHeap, owner);
352  }
353 
354  static void initialize() {
355  if (!dagListHeap)
356  dagListHeap = SizedHeapFactory::getHeapForSize(
357  sizeof(typename ContextList::block_type));
358  }
359 };
360 
361 template <typename OptionsTy>
362 class DeterministicContextBase<OptionsTy, true, true> {
363  // TODO implement me
364 };
365 
366 template <typename OptionsTy>
367 using DeterministicContext =
368  DeterministicContextBase<OptionsTy, OptionsTy::hasFixedNeighborhood,
369  OptionsTy::hasIntentToRead>;
370 
/// A newly pushed work item with its deterministic ordering key
/// (parent, count). Comparison and equality use only the key; `val` is
/// ignored.
template <typename T>
struct DNewItem {
  T val;
  unsigned long parent;
  unsigned count;

  DNewItem(const T& _val, unsigned long _parent, unsigned _count)
      : val(_val), parent(_parent), count(_count) {}

  /// Lexicographic order on (parent, count).
  bool operator<(const DNewItem<T>& o) const {
    if (parent != o.parent)
      return parent < o.parent;
    return count < o.count;
  }

  bool operator==(const DNewItem<T>& o) const {
    return parent == o.parent && count == o.count;
  }

  bool operator!=(const DNewItem<T>& o) const { return !(*this == o); }

  /// Projection from a DNewItem to its stored value.
  ///
  /// This struct does not derive from std::unary_function, which was
  /// deprecated in C++11 and removed in C++17. The member typedefs that base
  /// used to provide are declared directly for existing users.
  struct GetValue {
    typedef DNewItem<T> argument_type;
    typedef const T& result_type;
    const T& operator()(const DNewItem<T>& x) const { return x.val; }
  };
};
399 
/// Advance `it` by `d` positions, clamped so the running position `cur` does
/// not move past `dist`. `cur` is increased by the distance actually moved.
template <typename InputIteratorTy>
void safe_advance(InputIteratorTy& it, size_t d, size_t& cur, size_t dist) {
  const size_t step = (d + cur >= dist) ? dist - cur : d;
  std::advance(it, step);
  cur += step;
}
408 
411 template <int ChunkSize, typename T>
412 struct FIFO {
413  worklists::ChunkFIFO<ChunkSize, T, false> m_data;
414  worklists::ChunkLIFO<16, T, false> m_buffer;
415  size_t m_size;
416 
417  FIFO() : m_size(0) {}
418 
419  ~FIFO() {
421  while ((p = m_buffer.pop()))
422  ;
423  while ((p = m_data.pop()))
424  ;
425  }
426 
427  galois::optional<T> pop() {
429  if ((p = m_buffer.pop()) || (p = m_data.pop())) {
430  --m_size;
431  }
432  return p;
433  }
434 
435  galois::optional<T> peek() {
437  if ((p = m_buffer.pop())) {
438  m_buffer.push(*p);
439  } else if ((p = m_data.pop())) {
440  m_buffer.push(*p);
441  }
442  return p;
443  }
444 
445  void push(const T& val) {
446  m_data.push(val);
447  ++m_size;
448  }
449 
450  size_t size() const { return m_size; }
451 
452  bool empty() const { return m_size == 0; }
453 };
454 
455 template <typename T, typename FunctionTy, typename ArgsTy>
456 struct OptionsCommon {
457  typedef T value_type;
458  typedef FunctionTy function2_type;
459  typedef ArgsTy args_type;
460 
461  constexpr static bool needStats = galois::internal::NeedStats<ArgsTy>::value;
462  constexpr static bool needsPush = !has_trait<no_pushes_tag, ArgsTy>();
463  constexpr static bool needsAborts = !has_trait<no_conflicts_tag, ArgsTy>();
464  constexpr static bool needsPia = has_trait<per_iter_alloc_tag, ArgsTy>();
465  constexpr static bool needsBreak = has_trait<parallel_break_tag, ArgsTy>();
466 
467  constexpr static bool hasBreak = has_trait<det_parallel_break_tag, ArgsTy>();
468  constexpr static bool hasId = has_trait<det_id_tag, ArgsTy>();
469 
470  constexpr static bool useLocalState = has_trait<local_state_tag, ArgsTy>();
471  constexpr static bool hasFixedNeighborhood =
472  has_trait<fixed_neighborhood_tag, ArgsTy>();
473  constexpr static bool hasIntentToRead =
474  has_trait<intent_to_read_tag, ArgsTy>();
475 
476  static const int ChunkSize = 32;
477  static const unsigned InitialNumRounds = 100;
478  static const size_t MinDelta = ChunkSize * 40;
479 
480  static_assert(
481  !hasFixedNeighborhood || (hasFixedNeighborhood && hasId),
482  "Please provide id function when operator has fixed neighborhood");
483 
484  function2_type fn2;
485  args_type args;
486 
487  OptionsCommon(function2_type f, ArgsTy a) : fn2(f), args(a) {}
488 };
489 
490 template <typename T, typename FunctionTy, typename ArgsTy, bool Enable>
491 struct OptionsBase : public OptionsCommon<T, FunctionTy, ArgsTy> {
492  typedef OptionsCommon<T, FunctionTy, ArgsTy> SuperTy;
493  typedef FunctionTy function1_type;
494 
495  function1_type fn1;
496 
497  OptionsBase(function1_type f, ArgsTy a) : SuperTy(f, a), fn1(f) {}
498 };
499 
500 template <typename T, typename FunctionTy, typename ArgsTy>
501 struct OptionsBase<T, FunctionTy, ArgsTy, true>
502  : public OptionsCommon<T, FunctionTy, ArgsTy> {
503  typedef OptionsCommon<T, FunctionTy, ArgsTy> SuperTy;
504  typedef typename get_trait_type<neighborhood_visitor_tag, ArgsTy>::type::type
505  function1_type;
506 
507  function1_type fn1;
508 
509  OptionsBase(FunctionTy f, ArgsTy a)
510  : SuperTy(f, a), fn1(get_trait_value<neighborhood_visitor_tag>(a).value) {
511  }
512 };
513 
514 template <typename T, typename FunctionTy, typename ArgsTy>
515 using Options = OptionsBase<T, FunctionTy, ArgsTy,
516  has_trait<neighborhood_visitor_tag, ArgsTy>()>;
517 
518 template <typename OptionsTy, bool Enable>
519 class DAGManagerBase {
520  typedef DeterministicContext<OptionsTy> Context;
521 
522 public:
523  void destroyDAGManager() {}
524  void pushDAGTask(Context*) {}
525  bool buildDAG() { return false; }
526  template <typename Executor, typename ExecutorTLD>
527  bool executeDAG(Executor&, ExecutorTLD&) {
528  return false;
529  }
530 };
531 
532 template <typename OptionsTy>
533 class DAGManagerBase<OptionsTy, true> {
534  typedef DeterministicContext<OptionsTy> Context;
535  typedef worklists::PerSocketChunkFIFO<OptionsTy::ChunkSize * 2, Context*> WL1;
536  typedef worklists::PerThreadChunkLIFO<OptionsTy::ChunkSize * 2, Context*> WL2;
537  typedef worklists::PerSocketChunkFIFO<32, Context*> WL3;
538 
539  struct ThreadLocalData : private boost::noncopyable {
540  typedef std::vector<Context*,
541  typename PerIterAllocTy::rebind<Context*>::other>
542  SortBuf;
543  IterAllocBaseTy heap;
545  SortBuf sortBuf;
546  ThreadLocalData() : alloc(&heap), sortBuf(alloc) {}
547  };
548 
549  substrate::PerThreadStorage<ThreadLocalData> data;
550  WL1 taskList;
551  WL2 taskList2;
552  WL3 sourceList;
553  substrate::TerminationDetection& term;
554  substrate::Barrier& barrier;
555 
556 public:
557  DAGManagerBase()
558  : term(substrate::getSystemTermination(activeThreads)),
559  barrier(getBarrier(activeThreads)) {}
560 
561  void destroyDAGManager() { data.getLocal()->heap.clear(); }
562 
563  void pushDAGTask(Context* ctx) { taskList.push(ctx); }
564 
565  bool buildDAG() {
566  ThreadLocalData& tld = *data.getLocal();
568  while ((p = taskList.pop())) {
569  Context* ctx = *p;
570  tld.sortBuf.clear();
571  std::copy(ctx->edges.begin(), ctx->edges.end(),
572  std::back_inserter(tld.sortBuf));
573  std::sort(tld.sortBuf.begin(), tld.sortBuf.end(),
574  typename Context::ContextPtrLessThan());
575 
576  if (!tld.sortBuf.empty()) {
577  Context* last = tld.sortBuf.front();
578  for (auto ii = tld.sortBuf.begin() + 1, ei = tld.sortBuf.end();
579  ii != ei; ++ii) {
580  Context* cur = *ii;
581  if (last != cur && cur != ctx)
582  last->addEdge(cur);
583  last = cur;
584  }
585  }
586 
587  taskList2.push(ctx);
588  }
589  return true;
590  }
591 
592  template <typename Executor, typename ExecutorTLD>
593  bool executeDAG(Executor& e, ExecutorTLD& etld) {
594  auto& local = e.getLocalWindowManager();
596  Context* ctx;
597 
598  // Go through all tasks to find intial sources and
599  while ((p = taskList2.pop())) {
600  ctx = *p;
601  if (ctx->preds.load(std::memory_order_relaxed) == 0)
602  sourceList.push(ctx);
603  }
604 
605  term.initializeThread();
606 
607  barrier.wait();
608 
609  size_t oldCommitted = 0;
610  size_t committed = 0;
611  do {
613  while ((p = sourceList.pop())) {
614  ctx = *p;
615  assert(ctx->preds == 0);
616  bool commit;
617  commit = e.executeTask(etld, ctx);
618  local.incrementCommitted();
619  assert(commit);
620  committed += 1;
621  e.deallocLocalState(etld.facing);
622 
623  if (OptionsTy::needsPia && !OptionsTy::useLocalState)
624  etld.facing.resetAlloc();
625 
626  etld.facing.resetPushBuffer();
627 
628  // enqueue successors
629  for (auto& succ : ctx->succs) {
630  int v = --succ->preds;
631  assert(v >= 0);
632  if (v == 0)
633  sourceList.push(succ);
634  }
635  }
636 
637  term.localTermination(oldCommitted != committed);
638  oldCommitted = committed;
639  substrate::asmPause();
640  } while (!term.globalTermination());
641 
642  if (OptionsTy::needsPia && OptionsTy::useLocalState)
643  etld.facing.resetAlloc();
644 
645  setThreadContext(0);
646 
647  return true;
648  }
649 };
650 
651 template <typename OptionsTy>
652 using DAGManager = DAGManagerBase<OptionsTy, OptionsTy::hasFixedNeighborhood>;
653 
654 template <typename OptionsTy, bool Enable>
655 struct StateManagerBase {
656  typedef typename OptionsTy::value_type value_type;
657  typedef typename OptionsTy::function2_type function_type;
658  void allocLocalState(UserContextAccess<value_type>&, function_type&) {}
659  void deallocLocalState(UserContextAccess<value_type>&) {}
660  void saveLocalState(UserContextAccess<value_type>&, DItem<OptionsTy>&) {}
661  void restoreLocalState(UserContextAccess<value_type>&,
662  const DItem<OptionsTy>&) {}
663  void reuseItem(DItem<OptionsTy>&) {}
664 
665  template <typename LWL, typename GWL>
666  typename GWL::value_type* emplaceContext(LWL&, GWL& gwl,
667  const DItem<OptionsTy>& item) const {
668  return gwl.emplace(item);
669  }
670 
671  template <typename LWL, typename GWL>
672  typename GWL::value_type* peekContext(LWL&, GWL& gwl) const {
673  return gwl.peek();
674  }
675 
676  template <typename LWL, typename GWL>
677  void popContext(LWL&, GWL& gwl) const {
678  gwl.pop_peeked();
679  }
680 };
681 
682 template <typename OptionsTy>
683 struct StateManagerBase<OptionsTy, true> {
684  typedef typename OptionsTy::value_type value_type;
685  typedef typename OptionsTy::function2_type function_type;
686  typedef typename get_trait_type<
687  local_state_tag, typename OptionsTy::args_type>::type::type LocalState;
688 
689  void allocLocalState(UserContextAccess<value_type>& c, function_type&) {
690  void* p = c.data().getPerIterAlloc().allocate(sizeof(LocalState));
691  // new (p) LocalState(self, c.data().getPerIterAlloc());
692  c.setLocalState(p);
693  }
694 
695  void deallocLocalState(UserContextAccess<value_type>& c) {
696  LocalState* p = c.data().template getLocalState<LocalState>();
697  if (p)
698  p->~LocalState();
699  }
700 
701  void saveLocalState(UserContextAccess<value_type>& c,
702  DItem<OptionsTy>& item) {
703  item.setLocalState(c.data().template getLocalState<LocalState>());
704  }
705 
706  void restoreLocalState(UserContextAccess<value_type>& c,
707  const DItem<OptionsTy>& item) {
708  c.setLocalState(item.getLocalState());
709  }
710 
711  template <typename LWL, typename GWL>
712  typename LWL::value_type* emplaceContext(LWL& lwl, GWL&,
713  const DItem<OptionsTy>& item) const {
714  return lwl.emplace(item);
715  }
716 
717  template <typename LWL, typename GWL>
718  typename LWL::value_type* peekContext(LWL& lwl, GWL&) const {
719  return lwl.peek();
720  }
721 
722  template <typename LWL, typename GWL>
723  void popContext(LWL& lwl, GWL&) const {
724  lwl.pop_peeked();
725  }
726 
727  void reuseItem(DItem<OptionsTy>& item) { item.setLocalState(nullptr); }
728 };
729 
730 template <typename OptionsTy>
731 using StateManager = StateManagerBase<OptionsTy, OptionsTy::useLocalState>;
732 
/// Break manager for loops without a det_parallel_break function: the loop
/// never breaks early.
template <typename OptionsTy, bool Enable>
class BreakManagerBase {
public:
  BreakManagerBase(const OptionsTy& /*options*/) {}

  bool checkBreak() { return false; }
};
739 
740 template <typename OptionsTy>
741 class BreakManagerBase<OptionsTy, true> {
742  typedef typename get_trait_type<det_parallel_break_tag,
743  typename OptionsTy::args_type>::type::type
744  BreakFn;
745  BreakFn breakFn;
746  substrate::Barrier& barrier;
747  substrate::CacheLineStorage<volatile long> done;
748 
749 public:
750  BreakManagerBase(const OptionsTy& o)
751  : breakFn(get_trait_value<det_parallel_break_tag>(o.args).value),
752  barrier(getBarrier(activeThreads)) {}
753 
754  bool checkBreak() {
756  done.get() = breakFn();
757  barrier.wait();
758  return done.get();
759  }
760 };
761 
762 template <typename OptionsTy>
763 using BreakManager = BreakManagerBase<OptionsTy, OptionsTy::hasBreak>;
764 
765 template <typename OptionsTy, bool Enable>
766 class IntentToReadManagerBase {
767  typedef DeterministicContext<OptionsTy> Context;
768 
769 public:
770  void pushIntentToReadTask(Context*) {}
771  bool buildIntentToRead() { return false; }
772 };
773 
774 template <typename OptionsTy>
775 class IntentToReadManagerBase<OptionsTy, true> {
776  typedef DeterministicContext<OptionsTy> Context;
777  typedef galois::gdeque<Context*> WL;
778  substrate::PerThreadStorage<WL> pending;
779  substrate::Barrier& barrier;
780 
781 public:
782  IntentToReadManagerBase() : barrier(getBarrier(activeThreads)) {}
783 
784  void pushIntentToReadTask(Context* ctx) {
785  pending.getLocal()->push_back(ctx);
786  }
787 
788  // NB(ddn): Need to gather information from dependees before commitLoop
789  // otherwise some contexts will be deallocated before we have time to check
790  bool buildIntentToRead() {
791  for (Context* ctx : *pending.getLocal())
792  ctx->build();
793  barrier.wait();
794  for (Context* ctx : *pending.getLocal())
795  ctx->propagate();
796  pending.getLocal()->clear();
797  return true;
798  }
799 };
800 
801 template <typename OptionsTy>
802 using IntentToReadManager =
803  IntentToReadManagerBase<OptionsTy, OptionsTy::hasIntentToRead>;
804 
805 template <typename OptionsTy, bool Enable>
806 class WindowManagerBase {
807 public:
808  class ThreadLocalData {
809  template <typename, bool>
810  friend class WindowManagerBase;
811  size_t window;
812  size_t delta;
813  size_t committed;
814  size_t iterations;
815 
816  public:
817  size_t nextWindow(bool first = false) {
818  if (first)
819  window = delta;
820  else
821  window += delta;
822  committed = iterations = 0;
823  return window;
824  }
825 
826  void incrementIterations() { ++iterations; }
827  void incrementCommitted() { ++committed; }
828  };
829 
830 private:
831  substrate::PerThreadStorage<ThreadLocalData> data;
832  unsigned numActive;
833 
834 public:
835  WindowManagerBase() { numActive = getActiveThreads(); }
836 
837  ThreadLocalData& getLocalWindowManager() { return *data.getLocal(); }
838 
839  size_t nextWindow(size_t dist, size_t atleast, size_t base = 0) {
840  if (false) {
841  // This, which tries to continue delta with new work, seems to result in
842  // more conflicts (although less total rounds) and more time
843  ThreadLocalData& local = *data.getLocal();
844  return local.nextWindow(true);
845  } else {
846  return initialWindow(dist, atleast, base);
847  }
848  }
849 
850  size_t initialWindow(size_t dist, size_t atleast, size_t base = 0) {
851  ThreadLocalData& local = *data.getLocal();
852  size_t w = std::max(dist / OptionsTy::InitialNumRounds, atleast) + base;
853  local.window = local.delta = w;
854  return w;
855  }
856 
857  void calculateWindow(bool inner) {
858  ThreadLocalData& local = *data.getLocal();
859 
860  // Accumulate all threads' info
861  size_t allcommitted = 0;
862  size_t alliterations = 0;
863  for (unsigned i = 0; i < numActive; ++i) {
864  ThreadLocalData& r = *data.getRemote(i);
865  allcommitted += r.committed;
866  alliterations += r.iterations;
867  }
868 
869  float commitRatio =
870  alliterations > 0 ? allcommitted / (float)alliterations : 0.0;
871  const float target = 0.95;
872 
873  if (commitRatio >= target)
874  local.delta += local.delta;
875  else if (allcommitted == 0) {
876  assert((alliterations == 0) && "someone should have committed");
877  local.delta += local.delta;
878  } else
879  local.delta = commitRatio / target * local.delta;
880 
881  if (!inner) {
882  if (local.delta < OptionsTy::MinDelta)
883  local.delta = OptionsTy::MinDelta;
884  } else if (local.delta < OptionsTy::MinDelta) {
885  // Try to get some new work instead of increasing window
886  local.delta = 0;
887  }
888 
889  // Useful debugging info
890  if (false) {
891  if (substrate::ThreadPool::getTID() == 0) {
892  char buf[1024];
893  snprintf(buf, 1024, "%d %.3f (%zu/%zu) window: %zu delta: %zu\n", inner,
894  commitRatio, allcommitted, alliterations, local.window,
895  local.delta);
896  gPrint(buf);
897  }
898  }
899  }
900 };
901 
902 template <typename OptionsTy>
903 class WindowManagerBase<OptionsTy, true> {
904 public:
905  class ThreadLocalData {
906  public:
907  size_t nextWindow() { return std::numeric_limits<size_t>::max(); }
908 
909  void incrementIterations() {}
910  void incrementCommitted() {}
911  };
912 
913 private:
914  ThreadLocalData data;
915 
916 public:
917  ThreadLocalData& getLocalWindowManager() { return data; }
918 
919  size_t nextWindow(size_t, size_t, size_t = 0) { return data.nextWindow(); }
920 
921  size_t initialWindow(size_t, size_t, size_t = 0) {
923  }
924 
925  void calculateWindow(bool) {}
926 };
927 
928 template <typename OptionsTy>
929 using WindowManager =
930  WindowManagerBase<OptionsTy, OptionsTy::hasFixedNeighborhood>;
931 
/// Id manager for loops without a det_id function: every value maps to id 0.
template <typename OptionsTy, bool Enable>
struct IdManagerBase {
  typedef typename OptionsTy::value_type value_type;

  IdManagerBase(const OptionsTy& /*options*/) {}

  uintptr_t id(const value_type& /*x*/) { return uintptr_t(0); }
};
938 
939 template <typename OptionsTy>
940 class IdManagerBase<OptionsTy, true> {
941  typedef typename OptionsTy::value_type value_type;
942  typedef
943  typename get_trait_type<det_id_tag,
944  typename OptionsTy::args_type>::type::type IdFn;
945  IdFn idFn;
946 
947 public:
948  IdManagerBase(const OptionsTy& o)
949  : idFn(get_trait_value<det_id_tag>(o.args).value) {}
950  uintptr_t id(const value_type& x) { return idFn(x); }
951 };
952 
953 template <typename OptionsTy>
954 using IdManager = IdManagerBase<OptionsTy, OptionsTy::hasId>;
955 
956 template <typename OptionsTy>
957 class NewWorkManager : public IdManager<OptionsTy> {
958  typedef typename OptionsTy::value_type value_type;
959  typedef DItem<OptionsTy> Item;
960  typedef DNewItem<value_type> NewItem;
961  typedef std::vector<NewItem, typename PerIterAllocTy::rebind<NewItem>::other>
962  NewItemsTy;
963  typedef typename NewItemsTy::iterator NewItemsIterator;
964  typedef FIFO<1024, Item> ReserveTy;
965  typedef worklists::PerSocketChunkFIFO<OptionsTy::ChunkSize, NewItem> NewWork;
966 
967  struct GetNewItem : public std::unary_function<int, NewItemsTy&> {
968  NewWorkManager* self;
969  GetNewItem(NewWorkManager* s = 0) : self(s) {}
970  NewItemsTy& operator()(int i) const {
971  return self->data.getRemote(i)->newItems;
972  }
973  };
974 
975  typedef boost::transform_iterator<GetNewItem, boost::counting_iterator<int>>
976  MergeOuterIt;
977  typedef std::vector<NewItem, typename PerIterAllocTy::rebind<NewItem>::other>
978  MergeBuf;
979  typedef std::vector<value_type,
980  typename PerIterAllocTy::rebind<value_type>::other>
981  DistributeBuf;
982 
983  struct ThreadLocalData {
984  IterAllocBaseTy heap;
986  NewItemsTy newItems;
987  ReserveTy reserve;
988  size_t minId;
989  size_t maxId;
990  size_t size;
991 
992  ThreadLocalData() : alloc(&heap), newItems(alloc) {}
993  };
994 
995  IterAllocBaseTy heap;
997  substrate::PerThreadStorage<ThreadLocalData> data;
998  NewWork new_;
999  MergeBuf mergeBuf;
1000  DistributeBuf distributeBuf;
1001  substrate::Barrier& barrier;
1002  unsigned numActive;
1003 
1004  bool merge(int begin, int end) {
1005  if (begin == end)
1006  return false;
1007  else if (begin + 1 == end)
1008  return !data.getRemote(begin)->newItems.empty();
1009 
1010  bool retval = false;
1011  int mid = (end - begin) / 2 + begin;
1012  retval |= merge(begin, mid);
1013  retval |= merge(mid, end);
1014 
1015  GetNewItem fn(this);
1016 
1017  MergeOuterIt bbegin(boost::make_counting_iterator(begin), fn);
1018  MergeOuterIt mmid(boost::make_counting_iterator(mid), fn);
1019  MergeOuterIt eend(boost::make_counting_iterator(end), fn);
1020  auto aa = make_two_level_iterator<std::forward_iterator_tag, MergeOuterIt,
1021  typename NewItemsTy::iterator, GetBegin,
1022  GetEnd>(bbegin, mmid);
1023  auto bb = make_two_level_iterator<std::forward_iterator_tag, MergeOuterIt,
1024  typename NewItemsTy::iterator, GetBegin,
1025  GetEnd>(mmid, eend);
1026  auto cc = make_two_level_iterator<std::forward_iterator_tag, MergeOuterIt,
1027  typename NewItemsTy::iterator, GetBegin,
1028  GetEnd>(bbegin, eend);
1029 
1030  while (aa.first != aa.second && bb.first != bb.second) {
1031  if (*aa.first < *bb.first)
1032  mergeBuf.push_back(*aa.first++);
1033  else
1034  mergeBuf.push_back(*bb.first++);
1035  }
1036 
1037  for (; aa.first != aa.second; ++aa.first)
1038  mergeBuf.push_back(*aa.first);
1039 
1040  for (; bb.first != bb.second; ++bb.first)
1041  mergeBuf.push_back(*bb.first);
1042 
1043  for (NewItemsIterator ii = mergeBuf.begin(), ei = mergeBuf.end(); ii != ei;
1044  ++ii)
1045  *cc.first++ = *ii;
1046 
1047  mergeBuf.clear();
1048 
1049  assert(cc.first == cc.second);
1050 
1051  return retval;
1052  }
1053 
1064  template <typename InputIteratorTy>
1065  void redistribute(InputIteratorTy ii, InputIteratorTy ei, size_t dist,
1066  size_t window, unsigned tid) {
1067  // ThreadLocalData& local = *data.getLocal();
1068  size_t blockSize = window;
1069  size_t numBlocks = dist / blockSize;
1070 
1071  size_t cur = 0;
1072  safe_advance(ii, tid, cur, dist);
1073  while (ii != ei) {
1074  unsigned long id;
1075  if (cur < blockSize * numBlocks)
1076  id = (cur % numBlocks) * blockSize + (cur / numBlocks);
1077  else
1078  id = cur;
1079  distributeBuf[id] = *ii;
1080  safe_advance(ii, numActive, cur, dist);
1081  }
1082  }
1083 
1084  template <typename InputIteratorTy, typename WL>
1085  void copyMine(InputIteratorTy ii, InputIteratorTy ei, size_t dist, WL* wl,
1086  size_t window, unsigned tid) {
1087  ThreadLocalData& local = *data.getLocal();
1088  size_t cur = 0;
1089  size_t k = 0;
1090  safe_advance(ii, tid, cur, dist);
1091  while (ii != ei) {
1092  unsigned long id = k * numActive + tid;
1093  if (id < window)
1094  wl->push(Item(*ii, id));
1095  else
1096  break;
1097  ++k;
1098  safe_advance(ii, numActive, cur, dist);
1099  }
1100 
1101  while (ii != ei) {
1102  unsigned long id = k * numActive + tid;
1103  local.reserve.push(Item(*ii, id));
1104  ++k;
1105  safe_advance(ii, numActive, cur, dist);
1106  }
1107  }
1108 
1109  template <typename InputIteratorTy, typename WL>
1110  void copyAllWithIds(InputIteratorTy ii, InputIteratorTy ei, WL* wl,
1111  size_t window) {
1112  ThreadLocalData& local = *data.getLocal();
1113  for (; ii != ei; ++ii) {
1114  unsigned long id = ii->parent;
1115  if (id < window)
1116  wl->push(Item(ii->val, id));
1117  else
1118  break;
1119  }
1120 
1121  for (; ii != ei; ++ii) {
1122  unsigned long id = ii->parent;
1123  local.reserve.push(Item(ii->val, id));
1124  }
1125  }
1126 
1127  template <typename InputIteratorTy, typename WL>
1128  void copyMineAfterRedistribute(InputIteratorTy ii, InputIteratorTy ei,
1129  size_t dist, WL* wl, size_t window,
1130  unsigned tid) {
1131  if (tid == 0) {
1132  distributeBuf.resize(dist);
1133  }
1134  barrier.wait();
1135  redistribute(ii, ei, dist, window, tid);
1136  barrier.wait();
1137  copyMine(distributeBuf.begin(), distributeBuf.end(), dist, wl, window, tid);
1138  }
1139 
1140  template <typename WL>
1141  void parallelSort(WindowManager<OptionsTy>& wm, WL* wl, unsigned tid) {
1142  ThreadLocalData& local = *data.getLocal();
1143 
1144  local.newItems.clear();
1146  while ((p = this->new_.pop())) {
1147  local.newItems.push_back(*p);
1148  }
1149 
1150  NewItemsIterator ii = local.newItems.begin();
1151  NewItemsIterator ei = local.newItems.end();
1152  std::sort(ii, ei);
1153  initialLimits(ii, ei);
1154  local.size = local.newItems.size();
1155 
1156  barrier.wait();
1157 
1158  if (tid == 0) {
1159  receiveLimits(local);
1160  broadcastLimits(local);
1161  if (!OptionsTy::hasId) {
1162  mergeBuf.reserve(local.size);
1163  merge(0, numActive);
1164  }
1165  }
1166 
1167  barrier.wait();
1168 
1169  if (OptionsTy::hasId) {
1170  size_t window = wm.nextWindow(local.maxId - local.minId,
1171  OptionsTy::MinDelta, local.minId);
1172  copyAllWithIds(ii, ei, wl, window);
1173  } else {
1174  GetNewItem fn(this);
1175  MergeOuterIt bbegin(boost::make_counting_iterator(0), fn);
1176  MergeOuterIt eend(boost::make_counting_iterator((int)numActive), fn);
1177  auto ii = make_two_level_iterator<std::forward_iterator_tag, MergeOuterIt,
1178  typename NewItemsTy::iterator, GetBegin,
1179  GetEnd>(bbegin, eend);
1180 
1181  size_t window = wm.nextWindow(local.size, OptionsTy::MinDelta);
1182  copyMineAfterRedistribute(boost::make_transform_iterator(
1183  ii.first, typename NewItem::GetValue()),
1184  boost::make_transform_iterator(
1185  ii.second, typename NewItem::GetValue()),
1186  local.size, wl, window, tid);
1187  }
1188  }
1189 
1190  void broadcastLimits(ThreadLocalData& local) {
1191  for (unsigned i = 1; i < numActive; ++i) {
1192  ThreadLocalData& other = *data.getRemote(i);
1193  other.minId = local.minId;
1194  other.maxId = local.maxId;
1195  other.size = local.size;
1196  }
1197  }
1198 
1199  void receiveLimits(ThreadLocalData& local) {
1200  for (unsigned i = 1; i < numActive; ++i) {
1201  ThreadLocalData& other = *data.getRemote(i);
1202  local.minId = std::min(other.minId, local.minId);
1203  local.maxId = std::max(other.maxId, local.maxId);
1204  local.size += other.size;
1205  }
1206  }
1207 
1209  template <typename BiIteratorTy>
1210  void initialLimits(BiIteratorTy ii, BiIteratorTy ei) {
1211  ThreadLocalData& local = *data.getLocal();
1212 
1213  local.minId = std::numeric_limits<size_t>::max();
1214  local.maxId = std::numeric_limits<size_t>::min();
1215  local.size = std::distance(ii, ei);
1216 
1217  if (ii != ei) {
1218  if (ii + 1 == ei) {
1219  local.minId = local.maxId = ii->parent;
1220  } else {
1221  local.minId = ii->parent;
1222  local.maxId = (ei - 1)->parent;
1223  }
1224  }
1225  }
1226 
/**
 * Fallback overload: selected when the int-taking overload is removed by
 * enable_if. The ellipsis parameter ranks lowest in overload resolution.
 * Nothing needs to be sorted in that configuration.
 */
template <typename InputIteratorTy>
void sortInitialWorkDispatch(InputIteratorTy, InputIteratorTy, ...) {
  // intentionally empty
}
1229 
1230  template <typename InputIteratorTy, bool HasId = OptionsTy::hasId,
1231  bool HasFixed = OptionsTy::hasFixedNeighborhood>
1232  auto sortInitialWorkDispatch(InputIteratorTy ii, InputIteratorTy ei, int) ->
1233  typename std::enable_if<HasId && !HasFixed, void>::type {
1234  ThreadLocalData& local = *data.getLocal();
1235  size_t dist = std::distance(ii, ei);
1236 
1237  mergeBuf.reserve(dist);
1238  for (; ii != ei; ++ii)
1239  mergeBuf.emplace_back(*ii, this->id(*ii), 1);
1240 
1241  ParallelSTL::sort(mergeBuf.begin(), mergeBuf.end());
1242 
1243  initialLimits(mergeBuf.begin(), mergeBuf.end());
1244  broadcastLimits(local);
1245  }
1246 
1247 public:
1248  NewWorkManager(const OptionsTy& o)
1249  : IdManager<OptionsTy>(o), alloc(&heap), mergeBuf(alloc),
1250  distributeBuf(alloc), barrier(getBarrier(activeThreads)) {
1251  numActive = getActiveThreads();
1252  }
1253 
1254  bool emptyReserve() { return data.getLocal()->reserve.empty(); }
1255 
1256  template <typename WL>
1257  void pushNextWindow(WL* wl, size_t window) {
1258  ThreadLocalData& local = *data.getLocal();
1260  while ((p = local.reserve.peek())) {
1261  if (p->id >= window)
1262  break;
1263  wl->push(*p);
1264  local.reserve.pop();
1265  }
1266  }
1267 
1268  void clearNewWork() { data.getLocal()->heap.clear(); }
1269 
/**
 * Sort the initial range when the loop configuration requires it. The
 * literal 0 is an int, so the int-taking overload wins when enable_if keeps
 * it; otherwise the variadic no-op is chosen.
 */
template <typename InputIteratorTy>
void sortInitialWork(InputIteratorTy ii, InputIteratorTy ei) {
  sortInitialWorkDispatch(ii, ei, 0);
}
1274 
1275  template <typename InputIteratorTy, typename WL>
1276  void addInitialWork(WindowManager<OptionsTy>& wm, InputIteratorTy b,
1277  InputIteratorTy e, WL* wl) {
1278  size_t dist = std::distance(b, e);
1279  if (OptionsTy::hasId) {
1280  ThreadLocalData& local = *data.getLocal();
1281  size_t window = wm.initialWindow(dist, OptionsTy::MinDelta, local.minId);
1282  if (OptionsTy::hasFixedNeighborhood) {
1283  copyMine(b, e, dist, wl, window, substrate::ThreadPool::getTID());
1284  } else {
1285  copyMine(boost::make_transform_iterator(mergeBuf.begin(),
1286  typename NewItem::GetValue()),
1287  boost::make_transform_iterator(mergeBuf.end(),
1288  typename NewItem::GetValue()),
1289  mergeBuf.size(), wl, window, substrate::ThreadPool::getTID());
1290  }
1291  } else {
1292  size_t window = wm.initialWindow(dist, OptionsTy::MinDelta);
1293  copyMineAfterRedistribute(b, e, dist, wl, window,
1295  }
1296  }
1297 
1298  template <bool HasId = OptionsTy::hasId>
1299  auto pushNew(const value_type& val, unsigned long, unsigned) ->
1300  typename std::enable_if<HasId, void>::type {
1301  new_.push(NewItem(val, this->id(val), 1));
1302  }
1303 
1304  template <bool HasId = OptionsTy::hasId>
1305  auto pushNew(const value_type& val, unsigned long parent, unsigned count) ->
1306  typename std::enable_if<!HasId, void>::type {
1307  new_.push(NewItem(val, parent, count));
1308  }
1309 
1310  template <typename WL>
1311  void distributeNewWork(WindowManager<OptionsTy>& wm, WL* wl) {
1312  parallelSort(wm, wl, substrate::ThreadPool::getTID());
1313  }
1314 };
1315 
1316 template <typename OptionsTy>
1317 class Executor : public BreakManager<OptionsTy>,
1318  public StateManager<OptionsTy>,
1319  public NewWorkManager<OptionsTy>,
1320  public WindowManager<OptionsTy>,
1321  public DAGManager<OptionsTy>,
1322  public IntentToReadManager<OptionsTy> {
1323  typedef typename OptionsTy::value_type value_type;
1324  typedef DItem<OptionsTy> Item;
1325  typedef DeterministicContext<OptionsTy> Context;
1326 
1327  typedef worklists::PerSocketChunkFIFO<OptionsTy::ChunkSize, Item> WL;
1328  typedef worklists::PerSocketChunkFIFO<OptionsTy::ChunkSize, Context>
1329  PendingWork;
1330  typedef worklists::ChunkFIFO<OptionsTy::ChunkSize, Context, false>
1331  LocalPendingWork;
1332 
1333  // Truly thread-local
1334  using LoopStat = LoopStatistics<OptionsTy::needStats>;
1335  struct ThreadLocalData : public LoopStat {
1336 
1337  typename OptionsTy::function1_type fn1;
1338  typename OptionsTy::function2_type fn2;
1339  LocalPendingWork localPending;
1340  UserContextAccess<value_type> facing;
1341 
1342  WL* wlcur;
1343  WL* wlnext;
1344  size_t rounds;
1345  size_t outerRounds;
1346  bool hasNewWork;
1347  ThreadLocalData(const OptionsTy& o, const char* loopname)
1348  : LoopStat(loopname), fn1(o.fn1), fn2(o.fn2), rounds(0),
1349  outerRounds(0) {}
1350  };
1351 
1352  OptionsTy options;
1353  substrate::Barrier& barrier;
1354  WL worklists[2];
1355  PendingWork pending;
1356  const char* loopname;
1357  substrate::CacheLineStorage<volatile long> innerDone;
1358  substrate::CacheLineStorage<volatile long> outerDone;
1359  substrate::CacheLineStorage<volatile long> hasNewWork;
1360 
1361  int runFunction(ThreadLocalData& tld, Context* ctx);
1362 
1363  bool pendingLoop(ThreadLocalData& tld);
1364  bool commitLoop(ThreadLocalData& tld);
1365  void go();
1366 
1367  void drainPending(ThreadLocalData& tld) {
1368  Context* ctx;
1369  while ((ctx = this->peekContext(tld.localPending, pending))) {
1370  ctx->clear();
1371  this->popContext(tld.localPending, pending);
1372  }
1373  }
1374 
1375 public:
1376  Executor(const OptionsTy& o)
1377  : BreakManager<OptionsTy>(o), NewWorkManager<OptionsTy>(o), options(o),
1378  barrier(getBarrier(activeThreads)),
1379  loopname(galois::internal::getLoopName(o.args)) {
1380  static_assert(!OptionsTy::needsBreak || OptionsTy::hasBreak,
1381  "need to use break function to break loop");
1382  }
1383 
1384  bool executeTask(ThreadLocalData& tld, Context* ctx);
1385 
1386  template <typename RangeTy>
1387  void initThread(const RangeTy& range) {
1388  Context::initialize();
1389  this->addInitialWork(*this, range.begin(), range.end(), &worklists[1]);
1390  }
1391 
1392  template <typename RangeTy>
1393  void init(const RangeTy& range) {
1394  this->sortInitialWork(range.begin(), range.end());
1395  }
1396 
1397  void operator()() { go(); }
1398 };
1399 
1400 template <typename OptionsTy>
1401 void Executor<OptionsTy>::go() {
1402  ThreadLocalData tld(options, loopname);
1403  auto& local = this->getLocalWindowManager();
1404  tld.wlcur = &worklists[0];
1405  tld.wlnext = &worklists[1];
1406 
1407  tld.hasNewWork = false;
1408 
1409  while (true) {
1410  ++tld.outerRounds;
1411 
1412  while (true) {
1413  ++tld.rounds;
1414 
1415  std::swap(tld.wlcur, tld.wlnext);
1416  bool nextPending = pendingLoop(tld);
1417  innerDone.get() = true;
1418 
1419  barrier.wait();
1420 
1421  if (this->buildDAG())
1422  barrier.wait();
1423 
1424  if (this->buildIntentToRead())
1425  barrier.wait();
1426 
1427  bool nextCommit = false;
1428  outerDone.get() = true;
1429 
1430  if (this->executeDAG(*this, tld)) {
1431  if (OptionsTy::needsBreak)
1432  barrier.wait();
1433  drainPending(tld);
1434  break;
1435  }
1436 
1437  nextCommit = commitLoop(tld);
1438 
1439  if (nextPending || nextCommit)
1440  innerDone.get() = false;
1441 
1442  barrier.wait();
1443 
1444  if (innerDone.get())
1445  break;
1446 
1447  this->calculateWindow(true);
1448 
1449  barrier.wait();
1450 
1451  this->pushNextWindow(tld.wlnext, local.nextWindow());
1452  }
1453 
1454  if (!this->emptyReserve())
1455  outerDone.get() = false;
1456 
1457  if (tld.hasNewWork)
1458  hasNewWork.get() = true;
1459 
1460  if (this->checkBreak())
1461  break;
1462 
1463  barrier.wait();
1464 
1465  if (outerDone.get()) {
1466  if (!OptionsTy::needsPush)
1467  break;
1468  if (!hasNewWork.get()) // (1)
1469  break;
1470  this->distributeNewWork(*this, tld.wlnext);
1471  tld.hasNewWork = false;
1472  // NB: assumes that distributeNewWork has a barrier otherwise checking at
1473  // (1) is erroneous
1474  hasNewWork.get() = false;
1475  } else {
1476  this->calculateWindow(false);
1477 
1478  this->pushNextWindow(tld.wlnext, local.nextWindow());
1479  }
1480  }
1481 
1482  this->destroyDAGManager();
1483  this->clearNewWork();
1484 
1485  if (OptionsTy::needStats) {
1486  if (substrate::ThreadPool::getTID() == 0) {
1487  reportStat_Single(loopname, "RoundsExecuted", tld.rounds);
1488  reportStat_Single(loopname, "OuterRoundsExecuted", tld.outerRounds);
1489  }
1490  }
1491 }
1492 
1493 template <typename OptionsTy>
1494 int Executor<OptionsTy>::runFunction(ThreadLocalData& tld, Context* ctx) {
1495  int result = 0;
1496 #ifdef GALOIS_USE_LONGJMP_ABORT
1497  if ((result = setjmp(execFrame)) == 0) {
1498 #elif defined(GALOIS_USE_EXCEPTION_ABORT)
1499  try {
1500 #endif
1501  tld.fn1(ctx->item.val, tld.facing.data());
1502 #ifdef GALOIS_USE_LONGJMP_ABORT
1503  } else {
1504  clearConflictLock();
1505  }
1506 #elif defined(GALOIS_USE_EXCEPTION_ABORT)
1507  } catch (const ConflictFlag& flag) {
1508  clearConflictLock();
1509  result = flag;
1510  }
1511 #endif
1512  return result;
1513 }
1514 
1515 template <typename OptionsTy>
1516 bool Executor<OptionsTy>::pendingLoop(ThreadLocalData& tld) {
1517  auto& local = this->getLocalWindowManager();
1518  bool retval = false;
1520  while ((p = tld.wlcur->pop())) {
1521  // Use a new context for each item because there is a race when reusing
1522  // between aborted iterations.
1523  Context* ctx = this->emplaceContext(tld.localPending, pending, *p);
1524  this->pushDAGTask(ctx);
1525  local.incrementIterations();
1526  bool commit = true;
1527 
1528  ctx->startIteration();
1529  ctx->setFirstPass();
1530  tld.inc_iterations();
1531  tld.facing.setFirstPass();
1532  setThreadContext(ctx);
1533 
1534  this->allocLocalState(tld.facing, tld.fn2);
1535  int result = runFunction(tld, ctx);
1536  // FIXME: clearReleasable();
1537  tld.facing.resetFirstPass();
1538  ctx->resetFirstPass();
1539  switch (result) {
1540  case 0:
1541  case REACHED_FAILSAFE:
1542  break;
1543  case CONFLICT:
1544  commit = false;
1545  break;
1546  default:
1547  abort();
1548  break;
1549  }
1550 
1551  // TODO only needed if fn1 needs pia
1552  if (OptionsTy::needsPia && !OptionsTy::useLocalState)
1553  tld.facing.resetAlloc();
1554 
1555  if (commit || OptionsTy::hasFixedNeighborhood) {
1556  this->saveLocalState(tld.facing, ctx->item);
1557  } else {
1558  retval = true;
1559  }
1560  }
1561 
1562  return retval;
1563 }
1564 
1565 template <typename OptionsTy>
1566 bool Executor<OptionsTy>::executeTask(ThreadLocalData& tld, Context* ctx) {
1567  setThreadContext(ctx);
1568  this->restoreLocalState(tld.facing, ctx->item);
1569  tld.facing.resetFirstPass();
1570  ctx->resetFirstPass();
1571  int result = 0;
1572 #ifdef GALOIS_USE_LONGJMP_ABORT
1573  if ((result = setjmp(execFrame)) == 0) {
1574 #elif defined(GALOIS_USE_EXCEPTION_ABORT)
1575  try {
1576 #endif
1577  tld.fn2(ctx->item.val, tld.facing.data());
1578 #ifdef GALOIS_USE_LONGJMP_ABORT
1579  } else {
1580  clearConflictLock();
1581  }
1582 #elif defined(GALOIS_USE_EXCEPTION_ABORT)
1583  } catch (const ConflictFlag& flag) {
1584  clearConflictLock();
1585  result = flag;
1586  }
1587 #endif
1588  // FIXME: clearReleasable();
1589  switch (result) {
1590  case 0:
1591  break;
1592  case CONFLICT:
1593  return false;
1594  break;
1595  default:
1596  GALOIS_DIE("unknown conflict flag");
1597  break;
1598  }
1599 
1600  if (OptionsTy::needsPush) {
1601  unsigned long parent = ctx->item.id;
1602  // typedef typename UserContextAccess<value_type>::PushBufferTy::iterator
1603  // iterator;
1604  unsigned count = 0;
1605  for (auto& item : tld.facing.getPushBuffer()) {
1606  this->pushNew(item, parent, ++count);
1607  if (count == 0) {
1608  GALOIS_DIE("counter overflow");
1609  }
1610  }
1611  if (count)
1612  tld.hasNewWork = true;
1613  }
1614  assert(OptionsTy::needsPush || tld.facing.getPushBuffer().begin() ==
1615  tld.facing.getPushBuffer().end());
1616 
1617  return true;
1618 }
1619 
1620 template <typename OptionsTy>
1621 bool Executor<OptionsTy>::commitLoop(ThreadLocalData& tld) {
1622  bool retval = false;
1623  auto& local = this->getLocalWindowManager();
1624 
1625  Context* ctx;
1626  while ((ctx = this->peekContext(tld.localPending, pending))) {
1627  bool commit = false;
1628  if (ctx->isReady())
1629  commit = executeTask(tld, ctx);
1630 
1631  if (commit) {
1632  ctx->commitIteration();
1633  local.incrementCommitted();
1634  } else {
1635  this->reuseItem(ctx->item);
1636  tld.wlnext->push(ctx->item);
1637  tld.inc_conflicts();
1638  retval = true;
1639  ctx->cancelIteration();
1640  }
1641 
1642  this->deallocLocalState(tld.facing);
1643 
1644  if (OptionsTy::needsPia && !OptionsTy::useLocalState)
1645  tld.facing.resetAlloc();
1646 
1647  tld.facing.resetPushBuffer();
1648  ctx->clear();
1649  this->popContext(tld.localPending, pending);
1650  }
1651 
1652  if (OptionsTy::needsPia && OptionsTy::useLocalState)
1653  tld.facing.resetAlloc();
1654 
1655  setThreadContext(0);
1656 
1657  return retval;
1658 }
1659 
1660 } // namespace internal
1661 } // namespace runtime
1662 
1663 namespace worklists {
1664 
/**
 * Deterministic execution.
 *
 * Worklist tag selecting the deterministic executor (see the
 * ForEachExecutor specialization on worklists::Deterministic<T>).
 */
template <typename T = int>
struct Deterministic {
  /// Thread-awareness does not change this policy.
  template <bool _concurrent>
  using rethread = Deterministic<T>;

  /// Rebind to a different element type.
  template <typename _T>
  using retype = Deterministic<_T>;

  typedef T value_type;
};
1678 
1679 } // namespace worklists
1680 
1681 namespace runtime {
1682 
1683 template <class T, class FunctionTy, class ArgsTy>
1684 struct ForEachExecutor<worklists::Deterministic<T>, FunctionTy, ArgsTy>
1685  : public internal::Executor<internal::Options<T, FunctionTy, ArgsTy>> {
1686  typedef internal::Options<T, FunctionTy, ArgsTy> OptionsTy;
1687  typedef internal::Executor<OptionsTy> SuperTy;
1688  ForEachExecutor(FunctionTy f, const ArgsTy& args)
1689  : SuperTy(OptionsTy(f, args)) {}
1690 };
1691 
1692 } // namespace runtime
1693 
1694 } // namespace galois
1695 #endif
Deterministic execution.
Definition: Executor_Deterministic.h:1669
static SimpleRuntimeContext * getOwner(Lockable *lockable)
Definition: libgalois/include/galois/runtime/Context.h:143
void setOwner(Lockable *lockable)
Definition: libgalois/include/galois/runtime/Context.h:112
galois::runtime::ExternalHeapAllocator< char, IterAllocBaseTy > PerIterAllocTy
Per-iteration allocator that conforms to STL allocator interface.
Definition: Mem.h:36
Intrusive union-find implementation.
Definition: UnionFind.h:33
UnionFindNode(ReaderContext *s)
Definition: UnionFind.h:49
galois::runtime::BumpWithMallocHeap< galois::runtime::FreeListHeap< galois::runtime::SystemHeap > > IterAllocBaseTy
[PerIterAllocTy example] Base allocator for per-iteration allocator
Definition: Mem.h:32
void setThreadContext(SimpleRuntimeContext *n)
Used by the parallel code to set up per-thread conflict detection.
Definition: Context.cpp:31
unsigned int getActiveThreads() noexcept
Returns the number of threads in use.
Definition: Threads.cpp:37
static SizedHeap * getHeapForSize(const size_t)
[FixedSizeAllocator example]
Definition: Mem.cpp:36
thread_local std::jmp_buf execFrame
Definition: Context.cpp:29
substrate::Barrier & getBarrier(unsigned activeThreads)
Have a pre-instantiated barrier available for use.
Definition: Substrate.cpp:24
void init(const RangeTy &range)
Definition: Executor_ParaMeter.h:409
T value_type
Definition: Executor_Deterministic.h:1676
void initThread(const RangeTy &) const
Definition: Executor_ParaMeter.h:415
const char * loopname
Definition: Executor_ParaMeter.h:145
#define GALOIS_DIE(...)
Definition: gIO.h:96
TerminationDetection & getSystemTermination(unsigned activeThreads)
Definition: Termination.cpp:35
void addToNhood(Lockable *lockable)
Definition: libgalois/include/galois/runtime/Context.h:150
bool operator!=(const flat_map< _Key, _Tp, _Compare, _Alloc > &__x, const flat_map< _Key, _Tp, _Compare, _Alloc > &__y)
Based on operator==.
Definition: FlatMap.h:344
void sort(RandomAccessIterator first, RandomAccessIterator last, Compare comp)
Definition: ParallelSTL.h:247
internal::Executor< OptionsTy > SuperTy
Definition: Executor_Deterministic.h:1687
Galois version of boost::optional.
Definition: optional.h:34
Definition: gslist.h:35
const ReaderContext * find() const
Definition: UnionFind.h:60
s_wl< T, Args...> wl(Args &&...args)
Definition: Traits.h:219
SimpleRuntimeContext(bool child=false)
Definition: libgalois/include/galois/runtime/Context.h:172
void reportStat_Single(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:544
const Ty max(std::atomic< Ty > &a, const Ty &b)
Definition: AtomicHelpers.h:40
static unsigned getTID()
Definition: ThreadPool.h:204
virtual void subAcquire(Lockable *lockable, galois::MethodFlag m)
Definition: Context.cpp:96
bool stealByCAS(Lockable *lockable, LockManagerBase *other)
Definition: libgalois/include/galois/runtime/Context.h:102
void gPrint(Args &&...args)
Prints a sequence of things.
Definition: gIO.h:47
unsigned int activeThreads
Definition: Threads.cpp:26
std::pair< TwoLevelIteratorA< OuterIter, InnerIter, CategoryOrTraversal, InnerBeginFn, InnerEndFn >, TwoLevelIteratorA< OuterIter, InnerIter, CategoryOrTraversal, InnerBeginFn, InnerEndFn > > make_two_level_iterator(OuterIter outer_begin, OuterIter outer_end)
Definition: TwoLevelIteratorA.h:420
ConflictFlag
Definition: libgalois/include/galois/runtime/Context.h:41
MethodFlag
What should the runtime do when executing a method.
Definition: MethodFlags.h:34
void * alloc()
Definition: PerThreadStorage.cpp:42
void operator()(void)
Definition: Executor_ParaMeter.h:417
internal::Options< T, FunctionTy, ArgsTy > OptionsTy
Definition: Executor_Deterministic.h:1686
const Ty min(std::atomic< Ty > &a, const Ty &b)
Definition: AtomicHelpers.h:70
static bool tryLock(Lockable *lockable)
Definition: libgalois/include/galois/runtime/Context.h:124
Definition: Executor_ForEach.h:150
ThreadPrivateHeap< FreeListHeap< BumpHeap< SystemHeap > > > SizedHeap
[FixedSizeAllocator example]
Definition: runtime/Mem.h:609
IterTy safe_advance(IterTy b, IterTy e, Distance n)
Like std::advance but returns end if end is closer than the advance amount.
Definition: gstl.h:222
bool operator==(const flat_map< _Key, _Tp, _Compare, _Alloc > &__x, const flat_map< _Key, _Tp, _Compare, _Alloc > &__y)
Definition: FlatMap.h:331
Definition: libgalois/include/galois/runtime/Context.h:42
ForEachExecutor(FunctionTy f, const ArgsTy &args)
Definition: Executor_Deterministic.h:1688
Wrapper< T, std::deque< T >, false > FIFO
Definition: Simple.h:83
constexpr auto get_trait_value(Tuple tpl)
Returns the value associated with the given trait T in a tuple.
Definition: Traits.h:112
Like std::deque but uses Galois memory management functionality.
Definition: gdeque.h:43
T value_type
Definition: Executor_ParaMeter.h:111
Definition: libgalois/include/galois/runtime/Context.h:44
unsigned commitIteration()
Definition: Context.cpp:77