Galois
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Obim.h
Go to the documentation of this file.
1 /*
2  * This file belongs to the Galois project, a C++ library for exploiting
3  * parallelism. The code is being released under the terms of the 3-Clause BSD
4  * License (a copy is located in LICENSE.txt at the top-level directory).
5  *
6  * Copyright (C) 2018, The University of Texas at Austin. All rights reserved.
7  * UNIVERSITY EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES CONCERNING THIS
8  * SOFTWARE AND DOCUMENTATION, INCLUDING ANY WARRANTIES OF MERCHANTABILITY,
9  * FITNESS FOR ANY PARTICULAR PURPOSE, NON-INFRINGEMENT AND WARRANTIES OF
10  * PERFORMANCE, AND ANY WARRANTY THAT MIGHT OTHERWISE ARISE FROM COURSE OF
11  * DEALING OR USAGE OF TRADE. NO WARRANTY IS EITHER EXPRESS OR IMPLIED WITH
12  * RESPECT TO THE USE OF THE SOFTWARE OR DOCUMENTATION. Under no circumstances
13  * shall University be liable for incidental, special, indirect, direct or
14  * consequential damages or loss of profits, interruption of business, or
15  * related expenses which may arise from use of Software or Documentation,
16  * including but not limited to those resulting from defects in Software and/or
17  * Documentation, or loss or inaccuracy of data of any kind.
18  */
19 
20 #ifndef GALOIS_WORKLIST_OBIM_H
21 #define GALOIS_WORKLIST_OBIM_H
22 
23 #include <deque>
24 #include <limits>
25 #include <type_traits>
26 
27 #include "galois/FlatMap.h"
31 #include "galois/worklists/Chunk.h"
33 
34 namespace galois {
35 namespace worklists {
36 
37 namespace internal {
38 
39 template <typename T, typename Index, bool UseBarrier>
40 class OrderedByIntegerMetricData {
41 protected:
42  struct ThreadData {};
43  bool hasStored(ThreadData&, Index) { return false; }
44  galois::optional<T> popStored(ThreadData&, Index) { return {}; }
45 };
46 
47 template <typename T, typename Index>
48 class OrderedByIntegerMetricData<T, Index, true> {
49 protected:
50  struct ThreadData {
51  bool hasWork;
52  std::deque<std::pair<Index, T>> stored;
53  };
54 
// Reference to the barrier this policy waits on.
55  substrate::Barrier& barrier;
56 
// Binds `barrier` to the barrier returned by runtime::getBarrier for the
// current value of runtime::activeThreads.
57  OrderedByIntegerMetricData()
58  : barrier(runtime::getBarrier(runtime::activeThreads)) {}
59 
60  bool hasStored(ThreadData& p, Index idx) {
61  for (auto& e : p.stored) {
62  if (e.first == idx) {
63  std::swap(e, p.stored.front());
64  return true;
65  }
66  }
67  return false;
68  }
69 
70  galois::optional<T> popStored(ThreadData& p, Index idx) {
72  for (auto ii = p.stored.begin(), ei = p.stored.end(); ii != ei; ++ii) {
73  if (ii->first == idx) {
74  item = ii->second;
75  p.stored.erase(ii);
76  break;
77  }
78  }
79  return item;
80  }
81 };
82 
83 template <typename Index, bool UseDescending>
84 struct OrderedByIntegerMetricComparator {
85  std::less<Index> compare;
86  Index identity;
87  Index earliest;
88 
89  template <typename C>
90  struct with_local_map {
92  };
93  OrderedByIntegerMetricComparator()
94  : identity(std::numeric_limits<Index>::max()),
95  earliest(std::numeric_limits<Index>::min()) {}
96 };
97 
98 template <typename Index>
99 struct OrderedByIntegerMetricComparator<Index, true> {
100  std::greater<Index> compare;
101  Index identity;
102  Index earliest;
103 
104  template <typename C>
105  struct with_local_map {
107  };
108  OrderedByIntegerMetricComparator()
109  : identity(std::numeric_limits<Index>::min()),
110  earliest(std::numeric_limits<Index>::max()) {}
111 };
112 
113 } // namespace internal
114 
145 // TODO could move to general comparator but there are issues with atomic reads
146 // and initial values for arbitrary types
147 template <class Indexer = DummyIndexer<int>,
148  typename Container = PerSocketChunkFIFO<>, unsigned BlockPeriod = 0,
149  bool BSP = true, typename T = int, typename Index = int,
150  bool UseBarrier = false, bool UseMonotonic = false,
151  bool UseDescending = false, bool Concurrent = true>
153  : private boost::noncopyable,
154  public internal::OrderedByIntegerMetricData<T, Index, UseBarrier>,
155  public internal::OrderedByIntegerMetricComparator<Index, UseDescending> {
156  // static_assert(std::is_integral<Index>::value, "only integral index types
157  // supported");
158 
159  template <typename _T>
161  Indexer, typename Container::template retype<_T>, BlockPeriod, BSP, _T,
162  typename std::result_of<Indexer(_T)>::type, UseBarrier, UseMonotonic,
163  UseDescending, Concurrent>;
164 
// Same worklist type with the final (Concurrent) template argument replaced
// by `_b`; every other template argument is carried over unchanged.
165  template <bool _b>
166  using rethread =
167  OrderedByIntegerMetric<Indexer, Container, BlockPeriod, BSP, T, Index,
168  UseBarrier, UseMonotonic, UseDescending, _b>;
169 
170  template <unsigned _period>
172  typedef OrderedByIntegerMetric<Indexer, Container, _period, BSP, T, Index,
173  UseBarrier, UseMonotonic, UseDescending,
174  Concurrent>
176  };
177 
178  template <typename _container>
179  struct with_container {
180  typedef OrderedByIntegerMetric<Indexer, _container, BlockPeriod, BSP, T,
181  Index, UseBarrier, UseMonotonic,
182  UseDescending, Concurrent>
184  };
185 
186  template <typename _indexer>
187  struct with_indexer {
188  typedef OrderedByIntegerMetric<_indexer, Container, BlockPeriod, BSP, T,
189  Index, UseBarrier, UseMonotonic,
190  UseDescending, Concurrent>
192  };
193 
194  template <bool _bsp>
196  typedef OrderedByIntegerMetric<Indexer, Container, BlockPeriod, _bsp, T,
197  Index, UseBarrier, UseMonotonic,
198  UseDescending, Concurrent>
200  };
201 
202  template <bool _use_barrier>
203  struct with_barrier {
204  typedef OrderedByIntegerMetric<Indexer, Container, BlockPeriod, BSP, T,
205  Index, _use_barrier, UseMonotonic,
206  UseDescending, Concurrent>
208  };
209 
210  template <bool _use_monotonic>
211  struct with_monotonic {
212  typedef OrderedByIntegerMetric<Indexer, Container, BlockPeriod, BSP, T,
213  Index, UseBarrier, _use_monotonic,
214  UseDescending, Concurrent>
216  };
217 
218  template <bool _use_descending>
220  typedef OrderedByIntegerMetric<Indexer, Container, BlockPeriod, BSP, T,
221  Index, UseBarrier, UseMonotonic,
222  _use_descending, Concurrent>
224  };
225 
// Element type held by the worklist.
226  typedef T value_type;
// Type of the index computed for each element.
227  typedef Index index_type;
228 
229 private:
// Container rebound through its `rethread` member to this worklist's
// Concurrent setting.
230  typedef typename Container::template rethread<Concurrent> CTy;
// Comparator base selected by UseDescending.
231  typedef internal::OrderedByIntegerMetricComparator<Index, UseDescending>
232  Comparator;
// Map type supplied by the comparator's with_local_map, instantiated with
// CTy* (presumably an index -> CTy* map; confirm against with_local_map).
233  typedef typename Comparator::template with_local_map<CTy*>::type LMapTy;
234 
235  struct ThreadData
236  : public internal::OrderedByIntegerMetricData<T, Index,
237  UseBarrier>::ThreadData {
238  LMapTy local;
239  Index curIndex;
240  Index scanStart;
241  CTy* current;
242  unsigned int lastMasterVersion;
243  unsigned int numPops;
244 
245  ThreadData(Index initial)
246  : curIndex(initial), scanStart(initial), current(0),
247  lastMasterVersion(0), numPops(0) {}
248  };
249 
// Sequence of (index, container) pairs.
250  typedef std::deque<std::pair<Index, CTy*>> MasterLog;
251 
252  // NB: Place dynamically growing masterLog after fixed-size PerThreadStorage
253  // members to give higher likelihood of reclaiming PerThreadStorage
// Shared log of every (index, container) pair created so far; entries are
// appended by slowUpdateLocalOrCreate and replayed by updateLocal.
256  MasterLog masterLog;
257 
// Count of masterLog entries that have been published; incremented after each
// append to masterLog.
258  std::atomic<unsigned int> masterVersion;
// Callable applied to a value to obtain its index.
259  Indexer indexer;
260 
// Replays into p.local every masterLog entry this thread has not yet seen,
// advancing p.lastMasterVersion one entry at a time until it reaches
// masterVersion (read with relaxed ordering on every iteration).
// Returns true when p.lastMasterVersion differed from masterVersion on entry,
// false when the thread was already up to date.
261  bool updateLocal(ThreadData& p) {
262  if (p.lastMasterVersion != masterVersion.load(std::memory_order_relaxed)) {
263  for (;
264  p.lastMasterVersion < masterVersion.load(std::memory_order_relaxed);
265  ++p.lastMasterVersion) {
266  // XXX(ddn): Somehow the second block is better than
267  // the first for bipartite matching (GCC 4.7.2)
268 #if 0
269  p.local.insert(masterLog[p.lastMasterVersion]);
270 #else
// Active branch: copy the log entry, then store it via operator[].
271  std::pair<Index, CTy*> logEntry = masterLog[p.lastMasterVersion];
272  p.local[logEntry.first] = logEntry.second;
// A published log entry is expected to carry a non-null container.
273  assert(logEntry.second);
274 #endif
275  }
276  return true;
277  }
278  return false;
279  }
280 
282  galois::optional<T> slowPop(ThreadData& p) {
283  bool localLeader = substrate::ThreadPool::isLeader();
284  Index msS = this->earliest;
285 
286  updateLocal(p);
287 
288  if (BSP && !UseMonotonic) {
289  msS = p.scanStart;
290  if (localLeader) {
291  for (unsigned i = 0; i < runtime::activeThreads; ++i) {
292  Index o = data.getRemote(i)->scanStart;
293  if (this->compare(o, msS))
294  msS = o;
295  }
296  } else {
297  Index o = data.getRemote(substrate::ThreadPool::getLeader())->scanStart;
298  if (this->compare(o, msS))
299  msS = o;
300  }
301  }
302 
303  for (auto ii = p.local.lower_bound(msS), ei = p.local.end(); ii != ei;
304  ++ii) {
305  galois::optional<T> item;
306  if ((item = ii->second->pop())) {
307  p.current = ii->second;
308  p.curIndex = ii->first;
309  p.scanStart = ii->first;
310  return item;
311  }
312  }
313 
315  }
316 
// Slow path of updateLocalOrCreate: returns the container for index `i`,
// creating and publishing it when no thread has done so yet.
// Phase 1 alternates between replaying the master log and trying to take
// masterLock; it returns as soon as `i` shows up in p.local.
// Phase 2 runs with masterLock held: replay once more, and only if `i` is
// still absent allocate a new container, record it locally, append it to
// masterLog and bump masterVersion. The lock is released before returning.
318  CTy* slowUpdateLocalOrCreate(ThreadData& p, Index i) {
319  // update local until we find it or we get the write lock
320  do {
321  updateLocal(p);
322  auto it = p.local.find(i);
323  if (it != p.local.end())
324  return it->second;
325  } while (!masterLock.try_lock());
326  // we have the write lock, update again then create
327  updateLocal(p);
328  auto it = p.local.find(i);
329  CTy* C2 = (it != p.local.end()) ? it->second : nullptr;
330  if (!C2) {
331  C2 = new CTy();
332  p.local[i] = C2;
// Count the entry about to be appended as already seen, so updateLocal does
// not replay this thread's own entry.
333  p.lastMasterVersion = masterVersion.load(std::memory_order_relaxed) + 1;
// Append to the log first, then increment the version counter.
334  masterLog.push_back(std::make_pair(i, C2));
335  masterVersion.fetch_add(1);
336  }
337  masterLock.unlock();
338  return C2;
339  }
340 
341  inline CTy* updateLocalOrCreate(ThreadData& p, Index i) {
342  // Try local then try update then find again or else create and update the
343  // master log
344  auto it = p.local.find(i);
345  if (it != p.local.end())
346  return it->second;
347  // slowpath
348  return slowUpdateLocalOrCreate(p, i);
349  }
350 
351 public:
// Builds a worklist that uses a copy of `x` (a default-constructed Indexer
// when omitted) as its indexer. `data` is initialized from this->earliest and
// masterVersion starts at 0.
// NOTE(review): `data` is declared outside the visible lines; presumably
// this->earliest becomes each ThreadData's `initial` argument - confirm.
352  OrderedByIntegerMetric(const Indexer& x = Indexer())
353  : data(this->earliest), masterVersion(0), indexer(x) {}
354 
356  // Deallocate in LIFO order to give opportunity for simple garbage
357  // collection
358  for (auto ii = masterLog.rbegin(), ei = masterLog.rend(); ii != ei; ++ii) {
359  delete ii->second;
360  }
361  }
362 
// Adds `val` to the container that corresponds to indexer(val), using the
// calling thread's ThreadData.
363  void push(const value_type& val) {
364  Index index = indexer(val);
365  ThreadData& p = *data.getLocal();
366 
// In monotonic mode the thread's current index must compare ahead of the
// pushed index.
367  assert(!UseMonotonic || this->compare(p.curIndex, index));
368 
369  // Fast path
// The value belongs to the container this thread is already working from.
370  if (index == p.curIndex && p.current) {
371  p.current->push(val);
372  return;
373  }
374 
375  // Slow path
// Look up the container for this index, creating it if necessary.
376  CTy* C = updateLocalOrCreate(p, index);
// With BSP, move scanStart to the pushed index when that index compares
// ahead of it.
377  if (BSP && this->compare(index, p.scanStart))
378  p.scanStart = index;
379  // Opportunistically move to higher priority work
380  if (!UseBarrier && this->compare(index, p.curIndex)) {
381  p.curIndex = index;
382  p.current = C;
383  }
384  C->push(val);
385  }
386 
/// Pushes each element of the range [b, e), in order, through the
/// single-value push overload.
template <typename Iter>
void push(Iter b, Iter e) {
  for (; b != e; ++b) {
    push(*b);
  }
}
392 
393  template <typename RangeTy>
394  void push_initial(const RangeTy& range) {
395  auto rp = range.local_pair();
396  push(rp.first, rp.second);
397  }
398 
400  // Find a successful pop
401  ThreadData& p = *data.getLocal();
402  CTy* C = p.current;
403 
404  if (this->hasStored(p, p.curIndex))
405  return this->popStored(p, p.curIndex);
406 
407  if (!UseBarrier && BlockPeriod &&
408  (p.numPops++ & ((1 << BlockPeriod) - 1)) == 0)
409  return slowPop(p);
410 
412  if (C && (item = C->pop()))
413  return item;
414 
415  if (UseBarrier)
416  return item;
417 
418  // Slow path
419  return slowPop(p);
420  }
421 
422  template <bool Barrier = UseBarrier>
423  auto empty() -> typename std::enable_if<Barrier, bool>::type {
425  ThreadData& p = *data.getLocal();
426 
427  // try to pop from global worklist
428  item = slowPop(p);
429  if (item)
430  p.stored.push_back(std::make_pair(p.curIndex, *item));
431 
432  // check if there are thread-local work items
433  if (!p.stored.empty()) {
434  Index storedIndex = this->identity;
435  for (auto& e : p.stored) {
436  if (this->compare(e.first, storedIndex)) {
437  storedIndex = e.first;
438  }
439  }
440  p.curIndex = storedIndex;
441  p.current = p.local[storedIndex];
442  }
443  p.hasWork = !p.stored.empty();
444 
445  this->barrier.wait();
446 
447  // align with the earliest level from threads that have works
448  bool hasWork = p.hasWork;
449  Index curIndex = (hasWork) ? p.curIndex : this->identity;
450  CTy* C = (hasWork) ? p.current : nullptr;
451 
452  for (unsigned i = 0; i < runtime::activeThreads; ++i) {
453  ThreadData& o = *data.getRemote(i);
454  if (o.hasWork && this->compare(o.curIndex, curIndex)) {
455  curIndex = o.curIndex;
456  C = o.current;
457  }
458  hasWork |= o.hasWork;
459  }
460 
461  this->barrier.wait();
462 
463  p.current = C;
464  p.curIndex = curIndex;
465 
466  if (UseMonotonic) {
467  for (auto ii = p.local.begin(); ii != p.local.end();) {
468  bool toBreak = ii->second == C;
469  if (toBreak)
470  break;
471  ii = p.local.erase(ii);
472  }
473  }
474 
475  return !hasWork;
476  }
477 };
478 GALOIS_WLCOMPILECHECK(OrderedByIntegerMetric)
479 
480 } // end namespace worklists
481 } // end namespace galois
482 
483 #endif
OrderedByIntegerMetric< Indexer, Container, BlockPeriod, _bsp, T, Index, UseBarrier, UseMonotonic, UseDescending, Concurrent > type
Definition: Obim.h:199
static unsigned getLeader()
Definition: ThreadPool.h:206
auto empty() -> typename std::enable_if< Barrier, bool >::type
Definition: Obim.h:423
~OrderedByIntegerMetric()
Definition: Obim.h:355
substrate::Barrier & getBarrier(unsigned activeThreads)
Have a pre-instantiated barrier available for use.
Definition: Substrate.cpp:24
Approximate priority scheduling.
Definition: Obim.h:152
T * getLocal()
Definition: PerThreadStorage.h:128
#define GALOIS_ATTRIBUTE_NOINLINE
Definition: CompilerSpecific.h:46
OrderedByIntegerMetric< Indexer, Container, _period, BSP, T, Index, UseBarrier, UseMonotonic, UseDescending, Concurrent > type
Definition: Obim.h:175
void push(Iter b, Iter e)
Definition: Obim.h:388
galois::optional< value_type > pop()
Definition: Obim.h:399
Galois version of boost::optional.
Definition: optional.h:34
Index index_type
Definition: Obim.h:227
OrderedByIntegerMetric< Indexer, _container, BlockPeriod, BSP, T, Index, UseBarrier, UseMonotonic, UseDescending, Concurrent > type
Definition: Obim.h:183
const Ty max(std::atomic< Ty > &a, const Ty &b)
Definition: AtomicHelpers.h:40
unsigned int activeThreads
Definition: Threads.cpp:26
Simple map data structure, based off a single array.
Definition: FlatMap.h:36
#define GALOIS_WLCOMPILECHECK(name)
Definition: WLCompileCheck.h:26
OrderedByIntegerMetric< Indexer, Container, BlockPeriod, BSP, T, Index, UseBarrier, UseMonotonic, _use_descending, Concurrent > type
Definition: Obim.h:223
void push(const value_type &val)
Definition: Obim.h:363
static bool isLeader()
Definition: ThreadPool.h:205
const Ty min(std::atomic< Ty > &a, const Ty &b)
Definition: AtomicHelpers.h:70
OrderedByIntegerMetric< Indexer, Container, BlockPeriod, BSP, T, Index, _use_barrier, UseMonotonic, UseDescending, Concurrent > type
Definition: Obim.h:207
T * getRemote(unsigned int thread)
Definition: PerThreadStorage.h:149
OrderedByIntegerMetric< _indexer, Container, BlockPeriod, BSP, T, Index, UseBarrier, UseMonotonic, UseDescending, Concurrent > type
Definition: Obim.h:191
OrderedByIntegerMetric< Indexer, Container, BlockPeriod, BSP, T, Index, UseBarrier, _use_monotonic, UseDescending, Concurrent > type
Definition: Obim.h:215
void push_initial(const RangeTy &range)
Definition: Obim.h:394