27 #include "tbb/concurrent_unordered_map.h" 37 using clock_t = std::chrono::steady_clock;
51 class StallStatistics {
57 StallStatistics() =
default;
60 unsigned numberOfStalls()
const {
return stallCounter_; }
62 using duration_t = std::chrono::milliseconds;
65 duration_t totalStalledTime()
const {
return duration_t{totalTime_.load()}; }
66 duration_t maxStalledTime()
const {
return duration_t{maxTime_.load()}; }
71 void update(std::chrono::milliseconds
const ms)
74 auto const thisTime = ms.count();
75 totalTime_ += thisTime;
77 while (thisTime >
max && !maxTime_.compare_exchange_strong(
max, thisTime));
82 std::atomic<unsigned> stallCounter_ {};
83 std::atomic<rep_t> totalTime_ {};
84 std::atomic<rep_t> maxTime_ {};
91 concatenate(std::ostream& os,
T const t)
96 template <
typename H,
typename...
T>
98 concatenate(std::ostream& os,
H const h,
T const...
t)
101 concatenate(os, t...);
104 enum class step : char { preSourceEvent =
'S',
105 postSourceEvent =
's',
107 postModuleEventPrefetching =
'p',
108 preModuleEvent =
'M',
109 preEventReadFromSource =
'R',
110 postEventReadFromSource =
'r',
111 postModuleEvent =
'm' ,
116 os << static_cast<std::underlying_type_t<step>>(
s);
120 template <
step S,
typename... ARGS>
123 std::ostringstream oss;
125 concatenate(oss, args...);
158 decltype(
now()) beginTime_ {};
165 using ModuleID = decltype(std::declval<ModuleDescription>().
id());
166 tbb::concurrent_unordered_map<std::pair<StreamID_value,ModuleID>, decltype(beginTime_)> stallStart_ {};
168 std::vector<std::string> moduleLabels_ {};
169 std::vector<StallStatistics> moduleStats_ {};
// Default for the untracked "fileName" parameter: an empty name means no
// dedicated detailed-timing file is written, only the stall summary is logged.
constexpr char const* filename_default {""};
// Default for the untracked "stallThreshold" parameter, in seconds.
constexpr double threshold_default {0.1};
188 , stallThreshold_{
static_cast<long int>(iPS.getUntrackedParameter<
double>(
"stallThreshold")*1000)}
206 std::ostringstream oss;
207 oss <<
"# Step Symbol Entries\n" 208 <<
"# -------------------------- ------ ------------------------------------------\n" 209 <<
"# preSourceEvent " << step::preSourceEvent <<
" <Stream ID> <Time since beginJob (ms)>\n" 210 <<
"# postSourceEvent " << step::postSourceEvent <<
" <Stream ID> <Time since beginJob (ms)>\n" 211 <<
"# preEvent " << step::preEvent <<
" <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n" 212 <<
"# postModuleEventPrefetching " << step::postModuleEventPrefetching <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 213 <<
"# preModuleEvent " << step::preModuleEvent <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 214 <<
"# preEventReadFromSource " << step::preEventReadFromSource <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 215 <<
"# postEventReadFromSource " << step::postEventReadFromSource <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 216 <<
"# postModuleEvent " << step::postModuleEvent <<
" <Stream ID> <Module ID> <Time since beginJob (ms)>\n" 217 <<
"# postEvent " << step::postEvent <<
" <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n";
218 file_.
write(oss.str());
225 desc.
addUntracked<
std::string>(
"fileName", filename_default)->setComment(
"Name of file to which detailed timing information should be written.\n" 226 "An empty filename argument (the default) indicates that no extra\n" 227 "information will be written to a dedicated file, but only the summary\n" 228 "including stalling-modules information will be logged.");
229 desc.
addUntracked<
double>(
"stallThreshold", threshold_default)->setComment(
"Threshold (in seconds) used to classify modules as stalled.\n" 230 "Millisecond granularity allowed.");
231 descriptions.
add(
"StallMonitor", desc);
232 descriptions.
setComment(
"This service keeps track of various times in event-processing to determine which modules are stalling.");
248 auto const mid = md.
id();
275 std::ostringstream oss;
276 oss <<
"\n# " << col0 << space << lastCol <<
'\n';
277 oss <<
"# " <<
std::string(col0.width()+space.size()+lastCol.size(),
'-') <<
'\n';
281 if (
label.empty())
continue;
282 oss <<
"#M " << std::setw(
width) << std::left << col0(
i) << space
286 file_.
write(oss.str());
298 auto msg = assembleMessage<step::preSourceEvent>(sid.
value(),
t);
305 auto msg = assembleMessage<step::postSourceEvent>(sid.
value(),
t);
313 auto msg = assembleMessage<step::preEvent>(stream_id(sc),
eid.run(),
eid.luminosityBlock(),
eid.event(),
t);
319 auto const sid = stream_id(sc);
320 auto const mid = module_id(mcc);
325 auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid,
t);
332 auto const preModEvent =
now();
333 auto const sid = stream_id(sc);
334 auto const mid = module_id(mcc);
339 if(
startT == milliseconds::duration::zero()) {
341 auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid,
t);
344 auto msg = assembleMessage<step::preModuleEvent>(sid, mid,
t);
348 if( milliseconds::duration::zero() !=
startT) {
349 auto const preFetch_to_preModEvent = duration_cast<milliseconds>(preModEvent-
start);
350 if (preFetch_to_preModEvent < stallThreshold_)
return;
358 auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc),
t);
365 auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc),
t);
372 auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), postModEvent);
380 auto msg = assembleMessage<step::postEvent>(stream_id(sc),
eid.run(),
eid.luminosityBlock(),
eid.event(),
t);
387 std::size_t
width {};
389 if (stats.numberOfStalls() == 0u)
return;
414 using seconds_d = duration<double>;
416 auto to_seconds_str = [](
auto const& duration){
417 std::ostringstream oss;
418 auto const time = duration_cast<seconds_d>(duration).
count();
423 out << std::setfill(
' ');
425 if (stats.label().empty() ||
426 stats.numberOfStalls() == 0u)
continue;
429 << col1(stats.label()) << space
431 << col2(stats.numberOfStalls()) << space
432 << col3(to_seconds_str(stats.totalStalledTime())) << space
433 << col4(to_seconds_str(stats.maxStalledTime())) <<
'\n';
T getUntrackedParameter(std::string const &, T const &) const
std::chrono::milliseconds const stallThreshold_
auto_ptr< ClusterSequence > cs
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void preModuleEvent(StreamContext const &, ModuleCallingContext const &)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void preSourceEvent(StreamID)
ThreadSafeOutputFileStream file_
void postEvent(StreamContext const &)
decltype(now()) beginTime_
std::ostream & operator<<(std::ostream &out, const ALILine &li)
std::string const & moduleLabel() const
void preModuleConstruction(edm::ModuleDescription const &)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< std::string > moduleLabels_
void postModuleEventPrefetching(StreamContext const &, ModuleCallingContext const &)
void preEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
ModuleDescription const * moduleDescription() const
void preEvent(StreamContext const &)
StreamID const & streamID() const
#define DEFINE_FWK_SERVICE(type)
void setComment(std::string const &value)
unsigned int value() const
void postEventReadFromSource(StreamContext const &, ModuleCallingContext const &)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tbb::concurrent_unordered_map< std::pair< StreamID_value, ModuleID >, decltype(beginTime_)> stallStart_
double S(const TLorentzVector &, const TLorentzVector &)
void postSourceEvent(StreamID)
void postModuleEvent(StreamContext const &, ModuleCallingContext const &)
decltype(std::declval< StreamID >().value()) StreamID_value
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< StallStatistics > moduleStats_
EventID const & eventID() const
decltype(std::declval< ModuleDescription >().id()) ModuleID
void write(std::string &&msg)