Galois
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Executor_ForEach.h
Go to the documentation of this file.
1 /*
2  * This file belongs to the Galois project, a C++ library for exploiting
3  * parallelism. The code is being released under the terms of the 3-Clause BSD
4  * License (a copy is located in LICENSE.txt at the top-level directory).
5  *
6  * Copyright (C) 2018, The University of Texas at Austin. All rights reserved.
7  * UNIVERSITY EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES CONCERNING THIS
8  * SOFTWARE AND DOCUMENTATION, INCLUDING ANY WARRANTIES OF MERCHANTABILITY,
9  * FITNESS FOR ANY PARTICULAR PURPOSE, NON-INFRINGEMENT AND WARRANTIES OF
10  * PERFORMANCE, AND ANY WARRANTY THAT MIGHT OTHERWISE ARISE FROM COURSE OF
11  * DEALING OR USAGE OF TRADE. NO WARRANTY IS EITHER EXPRESS OR IMPLIED WITH
12  * RESPECT TO THE USE OF THE SOFTWARE OR DOCUMENTATION. Under no circumstances
13  * shall University be liable for incidental, special, indirect, direct or
14  * consequential damages or loss of profits, interruption of business, or
15  * related expenses which may arise from use of Software or Documentation,
16  * including but not limited to those resulting from defects in Software and/or
17  * Documentation, or loss or inaccuracy of data of any kind.
18  */
19 
20 #ifndef GALOIS_RUNTIME_EXECUTOR_FOREACH_H
21 #define GALOIS_RUNTIME_EXECUTOR_FOREACH_H
22 
23 #include <algorithm>
24 #include <functional>
25 #include <memory>
26 #include <utility>
27 
28 #include "galois/config.h"
29 #include "galois/gIO.h"
30 #include "galois/Mem.h"
31 #include "galois/runtime/Context.h"
34 #include "galois/runtime/Range.h"
41 #include "galois/Threads.h"
42 #include "galois/Timer.h"
43 #include "galois/Traits.h"
44 #include "galois/worklists/Chunk.h"
46 
47 namespace galois {
49 namespace runtime {
50 
51 template <typename value_type>
52 class AbortHandler {
53  struct Item {
54  value_type val;
55  int retries;
56  };
57 
60  bool useBasicPolicy;
61 
65  void basicPolicy(const Item& item) {
66  auto& tp = substrate::getThreadPool();
67  unsigned socket = tp.getSocket();
68  queues.getRemote(tp.getLeaderForSocket(socket / 2))->push(item);
69  }
70 
75  void doublePolicy(const Item& item) {
76  int retries = item.retries - 1;
77  if ((retries & 1) == 1) {
78  queues.getLocal()->push(item);
79  return;
80  }
81 
82  unsigned tid = substrate::ThreadPool::getTID();
83  auto& tp = substrate::getThreadPool();
84  unsigned socket = substrate::ThreadPool::getSocket();
85  unsigned leader = substrate::ThreadPool::getLeader();
86  if (tid != leader) {
87  unsigned next = leader + (tid - leader) / 2;
88  queues.getRemote(next)->push(item);
89  } else {
90  queues.getRemote(tp.getLeaderForSocket(socket / 2))->push(item);
91  }
92  }
93 
98  void boundedPolicy(const Item& item) {
99  int retries = item.retries - 1;
100  if (retries < 2) {
101  queues.getLocal()->push(item);
102  return;
103  }
104 
105  unsigned tid = substrate::ThreadPool::getTID();
106  auto& tp = substrate::getThreadPool();
107  unsigned socket = substrate::ThreadPool::getSocket();
108  unsigned leader = tp.getLeaderForSocket(socket);
109  if (retries < 5 && tid != leader) {
110  unsigned next = leader + (tid - leader) / 2;
111  queues.getRemote(next)->push(item);
112  } else {
113  queues.getRemote(tp.getLeaderForSocket(socket / 2))->push(item);
114  }
115  }
116 
120  void eagerPolicy(const Item& item) { queues.getLocal()->push(item); }
121 
122 public:
124  // XXX(ddn): Implement smarter adaptive policy
125  useBasicPolicy = substrate::getThreadPool().getMaxSockets() > 2;
126  }
127 
128  value_type& value(Item& item) const { return item.val; }
129  value_type& value(value_type& val) const { return val; }
130 
131  void push(const value_type& val) {
132  Item item = {val, 1};
133  queues.getLocal()->push(item);
134  }
135 
136  void push(const Item& item) {
137  Item newitem = {item.val, item.retries + 1};
138  if (useBasicPolicy)
139  basicPolicy(newitem);
140  else
141  doublePolicy(newitem);
142  }
143 
144  AbortedList* getQueue() { return queues.getLocal(); }
145 };
146 
147 // TODO(ddn): Implement wrapper to allow calling without UserContext
148 // TODO(ddn): Check for operators that implement both with and without context
149 template <class WorkListTy, class FunctionTy, typename ArgsTy>
151 public:
 // Compile-time switches derived from the loop's argument tuple (ArgsTy).
 // Whether loop statistics are wanted, as reported by NeedStats<ArgsTy>.
152  static constexpr bool needStats = galois::internal::NeedStats<ArgsTy>::value;
 // True unless no_pushes_tag is present; gates push-buffer handling.
153  static constexpr bool needsPush = !has_trait<no_pushes_tag, ArgsTy>();
 // True unless no_conflicts_tag is present; gates the per-iteration
 // context start/commit/cancel calls.
154  static constexpr bool needsAborts = !has_trait<no_conflicts_tag, ArgsTy>();
 // True when per_iter_alloc_tag is present; gates resetAlloc() calls.
155  static constexpr bool needsPia = has_trait<per_iter_alloc_tag, ArgsTy>();
 // True when parallel_break_tag is present; gates the shared break flag.
156  static constexpr bool needsBreak = has_trait<parallel_break_tag, ArgsTy>();
 // True only when statistics are wanted and more_stats_tag is present.
157  static constexpr bool MORE_STATS =
158  needStats && has_trait<more_stats_tag, ArgsTy>();
159 
160 protected:
162 
165  FunctionTy function;
167 
168  explicit ThreadLocalBasics(FunctionTy fn) : facing(), function(fn), ctx() {}
169  };
170 
172 
173  struct ThreadLocalData : public ThreadLocalBasics, public LoopStat {
174 
175  ThreadLocalData(FunctionTy fn, const char* ln)
176  : ThreadLocalBasics(fn), LoopStat(ln) {}
177  };
178 
179  // RunQueueState factors out state within runQueue iterations to protect it
180  // from being overwritten when using longjmp/setjmp.
 // State shared between runQueue and runQueueDispatch for one call.
 // NOTE(review): listing line 184 is not visible here; the member index on
 // this page lists an `item` member
 // (galois::optional<typename WL::value_type>) defined there - confirm
 // against the real header.
181  template <typename WL>
182  struct RunQueueState {
 // Number of items popped so far; incremented after each successful pop.
183  unsigned int num = 0;
185  };
186 
187  // NB: Place dynamically growing wl after fixed-size PerThreadStorage
188  // members to give higher likelihood of reclaiming PerThreadStorage
189 
193 
195  FunctionTy origFunction;
196  const char* loopname;
197  bool broke;
198 
201 
202  inline void commitIteration(ThreadLocalData& tld) {
203  if (needsPush) {
204  // auto ii = tld.facing.getPushBuffer().begin();
205  // auto ee = tld.facing.getPushBuffer().end();
206  auto& pb = tld.facing.getPushBuffer();
207  auto n = pb.size();
208  if (n) {
209  tld.inc_pushes(n);
210  wl.push(pb.begin(), pb.end());
211  pb.clear();
212  }
213  }
214  if (needsPia)
215  tld.facing.resetAlloc();
216  if (needsAborts)
217  tld.ctx.commitIteration();
218  //++tld.stat_commits;
219  }
220 
221  template <typename Item>
223  ThreadLocalData& tld) {
224  assert(needsAborts);
225  tld.ctx.cancelIteration();
226  tld.inc_conflicts();
227  aborted.push(item);
228  // clear push buffer
229  if (needsPush)
230  tld.facing.resetPushBuffer();
231  // reset allocator
232  if (needsPia)
233  tld.facing.resetAlloc();
234  }
235 
236  inline void doProcess(value_type& val, ThreadLocalData& tld) {
237  if (needsAborts)
238  tld.ctx.startIteration();
239 
240  tld.inc_iterations();
241  tld.function(val, tld.facing.data());
242  commitIteration(tld);
243  }
244 
247  bool didWork = false;
248  while ((p = wl.pop())) {
249  didWork = true;
250  doProcess(*p, tld);
251  }
252  return didWork;
253  }
254 
255  template <unsigned int limit, typename WL>
257 #ifdef GALOIS_USE_LONGJMP_ABORT
258  if (setjmp(execFrame) == 0) {
259  while ((!limit || s.num < limit) && (s.item = lwl.pop())) {
260  ++s.num;
261  doProcess(aborted.value(*s.item), tld);
262  }
263  } else {
264  clearConflictLock();
265  abortIteration(*s.item, tld);
266  }
267 #elif defined(GALOIS_USE_EXCEPTION_ABORT)
268  try {
269  while ((!limit || s.num < limit) && (s.item = lwl.pop())) {
270  ++s.num;
271  doProcess(aborted.value(*s.item), tld);
272  }
273  } catch (ConflictFlag const& flag) {
274  clearConflictLock();
275  abortIteration(*s.item, tld);
276  }
277 #endif
278  }
279 
 // Processes items from lwl through runQueueDispatch<limit> and returns
 // true when at least one item was popped (s.num > 0).
 // NOTE(review): listing line 282 is not visible here; it presumably
 // declares `s` as a RunQueueState<WL> (the type runQueueDispatch takes) -
 // confirm against the real header.
280  template <unsigned int limit, typename WL>
281  bool runQueue(ThreadLocalData& tld, WL& lwl) {
283  runQueueDispatch<limit>(tld, lwl, s);
284  return s.num > 0;
285  }
286 
289  return runQueue<0>(tld, *aborted.getQueue());
290  }
291 
293  wl.push(x.begin(), x.end());
294  x.clear();
295  }
296 
297  bool checkEmpty(WorkListTy&, ThreadLocalData&, ...) { return true; }
298 
299  template <typename WL>
300  auto checkEmpty(WL& wl, ThreadLocalData&, int)
301  -> decltype(wl.empty(), bool()) {
302  return wl.empty();
303  }
304 
 // Per-thread main loop of the executor.
 //   couldAbort - take the conflict-aware path (runQueue + handleAborts)
 //                and install/clear the thread's conflict context.
 //   isLeader   - only affects the batch limit handed to runQueue.
 // NOTE(review): listing lines 311, 317 and 354 are not visible in this
 // view. `tld` is presumably declared on line 311, and the call ending on
 // line 318 presumably starts on line 317 - confirm against the real
 // header.
305  template <bool couldAbort, bool isLeader>
306  void go() {
307 
308  execTime.start();
309 
310  // Thread-local data goes on the local stack to be NUMA friendly
312  if (needsBreak)
313  tld.facing.setBreakFlag(&broke);
314  if (couldAbort)
315  setThreadContext(&tld.ctx);
316  if (needsPush && !couldAbort)
318  std::placeholders::_1));
319 
320  while (true) {
321  do {
322  bool didWork = false;
323 
324  // Run some iterations
325  if (couldAbort || needsBreak) {
 // Limit is 64 items per call when a break may be requested or on a
 // leader thread; 0 otherwise, which runQueueDispatch treats as
 // "no limit" (its loop condition is `!limit || s.num < limit`).
326  constexpr int __NUM = (needsBreak || isLeader) ? 64 : 0;
327  bool b = runQueue<__NUM>(tld, wl);
328  didWork = b || didWork;
329  // Check for abort
330  if (couldAbort) {
 // handleAborts runs this thread's aborted-item queue with limit 0.
331  b = handleAborts(tld);
332  didWork = b || didWork;
333  }
334  } else { // No try/catch
335  bool b = runQueueSimple(tld);
336  didWork = b || didWork;
337  }
338 
339  // Update node color and prop token
340  term.localTermination(didWork);
341  substrate::asmPause(); // Let token propagate
 // Leave the inner loop on global termination, or as soon as a break
 // has been requested when breaks are enabled.
342  } while (!term.globalTermination() && (!needsBreak || !broke));
343 
 // Done if the worklist reports empty; the fallback checkEmpty overload
 // always returns true for worklists without empty().
344  if (checkEmpty(wl, tld, 0)) {
345  execTime.stop();
346  break;
347  }
348 
349  if (needsBreak && broke) {
350  execTime.stop();
351  break;
352  }
353 
 // Otherwise wait on the barrier and go around the outer loop again.
355  barrier.wait();
356  }
357 
 // Undo the setThreadContext call made at the top of this function.
358  if (couldAbort)
359  setThreadContext(0);
360  }
361 
 // Empty tag types used as the first argument of the delegating
 // constructors: T1 marks the step that unpacks the worklist-argument tuple,
 // T2 marks the step that receives the unpacked arguments.
362  struct T1 {};
363  struct T2 {};
364 
365  template <typename... WArgsTy>
366  ForEachExecutor(T2, FunctionTy f, const ArgsTy& args, WArgsTy... wargs)
367  : term(substrate::getSystemTermination(activeThreads)),
368  barrier(getBarrier(activeThreads)), wl(std::forward<WArgsTy>(wargs)...),
369  origFunction(f), loopname(galois::internal::getLoopName(args)),
370  broke(false), initTime(loopname, "Init"),
371  execTime(loopname, "Execute") {}
372 
/// Unpacks the worklist-argument tuple: each index in Is selects one element
/// of wlargs, and the elements are handed on to the T2 constructor.
template <typename WArgsTy, size_t... Is>
ForEachExecutor(T1,
                FunctionTy f,
                const ArgsTy& args,
                const WArgsTy& wlargs,
                std::index_sequence<Is...>)
    : ForEachExecutor(T2{}, f, args, std::get<Is>(wlargs)...) {}
377 
378  template <typename WArgsTy>
379  ForEachExecutor(T1, FunctionTy f, const ArgsTy& args, const WArgsTy&,
380  std::index_sequence<>)
381  : ForEachExecutor(T2{}, f, args) {}
382 
383 public:
384  ForEachExecutor(FunctionTy f, const ArgsTy& args)
385  : ForEachExecutor(T1{}, f, args, get_trait_value<wl_tag>(args).args,
386  std::make_index_sequence<std::tuple_size<decltype(
387  get_trait_value<wl_tag>(args).args)>::value>{}) {}
388 
/// Intentionally a no-op; the range argument is ignored.
template <typename RangeTy>
void init(const RangeTy&) {
}
391 
 // Per-thread initialization: pushes the initial range into the worklist,
 // timed by initTime.
 // NOTE(review): listing line 398 is not visible in this view; another
 // statement presumably sits between push_initial and initTime.stop() -
 // confirm against the real header.
392  template <typename RangeTy>
393  void initThread(const RangeTy& range) {
394 
395  initTime.start();
396 
397  wl.push_initial(range);
399 
400  initTime.stop();
401  }
402 
403  void operator()() {
404  bool isLeader = substrate::ThreadPool::isLeader();
405  bool couldAbort = needsAborts && activeThreads > 1;
406  if (couldAbort && isLeader)
407  go<true, true>();
408  else if (couldAbort && !isLeader)
409  go<true, false>();
410  else if (!couldAbort && isLeader)
411  go<false, true>();
412  else
413  go<false, false>();
414  }
415 };
416 
/// Compile-time probe for a nested `with_iterator<It>::type` member template.
/// Call as has_with_iterator<WL>(0): this overload is the better match for
/// an int argument and is viable only when
/// `WL::with_iterator<int*>::type` names a type.
template <typename WLTy>
constexpr auto has_with_iterator(int) -> decltype(
    std::declval<typename WLTy::template with_iterator<int*>::type>(),
    bool()) {
  return true;
}

/// Fallback chosen when the probe above is not viable.
template <typename>
constexpr bool has_with_iterator(...) {
  return false;
}
427 
428 template <typename WLTy, typename IterTy, typename Enable = void>
429 struct reiterator {
430  typedef WLTy type;
431 };
432 
433 template <typename WLTy, typename IterTy>
434 struct reiterator<WLTy, IterTy,
435  typename std::enable_if<has_with_iterator<WLTy>(0)>::type> {
436  typedef typename WLTy::template with_iterator<IterTy>::type type;
437 };
438 
439 // TODO(ddn): Think about folding in range into args too
 // Builds a ForEach executor for the given range, operator and argument
 // tuple and runs it.
 // NOTE(review): several listing lines are not visible in this view (442,
 // 445, 449-450, 456), so the typedefs for value_type, WorkListTy,
 // FuncRefType and WorkTy appear truncated, and the call that ends on lines
 // 457-458 has lost its first line. Confirm against the real header before
 // relying on the comments below.
440 template <typename RangeTy, typename FunctionTy, typename ArgsTy>
441 void for_each_impl(const RangeTy& range, FunctionTy&& fn, const ArgsTy& args) {
443  value_type;
 // Worklist type selected by the wl_tag trait in ArgsTy.
444  typedef typename get_trait_type<wl_tag, ArgsTy>::type::type BaseWorkListTy;
446  type ::template retype<value_type>
447  WorkListTy;
448  using FuncRefType =
451 
452  auto& barrier = getBarrier(activeThreads);
453  FuncRefType fn_ref = fn;
 // The executor is constructed from fn_ref rather than from fn directly.
454  WorkTy W(fn_ref, args);
455  W.init(range);
 // Arguments of the truncated call: the thread count, a callable that does
 // per-thread initialization, the barrier, and the executor itself.
 // Presumably these are run in that order on each thread - TODO confirm.
457  activeThreads, [&W, &range]() { W.initThread(range); }, std::ref(barrier),
458  std::ref(W));
459 }
460 
461 // TODO: Need to decide whether user should provide num_run tag or
462 // num_run can be provided by loop instance which is guaranteed to be unique
463 
465 template <typename RangeTy, typename FunctionTy, typename TupleTy>
466 void for_each_gen(const RangeTy& r, FunctionTy&& fn, const TupleTy& tpl) {
467  static_assert(!has_trait<char*, TupleTy>(), "old loopname");
468  static_assert(!has_trait<char const*, TupleTy>(), "old loopname");
469  static_assert(!has_trait<bool, TupleTy>(), "old steal");
470 
471  auto ftpl = std::tuple_cat(tpl, typename function_traits<FunctionTy>::type{});
472 
473  auto xtpl = std::tuple_cat(
474  ftpl, get_default_trait_values(tpl, std::make_tuple(wl_tag{}),
475  std::make_tuple(wl<defaultWL>())));
476 
477  constexpr bool TIME_IT = has_trait<loopname_tag, decltype(xtpl)>();
478  CondStatTimer<TIME_IT> timer(galois::internal::getLoopName(xtpl));
479 
480  timer.start();
481 
482  runtime::for_each_impl(r, std::forward<FunctionTy>(fn), xtpl);
483 
484  timer.stop();
485 }
486 
487 } // end namespace runtime
488 } // end namespace galois
489 #endif
ForEachExecutor(T1, FunctionTy f, const ArgsTy &args, const WArgsTy &, std::index_sequence<>)
Definition: Executor_ForEach.h:379
Definition: Executor_ForEach.h:362
unsigned cancelIteration()
Definition: Context.cpp:92
ForEachExecutor(T2, FunctionTy f, const ArgsTy &args, WArgsTy...wargs)
Definition: Executor_ForEach.h:366
virtual void initializeThread()=0
Initializes the per-thread state.
virtual void localTermination(bool workHappened)=0
Process termination locally.
ForEachExecutor(FunctionTy f, const ArgsTy &args)
Definition: Executor_ForEach.h:384
Definition: LoopStatistics.h:31
FunctionTy function
Definition: Executor_ForEach.h:165
PushBufferTy & getPushBuffer()
Definition: UserContextAccess.h:38
ThreadPool & getThreadPool(void)
return a reference to system thread pool
Definition: ThreadPool.cpp:259
void run(unsigned num, Args &&...args)
execute work on all threads a simple wrapper for run
Definition: ThreadPool.h:142
void go()
Definition: Executor_ForEach.h:306
void setThreadContext(SimpleRuntimeContext *n)
used by the parallel code to set up conflict detection per thread
Definition: Context.cpp:31
GALOIS_ATTRIBUTE_NOINLINE bool handleAborts(ThreadLocalData &tld)
Definition: Executor_ForEach.h:288
static unsigned getLeader()
Definition: ThreadPool.h:206
void stop()
Definition: ThreadTimer.h:61
static constexpr bool MORE_STATS
Definition: Executor_ForEach.h:157
void for_each_impl(const RangeTy &range, FunctionTy &&fn, const ArgsTy &args)
Definition: Executor_ForEach.h:441
static constexpr bool needsAborts
Definition: Executor_ForEach.h:154
thread_local std::jmp_buf execFrame
Definition: Context.cpp:29
ThreadLocalData(FunctionTy fn, const char *ln)
Definition: Executor_ForEach.h:175
bool globalTermination() const
Returns whether global termination is detected.
Definition: Termination.h:72
substrate::Barrier & getBarrier(unsigned activeThreads)
Have a pre-instantiated barrier available for use.
Definition: Substrate.cpp:24
void init(const RangeTy &range)
Definition: Executor_ParaMeter.h:409
WLTy type
Definition: Executor_ForEach.h:430
Definition: Traits.h:197
size_t size() const
Definition: gdeque.h:348
substrate::TerminationDetection & term
Definition: Executor_ForEach.h:191
void push(const Item &item)
Definition: Executor_ForEach.h:136
AbortHandler< value_type > aborted
Definition: Executor_ForEach.h:190
Definition: Executor_ForEach.h:173
Definition: Barrier.h:33
AbortHandler()
Definition: Executor_ForEach.h:123
void initThread(const RangeTy &) const
Definition: Executor_ParaMeter.h:415
#define GALOIS_ATTRIBUTE_NOINLINE
Definition: CompilerSpecific.h:46
SimpleRuntimeContext ctx
Definition: Executor_ForEach.h:166
typename internal::OperatorReferenceType_impl< T >::type OperatorReferenceType
Definition: OperatorReferenceTypes.h:66
Simple Container Wrapper worklist (not scalable).
Definition: Simple.h:36
ForEachExecutor(T1, FunctionTy f, const ArgsTy &args, const WArgsTy &wlargs, std::index_sequence< Is...>)
Definition: Executor_ForEach.h:374
TerminationDetection & getSystemTermination(unsigned activeThreads)
Definition: Termination.cpp:35
void start()
Definition: ThreadTimer.h:59
Definition: Executor_ForEach.h:163
Returns the type associated with the given trait in a tuple.
Definition: Traits.h:121
WLTy::template with_iterator< IterTy >::type type
Definition: Executor_ForEach.h:436
void commitIteration(ThreadLocalData &tld)
Definition: Executor_ForEach.h:202
constexpr auto has_with_iterator(int) -> decltype(std::declval< typename WLTy::template with_iterator< int * >::type >(), bool())
Definition: Executor_ForEach.h:418
void for_each_gen(const RangeTy &r, FunctionTy &&fn, const TupleTy &tpl)
Normalize arguments to for_each.
Definition: Executor_ForEach.h:466
void resetPushBuffer()
Definition: UserContextAccess.h:39
auto checkEmpty(WL &wl, ThreadLocalData &, int) -> decltype(wl.empty(), bool())
Definition: Executor_ForEach.h:300
bool broke
Definition: Executor_ForEach.h:197
Indicates worklist to use.
Definition: Traits.h:211
SuperTy & data()
Definition: UserContextAccess.h:40
substrate::Barrier & barrier
Definition: Executor_ForEach.h:192
FunctionTy origFunction
Definition: Executor_ForEach.h:195
unsigned int num
Definition: Executor_ForEach.h:183
value_type & value(value_type &val) const
Definition: Executor_ForEach.h:129
PerThreadTimer< MORE_STATS > execTime
Definition: Executor_ForEach.h:200
void inc_pushes(size_t v=1)
Definition: LoopStatistics.h:54
GALOIS_ATTRIBUTE_NOINLINE void abortIteration(const Item &item, ThreadLocalData &tld)
Definition: Executor_ForEach.h:222
static unsigned getTID()
Definition: ThreadPool.h:204
Definition: PerThreadStorage.h:88
static constexpr bool needsPia
Definition: Executor_ForEach.h:155
void startIteration()
Definition: libgalois/include/galois/runtime/Context.h:175
unsigned int activeThreads
Definition: Threads.cpp:26
void setBreakFlag(bool *b)
Definition: UserContextAccess.h:43
Definition: Executor_ForEach.h:182
ConflictFlag
Definition: libgalois/include/galois/runtime/Context.h:41
Definition: Termination.h:39
void fastPushBack(typename UserContextAccess< value_type >::PushBufferTy &x)
Definition: Executor_ForEach.h:292
std::tuple type
Definition: Traits.h:181
static unsigned getSocket()
Definition: ThreadPool.h:207
void doProcess(value_type &val, ThreadLocalData &tld)
Definition: Executor_ForEach.h:236
static constexpr bool needsBreak
Definition: Executor_ForEach.h:156
void operator()(void)
Definition: Executor_ParaMeter.h:417
Definition: Executor_ForEach.h:52
static bool isLeader()
Definition: ThreadPool.h:205
UserContextAccess< value_type > facing
Definition: Executor_ForEach.h:164
void inc_iterations()
Definition: LoopStatistics.h:56
const char * loopname
Definition: Executor_ForEach.h:196
Definition: Executor_ForEach.h:150
WorkListTy::value_type value_type
Definition: Executor_ForEach.h:161
Definition: libgalois/include/galois/runtime/Context.h:135
galois::optional< typename WL::value_type > item
Definition: Executor_ForEach.h:184
PerThreadTimer< MORE_STATS > initTime
Definition: Executor_ForEach.h:199
void push(const value_type &val)
Definition: Executor_ForEach.h:131
bool runQueue(ThreadLocalData &tld, WL &lwl)
Definition: Executor_ForEach.h:281
unsigned getMaxSockets() const
Definition: ThreadPool.h:180
Definition: Executor_ForEach.h:363
bool checkEmpty(WorkListTy &, ThreadLocalData &,...)
Definition: Executor_ForEach.h:297
WorkListTy wl
Definition: Executor_ForEach.h:194
ThreadLocalBasics(FunctionTy fn)
Definition: Executor_ForEach.h:168
void inc_conflicts()
Definition: LoopStatistics.h:58
AbortedList * getQueue()
Definition: Executor_ForEach.h:144
Definition: Timer.h:88
T value_type
Definition: Executor_ParaMeter.h:111
value_type & value(Item &item) const
Definition: Executor_ForEach.h:128
void resetAlloc()
Definition: UserContextAccess.h:37
static constexpr bool needStats
Definition: Executor_ForEach.h:152
void runQueueDispatch(ThreadLocalData &tld, WL &lwl, RunQueueState< WL > &s)
Definition: Executor_ForEach.h:256
Definition: Executor_ForEach.h:429
static constexpr bool needsPush
Definition: Executor_ForEach.h:153
bool runQueueSimple(ThreadLocalData &tld)
Definition: Executor_ForEach.h:245
constexpr auto get_default_trait_values(std::index_sequence<> GALOIS_UNUSED(seq), S GALOIS_UNUSED(source), T GALOIS_UNUSED(tags), D GALOIS_UNUSED(defaults))
Returns a tuple that has an element from defaults[i] for every type from tags[i] missing in source...
Definition: Traits.h:148
unsigned commitIteration()
Definition: Context.cpp:77
typename GenericWL::template retype< T > WorkListTy
Definition: Executor_ParaMeter.h:113
void setFastPushBack(FastPushBack f)
Definition: UserContextAccess.h:42