#ifndef GALOIS_RUNTIME_EXECUTOR_DOALL_H
#define GALOIS_RUNTIME_EXECUTOR_DOALL_H

#include "galois/config.h"
// ... (further includes elided)

namespace galois::runtime {
// ...
template <typename R, typename F, typename ArgsTuple>
class DoAllStealingExec {
  typedef typename R::local_iterator Iter;
  typedef typename std::iterator_traits<Iter>::difference_type Diff_ty;

  enum StealAmt { HALF, FULL };

  constexpr static const bool NEED_STATS =
      galois::internal::NeedStats<ArgsTuple>::value;
  constexpr static const bool MORE_STATS =
      NEED_STATS && has_trait<more_stats_tag, ArgsTuple>();
  constexpr static const bool USE_TERM = false;
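  // Per-thread work descriptor: a lock-protected [shared_beg, shared_end)
  // range from which the owning thread takes chunks and from which idle
  // threads steal.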
  struct ThreadContext {
    // ... (data members elided)

    ThreadContext(void)
        : work_mutex(), id(substrate::getThreadPool().getMaxThreads()),
          shared_beg(), shared_end(), m_size(0), num_iter(0) {}
    ThreadContext(unsigned id, Iter beg, Iter end)
        : work_mutex(), id(id), shared_beg(beg), shared_end(end),
          m_size(std::distance(beg, end)), num_iter(0) {}
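    // Drain the shared range chunk by chunk, applying func to every element
    // claimed; returns whether any work was done.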
    bool doWork(F func, const unsigned chunk_size) {
      // ...
      while (getWork(beg, end, chunk_size)) {
        // ...
        for (; beg != end; ++beg) {
          // ...
    // Unsynchronized check, used when scanning other threads for work.
    bool hasWorkWeak() const { return (m_size > 0); }

    bool hasWork() const {
      // ...
      assert(shared_beg != shared_end);
      // ...
    bool getWork(Iter& priv_beg, Iter& priv_end, const unsigned chunk_size) {
      // ...
      Iter nbeg = shared_beg;
      if (m_size <= chunk_size) {
        // ... (fewer than chunk_size items remain)
      } else {
        std::advance(nbeg, chunk_size);
        m_size -= chunk_size;
      }
      // ...
      priv_beg = shared_beg;
      // ...
    void steal_from_end_impl(Iter& steal_beg, Iter& steal_end, const Diff_ty sz,
                             std::forward_iterator_tag) {
      // ...
      steal_beg = shared_beg;
      std::advance(shared_beg, sz);
      steal_end = shared_beg;
    }
    void steal_from_end_impl(Iter& steal_beg, Iter& steal_end, const Diff_ty sz,
                             std::bidirectional_iterator_tag) {
      // ...
      steal_end = shared_end;
      std::advance(shared_end, -sz);
      steal_beg = shared_end;
    }
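    // steal_from_end picks an overload above by iterator category: with only
    // forward iterators the victim's range cannot be walked backwards, so the
    // "end" steal takes sz items from the front instead; bidirectional
    // iterators allow taking them from the true end.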
    void steal_from_end(Iter& steal_beg, Iter& steal_end, const Diff_ty sz) {
      // ...
      steal_from_end_impl(
          steal_beg, steal_end, sz,
          typename std::iterator_traits<Iter>::iterator_category());
    }
    void steal_from_beg(Iter& steal_beg, Iter& steal_end, const Diff_ty sz) {
      // ...
      steal_beg = shared_beg;
      std::advance(shared_beg, sz);
      steal_end = shared_beg;
    }
    bool stealWork(Iter& steal_beg, Iter& steal_end, Diff_ty& steal_size,
                   StealAmt amount, size_t chunk_size) {
      // ...
      if (work_mutex.try_lock()) {
        // ...
        if (amount == HALF && m_size > (Diff_ty)chunk_size) {
          steal_size = m_size / 2;
        }
        // ...
        if (m_size <= steal_size) {
          // the whole remaining range is handed over
          steal_beg = shared_beg;
          steal_end = shared_end;
          // ...
          shared_beg = shared_end;
          // ...
        } else {
          steal_from_beg(steal_beg, steal_end, steal_size);
          m_size -= steal_size;
        }
        // ...
    void assignWork(const Iter& beg, const Iter& end, const Diff_ty sz) {
      // ...
      assert(!hasWorkWeak());
      // ...
      assert(std::distance(beg, end) == sz);
      // ...
  }; // struct ThreadContext

  // Move a chunk of work from a rich thread's context to a poor (empty) one.
  bool transferWork(ThreadContext& rich, ThreadContext& poor, StealAmt amount) {
    assert(rich.id != poor.id);
    // ...
    Diff_ty steal_size = 0;
    // ...
    rich.stealWork(steal_beg, steal_end, steal_size, amount, chunk_size);
    // ...
    assert(steal_beg != steal_end);
    assert(std::distance(steal_beg, steal_end) == steal_size);
    // ...
    poor.assignWork(steal_beg, steal_end, steal_size);
    // ...
  bool stealWithinSocket(ThreadContext& poor) {
    bool sawWork   = false;
    bool stoleWork = false;
    // ...
    const unsigned per_pack = tp.getMaxThreads() / tp.getMaxSockets();
    // ...
    const unsigned pack_beg = my_pack * per_pack;
    const unsigned pack_end = (my_pack + 1) * per_pack;
    // ...
    for (unsigned i = 1; i < pack_end; ++i) {
      // ...
      unsigned t = (poor.id + i) % per_pack + pack_beg;
      assert((t >= pack_beg) && (t < pack_end));
      // ...
      if (workers.getRemote(t)->hasWorkWeak()) {
        // ...
        stoleWork = transferWork(*workers.getRemote(t), poor, HALF);
        // ...
      }
    }
    // ...
    return sawWork || stoleWork;
  }
  bool stealOutsideSocket(ThreadContext& poor, const StealAmt& amt) {
    bool sawWork   = false;
    bool stoleWork = false;
    // ...
    for (unsigned i = 0; i < maxT; ++i) {
      ThreadContext& rich = *(workers.getRemote((poor.id + i) % maxT));
      // ...
      if (tp.getSocket(rich.id) != myPkg) {
        if (rich.hasWorkWeak()) {
          // ...
          stoleWork = transferWork(rich, poor, amt);
          // ...
        }
      }
    }
    // ...
    return sawWork || stoleWork;
  }
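  // Stealing escalates outward: first the other threads on the caller's own
  // socket, then threads on remote sockets, pausing briefly between rounds.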
  bool trySteal(ThreadContext& poor) {
    // ...
    ret = stealWithinSocket(poor);
    // ...
    substrate::asmPause();
    // ...
    ret = stealOutsideSocket(poor, HALF);
    // ...
    substrate::asmPause();
    // ...
    ret = stealOutsideSocket(poor, HALF);
    // ...
    substrate::asmPause();
    // ...
  // ...
  substrate::PerThreadStorage<ThreadContext> workers;
  // ...
  substrate::TerminationDetection& term;

  // per-thread timers, active only when MORE_STATS is set
  PerThreadTimer<MORE_STATS> totalTime;
  PerThreadTimer<MORE_STATS> initTime;
  PerThreadTimer<MORE_STATS> execTime;
  PerThreadTimer<MORE_STATS> stealTime;
  PerThreadTimer<MORE_STATS> termTime;
  DoAllStealingExec(const R& _range, F _func, const ArgsTuple& argsTuple)
      : range(_range), func(_func),
        loopname(galois::internal::getLoopName(argsTuple)),
        // ... (remaining members initialized from argsTuple)
  {
    assert(chunk_size > 0);
  }

  void initThread() {
    // ...
    term.initializeThread();
    // ...
    unsigned id = substrate::ThreadPool::getTID();
    *workers.getLocal(id) =
        ThreadContext(id, range.local_begin(), range.local_end());
    // ...
  }
  ~DoAllStealingExec() {
    // ...
    for (unsigned i = 0; i < workers.size(); ++i) {
      auto& ctx = *(workers.getRemote(i));
      assert(!ctx.hasWork() && "Unprocessed work left");
    }
    // ...
  }
  void operator()(void) {
    ThreadContext& ctx = *workers.getLocal();
    // ...
    bool workHappened = false;
    // ...
    if (ctx.doWork(func, chunk_size)) {
      // ...
    }
    // ...
    assert(!ctx.hasWork());
    // ...
    bool stole = trySteal(ctx);
    // ...
    assert(!ctx.hasWork());
    // ...
    term.localTermination(workHappened);
    // ...
    bool quit = term.globalTermination();
    // ...
    assert(!ctx.hasWork());
  }
// Steal-enabled path: builds a DoAllStealingExec and runs it on all threads.
template <bool _STEAL>
struct ChooseDoAllImpl {
  template <typename R, typename F, typename ArgsT>
  static void call(const R& range, F&& func, const ArgsT& argsTuple) {
    // ...
    internal::DoAllStealingExec<
        R, OperatorReferenceType<decltype(std::forward<F>(func))>, ArgsT>
        exec(range, std::forward<F>(func), argsTuple);
    // ...
    substrate::getThreadPool().run(
        activeThreads, [&exec](void) { exec.initThread(); }, std::ref(barrier),
        // ... (remaining arguments elided)
// Non-stealing path: each thread just walks its local portion of the range.
template <>
struct ChooseDoAllImpl<false> {
  template <typename R, typename F, typename ArgsT>
  static void call(const R& range, F func, const ArgsT& argsTuple) {
    // ... (the lambda below is run on every thread via on_each_gen)
    [&](const unsigned int, const unsigned int) {
      static constexpr bool NEED_STATS =
          galois::internal::NeedStats<ArgsT>::value;
      static constexpr bool MORE_STATS =
          NEED_STATS && has_trait<more_stats_tag, ArgsT>();
      // ...
      const char* const loopname = galois::internal::getLoopName(argsTuple);
      // ...
      PerThreadTimer<MORE_STATS> totalTime(loopname, "Total");
      PerThreadTimer<MORE_STATS> initTime(loopname, "Init");
      PerThreadTimer<MORE_STATS> execTime(loopname, "Work");
      // ...
      auto begin = range.local_begin();
      const auto end = range.local_end();
      // ...
      while (begin != end) {
        // ...
template <typename R, typename F, typename ArgsTuple>
void do_all_gen(const R& range, F&& func, const ArgsTuple& argsTuple) {
  static_assert(!has_trait<char*, ArgsTuple>(), "old loopname");
  static_assert(!has_trait<char const*, ArgsTuple>(), "old loopname");
  static_assert(!has_trait<bool, ArgsTuple>(), "old steal");

  auto argsT = std::tuple_cat(
      /* ... argsTuple merged with default trait values ... */);
  using ArgsT = decltype(argsT);

  constexpr bool TIME_IT = has_trait<loopname_tag, ArgsT>();
  // ...
  constexpr bool STEAL = has_trait<steal_tag, ArgsT>();
  // ...
  internal::ChooseDoAllImpl<STEAL>::call(range, func_ref, argsT);
  // ...
}
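// Usage sketch (not part of this header): do_all_gen is normally reached
// through the public galois::do_all interface, which forwards the range, the
// operator, and trait arguments like the ones tested above. The container
// `data`, the chunk size, and the loop name below are illustrative
// assumptions, not values taken from this file.

#include "galois/Galois.h"
#include <vector>

int main() {
  galois::SharedMemSys G; // initialize the shared-memory runtime
  galois::setActiveThreads(4);

  std::vector<int> data(1024, 1);

  // steal() selects ChooseDoAllImpl<true>, i.e. the DoAllStealingExec path;
  // chunk_size<64>() is the amount getWork hands out per claim.
  galois::do_all(
      galois::iterate(data),
      [](int& x) { x += 1; },
      galois::steal(),
      galois::chunk_size<64>(),
      galois::loopname("Increment"));

  return 0;
}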