Galois
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
DTerminationDetector.h
Go to the documentation of this file.
1 /*
2  * This file belongs to the Galois project, a C++ library for exploiting
3  * parallelism. The code is being released under the terms of the 3-Clause BSD
4  * License (a copy is located in LICENSE.txt at the top-level directory).
5  *
6  * Copyright (C) 2018, The University of Texas at Austin. All rights reserved.
7  * UNIVERSITY EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES CONCERNING THIS
8  * SOFTWARE AND DOCUMENTATION, INCLUDING ANY WARRANTIES OF MERCHANTABILITY,
9  * FITNESS FOR ANY PARTICULAR PURPOSE, NON-INFRINGEMENT AND WARRANTIES OF
10  * PERFORMANCE, AND ANY WARRANTY THAT MIGHT OTHERWISE ARISE FROM COURSE OF
11  * DEALING OR USAGE OF TRADE. NO WARRANTY IS EITHER EXPRESS OR IMPLIED WITH
12  * RESPECT TO THE USE OF THE SOFTWARE OR DOCUMENTATION. Under no circumstances
13  * shall University be liable for incidental, special, indirect, direct or
14  * consequential damages or loss of profits, interruption of business, or
15  * related expenses which may arise from use of Software or Documentation,
16  * including but not limited to those resulting from defects in Software and/or
17  * Documentation, or loss or inaccuracy of data of any kind.
18  */
19 
26 #ifndef GALOIS_DISTTERMINATOR_H
27 #define GALOIS_DISTTERMINATOR_H
28 
29 #include <limits>
30 #include "galois/Galois.h"
31 #include "galois/Reduction.h"
32 #include "galois/AtomicHelpers.h"
33 #include "galois/runtime/LWCI.h"
35 
36 namespace galois {
37 
// Per-host bookkeeping used to decide when an asynchronous phase may stop.
// terminate() inspects local activity and the outcome of a snapshot reduction;
// reduce() stores the resulting verdict in global_mdata.
44 template <typename Ty>
45 class DGTerminator {
48 
 // NOTE(review): listing lines 46, 47 and 49 are absent from this extract. The
 // methods below use members named `net` and `mdata`, which are presumably
 // declared on those lines -- confirm against the original header.
 // local_mdata: refreshed from mdata.reduce() whenever it is 0.
 // global_mdata: written by reduce() (as !halt), returned by read(), and
 // cleared by reset().
50  Ty local_mdata, global_mdata;
51 
 // Snapshot counters; reinitialize() sets them to 0, 1 and 1 respectively.
 // global_snapshot is the receive buffer of the snapshot all-reduce.
52  uint64_t prev_snapshot;
53  uint64_t snapshot;
54  uint64_t global_snapshot;
 // Set by terminate() whenever this host is found to be active.
55  bool work_done;
 // Handle of the outstanding snapshot reduction; its type depends on whether
 // the build uses MPI or LCI.
56 #ifndef GALOIS_USE_LCI
57  MPI_Request snapshot_request;
58 #else
59  lc_colreq snapshot_request;
60 #endif
61 
62 public:
 // NOTE(review): the constructor's declaration line (listing line 64) and
 // listing line 66 are missing from this extract; only the statements below
 // survive. They reset the snapshot counters and clear the stored values.
65  reinitialize();
67  reset();
68  }
69 
70  void reinitialize() {
71  prev_snapshot = 0;
72  snapshot = 1;
73  global_snapshot = 1;
74  work_done = false;
75  }
76 
83  DGTerminator& operator+=(const Ty& rhs) {
84  mdata += rhs;
85  return *this;
86  }
87 
93  void operator=(const Ty rhs) {
94  mdata.reset();
95  mdata += rhs;
96  }
97 
103  void set(const Ty rhs) {
104  mdata.reset();
105  mdata += rhs;
106  }
107 
113  Ty read_local() {
114  if (local_mdata == 0)
115  local_mdata = mdata.reduce();
116  return local_mdata;
117  }
118 
126  Ty read() { return global_mdata; }
127 
133  Ty reset() {
134  Ty retval = global_mdata;
135  mdata.reset();
136  local_mdata = global_mdata = 0;
137  return retval;
138  }
139 
// NOTE(review): the declaration line of this function (listing line 140,
// presumably `void initiate_snapshot() {`) is missing from this extract.
// Starts a max all-reduce of `snapshot` into `global_snapshot`; completion is
// later polled through `snapshot_request`.
141 #ifdef GALOIS_USE_LCI
 // NOTE(review): `snapshot` and `global_snapshot` are uint64_t, yet the size
 // and the max operator are taken from Ty. This looks wrong whenever Ty is not
 // a 64-bit unsigned type -- confirm against the lc_ialreduce contract.
142  lc_ialreduce(&snapshot, &global_snapshot, sizeof(Ty),
143  &galois::runtime::internal::ompi_op_max<Ty>, lc_col_ep,
144  &snapshot_request);
145 #else
 // Non-blocking MPI all-reduce of one element using MPI_MAX.
 // NOTE(review): MPI_UNSIGNED_LONG is paired with a uint64_t buffer; this
 // assumes unsigned long is 64 bits wide on the target -- confirm.
146  MPI_Iallreduce(&snapshot, &global_snapshot, 1, MPI_UNSIGNED_LONG, MPI_MAX,
147  MPI_COMM_WORLD, &snapshot_request);
148 #endif
149  }
150 
 // Decides whether this host may stop.
 // The host counts as active when local_mdata is non-zero or the network
 // reports pending sends or pending receives; an active host only records
 // work_done and returns false. An inactive host looks at the outstanding
 // snapshot reduction and returns true only when that reduction has completed,
 // no work was recorded since the last snapshot, and prev_snapshot already
 // equals the reduced snapshot value. In every other case it returns false.
 // NOTE(review): listing lines 183 and 188 are missing from this extract (one
 // at the end of each of the two "taking snapshot" branches) -- confirm what
 // they contain against the original header.
151  bool terminate() {
152  bool active = (local_mdata != 0);
153  // if (active) galois::gDebug("[", net.ID, "] local work done \n");
154  if (!active) {
155  active = net.anyPendingSends();
156  // if (active) galois::gDebug("[", net.ID, "] pending send \n");
157  }
 // Stays 0 unless the snapshot reduction is polled below and has completed.
158  int snapshot_ended = 0;
159  if (!active) {
160 #ifndef GALOIS_USE_LCI
161  MPI_Test(&snapshot_request, &snapshot_ended, MPI_STATUS_IGNORE);
162 #else
163  lc_col_progress(&snapshot_request);
164  snapshot_ended = snapshot_request.flag;
165 #endif
166  }
167  if (!active) { // check pending receives after checking snapshot
168  active = net.anyPendingReceives();
169  if (active)
170  galois::gDebug("[", net.ID, "] pending receive");
171  }
172  if (active) {
 // Remember the activity so that the next completed snapshot is advanced.
173  work_done = true;
174  } else {
175  if (snapshot_ended != 0) {
 // Adopt the reduced (maximum) snapshot value as the local one.
176  snapshot = global_snapshot;
177  if (work_done) {
 // Work happened since the last snapshot: move on to a new snapshot number.
178  work_done = false;
179  prev_snapshot = snapshot;
180  ++snapshot;
181  galois::gDebug("[", net.ID, "] work done, taking snapshot ",
182  snapshot);
184  } else if (prev_snapshot != snapshot) {
 // No local work, but the reduced snapshot changed: record it and keep going.
185  prev_snapshot = snapshot;
186  galois::gDebug("[", net.ID, "] no work done, taking snapshot ",
187  snapshot);
189  } else {
 // No local work and the snapshot is unchanged: report termination.
190  galois::gDebug("[", net.ID, "] terminating ", snapshot);
191  // an explicit barrier may be required here
192  // so that the next async phase begins on all hosts at the same time
193  // however, this may add overheads when it is not required
194  // (depending on when the next async phase actually begins), so
195  // ASSUME: caller will call getHostBarrier().wait() if required
196  reinitialize(); // for next async phase
197  return true;
198  }
199  }
200  }
201  return false;
202  }
203 
 // Runs one termination check and stores the outcome in global_mdata.
 // runID is appended to "ReduceDGAccum_" to name the timer that brackets the
 // call. The stored and returned value is !terminate(): false-like once
 // terminate() reports true, true-like otherwise.
 // NOTE(review): listing lines 227 and 231 are missing from this extract, so
 // the condition around evilPhase below is incomplete -- confirm against the
 // original header.
213  Ty reduce(std::string runID = std::string()) {
214  std::string timer_str("ReduceDGAccum_" + runID);
215 
216  galois::CondStatTimer<GALOIS_COMM_STATS> reduceTimer(timer_str.c_str(),
217  "DGReducible");
218  reduceTimer.start();
219 
 // Refresh the local value only while it is still 0.
220  if (local_mdata == 0)
221  local_mdata = mdata.reduce();
222 
223  bool halt = terminate();
224  global_mdata = !halt;
225  if (halt) {
226  galois::runtime::evilPhase += 2; // one for reduce and one for broadcast
228  static_cast<uint32_t>(
229  std::numeric_limits<int16_t>::max())) { // limit defined by MPI or
230  // LCI
232  }
233  }
234 
235  reduceTimer.stop();
236 
237  return global_mdata;
238  }
239 };
240 
241 } // namespace galois
242 #endif
void reinitialize()
Definition: DTerminationDetector.h:70
void reset()
Definition: Reduction.h:113
Ty reset()
Reset the entire accumulator.
Definition: DTerminationDetector.h:133
DGTerminator & operator+=(const Ty &rhs)
Adds to accumulated value.
Definition: DTerminationDetector.h:83
void gDebug(Args &&...GALOIS_USED_ONLY_IN_DEBUG(args))
Prints a debug string from a sequence of things; prints nothing if NDEBUG is defined.
Definition: gIO.h:72
void operator=(const Ty rhs)
Sets current value stored in accumulator.
Definition: DTerminationDetector.h:93
static uint32_t ID
This machine's host ID.
Definition: Network.h:81
Distributed sum-reducer for getting the sum of some value across multiple hosts.
Definition: DTerminationDetector.h:45
LWCI header that includes lc.h (LCI library) and internal helper functions on arrays.
void set(const Ty rhs)
Sets current value stored in accumulator.
Definition: DTerminationDetector.h:103
Ty read_local()
Read local accumulated value.
Definition: DTerminationDetector.h:113
const Ty max(std::atomic< Ty > &a, const Ty &b)
Definition: AtomicHelpers.h:40
Contains declaration of DistStatManager, which reports runtime statistics of a distributed applicatio...
NetworkInterface & getSystemNetworkInterface()
Get the network interface.
Definition: Network.cpp:131
Ty reduce(std::string runID=std::string())
Reduce data across all hosts, saves the value, and returns the reduced value.
Definition: DTerminationDetector.h:213
void start()
Definition: Timer.cpp:82
void initiate_snapshot()
Definition: DTerminationDetector.h:140
T & reduce()
Returns the final reduction value.
Definition: Reduction.h:102
bool terminate()
Definition: DTerminationDetector.h:151
uint32_t evilPhase
Variable that keeps track of which network send/recv phase a program is currently on...
Definition: Network.cpp:36
A class that defines functions that a network interface in Galois should have.
Definition: Network.h:52
Definition: Timer.h:88
DGTerminator()
Default constructor.
Definition: DTerminationDetector.h:64
Ty read()
Read the value returned by the last reduce call.
Definition: DTerminationDetector.h:126