63 "NewLumiBusyEndingLS",
64 "NewLumiIdleEndingLS",
74 "NoRequestWithIdleThreads",
75 "NoRequestWithGlobalEoL",
76 "NoRequestWithEoLThreads",
79 "SupWaitFreeChunkCopying",
81 "SupWaitFreeThreadCopying",
84 "SupLockPollingCopying",
87 "SupNewFileWaitThreadCopying",
88 "SupNewFileWaitThread",
89 "SupNewFileWaitChunkCopying",
90 "SupNewFileWaitChunk",
91 "WaitInput_fileLimit",
92 "WaitInput_waitFreeChunk",
93 "WaitInput_waitFreeChunkCopying",
94 "WaitInput_waitFreeThread",
95 "WaitInput_waitFreeThreadCopying",
97 "WaitInput_lockPolling",
98 "WaitInput_lockPollingCopying",
102 "WaitInput_newFileWaitThreadCopying",
103 "WaitInput_newFileWaitThread",
104 "WaitInput_newFileWaitChunkCopying",
105 "WaitInput_newFileWaitChunk",
106 "WaitChunk_fileLimit",
107 "WaitChunk_waitFreeChunk",
108 "WaitChunk_waitFreeChunkCopying",
109 "WaitChunk_waitFreeThread",
110 "WaitChunk_waitFreeThreadCopying",
112 "WaitChunk_lockPolling",
113 "WaitChunk_lockPollingCopying",
117 "WaitChunk_newFileWaitThreadCopying",
118 "WaitChunk_newFileWaitThread",
119 "WaitChunk_newFileWaitChunkCopying",
120 "WaitChunk_newFileWaitChunk",
124 const std::string FastMonitoringService::nopath_ =
"NoPath";
131 sleepTime_(iPS.getUntrackedParameter<
int>(
"sleepTime", 1)),
132 fastMonIntervals_(iPS.getUntrackedParameter<unsigned
int>(
"fastMonIntervals", 2)),
133 fastName_(
"fastmoni"),
134 slowName_(
"slowmoni"),
135 filePerFwkStream_(iPS.getUntrackedParameter<
bool>(
"filePerFwkStream",
false)),
136 totalEventsProcessed_(0),
137 verbose_(iPS.getUntrackedParameter<
bool>(
"verbose")) {
175 std::string microstateBaseSuffix =
"src/EventFilter/Utilities/plugins/microstatedef.jsd";
177 if (
stat(microstatePath.c_str(), &statbuf)) {
178 microstatePath =
std::string(std::getenv(
"CMSSW_RELEASE_BASE")) +
"/" + microstateBaseSuffix;
179 if (
stat(microstatePath.c_str(), &statbuf)) {
180 microstatePath = microstateBaseSuffix;
181 if (
stat(microstatePath.c_str(), &statbuf))
182 throw cms::Exception(
"FastMonitoringService") <<
"microstate definition file not found";
192 desc.setComment(
"Service for File-based DAQ monitoring and event accounting");
193 desc.addUntracked<
int>(
"sleepTime", 1)->setComment(
"Sleep time of the monitoring thread");
194 desc.addUntracked<
unsigned int>(
"fastMonIntervals", 2)
195 ->setComment(
"Modulo of sleepTime intervals on which fastmon file is written out");
196 desc.addUntracked<
bool>(
"filePerFwkStream",
false)
197 ->setComment(
"Switches on monitoring output per framework stream");
198 desc.addUntracked<
bool>(
"verbose",
false)->setComment(
"Set to use LogInfo messages from the monitoring thread");
199 desc.setAllowAnything();
200 descriptions.
add(
"FastMonitoringService",
desc);
205 for (
int i = 0;
i <
fmt_->m_data.encPath_[0].current_;
i++)
206 legendaVector.
append(
Json::Value(*(static_cast<const std::string*>(
fmt_->m_data.encPath_[0].decode(
i)))));
209 pathLegend[
"names"] = legendaVector;
210 pathLegend[
"reserved"] = valReserved;
212 return writer.write(pathLegend);
217 for (
int i = 0;
i <
fmt_->m_data.encModule_.current_;
i++)
219 Json::Value((static_cast<const edm::ModuleDescription*>(
fmt_->m_data.encModule_.decode(
i)))->moduleLabel()));
221 for (
int i = 0;
i <
fmt_->m_data.encModule_.current_;
i++)
223 (static_cast<const edm::ModuleDescription*>(
fmt_->m_data.encModule_.decode(
i)))->moduleLabel() +
"__ACQ"));
228 moduleLegend[
"names"] = legendaVector;
229 moduleLegend[
"reserved"] = valReserved;
230 moduleLegend[
"special"] = valSpecial;
231 moduleLegend[
"output"] = valOutputModules;
233 return writer.write(moduleLegend);
241 moduleLegend[
"names"] = legendaVector;
243 return writer.write(moduleLegend);
262 throw cms::Exception(
"FastMonitoringService") <<
"EvFDaqDirector is not present";
273 <<
". No monitoring data will be written.";
276 std::ostringstream fastFileName;
278 fastFileName <<
fastName_ <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".fast";
280 fast /= fastFileName.str();
284 std::ostringstream fastFileNameTid;
285 fastFileNameTid <<
fastName_ <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
"_tid" <<
i 288 fastTid /= fastFileNameTid.str();
292 std::ostringstream moduleLegFile;
293 std::ostringstream moduleLegFileJson;
294 moduleLegFile <<
"microstatelegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".leg";
295 moduleLegFileJson <<
"microstatelegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
299 std::ostringstream pathLegFile;
300 std::ostringstream pathLegFileJson;
301 pathLegFile <<
"pathlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".leg";
303 pathLegFileJson <<
"pathlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
306 std::ostringstream inputLegFileJson;
307 inputLegFileJson <<
"inputlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
324 fmt_->m_data.encModule_.completeReservedWithDummies();
329 fmt_->m_data.microstateAcqFlag_.push_back(0);
335 fmt_->m_data.encPath_.emplace_back(0);
336 fmt_->m_data.encPath_[
i].update(static_cast<const void*>(&
nopath_));
339 fmt_->m_data.encPath_[
i].updatePreinit(
path);
341 for (
auto& endPath : pathsInfo.
endPaths()) {
342 fmt_->m_data.encPath_[
i].updatePreinit(endPath);
350 fmt_->m_data.microstateBins_ = 0;
352 fmt_->m_data.ministateBins_ =
fmt_->m_data.encPath_[0].vecsize();
362 monInit_.store(
false, std::memory_order_release);
383 context =
" FromAnotherContext";
385 context =
" FromExternalSignal";
389 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
399 context =
" FromAnotherContext";
401 context =
" FromExternalSignal";
405 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
415 context =
" FromAnotherContext";
417 context =
" FromExternalSignal";
419 <<
"earlyTermination -: " <<
context;
420 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
427 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
443 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
455 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
460 if (
desc.moduleName() ==
"Stream" ||
desc.moduleName() ==
"GlobalEvFOutputModule" ||
461 desc.moduleName() ==
"EvFOutputModule" ||
desc.moduleName() ==
"EventStreamFileWriter" ||
462 desc.moduleName() ==
"PoolOutputModule") {
463 fmt_->m_data.encModule_.updateReserved((
void*)&
desc);
466 fmt_->m_data.encModule_.update((
void*)&
desc);
482 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
484 fmt_->m_data.microstateBins_ =
fmt_->m_data.encModule_.vecsize() * 2;
498 timeval lumiStartTime;
499 gettimeofday(&lumiStartTime,
nullptr);
503 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
509 LogDebug(
"FastMonitoringService") <<
"Lumi ended. Writing JSON information. LUMI -: " <<
lumi;
510 timeval lumiStopTime;
511 gettimeofday(&lumiStopTime,
nullptr);
513 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
518 unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
521 double throughput =
throughputFactor() * double(accuSize) / double(usecondsForLumi);
523 fmt_->m_data.fastThroughputJ_.value() = throughput;
529 IntJ* lumiProcessedJptr =
dynamic_cast<IntJ*
>(
fmt_->jsonMonitor_->getMergedIntJForLumi(
"Processed",
lumi));
530 if (!lumiProcessedJptr)
531 throw cms::Exception(
"FastMonitoringService") <<
"Internal error: got null pointer from FastMonitor";
538 exception_detected =
true;
543 <<
" events were processed in LUMI " <<
lumi;
555 if (sourceReport.first) {
557 throw cms::Exception(
"FastMonitoringService") <<
"MISMATCH with SOURCE update. LUMI -: " <<
lumi 559 <<
" events(source):" << sourceReport.second;
565 <<
"Statistics for lumisection -: lumi = " <<
lumi <<
" events = " << lumiProcessedJptr->
value()
566 <<
" time = " << usecondsForLumi / 1000000 <<
" size = " << accuSize <<
" thr = " << throughput;
567 delete lumiProcessedJptr;
574 std::stringstream slowFileNameStem;
575 slowFileNameStem <<
slowName_ <<
"_ls" << std::setfill(
'0') << std::setw(4) <<
lumi <<
"_pid" << std::setfill(
'0')
576 << std::setw(5) << getpid();
578 slow /= slowFileNameStem.str();
579 fmt_->jsonMonitor_->outputFullJSONs(slow.string(),
".jsn",
lumi,
output);
581 std::stringstream slowFileName;
582 slowFileName <<
slowName_ <<
"_ls" << std::setfill(
'0') << std::setw(4) <<
lumi <<
"_pid" << std::setfill(
'0')
583 << std::setw(5) << getpid() <<
".jsn";
585 slow /= slowFileName.str();
589 fmt_->jsonMonitor_->discardCollected(
lumi);
593 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
607 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
611 *(
fmt_->m_data.processed_[sid]) = 0;
623 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
651 fmt_->m_data.fastPathProcessedJ_ =
res + 1;
698 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
725 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
748 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
754 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
758 unsigned int proc = it->second.first;
760 *abortFlag = it->second.second;
764 <<
"output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: " 772 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
776 unsigned int abortFlag = it->second.second;
780 <<
"output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: " 788 monInit_.exchange(
true, std::memory_order_acquire);
789 while (!
fmt_->m_stoprequest) {
790 std::vector<std::vector<unsigned int>> lastEnc;
792 std::unique_lock<std::mutex>
lock(
fmt_->monlock_);
796 lastEnc.emplace_back(
fmt_->m_data.ministateEncoded_);
797 lastEnc.emplace_back(
fmt_->m_data.microstateEncoded_);
801 std::vector<std::string> CSVv;
803 CSVv.push_back(
fmt_->jsonMonitor_->getCSVString((
int)
i));
806 lock.release()->unlock();
814 lock.release()->unlock();
824 auto f = [&](std::vector<unsigned int>
const&
p) {
827 msg <<
"[" <<
p[
i] <<
",";
835 msg <<
"Current states: Ms=" <<
fmt_->m_data.fastMacrostateJ_.value() <<
" ms=";
848 fmt_->m_data.fastMacrostateJ_ =
fmt_->m_data.macrostate_;
850 std::vector<const void*> microstateCopy(
fmt_->m_data.microstate_.begin(),
fmt_->m_data.microstate_.end());
851 std::vector<unsigned char> microstateAcqCopy(
fmt_->m_data.microstateAcqFlag_.begin(),
852 fmt_->m_data.microstateAcqFlag_.end());
857 fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
859 fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
863 fmt_->m_data.fastFilesProcessedJ_ = iti->second;
865 fmt_->m_data.fastFilesProcessedJ_ = 0;
869 fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
870 fmt_->m_data.fastLockCountJ_ = itrd->second.second;
872 fmt_->m_data.fastLockWaitJ_ = 0.;
873 fmt_->m_data.fastLockCountJ_ = 0.;
878 fmt_->m_data.ministateEncoded_[
i] =
fmt_->m_data.encPath_[
i].encodeString(
fmt_->m_data.ministate_[
i]);
879 if (microstateAcqCopy[
i])
880 fmt_->m_data.microstateEncoded_[
i] =
881 fmt_->m_data.microstateBins_ +
fmt_->m_data.encModule_.encode(microstateCopy[
i]);
883 fmt_->m_data.microstateEncoded_[
i] =
fmt_->m_data.encModule_.encode(microstateCopy[
i]);
886 bool inputStatePerThread =
false;
989 inputStatePerThread =
true;
1000 inputStatePerThread =
true;
1013 if (!inputStatePerThread)
1015 fmt_->m_data.inputState_[
i] =
fmt_->m_data.inputState_[0];
1018 fmt_->jsonMonitor_->snapGlobal(
ls);
1020 fmt_->jsonMonitor_->snap(
ls);
std::atomic< bool > has_data_exception_
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::atomic< FastMonState::InputState > inputState_
std::string pathLegendFileJson_
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
static const std::string inputStateNames[FastMonState::inCOUNT]
void watchPreEvent(PreEvent::slot_type const &iSlot)
ModuleDescription const * moduleDescription() const
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
LuminosityBlockNumber_t luminosityBlock() const
std::string pathLegendFile_
void postSourceEvent(edm::StreamID)
std::string makePathLegendaJson()
std::atomic< bool > isInitTransition_
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void startedLookingForFile()
void setExceptionDetected(unsigned int ls)
void watchPreModuleEventAcquire(PreModuleEventAcquire::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void preGlobalBeginLumi(edm::GlobalContext const &)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
std::filesystem::path workingDirectory_
unsigned int snapCounter_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
unsigned int nOutputModules_
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
Value & append(const Value &value)
Append value to array at the end.
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
void preGlobalEndLumi(edm::GlobalContext const &)
void setMicroState(FastMonState::Microstate)
unsigned int lastGlobalLumi_
bool isExceptionOnData(unsigned int ls)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
constexpr int nSpecialModules
std::string inputLegendFileJson_
std::vector< std::string > const & endPaths() const
LuminosityBlockNumber_t luminosityBlock() const
unsigned int fastMonIntervals_
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
std::atomic< bool > monInit_
std::string makeModuleLegendaJson()
void preModuleBeginJob(edm::ModuleDescription const &)
std::string makeInputLegendaJson()
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
unsigned int lumiFromSource_
std::string microstateDefPath_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
std::string moduleLegendFile_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
std::atomic< FastMonState::InputState > inputSupervisorState_
StreamID const & streamID() const
static const std::string nopath_
constexpr double throughputFactor()
void watchPostModuleEventAcquire(PostModuleEventAcquire::slot_type const &iSlot)
std::string fastMicrostateDefPath_
std::filesystem::path runDirectory_
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
std::string moduleLegendFileJson_
virtual ~MicroStateService()
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
LuminosityBlockID const & luminosityBlockID() const
std::vector< std::string > fastPathList_
constexpr int nReservedPaths
void postStreamBeginLumi(edm::StreamContext const &)
Log< level::Info, false > LogInfo
void preSourceEvent(edm::StreamID)
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
~FastMonitoringService() override
constexpr int nReservedModules
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void stoppedLookingForFile(unsigned int lumi)
std::atomic< bool > has_source_exception_
DAQSource * daqInputSource_
EventID const & eventID() const
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
std::string const & pathName() const
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
unsigned int value() const
bool getAbortFlagForLumi(unsigned int lumi)
std::shared_ptr< FastMonitoringThread > fmt_
Log< level::Warning, false > LogWarning
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
Writes a Value in JSON format in a human friendly way.
bool exceptionDetected() const
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
std::vector< std::string > const & paths() const
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
array value (ordered list)