62 "NewLumiBusyEndingLS",
63 "NewLumiIdleEndingLS",
73 "NoRequestWithIdleThreads",
74 "NoRequestWithGlobalEoL",
75 "NoRequestWithEoLThreads",
78 "SupWaitFreeChunkCopying",
80 "SupWaitFreeThreadCopying",
83 "SupLockPollingCopying",
86 "SupNewFileWaitThreadCopying",
87 "SupNewFileWaitThread",
88 "SupNewFileWaitChunkCopying",
89 "SupNewFileWaitChunk",
90 "WaitInput_fileLimit",
91 "WaitInput_waitFreeChunk",
92 "WaitInput_waitFreeChunkCopying",
93 "WaitInput_waitFreeThread",
94 "WaitInput_waitFreeThreadCopying",
96 "WaitInput_lockPolling",
97 "WaitInput_lockPollingCopying",
101 "WaitInput_newFileWaitThreadCopying",
102 "WaitInput_newFileWaitThread",
103 "WaitInput_newFileWaitChunkCopying",
104 "WaitInput_newFileWaitChunk",
105 "WaitChunk_fileLimit",
106 "WaitChunk_waitFreeChunk",
107 "WaitChunk_waitFreeChunkCopying",
108 "WaitChunk_waitFreeThread",
109 "WaitChunk_waitFreeThreadCopying",
111 "WaitChunk_lockPolling",
112 "WaitChunk_lockPollingCopying",
116 "WaitChunk_newFileWaitThreadCopying",
117 "WaitChunk_newFileWaitThread",
118 "WaitChunk_newFileWaitChunkCopying",
119 "WaitChunk_newFileWaitChunk"};
121 const std::string FastMonitoringService::nopath_ =
"NoPath";
128 sleepTime_(iPS.getUntrackedParameter<
int>(
"sleepTime", 1)),
129 fastMonIntervals_(iPS.getUntrackedParameter<unsigned
int>(
"fastMonIntervals", 2)),
130 fastName_(
"fastmoni"),
131 slowName_(
"slowmoni"),
132 filePerFwkStream_(iPS.getUntrackedParameter<
bool>(
"filePerFwkStream",
false)),
133 totalEventsProcessed_(0) {
171 std::string microstateBaseSuffix =
"src/EventFilter/Utilities/plugins/microstatedef.jsd";
173 if (
stat(microstatePath.c_str(), &statbuf)) {
174 microstatePath =
std::string(std::getenv(
"CMSSW_RELEASE_BASE")) +
"/" + microstateBaseSuffix;
175 if (
stat(microstatePath.c_str(), &statbuf)) {
176 microstatePath = microstateBaseSuffix;
177 if (
stat(microstatePath.c_str(), &statbuf))
178 throw cms::Exception(
"FastMonitoringService") <<
"microstate definition file not found";
188 desc.setComment(
"Service for File-based DAQ monitoring and event accounting");
189 desc.addUntracked<
int>(
"sleepTime", 1)->setComment(
"Sleep time of the monitoring thread");
190 desc.addUntracked<
unsigned int>(
"fastMonIntervals", 2)
191 ->setComment(
"Modulo of sleepTime intervals on which fastmon file is written out");
192 desc.addUntracked<
bool>(
"filePerFwkStream",
false)
193 ->setComment(
"Switches on monitoring output per framework stream");
194 desc.setAllowAnything();
195 descriptions.
add(
"FastMonitoringService",
desc);
200 for (
int i = 0;
i <
fmt_->m_data.encPath_[0].current_;
i++)
201 legendaVector.
append(
Json::Value(*(static_cast<const std::string*>(
fmt_->m_data.encPath_[0].decode(
i)))));
204 pathLegend[
"names"] = legendaVector;
205 pathLegend[
"reserved"] = valReserved;
207 return writer.write(pathLegend);
212 for (
int i = 0;
i <
fmt_->m_data.encModule_.current_;
i++)
214 Json::Value((static_cast<const edm::ModuleDescription*>(
fmt_->m_data.encModule_.decode(
i)))->moduleLabel()));
216 for (
int i = 0;
i <
fmt_->m_data.encModule_.current_;
i++)
218 (static_cast<const edm::ModuleDescription*>(
fmt_->m_data.encModule_.decode(
i)))->moduleLabel() +
"__ACQ"));
223 moduleLegend[
"names"] = legendaVector;
224 moduleLegend[
"reserved"] = valReserved;
225 moduleLegend[
"special"] = valSpecial;
226 moduleLegend[
"output"] = valOutputModules;
228 return writer.write(moduleLegend);
236 moduleLegend[
"names"] = legendaVector;
238 return writer.write(moduleLegend);
257 throw cms::Exception(
"FastMonitoringService") <<
"EvFDaqDirector is not present";
268 <<
". No monitoring data will be written.";
271 std::ostringstream fastFileName;
273 fastFileName <<
fastName_ <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".fast";
275 fast /= fastFileName.str();
279 std::ostringstream fastFileNameTid;
280 fastFileNameTid <<
fastName_ <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
"_tid" <<
i
283 fastTid /= fastFileNameTid.str();
287 std::ostringstream moduleLegFile;
288 std::ostringstream moduleLegFileJson;
289 moduleLegFile <<
"microstatelegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".leg";
290 moduleLegFileJson <<
"microstatelegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
294 std::ostringstream pathLegFile;
295 std::ostringstream pathLegFileJson;
296 pathLegFile <<
"pathlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".leg";
298 pathLegFileJson <<
"pathlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
301 std::ostringstream inputLegFileJson;
302 inputLegFileJson <<
"inputlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
319 fmt_->m_data.encModule_.completeReservedWithDummies();
324 fmt_->m_data.microstateAcqFlag_.push_back(0);
330 fmt_->m_data.encPath_.emplace_back(0);
331 fmt_->m_data.encPath_[
i].update(static_cast<const void*>(&
nopath_));
334 fmt_->m_data.encPath_[
i].updatePreinit(
path);
336 for (
auto& endPath : pathsInfo.
endPaths()) {
337 fmt_->m_data.encPath_[
i].updatePreinit(endPath);
345 fmt_->m_data.microstateBins_ = 0;
347 fmt_->m_data.ministateBins_ =
fmt_->m_data.encPath_[0].vecsize();
357 monInit_.store(
false, std::memory_order_release);
378 context =
" FromAnotherContext";
380 context =
" FromExternalSignal";
384 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
393 context =
" FromAnotherContext";
395 context =
" FromExternalSignal";
399 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
408 context =
" FromAnotherContext";
410 context =
" FromExternalSignal";
412 <<
"earlyTermination -: " <<
context;
413 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
428 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
433 if (
desc.moduleName() ==
"Stream" ||
desc.moduleName() ==
"ShmStreamConsumer" ||
434 desc.moduleName() ==
"EvFOutputModule" ||
desc.moduleName() ==
"EventStreamFileWriter" ||
435 desc.moduleName() ==
"PoolOutputModule") {
436 fmt_->m_data.encModule_.updateReserved((
void*)&
desc);
439 fmt_->m_data.encModule_.update((
void*)&
desc);
455 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
457 fmt_->m_data.microstateBins_ =
fmt_->m_data.encModule_.vecsize() * 2;
471 timeval lumiStartTime;
472 gettimeofday(&lumiStartTime,
nullptr);
476 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
482 LogDebug(
"FastMonitoringService") <<
"Lumi ended. Writing JSON information. LUMI -: " <<
lumi;
483 timeval lumiStopTime;
484 gettimeofday(&lumiStopTime,
nullptr);
486 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
491 unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
494 double throughput =
throughputFactor() * double(accuSize) / double(usecondsForLumi);
496 fmt_->m_data.fastThroughputJ_.value() = throughput;
502 IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(
fmt_->jsonMonitor_->getMergedIntJForLumi(
"Processed",
lumi));
503 if (!lumiProcessedJptr)
504 throw cms::Exception(
"FastMonitoringService") <<
"Internal error: got null pointer from FastMonitor";
511 exception_detected =
true;
516 <<
" events were processed in LUMI " <<
lumi;
527 if (sourceReport.first) {
529 throw cms::Exception(
"FastMonitoringService") <<
"MISMATCH with SOURCE update. LUMI -: " <<
lumi
531 <<
" events(source):" << sourceReport.second;
536 <<
"Statistics for lumisection -: lumi = " <<
lumi <<
" events = " << lumiProcessedJptr->
value()
537 <<
" time = " << usecondsForLumi / 1000000 <<
" size = " << accuSize <<
" thr = " << throughput;
538 delete lumiProcessedJptr;
545 std::stringstream slowFileNameStem;
546 slowFileNameStem <<
slowName_ <<
"_ls" << std::setfill(
'0') << std::setw(4) <<
lumi <<
"_pid" << std::setfill(
'0')
547 << std::setw(5) << getpid();
549 slow /= slowFileNameStem.str();
550 fmt_->jsonMonitor_->outputFullJSONs(slow.string(),
".jsn",
lumi,
output);
552 std::stringstream slowFileName;
553 slowFileName <<
slowName_ <<
"_ls" << std::setfill(
'0') << std::setw(4) <<
lumi <<
"_pid" << std::setfill(
'0')
554 << std::setw(5) << getpid() <<
".jsn";
556 slow /= slowFileName.str();
560 fmt_->jsonMonitor_->discardCollected(
lumi);
564 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
578 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
582 *(
fmt_->m_data.processed_[sid]) = 0;
594 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
622 fmt_->m_data.fastPathProcessedJ_ =
res + 1;
669 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
696 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
719 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
725 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
729 unsigned int proc = it->second.first;
731 *abortFlag = it->second.second;
735 <<
"output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
743 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
747 unsigned int abortFlag = it->second.second;
751 <<
"output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
759 monInit_.exchange(
true, std::memory_order_acquire);
760 while (!
fmt_->m_stoprequest) {
761 std::vector<std::vector<unsigned int>> lastEnc;
763 std::lock_guard<std::mutex>
lock(
fmt_->monlock_);
767 lastEnc.emplace_back(
fmt_->m_data.ministateEncoded_);
768 lastEnc.emplace_back(
fmt_->m_data.microstateEncoded_);
772 std::vector<std::string> CSVv;
774 CSVv.push_back(
fmt_->jsonMonitor_->getCSVString((
int)
i));
776 fmt_->monlock_.unlock();
784 fmt_->monlock_.unlock();
792 std::stringstream accum;
796 accum <<
"[" <<
p[
i] <<
",";
798 accum <<
p[
i] <<
",";
800 accum <<
p[
i] <<
"]";
804 accum <<
"Current states: Ms=" <<
fmt_->m_data.fastMacrostateJ_.value() <<
" ms=";
817 fmt_->m_data.fastMacrostateJ_ =
fmt_->m_data.macrostate_;
819 std::vector<const void*> microstateCopy(
fmt_->m_data.microstate_.begin(),
fmt_->m_data.microstate_.end());
820 std::vector<unsigned char> microstateAcqCopy(
fmt_->m_data.microstateAcqFlag_.begin(),
821 fmt_->m_data.microstateAcqFlag_.end());
826 fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
828 fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
832 fmt_->m_data.fastFilesProcessedJ_ = iti->second;
834 fmt_->m_data.fastFilesProcessedJ_ = 0;
838 fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
839 fmt_->m_data.fastLockCountJ_ = itrd->second.second;
841 fmt_->m_data.fastLockWaitJ_ = 0.;
842 fmt_->m_data.fastLockCountJ_ = 0.;
847 fmt_->m_data.ministateEncoded_[
i] =
fmt_->m_data.encPath_[
i].encodeString(
fmt_->m_data.ministate_[
i]);
848 if (microstateAcqCopy[
i])
849 fmt_->m_data.microstateEncoded_[
i] =
850 fmt_->m_data.microstateBins_ +
fmt_->m_data.encModule_.encode(microstateCopy[
i]);
852 fmt_->m_data.microstateEncoded_[
i] =
fmt_->m_data.encModule_.encode(microstateCopy[
i]);
855 bool inputStatePerThread =
false;
958 inputStatePerThread =
true;
969 inputStatePerThread =
true;
979 if (!inputStatePerThread)
981 fmt_->m_data.inputState_[
i] =
fmt_->m_data.inputState_[0];
984 fmt_->jsonMonitor_->snapGlobal(
ls);
986 fmt_->jsonMonitor_->snap(
ls);