TiledExecutor.h
/*
 * This file belongs to the Galois project, a C++ library for exploiting
 * parallelism. The code is being released under the terms of the 3-Clause BSD
 * License (a copy is located in LICENSE.txt at the top-level directory).
 *
 * Copyright (C) 2018, The University of Texas at Austin. All rights reserved.
 * UNIVERSITY EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES CONCERNING THIS
 * SOFTWARE AND DOCUMENTATION, INCLUDING ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR ANY PARTICULAR PURPOSE, NON-INFRINGEMENT AND WARRANTIES OF
 * PERFORMANCE, AND ANY WARRANTY THAT MIGHT OTHERWISE ARISE FROM COURSE OF
 * DEALING OR USAGE OF TRADE. NO WARRANTY IS EITHER EXPRESS OR IMPLIED WITH
 * RESPECT TO THE USE OF THE SOFTWARE OR DOCUMENTATION. Under no circumstances
 * shall University be liable for incidental, special, indirect, direct or
 * consequential damages or loss of profits, interruption of business, or
 * related expenses which may arise from use of Software or Documentation,
 * including but not limited to those resulting from defects in Software and/or
 * Documentation, or loss or inaccuracy of data of any kind.
 */

#ifndef _GALOIS_RUNTIME_TILEDEXECUTOR_H_
#define _GALOIS_RUNTIME_TILEDEXECUTOR_H_

#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <iterator>
#include <mutex>
#include <tuple>
#include <vector>

#include <boost/iterator/transform_iterator.hpp>

#include "galois/config.h"
#include "galois/Galois.h"
#include "galois/LargeArray.h"
#include "galois/NoDerefIterator.h"
#include "galois/substrate/PaddedLock.h"

namespace galois {
namespace runtime {

template <typename Graph, bool UseExp = false>
class Fixed2DGraphTiledExecutor {
  static constexpr int numDims = 2; // code is specialized to 2 dimensions

  using SpinLock = galois::substrate::PaddedLock<true>;
  using GNode = typename Graph::GraphNode;
  using iterator = typename Graph::iterator;
  using edge_iterator = typename Graph::edge_iterator;
  using Point = std::array<size_t, numDims>;

  // Copyable wrapper around std::atomic<T> with relaxed load/add helpers
  // (std::atomic itself is not copyable, which storing Tasks in a std::vector
  // requires).
  template <typename T>
  struct SimpleAtomic {
    std::atomic<T> value;
    SimpleAtomic() : value(0) {}
    SimpleAtomic(const SimpleAtomic& o) : value(o.value.load()) {}
    T relaxedLoad() { return value.load(std::memory_order_relaxed); }
    void relaxedAdd(T delta) {
      value.store(relaxedLoad() + delta, std::memory_order_relaxed);
    }
  };

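  // A Task is one 2D tile of work: the X nodes in [startX, endX), the Y nodes
  // in [startY, endYInclusive], its (x, y) position in the block grid, and a
  // counter of how many times the tile has been processed.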
  struct Task {
    iterator startX;
    iterator endX;
    GNode startY;
    GNode endYInclusive;
    Point coord;
    SimpleAtomic<unsigned> updates;
  };

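  // Functor that maps an edge iterator to the destination node of that edge;
  // used below with boost::transform_iterator so that iterating over a node's
  // edges yields destination nodes directly.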
  struct GetDst : public std::unary_function<edge_iterator, GNode> {
    Graph* g;
    GetDst() {}
    GetDst(Graph* _g) : g(_g) {}

    GNode operator()(edge_iterator ii) const { return g->getEdgeDst(ii); }
  };

  using no_deref_iterator = galois::NoDerefIterator<edge_iterator>;
  using edge_dst_iterator =
      boost::transform_iterator<GetDst, no_deref_iterator>;

  Graph& g;
  int cutoff;                          // XXX: UseExp
  galois::substrate::Barrier& barrier; // XXX: UseExp
  // std::array<galois::LargeArray<SpinLock>, numDims> locks;
  // galois::LargeArray<Task> tasks;
  std::array<std::vector<SpinLock>, numDims> locks; // one lock per X block and per Y block
  std::vector<Task> tasks; // 2D grid of tasks, stored at index x + y * numXBlocks
  size_t numTasks;
  unsigned maxUpdates; // maximum number of times a block may be processed
  bool useLocks;
  galois::GAccumulator<unsigned> failedProbes;

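  // Advance point p by delta blocks along dimension dim, wrapping around the
  // edge of the block grid.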
  void nextPoint(Point& p, int dim, int delta) {
    assert(dim < numDims);
    p[dim] += delta;
    // wrap around if the step went past the end of this dimension
    while (p[dim] >= locks[dim].size()) {
      p[dim] -= locks[dim].size();
    }
  }

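  // Return the task at grid point p. Tasks are stored row-major, i.e. at
  // index x + y * numXBlocks (numXBlocks == locks[0].size()).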
  Task* getTask(const Point& p) {
    Task* t = &tasks[p[0] + p[1] * locks[0].size()];

    assert(t < &tasks[numTasks]);
    assert(t >= &tasks[0]);

    return t;
  }

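  // Starting at `start`, probe up to n blocks along dimension dim. A block is
  // claimed only if both of its dimension locks can be acquired and it has not
  // yet reached maxUpdates; on success the block is returned still locked (the
  // caller unlocks it) and `start` is updated to its position. Returns nullptr
  // if no block could be claimed.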
  Task* probeBlockWithLock(Point& start, int dim, size_t n) {
    Point p = start;

    for (size_t i = 0; i < n; ++i) {
      Task* t = getTask(p);

      assert(p[0] == t->coord[0]);
      assert(p[1] == t->coord[1]);
      assert(t->coord[0] < locks[0].size());
      assert(t->coord[1] < locks[1].size());

      if (t->updates.relaxedLoad() < maxUpdates) {
        // std::try_lock returns -1 only if both locks were acquired
        if (std::try_lock(locks[0][t->coord[0]], locks[1][t->coord[1]]) < 0) {
          // re-check under the locks before claiming the block
          if (t->updates.relaxedLoad() < maxUpdates) {
            t->updates.relaxedAdd(1);
            start = p;
            return t;
          }

          // block already finished; release the locks
          // TODO add to worklist
          for (int j = 0; j < numDims; ++j) {
            locks[j][t->coord[j]].unlock();
          }
        }
      }

      nextPoint(p, dim, 1);
    }

    failedProbes += 1;
    return nullptr;
  }

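  // Lock-free variant of the probe: starting at `start`, scan up to n blocks
  // along dimension dim and claim the first one whose update counter is still
  // below maxUpdates, using fetch_add to arbitrate between threads. Returns
  // nullptr if no block could be claimed.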
  Task* probeBlockWithoutLock(Point& start, int dim, size_t n) {
    Point p = start;

    for (size_t i = 0; i < n; ++i) {
      Task* t = getTask(p);

      assert(p[0] == t->coord[0]);
      assert(p[1] == t->coord[1]);
      assert(t->coord[0] < locks[0].size());
      assert(t->coord[1] < locks[1].size());

      if (t->updates.relaxedLoad() < maxUpdates) {
        if (t->updates.value.fetch_add(1) < maxUpdates) {
          // the block had not reached max updates at the point of the fetch
          start = p;
          return t;
        }
      }
      nextPoint(p, dim, 1);
    }

    failedProbes += 1;
    return nullptr;
  }

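  // Probe up to n blocks along dimension dim starting at `start`, dispatching
  // to the locking or lock-free variant depending on useLocks.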
  Task* probeBlock(Point& start, int dim, size_t n) {
    assert(dim < 2);

    if (useLocks) {
      return probeBlockWithLock(start, dim, n);
    } else {
      return probeBlockWithoutLock(start, dim, n);
    }
  }

  // TODO (Loc) this function needs an overhaul; right now it's too hacky and
  // imprecise
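  // Find the next block to work on, searching from `start` (inclusive of the
  // starting block on the first pass when `inclusive` is true). First probes
  // along each dimension from the current position, then sweeps the rest of
  // the grid diagonally; returns nullptr when no unfinished block can be
  // claimed.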
  Task* nextBlock(Point& start, bool inclusive) {
    Task* t;

    // Repeat twice to make sure there really are no unused blocks left.
    // TODO this method of termination detection is hacky and imprecise;
    // find a better way
    for (int times = 0; times < 2; ++times) {
      Point limit{{locks[0].size(), locks[1].size()}};

      int inclusiveDelta = (inclusive && times == 0) ? 0 : 1;

      // The first pass (inclusive == true) INCLUDES the starting block;
      // otherwise, probe the following blocks in the x and y directions.
      for (int i = 0; i < numDims; ++i) {
        Point p = start;
        nextPoint(p, i, inclusiveDelta);

        if ((t = probeBlock(p, i, limit[i] - inclusiveDelta))) {
          start = p;
          return t;
        }
      }

      // If the probes above failed, every block in both directions
      // (left->right, up->down) from the current point is locked and/or has
      // reached its max updates.
      Point p = start;
      // Advance diagonally and restart the search from there.
      for (int i = 0; i < numDims; ++i) {
        nextPoint(p, i, 1);
      }

      // The loop below sweeps the entire grid looking for a block to work on;
      // in some cases a block will be probed more than once.
      // TODO probably suboptimal: once either limit reaches 0 the whole grid
      // has likely been examined already, in which case the remaining
      // iterations are doing extra work.
      while (std::any_of(limit.begin(), limit.end(),
                         [](size_t x) { return x > 0; })) {
        for (int i = 0; i < numDims; ++i) {
          if (limit[i] > 1 && (t = probeBlock(p, i, limit[i] - 1))) {
            start = p;
            return t;
          }
        }

        for (int i = 0; i < numDims; ++i) {
          if (limit[i] > 0) {
            limit[i] -= 1;
            nextPoint(p, i, 1);
          }
        }
      }
    }

    return nullptr;
  }

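  // Dense variant: apply fn to every (x, y) node pair in the block, regardless
  // of whether an edge connects the two nodes.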
  template <bool UseDense, typename Function>
  void executeBlock(Function& fn, Task& task,
                    typename std::enable_if<UseDense>::type* = 0) {
    for (auto ii = task.startX; ii != task.endX; ++ii) {
      for (auto jj = g.begin() + task.startY,
                ej = g.begin() + task.endYInclusive + 1;
           jj != ej; ++jj) {
        fn(*ii, *jj);
      }
    }
  }

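  // Sparse variant: for each x node in the block, apply fn only to the edges
  // whose destinations fall inside the block's Y range.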
  template <bool UseDense, typename Function>
  void executeBlock(Function& fn, Task& task,
                    typename std::enable_if<!UseDense>::type* = 0) {
    GetDst getDst{&g};

    for (auto ii = task.startX; ii != task.endX; ++ii) {
      no_deref_iterator nbegin(
          g.edge_begin(*ii, galois::MethodFlag::UNPROTECTED));
      no_deref_iterator nend(g.edge_end(*ii, galois::MethodFlag::UNPROTECTED));

      // iterate over the edges, with edge_dst_iterator transforming each edge
      // into its destination node
      edge_dst_iterator dbegin(nbegin, getDst);
      edge_dst_iterator dend(nend, getDst);

      // TODO check if we want to use experimental
      // if (UseExp && cutoff < 0 &&
      //     std::distance(g.edge_begin(*ii, galois::MethodFlag::UNPROTECTED),
      //                   g.edge_end(*ii, galois::MethodFlag::UNPROTECTED)) >=
      //         -cutoff) {
      //   continue;
      // } else if (UseExp && cutoff > 0 &&
      //            std::distance(
      //                g.edge_begin(*ii, galois::MethodFlag::UNPROTECTED),
      //                g.edge_end(*ii, galois::MethodFlag::UNPROTECTED)) <
      //                cutoff) {
      //   continue;
      // }

      // lower_bound assumes each node's edges are sorted by destination
      for (auto jj = std::lower_bound(dbegin, dend, task.startY); jj != dend;) {
        // if (UseExp) {
        //   constexpr int numTimes = 1;
        //   constexpr int width = 1;
        //   bool done = false;
        //   for (int times = 0; times < numTimes; ++times) {
        //     for (int i = 0; i < width; ++i) {
        //       edge_iterator edge = *(jj + i).base();
        //       if (*(jj + i) > task.endYInclusive) {
        //         done = true;
        //         break;
        //       }
        //       fn(*ii, *(jj + i), edge);
        //     }
        //   }
        //   if (done)
        //     break;
        //   for (int i = 0; jj != dend && i < width; ++jj, ++i)
        //     ;
        //   if (jj == dend)
        //     break;
        // } else {
        edge_iterator edge = *jj.base();
        if (*jj > task.endYInclusive)
          break;

        fn(*ii, *jj, edge);
        ++jj;
        // }
      }
    }
  }

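  // Experimental bulk-synchronous loop: each thread starts on a block of the
  // diagonal and processes one diagonal of blocks per round, with a barrier
  // between rounds.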
  template <bool UseDense, typename Function>
  void executeLoopExp(Function fn, unsigned tid, unsigned total) {
    Point numBlocks{locks[0].size(), locks[1].size()};
    Point block;
    Point start;

    // TODO this assigns each thread a block along the diagonal, which is
    // probably NOT what you want in this executor since each thread will then
    // walk along the diagonal; fix this
    for (int i = 0; i < numDims; ++i) {
      block[i] = (numBlocks[i] + total - 1) / total;         // blocks per thread
      start[i] = std::min(block[i] * tid, numBlocks[i] - 1); // block to start at
    }

    // Move the diagonal along dim each round:
    // if there are more y blocks than x blocks, dim is 1 (i.e. y), else 0
    int dim = numBlocks[0] < numBlocks[1] ? 1 : 0;
    int odim = (dim + 1) % 2;
    // number of blocks in the dim dimension
    size_t maxRounds = numBlocks[dim];

    for (size_t rounds = 0; rounds < maxRounds; ++rounds) {
      Point p{start[0], start[1]};
      nextPoint(p, dim, rounds);

      size_t ntries =
          std::min(block[odim] * (tid + 1), numBlocks[odim]) - start[odim];
      for (size_t tries = 0; tries < ntries; ++tries) {
        Task* t = probeBlock(p, 0, 1); // probe only the block we are currently on
        if (t) {
          executeBlock<UseDense>(fn, *t);

          if (useLocks) {
            for (int i = 0; i < numDims; ++i)
              locks[i][t->coord[i]].unlock();
          }
        }

        for (int i = 0; i < numDims; ++i)
          nextPoint(p, i, 1);
      }

      barrier.wait();
    }
  }

  // TODO examine this
  // bulk synchronous diagonals: dynamic assignment within diagonals
  template <bool UseDense, typename Function>
  void executeLoopExp2(Function fn, unsigned tid, unsigned total) {
    Point numBlocks{{locks[0].size(), locks[1].size()}};
    Point block;
    Point start;
    for (int i = 0; i < numDims; ++i) {
      block[i] = (numBlocks[i] + total - 1) / total;
      start[i] = std::min(block[i] * tid, numBlocks[i] - 1);
    }

    // Move the diagonal along dim each round
    int dim = numBlocks[0] < numBlocks[1] ? 1 : 0;
    int odim = (dim + 1) % 2;
    size_t maxRounds = numBlocks[dim];

    for (size_t round = 0; round < maxRounds; ++round) {
      Point base{{start[0], start[1]}};
      nextPoint(base, dim, round);
      for (size_t tries = 0; tries < numBlocks[odim]; ++tries) {
        size_t index = tries + base[odim];
        if (index >= numBlocks[odim])
          index -= numBlocks[odim];
        Point p{};
        nextPoint(p, dim, round);
        nextPoint(p, odim, index);
        nextPoint(p, dim, index);

        Task* t = probeBlock(p, 0, 1);
        if (!t)
          continue;
        executeBlock<UseDense>(fn, *t);

        if (useLocks) {
          for (int i = 0; i < numDims; ++i)
            locks[i][t->coord[i]].unlock();
        }
      }

      barrier.wait();
    }
  }

  // TODO this function is imprecise by virtue of nextBlock being a bad
  // function
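  // Main work loop for one thread: start from a block on the diagonal
  // (adjusted by socket when locks are used) and repeatedly ask nextBlock for
  // an unfinished block, executing each one until none remain.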
  template <bool UseDense, typename Function>
  void executeLoopOrig(Function fn, unsigned tid, unsigned total) {
    Point numBlocks{{locks[0].size(), locks[1].size()}};
    Point block;
    Point start;

    // Find each thread's starting point; essentially this assigns each
    // thread to a block on the diagonal to begin with.
    for (int i = 0; i < numDims; ++i) {
      block[i] = (numBlocks[i] + total - 1) / total;         // blocks per thread
      start[i] = std::min(block[i] * tid, numBlocks[i] - 1); // block to start at
    }

    unsigned coresPerSocket = galois::substrate::getThreadPool().getMaxCores() /
                              galois::substrate::getThreadPool().getMaxSockets();

    // If using locks, readjust this thread's starting Y location to the
    // location of the thread's socket.
    if (useLocks) {
      start = {{start[0],
                std::min(block[1] *
                             galois::substrate::getThreadPool().getSocket(tid) *
                             coresPerSocket,
                         numBlocks[1] - 1)}};
    }

    Point p = start;

    for (int i = 0;; ++i) {
      Task* t = nextBlock(p, i == 0);
      // TODO: Replace with sparse worklist, etc.
      if (!t)
        break;

      executeBlock<UseDense>(fn, *t);

      // Unlock the task block if using locks (nextBlock returns the task
      // with its block locked).
      if (useLocks) {
        for (int i = 0; i < numDims; ++i) {
          locks[i][t->coord[i]].unlock();
        }
      }
    }
  }

  template <bool UseDense, typename Function>
  void executeLoop(Function fn, unsigned tid, unsigned total) {
    // if (false && UseExp)
    //   executeLoopExp2<UseDense>(fn, tid, total);
    // else
    executeLoopOrig<UseDense>(fn, tid, total);
  }

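  // Partition the X range into blocks of size sizeX and the Y range into
  // blocks of size sizeY, allocate one lock per X block and per Y block, and
  // assign each task the node ranges it is responsible for.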
  void initializeTasks(iterator firstX, iterator lastX, iterator firstY,
                       iterator lastY, size_t sizeX, size_t sizeY) {
    const size_t numXBlocks =
        (std::distance(firstX, lastX) + sizeX - 1) / sizeX;
    const size_t numYBlocks =
        (std::distance(firstY, lastY) + sizeY - 1) / sizeY;
    const size_t numBlocks = numXBlocks * numYBlocks;

    // locks[0].create(numXBlocks);
    // locks[1].create(numYBlocks);
    // tasks.create(numBlocks);
    locks[0].resize(numXBlocks);
    locks[1].resize(numYBlocks);
    tasks.resize(numBlocks);

    // TODO parallelize this?
    // assign each block the X and Y that it is responsible for
    for (size_t i = 0; i < numBlocks; ++i) {
      Task& task = tasks[i];
      task.coord = {{i % numXBlocks, i / numXBlocks}};
      std::tie(task.startX, task.endX) =
          galois::block_range(firstX, lastX, task.coord[0], numXBlocks);
      iterator s;
      iterator e;
      std::tie(s, e) =
          galois::block_range(firstY, lastY, task.coord[1], numYBlocks);
      // XXX: Works for CSR graphs
      task.startY = *s;
      task.endYInclusive = *e - 1;
    }
  }

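  // Functor passed to galois::on_each; runs the per-thread work loop.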
  template <bool UseDense, typename Function>
  struct Process {
    Fixed2DGraphTiledExecutor* self;
    Function fn;

    void operator()(unsigned tid, unsigned total) {
      self->executeLoop<UseDense>(fn, tid, total);
    }
  };

public:
  Fixed2DGraphTiledExecutor(Graph& g, int cutoff = 0)
      : g(g), cutoff(cutoff),
        barrier(galois::runtime::getBarrier(galois::getActiveThreads())) {}

  //! Reports the number of probe (block acquisition) failures to the
  //! statistics system.
  ~Fixed2DGraphTiledExecutor() {
    galois::runtime::reportStat_Single("TiledExecutor", "ProbeFailures",
                                       failedProbes.reduce());
  }

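  //! Executes the given function over the sparse tiles induced by the X node
  //! range [firstX, lastX) and the Y node range [firstY, lastY), using tiles
  //! of sizeX by sizeY nodes, for up to numIterations passes over each tile.
  //! In this (sparse) version the function is invoked once per edge that falls
  //! inside a tile, as fn(xNode, yNode, edgeIterator).
  //!
  //! A minimal usage sketch (illustrative only: the graph type, the
  //! NUM_ITEM_NODES boundary between the two node partitions, and the lambda
  //! body are assumptions, not part of this header):
  //!
  //! @code
  //! using Graph = galois::graphs::LC_CSR_Graph<int, double>;
  //! Graph graph; // assume a bipartite graph loaded elsewhere
  //!
  //! galois::runtime::Fixed2DGraphTiledExecutor<Graph> executor(graph);
  //! executor.execute(
  //!     graph.begin(), graph.begin() + NUM_ITEM_NODES,  // X nodes
  //!     graph.begin() + NUM_ITEM_NODES, graph.end(),    // Y nodes
  //!     1000, 1000,                                     // tile sizes
  //!     [&](Graph::GraphNode x, Graph::GraphNode y, Graph::edge_iterator e) {
  //!       // per-edge work, e.g. one SGD update using the edge's data
  //!     },
  //!     true /* useLocks */);
  //! @endcode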
  template <typename Function>
  void execute(iterator firstX, iterator lastX, iterator firstY, iterator lastY,
               size_t sizeX, size_t sizeY, Function fn, bool _useLocks,
               unsigned numIterations = 1) {
    initializeTasks(firstX, lastX, firstY, lastY, sizeX, sizeY);
    numTasks = tasks.size();
    maxUpdates = numIterations;
    useLocks = _useLocks;

    Process<false, Function> p{this, fn};

    galois::on_each(p);

    // TODO remove after worklist fix
    if (std::any_of(tasks.begin(), tasks.end(),
                    [this](Task& t) { return t.updates.value < maxUpdates; })) {
      galois::gWarn("Missing tasks");
    }
  }

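  //! Dense counterpart of execute(): the function is called for every (x, y)
  //! node pair in each tile rather than only for existing edges, so fn is
  //! invoked as fn(xNode, yNode).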
  template <typename Function>
  void executeDense(iterator firstX, iterator lastX, iterator firstY,
                    iterator lastY, size_t sizeX, size_t sizeY, Function fn,
                    bool _useLocks, int numIterations = 1) {
    initializeTasks(firstX, lastX, firstY, lastY, sizeX, sizeY);
    numTasks = tasks.size();
    maxUpdates = numIterations;
    useLocks = _useLocks;
    Process<true, Function> p{this, fn};
    galois::on_each(p);

    // TODO remove after worklist fix
    if (std::any_of(tasks.begin(), tasks.end(),
                    [this](Task& t) { return t.updates.value < maxUpdates; })) {
      galois::gWarn("Missing tasks");
    }
  }
};

} // namespace runtime
} // namespace galois

#endif