Galois
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Executor_DoAll.h
Go to the documentation of this file.
1 /*
2  * This file belongs to the Galois project, a C++ library for exploiting
3  * parallelism. The code is being released under the terms of the 3-Clause BSD
4  * License (a copy is located in LICENSE.txt at the top-level directory).
5  *
6  * Copyright (C) 2018, The University of Texas at Austin. All rights reserved.
7  * UNIVERSITY EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES CONCERNING THIS
8  * SOFTWARE AND DOCUMENTATION, INCLUDING ANY WARRANTIES OF MERCHANTABILITY,
9  * FITNESS FOR ANY PARTICULAR PURPOSE, NON-INFRINGEMENT AND WARRANTIES OF
10  * PERFORMANCE, AND ANY WARRANTY THAT MIGHT OTHERWISE ARISE FROM COURSE OF
11  * DEALING OR USAGE OF TRADE. NO WARRANTY IS EITHER EXPRESS OR IMPLIED WITH
12  * RESPECT TO THE USE OF THE SOFTWARE OR DOCUMENTATION. Under no circumstances
13  * shall University be liable for incidental, special, indirect, direct or
14  * consequential damages or loss of profits, interruption of business, or
15  * related expenses which may arise from use of Software or Documentation,
16  * including but not limited to those resulting from defects in Software and/or
17  * Documentation, or loss or inaccuracy of data of any kind.
18  */
19 
20 #ifndef GALOIS_RUNTIME_EXECUTOR_DOALL_H
21 #define GALOIS_RUNTIME_EXECUTOR_DOALL_H
22 
23 #include "galois/config.h"
24 #include "galois/gIO.h"
34 #include "galois/Timer.h"
35 
36 namespace galois::runtime {
37 
38 namespace internal {
39 
40 template <typename R, typename F, typename ArgsTuple>
41 class DoAllStealingExec {
42 
43  typedef typename R::local_iterator Iter;
44  typedef typename std::iterator_traits<Iter>::difference_type Diff_ty;
45 
46  enum StealAmt { HALF, FULL };
47 
48  constexpr static const bool NEED_STATS =
49  galois::internal::NeedStats<ArgsTuple>::value;
50  constexpr static const bool MORE_STATS =
51  NEED_STATS && has_trait<more_stats_tag, ArgsTuple>();
52  constexpr static const bool USE_TERM = false;
53 
54  struct ThreadContext {
55 
56  alignas(substrate::GALOIS_CACHE_LINE_SIZE) substrate::SimpleLock work_mutex;
57  unsigned id;
58 
59  Iter shared_beg;
60  Iter shared_end;
61  Diff_ty m_size;
62  size_t num_iter;
63 
64  // Stats
65 
66  ThreadContext()
67  : work_mutex(), id(substrate::getThreadPool().getMaxThreads()),
68  shared_beg(), shared_end(), m_size(0), num_iter(0) {
69  // TODO: fix this initialization problem,
70  // see initThread
71  }
72 
73  ThreadContext(unsigned id, Iter beg, Iter end)
74  : work_mutex(), id(id), shared_beg(beg), shared_end(end),
75  m_size(std::distance(beg, end)), num_iter(0) {}
76 
77  bool doWork(F func, const unsigned chunk_size) {
78  Iter beg(shared_beg);
79  Iter end(shared_end);
80 
81  bool didwork = false;
82 
83  while (getWork(beg, end, chunk_size)) {
84 
85  didwork = true;
86 
87  for (; beg != end; ++beg) {
88  if (NEED_STATS) {
89  ++num_iter;
90  }
91  func(*beg);
92  }
93  }
94 
95  return didwork;
96  }
97 
98  bool hasWorkWeak() const { return (m_size > 0); }
99 
100  bool hasWork() const {
101  bool ret = false;
102 
103  work_mutex.lock();
104  {
105  ret = hasWorkWeak();
106 
107  if (m_size > 0) {
108  assert(shared_beg != shared_end);
109  }
110  }
111  work_mutex.unlock();
112 
113  return ret;
114  }
115 
116  private:
117  bool getWork(Iter& priv_beg, Iter& priv_end, const unsigned chunk_size) {
118  bool succ = false;
119 
120  work_mutex.lock();
121  {
122  if (hasWorkWeak()) {
123  succ = true;
124 
125  Iter nbeg = shared_beg;
126  if (m_size <= chunk_size) {
127  nbeg = shared_end;
128  m_size = 0;
129 
130  } else {
131  std::advance(nbeg, chunk_size);
132  m_size -= chunk_size;
133  assert(m_size > 0);
134  }
135 
136  priv_beg = shared_beg;
137  priv_end = nbeg;
138  shared_beg = nbeg;
139  }
140  }
141  work_mutex.unlock();
142 
143  return succ;
144  }
145 
146  void steal_from_end_impl(Iter& steal_beg, Iter& steal_end, const Diff_ty sz,
147  std::forward_iterator_tag) {
148 
149  // steal from front for forward_iterator_tag
150  steal_beg = shared_beg;
151  std::advance(shared_beg, sz);
152  steal_end = shared_beg;
153  }
154 
155  void steal_from_end_impl(Iter& steal_beg, Iter& steal_end, const Diff_ty sz,
156  std::bidirectional_iterator_tag) {
157 
158  steal_end = shared_end;
159  std::advance(shared_end, -sz);
160  steal_beg = shared_end;
161  }
162 
163  void steal_from_end(Iter& steal_beg, Iter& steal_end, const Diff_ty sz) {
164  assert(sz > 0);
165  steal_from_end_impl(
166  steal_beg, steal_end, sz,
167  typename std::iterator_traits<Iter>::iterator_category());
168  }
169 
170  void steal_from_beg(Iter& steal_beg, Iter& steal_end, const Diff_ty sz) {
171  assert(sz > 0);
172  steal_beg = shared_beg;
173  std::advance(shared_beg, sz);
174  steal_end = shared_beg;
175  }
176 
177  public:
178  bool stealWork(Iter& steal_beg, Iter& steal_end, Diff_ty& steal_size,
179  StealAmt amount, size_t chunk_size) {
180  bool succ = false;
181 
182  if (work_mutex.try_lock()) {
183 
184  if (hasWorkWeak()) {
185  succ = true;
186 
187  if (amount == HALF && m_size > (Diff_ty)chunk_size) {
188  steal_size = m_size / 2;
189  } else {
190  steal_size = m_size;
191  }
192 
193  if (m_size <= steal_size) {
194  steal_beg = shared_beg;
195  steal_end = shared_end;
196 
197  shared_beg = shared_end;
198 
199  steal_size = m_size;
200  m_size = 0;
201 
202  } else {
203 
204  // steal_from_end (steal_beg, steal_end, steal_size);
205  steal_from_beg(steal_beg, steal_end, steal_size);
206  m_size -= steal_size;
207  }
208  }
209 
210  work_mutex.unlock();
211  }
212 
213  return succ;
214  }
215 
216  void assignWork(const Iter& beg, const Iter& end, const Diff_ty sz) {
217  work_mutex.lock();
218  {
219  assert(!hasWorkWeak());
220  assert(beg != end);
221  assert(std::distance(beg, end) == sz);
222 
223  shared_beg = beg;
224  shared_end = end;
225  m_size = sz;
226  }
227  work_mutex.unlock();
228  }
229  };
230 
231 private:
233  transferWork(ThreadContext& rich, ThreadContext& poor, StealAmt amount) {
234 
235  assert(rich.id != poor.id);
236  assert(rich.id < galois::getActiveThreads());
237  assert(poor.id < galois::getActiveThreads());
238 
239  Iter steal_beg;
240  Iter steal_end;
241 
242  // stealWork should initialize to a more appropriate value
243  Diff_ty steal_size = 0;
244 
245  bool succ =
246  rich.stealWork(steal_beg, steal_end, steal_size, amount, chunk_size);
247 
248  if (succ) {
249  assert(steal_beg != steal_end);
250  assert(std::distance(steal_beg, steal_end) == steal_size);
251 
252  poor.assignWork(steal_beg, steal_end, steal_size);
253  }
254 
255  return succ;
256  }
257 
258  GALOIS_ATTRIBUTE_NOINLINE bool stealWithinSocket(ThreadContext& poor) {
259 
260  bool sawWork = false;
261  bool stoleWork = false;
262 
263  auto& tp = substrate::getThreadPool();
264 
265  const unsigned maxT = galois::getActiveThreads();
266  const unsigned my_pack = substrate::ThreadPool::getSocket();
267  const unsigned per_pack = tp.getMaxThreads() / tp.getMaxSockets();
268 
269  const unsigned pack_beg = my_pack * per_pack;
270  const unsigned pack_end = (my_pack + 1) * per_pack;
271 
272  for (unsigned i = 1; i < pack_end; ++i) {
273 
274  // go around the socket in circle starting from the next thread
275  unsigned t = (poor.id + i) % per_pack + pack_beg;
276  assert((t >= pack_beg) && (t < pack_end));
277 
278  if (t < maxT) {
279  if (workers.getRemote(t)->hasWorkWeak()) {
280  sawWork = true;
281 
282  stoleWork = transferWork(*workers.getRemote(t), poor, HALF);
283 
284  if (stoleWork) {
285  break;
286  }
287  }
288  }
289  }
290 
291  return sawWork || stoleWork;
292  }
293 
294  GALOIS_ATTRIBUTE_NOINLINE bool stealOutsideSocket(ThreadContext& poor,
295  const StealAmt& amt) {
296  bool sawWork = false;
297  bool stoleWork = false;
298 
299  auto& tp = substrate::getThreadPool();
300  unsigned myPkg = substrate::ThreadPool::getSocket();
301  // unsigned maxT = LL::getMaxThreads ();
302  unsigned maxT = galois::getActiveThreads();
303 
304  for (unsigned i = 0; i < maxT; ++i) {
305  ThreadContext& rich = *(workers.getRemote((poor.id + i) % maxT));
306 
307  if (tp.getSocket(rich.id) != myPkg) {
308  if (rich.hasWorkWeak()) {
309  sawWork = true;
310 
311  stoleWork = transferWork(rich, poor, amt);
312  // stoleWork = transferWork (rich, poor, HALF);
313 
314  if (stoleWork) {
315  break;
316  }
317  }
318  }
319  }
320 
321  return sawWork || stoleWork;
322  }
323 
324  GALOIS_ATTRIBUTE_NOINLINE bool trySteal(ThreadContext& poor) {
325  bool ret = false;
326 
327  ret = stealWithinSocket(poor);
328 
329  if (ret) {
330  return true;
331  }
332 
333  substrate::asmPause();
334 
335  if (substrate::getThreadPool().isLeader(poor.id)) {
336  ret = stealOutsideSocket(poor, HALF);
337 
338  if (ret) {
339  return true;
340  }
341  substrate::asmPause();
342  }
343 
344  ret = stealOutsideSocket(poor, HALF);
345  if (ret) {
346  return true;
347  }
348  substrate::asmPause();
349 
350  return ret;
351  }
352 
353 private:
354  R range;
355  F func;
356  const char* loopname;
357  Diff_ty chunk_size;
358  substrate::PerThreadStorage<ThreadContext> workers;
359 
360  substrate::TerminationDetection& term;
361 
362  // for stats
363  PerThreadTimer<MORE_STATS> totalTime;
364  PerThreadTimer<MORE_STATS> initTime;
365  PerThreadTimer<MORE_STATS> execTime;
366  PerThreadTimer<MORE_STATS> stealTime;
367  PerThreadTimer<MORE_STATS> termTime;
368 
369 public:
370  DoAllStealingExec(const R& _range, F _func, const ArgsTuple& argsTuple)
371  : range(_range), func(_func),
372  loopname(galois::internal::getLoopName(argsTuple)),
373  chunk_size(get_trait_value<chunk_size_tag>(argsTuple).value),
374  term(substrate::getSystemTermination(activeThreads)),
375  totalTime(loopname, "Total"), initTime(loopname, "Init"),
376  execTime(loopname, "Execute"), stealTime(loopname, "Steal"),
377  termTime(loopname, "Term") {
378  assert(chunk_size > 0);
379  }
380 
381  // parallel call
382  void initThread(void) {
383  initTime.start();
384 
385  term.initializeThread();
386 
387  unsigned id = substrate::ThreadPool::getTID();
388 
389  *workers.getLocal(id) =
390  ThreadContext(id, range.local_begin(), range.local_end());
391 
392  initTime.stop();
393  }
394 
395  ~DoAllStealingExec() {
396 // executed serially
397 #ifndef NDEBUG
398  for (unsigned i = 0; i < workers.size(); ++i) {
399  auto& ctx = *(workers.getRemote(i));
400  assert(!ctx.hasWork() && "Unprocessed work left");
401  }
402 #endif
403 
404  // printStats ();
405  }
406 
407  void operator()(void) {
408 
409  ThreadContext& ctx = *workers.getLocal();
410  totalTime.start();
411 
412  while (true) {
413  bool workHappened = false;
414 
415  execTime.start();
416 
417  if (ctx.doWork(func, chunk_size)) {
418  workHappened = true;
419  }
420 
421  execTime.stop();
422 
423  assert(!ctx.hasWork());
424 
425  stealTime.start();
426  bool stole = trySteal(ctx);
427  stealTime.stop();
428 
429  if (stole) {
430  continue;
431 
432  } else {
433 
434  assert(!ctx.hasWork());
435  if (USE_TERM) {
436  termTime.start();
437  term.localTermination(workHappened);
438 
439  bool quit = term.globalTermination();
440  termTime.stop();
441 
442  if (quit) {
443  break;
444  }
445  } else {
446  break;
447  }
448  }
449  }
450 
451  totalTime.stop();
452  assert(!ctx.hasWork());
453 
454  if (NEED_STATS) {
455  galois::runtime::reportStat_Tsum(loopname, "Iterations", ctx.num_iter);
456  }
457  }
458 };
459 
460 template <bool _STEAL>
461 struct ChooseDoAllImpl {
462 
463  template <typename R, typename F, typename ArgsT>
464  static void call(const R& range, F&& func, const ArgsT& argsTuple) {
465 
466  internal::DoAllStealingExec<
467  R, OperatorReferenceType<decltype(std::forward<F>(func))>, ArgsT>
468  exec(range, std::forward<F>(func), argsTuple);
469 
470  substrate::Barrier& barrier = getBarrier(activeThreads);
471 
473  activeThreads, [&exec](void) { exec.initThread(); }, std::ref(barrier),
474  std::ref(exec));
475  }
476 };
477 
478 template <>
479 struct ChooseDoAllImpl<false> {
480 
481  template <typename R, typename F, typename ArgsT>
482  static void call(const R& range, F func, const ArgsT& argsTuple) {
483 
485  [&](const unsigned int, const unsigned int) {
486  static constexpr bool NEED_STATS =
487  galois::internal::NeedStats<ArgsT>::value;
488  static constexpr bool MORE_STATS =
489  NEED_STATS && has_trait<more_stats_tag, ArgsT>();
490 
491  const char* const loopname = galois::internal::getLoopName(argsTuple);
492 
493  PerThreadTimer<MORE_STATS> totalTime(loopname, "Total");
494  PerThreadTimer<MORE_STATS> initTime(loopname, "Init");
495  PerThreadTimer<MORE_STATS> execTime(loopname, "Work");
496 
497  totalTime.start();
498  initTime.start();
499 
500  auto begin = range.local_begin();
501  const auto end = range.local_end();
502 
503  initTime.stop();
504 
505  execTime.start();
506 
507  size_t iter = 0;
508 
509  while (begin != end) {
510  func(*begin++);
511  if (NEED_STATS) {
512  ++iter;
513  }
514  }
515  execTime.stop();
516 
517  totalTime.stop();
518 
519  if (NEED_STATS) {
520  galois::runtime::reportStat_Tsum(loopname, "Iterations", iter);
521  }
522  },
523  std::make_tuple());
524  }
525 };
526 
527 } // end namespace internal
528 
529 template <typename R, typename F, typename ArgsTuple>
530 void do_all_gen(const R& range, F&& func, const ArgsTuple& argsTuple) {
531 
532  static_assert(!has_trait<char*, ArgsTuple>(), "old loopname");
533  static_assert(!has_trait<char const*, ArgsTuple>(), "old loopname");
534  static_assert(!has_trait<bool, ArgsTuple>(), "old steal");
535 
536  auto argsT = std::tuple_cat(
537  argsTuple,
538  get_default_trait_values(argsTuple, std::make_tuple(chunk_size_tag{}),
539  std::make_tuple(chunk_size<>{})));
540 
541  using ArgsT = decltype(argsT);
542 
543  constexpr bool TIME_IT = has_trait<loopname_tag, ArgsT>();
544  CondStatTimer<TIME_IT> timer(galois::internal::getLoopName(argsT));
545 
546  timer.start();
547 
548  constexpr bool STEAL = has_trait<steal_tag, ArgsT>();
549 
550  OperatorReferenceType<decltype(std::forward<F>(func))> func_ref = func;
551  internal::ChooseDoAllImpl<STEAL>::call(range, func_ref, argsT);
552 
553  timer.stop();
554 }
555 
556 } // namespace galois::runtime
557 
558 #endif
ThreadPool & getThreadPool(void)
Returns a reference to the system thread pool.
Definition: ThreadPool.cpp:259
void run(unsigned num, Args &&...args)
Executes work on all threads; a simple wrapper for run.
Definition: ThreadPool.h:142
unsigned int getActiveThreads() noexcept
Returns the number of threads in use.
Definition: Threads.cpp:37
substrate::Barrier & getBarrier(unsigned activeThreads)
Have a pre-instantiated barrier available for use.
Definition: Substrate.cpp:24
void initThread(const RangeTy &) const
Definition: Executor_ParaMeter.h:415
#define GALOIS_ATTRIBUTE_NOINLINE
Definition: CompilerSpecific.h:46
typename internal::OperatorReferenceType_impl< T >::type OperatorReferenceType
Definition: OperatorReferenceTypes.h:66
constexpr int GALOIS_CACHE_LINE_SIZE
Definition: CompilerSpecific.h:37
const char * loopname
Definition: Executor_ParaMeter.h:145
TerminationDetection & getSystemTermination(unsigned activeThreads)
Definition: Termination.cpp:35
void reportStat_Tsum(const S1 &region, const S2 &category, const T &value)
Definition: Statistics.h:562
Specify chunk size for do_all_coupled & do_all_choice at compile time or at runtime.
Definition: Traits.h:355
void on_each_gen(FunctionTy &&fn, const TupleTy &tpl)
Definition: Executor_OnEach.h:74
Definition: Traits.h:337
static unsigned getTID()
Definition: ThreadPool.h:204
void do_all_gen(const R &range, F &&func, const ArgsTuple &argsTuple)
Definition: Executor_DoAll.h:530
unsigned int activeThreads
Definition: Threads.cpp:26
static unsigned getSocket()
Definition: ThreadPool.h:207
void operator()(void)
Definition: Executor_ParaMeter.h:417
constexpr auto get_trait_value(Tuple tpl)
Returns the value associated with the given trait T in a tuple.
Definition: Traits.h:112
Definition: Timer.h:88
constexpr auto get_default_trait_values(std::index_sequence<> GALOIS_UNUSED(seq), S GALOIS_UNUSED(source), T GALOIS_UNUSED(tags), D GALOIS_UNUSED(defaults))
Returns a tuple that has an element from defaults[i] for every type from tags[i] missing in source...
Definition: Traits.h:148