20 #ifndef GALOIS_WORKLIST_OBIM_H
21 #define GALOIS_WORKLIST_OBIM_H
25 #include <type_traits>
39 template <
typename T,
typename Index,
bool UseBarrier>
40 class OrderedByIntegerMetricData {
43 bool hasStored(ThreadData&, Index) {
return false; }
47 template <
typename T,
typename Index>
48 class OrderedByIntegerMetricData<T, Index, true> {
52 std::deque<std::pair<Index, T>> stored;
55 substrate::Barrier& barrier;
57 OrderedByIntegerMetricData()
60 bool hasStored(ThreadData& p, Index idx) {
61 for (
auto& e : p.stored) {
63 std::swap(e, p.stored.front());
72 for (
auto ii = p.stored.begin(), ei = p.stored.end(); ii != ei; ++ii) {
73 if (ii->first == idx) {
83 template <
typename Index,
bool UseDescending>
84 struct OrderedByIntegerMetricComparator {
85 std::less<Index> compare;
90 struct with_local_map {
93 OrderedByIntegerMetricComparator()
94 : identity(std::numeric_limits<Index>::
max()),
95 earliest(std::numeric_limits<Index>::
min()) {}
98 template <
typename Index>
99 struct OrderedByIntegerMetricComparator<Index, true> {
100 std::greater<Index> compare;
104 template <
typename C>
105 struct with_local_map {
108 OrderedByIntegerMetricComparator()
109 : identity(std::numeric_limits<Index>::
min()),
110 earliest(std::numeric_limits<Index>::
max()) {}
147 template <
class Indexer = DummyIndexer<
int>,
148 typename Container = PerSocketChunkFIFO<>,
unsigned BlockPeriod = 0,
149 bool BSP = true,
typename T =
int,
typename Index =
int,
150 bool UseBarrier = false,
bool UseMonotonic = false,
151 bool UseDescending = false,
bool Concurrent = true>
153 :
private boost::noncopyable,
154 public internal::OrderedByIntegerMetricData<T, Index, UseBarrier>,
155 public internal::OrderedByIntegerMetricComparator<Index, UseDescending> {
159 template <
typename _T>
161 Indexer,
typename Container::template
retype<_T>, BlockPeriod, BSP, _T,
162 typename std::result_of<Indexer(_T)>::type, UseBarrier, UseMonotonic,
163 UseDescending, Concurrent>;
168 UseBarrier, UseMonotonic, UseDescending, _b>;
170 template <
unsigned _period>
173 UseBarrier, UseMonotonic, UseDescending,
178 template <
typename _container>
181 Index, UseBarrier, UseMonotonic,
182 UseDescending, Concurrent>
186 template <
typename _indexer>
189 Index, UseBarrier, UseMonotonic,
190 UseDescending, Concurrent>
197 Index, UseBarrier, UseMonotonic,
198 UseDescending, Concurrent>
202 template <
bool _use_barrier>
205 Index, _use_barrier, UseMonotonic,
206 UseDescending, Concurrent>
210 template <
bool _use_monotonic>
213 Index, UseBarrier, _use_monotonic,
214 UseDescending, Concurrent>
218 template <
bool _use_descending>
221 Index, UseBarrier, UseMonotonic,
222 _use_descending, Concurrent>
231 typedef internal::OrderedByIntegerMetricComparator<Index, UseDescending>
233 typedef typename Comparator::template with_local_map<CTy*>::type LMapTy;
236 :
public internal::OrderedByIntegerMetricData<T, Index,
237 UseBarrier>::ThreadData {
242 unsigned int lastMasterVersion;
243 unsigned int numPops;
245 ThreadData(Index initial)
246 : curIndex(initial), scanStart(initial), current(0),
247 lastMasterVersion(0), numPops(0) {}
250 typedef std::deque<std::pair<Index, CTy*>> MasterLog;
258 std::atomic<unsigned int> masterVersion;
261 bool updateLocal(ThreadData& p) {
262 if (p.lastMasterVersion != masterVersion.load(std::memory_order_relaxed)) {
264 p.lastMasterVersion < masterVersion.load(std::memory_order_relaxed);
265 ++p.lastMasterVersion) {
269 p.local.insert(masterLog[p.lastMasterVersion]);
271 std::pair<Index, CTy*> logEntry = masterLog[p.lastMasterVersion];
272 p.local[logEntry.first] = logEntry.second;
273 assert(logEntry.second);
284 Index msS = this->earliest;
288 if (BSP && !UseMonotonic) {
293 if (this->compare(o, msS))
298 if (this->compare(o, msS))
303 for (
auto ii = p.local.lower_bound(msS), ei = p.local.end(); ii != ei;
306 if ((item = ii->second->pop())) {
307 p.current = ii->second;
308 p.curIndex = ii->first;
309 p.scanStart = ii->first;
318 CTy* slowUpdateLocalOrCreate(ThreadData& p, Index i) {
322 auto it = p.local.find(i);
323 if (it != p.local.end())
325 }
while (!masterLock.try_lock());
328 auto it = p.local.find(i);
329 CTy* C2 = (it != p.local.end()) ? it->second :
nullptr;
333 p.lastMasterVersion = masterVersion.load(std::memory_order_relaxed) + 1;
334 masterLog.push_back(std::make_pair(i, C2));
335 masterVersion.fetch_add(1);
341 inline CTy* updateLocalOrCreate(ThreadData& p, Index i) {
344 auto it = p.local.find(i);
345 if (it != p.local.end())
348 return slowUpdateLocalOrCreate(p, i);
352 OrderedByIntegerMetric(
const Indexer& x = Indexer())
353 : data(this->earliest), masterVersion(0), indexer(x) {}
358 for (
auto ii = masterLog.rbegin(), ei = masterLog.rend(); ii != ei; ++ii) {
364 Index index = indexer(val);
367 assert(!UseMonotonic || this->compare(p.curIndex, index));
370 if (index == p.curIndex && p.current) {
371 p.current->push(val);
376 CTy* C = updateLocalOrCreate(p, index);
377 if (BSP && this->compare(index, p.scanStart))
380 if (!UseBarrier && this->compare(index, p.curIndex)) {
387 template <
typename Iter>
393 template <
typename RangeTy>
395 auto rp = range.local_pair();
396 push(rp.first, rp.second);
404 if (this->hasStored(p, p.curIndex))
405 return this->popStored(p, p.curIndex);
407 if (!UseBarrier && BlockPeriod &&
408 (p.numPops++ & ((1 << BlockPeriod) - 1)) == 0)
412 if (C && (item = C->pop()))
422 template <
bool Barrier = UseBarrier>
423 auto empty() ->
typename std::enable_if<Barrier, bool>::type {
430 p.stored.push_back(std::make_pair(p.curIndex, *item));
433 if (!p.stored.empty()) {
434 Index storedIndex = this->identity;
435 for (
auto& e : p.stored) {
436 if (this->compare(e.first, storedIndex)) {
437 storedIndex = e.first;
440 p.curIndex = storedIndex;
441 p.current = p.local[storedIndex];
443 p.hasWork = !p.stored.empty();
445 this->barrier.wait();
448 bool hasWork = p.hasWork;
449 Index curIndex = (hasWork) ? p.curIndex : this->identity;
450 CTy* C = (hasWork) ? p.current :
nullptr;
454 if (o.hasWork && this->compare(o.curIndex, curIndex)) {
455 curIndex = o.curIndex;
458 hasWork |= o.hasWork;
461 this->barrier.wait();
464 p.curIndex = curIndex;
467 for (
auto ii = p.local.begin(); ii != p.local.end();) {
468 bool toBreak = ii->second == C;
471 ii = p.local.erase(ii);
OrderedByIntegerMetric< Indexer, Container, BlockPeriod, _bsp, T, Index, UseBarrier, UseMonotonic, UseDescending, Concurrent > type
Definition: Obim.h:199
static unsigned getLeader()
Definition: ThreadPool.h:206
auto empty() -> typename std::enable_if< Barrier, bool >::type
Definition: Obim.h:423
~OrderedByIntegerMetric()
Definition: Obim.h:355
substrate::Barrier & getBarrier(unsigned activeThreads)
Have a pre-instantiated barrier available for use.
Definition: Substrate.cpp:24
Approximate priority scheduling.
Definition: Obim.h:152
T * getLocal()
Definition: PerThreadStorage.h:128
#define GALOIS_ATTRIBUTE_NOINLINE
Definition: CompilerSpecific.h:46
OrderedByIntegerMetric< Indexer, Container, _period, BSP, T, Index, UseBarrier, UseMonotonic, UseDescending, Concurrent > type
Definition: Obim.h:175
void push(Iter b, Iter e)
Definition: Obim.h:388
galois::optional< value_type > pop()
Definition: Obim.h:399
Galois version of boost::optional.
Definition: optional.h:34
Index index_type
Definition: Obim.h:227
OrderedByIntegerMetric< Indexer, _container, BlockPeriod, BSP, T, Index, UseBarrier, UseMonotonic, UseDescending, Concurrent > type
Definition: Obim.h:183
const Ty max(std::atomic< Ty > &a, const Ty &b)
Definition: AtomicHelpers.h:40
unsigned int activeThreads
Definition: Threads.cpp:26
Simple map data structure, based off a single array.
Definition: FlatMap.h:36
#define GALOIS_WLCOMPILECHECK(name)
Definition: WLCompileCheck.h:26
OrderedByIntegerMetric< Indexer, Container, BlockPeriod, BSP, T, Index, UseBarrier, UseMonotonic, _use_descending, Concurrent > type
Definition: Obim.h:223
void push(const value_type &val)
Definition: Obim.h:363
static bool isLeader()
Definition: ThreadPool.h:205
const Ty min(std::atomic< Ty > &a, const Ty &b)
Definition: AtomicHelpers.h:70
OrderedByIntegerMetric< Indexer, Container, BlockPeriod, BSP, T, Index, _use_barrier, UseMonotonic, UseDescending, Concurrent > type
Definition: Obim.h:207
T * getRemote(unsigned int thread)
Definition: PerThreadStorage.h:149
OrderedByIntegerMetric< _indexer, Container, BlockPeriod, BSP, T, Index, UseBarrier, UseMonotonic, UseDescending, Concurrent > type
Definition: Obim.h:191
OrderedByIntegerMetric< Indexer, Container, BlockPeriod, BSP, T, Index, UseBarrier, _use_monotonic, UseDescending, Concurrent > type
Definition: Obim.h:215
T value_type
Definition: Obim.h:226
void push_initial(const RangeTy &range)
Definition: Obim.h:394