28 #include "tbb/concurrent_unordered_map.h" 38 using duration_t = std::chrono::microseconds;
39 using clock_t = std::chrono::steady_clock;
47 class StallStatistics {
// Defaulted constructor: no custom initialization is performed here; the
// data members rely on their own in-class `{}` initializers.
52 StallStatistics() =
default;
// Returns the current value of stallCounter_. The member is a
// std::atomic<unsigned>, so the implicit conversion performs an atomic load.
55 unsigned numberOfStalls()
const {
return stallCounter_; }
// Returns the accumulated stall time. The raw tick count held in totalTime_
// is atomically loaded and wrapped in duration_t (std::chrono::microseconds).
59 duration_t totalStalledTime()
const {
return duration_t{totalTime_.load()}; }
// Returns the value held in maxTime_, atomically loaded and wrapped in
// duration_t (std::chrono::microseconds). The visible update loop only
// replaces maxTime_ with a larger value, so this is the longest single
// stall recorded.
60 duration_t maxStalledTime()
const {
return duration_t{maxTime_.load()}; }
67 auto const thisTime = ms.count();
68 totalTime_ += thisTime;
70 while (thisTime >
max && !maxTime_.compare_exchange_strong(
max, thisTime))
// Atomic accumulators; each is value-initialized by its `{}` initializer.
// Value reported by numberOfStalls().
76 std::atomic<unsigned> stallCounter_{};
// Running sum of stall durations as a raw tick count, reported by
// totalStalledTime(). NOTE(review): rep_t is declared outside this view --
// presumably duration_t::rep; confirm.
77 std::atomic<rep_t> totalTime_{};
// Raw tick count reported by maxStalledTime(); updated via
// compare_exchange_strong only when a new sample exceeds the stored value.
78 std::atomic<rep_t> maxTime_{};
88 template <
typename H,
typename...
T>
91 concatenate(os, t...);
96 postSourceEvent =
's',
98 postModuleEventPrefetching =
'p',
99 preModuleEventAcquire =
'A',
100 postModuleEventAcquire =
'a',
101 preModuleEvent =
'M',
102 preEventReadFromSource =
'R',
103 postEventReadFromSource =
'r',
104 postModuleEvent =
'm',
121 os << static_cast<std::underlying_type_t<step>>(
s);
126 os << static_cast<std::underlying_type_t<Phase>>(
s);
130 template <
step S,
typename... ARGS>
132 std::ostringstream oss;
134 concatenate(oss, args...);
142 case StreamContext::Transition::kBeginRun:
143 return Phase::streamBeginRun;
144 case StreamContext::Transition::kBeginLuminosityBlock:
145 return Phase::streamBeginLumi;
146 case StreamContext::Transition::kEvent:
148 case StreamContext::Transition::kEndLuminosityBlock:
149 return Phase::streamEndLumi;
150 case StreamContext::Transition::kEndRun:
151 return Phase::streamEndRun;
159 auto toTransition(
edm::StreamContext const& iContext) -> std::underlying_type_t<Phase> {
160 return static_cast<std::underlying_type_t<Phase>
>(toTransitionImpl(iContext));
166 case GlobalContext::Transition::kBeginRun:
167 return Phase::globalBeginRun;
168 case GlobalContext::Transition::kBeginLuminosityBlock:
169 return Phase::globalBeginLumi;
170 case GlobalContext::Transition::kEndLuminosityBlock:
171 return Phase::globalEndLumi;
172 case GlobalContext::Transition::kWriteLuminosityBlock:
173 return Phase::globalEndLumi;
174 case GlobalContext::Transition::kEndRun:
175 return Phase::globalEndRun;
176 case GlobalContext::Transition::kWriteRun:
177 return Phase::globalEndRun;
184 auto toTransition(
edm::GlobalContext const& iContext) -> std::underlying_type_t<Phase> {
185 return static_cast<std::underlying_type_t<Phase>
>(toTransitionImpl(iContext));
// Reference time point, of the type returned by now(). The callbacks subtract
// it from the current time to obtain the timestamps written in the messages.
221 decltype(
now()) beginTime_{};
// Alias for whatever type ModuleDescription::id() returns, deduced with
// declval so the type is never spelled out explicitly.
228 using ModuleID = decltype(std::declval<ModuleDescription>().
id());
229 tbb::concurrent_unordered_map<std::pair<StreamID_value, ModuleID>, std::pair<decltype(beginTime_), bool>>
// Module labels, indexed by position when the "#M" listing is written.
232 std::vector<std::string> moduleLabels_{};
// Per-module stall statistics; indexed by module ID when a stall is recorded.
233 std::vector<StallStatistics> moduleStats_{};
// Empty-string default. NOTE(review): presumably the default for the output
// file name parameter, whose description says an empty name means no
// dedicated file is written -- confirm against the parameter registration.
242 constexpr char const* filename_default{
""};
254 std::chrono::round<duration_t>(duration<double>(iPS.getUntrackedParameter<
double>(
"stallThreshold")))} {
297 iRegistry.preallocateSignal_.connect(
300 std::ostringstream oss;
301 oss <<
"# Transition Symbol\n";
302 oss <<
"#----------------- ------\n";
303 oss <<
"# globalBeginRun " << Phase::globalBeginRun <<
"\n" 304 <<
"# streamBeginRun " << Phase::streamBeginRun <<
"\n" 305 <<
"# globalBeginLumi " << Phase::globalBeginLumi <<
"\n" 306 <<
"# streamBeginLumi " << Phase::streamBeginLumi <<
"\n" 308 <<
"# streamEndLumi " << Phase::streamEndLumi <<
"\n" 309 <<
"# globalEndLumi " << Phase::globalEndLumi <<
"\n" 310 <<
"# streamEndRun " << Phase::streamEndRun <<
"\n" 311 <<
"# globalEndRun " << Phase::globalEndRun <<
"\n";
312 oss <<
"# Step Symbol Entries\n" 313 <<
"# -------------------------- ------ ------------------------------------------\n" 314 <<
"# preSourceEvent " << step::preSourceEvent <<
" <Stream ID> <Time since beginJob (ms)>\n" 315 <<
"# postSourceEvent " << step::postSourceEvent <<
" <Stream ID> <Time since beginJob (ms)>\n" 316 <<
"# preEvent " << step::preEvent
317 <<
" <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n" 318 <<
"# postModuleEventPrefetching " << step::postModuleEventPrefetching
319 <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 320 <<
"# preModuleEventAcquire " << step::preModuleEventAcquire
321 <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 322 <<
"# postModuleEventAcquire " << step::postModuleEventAcquire
323 <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 324 <<
"# preModuleTransition " << step::preModuleEvent
325 <<
" <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n" 326 <<
"# preEventReadFromSource " << step::preEventReadFromSource
327 <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 328 <<
"# postEventReadFromSource " << step::postEventReadFromSource
329 <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 330 <<
"# postModuleTransition " << step::postModuleEvent
331 <<
" <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n" 332 <<
"# postEvent " << step::postEvent
333 <<
" <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n";
334 file_.
write(oss.str());
342 "Name of file to which detailed timing information should be written.\n" 343 "An empty filename argument (the default) indicates that no extra\n" 344 "information will be written to a dedicated file, but only the summary\n" 345 "including stalling-modules information will be logged.");
346 desc.
addUntracked<
double>(
"stallThreshold", threshold_default)
348 "Threshold (in seconds) used to classify modules as stalled.\n" 349 "Microsecond granularity allowed.");
350 descriptions.
add(
"StallMonitor", desc);
352 "This service keeps track of various times in event-processing to determine which modules are stalling.");
367 auto const mid = md.
id();
392 std::ostringstream oss;
393 oss <<
"\n# " << col0 << space << lastCol <<
'\n';
394 oss <<
"# " <<
std::string(col0.width() + space.size() + lastCol.size(),
'-') <<
'\n';
400 oss <<
"#M " << std::setw(
width) << std::left << col0(
i) << space << std::left <<
moduleLabels_[
i] <<
'\n';
403 file_.
write(oss.str());
414 auto msg = assembleMessage<step::preSourceEvent>(sid.
value(),
t);
420 auto msg = assembleMessage<step::postSourceEvent>(sid.
value(),
t);
427 auto msg = assembleMessage<step::preEvent>(stream_id(sc),
eid.run(),
eid.luminosityBlock(),
eid.event(),
t);
432 auto const sid = stream_id(sc);
433 auto const mid = module_id(mcc);
438 auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid,
t);
444 auto const preModEventAcquire =
now();
445 auto const sid = stream_id(sc);
446 auto const mid = module_id(mcc);
451 auto t = duration_cast<duration_t>(preModEventAcquire -
beginTime_).
count();
452 auto msg = assembleMessage<step::preModuleEventAcquire>(sid, mid,
t);
456 if (duration_t::duration::zero() !=
startT) {
457 auto const preFetch_to_preModEventAcquire = duration_cast<duration_t>(preModEventAcquire -
start.first);
458 if (preFetch_to_preModEventAcquire < stallThreshold_)
460 moduleStats_[mid].update(preFetch_to_preModEventAcquire);
465 auto const postModEventAcquire = duration_cast<duration_t>(
now() -
beginTime_).
count();
466 auto msg = assembleMessage<step::postModuleEventAcquire>(stream_id(sc), module_id(mcc), postModEventAcquire);
471 auto const preModEvent =
now();
472 auto const sid = stream_id(sc);
473 auto const mid = module_id(mcc);
479 assembleMessage<step::preModuleEvent>(sid, mid,
static_cast<std::underlying_type_t<Phase>
>(
Phase::Event),
t);
483 if (duration_t::duration::zero() !=
startT && !
start.second) {
484 auto const preFetch_to_preModEvent = duration_cast<duration_t>(preModEvent -
start.first);
485 if (preFetch_to_preModEvent < stallThreshold_)
492 auto const tNow =
now();
493 auto const sid = stream_id(sc);
494 auto const mid = module_id(mcc);
496 auto msg = assembleMessage<step::preModuleEvent>(sid, mid, toTransition(sc),
t);
502 auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), toTransition(sc),
t);
508 auto msg = assembleMessage<step::preModuleEvent>(
numStreams_, module_id(mcc), toTransition(gc),
t);
514 auto msg = assembleMessage<step::postModuleEvent>(
numStreams_, module_id(mcc), toTransition(gc), postModTime);
520 auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc),
t);
526 auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc),
t);
532 auto msg = assembleMessage<step::postModuleEvent>(
533 stream_id(sc), module_id(mcc),
static_cast<std::underlying_type_t<Phase>
>(
Phase::Event), postModEvent);
540 auto msg = assembleMessage<step::postEvent>(stream_id(sc),
eid.run(),
eid.luminosityBlock(),
eid.event(),
t);
548 if (stats.numberOfStalls() == 0u)
561 out <<
tag << space << col1 << space << col2 << space << col3 << space << col4 <<
'\n';
566 using seconds_d = duration<double>;
568 auto to_seconds_str = [](
auto const& duration) {
569 std::ostringstream oss;
570 auto const time = duration_cast<seconds_d>(duration).
count();
575 out << std::setfill(
' ');
577 if (stats.label().empty() ||
578 stats.numberOfStalls() == 0u)
580 out << std::left <<
tag << space << col1(stats.label()) << space << std::right << col2(stats.numberOfStalls())
581 << space << col3(to_seconds_str(stats.totalStalledTime())) << space
582 << col4(to_seconds_str(stats.maxStalledTime())) <<
'\n';
T getUntrackedParameter(std::string const &, T const &) const
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void preModuleEvent(StreamContext const &, ModuleCallingContext const &)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void preSourceEvent(StreamID)
unique_ptr< ClusterSequence > cs
ThreadSafeOutputFileStream file_
void postEvent(StreamContext const &)
decltype(now()) beginTime_
std::ostream & operator<<(std::ostream &out, const ALILine &li)
std::string const & moduleLabel() const
void preModuleConstruction(edm::ModuleDescription const &)
void preModuleStreamTransition(StreamContext const &, ModuleCallingContext const &)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
unsigned int maxNumberOfStreams() const
std::vector< std::string > moduleLabels_
Transition transition() const
void postModuleEventPrefetching(StreamContext const &, ModuleCallingContext const &)
void preEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
ModuleDescription const * moduleDescription() const
duration_t const stallThreshold_
void preEvent(StreamContext const &)
#define DEFINE_FWK_SERVICE(type)
tbb::concurrent_unordered_map< std::pair< StreamID_value, ModuleID >, std::pair< decltype(beginTime_), bool > > stallStart_
StreamID const & streamID() const
void preModuleEventAcquire(StreamContext const &, ModuleCallingContext const &)
void setComment(std::string const &value)
void postModuleEventAcquire(StreamContext const &, ModuleCallingContext const &)
unsigned int value() const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void postEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
void postModuleGlobalTransition(GlobalContext const &, ModuleCallingContext const &)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
double S(const TLorentzVector &, const TLorentzVector &)
void postModuleStreamTransition(StreamContext const &, ModuleCallingContext const &)
void postSourceEvent(StreamID)
void postModuleEvent(StreamContext const &, ModuleCallingContext const &)
decltype(std::declval< StreamID >().value()) StreamID_value
Transition transition() const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< StallStatistics > moduleStats_
EventID const & eventID() const
decltype(std::declval< ModuleDescription >().id()) ModuleID
void write(std::string &&msg)
void preModuleGlobalTransition(GlobalContext const &, ModuleCallingContext const &)