CMS 3D CMS Logo

StallMonitor.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/Services
4 // Class : StallMonitor
5 //
6 // Implementation:
7 //
8 // Original Author: Kyle Knoepfel
9 //
10 
26 
#include "tbb/concurrent_unordered_map.h"

#include <atomic>
#include <chrono>
#include <cmath>
#include <iomanip>
#include <iostream>
#include <sstream>

35 namespace {
36 
// Monotonic clock used for every time stamp recorded by this service.
// Named clock_type (not clock_t) so that it does not shadow the C
// library's ::clock_t type.
using clock_type = std::chrono::steady_clock;

// Current time on the monotonic clock. This is a real function rather than
// a stored pointer to std::chrono::steady_clock::now: the standard leaves
// forming a pointer to a standard-library function unspecified, and a
// mutable global function pointer could be reassigned by accident.
inline clock_type::time_point now() noexcept { return clock_type::now(); }
39 
40  inline auto stream_id(edm::StreamContext const& cs)
41  {
42  return cs.streamID().value();
43  }
44 
45  inline auto module_id(edm::ModuleCallingContext const& mcc)
46  {
47  return mcc.moduleDescription()->id();
48  }
49 
50  //===============================================================
// Per-module stall statistics: number of stalls, total stalled time and the
// longest single stall. The counters are atomics, so update() may be called
// from several threads at once.
class StallStatistics {
public:

  // No constructor taking the label is provided: the objects are created in
  // bulk through std::vector<StallStatistics>(n), for which no sensible
  // default label exists. The label is assigned afterwards via setLabel().
  StallStatistics() = default;

  using duration_t = std::chrono::milliseconds;
  using rep_t = duration_t::rep;

  // Accessors
  std::string const& label() const { return label_; }
  unsigned numberOfStalls() const { return stallCounter_.load(); }
  duration_t totalStalledTime() const { return duration_t{totalTime_.load()}; }
  duration_t maxStalledTime() const { return duration_t{maxTime_.load()}; }

  // Modifiers
  void setLabel(std::string const& label) { label_ = label; }

  // Records one stall lasting 'ms'.
  void update(std::chrono::milliseconds const ms)
  {
    stallCounter_.fetch_add(1u);
    auto const elapsed = ms.count();
    totalTime_.fetch_add(elapsed);

    // Raise the stored maximum if this stall is longer. A failed
    // compare-exchange reloads 'observed' with the current maximum, so the
    // loop ends as soon as the stored value is already >= elapsed.
    rep_t observed = maxTime_.load();
    while (elapsed > observed) {
      if (maxTime_.compare_exchange_strong(observed, elapsed)) {
        break;
      }
    }
  }

private:
  std::string label_ {};
  std::atomic<unsigned> stallCounter_ {};
  std::atomic<rep_t> totalTime_ {};
  std::atomic<rep_t> maxTime_ {};
};
86 
87  //===============================================================
88  // Message-assembly utilities
// Appends every argument to 'os', each preceded by a single space
// (e.g. concatenate(os, 1, 2) writes " 1 2"). assembleMessage uses this to
// build the space-separated fields of each line of the output file.

// Single-argument overload; also terminates the variadic recursion below
// (partial ordering prefers it over the pack version when one argument is left).
template <typename T>
void
concatenate(std::ostream& os, T const t)
{
  os << ' ' << t;
}

template <typename H, typename... T>
void
concatenate(std::ostream& os, H const h, T const... t)
{
  os << ' ' << h;
  concatenate(os, t...);
}
103 
// One-character symbols that tag each line of the output file.
// pre* steps use upper-case letters and post* steps lower-case ones.
enum class step : char {
  preSourceEvent = 'S',
  postSourceEvent = 's',
  preEvent = 'E',
  postModuleEventPrefetching = 'p',
  preModuleEventAcquire = 'A',
  postModuleEventAcquire = 'a',
  preModuleEvent = 'M',
  preEventReadFromSource = 'R',
  postEventReadFromSource = 'r',
  postModuleEvent = 'm',
  postEvent = 'e'
};

// Writes the step's one-character symbol to the stream.
std::ostream& operator<<(std::ostream& os, step const s)
{
  using symbol_t = std::underlying_type_t<step>;
  return os << static_cast<symbol_t>(s);
}
121 
122  template <step S, typename... ARGS>
123  std::string assembleMessage(ARGS const... args)
124  {
125  std::ostringstream oss;
126  oss << S;
127  concatenate(oss, args...);
128  oss << '\n';
129  return oss.str();
130  }
131 
132 }
133 
134 namespace edm {
135  namespace service {
136 
137  class StallMonitor {
138  public:
140  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
141 
142  private:
143 
144  void preModuleConstruction(edm::ModuleDescription const&);
145  void postBeginJob();
146  void preSourceEvent(StreamID);
147  void postSourceEvent(StreamID);
148  void preEvent(StreamContext const&);
149  void preModuleEventAcquire(StreamContext const&, ModuleCallingContext const&);
150  void postModuleEventAcquire(StreamContext const&, ModuleCallingContext const&);
151  void preModuleEvent(StreamContext const&, ModuleCallingContext const&);
152  void postModuleEventPrefetching(StreamContext const&, ModuleCallingContext const&);
153  void preEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
154  void postEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
155  void postModuleEvent(StreamContext const&, ModuleCallingContext const&);
156  void postEvent(StreamContext const&);
157  void postEndJob();
158 
160  bool const validFile_; // Separate data member from file to improve efficiency.
161  std::chrono::milliseconds const stallThreshold_;
162  decltype(now()) beginTime_ {};
163 
164  // There can be multiple modules per stream. Therefore, we need
165  // the combination of StreamID and ModuleID to correctly track
166  // stalling information. We use tbb::concurrent_unordered_map
167  // for this purpose.
168  using StreamID_value = decltype(std::declval<StreamID>().value());
169  using ModuleID = decltype(std::declval<ModuleDescription>().id());
170  tbb::concurrent_unordered_map<std::pair<StreamID_value,ModuleID>, std::pair<decltype(beginTime_), bool>> stallStart_ {};
171 
172  std::vector<std::string> moduleLabels_ {};
173  std::vector<StallStatistics> moduleStats_ {};
174  };
175 
176  }
177 
178 }
179 
namespace {
  // Default for the "fileName" parameter. An empty name means that no
  // detailed file is written; only the summary is logged.
  constexpr char const* filename_default = "";
  // Default for the "stallThreshold" parameter, in seconds.
  constexpr double threshold_default = 0.1;
  // Separator placed between columns of tabular output.
  std::string const space = " ";
}
185 
187 using namespace std::chrono;
188 
189 StallMonitor::StallMonitor(ParameterSet const& iPS, ActivityRegistry& iRegistry)
190  : file_{iPS.getUntrackedParameter<std::string>("fileName", filename_default)}
191  , validFile_{file_}
192  , stallThreshold_{static_cast<long int>(iPS.getUntrackedParameter<double>("stallThreshold")*1000)}
193 {
194  iRegistry.watchPreModuleConstruction(this, &StallMonitor::preModuleConstruction);
195  iRegistry.watchPostBeginJob(this, &StallMonitor::postBeginJob);
196  iRegistry.watchPostModuleEventPrefetching(this, &StallMonitor::postModuleEventPrefetching);
197  iRegistry.watchPreModuleEventAcquire(this, &StallMonitor::preModuleEventAcquire);
198  iRegistry.watchPreModuleEvent(this, &StallMonitor::preModuleEvent);
199  iRegistry.watchPostEndJob(this, &StallMonitor::postEndJob);
200 
201  if (validFile_) {
202  // Only enable the following callbacks if writing to a file.
203  iRegistry.watchPreSourceEvent(this, &StallMonitor::preSourceEvent);
204  iRegistry.watchPostSourceEvent(this, &StallMonitor::postSourceEvent);
205  iRegistry.watchPreEvent(this, &StallMonitor::preEvent);
206  iRegistry.watchPostModuleEventAcquire(this, &StallMonitor::postModuleEventAcquire);
207  iRegistry.watchPreEventReadFromSource(this, &StallMonitor::preEventReadFromSource);
208  iRegistry.watchPostEventReadFromSource(this, &StallMonitor::postEventReadFromSource);
209  iRegistry.watchPostModuleEvent(this, &StallMonitor::postModuleEvent);
210  iRegistry.watchPostEvent(this, &StallMonitor::postEvent);
211 
212  std::ostringstream oss;
213  oss << "# Step Symbol Entries\n"
214  << "# -------------------------- ------ ------------------------------------------\n"
215  << "# preSourceEvent " << step::preSourceEvent << " <Stream ID> <Time since beginJob (ms)>\n"
216  << "# postSourceEvent " << step::postSourceEvent << " <Stream ID> <Time since beginJob (ms)>\n"
217  << "# preEvent " << step::preEvent << " <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
218  << "# postModuleEventPrefetching " << step::postModuleEventPrefetching << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
219  << "# preModuleEventAcquire " << step::preModuleEventAcquire << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
220  << "# postModuleEventAcquire " << step::postModuleEventAcquire << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
221  << "# preModuleEvent " << step::preModuleEvent << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
222  << "# preEventReadFromSource " << step::preEventReadFromSource << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
223  << "# postEventReadFromSource " << step::postEventReadFromSource << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
224  << "# postModuleEvent " << step::postModuleEvent << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
225  << "# postEvent " << step::postEvent << " <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n";
226  file_.write(oss.str());
227  }
228 }
229 
231 {
233  desc.addUntracked<std::string>("fileName", filename_default)->setComment("Name of file to which detailed timing information should be written.\n"
234  "An empty filename argument (the default) indicates that no extra\n"
235  "information will be written to a dedicated file, but only the summary\n"
236  "including stalling-modules information will be logged.");
237  desc.addUntracked<double>("stallThreshold", threshold_default)->setComment("Threshold (in seconds) used to classify modules as stalled.\n"
238  "Millisecond granularity allowed.");
239  descriptions.add("StallMonitor", desc);
240  descriptions.setComment("This service keeps track of various times in event-processing to determine which modules are stalling.");
241 }
242 
244 {
245  // Module labels are dense, so if the module id is greater than the
246  // size of moduleLabels_, grow the vector to the correct index and
247  // assign the last entry to the desired label. Note that with the
248  // current implementation, there is no module with ID '0'. In
249  // principle, the module-information vectors are therefore each one
250  // entry too large. However, since removing the entry at the front
251  // makes for awkward indexing later on, and since the sizes of these
252  // extra entries are on the order of bytes, we will leave them in
253  // and skip over them later when printing out summaries. The
254  // extraneous entries can be identified by their module labels being
255  // empty.
256  auto const mid = md.id();
257  if (mid < moduleLabels_.size()) {
258  moduleLabels_[mid] = md.moduleLabel();
259  }
260  else {
261  moduleLabels_.resize(mid+1);
262  moduleLabels_.back() = md.moduleLabel();
263  }
264 }
265 
267 {
268  // Since a (push,emplace)_back cannot be called for a vector of a
269  // type containing atomics (like 'StallStatistics')--i.e. atomics
270  // have no copy/move-assignment operators, we must specify the size
271  // of the vector at construction time.
272  moduleStats_ = std::vector<StallStatistics>(moduleLabels_.size());
273  for (std::size_t i{}; i < moduleStats_.size(); ++i) {
274  moduleStats_[i].setLabel(moduleLabels_[i]);
275  }
276 
277  if (validFile_) {
278  std::size_t const width {std::to_string(moduleLabels_.size()).size()};
279 
280  OStreamColumn col0 {"Module ID", width};
281  std::string const lastCol {"Module label"};
282 
283  std::ostringstream oss;
284  oss << "\n# " << col0 << space << lastCol << '\n';
285  oss << "# " << std::string(col0.width()+space.size()+lastCol.size(),'-') << '\n';
286 
287  for (std::size_t i{} ; i < moduleLabels_.size(); ++i) {
288  auto const& label = moduleLabels_[i];
289  if (label.empty()) continue; // See comment in filling of moduleLabels_;
290  oss << "#M " << std::setw(width) << std::left << col0(i) << space
291  << std::left << moduleLabels_[i] << '\n';
292  }
293  oss << '\n';
294  file_.write(oss.str());
295  }
296  // Don't need the labels anymore--info. is now part of the
297  // module-statistics objects.
298  moduleLabels_.clear();
299 
300  beginTime_ = now();
301 }
302 
304 {
305  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
306  auto msg = assembleMessage<step::preSourceEvent>(sid.value(), t);
307  file_.write(std::move(msg));
308 }
309 
311 {
312  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
313  auto msg = assembleMessage<step::postSourceEvent>(sid.value(), t);
314  file_.write(std::move(msg));
315 }
316 
318 {
319  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
320  auto const& eid = sc.eventID();
321  auto msg = assembleMessage<step::preEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
322  file_.write(std::move(msg));
323 }
324 
326 {
327  auto const sid = stream_id(sc);
328  auto const mid = module_id(mcc);
329  auto start = stallStart_[std::make_pair(sid,mid)] = std::make_pair(now(), false);
330 
331  if (validFile_) {
332  auto const t = duration_cast<milliseconds>(start.first - beginTime_).count();
333  auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid, t);
334  file_.write(std::move(msg));
335  }
336 }
337 
339 {
340  auto const preModEventAcquire = now();
341  auto const sid = stream_id(sc);
342  auto const mid = module_id(mcc);
343  auto & start = stallStart_[std::make_pair(sid,mid)];
344  auto startT = start.first.time_since_epoch();
345  start.second = true; // record so the preModuleEvent knows that acquire was called
346  if (validFile_) {
347  auto t = duration_cast<milliseconds>(preModEventAcquire - beginTime_).count();
348  auto msg = assembleMessage<step::preModuleEventAcquire>(sid, mid, t);
349  file_.write(std::move(msg));
350  }
351  // Check for stalls if prefetch was called
352  if( milliseconds::duration::zero() != startT) {
353  auto const preFetch_to_preModEventAcquire = duration_cast<milliseconds>(preModEventAcquire - start.first);
354  if (preFetch_to_preModEventAcquire < stallThreshold_) return;
355  moduleStats_[mid].update(preFetch_to_preModEventAcquire);
356  }
357 }
358 
360 {
361  auto const postModEventAcquire = duration_cast<milliseconds>(now()-beginTime_).count();
362  auto msg = assembleMessage<step::postModuleEventAcquire>(stream_id(sc), module_id(mcc), postModEventAcquire);
363  file_.write(std::move(msg));
364 }
365 
367 {
368  auto const preModEvent = now();
369  auto const sid = stream_id(sc);
370  auto const mid = module_id(mcc);
371  auto const& start = stallStart_[std::make_pair(sid,mid)];
372  auto startT = start.first.time_since_epoch();
373  if (validFile_) {
374  auto t = duration_cast<milliseconds>(preModEvent-beginTime_).count();
375  auto msg = assembleMessage<step::preModuleEvent>(sid, mid, t);
376  file_.write(std::move(msg));
377  }
378  // Check for stalls if prefetch was called and we did not already check before acquire
379  if( milliseconds::duration::zero() != startT && !start.second) {
380  auto const preFetch_to_preModEvent = duration_cast<milliseconds>(preModEvent-start.first);
381  if (preFetch_to_preModEvent < stallThreshold_) return;
382  moduleStats_[mid].update(preFetch_to_preModEvent);
383  }
384 }
385 
387 {
388  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
389  auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc), t);
390  file_.write(std::move(msg));
391 }
392 
394 {
395  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
396  auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc), t);
397  file_.write(std::move(msg));
398 }
399 
401 {
402  auto const postModEvent = duration_cast<milliseconds>(now()-beginTime_).count();
403  auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), postModEvent);
404  file_.write(std::move(msg));
405 }
406 
408 {
409  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
410  auto const& eid = sc.eventID();
411  auto msg = assembleMessage<step::postEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
412  file_.write(std::move(msg));
413 }
414 
416 {
417  // Prepare summary
418  std::size_t width {};
419  edm::for_all(moduleStats_, [&width](auto const& stats) {
420  if (stats.numberOfStalls() == 0u) return;
421  width = std::max(width, stats.label().size());
422  });
423 
424  OStreamColumn tag {"StallMonitor>"};
425  OStreamColumn col1 {"Module label", width};
426  OStreamColumn col2 {"# of stalls"};
427  OStreamColumn col3 {"Total stalled time"};
428  OStreamColumn col4 {"Max stalled time"};
429 
430  LogAbsolute out {"StallMonitor"};
431  out << '\n';
432  out << tag << space
433  << col1 << space
434  << col2 << space
435  << col3 << space
436  << col4 << '\n';
437 
438  out << tag << space
439  << std::setfill('-')
440  << col1(std::string{}) << space
441  << col2(std::string{}) << space
442  << col3(std::string{}) << space
443  << col4(std::string{}) << '\n';
444 
445  using seconds_d = duration<double>;
446 
447  auto to_seconds_str = [](auto const& duration){
448  std::ostringstream oss;
449  auto const time = duration_cast<seconds_d>(duration).count();
450  oss << time << " s";
451  return oss.str();
452  };
453 
454  out << std::setfill(' ');
455  for (auto const& stats : moduleStats_) {
456  if (stats.label().empty() || // See comment in filling of moduleLabels_;
457  stats.numberOfStalls() == 0u) continue;
458  out << std::left
459  << tag << space
460  << col1(stats.label()) << space
461  << std::right
462  << col2(stats.numberOfStalls()) << space
463  << col3(to_seconds_str(stats.totalStalledTime())) << space
464  << col4(to_seconds_str(stats.maxStalledTime())) << '\n';
465  }
466 }
467 
size
Write out results.
Definition: start.py:1
T getUntrackedParameter(std::string const &, T const &) const
std::chrono::milliseconds const stallThreshold_
auto_ptr< ClusterSequence > cs
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void preModuleEvent(StreamContext const &, ModuleCallingContext const &)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void preSourceEvent(StreamID)
ThreadSafeOutputFileStream file_
void postEvent(StreamContext const &)
decltype(now()) beginTime_
std::ostream & operator<<(std::ostream &out, const ALILine &li)
Definition: ALILine.cc:188
std::string const & moduleLabel() const
void preModuleConstruction(edm::ModuleDescription const &)
#define constexpr
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< std::string > moduleLabels_
void postModuleEventPrefetching(StreamContext const &, ModuleCallingContext const &)
void preEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
ModuleDescription const * moduleDescription() const
void preEvent(StreamContext const &)
rep
Definition: cuy.py:1188
StreamID const & streamID() const
Definition: StreamContext.h:57
void preModuleEventAcquire(StreamContext const &, ModuleCallingContext const &)
#define DEFINE_FWK_SERVICE(type)
Definition: ServiceMaker.h:113
void setComment(std::string const &value)
void postModuleEventAcquire(StreamContext const &, ModuleCallingContext const &)
unsigned int value() const
Definition: StreamID.h:46
void postEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:277
susybsm::MuonSegment ms
Definition: classes.h:31
double S(const TLorentzVector &, const TLorentzVector &)
Definition: Particle.cc:99
void postSourceEvent(StreamID)
void postModuleEvent(StreamContext const &, ModuleCallingContext const &)
decltype(std::declval< StreamID >().value()) StreamID_value
HLT enums.
#define update(a, b)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< StallStatistics > moduleStats_
EventID const & eventID() const
Definition: StreamContext.h:59
step
decltype(std::declval< ModuleDescription >().id()) ModuleID
long double T
tbb::concurrent_unordered_map< std::pair< StreamID_value, ModuleID >, std::pair< decltype(beginTime_), bool > > stallStart_
def move(src, dest)
Definition: eostools.py:510
unsigned int id() const