Galois
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ThreadPool.h
Go to the documentation of this file.
1 /*
2  * This file belongs to the Galois project, a C++ library for exploiting
3  * parallelism. The code is being released under the terms of the 3-Clause BSD
4  * License (a copy is located in LICENSE.txt at the top-level directory).
5  *
6  * Copyright (C) 2018, The University of Texas at Austin. All rights reserved.
7  * UNIVERSITY EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES CONCERNING THIS
8  * SOFTWARE AND DOCUMENTATION, INCLUDING ANY WARRANTIES OF MERCHANTABILITY,
9  * FITNESS FOR ANY PARTICULAR PURPOSE, NON-INFRINGEMENT AND WARRANTIES OF
10  * PERFORMANCE, AND ANY WARRANTY THAT MIGHT OTHERWISE ARISE FROM COURSE OF
11  * DEALING OR USAGE OF TRADE. NO WARRANTY IS EITHER EXPRESS OR IMPLIED WITH
12  * RESPECT TO THE USE OF THE SOFTWARE OR DOCUMENTATION. Under no circumstances
13  * shall University be liable for incidental, special, indirect, direct or
14  * consequential damages or loss of profits, interruption of business, or
15  * related expenses which may arise from use of Software or Documentation,
16  * including but not limited to those resulting from defects in Software and/or
17  * Documentation, or loss or inaccuracy of data of any kind.
18  */
19 
20 #ifndef GALOIS_SUBSTRATE_THREADPOOL_H
21 #define GALOIS_SUBSTRATE_THREADPOOL_H
22 
23 #include <atomic>
24 #include <cassert>
25 #include <condition_variable>
26 #include <cstdlib>
27 #include <functional>
28 #include <thread>
29 #include <vector>
30 
33 
namespace galois::substrate::internal {

/**
 * Compile-time loop that invokes the callables stored in a tuple, in index
 * order.
 *
 * Each instantiation calls element @p s and then hands over to the
 * instantiation for element s + 1 with one fewer element remaining.
 *
 * @tparam tpl tuple type holding nullary callables
 * @tparam s   index of the element invoked by this step
 * @tparam r   number of elements still to invoke, including element @p s
 */
template <typename tpl, int s, int r>
struct ExecuteTupleImpl {
  static inline void execute(tpl& cmds) {
    std::get<s>(cmds)();
    // Recurse on the rest of the tuple; without this step only the first
    // callable would ever run.
    ExecuteTupleImpl<tpl, s + 1, r - 1>::execute(cmds);
  }
};

/**
 * Terminating case: no elements remain, so there is nothing to invoke.
 */
template <typename tpl, int s>
struct ExecuteTupleImpl<tpl, s, 0> {
  static inline void execute(tpl&) {}
};

} // namespace galois::substrate::internal
50 
51 namespace galois::substrate {
52 
53 class ThreadPool {
54  friend class SharedMem;
55 
56 protected:
57  struct shutdown_ty {};
58  struct fastmode_ty {
59  bool mode;
60  };
61  struct dedicated_ty {
62  std::function<void(void)> fn;
63  };
64 
66  struct per_signal {
67  std::condition_variable cv;
68  std::mutex m;
69  unsigned wbegin, wend;
70  std::atomic<int> done;
71  std::atomic<int> fastRelease;
73 
74  void wakeup(bool fastmode) {
75  if (fastmode) {
76  done = 0;
77  fastRelease = 1;
78  } else {
79  std::lock_guard<std::mutex> lg(m);
80  done = 0;
81  cv.notify_one();
82  // start.release();
83  }
84  }
85 
86  void wait(bool fastmode) {
87  if (fastmode) {
88  while (!fastRelease.load(std::memory_order_relaxed)) {
89  asmPause();
90  }
91  fastRelease = 0;
92  } else {
93  std::unique_lock<std::mutex> lg(m);
94  cv.wait(lg, [=] { return !done; });
95  // start.acquire();
96  }
97  }
98  };
99 
100  thread_local static per_signal my_box;
101 
103  std::vector<per_signal*> signals;
104  std::vector<std::thread> threads;
105  unsigned reserved;
106  unsigned masterFastmode;
107  bool running;
108  std::function<void(void)> work;
109 
111  void destroyCommon();
112 
114  void initThread(unsigned tid);
115 
117  void threadLoop(unsigned tid);
118 
120  void cascade(bool fastmode);
121 
123  void decascade();
124 
126  void runInternal(unsigned num);
127 
128  ThreadPool();
129 
130 public:
131  ~ThreadPool();
132 
133  ThreadPool(const ThreadPool&) = delete;
134  ThreadPool& operator=(const ThreadPool&) = delete;
135 
136  ThreadPool(ThreadPool&&) = delete;
137  ThreadPool& operator=(ThreadPool&&) = delete;
138 
141  template <typename... Args>
142  void run(unsigned num, Args&&... args) {
143  struct ExecuteTuple {
144  // using Ty = std::tuple<Args...>;
145  std::tuple<Args...> cmds;
146 
147  void operator()() {
148  internal::ExecuteTupleImpl<
149  std::tuple<Args...>, 0,
150  std::tuple_size<std::tuple<Args...>>::value>::execute(this->cmds);
151  }
152  ExecuteTuple(Args&&... args) : cmds(std::forward<Args>(args)...) {}
153  };
154  // paying for an indirection in work allows small-object optimization in
155  // std::function to kick in and avoid a heap allocation
156  ExecuteTuple lwork(std::forward<Args>(args)...);
157  work = std::ref(lwork);
158  // work =
159  // std::function<void(void)>(ExecuteTuple(std::forward<Args>(args)...));
160  assert(num <= getMaxThreads());
161  runInternal(num);
162  }
163 
165  void runDedicated(std::function<void(void)>& f);
166 
167  // experimental: busy wait for work
168  void burnPower(unsigned num);
169  // experimental: leave busy wait
170  void beKind();
171 
172  bool isRunning() const { return running; }
173 
175  unsigned getMaxUsableThreads() const { return mi.maxThreads - reserved; }
178  unsigned getMaxThreads() const { return mi.maxThreads; }
179  unsigned getMaxCores() const { return mi.maxCores; }
180  unsigned getMaxSockets() const { return mi.maxSockets; }
181  unsigned getMaxNumaNodes() const { return mi.maxNumaNodes; }
182 
183  unsigned getLeaderForSocket(unsigned pid) const {
184  for (unsigned i = 0; i < getMaxThreads(); ++i)
185  if (getSocket(i) == pid && isLeader(i))
186  return i;
187  abort();
188  }
189 
190  bool isLeader(unsigned tid) const {
191  return signals[tid]->topo.socketLeader == tid;
192  }
193  unsigned getSocket(unsigned tid) const { return signals[tid]->topo.socket; }
194  unsigned getLeader(unsigned tid) const {
195  return signals[tid]->topo.socketLeader;
196  }
197  unsigned getCumulativeMaxSocket(unsigned tid) const {
198  return signals[tid]->topo.cumulativeMaxSocket;
199  }
200  unsigned getNumaNode(unsigned tid) const {
201  return signals[tid]->topo.numaNode;
202  }
203 
204  static unsigned getTID() { return my_box.topo.tid; }
205  static bool isLeader() { return my_box.topo.tid == my_box.topo.socketLeader; }
206  static unsigned getLeader() { return my_box.topo.socketLeader; }
207  static unsigned getSocket() { return my_box.topo.socket; }
208  static unsigned getCumulativeMaxSocket() {
210  }
211  static unsigned getNumaNode() { return my_box.topo.numaNode; }
212 };
213 
217 ThreadPool& getThreadPool(void);
218 
219 } // namespace galois::substrate
220 
221 namespace galois::substrate::internal {
222 
223 void setThreadPool(ThreadPool* tp);
224 
225 } // namespace galois::substrate::internal
226 
227 #endif
void beKind()
Definition: ThreadPool.cpp:83
unsigned maxThreads
Definition: HWTopo.h:41
ThreadPool & getThreadPool(void)
return a reference to system thread pool
Definition: ThreadPool.cpp:259
void threadLoop(unsigned tid)
main thread loop
Definition: ThreadPool.cpp:132
void run(unsigned num, Args &&...args)
execute work on all threads; a simple wrapper for run
Definition: ThreadPool.h:142
unsigned wend
Definition: ThreadPool.h:69
std::function< void(void)> fn
Definition: ThreadPool.h:62
std::function< void(void)> work
Definition: ThreadPool.h:108
static unsigned getLeader()
Definition: ThreadPool.h:206
unsigned tid
Definition: HWTopo.h:31
void wait(bool fastmode)
Definition: ThreadPool.h:86
std::atomic< int > done
Definition: ThreadPool.h:70
unsigned getMaxThreads() const
return the number of threads supported by the thread pool on the current machine
Definition: ThreadPool.h:178
unsigned getMaxCores() const
Definition: ThreadPool.h:179
type for setting fastmode
Definition: ThreadPool.h:61
unsigned maxSockets
Definition: HWTopo.h:43
std::condition_variable cv
Definition: ThreadPool.h:67
std::atomic< int > fastRelease
Definition: ThreadPool.h:71
void decascade()
spin down after run
Definition: ThreadPool.cpp:160
unsigned reserved
Definition: ThreadPool.h:105
unsigned maxCores
Definition: HWTopo.h:42
unsigned getNumaNode(unsigned tid) const
Definition: ThreadPool.h:200
static unsigned getCumulativeMaxSocket()
Definition: ThreadPool.h:208
bool running
Definition: ThreadPool.h:107
unsigned socketLeader
Definition: HWTopo.h:32
Definition: substrate/SharedMem.h:32
std::mutex m
Definition: ThreadPool.h:68
unsigned numaNode
Definition: HWTopo.h:34
unsigned getCumulativeMaxSocket(unsigned tid) const
Definition: ThreadPool.h:197
void runInternal(unsigned num)
execute work on num threads
Definition: ThreadPool.cpp:203
unsigned wbegin
Definition: ThreadPool.h:69
void runDedicated(std::function< void(void)> &f)
run function in a dedicated thread until the threadpool exits
Definition: ThreadPool.cpp:231
static unsigned getTID()
Definition: ThreadPool.h:204
std::vector< per_signal * > signals
Definition: ThreadPool.h:103
bool isLeader(unsigned tid) const
Definition: ThreadPool.h:190
void initThread(unsigned tid)
Initialize a thread.
Definition: ThreadPool.cpp:118
static unsigned getSocket()
Definition: ThreadPool.h:207
unsigned getMaxNumaNodes() const
Definition: ThreadPool.h:181
Definition: HWTopo.h:30
unsigned getLeader(unsigned tid) const
Definition: ThreadPool.h:194
void burnPower(unsigned num)
Definition: ThreadPool.cpp:70
void operator()(void)
Definition: Executor_ParaMeter.h:417
bool mode
Definition: ThreadPool.h:59
static bool isLeader()
Definition: ThreadPool.h:205
ThreadTopoInfo topo
Definition: ThreadPool.h:72
std::vector< std::thread > threads
Definition: ThreadPool.h:104
Definition: ThreadPool.h:53
void cascade(bool fastmode)
spin up for run
Definition: ThreadPool.cpp:179
unsigned masterFastmode
Definition: ThreadPool.h:106
unsigned getMaxSockets() const
Definition: ThreadPool.h:180
static thread_local per_signal my_box
Definition: ThreadPool.h:100
ThreadPool & operator=(const ThreadPool &)=delete
Definition: HWTopo.h:40
unsigned getLeaderForSocket(unsigned pid) const
Definition: ThreadPool.h:183
void wakeup(bool fastmode)
Definition: ThreadPool.h:74
MachineTopoInfo mi
Definition: ThreadPool.h:102
type for shutting down thread
Definition: ThreadPool.h:58
type to switch to dedicated mode
Definition: ThreadPool.h:66
unsigned cumulativeMaxSocket
Definition: HWTopo.h:35
unsigned getMaxUsableThreads() const
return the number of non-reserved threads in the pool
Definition: ThreadPool.h:175
unsigned maxNumaNodes
Definition: HWTopo.h:44
ThreadPool()
Definition: ThreadPool.cpp:40
bool isRunning() const
Definition: ThreadPool.h:172
unsigned getSocket(unsigned tid) const
Definition: ThreadPool.h:193
static unsigned getNumaNode()
Definition: ThreadPool.h:211
void destroyCommon()
destroy all threads
Definition: ThreadPool.cpp:65
~ThreadPool()
Definition: ThreadPool.cpp:58
class galois::runtime::ParaMeter::ParaMeterExecutor execute(const R &range)
Definition: Executor_ParaMeter.h:360
unsigned socket
Definition: HWTopo.h:33