00001
00023 #ifndef GALOIS_WORKLIST_OBIM_H
00024 #define GALOIS_WORKLIST_OBIM_H
00025
00026 #include "Galois/config.h"
00027 #include "Galois/FlatMap.h"
00028 #include "Galois/Runtime/PerThreadStorage.h"
00029 #include "Galois/WorkList/Fifo.h"
00030 #include "Galois/WorkList/WorkListHelpers.h"
00031
00032 #include GALOIS_CXX11_STD_HEADER(type_traits)
00033 #include <limits>
00034
00035 namespace Galois {
00036 namespace WorkList {
00037
00062 template<class Indexer = DummyIndexer<int>, typename Container = FIFO<>,
00063 unsigned BlockPeriod=0,
00064 bool BSP=true,
00065 typename T=int,
00066 typename Index=int,
00067 bool Concurrent=true>
00068 struct OrderedByIntegerMetric : private boost::noncopyable {
00069 template<bool _concurrent>
00070 struct rethread { typedef OrderedByIntegerMetric<Indexer, typename Container::template rethread<_concurrent>::type, BlockPeriod, BSP, T, Index, _concurrent> type; };
00071
00072 template<typename _T>
00073 struct retype { typedef OrderedByIntegerMetric<Indexer, typename Container::template retype<_T>::type, BlockPeriod, BSP, _T, typename std::result_of<Indexer(_T)>::type, Concurrent> type; };
00074
00075 template<unsigned _period>
00076 struct with_block_period { typedef OrderedByIntegerMetric<Indexer, Container, _period, BSP, T, Index, Concurrent> type; };
00077
00078 template<typename _container>
00079 struct with_container { typedef OrderedByIntegerMetric<Indexer, _container, BlockPeriod, BSP, T, Index, Concurrent> type; };
00080
00081 template<typename _indexer>
00082 struct with_indexer { typedef OrderedByIntegerMetric<_indexer, Container, BlockPeriod, BSP, T, Index, Concurrent> type; };
00083
00084 template<bool _bsp>
00085 struct with_back_scan_prevention { typedef OrderedByIntegerMetric<Indexer, Container, BlockPeriod, _bsp, T, Index, Concurrent> type; };
00086
00087 typedef T value_type;
00088
00089 private:
00090 typedef typename Container::template rethread<Concurrent>::type CTy;
00091 typedef Galois::flat_map<Index, CTy*> LMapTy;
00092
00093
00094 struct perItem {
00095 LMapTy local;
00096 Index curIndex;
00097 Index scanStart;
00098 CTy* current;
00099 unsigned int lastMasterVersion;
00100 unsigned int numPops;
00101
00102 perItem() :
00103 curIndex(std::numeric_limits<Index>::min()),
00104 scanStart(std::numeric_limits<Index>::min()),
00105 current(0), lastMasterVersion(0), numPops(0) { }
00106 };
00107
00108 typedef std::deque<std::pair<Index, CTy*> > MasterLog;
00109
00110
00111
00112 Runtime::PerThreadStorage<perItem> current;
00113 Runtime::LL::PaddedLock<Concurrent> masterLock;
00114 MasterLog masterLog;
00115
00116 std::atomic<unsigned int> masterVersion;
00117 Indexer indexer;
00118
00119 bool updateLocal(perItem& p) {
00120 if (p.lastMasterVersion != masterVersion.load(std::memory_order_relaxed)) {
00121
00122 for (; p.lastMasterVersion < masterVersion.load(std::memory_order_relaxed); ++p.lastMasterVersion) {
00123
00124
00125 #if 0
00126 p.local.insert(masterLog[p.lastMasterVersion]);
00127 #else
00128 std::pair<Index, CTy*> logEntry = masterLog[p.lastMasterVersion];
00129 p.local[logEntry.first] = logEntry.second;
00130 assert(logEntry.second);
00131 #endif
00132 }
00133
00134 return true;
00135 }
00136 return false;
00137 }
00138
00139 GALOIS_ATTRIBUTE_NOINLINE
00140 Galois::optional<T> slowPop(perItem& p) {
00141
00142 updateLocal(p);
00143 unsigned myID = Runtime::LL::getTID();
00144 bool localLeader = Runtime::LL::isPackageLeaderForSelf(myID);
00145
00146 Index msS = std::numeric_limits<Index>::min();
00147 if (BSP) {
00148 msS = p.scanStart;
00149 if (localLeader) {
00150 for (unsigned i = 0; i < Runtime::activeThreads; ++i)
00151 msS = std::min(msS, current.getRemote(i)->scanStart);
00152 } else {
00153 msS = std::min(msS, current.getRemote(Runtime::LL::getLeaderForThread(myID))->scanStart);
00154 }
00155 }
00156
00157 for (auto ii = p.local.lower_bound(msS), ee = p.local.end(); ii != ee; ++ii) {
00158 Galois::optional<T> retval;
00159 if ((retval = ii->second->pop())) {
00160 p.current = ii->second;
00161 p.curIndex = ii->first;
00162 p.scanStart = ii->first;
00163 return retval;
00164 }
00165 }
00166 return Galois::optional<value_type>();
00167 }
00168
00169 GALOIS_ATTRIBUTE_NOINLINE
00170 CTy* slowUpdateLocalOrCreate(perItem& p, Index i) {
00171
00172 do {
00173 CTy* lC;
00174 updateLocal(p);
00175 if ((lC = p.local[i]))
00176 return lC;
00177 } while (!masterLock.try_lock());
00178
00179 updateLocal(p);
00180 CTy*& lC2 = p.local[i];
00181 if (!lC2) {
00182 lC2 = new CTy();
00183 p.lastMasterVersion = masterVersion.load(std::memory_order_relaxed) + 1;
00184 masterLog.push_back(std::make_pair(i, lC2));
00185 masterVersion.fetch_add(1);
00186 }
00187 masterLock.unlock();
00188 return lC2;
00189 }
00190
00191 inline CTy* updateLocalOrCreate(perItem& p, Index i) {
00192
00193 CTy* lC;
00194 if ((lC = p.local[i]))
00195 return lC;
00196
00197 return slowUpdateLocalOrCreate(p, i);
00198 }
00199
00200 public:
00201 OrderedByIntegerMetric(const Indexer& x = Indexer()): masterVersion(0), indexer(x) { }
00202
00203 ~OrderedByIntegerMetric() {
00204
00205
00206 for (auto ii = masterLog.rbegin(), ei = masterLog.rend(); ii != ei; ++ii) {
00207 delete ii->second;
00208 }
00209 }
00210
00211 void push(const value_type& val) {
00212 Index index = indexer(val);
00213 perItem& p = *current.getLocal();
00214
00215 if (index == p.curIndex && p.current) {
00216 p.current->push(val);
00217 return;
00218 }
00219
00220
00221 CTy* lC = updateLocalOrCreate(p, index);
00222 if (BSP && index < p.scanStart)
00223 p.scanStart = index;
00224
00225 if (index < p.curIndex) {
00226 p.curIndex = index;
00227 p.current = lC;
00228 }
00229 lC->push(val);
00230 }
00231
00232 template<typename Iter>
00233 void push(Iter b, Iter e) {
00234 while (b != e)
00235 push(*b++);
00236 }
00237
00238 template<typename RangeTy>
00239 void push_initial(const RangeTy& range) {
00240 auto rp = range.local_pair();
00241 push(rp.first, rp.second);
00242 }
00243
00244 Galois::optional<value_type> pop() {
00245
00246 perItem& p = *current.getLocal();
00247 CTy* C = p.current;
00248 if (BlockPeriod && (p.numPops++ & ((1<<BlockPeriod)-1)) == 0)
00249 return slowPop(p);
00250
00251 Galois::optional<value_type> retval;
00252 if (C && (retval = C->pop()))
00253 return retval;
00254
00255
00256 return slowPop(p);
00257 }
00258 };
00259 GALOIS_WLCOMPILECHECK(OrderedByIntegerMetric)
00260
00261 }
00262 }
00263
00264 #endif