CMS 3D CMS Logo

FastMonitoringService.cc
Go to the documentation of this file.
3 
15 
19 
22 
23 #include <iostream>
24 #include <iomanip>
25 #include <sys/time.h>
26 
27 using namespace jsoncollector;
28 
// Scale factor 10^6 / 2^20 used by the throughput computation in this file, where it multiplies
// (accumulated size) / (elapsed microseconds).
// NOTE(review): presumably converts bytes-per-microsecond into MiB-per-second -- confirm the unit of the
// accumulated sizes with the input source.
constexpr double throughputFactor() {
  constexpr double microsecondsPerSecond = 1000000;
  constexpr double bytesPerMebibyte = 1024 * 1024;
  return microsecondsPerSecond / bytesPerMebibyte;
}
30 
31 namespace evf {
32 
// Placeholder module descriptions for states that are not real modules. Their addresses are stored in the
// per-stream microstate slots (looked up as reservedMicroStateNames[FastMonState::mIdle], [mBoL], [mEoL], ...)
// and each of the first mCOUNT addresses is passed to the module encoder's updateReserved() at initialization.
// NOTE(review): the initializer order presumably follows the FastMonState micro-state enumerators, starting
// with mInvalid -- confirm against the header before reordering or inserting entries.
33  const edm::ModuleDescription FastMonitoringService::reservedMicroStateNames[FastMonState::mCOUNT] = {
34  edm::ModuleDescription("Dummy", "Invalid"),
35  edm::ModuleDescription("Dummy", "Idle"),
36  edm::ModuleDescription("Dummy", "FwkOvhSrc"),
37  edm::ModuleDescription("Dummy", "FwkOvhMod"), //set post produce, analyze or filter
38  edm::ModuleDescription("Dummy", "FwkEoL"),
39  edm::ModuleDescription("Dummy", "Input"),
40  edm::ModuleDescription("Dummy", "DQM"),
41  edm::ModuleDescription("Dummy", "BoL"),
42  edm::ModuleDescription("Dummy", "EoL"),
43  edm::ModuleDescription("Dummy", "GlobalEoL")};
44 
// Display names for the macro states; the array is sized by FastMonState::MCOUNT.
// NOTE(review): presumably ordered like the macro-state enumerators (this file assigns sInit, sJobReady,
// sRunning and sJobEnded to macrostate_) -- confirm against the header before editing the list.
45  const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
46  "JobReady",
47  "RunGiven",
48  "Running",
49  "Stopping",
50  "Done",
51  "JobEnded",
52  "Error",
53  "ErrorEnded",
54  "End",
55  "Invalid"};
56 
// Names of the input-source states, indexed by the numeric FastMonState input-state value: the input legend
// is built by appending inputStateNames[i] for i in [0, inCOUNT), and the monitoring log prints
// inputStateNames[inputState_] and inputStateNames[inputSupervisorState_].
// The "WaitInput_*" and "WaitChunk_*" entries name the combined states that doSnapshot() reports while the
// source is in inWaitInput / inWaitChunk.
// NOTE(review): the order must match the enumerator order in the header (not visible here) -- confirm before
// adding, removing or reordering entries.
57  const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
58  "Ignore",
59  "Init",
60  "WaitInput",
61  "NewLumi",
62  "NewLumiBusyEndingLS",
63  "NewLumiIdleEndingLS",
64  "RunEnd",
65  "ProcessingFile",
66  "WaitChunk",
67  "ChunkReceived",
68  "ChecksumEvent",
69  "CachedEvent",
70  "ReadEvent",
71  "ReadCleanup",
72  "NoRequest",
73  "NoRequestWithIdleThreads",
74  "NoRequestWithGlobalEoL",
75  "NoRequestWithEoLThreads",
76  "SupFileLimit",
77  "SupWaitFreeChunk",
78  "SupWaitFreeChunkCopying",
79  "SupWaitFreeThread",
80  "SupWaitFreeThreadCopying",
81  "SupBusy",
82  "SupLockPolling",
83  "SupLockPollingCopying",
84  "SupNoFile",
85  "SupNewFile",
86  "SupNewFileWaitThreadCopying",
87  "SupNewFileWaitThread",
88  "SupNewFileWaitChunkCopying",
89  "SupNewFileWaitChunk",
90  "WaitInput_fileLimit",
91  "WaitInput_waitFreeChunk",
92  "WaitInput_waitFreeChunkCopying",
93  "WaitInput_waitFreeThread",
94  "WaitInput_waitFreeThreadCopying",
95  "WaitInput_busy",
96  "WaitInput_lockPolling",
97  "WaitInput_lockPollingCopying",
98  "WaitInput_runEnd",
99  "WaitInput_noFile",
100  "WaitInput_newFile",
101  "WaitInput_newFileWaitThreadCopying",
102  "WaitInput_newFileWaitThread",
103  "WaitInput_newFileWaitChunkCopying",
104  "WaitInput_newFileWaitChunk",
105  "WaitChunk_fileLimit",
106  "WaitChunk_waitFreeChunk",
107  "WaitChunk_waitFreeChunkCopying",
108  "WaitChunk_waitFreeThread",
109  "WaitChunk_waitFreeThreadCopying",
110  "WaitChunk_busy",
111  "WaitChunk_lockPolling",
112  "WaitChunk_lockPollingCopying",
113  "WaitChunk_runEnd",
114  "WaitChunk_noFile",
115  "WaitChunk_newFile",
116  "WaitChunk_newFileWaitThreadCopying",
117  "WaitChunk_newFileWaitThread",
118  "WaitChunk_newFileWaitChunkCopying",
119  "WaitChunk_newFileWaitChunk"};
120 
// Placeholder path name: its address is stored in a stream's ministate slot whenever the stream is not inside
// a path (at initialization, at stream begin/end lumi and after an event completes).
121  const std::string FastMonitoringService::nopath_ = "NoPath";
122 
124  : MicroStateService(iPS, reg),
125  fmt_(new FastMonitoringThread()),
126  nStreams_(0) //until initialized
127  ,
128  sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
129  fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
130  fastName_("fastmoni"),
131  slowName_("slowmoni"),
132  filePerFwkStream_(iPS.getUntrackedParameter<bool>("filePerFwkStream", false)),
133  totalEventsProcessed_(0) {
134  reg.watchPreallocate(this, &FastMonitoringService::preallocate); //receiving information on number of threads
136 
141 
145 
150 
152 
153  reg.watchPreEvent(this, &FastMonitoringService::preEvent); //stream
155 
156  reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent); //source (with streamID of requestor)
158 
161 
164 
168 
169  //find microstate definition path (required by the module)
170  struct stat statbuf;
171  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
172  std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
173  if (stat(microstatePath.c_str(), &statbuf)) {
174  microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
175  if (stat(microstatePath.c_str(), &statbuf)) {
176  microstatePath = microstateBaseSuffix;
177  if (stat(microstatePath.c_str(), &statbuf))
178  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
179  }
180  }
181  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
182  }
183 
185 
188  desc.setComment("Service for File-based DAQ monitoring and event accounting");
189  desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
190  desc.addUntracked<unsigned int>("fastMonIntervals", 2)
191  ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
192  desc.addUntracked<bool>("filePerFwkStream", false)
193  ->setComment("Switches on monitoring output per framework stream");
194  desc.setAllowAnything();
195  descriptions.add("FastMonitoringService", desc);
196  }
197 
199  Json::Value legendaVector(Json::arrayValue);
200  for (int i = 0; i < fmt_->m_data.encPath_[0].current_; i++)
201  legendaVector.append(Json::Value(*(static_cast<const std::string*>(fmt_->m_data.encPath_[0].decode(i)))));
202  Json::Value valReserved(nReservedPaths);
203  Json::Value pathLegend;
204  pathLegend["names"] = legendaVector;
205  pathLegend["reserved"] = valReserved;
207  return writer.write(pathLegend);
208  }
209 
211  Json::Value legendaVector(Json::arrayValue);
212  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
213  legendaVector.append(
214  Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
215  //duplicate modules adding a list for acquire states (not all modules actually have it)
216  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
217  legendaVector.append(Json::Value(
218  (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
219  Json::Value valReserved(nReservedModules);
220  Json::Value valSpecial(nSpecialModules);
221  Json::Value valOutputModules(nOutputModules_);
222  Json::Value moduleLegend;
223  moduleLegend["names"] = legendaVector;
224  moduleLegend["reserved"] = valReserved;
225  moduleLegend["special"] = valSpecial;
226  moduleLegend["output"] = valOutputModules;
228  return writer.write(moduleLegend);
229  }
230 
232  Json::Value legendaVector(Json::arrayValue);
233  for (int i = 0; i < FastMonState::inCOUNT; i++)
234  legendaVector.append(Json::Value(inputStateNames[i]));
235  Json::Value moduleLegend;
236  moduleLegend["names"] = legendaVector;
238  return writer.write(moduleLegend);
239  }
240 
242  nStreams_ = bounds.maxNumberOfStreams();
243  nThreads_ = bounds.maxNumberOfThreads();
244  //this should already be >=1
245  if (nStreams_ == 0)
246  nStreams_ = 1;
247  if (nThreads_ == 0)
248  nThreads_ = 1;
249  }
250 
252  edm::ProcessContext const& pc) {
253  // FIND RUN DIRECTORY
254  // The run dir should be set via the configuration of EvFDaqDirector
255 
256  if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
257  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
258  }
259  std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
260  workingDirectory_ = runDirectory_ = runDirectory;
261  workingDirectory_ /= "mon";
262 
263  if (!std::filesystem::is_directory(workingDirectory_)) {
264  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
265  std::filesystem::create_directories(workingDirectory_);
266  if (!std::filesystem::is_directory(workingDirectory_))
267  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
268  << ". No monitoring data will be written.";
269  }
270 
271  std::ostringstream fastFileName;
272 
273  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
275  fast /= fastFileName.str();
276  fastPath_ = fast.string();
277  if (filePerFwkStream_)
278  for (unsigned int i = 0; i < nStreams_; i++) {
279  std::ostringstream fastFileNameTid;
280  fastFileNameTid << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << "_tid" << i
281  << ".fast";
283  fastTid /= fastFileNameTid.str();
284  fastPathList_.push_back(fastTid.string());
285  }
286 
287  std::ostringstream moduleLegFile;
288  std::ostringstream moduleLegFileJson;
289  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
290  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
291  moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
292  moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
293 
294  std::ostringstream pathLegFile;
295  std::ostringstream pathLegFileJson;
296  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
297  pathLegendFile_ = (workingDirectory_ / pathLegFile.str()).string();
298  pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
299  pathLegendFileJson_ = (workingDirectory_ / pathLegFileJson.str()).string();
300 
301  std::ostringstream inputLegFileJson;
302  inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
303  inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
304 
305  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
306  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
307 
308  /*
309  * initialize the fast monitor with:
310  * vector of pointers to monitorable parameters
311  * path to definition
312  *
313  */
314 
315  fmt_->m_data.macrostate_ = FastMonState::sInit;
316 
317  for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
318  fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(reservedMicroStateNames + i));
319  fmt_->m_data.encModule_.completeReservedWithDummies();
320 
321  for (unsigned int i = 0; i < nStreams_; i++) {
322  fmt_->m_data.ministate_.emplace_back(&nopath_);
323  fmt_->m_data.microstate_.emplace_back(&reservedMicroStateNames[FastMonState::mInvalid]);
324  fmt_->m_data.microstateAcqFlag_.push_back(0);
325 
326  //for synchronization
327  streamCounterUpdating_.push_back(new std::atomic<bool>(false));
328 
329  //path (mini) state
330  fmt_->m_data.encPath_.emplace_back(0);
331  fmt_->m_data.encPath_[i].update(static_cast<const void*>(&nopath_));
332 
333  for (auto& path : pathsInfo.paths()) {
334  fmt_->m_data.encPath_[i].updatePreinit(path);
335  }
336  for (auto& endPath : pathsInfo.endPaths()) {
337  fmt_->m_data.encPath_[i].updatePreinit(endPath);
338  }
339  }
340  //for (unsigned int i=0;i<nThreads_;i++)
341  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
342 
343  //initial size until we detect number of bins
344  fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
345  fmt_->m_data.microstateBins_ = 0;
346  fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
347  fmt_->m_data.ministateBins_ = fmt_->m_data.encPath_[0].vecsize();
348 
349  lastGlobalLumi_ = 0;
350  isInitTransition_ = true;
351  lumiFromSource_ = 0;
352 
353  //startup monitoring
354  fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
355  fmt_->jsonMonitor_->setNStreams(nStreams_);
356  fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nStreams_, threadIDAvailable_ ? nThreads_ : 0);
357  monInit_.store(false, std::memory_order_release);
358  if (sleepTime_ > 0)
360 
361  //this definition needs: #include "tbb/compat/thread"
362  //however this would results in TBB imeplementation replacing std::thread
363  //(both supposedly call pthread_self())
364  //number of threads created in process could be obtained from /proc,
365  //assuming that all posix threads are true kernel threads capable of running in parallel
366 
367  //#if TBB_IMPLEMENT_CPP0X
369  //threadIDAvailable_=true;
370  //#endif
371  }
372 
376  context = " FromThisContext ";
378  context = " FromAnotherContext";
380  context = " FromExternalSignal";
381  edm::LogWarning("FastMonitoringService")
382  << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
383  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
384  std::lock_guard<std::mutex> lock(fmt_->monlock_);
385  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
386  }
387 
391  context = " FromThisContext ";
393  context = " FromAnotherContext";
395  context = " FromExternalSignal";
396  edm::LogWarning("FastMonitoringService")
397  << " GLOBAL "
398  << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
399  std::lock_guard<std::mutex> lock(fmt_->monlock_);
401  }
402 
406  context = " FromThisContext ";
408  context = " FromAnotherContext";
410  context = " FromExternalSignal";
411  edm::LogWarning("FastMonitoringService") << " SOURCE "
412  << "earlyTermination -: " << context;
413  std::lock_guard<std::mutex> lock(fmt_->monlock_);
414  exception_detected_ = true;
415  }
416 
418  if (!ls)
419  exception_detected_ = true;
420  else
421  exceptionInLS_.push_back(ls);
422  }
423 
425 
426  //new output module name is stream
428  std::lock_guard<std::mutex> lock(fmt_->monlock_);
429  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
430 
431  //build a map of modules keyed by their module description address
432  //here we need to treat output modules in a special way so they can be easily singled out
433  if (desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" ||
434  desc.moduleName() == "EvFOutputModule" || desc.moduleName() == "EventStreamFileWriter" ||
435  desc.moduleName() == "PoolOutputModule") {
436  fmt_->m_data.encModule_.updateReserved((void*)&desc);
437  nOutputModules_++;
438  } else
439  fmt_->m_data.encModule_.update((void*)&desc);
440  }
441 
443  std::string&& moduleLegStrJson = makeModuleLegendaJson();
444  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
445 
446  std::string inputLegendStrJson = makeInputLegendaJson();
447  FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
448 
449  std::string pathLegendStrJson = makePathLegendaJson();
450  FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
451 
452  fmt_->m_data.macrostate_ = FastMonState::sJobReady;
453 
454  //update number of entries in module histogram
455  std::lock_guard<std::mutex> lock(fmt_->monlock_);
456  //double the size to add post-acquire states
457  fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
458  }
459 
461  fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
462  fmt_->stop();
463  }
464 
466  fmt_->m_data.macrostate_ = FastMonState::sRunning;
467  isInitTransition_ = false;
468  }
469 
471  timeval lumiStartTime;
472  gettimeofday(&lumiStartTime, nullptr);
473  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
474  lastGlobalLumi_ = newLumi;
475 
476  std::lock_guard<std::mutex> lock(fmt_->monlock_);
477  lumiStartTime_[newLumi] = lumiStartTime;
478  }
479 
// Body of the global end-of-lumisection handler: computes the throughput for the lumisection, takes the
// final (global) snapshot, records the processed-event count for the output modules, cross-checks it against
// the input source and writes the slow-monitoring JSON.
// NOTE(review): the signature line is absent from this listing; the cross-reference at the end of the page
// maps FastMonitoringService::preGlobalEndLumi(edm::GlobalContext const&) to the line just before this body.
481  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
482  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
483  timeval lumiStopTime;
484  gettimeofday(&lumiStopTime, nullptr);
485 
// Everything below runs with the monitoring mutex held until the function exits.
486  std::lock_guard<std::mutex> lock(fmt_->monlock_);
487 
488  // Compute throughput
// NOTE(review): if no start time was stored for this lumi, operator[] presumably default-inserts a zeroed
// timeval (container type is not visible here), making the elapsed time meaningless -- confirm.
489  timeval stt = lumiStartTime_[lumi];
490  lumiStartTime_.erase(lumi);
491  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
492  unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
493  accuSize_.erase(lumi);
// NOTE(review): usecondsForLumi can be 0, in which case this floating-point division yields inf or NaN.
494  double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
495  //store to registered variable
496  fmt_->m_data.fastThroughputJ_.value() = throughput;
497 
498  //update
499  doSnapshot(lumi, true);
500 
501  //retrieve one result we need (todo: sanity check if it's found)
502  IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
503  if (!lumiProcessedJptr)
504  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
// Stored as (event count, abort flag); read back by getEventsProcessedForLumi().
505  processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
506 
507  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
508  bool exception_detected = exception_detected_;
509  for (auto ex : exceptionInLS_)
510  if (lumi == ex)
511  exception_detected = true;
512 
// NOTE(review): lumiProcessedJptr is deleted only on the normal path further down; this early return and
// the MISMATCH throw below exit without deleting it. If the pointer is owned here (the later delete
// suggests so), those exits leak it -- confirm ownership of getMergedIntJForLumi()'s result.
513  if (edm::shutdown_flag || exception_detected) {
514  edm::LogInfo("FastMonitoringService")
515  << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
516  << " events were processed in LUMI " << lumi;
517  //this will prevent output modules from producing json file for possibly incomplete lumi
518  processedEventsPerLumi_[lumi].first = 0;
519  processedEventsPerLumi_[lumi].second = true;
520  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
521  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
522  return;
523  }
524 
// The source report is a pair: .first says whether a report exists, .second is the source's event count.
525  if (inputSource_) {
526  auto sourceReport = inputSource_->getEventReport(lumi, true);
527  if (sourceReport.first) {
528  if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
529  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
530  << ", events(processed):" << processedEventsPerLumi_[lumi].first
531  << " events(source):" << sourceReport.second;
532  }
533  }
534  }
// The logged time is an integer division, i.e. whole seconds.
535  edm::LogInfo("FastMonitoringService")
536  << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
537  << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
538  delete lumiProcessedJptr;
539 
540  //full global and stream merge&output for this lumi
541 
542  // create file name for slow monitoring file
// `output` is forwarded to the JSON writer calls below; it is false when sleepTime_ <= 0.
543  bool output = sleepTime_ > 0;
// NOTE(review): the declarations of `slow` (inner lines 548 and 555) are absent from this listing; restore
// them from the original file before compiling.
544  if (filePerFwkStream_) {
545  std::stringstream slowFileNameStem;
546  slowFileNameStem << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
547  << std::setw(5) << getpid();
549  slow /= slowFileNameStem.str();
550  fmt_->jsonMonitor_->outputFullJSONs(slow.string(), ".jsn", lumi, output);
551  } else {
552  std::stringstream slowFileName;
553  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
554  << std::setw(5) << getpid() << ".jsn";
556  slow /= slowFileName.str();
557  //full global and stream merge and JSON write for this lumi
558  fmt_->jsonMonitor_->outputFullJSON(slow.string(), lumi, output);
559  }
560  fmt_->jsonMonitor_->discardCollected(lumi); //we don't do further updates for this lumi
561  }
562 
564  std::lock_guard<std::mutex> lock(fmt_->monlock_);
565  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
566  //LS monitoring snapshot with input source data has been taken in previous callback
567  avgLeadTime_.erase(lumi);
569  lockStatsDuringLumi_.erase(lumi);
570 
571  //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
573  }
574 
576  unsigned int sid = sc.streamID().value();
577 
578  std::lock_guard<std::mutex> lock(fmt_->monlock_);
579  fmt_->m_data.streamLumi_[sid] = sc.eventID().luminosityBlock();
580 
581  //reset collected values for this stream
582  *(fmt_->m_data.processed_[sid]) = 0;
583 
584  fmt_->m_data.ministate_[sid] = &nopath_;
585  fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mBoL];
586  }
587 
589  fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mIdle];
590  }
591 
593  unsigned int sid = sc.streamID().value();
594  std::lock_guard<std::mutex> lock(fmt_->monlock_);
595 
596  //update processed count to be complete at this time
597  //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
598  fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sid);
599  //reset this in case stream does not get notified of next lumi (we keep processed events only)
600  fmt_->m_data.ministate_[sid] = &nopath_;
601  fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mEoL];
602  }
604  fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkEoL];
605  }
606 
608  fmt_->m_data.ministate_[sc.streamID()] = &(pc.pathName());
609  }
610 
612 
614  fmt_->m_data.microstate_[sc.streamID()] = &reservedMicroStateNames[FastMonState::mIdle];
615 
616  fmt_->m_data.ministate_[sc.streamID()] = &nopath_;
617 
618  (*(fmt_->m_data.processed_[sc.streamID()]))++;
619 
620  //fast path counter (events accumulated in a run)
621  unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
622  fmt_->m_data.fastPathProcessedJ_ = res + 1;
623  }
624 
626  fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mInput];
627  }
628 
630  fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mFwkOvhSrc];
631  }
632 
634  edm::ModuleCallingContext const& mcc) {
635  fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
636  }
637 
639  edm::ModuleCallingContext const& mcc) {
640  //fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
641  fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 1;
642  }
643 
645  fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
646  fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 0;
647  }
648 
650  fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkOvhMod];
651  }
652 
653  //FUNCTIONS CALLED FROM OUTSIDE
654 
655  //this is for old-fashioned service that is not thread safe and can block other streams
656  //(we assume the worst case - everything is blocked)
658  for (unsigned int i = 0; i < nStreams_; i++)
659  fmt_->m_data.microstate_[i] = &reservedMicroStateNames[m];
660  }
661 
662  //this is for services that are multithreading-enabled or rarely blocks other streams
664  fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[m];
665  }
666 
667  //from source
// Adds fileSize to the per-lumisection running total in accuSize_ (the entry is created on first use),
// with the monitoring mutex held.
// NOTE(review): the statements between the blank line and the trailing `else` are absent from this listing
// (the inner line numbers jump 675 -> 678 -> 680). Presumably a second per-lumi counter is updated there;
// restore those lines from the original file before compiling.
668  void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
669  std::lock_guard<std::mutex> lock(fmt_->monlock_);
670 
671  if (accuSize_.find(lumi) == accuSize_.end())
672  accuSize_[lumi] = fileSize;
673  else
674  accuSize_[lumi] += fileSize;
675 
678  else
680  }
681 
683  gettimeofday(&fileLookStart_, nullptr);
684  /*
685  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
686  << fileLookStart_.tv_usec / 1000.0 << std::endl;
687  */
688  }
689 
691  gettimeofday(&fileLookStop_, nullptr);
692  /*
693  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
694  << fileLookStop_.tv_usec / 1000.0 << std::endl;
695  */
696  std::lock_guard<std::mutex> lock(fmt_->monlock_);
697 
698  if (lumi > lumiFromSource_) {
700  leadTimes_.clear();
701  }
702  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
703  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
704  // add this to lead times for this lumi
705  leadTimes_.push_back((double)elapsedTime);
706 
707  // recompute average lead time for this lumi
708  if (leadTimes_.size() == 1)
710  else {
711  double totTime = 0;
712  for (unsigned int i = 0; i < leadTimes_.size(); i++)
713  totTime += leadTimes_[i];
714  avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
715  }
716  }
717 
718  void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
719  std::lock_guard<std::mutex> lock(fmt_->monlock_);
720  lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
721  }
722 
723  //for the output module
724  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
725  std::lock_guard<std::mutex> lock(fmt_->monlock_);
726 
727  auto it = processedEventsPerLumi_.find(lumi);
728  if (it != processedEventsPerLumi_.end()) {
729  unsigned int proc = it->second.first;
730  if (abortFlag)
731  *abortFlag = it->second.second;
732  return proc;
733  } else {
734  throw cms::Exception("FastMonitoringService")
735  << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
736  << lumi;
737  return 0;
738  }
739  }
740 
741  //for the output module
743  std::lock_guard<std::mutex> lock(fmt_->monlock_);
744 
745  auto it = processedEventsPerLumi_.find(lumi);
746  if (it != processedEventsPerLumi_.end()) {
747  unsigned int abortFlag = it->second.second;
748  return abortFlag;
749  } else {
750  throw cms::Exception("FastMonitoringService")
751  << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
752  << lumi;
753  return false;
754  }
755  }
756 
757  // the function to be called in the thread. Thread completes when function returns.
// Body of the monitoring-thread loop: until a stop is requested it takes a snapshot under the monitoring
// mutex, optionally writes the fast-monitoring CSV file(s), logs the current encoded states and sleeps for
// sleepTime_ seconds.
// NOTE(review): the signature line (inner line 758) and the condition that opens the CSV-output block
// (inner line 770) are absent from this listing; restore them from the original file before compiling.
759  monInit_.exchange(true, std::memory_order_acquire);
760  while (!fmt_->m_stoprequest) {
// Filled with two vectors per iteration: [0] encoded ministates, [1] encoded microstates.
761  std::vector<std::vector<unsigned int>> lastEnc;
762  {
763  std::lock_guard<std::mutex> lock(fmt_->monlock_);
764 
765  doSnapshot(lastGlobalLumi_, false);
766 
767  lastEnc.emplace_back(fmt_->m_data.ministateEncoded_);
768  lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
769 
// NOTE(review): both branches below call fmt_->monlock_.unlock() while the lock_guard declared above is
// still in scope, so the guard unlocks the mutex a second time at the end of the enclosing block.
// Unlocking a std::mutex the calling thread does not hold is undefined behaviour. A std::unique_lock
// (calling lock.unlock()) would release early without the double unlock -- confirm and fix upstream.
771  if (filePerFwkStream_) {
772  std::vector<std::string> CSVv;
773  for (unsigned int i = 0; i < nStreams_; i++) {
774  CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
775  }
776  fmt_->monlock_.unlock();
777  for (unsigned int i = 0; i < nStreams_; i++) {
778  if (!CSVv[i].empty())
779  fmt_->jsonMonitor_->outputCSV(fastPathList_[i], CSVv[i]);
780  }
781  } else {
782  std::string CSV = fmt_->jsonMonitor_->getCSVString();
783  //release mutex before writing out fast path file
784  fmt_->monlock_.unlock();
785  if (!CSV.empty())
786  fmt_->jsonMonitor_->outputCSV(fastPath_, CSV);
787  }
788  }
789  snapCounter_++;
790  }
791 
// Formats one encoded-state vector into `accum` as a bracketed, comma-separated list.
// NOTE(review): for every i >= 1 the test `i <= nStreams_ - 1` is true inside this loop, so the final
// branch that appends "]" is never taken and each list ends with a trailing comma; with a single stream
// only "[value," is written. The intended test was probably `i < nStreams_ - 1` -- confirm.
792  std::stringstream accum;
793  std::function<void(std::vector<unsigned int>)> f = [&](std::vector<unsigned int> p) {
794  for (unsigned int i = 0; i < nStreams_; i++) {
795  if (i == 0)
796  accum << "[" << p[i] << ",";
797  else if (i <= nStreams_ - 1)
798  accum << p[i] << ",";
799  else
800  accum << p[i] << "]";
801  }
802  };
803 
804  accum << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
805  f(lastEnc[0]);
806  accum << " us=";
807  f(lastEnc[1]);
808  accum << " is=" << inputStateNames[inputState_] << " iss=" << inputStateNames[inputSupervisorState_];
809  edm::LogInfo("FastMonitoringService") << accum.str();
810 
811  ::sleep(sleepTime_);
812  }
813  }
814 
// Refreshes the monitored values in fmt_->m_data for lumisection `ls` and passes them to the JSON monitor.
//
// ls           lumisection whose per-lumi statistics (lead time, files processed, lock stats) are published.
// isGlobalEOL  true: only global variables are snapshotted (snapGlobal); false: regular snapshot (snap).
//
// Both callers visible in this file (the global end-of-lumi handler and the monitoring loop) call this
// with fmt_->monlock_ already held; the function itself takes no lock.
// NOTE(review): the `case` labels of both switch statements, the opening `if` of the input-state chain and a
// few assignments are absent from this listing (gaps in the inner line numbers); restore them from the
// original file before compiling.
815  void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
816  // update macrostate
817  fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
818 
// Work on local copies of the per-stream microstate pointers and acquire flags for the rest of the function.
819  std::vector<const void*> microstateCopy(fmt_->m_data.microstate_.begin(), fmt_->m_data.microstate_.end());
820  std::vector<unsigned char> microstateAcqCopy(fmt_->m_data.microstateAcqFlag_.begin(),
821  fmt_->m_data.microstateAcqFlag_.end());
822 
// Per-lumi statistics are published only after the init transition; a missing entry publishes 0.
823  if (!isInitTransition_) {
824  auto itd = avgLeadTime_.find(ls);
825  if (itd != avgLeadTime_.end())
826  fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
827  else
828  fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
829 
830  auto iti = filesProcessedDuringLumi_.find(ls);
831  if (iti != filesProcessedDuringLumi_.end())
832  fmt_->m_data.fastFilesProcessedJ_ = iti->second;
833  else
834  fmt_->m_data.fastFilesProcessedJ_ = 0;
835 
836  auto itrd = lockStatsDuringLumi_.find(ls);
837  if (itrd != lockStatsDuringLumi_.end()) {
838  fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
839  fmt_->m_data.fastLockCountJ_ = itrd->second.second;
840  } else {
841  fmt_->m_data.fastLockWaitJ_ = 0.;
842  fmt_->m_data.fastLockCountJ_ = 0.;
843  }
844  }
845 
// Encode each stream's current path (ministate) and module (microstate). When the stream's acquire flag is
// set, the module code is shifted by microstateBins_.
// NOTE(review): elsewhere in this file microstateBins_ is assigned encModule_.vecsize() * 2; verify that a
// shift by the full bin count (rather than by vecsize()) keeps the encoded value inside the histogram range.
846  for (unsigned int i = 0; i < nStreams_; i++) {
847  fmt_->m_data.ministateEncoded_[i] = fmt_->m_data.encPath_[i].encodeString(fmt_->m_data.ministate_[i]);
848  if (microstateAcqCopy[i])
849  fmt_->m_data.microstateEncoded_[i] =
850  fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
851  else
852  fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
853  }
854 
// Set to true by the branches that fill inputState_ for every stream individually; otherwise slot 0 is
// copied to all other streams at the end.
855  bool inputStatePerThread = false;
856 
// First switch: selects a combined "inWaitInput_*" state from inputSupervisorState_, falling back to plain
// inWaitInput.
858  switch (inputSupervisorState_) {
860  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
861  break;
863  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
864  break;
866  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
867  break;
869  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
870  break;
872  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
873  break;
875  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
876  break;
878  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
879  break;
881  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
882  break;
884  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
885  break;
887  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
888  break;
890  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
891  break;
894  break;
896  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
897  break;
900  break;
902  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
903  break;
904  default:
905  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
906  }
// Same mapping for a source waiting on a chunk: combined "inWaitChunk_*" states, falling back to inWaitChunk.
907  } else if (inputState_ == FastMonState::inWaitChunk) {
908  switch (inputSupervisorState_) {
910  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
911  break;
913  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
914  break;
916  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
917  break;
919  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
920  break;
922  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
923  break;
925  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
926  break;
928  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
929  break;
931  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
932  break;
934  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
935  break;
937  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
938  break;
940  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
941  break;
944  break;
946  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
947  break;
950  break;
952  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
953  break;
954  default:
955  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
956  }
// No request pending: classify each stream by comparing its microstate pointer with the reserved
// Idle / EoL / FwkEoL placeholders.
957  } else if (inputState_ == FastMonState::inNoRequest) {
958  inputStatePerThread = true;
959  for (unsigned int i = 0; i < nStreams_; i++) {
960  if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mIdle])
961  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
962  else if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
963  microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
964  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
965  else
966  fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
967  }
// New lumi: only streams currently in an EoL microstate are updated; the other streams keep whatever
// input state they had before this call.
968  } else if (inputState_ == FastMonState::inNewLumi) {
969  inputStatePerThread = true;
970  for (unsigned int i = 0; i < nStreams_; i++) {
971  if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
972  microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
973  fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
974  }
975  } else
976  fmt_->m_data.inputState_[0] = inputState_;
977 
978  //this is same for all streams
979  if (!inputStatePerThread)
980  for (unsigned int i = 1; i < nStreams_; i++)
981  fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
982 
983  if (isGlobalEOL) { //only update global variables
984  fmt_->jsonMonitor_->snapGlobal(ls);
985  } else
986  fmt_->jsonMonitor_->snap(ls);
987  }
988 
989  //compatibility
991 
993 
994 } //end namespace evf
evf::FastMonitoringService::workingDirectory_
std::filesystem::path workingDirectory_
Definition: FastMonitoringService.h:271
evf::FastMonitoringService::preGlobalEndLumi
void preGlobalEndLumi(edm::GlobalContext const &)
Definition: FastMonitoringService.cc:480
ConfigurationDescriptions.h
edm::GlobalContext::luminosityBlockID
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:60
edm::StreamID
Definition: StreamID.h:30
evf::FastMonState::inSupWaitFreeThreadCopying
Definition: FastMonitoringService.h:107
eostools.ls
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonState::inWaitChunk_lockPolling
Definition: FastMonitoringService.h:140
edm::ActivityRegistry::watchPostStreamEndLumi
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:466
edm::ActivityRegistry::watchPreSourceEarlyTermination
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
Definition: ActivityRegistry.h:517
evf::FastMonitoringService::exception_detected_
bool exception_detected_
Definition: FastMonitoringService.h:285
ModuleCallingContext.h
electrons_cff.bool
bool
Definition: electrons_cff.py:366
edm::TerminationOrigin::ExceptionFromThisContext
mps_fire.i
i
Definition: mps_fire.py:428
evf::FastMonitoringService::lockStatsDuringLumi_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
Definition: FastMonitoringService.h:259
evf::FastMonitoringService::preStreamEndLumi
void preStreamEndLumi(edm::StreamContext const &)
Definition: FastMonitoringService.cc:592
MessageLogger.h
evf::FastMonState::inSupNewFileWaitChunkCopying
Definition: FastMonitoringService.h:115
funct::false
false
Definition: Factorize.h:29
evf::FastMonitoringService::fastMonIntervals_
unsigned int fastMonIntervals_
Definition: FastMonitoringService.h:237
evf::FastMonitoringService::runDirectory_
std::filesystem::path runDirectory_
Definition: FastMonitoringService.h:271
evf::FastMonState::inWaitChunk
Definition: FastMonitoringService.h:92
evf::FastMonitoringService::doSnapshot
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
Definition: FastMonitoringService.cc:815
evf::FastMonState::inNoRequestWithIdleThreads
Definition: FastMonitoringService.h:99
evf::FastMonitoringService::preSourceEvent
void preSourceEvent(edm::StreamID)
Definition: FastMonitoringService.cc:625
f
double f[11][100]
Definition: MuScleFitUtils.cc:78
evf::nSpecialModules
constexpr int nSpecialModules
Definition: FastMonitoringThread.h:17
evf::FastMonitoringService::getEventsProcessedForLumi
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
Definition: FastMonitoringService.cc:724
convertSQLitetoXML_cfg.output
output
Definition: convertSQLitetoXML_cfg.py:72
evf::FastMonitoringService::processedEventsPerLumi_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
Definition: FastMonitoringService.h:262
evf::FastMonState::inSupNewFileWaitThread
Definition: FastMonitoringService.h:114
Json::arrayValue
array value (ordered list)
Definition: value.h:30
edm::ProcessContext
Definition: ProcessContext.h:27
evf::FastMonState::inNoRequest
Definition: FastMonitoringService.h:98
evf::FastMonState::inSupNewFileWaitChunk
Definition: FastMonitoringService.h:116
edm::TerminationOrigin::ExternalSignal
evf::FastMonitoringService::inputLegendFileJson_
std::string inputLegendFileJson_
Definition: FastMonitoringService.h:281
edm::ActivityRegistry::watchPostGlobalEndLumi
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:426
evf::FastMonitoringService::reservedMicroStateNames
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
Definition: FastMonitoringService.h:156
edm::ActivityRegistry::watchJobFailure
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
Definition: ActivityRegistry.h:178
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
if
if(0==first)
Definition: CAHitNtupletGeneratorKernelsImpl.h:58
to
evf::FastMonitoringService::moduleLegendFileJson_
std::string moduleLegendFileJson_
Definition: FastMonitoringService.h:278
evf::FastMonState::inWaitChunk_noFile
Definition: FastMonitoringService.h:143
cscNeutronWriter_cfi.writer
writer
Definition: cscNeutronWriter_cfi.py:6
evf::FastMonitoringService::fastPath_
std::string fastPath_
Definition: FastMonitoringService.h:240
evf::FastMonitoringService::preSourceEarlyTermination
void preSourceEarlyTermination(edm::TerminationOrigin)
Definition: FastMonitoringService.cc:403
evf::FastMonitoringThread
Definition: FastMonitoringThread.h:119
evf::FastMonState::sRunning
Definition: FastMonitoringService.h:71
evf::nReservedPaths
constexpr int nReservedPaths
Definition: FastMonitoringThread.h:18
evf::FastMonitoringService::exceptionInLS_
std::vector< unsigned int > exceptionInLS_
Definition: FastMonitoringService.h:286
throughputFactor
constexpr double throughputFactor()
Definition: FastMonitoringService.cc:29
edm::StreamID::value
unsigned int value() const
Definition: StreamID.h:43
evf::FastMonitoringService::postBeginJob
void postBeginJob()
Definition: FastMonitoringService.cc:442
edm::ModuleCallingContext::moduleDescription
ModuleDescription const * moduleDescription() const
Definition: ModuleCallingContext.h:50
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
evf::FastMonState::inWaitInput_lockPollingCopying
Definition: FastMonitoringService.h:125
evf::FastMonState::inWaitChunk_newFileWaitThreadCopying
Definition: FastMonitoringService.h:145
edm::EventID::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:39
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
evf::FastMonState::inSupWaitFreeThread
Definition: FastMonitoringService.h:106
evf::FastMonState::sError
Definition: FastMonitoringService.h:76
FileIO.h
FedRawDataInputSource::getEventReport
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: FedRawDataInputSource.cc:1516
edm::ModuleDescription
Definition: ModuleDescription.h:21
evf::FastMonState::sJobReady
Definition: FastMonitoringService.h:69
evf::FastMonState::inWaitChunk_runEnd
Definition: FastMonitoringService.h:142
Utilities.operator
operator
Definition: Utilities.py:24
evf::FastMonitoringService::nThreads_
unsigned int nThreads_
Definition: FastMonitoringService.h:235
evf::FastMonState::inWaitInput_noFile
Definition: FastMonitoringService.h:127
evf::FastMonitoringService::nStreams_
unsigned int nStreams_
Definition: FastMonitoringService.h:234
evf::FastMonState::mFwkEoL
Definition: FastMonitoringService.h:58
edm::ActivityRegistry::watchPostSourceEvent
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:190
evf::FastMonState::inWaitInput_lockPolling
Definition: FastMonitoringService.h:124
ModuleDescription.h
evf::FastMonitoringService::accumulateFileSize
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
Definition: FastMonitoringService.cc:668
visDQMUpload.context
context
Definition: visDQMUpload.py:37
evf::FastMonState::inWaitInput_waitFreeChunk
Definition: FastMonitoringService.h:119
evf::FastMonState::inWaitChunk_fileLimit
Definition: FastMonitoringService.h:134
BXlumiParameters_cfi.lumi
lumi
Definition: BXlumiParameters_cfi.py:6
evf::FastMonitoringService::inputSource_
FedRawDataInputSource * inputSource_
Definition: FastMonitoringService.h:230
evf::FastMonitoringService::postStreamBeginLumi
void postStreamBeginLumi(edm::StreamContext const &)
Definition: FastMonitoringService.cc:588
evf::FastMonState::inNoRequestWithEoLThreads
Definition: FastMonitoringService.h:101
evf::FastMonitoringService::fastPathList_
std::vector< std::string > fastPathList_
Definition: FastMonitoringService.h:287
edm::ActivityRegistry::watchPostEndJob
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
Definition: ActivityRegistry.h:170
edm::ActivityRegistry::watchPreStreamBeginLumi
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:447
evf::FastMonState::inWaitChunk_waitFreeChunkCopying
Definition: FastMonitoringService.h:136
evf::FastMonitoringService::fastName_
std::string fastName_
Definition: FastMonitoringService.h:240
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
edm::StreamContext
Definition: StreamContext.h:31
evf::FastMonitoringService::lumiStartTime_
std::map< unsigned int, timeval > lumiStartTime_
Definition: FastMonitoringService.h:245
evf::FastMonitoringService::preGlobalBeginLumi
void preGlobalBeginLumi(edm::GlobalContext const &)
Definition: FastMonitoringService.cc:470
evf::FastMonState::inWaitInput
Definition: FastMonitoringService.h:86
evf::FastMonState::inNewLumi
Definition: FastMonitoringService.h:87
edm::ActivityRegistry::watchPreBeginJob
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
Definition: ActivityRegistry.h:151
Service.h
visualization-live-secondInstance_cfg.m
m
Definition: visualization-live-secondInstance_cfg.py:79
evf::FastMonState::inWaitChunk_lockPollingCopying
Definition: FastMonitoringService.h:141
evf::FastMonState::mCOUNT
Definition: FastMonitoringService.h:64
evf::FastMonState::inWaitInput_busy
Definition: FastMonitoringService.h:123
edm::ActivityRegistry::watchPreEvent
void watchPreEvent(PreEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:474
evf::FastMonitoringService::fileLookStart_
timeval fileLookStart_
Definition: FastMonitoringService.h:246
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
evf::FastMonitoringService::preStreamBeginLumi
void preStreamBeginLumi(edm::StreamContext const &)
Definition: FastMonitoringService.cc:575
evf::FastMonitoringService::postModuleEvent
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
Definition: FastMonitoringService.cc:649
evf::nReservedModules
constexpr int nReservedModules
Definition: FastMonitoringThread.h:16
evf::FastMonState::inSupFileLimit
Definition: FastMonitoringService.h:103
edm::ActivityRegistry::watchPostBeginJob
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
Definition: ActivityRegistry.h:158
evf::FastMonState::inWaitInput_fileLimit
Definition: FastMonitoringService.h:118
ParameterSetDescription.h
Json::StyledWriter
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:63
evf::FastMonState::inSupBusy
Definition: FastMonitoringService.h:108
evf::FastMonState::inWaitInput_runEnd
Definition: FastMonitoringService.h:126
evf::FastMonState::inWaitInput_waitFreeChunkCopying
Definition: FastMonitoringService.h:120
evf::FastMonState::Microstate
Microstate
Definition: FastMonitoringService.h:53
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
evf::FastMonitoringService::postModuleEventAcquire
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
Definition: FastMonitoringService.cc:638
evf::FastMonState::inWaitChunk_newFileWaitChunk
Definition: FastMonitoringService.h:148
evf::FastMonitoringService::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: FastMonitoringService.cc:186
evf::FastMonitoringService::postGlobalEndLumi
void postGlobalEndLumi(edm::GlobalContext const &)
Definition: FastMonitoringService.cc:563
evf::MicroStateService::MicroStateService
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
Definition: FastMonitoringService.cc:990
evf::FastMonitoringService::getAbortFlagForLumi
bool getAbortFlagForLumi(unsigned int lumi)
Definition: FastMonitoringService.cc:742
UnixSignalHandlers.h
evf::FastMonState::inSupNewFile
Definition: FastMonitoringService.h:112
evf::FastMonitoringService::~FastMonitoringService
~FastMonitoringService() override
Definition: FastMonitoringService.cc:184
edm::GlobalContext
Definition: GlobalContext.h:29
edm::service::SystemBounds
Definition: SystemBounds.h:29
evf::FastMonState::mEoL
Definition: FastMonitoringService.h:62
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:233
edm::ActivityRegistry::watchPostEvent
void watchPostEvent(PostEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:480
edm::ParameterSet
Definition: ParameterSet.h:47
edm::service::SystemBounds::maxNumberOfThreads
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:38
Json::Value::append
Value & append(const Value &value)
Append value to array at the end.
ValidateTausOnZEEFastSim_cff.proc
proc
Definition: ValidateTausOnZEEFastSim_cff.py:6
evf::FastMonitoringService::streamCounterUpdating_
std::vector< std::atomic< bool > * > streamCounterUpdating_
Definition: FastMonitoringService.h:266
edm::PathsAndConsumesOfModulesBase::endPaths
std::vector< std::string > const & endPaths() const
Definition: PathsAndConsumesOfModulesBase.h:40
GlobalContext.h
AlCaHLTBitMon_ParallelJobs.p
def p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
evf::FastMonState::inWaitChunk_busy
Definition: FastMonitoringService.h:139
evf::FastMonState::inSupLockPolling
Definition: FastMonitoringService.h:109
Event.h
evf::FastMonitoringService::preModuleEvent
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
Definition: FastMonitoringService.cc:644
CommonMethods.lock
def lock()
Definition: CommonMethods.py:81
edm::StreamContext::streamID
StreamID const & streamID() const
Definition: StreamContext.h:54
edm::PathContext
Definition: PathContext.h:24
edm::ActivityRegistry::watchPreStreamEndLumi
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:461
evf::FastMonState::inWaitChunk_waitFreeChunk
Definition: FastMonitoringService.h:135
evf::FastMonState::inWaitInput_waitFreeThread
Definition: FastMonitoringService.h:121
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
evf::FastMonitoringService::lumiFromSource_
unsigned int lumiFromSource_
Definition: FastMonitoringService.h:250
edm::ActivityRegistry::watchPreStreamEarlyTermination
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
Definition: ActivityRegistry.h:499
evf::FastMonState::inWaitChunk_waitFreeThread
Definition: FastMonitoringService.h:137
evf::FastMonitoringService::pathLegendFile_
std::string pathLegendFile_
Definition: FastMonitoringService.h:279
evf::FastMonState::MCOUNT
Definition: FastMonitoringService.h:80
evf::FastMonitoringService::avgLeadTime_
std::map< unsigned int, double > avgLeadTime_
Definition: FastMonitoringService.h:254
evf::FastMonitoringService::postSourceEvent
void postSourceEvent(edm::StreamID)
Definition: FastMonitoringService.cc:629
edm::Service
Definition: Service.h:30
evf::FastMonState::inWaitInput_newFile
Definition: FastMonitoringService.h:128
createfilelist.int
int
Definition: createfilelist.py:10
EvFDaqDirector.h
TerminationOrigin
FastMonitoringService.h
evf::FastMonitoringService::monInit_
std::atomic< bool > monInit_
Definition: FastMonitoringService.h:284
edm::LuminosityBlockID::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Definition: LuminosityBlockID.h:42
evf::FastMonitoringService::setMicroState
void setMicroState(FastMonState::Microstate)
Definition: FastMonitoringService.cc:657
evf::FastMonState::inSupWaitFreeChunk
Definition: FastMonitoringService.h:104
edm::service::SystemBounds::maxNumberOfStreams
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
evf::FastMonitoringService::preModuleBeginJob
void preModuleBeginJob(edm::ModuleDescription const &)
Definition: FastMonitoringService.cc:427
evf::FastMonState::inWaitInput_newFileWaitThread
Definition: FastMonitoringService.h:130
evf::FastMonitoringService::lastGlobalLumi_
unsigned int lastGlobalLumi_
Definition: FastMonitoringService.h:248
evf::FastMonitoringService::jobFailure
void jobFailure()
Definition: FastMonitoringService.cc:424
evf::FastMonitoringService::prePathEvent
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
Definition: FastMonitoringService.cc:607
evf::FastMonitoringService::nOutputModules_
unsigned int nOutputModules_
Definition: FastMonitoringService.h:282
evf::FastMonState::mFwkOvhSrc
Definition: FastMonitoringService.h:56
evf::FastMonitoringService::inputState_
std::atomic< FastMonState::InputState > inputState_
Definition: FastMonitoringService.h:231
PathContext.h
res
Definition: Electron.h:6
evf::FastMonitoringService::slowName_
std::string slowName_
Definition: FastMonitoringService.h:240
evf::FastMonitoringService::postGlobalBeginRun
void postGlobalBeginRun(edm::GlobalContext const &)
Definition: FastMonitoringService.cc:465
evf::FastMonitoringService::preEvent
void preEvent(edm::StreamContext const &)
Definition: FastMonitoringService.cc:611
edm::PathContext::pathName
std::string const & pathName() const
Definition: PathContext.h:30
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
FedRawDataInputSource.h
evf::FastMonitoringService::reportLockWait
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
Definition: FastMonitoringService.cc:718
jsoncollector::IntJ::value
long & value()
Definition: JsonMonitorable.h:88
jsoncollector
Definition: DataPoint.h:26
evf::FastMonState::inWaitChunk_waitFreeThreadCopying
Definition: FastMonitoringService.h:138
evf::FastMonitoringService::postEvent
void postEvent(edm::StreamContext const &)
Definition: FastMonitoringService.cc:613
evf::FastMonState::inSupNoFile
Definition: FastMonitoringService.h:111
evf::FastMonState::sJobEnded
Definition: FastMonitoringService.h:75
evf::FastMonitoringService::isInitTransition_
std::atomic< bool > isInitTransition_
Definition: FastMonitoringService.h:249
edm::ActivityRegistry::watchPreGlobalEndLumi
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:421
evf::FastMonitoringService::inputSupervisorState_
std::atomic< FastMonState::InputState > inputSupervisorState_
Definition: FastMonitoringService.h:232
edm::ActivityRegistry::watchPreallocate
void watchPreallocate(Preallocate::slot_type const &iSlot)
Definition: ActivityRegistry.h:144
edm::ActivityRegistry::watchPreGlobalEarlyTermination
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
Definition: ActivityRegistry.h:508
evf::FastMonitoringService::fmt_
std::shared_ptr< FastMonitoringThread > fmt_
Definition: FastMonitoringService.h:227
edm::ActivityRegistry::watchPostStreamBeginLumi
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:454
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
evf::FastMonitoringService::nopath_
static const std::string nopath_
Definition: FastMonitoringService.h:160
evf::FastMonitoringService::fileLookStop_
timeval fileLookStop_
Definition: FastMonitoringService.h:246
edm::ActivityRegistry::watchPreGlobalBeginLumi
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:407
evf::FastMonState::inWaitInput_newFileWaitChunk
Definition: FastMonitoringService.h:132
evf::FastMonitoringService::makeInputLegendaJson
std::string makeInputLegendaJson()
Definition: FastMonitoringService.cc:231
edm::TerminationOrigin::ExceptionFromAnotherContext
edm::ActivityRegistry::watchPreModuleBeginJob
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
Definition: ActivityRegistry.h:749
evf::FastMonitoringService::preGlobalEarlyTermination
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
Definition: FastMonitoringService.cc:388
evf::FastMonitoringService::inputStateNames
static const std::string inputStateNames[FastMonState::inCOUNT]
Definition: FastMonitoringService.h:158
edm::ActivityRegistry::watchPostModuleEvent
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:799
edm::PathsAndConsumesOfModulesBase::paths
std::vector< std::string > const & paths() const
Definition: PathsAndConsumesOfModulesBase.h:39
evf::FastMonState::mBoL
Definition: FastMonitoringService.h:61
edm::PathsAndConsumesOfModulesBase
Definition: PathsAndConsumesOfModulesBase.h:35
evf::FastMonitoringService::accuSize_
std::map< unsigned int, unsigned long > accuSize_
Definition: FastMonitoringService.h:257
relativeConstraints.empty
bool empty
Definition: relativeConstraints.py:46
edm::ActivityRegistry::watchPostModuleEventAcquire
void watchPostModuleEventAcquire(PostModuleEventAcquire::slot_type const &iSlot)
Definition: ActivityRegistry.h:813
evf::FastMonitoringService::makePathLegendaJson
std::string makePathLegendaJson()
Definition: FastMonitoringService.cc:198
Exception
Definition: hltDiff.cc:245
edm::ActivityRegistry::watchPreSourceEvent
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:184
evf::FastMonitoringService::threadIDAvailable_
bool threadIDAvailable_
Definition: FastMonitoringService.h:273
evf
Definition: fillJson.h:27
evf::FastMonitoringService::filePerFwkStream_
bool filePerFwkStream_
Definition: FastMonitoringService.h:241
HiBiasedCentrality_cfi.function
function
Definition: HiBiasedCentrality_cfi.py:4
evf::FastMonitoringService::startedLookingForFile
void startedLookingForFile()
Definition: FastMonitoringService.cc:682
FastMonitoringThread.h
evf::FastMonState::inSupNewFileWaitThreadCopying
Definition: FastMonitoringService.h:113
evf::FastMonitoringService::makeModuleLegendaJson
std::string makeModuleLegendaJson()
Definition: FastMonitoringService.cc:210
evf::FastMonState::mIdle
Definition: FastMonitoringService.h:55
evf::FastMonState::sInit
Definition: FastMonitoringService.h:68
evf::FastMonitoringService::preallocate
void preallocate(edm::service::SystemBounds const &)
Definition: FastMonitoringService.cc:241
evf::FastMonState::inCOUNT
Definition: FastMonitoringService.h:149
evf::FastMonitoringService::setExceptionDetected
void setExceptionDetected(unsigned int ls)
Definition: FastMonitoringService.cc:417
jsoncollector::IntJ
Definition: JsonMonitorable.h:66
funct::void
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
evf::FastMonState::inRunEnd
Definition: FastMonitoringService.h:90
evf::FastMonitoringService::pathLegendFileJson_
std::string pathLegendFileJson_
Definition: FastMonitoringService.h:280
evf::FastMonitoringService::totalEventsProcessed_
std::atomic< unsigned long > totalEventsProcessed_
Definition: FastMonitoringService.h:275
evf::FastMonState::inWaitChunk_newFile
Definition: FastMonitoringService.h:144
SiStripCommissioningSource_FromEDM_cfg.FastMonitoringService
FastMonitoringService
Definition: SiStripCommissioningSource_FromEDM_cfg.py:49
evf::FastMonState::mInvalid
Definition: FastMonitoringService.h:54
evf::FastMonState::inWaitChunk_newFileWaitThread
Definition: FastMonitoringService.h:146
evf::FastMonState::mFwkOvhMod
Definition: FastMonitoringService.h:57
evf::FastMonState::mInput
Definition: FastMonitoringService.h:59
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
evf::FastMonitoringService::fastMicrostateDefPath_
std::string fastMicrostateDefPath_
Definition: FastMonitoringService.h:239
evf::FastMonState::inSupWaitFreeChunkCopying
Definition: FastMonitoringService.h:105
evf::FastMonState::inWaitInput_newFileWaitChunkCopying
Definition: FastMonitoringService.h:131
evf::FastMonitoringService::leadTimes_
std::vector< double > leadTimes_
Definition: FastMonitoringService.h:258
evf::FastMonitoringService::microstateDefPath_
std::string microstateDefPath_
Definition: FastMonitoringService.h:239
edm::ActivityRegistry::watchPrePathEvent
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:486
StreamContext.h
evf::FastMonitoringService::moduleLegendFile_
std::string moduleLegendFile_
Definition: FastMonitoringService.h:277
evf::FastMonitoringService::postStreamEndLumi
void postStreamEndLumi(edm::StreamContext const &)
Definition: FastMonitoringService.cc:603
edm_modernize_messagelogger.stat
stat
Definition: edm_modernize_messagelogger.py:27
evf::FastMonitoringService::filesProcessedDuringLumi_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
Definition: FastMonitoringService.h:255
evf::MicroStateService::~MicroStateService
virtual ~MicroStateService()
Definition: FastMonitoringService.cc:992
evf::FastMonitoringService::preStreamEarlyTermination
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
Definition: FastMonitoringService.cc:373
evf::FastMonitoringService::snapCounter_
unsigned int snapCounter_
Definition: FastMonitoringService.h:238
evf::FastMonitoringService::stoppedLookingForFile
void stoppedLookingForFile(unsigned int lumi)
Definition: FastMonitoringService.cc:690
evf::FastMonitoringService::postEndJob
void postEndJob()
Definition: FastMonitoringService.cc:460
evf::FastMonitoringService::preBeginJob
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
Definition: FastMonitoringService.cc:251
edm::ActivityRegistry::watchPreModuleEventAcquire
void watchPreModuleEventAcquire(PreModuleEventAcquire::slot_type const &iSlot)
Definition: ActivityRegistry.h:805
PathsAndConsumesOfModulesBase.h
lumi
Definition: LumiSectionData.h:20
evf::FastMonState::inSupLockPollingCopying
Definition: FastMonitoringService.h:110
evf::FastMonState::inWaitInput_waitFreeThreadCopying
Definition: FastMonitoringService.h:122
SystemBounds.h
evf::FastMonitoringService::preModuleEventAcquire
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
Definition: FastMonitoringService.cc:633
evf::FastMonitoringService::snapshotRunner
void snapshotRunner()
Definition: FastMonitoringService.cc:758
evf::MicroStateService
Definition: MicroStateService.h:13
evf::FastMonitoringService::sleepTime_
int sleepTime_
Definition: FastMonitoringService.h:236
edm::StreamContext::eventID
EventID const & eventID() const
Definition: StreamContext.h:59
edm::ActivityRegistry::watchPreModuleEvent
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:793
Json::Value
Represents a JSON value.
Definition: value.h:99
evf::FastMonState::inWaitInput_newFileWaitThreadCopying
Definition: FastMonitoringService.h:129
evf::FastMonState::inWaitChunk_newFileWaitChunkCopying
Definition: FastMonitoringService.h:147
edm::ModuleCallingContext
Definition: ModuleCallingContext.h:29