|
|
Go to the documentation of this file.
34 const std::string FastMonitoringService::macroStateNames[FastMonitoringThread::MCOUNT] = {
"Init",
46 const std::string FastMonitoringService::inputStateNames[FastMonitoringThread::inCOUNT] = {
51 "NewLumiBusyEndingLS",
52 "NewLumiIdleEndingLS",
62 "NoRequestWithIdleThreads",
63 "NoRequestWithGlobalEoL",
64 "NoRequestWithEoLThreads",
67 "SupWaitFreeChunkCopying",
69 "SupWaitFreeThreadCopying",
72 "SupLockPollingCopying",
75 "SupNewFileWaitThreadCopying",
76 "SupNewFileWaitThread",
77 "SupNewFileWaitChunkCopying",
78 "SupNewFileWaitChunk",
79 "WaitInput_fileLimit",
80 "WaitInput_waitFreeChunk",
81 "WaitInput_waitFreeChunkCopying",
82 "WaitInput_waitFreeThread",
83 "WaitInput_waitFreeThreadCopying",
85 "WaitInput_lockPolling",
86 "WaitInput_lockPollingCopying",
90 "WaitInput_newFileWaitThreadCopying",
91 "WaitInput_newFileWaitThread",
92 "WaitInput_newFileWaitChunkCopying",
93 "WaitInput_newFileWaitChunk",
94 "WaitChunk_fileLimit",
95 "WaitChunk_waitFreeChunk",
96 "WaitChunk_waitFreeChunkCopying",
97 "WaitChunk_waitFreeThread",
98 "WaitChunk_waitFreeThreadCopying",
100 "WaitChunk_lockPolling",
101 "WaitChunk_lockPollingCopying",
105 "WaitChunk_newFileWaitThreadCopying",
106 "WaitChunk_newFileWaitThread",
107 "WaitChunk_newFileWaitChunkCopying",
108 "WaitChunk_newFileWaitChunk"};
110 const std::string FastMonitoringService::nopath_ =
"NoPath";
117 sleepTime_(iPS.getUntrackedParameter<
int>(
"sleepTime", 1)),
118 fastMonIntervals_(iPS.getUntrackedParameter<unsigned
int>(
"fastMonIntervals", 2)),
119 fastName_(
"fastmoni"),
120 slowName_(
"slowmoni"),
121 filePerFwkStream_(iPS.getUntrackedParameter<
bool>(
"filePerFwkStream",
false)),
122 totalEventsProcessed_(0) {
157 std::string microstateBaseSuffix =
"src/EventFilter/Utilities/plugins/microstatedef.jsd";
159 if (
stat(microstatePath.c_str(), &statbuf)) {
160 microstatePath =
std::string(std::getenv(
"CMSSW_RELEASE_BASE")) +
"/" + microstateBaseSuffix;
161 if (
stat(microstatePath.c_str(), &statbuf)) {
162 microstatePath = microstateBaseSuffix;
163 if (
stat(microstatePath.c_str(), &statbuf))
164 throw cms::Exception(
"FastMonitoringService") <<
"microstate definition file not found";
174 desc.setComment(
"Service for File-based DAQ monitoring and event accounting");
175 desc.addUntracked<
int>(
"sleepTime", 1)->setComment(
"Sleep time of the monitoring thread");
176 desc.addUntracked<
unsigned int>(
"fastMonIntervals", 2)
177 ->setComment(
"Modulo of sleepTime intervals on which fastmon file is written out");
178 desc.addUntracked<
bool>(
"filePerFwkStream",
false)
179 ->setComment(
"Switches on monitoring output per framework stream");
180 desc.setAllowAnything();
181 descriptions.
add(
"FastMonitoringService",
desc);
190 pathLegend[
"names"] = legendaVector;
191 pathLegend[
"reserved"] = valReserved;
193 return writer.write(pathLegend);
205 moduleLegend[
"names"] = legendaVector;
206 moduleLegend[
"reserved"] = valReserved;
207 moduleLegend[
"special"] = valSpecial;
208 moduleLegend[
"output"] = valOutputModules;
210 return writer.write(moduleLegend);
218 moduleLegend[
"names"] = legendaVector;
220 return writer.write(moduleLegend);
238 throw cms::Exception(
"FastMonitoringService") <<
"EvFDaqDirector is not present";
249 <<
". No monitoring data will be written.";
252 std::ostringstream fastFileName;
254 fastFileName <<
fastName_ <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".fast";
256 fast /= fastFileName.str();
260 std::ostringstream fastFileNameTid;
261 fastFileNameTid <<
fastName_ <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
"_tid" <<
i
264 fastTid /= fastFileNameTid.str();
268 std::ostringstream moduleLegFile;
269 std::ostringstream moduleLegFileJson;
270 moduleLegFile <<
"microstatelegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".leg";
271 moduleLegFileJson <<
"microstatelegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
275 std::ostringstream pathLegFile;
276 std::ostringstream pathLegFileJson;
277 pathLegFile <<
"pathlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".leg";
279 pathLegFileJson <<
"pathlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
282 std::ostringstream inputLegFileJson;
283 inputLegFileJson <<
"inputlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
298 for (
unsigned int i = 0;
i < (
mCOUNT);
i++)
333 monInit_.store(
false, std::memory_order_release);
351 context =
" FromThisContext ";
353 context =
" FromAnotherContext";
355 context =
" FromExternalSignal";
366 context =
" FromThisContext ";
368 context =
" FromAnotherContext";
370 context =
" FromExternalSignal";
381 context =
" FromThisContext ";
383 context =
" FromAnotherContext";
385 context =
" FromExternalSignal";
387 <<
"earlyTermination -: " << context;
408 if (
desc.moduleName() ==
"Stream" ||
desc.moduleName() ==
"ShmStreamConsumer" ||
409 desc.moduleName() ==
"EvFOutputModule" ||
desc.moduleName() ==
"EventStreamFileWriter" ||
410 desc.moduleName() ==
"PoolOutputModule") {
442 timeval lumiStartTime;
443 gettimeofday(&lumiStartTime,
nullptr);
453 LogDebug(
"FastMonitoringService") <<
"Lumi ended. Writing JSON information. LUMI -: " <<
lumi;
454 timeval lumiStopTime;
455 gettimeofday(&lumiStopTime,
nullptr);
462 unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
465 double throughput =
throughputFactor() * double(accuSize) / double(usecondsForLumi);
474 if (!lumiProcessedJptr)
475 throw cms::Exception(
"FastMonitoringService") <<
"Internal error: got null pointer from FastMonitor";
482 exception_detected =
true;
487 <<
" events were processed in LUMI " <<
lumi;
498 if (sourceReport.first) {
500 throw cms::Exception(
"FastMonitoringService") <<
"MISMATCH with SOURCE update. LUMI -: " <<
lumi
502 <<
" events(source):" << sourceReport.second;
507 <<
"Statistics for lumisection -: lumi = " <<
lumi <<
" events = " << lumiProcessedJptr->
value()
508 <<
" time = " << usecondsForLumi / 1000000 <<
" size = " << accuSize <<
" thr = " << throughput;
509 delete lumiProcessedJptr;
515 std::stringstream slowFileNameStem;
516 slowFileNameStem <<
slowName_ <<
"_ls" << std::setfill(
'0') << std::setw(4) <<
lumi <<
"_pid" << std::setfill(
'0')
517 << std::setw(5) << getpid();
519 slow /= slowFileNameStem.str();
522 std::stringstream slowFileName;
523 slowFileName <<
slowName_ <<
"_ls" << std::setfill(
'0') << std::setw(4) <<
lumi <<
"_pid" << std::setfill(
'0')
524 << std::setw(5) << getpid() <<
".jsn";
526 slow /= slowFileName.str();
713 unsigned int proc = it->second.first;
715 *abortFlag = it->second.second;
719 <<
"output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
731 unsigned int abortFlag = it->second.second;
735 <<
"output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
775 bool inputStatePerThread =
false;
878 inputStatePerThread =
true;
889 inputStatePerThread =
true;
899 if (!inputStatePerThread)
std::filesystem::path workingDirectory_
void preGlobalEndLumi(edm::GlobalContext const &)
LuminosityBlockID const & luminosityBlockID() const
std::vector< unsigned int > microstateEncoded_
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
std::vector< jsoncollector::AtomicMonUInt * > processed_
std::vector< ContainableAtomic< unsigned int > > eventCountForPathInit_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
void preStreamEndLumi(edm::StreamContext const &)
std::filesystem::path runDirectory_
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
std::vector< Encoding > encPath_
void preSourceEvent(edm::StreamID)
jsoncollector::DoubleJ fastAvgLeadTimeJ_
jsoncollector::DoubleJ fastLockWaitJ_
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
array value (ordered list)
unsigned int inputstateBins_
std::string inputLegendFileJson_
std::vector< std::atomic< bool > * > collectedPathList_
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void update(const void *add)
std::string moduleLegendFileJson_
void preSourceEarlyTermination(edm::TerminationOrigin)
std::vector< unsigned int > exceptionInLS_
static const int nReservedModules
constexpr double throughputFactor()
unsigned int value() const
ModuleDescription const * moduleDescription() const
Log< level::Info, false > LogInfo
void resetFastMonitor(std::string const µStateDefPath, std::string const &fastMicroStateDefPath)
std::atomic< FastMonitoringThread::Macrostate > macrostate_
LuminosityBlockNumber_t luminosityBlock() const
Log< level::Warning, false > LogWarning
std::vector< unsigned long > firstEventId_
jsoncollector::IntJ fastFilesProcessedJ_
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void updateReserved(const void *add)
FedRawDataInputSource * inputSource_
void postStreamBeginLumi(edm::StreamContext const &)
std::vector< std::string > fastPathList_
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::map< unsigned int, timeval > lumiStartTime_
void preGlobalBeginLumi(edm::GlobalContext const &)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::vector< unsigned int > inputState_
std::vector< ContainableAtomic< const void * > > microstate_
void watchPreEvent(PreEvent::slot_type const &iSlot)
void preStreamBeginLumi(edm::StreamContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
jsoncollector::DoubleJ fastThroughputJ_
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
Writes a Value in JSON format in a human friendly way.
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void postGlobalEndLumi(edm::GlobalContext const &)
std::atomic< FastMonitoringThread::InputState > inputSupervisorState_
bool getAbortFlagForLumi(unsigned int lumi)
~FastMonitoringService() override
void watchPostEvent(PostEvent::slot_type const &iSlot)
unsigned int maxNumberOfThreads() const
Value & append(const Value &value)
Append value to array at the end.
std::vector< std::atomic< bool > * > streamCounterUpdating_
unsigned int microstateBins_
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
StreamID const & streamID() const
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag
unsigned int lumiFromSource_
std::vector< unsigned int > streamLumi_
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
std::string pathLegendFile_
static const int nSpecialModules
std::map< unsigned int, double > avgLeadTime_
void postSourceEvent(edm::StreamID)
EventNumber_t event() const
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
std::atomic< bool > monInit_
LuminosityBlockNumber_t luminosityBlock() const
unsigned int maxNumberOfStreams() const
unsigned int macrostateBins_
void preModuleBeginJob(edm::ModuleDescription const &)
unsigned int lastGlobalLumi_
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
unsigned int nOutputModules_
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
std::string const & pathName() const
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
void postEvent(edm::StreamContext const &)
std::atomic< bool > isInitTransition_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
static const std::string nopath_
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
std::string makeInputLegendaJson()
jsoncollector::IntJ fastPathProcessedJ_
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
int encode(const void *add)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
std::map< unsigned int, unsigned long > accuSize_
std::string makePathLegendaJson()
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
static const std::string inputStateNames[FastMonitoringThread::inCOUNT]
jsoncollector::IntJ fastLockCountJ_
jsoncollector::IntJ fastMacrostateJ_
void startedLookingForFile()
std::string makeModuleLegendaJson()
void preallocate(edm::service::SystemBounds const &)
void setExceptionDetected(unsigned int ls)
std::string pathLegendFileJson_
std::atomic< unsigned long > totalEventsProcessed_
std::string fastMicrostateDefPath_
std::vector< ContainableAtomic< const void * > > ministate_
std::vector< double > leadTimes_
std::string microstateDefPath_
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::string moduleLegendFile_
void postStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void stoppedLookingForFile(unsigned int lumi)
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void setMicroState(MicroStateService::Microstate) override
unsigned int ministateBins_
std::vector< unsigned int > ministateEncoded_
void completeReservedWithDummies()
EventID const & eventID() const
static const int nReservedPaths
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
std::atomic< FastMonitoringThread::InputState > inputState_
const void * decode(unsigned int index)
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
FastMonitoringThread fmt_