27 #include "tbb/concurrent_unordered_map.h"
37 using clock_t = std::chrono::steady_clock;
51 class StallStatistics {
57 StallStatistics() =
default;
60 unsigned numberOfStalls()
const {
return stallCounter_; }
62 using duration_t = std::chrono::milliseconds;
65 duration_t totalStalledTime()
const {
return duration_t{totalTime_.load()}; }
66 duration_t maxStalledTime()
const {
return duration_t{maxTime_.load()}; }
71 void update(std::chrono::milliseconds
const ms)
74 auto const thisTime = ms.count();
75 totalTime_ += thisTime;
77 while (thisTime >
max && !maxTime_.compare_exchange_strong(
max, thisTime));
82 std::atomic<unsigned> stallCounter_ {};
83 std::atomic<rep_t> totalTime_ {};
84 std::atomic<rep_t> maxTime_ {};
91 concatenate(std::ostream& os,
T const t)
96 template <
typename H,
typename...
T>
98 concatenate(std::ostream& os,
H const h,
T const...
t)
101 concatenate(os, t...);
105 postModuleEventPrefetching =
'p',
106 preModuleEvent =
'M',
107 preEventReadFromSource =
'R',
108 postEventReadFromSource =
'r',
109 postModuleEvent =
'm' ,
114 os << static_cast<std::underlying_type_t<step>>(
s);
118 template <
step S,
typename... ARGS>
121 std::ostringstream oss;
123 concatenate(oss, args...);
161 using ModuleID = decltype(std::declval<ModuleDescription>().
id());
// Default for the untracked "fileName" parameter; the empty string means
// no dedicated timing file is requested.
constexpr char const* filename_default = "";
// Default for the untracked "stallThreshold" parameter, expressed in seconds.
constexpr double threshold_default = 0.1;
179 using namespace std::chrono;
200 std::ostringstream oss;
201 oss <<
"# Step Symbol Entries\n"
202 <<
"# -------------------------- ------ ------------------------------------------\n"
203 <<
"# preEvent " << step::preEvent <<
" <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
204 <<
"# postModuleEventPrefetching " << step::postModuleEventPrefetching <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
205 <<
"# preModuleEvent " << step::preModuleEvent <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
206 <<
"# preEventReadFromSource " << step::preEventReadFromSource <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
207 <<
"# postEventReadFromSource " << step::postEventReadFromSource <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
208 <<
"# postModuleEvent " << step::postModuleEvent <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
209 <<
"# postEvent " << step::postEvent <<
" <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n";
217 desc.
addUntracked<
std::string>(
"fileName", filename_default)->setComment(
"Name of file to which detailed timing information should be written.\n"
218 "An empty filename argument (the default) indicates that no extra\n"
219 "information will be written to a dedicated file, but only the summary\n"
220 "including stalling-modules information will be logged.");
221 desc.
addUntracked<
double>(
"stallThreshold", threshold_default)->setComment(
"Threshold (in seconds) used to classify modules as stalled.\n"
222 "Millisecond granularity allowed.");
223 descriptions.
add(
"StallMonitor", desc);
224 descriptions.
setComment(
"This service keeps track of various times in event-processing to determine which modules are stalling.");
240 auto const mid = md.
id();
267 std::ostringstream oss;
268 oss <<
"\n# " << col0 << space << lastCol <<
'\n';
269 oss <<
"# " <<
std::string(col0.width()+space.size()+lastCol.size(),
'-') <<
'\n';
273 if (
label.empty())
continue;
274 oss <<
"#M " << std::setw(
width) << std::left << col0(
i) << space
291 auto msg = assembleMessage<step::preEvent>(stream_id(sc),
eid.run(),
eid.luminosityBlock(),
eid.event(),
t);
297 auto const sid = stream_id(sc);
298 auto const mid = module_id(mcc);
303 auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid,
t);
310 auto const preModEvent =
now();
311 auto const sid = stream_id(sc);
312 auto const mid = module_id(mcc);
314 auto msg = assembleMessage<step::preModuleEvent>(sid, mid, duration_cast<milliseconds>(preModEvent-
beginTime_).
count());
318 auto const preFetch_to_preModEvent = duration_cast<milliseconds>(preModEvent-
stallStart_[std::make_pair(sid,mid)]);
326 auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc),
t);
333 auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc),
t);
340 auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), postModEvent);
348 auto msg = assembleMessage<step::postEvent>(stream_id(sc),
eid.run(),
eid.luminosityBlock(),
eid.event(),
t);
355 std::size_t
width {};
357 if (stats.numberOfStalls() == 0u)
return;
382 using seconds_d = duration<double>;
384 auto to_seconds_str = [](
auto const& duration){
385 std::ostringstream oss;
386 auto const time = duration_cast<seconds_d>(duration).
count();
391 out << std::setfill(
' ');
393 if (stats.label().empty() ||
394 stats.numberOfStalls() == 0u)
continue;
397 << col1(stats.label()) << space
399 << col2(stats.numberOfStalls()) << space
400 << col3(to_seconds_str(stats.totalStalledTime())) << space
401 << col4(to_seconds_str(stats.maxStalledTime())) <<
'\n';
T getUntrackedParameter(std::string const &, T const &) const
void watchPreEvent(PreEvent::slot_type const &iSlot)
std::chrono::milliseconds const stallThreshold_
auto_ptr< ClusterSequence > cs
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void preModuleEvent(StreamContext const &, ModuleCallingContext const &)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void watchPreModuleConstruction(PreModuleConstruction::slot_type const &iSlot)
ThreadSafeOutputFileStream file_
void watchPostEvent(PostEvent::slot_type const &iSlot)
void postEvent(StreamContext const &)
decltype(now()) beginTime_
void watchPreEventReadFromSource(PreEventReadFromSource::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
std::ostream & operator<<(std::ostream &out, const ALILine &li)
std::string const & moduleLabel() const
void preModuleConstruction(edm::ModuleDescription const &)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void watchPostModuleEventPrefetching(PostModuleEventPrefetching::slot_type const &iSlot)
std::vector< std::string > moduleLabels_
void postModuleEventPrefetching(StreamContext const &, ModuleCallingContext const &)
void preEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
ModuleDescription const * moduleDescription() const
void preEvent(StreamContext const &)
StallMonitor(ParameterSet const &, ActivityRegistry &)
StreamID const & streamID() const
#define DEFINE_FWK_SERVICE(type)
void setComment(std::string const &value)
unsigned int value() const
void postEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tbb::concurrent_unordered_map< std::pair< StreamID_value, ModuleID >, decltype(beginTime_)> stallStart_
double S(const TLorentzVector &, const TLorentzVector &)
void postModuleEvent(StreamContext const &, ModuleCallingContext const &)
decltype(std::declval< StreamID >().value()) StreamID_value
void watchPostEventReadFromSource(PostEventReadFromSource::slot_type const &iSlot)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< StallStatistics > moduleStats_
EventID const & eventID() const
decltype(std::declval< ModuleDescription >().id()) ModuleID
void write(std::string &&msg)
tuple size
Write out results.
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal