CMS 3D CMS Logo

StallMonitor.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/Services
4 // Class : StallMonitor
5 //
6 // Implementation:
7 //
8 // Original Author: Kyle Knoepfel
9 //
10 
26 
27 #include "tbb/concurrent_unordered_map.h"
28 
29 #include <atomic>
30 #include <chrono>
31 #include <iomanip>
32 #include <iostream>
33 #include <sstream>
34 
35 namespace {
36 
37  using clock_t = std::chrono::steady_clock;
38  auto now = clock_t::now;
39 
40  inline auto stream_id(edm::StreamContext const& cs)
41  {
42  return cs.streamID().value();
43  }
44 
45  inline auto module_id(edm::ModuleCallingContext const& mcc)
46  {
47  return mcc.moduleDescription()->id();
48  }
49 
50  //===============================================================
51  class StallStatistics {
52  public:
53 
54  // c'tor receiving 'std::string const&' type not provided since we
55  // must be able to call (e.g.) std::vector<StallStatistics>(20),
56  // for which a default label is not sensible in this context.
57  StallStatistics() = default;
58 
59  std::string const& label() const { return label_; }
60  unsigned numberOfStalls() const { return stallCounter_; }
61 
62  using duration_t = std::chrono::milliseconds;
63  using rep_t = duration_t::rep;
64 
65  duration_t totalStalledTime() const { return duration_t{totalTime_.load()}; }
66  duration_t maxStalledTime() const { return duration_t{maxTime_.load()}; }
67 
68  // Modifiers
69  void setLabel(std::string const& label) { label_ = label; }
70 
71  void update(std::chrono::milliseconds const ms)
72  {
73  ++stallCounter_;
74  auto const thisTime = ms.count();
75  totalTime_ += thisTime;
76  rep_t max {maxTime_};
77  while (thisTime > max && !maxTime_.compare_exchange_strong(max, thisTime));
78  }
79 
80  private:
81  std::string label_ {};
82  std::atomic<unsigned> stallCounter_ {};
83  std::atomic<rep_t> totalTime_ {};
84  std::atomic<rep_t> maxTime_ {};
85  };
86 
87  //===============================================================
88  // Message-assembly utilities
89  template <typename T>
91  concatenate(std::ostream& os, T const t)
92  {
93  os << ' ' << t;
94  }
95 
96  template <typename H, typename... T>
98  concatenate(std::ostream& os, H const h, T const... t)
99  {
100  os << ' ' << h;
101  concatenate(os, t...);
102  }
103 
104  enum class step : char { preSourceEvent = 'S',
105  postSourceEvent = 's',
106  preEvent = 'E',
107  postModuleEventPrefetching = 'p',
108  preModuleEvent = 'M',
109  preEventReadFromSource = 'R',
110  postEventReadFromSource = 'r',
111  postModuleEvent = 'm' ,
112  postEvent = 'e'};
113 
114  std::ostream& operator<<(std::ostream& os, step const s)
115  {
116  os << static_cast<std::underlying_type_t<step>>(s);
117  return os;
118  }
119 
120  template <step S, typename... ARGS>
121  std::string assembleMessage(ARGS const... args)
122  {
123  std::ostringstream oss;
124  oss << S;
125  concatenate(oss, args...);
126  oss << '\n';
127  return oss.str();
128  }
129 
130 }
131 
132 namespace edm {
133  namespace service {
134 
135  class StallMonitor {
136  public:
138  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
139 
140  private:
141 
142  void preModuleConstruction(edm::ModuleDescription const&);
143  void postBeginJob();
144  void preSourceEvent(StreamID);
145  void postSourceEvent(StreamID);
146  void preEvent(StreamContext const&);
147  void preModuleEvent(StreamContext const&, ModuleCallingContext const&);
148  void postModuleEventPrefetching(StreamContext const&, ModuleCallingContext const&);
149  void preEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
150  void postEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
151  void postModuleEvent(StreamContext const&, ModuleCallingContext const&);
152  void postEvent(StreamContext const&);
153  void postEndJob();
154 
156  bool const validFile_; // Separate data member from file to improve efficiency.
157  std::chrono::milliseconds const stallThreshold_;
158  decltype(now()) beginTime_ {};
159 
160  // There can be multiple modules per stream. Therefore, we need
161  // the combination of StreamID and ModuleID to correctly track
162  // stalling information. We use tbb::concurrent_unordered_map
163  // for this purpose.
164  using StreamID_value = decltype(std::declval<StreamID>().value());
165  using ModuleID = decltype(std::declval<ModuleDescription>().id());
166  tbb::concurrent_unordered_map<std::pair<StreamID_value,ModuleID>, decltype(beginTime_)> stallStart_ {};
167 
168  std::vector<std::string> moduleLabels_ {};
169  std::vector<StallStatistics> moduleStats_ {};
170  };
171 
172  }
173 
174 }
175 
namespace {
  // Default for the "fileName" parameter: empty means no detailed output file.
  constexpr char const* filename_default = "";
  // Default for the "stallThreshold" parameter, in seconds.
  constexpr double threshold_default = 0.1;
  // Separator placed between columns of the tabular output.
  std::string const space = " ";
}
181 
183 using namespace std::chrono;
184 
185 StallMonitor::StallMonitor(ParameterSet const& iPS, ActivityRegistry& iRegistry)
186  : file_{iPS.getUntrackedParameter<std::string>("fileName", filename_default)}
187  , validFile_{file_}
188  , stallThreshold_{static_cast<long int>(iPS.getUntrackedParameter<double>("stallThreshold")*1000)}
189 {
190  iRegistry.watchPreModuleConstruction(this, &StallMonitor::preModuleConstruction);
191  iRegistry.watchPostBeginJob(this, &StallMonitor::postBeginJob);
192  iRegistry.watchPostModuleEventPrefetching(this, &StallMonitor::postModuleEventPrefetching);
193  iRegistry.watchPreModuleEvent(this, &StallMonitor::preModuleEvent);
194  iRegistry.watchPostEndJob(this, &StallMonitor::postEndJob);
195 
196  if (validFile_) {
197  // Only enable the following callbacks if writing to a file.
198  iRegistry.watchPreSourceEvent(this, &StallMonitor::preSourceEvent);
199  iRegistry.watchPostSourceEvent(this, &StallMonitor::postSourceEvent);
200  iRegistry.watchPreEvent(this, &StallMonitor::preEvent);
201  iRegistry.watchPreEventReadFromSource(this, &StallMonitor::preEventReadFromSource);
202  iRegistry.watchPostEventReadFromSource(this, &StallMonitor::postEventReadFromSource);
203  iRegistry.watchPostModuleEvent(this, &StallMonitor::postModuleEvent);
204  iRegistry.watchPostEvent(this, &StallMonitor::postEvent);
205 
206  std::ostringstream oss;
207  oss << "# Step Symbol Entries\n"
208  << "# -------------------------- ------ ------------------------------------------\n"
209  << "# preSourceEvent " << step::preSourceEvent << " <Stream ID> <Time since beginJob (ms)>\n"
210  << "# postSourceEvent " << step::postSourceEvent << " <Stream ID> <Time since beginJob (ms)>\n"
211  << "# preEvent " << step::preEvent << " <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
212  << "# postModuleEventPrefetching " << step::postModuleEventPrefetching << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
213  << "# preModuleEvent " << step::preModuleEvent << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
214  << "# preEventReadFromSource " << step::preEventReadFromSource << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
215  << "# postEventReadFromSource " << step::postEventReadFromSource << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
216  << "# postModuleEvent " << step::postModuleEvent << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
217  << "# postEvent " << step::postEvent << " <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n";
218  file_.write(oss.str());
219  }
220 }
221 
223 {
225  desc.addUntracked<std::string>("fileName", filename_default)->setComment("Name of file to which detailed timing information should be written.\n"
226  "An empty filename argument (the default) indicates that no extra\n"
227  "information will be written to a dedicated file, but only the summary\n"
228  "including stalling-modules information will be logged.");
229  desc.addUntracked<double>("stallThreshold", threshold_default)->setComment("Threshold (in seconds) used to classify modules as stalled.\n"
230  "Millisecond granularity allowed.");
231  descriptions.add("StallMonitor", desc);
232  descriptions.setComment("This service keeps track of various times in event-processing to determine which modules are stalling.");
233 }
234 
236 {
237  // Module labels are dense, so if the module id is greater than the
238  // size of moduleLabels_, grow the vector to the correct index and
239  // assign the last entry to the desired label. Note that with the
240  // current implementation, there is no module with ID '0'. In
241  // principle, the module-information vectors are therefore each one
242  // entry too large. However, since removing the entry at the front
243  // makes for awkward indexing later on, and since the sizes of these
244  // extra entries are on the order of bytes, we will leave them in
245  // and skip over them later when printing out summaries. The
246  // extraneous entries can be identified by their module labels being
247  // empty.
248  auto const mid = md.id();
249  if (mid < moduleLabels_.size()) {
250  moduleLabels_[mid] = md.moduleLabel();
251  }
252  else {
253  moduleLabels_.resize(mid+1);
254  moduleLabels_.back() = md.moduleLabel();
255  }
256 }
257 
259 {
260  // Since a (push,emplace)_back cannot be called for a vector of a
261  // type containing atomics (like 'StallStatistics')--i.e. atomics
262  // have no copy/move-assignment operators, we must specify the size
263  // of the vector at construction time.
264  moduleStats_ = std::vector<StallStatistics>(moduleLabels_.size());
265  for (std::size_t i{}; i < moduleStats_.size(); ++i) {
266  moduleStats_[i].setLabel(moduleLabels_[i]);
267  }
268 
269  if (validFile_) {
270  std::size_t const width {std::to_string(moduleLabels_.size()).size()};
271 
272  OStreamColumn col0 {"Module ID", width};
273  std::string const lastCol {"Module label"};
274 
275  std::ostringstream oss;
276  oss << "\n# " << col0 << space << lastCol << '\n';
277  oss << "# " << std::string(col0.width()+space.size()+lastCol.size(),'-') << '\n';
278 
279  for (std::size_t i{} ; i < moduleLabels_.size(); ++i) {
280  auto const& label = moduleLabels_[i];
281  if (label.empty()) continue; // See comment in filling of moduleLabels_;
282  oss << "#M " << std::setw(width) << std::left << col0(i) << space
283  << std::left << moduleLabels_[i] << '\n';
284  }
285  oss << '\n';
286  file_.write(oss.str());
287  }
288  // Don't need the labels anymore--info. is now part of the
289  // module-statistics objects.
290  moduleLabels_.clear();
291 
292  beginTime_ = now();
293 }
294 
296 {
297  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
298  auto msg = assembleMessage<step::preSourceEvent>(sid.value(), t);
299  file_.write(std::move(msg));
300 }
301 
303 {
304  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
305  auto msg = assembleMessage<step::postSourceEvent>(sid.value(), t);
306  file_.write(std::move(msg));
307 }
308 
310 {
311  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
312  auto const& eid = sc.eventID();
313  auto msg = assembleMessage<step::preEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
314  file_.write(std::move(msg));
315 }
316 
318 {
319  auto const sid = stream_id(sc);
320  auto const mid = module_id(mcc);
321  auto start = stallStart_[std::make_pair(sid,mid)] = now();
322 
323  if (validFile_) {
324  auto const t = duration_cast<milliseconds>(start-beginTime_).count();
325  auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid, t);
326  file_.write(std::move(msg));
327  }
328 }
329 
331 {
332  auto const preModEvent = now();
333  auto const sid = stream_id(sc);
334  auto const mid = module_id(mcc);
335  auto start = stallStart_[std::make_pair(sid,mid)];
336  auto startT = start.time_since_epoch();
337  if (validFile_) {
338  auto t = duration_cast<milliseconds>(preModEvent-beginTime_).count();
339  if(startT == milliseconds::duration::zero()) {
340  //prefetching did not happen
341  auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid, t);
342  file_.write(std::move(msg));
343  }
344  auto msg = assembleMessage<step::preModuleEvent>(sid, mid, t);
345  file_.write(std::move(msg));
346  }
347 
348  if( milliseconds::duration::zero() != startT) {
349  auto const preFetch_to_preModEvent = duration_cast<milliseconds>(preModEvent-start);
350  if (preFetch_to_preModEvent < stallThreshold_) return;
351  moduleStats_[mid].update(preFetch_to_preModEvent);
352  }
353 }
354 
356 {
357  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
358  auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc), t);
359  file_.write(std::move(msg));
360 }
361 
363 {
364  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
365  auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc), t);
366  file_.write(std::move(msg));
367 }
368 
370 {
371  auto const postModEvent = duration_cast<milliseconds>(now()-beginTime_).count();
372  auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), postModEvent);
373  file_.write(std::move(msg));
374 }
375 
377 {
378  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
379  auto const& eid = sc.eventID();
380  auto msg = assembleMessage<step::postEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
381  file_.write(std::move(msg));
382 }
383 
385 {
386  // Prepare summary
387  std::size_t width {};
388  edm::for_all(moduleStats_, [&width](auto const& stats) {
389  if (stats.numberOfStalls() == 0u) return;
390  width = std::max(width, stats.label().size());
391  });
392 
393  OStreamColumn tag {"StallMonitor>"};
394  OStreamColumn col1 {"Module label", width};
395  OStreamColumn col2 {"# of stalls"};
396  OStreamColumn col3 {"Total stalled time"};
397  OStreamColumn col4 {"Max stalled time"};
398 
399  LogAbsolute out {"StallMonitor"};
400  out << '\n';
401  out << tag << space
402  << col1 << space
403  << col2 << space
404  << col3 << space
405  << col4 << '\n';
406 
407  out << tag << space
408  << std::setfill('-')
409  << col1(std::string{}) << space
410  << col2(std::string{}) << space
411  << col3(std::string{}) << space
412  << col4(std::string{}) << '\n';
413 
414  using seconds_d = duration<double>;
415 
416  auto to_seconds_str = [](auto const& duration){
417  std::ostringstream oss;
418  auto const time = duration_cast<seconds_d>(duration).count();
419  oss << time << " s";
420  return oss.str();
421  };
422 
423  out << std::setfill(' ');
424  for (auto const& stats : moduleStats_) {
425  if (stats.label().empty() || // See comment in filling of moduleLabels_;
426  stats.numberOfStalls() == 0u) continue;
427  out << std::left
428  << tag << space
429  << col1(stats.label()) << space
430  << std::right
431  << col2(stats.numberOfStalls()) << space
432  << col3(to_seconds_str(stats.totalStalledTime())) << space
433  << col4(to_seconds_str(stats.maxStalledTime())) << '\n';
434  }
435 }
436 
size
Write out results.
Definition: start.py:1
T getUntrackedParameter(std::string const &, T const &) const
std::chrono::milliseconds const stallThreshold_
auto_ptr< ClusterSequence > cs
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void preModuleEvent(StreamContext const &, ModuleCallingContext const &)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void preSourceEvent(StreamID)
ThreadSafeOutputFileStream file_
void postEvent(StreamContext const &)
decltype(now()) beginTime_
std::ostream & operator<<(std::ostream &out, const ALILine &li)
Definition: ALILine.cc:188
std::string const & moduleLabel() const
void preModuleConstruction(edm::ModuleDescription const &)
#define constexpr
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< std::string > moduleLabels_
void postModuleEventPrefetching(StreamContext const &, ModuleCallingContext const &)
void preEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
ModuleDescription const * moduleDescription() const
void preEvent(StreamContext const &)
rep
Definition: cuy.py:1188
StreamID const & streamID() const
Definition: StreamContext.h:57
#define DEFINE_FWK_SERVICE(type)
Definition: ServiceMaker.h:113
void setComment(std::string const &value)
unsigned int value() const
Definition: StreamID.h:46
void postEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tbb::concurrent_unordered_map< std::pair< StreamID_value, ModuleID >, decltype(beginTime_)> stallStart_
susybsm::MuonSegment ms
Definition: classes.h:31
double S(const TLorentzVector &, const TLorentzVector &)
Definition: Particle.cc:99
void postSourceEvent(StreamID)
void postModuleEvent(StreamContext const &, ModuleCallingContext const &)
decltype(std::declval< StreamID >().value()) StreamID_value
HLT enums.
#define update(a, b)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< StallStatistics > moduleStats_
EventID const & eventID() const
Definition: StreamContext.h:59
step
decltype(std::declval< ModuleDescription >().id()) ModuleID
long double T
def move(src, dest)
Definition: eostools.py:510
unsigned int id() const