test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StallMonitor.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/Services
4 // Class : StallMonitor
5 //
6 // Implementation:
7 //
8 // Original Author: Kyle Knoepfel
9 //
10 
26 
27 #include "tbb/concurrent_unordered_map.h"
28 
29 #include <atomic>
30 #include <chrono>
31 #include <iomanip>
32 #include <iostream>
33 #include <sstream>
34 
35 namespace {
36 
37  using clock_t = std::chrono::steady_clock;
38  auto now = clock_t::now;
39 
40  inline auto stream_id(edm::StreamContext const& cs)
41  {
42  return cs.streamID().value();
43  }
44 
45  inline auto module_id(edm::ModuleCallingContext const& mcc)
46  {
47  return mcc.moduleDescription()->id();
48  }
49 
50  //===============================================================
// Per-module stall bookkeeping. It keeps three running values:
//  - the number of recorded stalls,
//  - their summed duration,
//  - the longest single stall.
// The three counters are std::atomic, presumably so that update() can be
// called concurrently from different streams. The label is a plain string
// that is only written through setLabel().
class StallStatistics {
public:

  // Only a default constructor is provided. The owning service creates
  // the statistics in bulk via std::vector<StallStatistics>(n) and
  // assigns each label afterwards with setLabel().
  StallStatistics() = default;

  // Accessors
  std::string const& label() const { return label_; }
  unsigned numberOfStalls() const { return stallCounter_.load(); }

  using duration_t = std::chrono::milliseconds;
  using rep_t = duration_t::rep;

  duration_t totalStalledTime() const
  {
    rep_t const total = totalTime_.load();
    return duration_t{total};
  }

  duration_t maxStalledTime() const
  {
    rep_t const longest = maxTime_.load();
    return duration_t{longest};
  }

  // Modifiers
  void setLabel(std::string const& label) { label_ = label; }

  // Record one stall of duration 'ms':
  //  - increment the stall count,
  //  - add the duration to the total,
  //  - raise the running maximum if this stall is the longest seen so far.
  void update(std::chrono::milliseconds const ms)
  {
    stallCounter_.fetch_add(1);
    rep_t const current = ms.count();
    totalTime_.fetch_add(current);

    // Atomic maximum. Retry the CAS until it succeeds, or until the stored
    // value is already at least 'current'. A failed compare_exchange_strong
    // reloads 'seen' with the value currently stored.
    rep_t seen = maxTime_.load();
    while (current > seen) {
      if (maxTime_.compare_exchange_strong(seen, current)) {
        break;
      }
    }
  }

private:
  std::string label_ {};
  std::atomic<unsigned> stallCounter_ {};
  std::atomic<rep_t> totalTime_ {};
  std::atomic<rep_t> maxTime_ {};
};
86 
87  //===============================================================
88  // Message-assembly utilities
89  template <typename T>
91  concatenate(std::ostream& os, T const t)
92  {
93  os << ' ' << t;
94  }
95 
96  template <typename H, typename... T>
98  concatenate(std::ostream& os, H const h, T const... t)
99  {
100  os << ' ' << h;
101  concatenate(os, t...);
102  }
103 
// One-character tags that prefix each record written to the optional
// timing file. The constructor writes a legend of these tags into the file.
// Upper-case tags mark "pre" transitions; lower-case tags mark "post"
// transitions.
enum class step : char {
  preEvent                   = 'E',
  postModuleEventPrefetching = 'p',
  preModuleEvent             = 'M',
  preEventReadFromSource     = 'R',
  postEventReadFromSource    = 'r',
  postModuleEvent            = 'm',
  postEvent                  = 'e'
};
111 
112  std::ostream& operator<<(std::ostream& os, step const s)
113  {
114  os << static_cast<std::underlying_type_t<step>>(s);
115  return os;
116  }
117 
118  template <step S, typename... ARGS>
119  std::string assembleMessage(ARGS const... args)
120  {
121  std::ostringstream oss;
122  oss << S;
123  concatenate(oss, args...);
124  oss << '\n';
125  return oss.str();
126  }
127 
128 }
129 
130 namespace edm {
131  namespace service {
132 
133  class StallMonitor {
134  public:
136  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
137 
138  private:
139 
141  void postBeginJob();
142  void preEvent(StreamContext const&);
148  void postEvent(StreamContext const&);
149  void postEndJob();
150 
152  bool const validFile_; // Separate data member from file to improve efficiency.
153  std::chrono::milliseconds const stallThreshold_;
154  decltype(now()) beginTime_ {};
155 
156  // There can be multiple modules per stream. Therefore, we need
157  // the combination of StreamID and ModuleID to correctly track
158  // stalling information. We use tbb::concurrent_unordered_map
159  // for this purpose.
160  using StreamID_value = decltype(std::declval<StreamID>().value());
161  using ModuleID = decltype(std::declval<ModuleDescription>().id());
162  tbb::concurrent_unordered_map<std::pair<StreamID_value,ModuleID>, decltype(beginTime_)> stallStart_ {};
163 
164  std::vector<std::string> moduleLabels_ {};
165  std::vector<StallStatistics> moduleStats_ {};
166  };
167 
168  }
169 
170 }
171 
namespace {
  // Default for the untracked "fileName" parameter. An empty name means no
  // dedicated timing file is written; only the summary is logged.
  constexpr auto filename_default = "";

  // Default for the untracked "stallThreshold" parameter, in seconds.
  constexpr auto threshold_default = 0.1;

  // Column separator used when formatting the tables.
  std::string const space = " ";
}
177 
179 using namespace std::chrono;
180 
181 StallMonitor::StallMonitor(ParameterSet const& iPS, ActivityRegistry& iRegistry)
182  : file_{iPS.getUntrackedParameter<std::string>("fileName", filename_default)}
183  , validFile_{file_}
184  , stallThreshold_{static_cast<long int>(iPS.getUntrackedParameter<double>("stallThreshold", threshold_default)*1000)}
185 {
190  iRegistry.watchPostEndJob(this, &StallMonitor::postEndJob);
191 
192  if (validFile_) {
193  // Only enable the following callbacks if writing to a file.
194  iRegistry.watchPreEvent(this, &StallMonitor::preEvent);
198  iRegistry.watchPostEvent(this, &StallMonitor::postEvent);
199 
200  std::ostringstream oss;
201  oss << "# Step Symbol Entries\n"
202  << "# -------------------------- ------ ------------------------------------------\n"
203  << "# preEvent " << step::preEvent << " <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
204  << "# postModuleEventPrefetching " << step::postModuleEventPrefetching << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
205  << "# preModuleEvent " << step::preModuleEvent << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
206  << "# preEventReadFromSource " << step::preEventReadFromSource << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
207  << "# postEventReadFromSource " << step::postEventReadFromSource << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
208  << "# postModuleEvent " << step::postModuleEvent << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
209  << "# postEvent " << step::postEvent << " <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n";
210  file_.write(oss.str());
211  }
212 }
213 
215 {
217  desc.addUntracked<std::string>("fileName", filename_default)->setComment("Name of file to which detailed timing information should be written.\n"
218  "An empty filename argument (the default) indicates that no extra\n"
219  "information will be written to a dedicated file, but only the summary\n"
220  "including stalling-modules information will be logged.");
221  desc.addUntracked<double>("stallThreshold", threshold_default)->setComment("Threshold (in seconds) used to classify modules as stalled.\n"
222  "Millisecond granularity allowed.");
223  descriptions.add("StallMonitor", desc);
224  descriptions.setComment("This service keeps track of various times in event-processing to determine which modules are stalling.");
225 }
226 
228 {
229  // Module labels are dense, so if the module id is greater than the
230  // size of moduleLabels_, grow the vector to the correct index and
231  // assign the last entry to the desired label. Note that with the
232  // current implementation, there is no module with ID '0'. In
233  // principle, the module-information vectors are therefore each one
234  // entry too large. However, since removing the entry at the front
235  // makes for awkward indexing later on, and since the sizes of these
236  // extra entries are on the order of bytes, we will leave them in
237  // and skip over them later when printing out summaries. The
238  // extraneous entries can be identified by their module labels being
239  // empty.
240  auto const mid = md.id();
241  if (mid < moduleLabels_.size()) {
242  moduleLabels_[mid] = md.moduleLabel();
243  }
244  else {
245  moduleLabels_.resize(mid+1);
246  moduleLabels_.back() = md.moduleLabel();
247  }
248 }
249 
251 {
252  // Since a (push,emplace)_back cannot be called for a vector of a
253  // type containing atomics (like 'StallStatistics')--i.e. atomics
254  // have no copy/move-assignment operators, we must specify the size
255  // of the vector at construction time.
256  moduleStats_ = std::vector<StallStatistics>(moduleLabels_.size());
257  for (std::size_t i{}; i < moduleStats_.size(); ++i) {
258  moduleStats_[i].setLabel(moduleLabels_[i]);
259  }
260 
261  if (validFile_) {
262  std::size_t const width {std::to_string(moduleLabels_.size()).size()};
263 
264  OStreamColumn col0 {"Module ID", width};
265  std::string const lastCol {"Module label"};
266 
267  std::ostringstream oss;
268  oss << "\n# " << col0 << space << lastCol << '\n';
269  oss << "# " << std::string(col0.width()+space.size()+lastCol.size(),'-') << '\n';
270 
271  for (std::size_t i{} ; i < moduleLabels_.size(); ++i) {
272  auto const& label = moduleLabels_[i];
273  if (label.empty()) continue; // See comment in filling of moduleLabels_;
274  oss << "#M " << std::setw(width) << std::left << col0(i) << space
275  << std::left << moduleLabels_[i] << '\n';
276  }
277  oss << '\n';
278  file_.write(oss.str());
279  }
280  // Don't need the labels anymore--info. is now part of the
281  // module-statistics objects.
282  moduleLabels_.clear();
283 
284  beginTime_ = now();
285 }
286 
288 {
289  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
290  auto const& eid = sc.eventID();
291  auto msg = assembleMessage<step::preEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
293 }
294 
296 {
297  auto const sid = stream_id(sc);
298  auto const mid = module_id(mcc);
299  auto start = stallStart_[std::make_pair(sid,mid)] = now();
300 
301  if (validFile_) {
302  auto const t = duration_cast<milliseconds>(start-beginTime_).count();
303  auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid, t);
305  }
306 }
307 
309 {
310  auto const preModEvent = now();
311  auto const sid = stream_id(sc);
312  auto const mid = module_id(mcc);
313  if (validFile_) {
314  auto msg = assembleMessage<step::preModuleEvent>(sid, mid, duration_cast<milliseconds>(preModEvent-beginTime_).count());
316  }
317 
318  auto const preFetch_to_preModEvent = duration_cast<milliseconds>(preModEvent-stallStart_[std::make_pair(sid,mid)]);
319  if (preFetch_to_preModEvent < stallThreshold_) return;
320  moduleStats_[mid].update(preFetch_to_preModEvent);
321 }
322 
324 {
325  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
326  auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc), t);
328 }
329 
331 {
332  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
333  auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc), t);
335 }
336 
338 {
339  auto const postModEvent = duration_cast<milliseconds>(now()-beginTime_).count();
340  auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), postModEvent);
342 }
343 
345 {
346  auto const t = duration_cast<milliseconds>(now()-beginTime_).count();
347  auto const& eid = sc.eventID();
348  auto msg = assembleMessage<step::postEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
350 }
351 
353 {
354  // Prepare summary
355  std::size_t width {};
356  edm::for_all(moduleStats_, [&width](auto const& stats) {
357  if (stats.numberOfStalls() == 0u) return;
358  width = std::max(width, stats.label().size());
359  });
360 
361  OStreamColumn tag {"StallMonitor>"};
362  OStreamColumn col1 {"Module label", width};
363  OStreamColumn col2 {"# of stalls"};
364  OStreamColumn col3 {"Total stalled time"};
365  OStreamColumn col4 {"Max stalled time"};
366 
367  LogAbsolute out {"StallMonitor"};
368  out << '\n';
369  out << tag << space
370  << col1 << space
371  << col2 << space
372  << col3 << space
373  << col4 << '\n';
374 
375  out << tag << space
376  << std::setfill('-')
377  << col1(std::string{}) << space
378  << col2(std::string{}) << space
379  << col3(std::string{}) << space
380  << col4(std::string{}) << '\n';
381 
382  using seconds_d = duration<double>;
383 
384  auto to_seconds_str = [](auto const& duration){
385  std::ostringstream oss;
386  auto const time = duration_cast<seconds_d>(duration).count();
387  oss << time << " s";
388  return oss.str();
389  };
390 
391  out << std::setfill(' ');
392  for (auto const& stats : moduleStats_) {
393  if (stats.label().empty() || // See comment in filling of moduleLabels_;
394  stats.numberOfStalls() == 0u) continue;
395  out << std::left
396  << tag << space
397  << col1(stats.label()) << space
398  << std::right
399  << col2(stats.numberOfStalls()) << space
400  << col3(to_seconds_str(stats.totalStalledTime())) << space
401  << col4(to_seconds_str(stats.maxStalledTime())) << '\n';
402  }
403 }
404 
T getUntrackedParameter(std::string const &, T const &) const
void watchPreEvent(PreEvent::slot_type const &iSlot)
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
std::chrono::milliseconds const stallThreshold_
auto_ptr< ClusterSequence > cs
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void preModuleEvent(StreamContext const &, ModuleCallingContext const &)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void watchPreModuleConstruction(PreModuleConstruction::slot_type const &iSlot)
ThreadSafeOutputFileStream file_
void watchPostEvent(PostEvent::slot_type const &iSlot)
void postEvent(StreamContext const &)
decltype(now()) beginTime_
void watchPreEventReadFromSource(PreEventReadFromSource::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
std::ostream & operator<<(std::ostream &out, const ALILine &li)
Definition: ALILine.cc:188
std::string const & moduleLabel() const
void preModuleConstruction(edm::ModuleDescription const &)
#define constexpr
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void watchPostModuleEventPrefetching(PostModuleEventPrefetching::slot_type const &iSlot)
std::vector< std::string > moduleLabels_
void postModuleEventPrefetching(StreamContext const &, ModuleCallingContext const &)
void preEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
def move
Definition: eostools.py:510
ModuleDescription const * moduleDescription() const
void preEvent(StreamContext const &)
StallMonitor(ParameterSet const &, ActivityRegistry &)
StreamID const & streamID() const
Definition: StreamContext.h:57
#define DEFINE_FWK_SERVICE(type)
Definition: ServiceMaker.h:113
void setComment(std::string const &value)
unsigned int value() const
Definition: StreamID.h:46
void postEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tbb::concurrent_unordered_map< std::pair< StreamID_value, ModuleID >, decltype(beginTime_)> stallStart_
susybsm::MuonSegment ms
Definition: classes.h:31
double S(const TLorentzVector &, const TLorentzVector &)
Definition: Particle.cc:99
void postModuleEvent(StreamContext const &, ModuleCallingContext const &)
decltype(std::declval< StreamID >().value()) StreamID_value
#define update(a, b)
void watchPostEventReadFromSource(PostEventReadFromSource::slot_type const &iSlot)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< StallStatistics > moduleStats_
EventID const & eventID() const
Definition: StreamContext.h:59
step
decltype(std::declval< ModuleDescription >().id()) ModuleID
long double T
tuple size
Write out results.
unsigned int id() const
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal