28 #include "tbb/concurrent_unordered_map.h" 38 using clock_t = std::chrono::steady_clock;
// Per-module stall bookkeeping: exposes a stall count, the summed stalled time
// and the largest single stalled time. All three are held in std::atomic members.
// NOTE(review): this listing omits several source lines of the class (for
// example the rep_t alias and the local `max` used in update()); confirm
// details against the complete file.
52 class StallStatistics {
58 StallStatistics() =
default;
// Returns the current value of stallCounter_.
61 unsigned numberOfStalls()
const {
return stallCounter_; }
// Durations handed out by this class are expressed in milliseconds.
63 using duration_t = std::chrono::milliseconds;
// Returns the accumulated stalled time, rebuilt from the atomically loaded tick count.
66 duration_t totalStalledTime()
const {
return duration_t{totalTime_.load()}; }
// Returns the largest stalled time recorded, rebuilt from the atomically loaded tick count.
67 duration_t maxStalledTime()
const {
return duration_t{maxTime_.load()}; }
// Folds one measured duration `ms` into the totals: adds its tick count to
// totalTime_ and raises maxTime_ when this duration is larger.
72 void update(std::chrono::milliseconds
const ms)
75 auto const thisTime = ms.count();
76 totalTime_ += thisTime;
// Retry loop with an empty body: on failure compare_exchange_strong reloads
// `max` with the stored value, so the loop ends once thisTime has been stored
// or is no longer larger than the stored maximum.
78 while (thisTime >
max && !maxTime_.compare_exchange_strong(
max, thisTime));
// Atomic counters, value-initialized.
83 std::atomic<unsigned> stallCounter_ {};
84 std::atomic<rep_t> totalTime_ {};
85 std::atomic<rep_t> maxTime_ {};
// Overload taking the output stream and a single value `t` (by const copy).
// NOTE(review): the template header, return type and body of this overload
// are not visible in this listing.
92 concatenate(std::ostream& os,
T const t)
// Variadic overload: takes a head value `h` plus a parameter pack `t` and
// recurses on the remaining pack.
// NOTE(review): presumably `h` is written to `os` before the recursive call;
// that statement is not visible here, confirm against the full file.
97 template <
typename H,
typename...
T>
99 concatenate(std::ostream& os,
H const h,
T const...
t)
102 concatenate(os, t...);
// Kinds of records this file emits. The underlying type is char and every
// visible enumerator is assigned a single character; the legend written
// elsewhere in this file prints these values in its "Symbol" column.
// Among the visible pairs, the pre* step uses an upper-case letter and the
// matching post* step the same letter in lower case (S/s, A/a, R/r, M/m).
// NOTE(review): step::preEvent and step::postEvent are used elsewhere in this
// file but their enumerators (and the closing brace) are missing from this listing.
105 enum class step : char { preSourceEvent =
'S',
106 postSourceEvent =
's',
108 postModuleEventPrefetching =
'p',
109 preModuleEventAcquire =
'A',
110 postModuleEventAcquire =
'a',
111 preModuleEvent =
'M',
112 preEventReadFromSource =
'R',
113 postEventReadFromSource =
'r',
114 postModuleEvent =
'm' ,
// Transition phases, stored as short; the first enumerator, globalEndRun, is -4.
// NOTE(review): the remaining enumerators are not visible in this listing.
117 enum class Phase : short { globalEndRun = -4,
// Stream insertion for a `step`: the value is cast to its underlying type
// (char for step) before being written.
129 os << static_cast<std::underlying_type_t<step>>(
s);
// Stream insertion for a `Phase`: the value is cast to its underlying type
// (short for Phase) before being written.
135 os << static_cast<std::underlying_type_t<Phase>>(
s);
// Template parameterised on a compile-time `step` value S and an argument pack.
// The visible statements create a local std::ostringstream and pass the pack
// to concatenate().
// NOTE(review): presumably this is assembleMessage (called elsewhere in the file
// as assembleMessage<step::...>(...)); the signature, the use of S and the
// return statement are not visible in this listing.
139 template <
step S,
typename... ARGS>
142 std::ostringstream oss;
144 concatenate(oss, args...);
// Switch cases mapping a StreamContext transition to the corresponding stream-level Phase.
// NOTE(review): the switch header is not visible, and the statement following
// the kEvent label (source line 157) is missing from this listing; as shown
// here kEvent would fall through to kEndLuminosityBlock. Confirm against the full file.
152 case StreamContext::Transition::kBeginRun:
153 return Phase::streamBeginRun;
154 case StreamContext::Transition::kBeginLuminosityBlock:
155 return Phase::streamBeginLumi;
156 case StreamContext::Transition::kEvent:
158 case StreamContext::Transition::kEndLuminosityBlock:
159 return Phase::streamEndLumi;
160 case StreamContext::Transition::kEndRun:
161 return Phase::streamEndRun;
// Returns the Phase chosen by toTransitionImpl() for the given stream context,
// converted to Phase's underlying integer type.
169 auto toTransition(
edm::StreamContext const& iContext) -> std::underlying_type_t<Phase> {
170 return static_cast<std::underlying_type_t<Phase>
>(toTransitionImpl(iContext));
// Switch cases mapping a GlobalContext transition to the corresponding global-level Phase.
// The write transitions share a Phase with the matching end transition:
// kWriteLuminosityBlock -> globalEndLumi and kWriteRun -> globalEndRun.
// NOTE(review): the switch header and any default handling are not visible in this listing.
176 case GlobalContext::Transition::kBeginRun:
177 return Phase::globalBeginRun;
178 case GlobalContext::Transition::kBeginLuminosityBlock:
179 return Phase::globalBeginLumi;
180 case GlobalContext::Transition::kEndLuminosityBlock:
181 return Phase::globalEndLumi;
182 case GlobalContext::Transition::kWriteLuminosityBlock:
183 return Phase::globalEndLumi;
184 case GlobalContext::Transition::kEndRun:
185 return Phase::globalEndRun;
186 case GlobalContext::Transition::kWriteRun:
187 return Phase::globalEndRun;
// Returns the Phase chosen by toTransitionImpl() for the given global context,
// converted to Phase's underlying integer type.
194 auto toTransition(
edm::GlobalContext const& iContext) -> std::underlying_type_t<Phase> {
195 return static_cast<std::underlying_type_t<Phase>
>(toTransitionImpl(iContext));
// Reference time point, typed as whatever now() returns. Elsewhere in this file
// it is subtracted from now() to obtain the elapsed milliseconds written in records.
232 decltype(
now()) beginTime_ {};
// Module identifier type: whatever ModuleDescription::id() returns.
239 using ModuleID = decltype(std::declval<ModuleDescription>().
id());
// Concurrent map keyed by (stream id value, module id); each entry holds a
// time point and a bool flag.
// NOTE(review): the flag's meaning is not established by the visible lines;
// the preModuleEvent stall check only proceeds when it is false. Confirm.
240 tbb::concurrent_unordered_map<std::pair<StreamID_value,ModuleID>, std::pair<decltype(beginTime_), bool>> stallStart_ {};
// Module labels. NOTE(review): presumably indexed by module id; confirm.
242 std::vector<std::string> moduleLabels_ {};
// Per-module stall statistics; accessed elsewhere in this file as moduleStats_[mid].
243 std::vector<StallStatistics> moduleStats_ {};
// Default for the "fileName" parameter: an empty string.
252 constexpr char const* filename_default {
""};
// Default for the "stallThreshold" parameter; that parameter is documented in
// this file as being in seconds, so this is 0.1 s.
253 constexpr double threshold_default {0.1};
// Reads the untracked "stallThreshold" parameter as a double, multiplies it
// by 1000 and converts the result to long int to initialize stallThreshold_.
// The static_cast discards any fractional part left after the multiplication.
263 , stallThreshold_{
static_cast<long int>(iPS.getUntrackedParameter<
double>(
"stallThreshold")*1000)}
// Builds the explanatory header of the output file in a local string stream and
// hands it to file_.write(): first a table of transition names with their Phase
// symbols, then a table of step names with their step symbols and the fields
// each record carries.
// The "preModuleTransition" and "postModuleTransition" rows print the symbols of
// step::preModuleEvent and step::postModuleEvent respectively.
// NOTE(review): the enclosing function header is not visible, and source line 316
// (between the streamBeginLumi and streamEndLumi rows) is missing from this listing.
309 std::ostringstream oss;
310 oss <<
"# Transition Symbol\n";
311 oss <<
"#----------------- ------\n";
312 oss <<
"# globalBeginRun "<<Phase::globalBeginRun <<
"\n" 313 <<
"# streamBeginRun "<<Phase::streamBeginRun <<
"\n" 314 <<
"# globalBeginLumi "<<Phase::globalBeginLumi<<
"\n" 315 <<
"# streamBeginLumi "<<Phase::streamBeginLumi<<
"\n" 317 <<
"# streamEndLumi "<<Phase::streamEndLumi<<
"\n" 318 <<
"# globalEndLumi "<<Phase::globalEndLumi<<
"\n" 319 <<
"# streamEndRun "<<Phase::streamEndRun<<
"\n" 320 <<
"# globalEndRun "<<Phase::globalEndRun<<
"\n";
// Second table: one row per step, giving its symbol and the layout of its record.
321 oss <<
"# Step Symbol Entries\n" 322 <<
"# -------------------------- ------ ------------------------------------------\n" 323 <<
"# preSourceEvent " << step::preSourceEvent <<
" <Stream ID> <Time since beginJob (ms)>\n" 324 <<
"# postSourceEvent " << step::postSourceEvent <<
" <Stream ID> <Time since beginJob (ms)>\n" 325 <<
"# preEvent " << step::preEvent <<
" <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n" 326 <<
"# postModuleEventPrefetching " << step::postModuleEventPrefetching <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 327 <<
"# preModuleEventAcquire " << step::preModuleEventAcquire <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 328 <<
"# postModuleEventAcquire " << step::postModuleEventAcquire <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 329 <<
"# preModuleTransition " << step::preModuleEvent <<
" <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n" 330 <<
"# preEventReadFromSource " << step::preEventReadFromSource <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 331 <<
"# postEventReadFromSource " << step::postEventReadFromSource <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 332 <<
"# postModuleTransition " << step::postModuleEvent <<
" <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n" 333 <<
"# postEvent " << step::postEvent <<
" <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n";
// Emit the assembled legend in one write.
334 file_.
write(oss.str());
// Declares the service's two untracked configuration parameters with their
// defaults and help text, registers the description under the label
// "StallMonitor", and sets a comment on the overall descriptions object.
// NOTE(review): the function header and the declaration of `desc` are not
// visible in this listing.
// "fileName": string parameter, default filename_default.
341 desc.
addUntracked<
std::string>(
"fileName", filename_default)->setComment(
"Name of file to which detailed timing information should be written.\n" 342 "An empty filename argument (the default) indicates that no extra\n" 343 "information will be written to a dedicated file, but only the summary\n" 344 "including stalling-modules information will be logged.");
// "stallThreshold": double parameter, default threshold_default.
345 desc.
addUntracked<
double>(
"stallThreshold", threshold_default)->setComment(
"Threshold (in seconds) used to classify modules as stalled.\n" 346 "Millisecond granularity allowed.");
347 descriptions.
add(
"StallMonitor", desc);
348 descriptions.
setComment(
"This service keeps track of various times in event-processing to determine which modules are stalling.");
364 auto const mid = md.
id();
// Builds a two-column table in a local string stream and passes it to
// file_.write(): a header line made of col0 and lastCol, a dashed rule whose
// length is col0.width() + space.size() + lastCol.size(), then rows prefixed "#M ".
// NOTE(review): the enclosing function, the loop header and the definitions of
// col0, space, lastCol, width, label and i are not visible in this listing.
391 std::ostringstream oss;
392 oss <<
"\n# " << col0 << space << lastCol <<
'\n';
393 oss <<
"# " <<
std::string(col0.width()+space.size()+lastCol.size(),
'-') <<
'\n';
// Entries with an empty label produce no row.
397 if (
label.empty())
continue;
// Row start: "#M " followed by the left-aligned first column, padded to `width`.
398 oss <<
"#M " << std::setw(
width) << std::left << col0(
i) << space
402 file_.
write(oss.str());
// Formats a step::preSourceEvent record from the stream id value and `t`.
// NOTE(review): `t` is defined on a line missing from this listing; presumably
// the elapsed milliseconds named in the file legend. Confirm.
414 auto msg = assembleMessage<step::preSourceEvent>(sid.
value(),
t);
// Formats a step::postSourceEvent record from the stream id value and `t`.
// NOTE(review): `t` is defined on a line missing from this listing.
421 auto msg = assembleMessage<step::postSourceEvent>(sid.
value(),
t);
// Formats a step::preEvent record: stream id, then run, luminosity block and
// event number taken from `eid`, then `t`.
// NOTE(review): `eid` and `t` are defined on lines missing from this listing.
429 auto msg = assembleMessage<step::preEvent>(stream_id(sc),
eid.run(),
eid.luminosityBlock(),
eid.event(),
t);
// Looks up the stream id and module id for this call, then formats a
// step::postModuleEventPrefetching record from them and `t`.
// NOTE(review): the lines between these statements (including the definition
// of `t`) are missing from this listing.
435 auto const sid = stream_id(sc);
436 auto const mid = module_id(mcc);
441 auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid,
t);
// Capture the current time first; it is used both for the record's timestamp
// and for the stall measurement below.
448 auto const preModEventAcquire =
now();
449 auto const sid = stream_id(sc);
450 auto const mid = module_id(mcc);
// Milliseconds elapsed between beginTime_ and the captured time.
455 auto t = duration_cast<milliseconds>(preModEventAcquire -
beginTime_).
count();
456 auto msg = assembleMessage<step::preModuleEventAcquire>(sid, mid,
t);
// The stall check only runs when startT is non-zero.
// NOTE(review): `start` and `startT` are defined on lines missing from this
// listing; presumably the stallStart_ entry for (sid, mid) and a duration
// derived from its time point. Confirm.
460 if( milliseconds::duration::zero() !=
startT) {
// Time between the stored start time point and this call, in milliseconds.
461 auto const preFetch_to_preModEventAcquire = duration_cast<milliseconds>(preModEventAcquire -
start.first);
// Waits shorter than stallThreshold_ are not recorded.
462 if (preFetch_to_preModEventAcquire < stallThreshold_)
return;
// Fold the wait into this module's statistics.
463 moduleStats_[mid].update(preFetch_to_preModEventAcquire);
// Computes the milliseconds elapsed between beginTime_ and now, then formats a
// step::postModuleEventAcquire record from the stream id, module id and that value.
469 auto const postModEventAcquire = duration_cast<milliseconds>(
now()-
beginTime_).
count();
470 auto msg = assembleMessage<step::postModuleEventAcquire>(stream_id(sc), module_id(mcc), postModEventAcquire);
// Capture the current time first; it is used for the stall measurement below.
476 auto const preModEvent =
now();
477 auto const sid = stream_id(sc);
478 auto const mid = module_id(mcc);
// Formats a step::preModuleEvent record whose transition field is Phase::Event
// converted to its underlying integer type.
// NOTE(review): `t` is defined on a line missing from this listing.
483 auto msg = assembleMessage<step::preModuleEvent>(sid, mid,
static_cast<std::underlying_type_t<Phase>
>(
Phase::Event),
t);
// The stall check only runs when startT is non-zero and the entry's bool flag is false.
// NOTE(review): `start` and `startT` are defined on lines missing from this
// listing, and the flag's meaning is not established by the visible code. Confirm.
487 if( milliseconds::duration::zero() !=
startT && !
start.second) {
// Time between the stored start time point and this call, in milliseconds.
488 auto const preFetch_to_preModEvent = duration_cast<milliseconds>(preModEvent-
start.first);
// Waits shorter than stallThreshold_ are not recorded.
// NOTE(review): the statement that follows this early return is missing from this listing.
489 if (preFetch_to_preModEvent < stallThreshold_)
return;
// Captures the current time and the stream/module ids, then formats a
// step::preModuleEvent record whose transition field comes from toTransition(sc).
// NOTE(review): `t` is defined on a line missing from this listing (presumably
// derived from tNow). Confirm.
496 auto const tNow =
now();
497 auto const sid = stream_id(sc);
498 auto const mid = module_id(mcc);
500 auto msg = assembleMessage<step::preModuleEvent>(sid, mid, toTransition(sc),
t);
// Formats a step::postModuleEvent record for a stream-level transition: stream
// id, module id, toTransition(sc) and `t`.
// NOTE(review): `t` is defined on a line missing from this listing.
507 auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), toTransition(sc),
t);
// Formats a step::preModuleEvent record for a global-level transition.
// numStreams_ is passed in the position where the stream-level records pass
// stream_id(sc); the transition field comes from toTransition(gc).
// NOTE(review): `t` is defined on a line missing from this listing.
514 auto msg = assembleMessage<step::preModuleEvent>(
numStreams_, module_id(mcc), toTransition(gc),
t);
// Formats a step::postModuleEvent record for a global-level transition.
// numStreams_ is passed in the position where the stream-level records pass
// stream_id(sc); the transition field comes from toTransition(gc).
// NOTE(review): `postModTime` is defined on a line missing from this listing.
520 auto msg = assembleMessage<step::postModuleEvent>(
numStreams_, module_id(mcc), toTransition(gc), postModTime);
// Formats a step::preEventReadFromSource record from the stream id, module id and `t`.
// NOTE(review): `t` is defined on a line missing from this listing.
527 auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc),
t);
// Formats a step::postEventReadFromSource record from the stream id, module id and `t`.
// NOTE(review): `t` is defined on a line missing from this listing.
534 auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc),
t);
// Formats a step::postModuleEvent record whose transition field is Phase::Event
// converted to its underlying integer type.
// NOTE(review): `postModEvent` is defined on a line missing from this listing.
541 auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc),
static_cast<std::underlying_type_t<Phase>
>(
Phase::Event), postModEvent);
// Formats a step::postEvent record: stream id, then run, luminosity block and
// event number taken from `eid`, then `t`.
// NOTE(review): `eid` and `t` are defined on lines missing from this listing.
549 auto msg = assembleMessage<step::postEvent>(stream_id(sc),
eid.run(),
eid.luminosityBlock(),
eid.event(),
t);
// Fragments of the end-of-job stall summary.
// NOTE(review): the enclosing function header, the loops/lambdas around these
// statements and the column helpers (col1..col4, space, out) are not visible
// in this listing.
// Width accumulator, value-initialized to zero; how it is updated is not visible here.
556 std::size_t
width {};
// Entries with no recorded stalls are skipped by this early return.
558 if (stats.numberOfStalls() == 0u)
return;
// Seconds represented as double, used when printing durations.
583 using seconds_d = duration<double>;
// Helper lambda: converts any duration to seconds-as-double and renders it via a
// string stream. NOTE(review): the formatting and return statements are not visible here.
585 auto to_seconds_str = [](
auto const& duration){
586 std::ostringstream oss;
587 auto const time = duration_cast<seconds_d>(duration).
count();
// Pad with spaces from here on.
592 out << std::setfill(
' ');
// Modules with an empty label or with no stalls produce no row.
594 if (stats.label().empty() ||
595 stats.numberOfStalls() == 0u)
continue;
598 << col1(stats.label()) << space
600 << col2(stats.numberOfStalls()) << space
601 << col3(to_seconds_str(stats.totalStalledTime())) << space
602 << col4(to_seconds_str(stats.maxStalledTime())) <<
'\n';
T getUntrackedParameter(std::string const &, T const &) const
std::chrono::milliseconds const stallThreshold_
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void preModuleEvent(StreamContext const &, ModuleCallingContext const &)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void preSourceEvent(StreamID)
unique_ptr< ClusterSequence > cs
ThreadSafeOutputFileStream file_
void postEvent(StreamContext const &)
decltype(now()) beginTime_
std::ostream & operator<<(std::ostream &out, const ALILine &li)
std::string const & moduleLabel() const
void preModuleConstruction(edm::ModuleDescription const &)
void preModuleStreamTransition(StreamContext const &, ModuleCallingContext const &)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
unsigned int maxNumberOfStreams() const
std::vector< std::string > moduleLabels_
Transition transition() const
void postModuleEventPrefetching(StreamContext const &, ModuleCallingContext const &)
void preEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
ModuleDescription const * moduleDescription() const
void preEvent(StreamContext const &)
StreamID const & streamID() const
void preModuleEventAcquire(StreamContext const &, ModuleCallingContext const &)
#define DEFINE_FWK_SERVICE(type)
void setComment(std::string const &value)
void postModuleEventAcquire(StreamContext const &, ModuleCallingContext const &)
unsigned int value() const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void postEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
void postModuleGlobalTransition(GlobalContext const &, ModuleCallingContext const &)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
double S(const TLorentzVector &, const TLorentzVector &)
void postModuleStreamTransition(StreamContext const &, ModuleCallingContext const &)
void postSourceEvent(StreamID)
void postModuleEvent(StreamContext const &, ModuleCallingContext const &)
decltype(std::declval< StreamID >().value()) StreamID_value
Transition transition() const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< StallStatistics > moduleStats_
EventID const & eventID() const
decltype(std::declval< ModuleDescription >().id()) ModuleID
void write(std::string &&msg)
tbb::concurrent_unordered_map< std::pair< StreamID_value, ModuleID >, std::pair< decltype(beginTime_), bool > > stallStart_
void preModuleGlobalTransition(GlobalContext const &, ModuleCallingContext const &)