110 "NewLumiBusyEndingLS",
111 "NewLumiIdleEndingLS",
121 "NoRequestWithIdleThreads",
122 "NoRequestWithGlobalEoL",
123 "NoRequestWithEoLThreads",
126 "SupWaitFreeChunkCopying",
128 "SupWaitFreeThreadCopying",
131 "SupLockPollingCopying",
134 "SupNewFileWaitThreadCopying",
135 "SupNewFileWaitThread",
136 "SupNewFileWaitChunkCopying",
137 "SupNewFileWaitChunk",
138 "WaitInput_fileLimit",
139 "WaitInput_waitFreeChunk",
140 "WaitInput_waitFreeChunkCopying",
141 "WaitInput_waitFreeThread",
142 "WaitInput_waitFreeThreadCopying",
144 "WaitInput_lockPolling",
145 "WaitInput_lockPollingCopying",
149 "WaitInput_newFileWaitThreadCopying",
150 "WaitInput_newFileWaitThread",
151 "WaitInput_newFileWaitChunkCopying",
152 "WaitInput_newFileWaitChunk",
153 "WaitChunk_fileLimit",
154 "WaitChunk_waitFreeChunk",
155 "WaitChunk_waitFreeChunkCopying",
156 "WaitChunk_waitFreeThread",
157 "WaitChunk_waitFreeThreadCopying",
159 "WaitChunk_lockPolling",
160 "WaitChunk_lockPollingCopying",
164 "WaitChunk_newFileWaitThreadCopying",
165 "WaitChunk_newFileWaitThread",
166 "WaitChunk_newFileWaitChunkCopying",
167 "WaitChunk_newFileWaitChunk",
178 : num_threads(),
max_threads(num_expected), threadactive_(num_expected, 0) {
185 threadactive_[tbb::this_task_arena::current_thread_index()] = 1;
190 threadactive_[tbb::this_task_arena::current_thread_index()] = 0;
200 tbbMonitoringMode_(iPS.getUntrackedParameter<
bool>(
"tbbMonitoringMode",
true)),
201 tbbConcurrencyTracker_(iPS.getUntrackedParameter<
bool>(
"tbbConcurrencyTracker",
true) && tbbMonitoringMode_),
202 sleepTime_(iPS.getUntrackedParameter<
int>(
"sleepTime", 1)),
203 fastMonIntervals_(iPS.getUntrackedParameter<unsigned
int>(
"fastMonIntervals", 2)),
204 fastName_(
"fastmoni"),
205 totalEventsProcessed_(0),
206 verbose_(iPS.getUntrackedParameter<
bool>(
"verbose")) {
243 std::string microstateBaseSuffix =
"src/EventFilter/Utilities/plugins/microstatedef.jsd";
245 if (
stat(microstatePath.c_str(), &statbuf)) {
246 microstatePath =
std::string(std::getenv(
"CMSSW_RELEASE_BASE")) +
"/" + microstateBaseSuffix;
247 if (
stat(microstatePath.c_str(), &statbuf)) {
248 microstatePath = microstateBaseSuffix;
249 if (
stat(microstatePath.c_str(), &statbuf))
250 throw cms::Exception(
"FastMonitoringService") <<
"microstate definition file not found";
260 desc.setComment(
"Service for File-based DAQ monitoring and event accounting");
261 desc.addUntracked<
bool>(
"tbbMonitoringMode",
true)
262 ->setComment(
"Monitor individual module processing per TBB thread instead of stream");
263 desc.addUntracked<
bool>(
"tbbConcurrencyTracker",
true)
264 ->setComment(
"Monitor TBB thread activity to flag microstate as real idle or overhead/other");
265 desc.addUntracked<
int>(
"sleepTime", 1)->setComment(
"Sleep time of the monitoring thread");
266 desc.addUntracked<
unsigned int>(
"fastMonIntervals", 2)
267 ->setComment(
"Modulo of sleepTime intervals on which fastmon file is written out");
268 desc.addUntracked<
bool>(
"filePerFwkStream",
true)
269 ->setComment(
"Switches on monitoring output per framework stream");
270 desc.addUntracked<
bool>(
"verbose",
false)->setComment(
"Set to use LogInfo messages from the monitoring thread");
271 desc.setAllowAnything();
272 descriptions.
add(
"FastMonitoringService",
desc);
277 for (
int i = 0;
i <
fmt_->m_data.encModule_.current_;
i++)
279 Json::Value((static_cast<const edm::ModuleDescription*>(
fmt_->m_data.encModule_.decode(
i)))->moduleLabel()));
281 for (
int i = 0;
i <
fmt_->m_data.encModule_.current_;
i++)
283 (static_cast<const edm::ModuleDescription*>(
fmt_->m_data.encModule_.decode(
i)))->moduleLabel() +
"__ACQ"));
288 moduleLegend[
"names"] = legendaVector;
289 moduleLegend[
"reserved"] = valReserved;
290 moduleLegend[
"special"] = valSpecial;
291 moduleLegend[
"output"] = valOutputModules;
293 return writer.write(moduleLegend);
301 moduleLegend[
"names"] = legendaVector;
303 return writer.write(moduleLegend);
326 throw cms::Exception(
"FastMonitoringService") <<
"EvFDaqDirector is not present";
337 <<
". No monitoring data will be written.";
340 std::ostringstream fastFileName;
342 fastFileName <<
fastName_ <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".fast";
344 fast /= fastFileName.str();
347 std::ostringstream moduleLegFile;
348 std::ostringstream moduleLegFileJson;
349 moduleLegFile <<
"microstatelegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".leg";
350 moduleLegFileJson <<
"microstatelegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
354 std::ostringstream inputLegFileJson;
355 inputLegFileJson <<
"inputlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
371 fmt_->m_data.encModule_.completeReservedWithDummies();
385 fmt_->m_data.microstateBins_ = 0;
396 monInit_.store(
false, std::memory_order_release);
406 context =
" FromAnotherContext";
408 context =
" FromExternalSignal";
412 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
422 context =
" FromAnotherContext";
424 context =
" FromExternalSignal";
428 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
438 context =
" FromAnotherContext";
440 context =
" FromExternalSignal";
442 <<
"earlyTermination -: " <<
context;
443 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
450 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
466 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
478 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
483 if (
desc.moduleName() ==
"Stream" ||
desc.moduleName() ==
"GlobalEvFOutputModule" ||
484 desc.moduleName() ==
"EventStreamFileWriter" ||
desc.moduleName() ==
"PoolOutputModule") {
485 fmt_->m_data.encModule_.updateReserved((
void*)&
desc);
488 fmt_->m_data.encModule_.update((
void*)&
desc);
501 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
503 fmt_->m_data.microstateBins_ =
fmt_->m_data.encModule_.vecsize() * 2;
517 timeval lumiStartTime;
518 gettimeofday(&lumiStartTime,
nullptr);
522 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
533 LogDebug(
"FastMonitoringService") <<
"Lumi ended. Writing JSON information. LUMI -: " <<
lumi;
534 timeval lumiStopTime;
535 gettimeofday(&lumiStopTime,
nullptr);
537 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
542 unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
545 double throughput =
throughputFactor() * double(accuSize) / double(usecondsForLumi);
547 fmt_->m_data.fastThroughputJ_.value() = throughput;
553 IntJ* lumiProcessedJptr =
dynamic_cast<IntJ*
>(
fmt_->jsonMonitor_->getMergedIntJForLumi(
"Processed",
lumi));
554 if (!lumiProcessedJptr)
555 throw cms::Exception(
"FastMonitoringService") <<
"Internal error: got null pointer from FastMonitor";
562 exception_detected =
true;
567 <<
" events were processed in LUMI " <<
lumi;
579 if (sourceReport.first) {
581 throw cms::Exception(
"FastMonitoringService") <<
"MISMATCH with SOURCE update. LUMI -: " <<
lumi 583 <<
" events(source):" << sourceReport.second;
589 <<
"Statistics for lumisection -: lumi = " <<
lumi <<
" events = " << lumiProcessedJptr->
value()
590 <<
" time = " << usecondsForLumi / 1000000 <<
" size = " << accuSize <<
" thr = " << throughput;
591 delete lumiProcessedJptr;
594 fmt_->jsonMonitor_->outputFullJSON(
"dummy",
lumi,
false);
595 fmt_->jsonMonitor_->discardCollected(
lumi);
599 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
611 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
625 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
645 fmt_->m_data.fastPathProcessedJ_ =
res + 1;
718 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
745 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
768 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
778 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
782 unsigned int proc =
it->second.first;
784 *abortFlag =
it->second.second;
788 <<
"output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: " 796 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
800 unsigned int abortFlag =
it->second.second;
804 <<
"output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: " 812 monInit_.exchange(
true, std::memory_order_acquire);
813 while (!
fmt_->m_stoprequest) {
814 std::vector<std::vector<unsigned int>> lastEnc;
816 std::unique_lock<std::mutex>
lock(
fmt_->monlock_);
820 lastEnc.emplace_back(
fmt_->m_data.tmicrostateEncoded_);
821 lastEnc.emplace_back(
fmt_->m_data.microstateEncoded_);
824 std::vector<std::string> CSVv;
826 CSVv.push_back(
fmt_->jsonMonitor_->getCSVString((
int)
i));
829 lock.release()->unlock();
837 auto f = [&](std::vector<unsigned int>
const&
p) {
840 msg <<
"[" <<
p[
i] <<
",";
848 msg <<
"Current states: Ms=" <<
fmt_->m_data.fastMacrostateJ_.value() <<
" ms=";
861 fmt_->m_data.fastMacrostateJ_ =
fmt_->m_data.macrostate_;
871 fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
873 fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
877 fmt_->m_data.fastFilesProcessedJ_ = iti->second;
879 fmt_->m_data.fastFilesProcessedJ_ = 0;
883 fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
884 fmt_->m_data.fastLockCountJ_ = itrd->second.second;
886 fmt_->m_data.fastLockWaitJ_ = 0.;
887 fmt_->m_data.fastLockCountJ_ = 0.;
892 if (tmicrostateCopy[
i] ==
getmIdle() &&
ct_->isThreadActive(
i)) {
896 if (tmicrostateAcqCopy[
i])
897 fmt_->m_data.tmicrostateEncoded_[
i] =
898 fmt_->m_data.microstateBins_ +
fmt_->m_data.encModule_.encode(tmicrostateCopy[
i]);
900 fmt_->m_data.tmicrostateEncoded_[
i] =
fmt_->m_data.encModule_.encode(tmicrostateCopy[
i]);
904 if (microstateAcqCopy[
i])
905 fmt_->m_data.microstateEncoded_[
i] =
906 fmt_->m_data.microstateBins_ +
fmt_->m_data.encModule_.encode(microstateCopy[
i]);
908 fmt_->m_data.microstateEncoded_[
i] =
fmt_->m_data.encModule_.encode(microstateCopy[
i]);
911 bool inputStatePerThread =
false;
1014 inputStatePerThread =
true;
1018 else if (microstateCopy[
i] ==
getmIdle())
1026 inputStatePerThread =
true;
1040 if (!inputStatePerThread)
1042 fmt_->m_data.inputState_[
i] =
fmt_->m_data.inputState_[0];
1045 fmt_->jsonMonitor_->snapGlobal(
ls);
1047 fmt_->jsonMonitor_->snap(
ls);
std::atomic< bool > has_data_exception_
constexpr edm::ModuleDescription const * getmFwkEoL()
std::atomic< FastMonState::InputState > inputState_
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
static const std::string inputStateNames[FastMonState::inCOUNT]
constexpr edm::ModuleDescription const * getmInvalid()
constexpr edm::ModuleDescription const * getmFwkOvhSrc()
void watchPreEvent(PreEvent::slot_type const &iSlot)
ModuleDescription const * moduleDescription() const
constexpr edm::ModuleDescription const * getmFwkOvhMod()
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
LuminosityBlockNumber_t luminosityBlock() const
std::vector< ContainableAtomic< const void * > > microstate_
constexpr edm::ModuleDescription const * getmIdle()
void postSourceEvent(edm::StreamID)
std::atomic< bool > isInitTransition_
void watchPreallocate(Preallocate::slot_type const &iSlot)
void startedLookingForFile()
constexpr edm::ModuleDescription const * getmBoL()
void setExceptionDetected(unsigned int ls)
void on_scheduler_exit(bool) override
void watchPreModuleEventAcquire(PreModuleEventAcquire::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
constexpr edm::ModuleDescription const * getmIgnore()
void preGlobalBeginLumi(edm::GlobalContext const &)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
std::filesystem::path workingDirectory_
unsigned int snapCounter_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag
bool tbbConcurrencyTracker_
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
unsigned int nOutputModules_
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
Value & append(const Value &value)
Append value to array at the end.
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
void preGlobalEndLumi(edm::GlobalContext const &)
unsigned int lastGlobalLumi_
std::vector< ContainableAtomic< unsigned int > > threadactive_
bool isExceptionOnData(unsigned int ls)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
std::unique_ptr< FastMonitoringThread > fmt_
constexpr edm::ModuleDescription const * getmFwk()
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
constexpr int nSpecialModules
std::string inputLegendFileJson_
LuminosityBlockNumber_t luminosityBlock() const
unsigned int fastMonIntervals_
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
std::atomic< bool > monInit_
std::string makeModuleLegendaJson()
ConcurrencyTracker(unsigned num_expected)
void preModuleBeginJob(edm::ModuleDescription const &)
std::string makeInputLegendaJson()
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
constexpr edm::ModuleDescription const * getmEoL()
unsigned int lumiFromSource_
void on_scheduler_entry(bool) override
constexpr edm::ModuleDescription const * getmGlobEoL()
std::string microstateDefPath_
void preStreamEndLumi(edm::StreamContext const &)
std::vector< ContainableAtomic< unsigned char > > tmicrostateAcqFlag_
std::map< unsigned int, double > avgLeadTime_
std::string moduleLegendFile_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
std::atomic< FastMonState::InputState > inputSupervisorState_
unsigned int nMonThreads_
StreamID const & streamID() const
static unsigned int getTID()
constexpr double throughputFactor()
void watchPostModuleEventAcquire(PostModuleEventAcquire::slot_type const &iSlot)
std::string fastMicrostateDefPath_
std::filesystem::path runDirectory_
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
std::string moduleLegendFileJson_
virtual ~MicroStateService()
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
LuminosityBlockID const & luminosityBlockID() const
void postStreamBeginLumi(edm::StreamContext const &)
static unsigned int getSID(edm::StreamContext const &sc)
std::unique_ptr< ConcurrencyTracker > ct_
Log< level::Info, false > LogInfo
void preSourceEvent(edm::StreamID)
void postStreamEndLumi(edm::StreamContext const &)
constexpr edm::ModuleDescription const * getmInput()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
~FastMonitoringService() override
void setTMicrostate(FastMonState::Microstate m)
constexpr int nReservedModules
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void stoppedLookingForFile(unsigned int lumi)
std::atomic< bool > has_source_exception_
DAQSource * daqInputSource_
EventID const & eventID() const
std::atomic< int > num_threads
constexpr edm::ModuleDescription const * getmEvent()
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
unsigned int value() const
bool getAbortFlagForLumi(unsigned int lumi)
bool isThreadActive(unsigned index)
Log< level::Warning, false > LogWarning
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT]
Writes a Value in JSON format in a human friendly way.
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
bool exceptionDetected() const
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
constexpr edm::ModuleDescription const * getmIdleSource()
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
constexpr edm::ModuleDescription const * getmDqm()
array value (ordered list)
std::vector< ContainableAtomic< const void * > > tmicrostate_