00001
00023 #ifndef GALOIS_WORKLIST_BULKSYNCHRONOUS_H
00024 #define GALOIS_WORKLIST_BULKSYNCHRONOUS_H
00025
#include <atomic>

#include "Galois/Runtime/Barrier.h"
#include "Chunked.h"
#include "WLCompileCheck.h"
00029
00030 namespace Galois {
00031 namespace WorkList {
00032
00038 template<class Container=dChunkedFIFO<>, class T=int, bool Concurrent = true>
00039 class BulkSynchronous : private boost::noncopyable {
00040 public:
00041 template<bool _concurrent>
00042 struct rethread { typedef BulkSynchronous<Container, T, _concurrent> type; };
00043
00044 template<typename _T>
00045 struct retype { typedef BulkSynchronous<typename Container::template retype<_T>::type, _T, Concurrent> type; };
00046
00047 template<typename _container>
00048 struct with_container { typedef BulkSynchronous<_container, T, Concurrent> type; };
00049
00050 private:
00051 typedef typename Container::template rethread<Concurrent>::type CTy;
00052
00053 struct TLD {
00054 unsigned round;
00055 TLD(): round(0) { }
00056 };
00057
00058 CTy wls[2];
00059 Runtime::PerThreadStorage<TLD> tlds;
00060 Runtime::Barrier& barrier;
00061 Runtime::LL::CacheLineStorage<volatile long> some;
00062 volatile bool empty;
00063
00064 public:
00065 typedef T value_type;
00066
00067 BulkSynchronous(): barrier(Runtime::getSystemBarrier()), empty(false) { }
00068
00069 void push(const value_type& val) {
00070 wls[(tlds.getLocal()->round + 1) & 1].push(val);
00071 }
00072
00073 template<typename ItTy>
00074 void push(ItTy b, ItTy e) {
00075 while (b != e)
00076 push(*b++);
00077 }
00078
00079 template<typename RangeTy>
00080 void push_initial(const RangeTy& range) {
00081 auto rp = range.local_pair();
00082 push(rp.first, rp.second);
00083 tlds.getLocal()->round = 1;
00084 some.data = true;
00085 }
00086
00087 Galois::optional<value_type> pop() {
00088 TLD& tld = *tlds.getLocal();
00089 Galois::optional<value_type> r;
00090
00091 while (true) {
00092 if (empty)
00093 return r;
00094
00095 r = wls[tld.round].pop();
00096 if (r)
00097 return r;
00098
00099 barrier.wait();
00100 if (Runtime::LL::getTID() == 0) {
00101 if (!some.data)
00102 empty = true;
00103 some.data = false;
00104 }
00105 tld.round = (tld.round + 1) & 1;
00106 barrier.wait();
00107
00108 r = wls[tld.round].pop();
00109 if (r) {
00110 some.data = true;
00111 return r;
00112 }
00113 }
00114 }
00115 };
// Registers BulkSynchronous with the macro from WLCompileCheck.h.
// NOTE(review): presumably this instantiates the worklist so interface errors
// surface at compile time -- confirm against the macro definition.
00116 GALOIS_WLCOMPILECHECK(BulkSynchronous)
00117
00118 }
00119 }
00120
00121 #endif