27 #include "tbb/concurrent_unordered_map.h" 37 using clock_t = std::chrono::steady_clock;
// Stall bookkeeping held in three std::atomic members: a stall counter, an
// accumulated stalled time and a maximum stalled time.
// NOTE(review): this listing is an extract with gaps (the embedded original
// line numbers skip), so parts of the class are not visible here.
51 class StallStatistics {
57 StallStatistics() =
default;
// Returns the current value of stallCounter_.
60 unsigned numberOfStalls()
const {
return stallCounter_; }
// Durations handed out by the accessors below are in milliseconds.
62 using duration_t = std::chrono::milliseconds;
// Rebuilds a duration from the tick count stored in totalTime_.
65 duration_t totalStalledTime()
const {
return duration_t{totalTime_.load()}; }
// Rebuilds a duration from the tick count stored in maxTime_.
66 duration_t maxStalledTime()
const {
return duration_t{maxTime_.load()}; }
// Folds one stall of length `ms` into the statistics.
71 void update(std::chrono::milliseconds
const ms)
74 auto const thisTime = ms.count();
75 totalTime_ += thisTime;
// Raise maxTime_ to thisTime unless it is already at least as large. A failed
// compare_exchange_strong reloads `max` with the current value, so the loop
// re-evaluates the comparison against what another thread stored.
// NOTE(review): the declaration of `max` falls in a gap of this listing;
// presumably it is initialised from maxTime_ -- confirm.
77 while (thisTime >
max && !maxTime_.compare_exchange_strong(
max, thisTime));
// Value-initialised atomics, so all three start at zero.
82 std::atomic<unsigned> stallCounter_ {};
83 std::atomic<rep_t> totalTime_ {};
84 std::atomic<rep_t> maxTime_ {};
// Single-argument overload of concatenate: takes the stream and one value.
// NOTE(review): return type, template header and body are not visible in this
// extract.
91 concatenate(std::ostream& os,
T const t)
// Variadic overload: takes a head value `h` and a parameter pack `t`.
96 template <
typename H,
typename...
T>
98 concatenate(std::ostream& os,
H const h,
T const...
t)
// Recurse on the remaining pack; the recursion ends when a single value is
// left and the overload above is selected.
// NOTE(review): how `h` itself is written to `os` is not visible here.
101 concatenate(os, t...);
// Processing steps that are logged, each encoded as a single character.
// Pre/post pairs share a letter: upper case for "pre", lower case for "post".
// NOTE(review): this extract has gaps; other parts of the listing also use
// step::preEvent and step::postEvent, whose enumerators are not visible here.
104 enum class step : char { preSourceEvent =
'S',
105 postSourceEvent =
's',
107 postModuleEventPrefetching =
'p',
108 preModuleEventAcquire =
'A',
109 postModuleEventAcquire =
'a',
110 preModuleEvent =
'M',
111 preEventReadFromSource =
'R',
112 postEventReadFromSource =
'r',
113 postModuleEvent =
'm' ,
// Stream the step as its underlying type (char), i.e. as the symbol above.
118 os << static_cast<std::underlying_type_t<step>>(
s);
// Template parameterised on the step being logged (S) and the types of the
// values that follow it (ARGS).
// NOTE(review): the function name is not visible in this extract; the call
// sites elsewhere in the listing suggest this is assembleMessage -- confirm.
122 template <
step S,
typename... ARGS>
// The message is built in a local string stream.
125 std::ostringstream oss;
// Append all arguments to the stream via concatenate.
127 concatenate(oss, args...);
// Time point of the same type that now() returns, value-initialised. The
// callbacks subtract it from now() to obtain elapsed milliseconds.
162 decltype(
now()) beginTime_ {};
// ModuleID is whatever type ModuleDescription::id() returns.
169 using ModuleID = decltype(std::declval<ModuleDescription>().
id());
// Concurrent map keyed by (stream id value, module id); each entry stores a
// time point and a bool.
// NOTE(review): the meaning of the bool is not shown in this extract.
170 tbb::concurrent_unordered_map<std::pair<StreamID_value,ModuleID>, std::pair<decltype(beginTime_), bool>> stallStart_ {};
// Module labels and per-module statistics. moduleStats_ is indexed by module
// id in the callbacks.
// NOTE(review): presumably moduleLabels_ uses the same index -- confirm.
172 std::vector<std::string> moduleLabels_ {};
173 std::vector<StallStatistics> moduleStats_ {};
// Default for the "fileName" parameter: an empty string.
181 constexpr char const* filename_default {
""};
// Default for the "stallThreshold" parameter, which is documented in seconds.
182 constexpr double threshold_default {0.1};
// Reads the untracked double parameter "stallThreshold", multiplies it by 1000
// and casts the product to long int to initialise stallThreshold_.
// NOTE(review): the static_cast truncates toward zero, so a product that lands
// just below a whole number because of floating-point rounding loses one
// unit -- confirm this is acceptable.
192 , stallThreshold_{
static_cast<long int>(iPS.getUntrackedParameter<
double>(
"stallThreshold")*1000)}
// Builds a legend in a string stream and hands it to file_.write(). Every line
// starts with '#'. Each row lists a step name, the symbol produced by
// streaming the corresponding `step` enumerator, and the fields that follow
// that symbol in a record.
212 std::ostringstream oss;
213 oss <<
"# Step Symbol Entries\n" 214 <<
"# -------------------------- ------ ------------------------------------------\n" 215 <<
"# preSourceEvent " << step::preSourceEvent <<
" <Stream ID> <Time since beginJob (ms)>\n" 216 <<
"# postSourceEvent " << step::postSourceEvent <<
" <Stream ID> <Time since beginJob (ms)>\n" 217 <<
"# preEvent " << step::preEvent <<
" <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n" 218 <<
"# postModuleEventPrefetching " << step::postModuleEventPrefetching <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 219 <<
"# preModuleEventAcquire " << step::preModuleEventAcquire <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 220 <<
"# postModuleEventAcquire " << step::postModuleEventAcquire <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 221 <<
"# preModuleEvent " << step::preModuleEvent <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 222 <<
"# preEventReadFromSource " << step::preEventReadFromSource <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 223 <<
"# postEventReadFromSource " << step::postEventReadFromSource <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 224 <<
"# postModuleEvent " << step::postModuleEvent <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 225 <<
"# postEvent " << step::postEvent <<
" <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n";
// Write the whole legend with a single call.
226 file_.
write(oss.str());
// Declares the service's two untracked parameters with their defaults and help
// text, then registers the description under the label "StallMonitor".
// "fileName" is a string parameter defaulting to filename_default.
233 desc.
addUntracked<
std::string>(
"fileName", filename_default)->setComment(
"Name of file to which detailed timing information should be written.\n" 234 "An empty filename argument (the default) indicates that no extra\n" 235 "information will be written to a dedicated file, but only the summary\n" 236 "including stalling-modules information will be logged.");
// "stallThreshold" is a double parameter defaulting to threshold_default.
237 desc.
addUntracked<
double>(
"stallThreshold", threshold_default)->setComment(
"Threshold (in seconds) used to classify modules as stalled.\n" 238 "Millisecond granularity allowed.");
239 descriptions.
add(
"StallMonitor", desc);
// Comment attached to the descriptions object as a whole.
240 descriptions.
setComment(
"This service keeps track of various times in event-processing to determine which modules are stalling.");
// Fetch the id of the module described by `md`.
// NOTE(review): the enclosing function and the use of `mid` are not visible in
// this extract.
256 auto const mid = md.
id();
// Builds a '#'-prefixed table in a string stream and writes it to file_.
// NOTE(review): the definitions of col0, space, lastCol, width, label and i
// fall in gaps of this extract, as does the enclosing loop.
283 std::ostringstream oss;
// Header row.
284 oss <<
"\n# " << col0 << space << lastCol <<
'\n';
// Rule of dashes as wide as the header row's columns plus separator.
285 oss <<
"# " <<
std::string(col0.width()+space.size()+lastCol.size(),
'-') <<
'\n';
// Entries with an empty label are skipped.
289 if (
label.empty())
continue;
// Data rows start with "#M " followed by the left-aligned first column.
290 oss <<
"#M " << std::setw(
width) << std::left << col0(
i) << space
// Write the finished table with a single call.
294 file_.
write(oss.str());
// Assemble the preSourceEvent record from the stream id value and `t`.
// NOTE(review): the computation of `t` and the use of `msg` are not visible in
// this extract.
306 auto msg = assembleMessage<step::preSourceEvent>(sid.
value(),
t);
// Assemble the postSourceEvent record from the stream id value and `t`.
// NOTE(review): the computation of `t` and the use of `msg` are not visible in
// this extract.
313 auto msg = assembleMessage<step::postSourceEvent>(sid.
value(),
t);
// Assemble the preEvent record: stream id, run, luminosity block and event
// number taken from `eid`, then `t`.
// NOTE(review): the definitions of `eid` and `t` are not visible in this
// extract.
321 auto msg = assembleMessage<step::preEvent>(stream_id(sc),
eid.run(),
eid.luminosityBlock(),
eid.event(),
t);
// Resolve the stream id and module id from the two contexts.
327 auto const sid = stream_id(sc);
328 auto const mid = module_id(mcc);
// Assemble the postModuleEventPrefetching record from both ids and `t`.
// NOTE(review): the computation of `t` and any update of stallStart_ fall in
// gaps of this extract.
333 auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid,
t);
// Take the timestamp before anything else in this fragment.
340 auto const preModEventAcquire =
now();
341 auto const sid = stream_id(sc);
342 auto const mid = module_id(mcc);
// Milliseconds elapsed between beginTime_ and the timestamp above.
347 auto t = duration_cast<milliseconds>(preModEventAcquire -
beginTime_).
count();
348 auto msg = assembleMessage<step::preModuleEventAcquire>(sid, mid,
t);
// Only evaluate the stall when a non-zero start time is present.
// NOTE(review): `start` and `startT` are defined in a gap of this extract;
// presumably they come from the stallStart_ entry for (sid, mid) -- confirm.
352 if( milliseconds::duration::zero() !=
startT) {
// Interval between the recorded start and this callback's timestamp.
353 auto const preFetch_to_preModEventAcquire = duration_cast<milliseconds>(preModEventAcquire -
start.first);
// Intervals below the configured threshold are not recorded.
354 if (preFetch_to_preModEventAcquire < stallThreshold_)
return;
// Record the interval in this module's statistics.
355 moduleStats_[mid].update(preFetch_to_preModEventAcquire);
// Milliseconds elapsed between beginTime_ and now.
361 auto const postModEventAcquire = duration_cast<milliseconds>(
now()-
beginTime_).
count();
// Assemble the postModuleEventAcquire record from stream id, module id and the
// elapsed time.
362 auto msg = assembleMessage<step::postModuleEventAcquire>(stream_id(sc), module_id(mcc), postModEventAcquire);
// Take the timestamp before anything else in this fragment.
368 auto const preModEvent =
now();
369 auto const sid = stream_id(sc);
370 auto const mid = module_id(mcc);
// NOTE(review): the computation of `t` falls in a gap of this extract.
375 auto msg = assembleMessage<step::preModuleEvent>(sid, mid,
t);
// Evaluate the stall only when a non-zero start time is present and the
// entry's bool is false.
// NOTE(review): `start` and `startT` are defined in a gap of this extract, and
// the meaning of start.second is not shown -- confirm.
379 if( milliseconds::duration::zero() !=
startT && !
start.second) {
// Interval between the recorded start and this callback's timestamp.
380 auto const preFetch_to_preModEvent = duration_cast<milliseconds>(preModEvent-
start.first);
// Intervals below the configured threshold are not recorded.
381 if (preFetch_to_preModEvent < stallThreshold_)
return;
// Assemble the preEventReadFromSource record from stream id, module id and `t`.
// NOTE(review): the computation of `t` is not visible in this extract.
389 auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc),
t);
// Assemble the postEventReadFromSource record from stream id, module id and `t`.
// NOTE(review): the computation of `t` is not visible in this extract.
396 auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc),
t);
// Assemble the postModuleEvent record from stream id, module id and
// postModEvent.
// NOTE(review): the computation of postModEvent is not visible in this extract.
403 auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), postModEvent);
// Assemble the postEvent record: stream id, run, luminosity block and event
// number taken from `eid`, then `t`.
// NOTE(review): the definitions of `eid` and `t` are not visible in this
// extract.
411 auto msg = assembleMessage<step::postEvent>(stream_id(sc),
eid.run(),
eid.luminosityBlock(),
eid.event(),
t);
// Fragments of the code that prints one summary row per entry with stalls.
// NOTE(review): the enclosing function, loops and column helpers (col1..col4,
// space, out) fall in gaps of this extract.
418 std::size_t
width {};
// Entries without any recorded stall are ignored here.
420 if (stats.numberOfStalls() == 0u)
return;
// Seconds represented as double.
445 using seconds_d = duration<double>;
// Helper lambda: converts a duration to seconds as a double and formats it
// through a string stream.
// NOTE(review): the formatting applied to `time` is not visible in this
// extract.
447 auto to_seconds_str = [](
auto const& duration){
448 std::ostringstream oss;
449 auto const time = duration_cast<seconds_d>(duration).
count();
// Pad with spaces.
454 out << std::setfill(
' ');
// Skip entries that have no label or no recorded stall.
456 if (stats.label().empty() ||
457 stats.numberOfStalls() == 0u)
continue;
// Row layout: label, number of stalls, total stalled time, maximum stalled
// time; both times go through to_seconds_str.
460 << col1(stats.label()) << space
462 << col2(stats.numberOfStalls()) << space
463 << col3(to_seconds_str(stats.totalStalledTime())) << space
464 << col4(to_seconds_str(stats.maxStalledTime())) <<
'\n';
T getUntrackedParameter(std::string const &, T const &) const
std::chrono::milliseconds const stallThreshold_
auto_ptr< ClusterSequence > cs
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void preModuleEvent(StreamContext const &, ModuleCallingContext const &)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void preSourceEvent(StreamID)
ThreadSafeOutputFileStream file_
void postEvent(StreamContext const &)
decltype(now()) beginTime_
std::ostream & operator<<(std::ostream &out, const ALILine &li)
std::string const & moduleLabel() const
void preModuleConstruction(edm::ModuleDescription const &)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< std::string > moduleLabels_
void postModuleEventPrefetching(StreamContext const &, ModuleCallingContext const &)
void preEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
ModuleDescription const * moduleDescription() const
void preEvent(StreamContext const &)
StreamID const & streamID() const
void preModuleEventAcquire(StreamContext const &, ModuleCallingContext const &)
#define DEFINE_FWK_SERVICE(type)
void setComment(std::string const &value)
void postModuleEventAcquire(StreamContext const &, ModuleCallingContext const &)
unsigned int value() const
void postEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
double S(const TLorentzVector &, const TLorentzVector &)
void postSourceEvent(StreamID)
void postModuleEvent(StreamContext const &, ModuleCallingContext const &)
decltype(std::declval< StreamID >().value()) StreamID_value
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< StallStatistics > moduleStats_
EventID const & eventID() const
decltype(std::declval< ModuleDescription >().id()) ModuleID
void write(std::string &&msg)
tbb::concurrent_unordered_map< std::pair< StreamID_value, ModuleID >, std::pair< decltype(beginTime_), bool > > stallStart_