CMS 3D CMS Logo

FastMonitoringService.cc
Go to the documentation of this file.
3 
15 
19 
22 
23 #include <iostream>
24 #include <iomanip>
25 #include <sys/time.h>
26 
27 using namespace jsoncollector;
28 
// Scale factor applied to (accumulated size / elapsed microseconds) when the
// per-lumisection throughput is computed: 10^6 (microseconds per second)
// divided by 2^20. Presumably sizes are in bytes, giving MiB/s - TODO confirm
// against the caller of accumulateFileSize.
constexpr double throughputFactor() {
  constexpr double microsecondsPerSecond = 1000000;
  constexpr double sizeDivisor = 1024 * 1024;
  return microsecondsPerSecond / sizeDivisor;
}
30 
31 namespace evf {
32 
// Placeholder module descriptions (module name "Dummy") standing in for the
// reserved, framework-level microstates. The array is sized by
// FastMonState::mCOUNT and its elements are addressed by FastMonState
// microstate constants elsewhere in this file (mInvalid, mIdle, mBoL, mEoL,
// mFwkEoL, mInput, mFwkOvhSrc, mFwkOvhMod); the addresses of the elements are
// what gets stored as a stream's current microstate.
// NOTE(review): the entry order is presumably required to follow the
// FastMonState microstate enum - confirm against FastMonitoringService.h.
33  const edm::ModuleDescription FastMonitoringService::reservedMicroStateNames[FastMonState::mCOUNT] = {
34  edm::ModuleDescription("Dummy", "Invalid"),
35  edm::ModuleDescription("Dummy", "Idle"),
36  edm::ModuleDescription("Dummy", "FwkOvhSrc"),
37  edm::ModuleDescription("Dummy", "FwkOvhMod"), //set post produce, analyze or filter
38  edm::ModuleDescription("Dummy", "FwkEoL"),
39  edm::ModuleDescription("Dummy", "Input"),
40  edm::ModuleDescription("Dummy", "DQM"),
41  edm::ModuleDescription("Dummy", "BoL"),
42  edm::ModuleDescription("Dummy", "EoL"),
43  edm::ModuleDescription("Dummy", "GlobalEoL")};
44 
// Display names for the job-level macrostates, sized by FastMonState::MCOUNT.
// NOTE(review): presumably entry i names the macrostate with enum value i
// (sInit, sJobReady, ..., as assigned to macrostate_ in this file) - confirm
// against the enum in FastMonitoringService.h before reordering or extending.
45  const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
46  "JobReady",
47  "RunGiven",
48  "Running",
49  "Stopping",
50  "Done",
51  "JobEnded",
52  "Error",
53  "ErrorEnded",
54  "End",
55  "Invalid"};
56 
// Display names for the input-source states, sized by FastMonState::inCOUNT.
// The whole table is written out, in order, as the "names" array of the input
// legend JSON, and it is indexed with inputState_ / inputSupervisorState_ when
// the monitoring thread logs the current states.
// NOTE(review): the entry order is presumably required to follow the
// FastMonState input-state enum - confirm against FastMonitoringService.h.
57  const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
58  "Ignore",
59  "Init",
60  "WaitInput",
61  "NewLumi",
62  "NewLumiBusyEndingLS",
63  "NewLumiIdleEndingLS",
64  "RunEnd",
65  "ProcessingFile",
66  "WaitChunk",
67  "ChunkReceived",
68  "ChecksumEvent",
69  "CachedEvent",
70  "ReadEvent",
71  "ReadCleanup",
72  "NoRequest",
73  "NoRequestWithIdleThreads",
74  "NoRequestWithGlobalEoL",
75  "NoRequestWithEoLThreads",
76  "SupFileLimit",
77  "SupWaitFreeChunk",
78  "SupWaitFreeChunkCopying",
79  "SupWaitFreeThread",
80  "SupWaitFreeThreadCopying",
81  "SupBusy",
82  "SupLockPolling",
83  "SupLockPollingCopying",
84  "SupNoFile",
85  "SupNewFile",
86  "SupNewFileWaitThreadCopying",
87  "SupNewFileWaitThread",
88  "SupNewFileWaitChunkCopying",
89  "SupNewFileWaitChunk",
90  "WaitInput_fileLimit",
91  "WaitInput_waitFreeChunk",
92  "WaitInput_waitFreeChunkCopying",
93  "WaitInput_waitFreeThread",
94  "WaitInput_waitFreeThreadCopying",
95  "WaitInput_busy",
96  "WaitInput_lockPolling",
97  "WaitInput_lockPollingCopying",
98  "WaitInput_runEnd",
99  "WaitInput_noFile",
100  "WaitInput_newFile",
101  "WaitInput_newFileWaitThreadCopying",
102  "WaitInput_newFileWaitThread",
103  "WaitInput_newFileWaitChunkCopying",
104  "WaitInput_newFileWaitChunk",
105  "WaitChunk_fileLimit",
106  "WaitChunk_waitFreeChunk",
107  "WaitChunk_waitFreeChunkCopying",
108  "WaitChunk_waitFreeThread",
109  "WaitChunk_waitFreeThreadCopying",
110  "WaitChunk_busy",
111  "WaitChunk_lockPolling",
112  "WaitChunk_lockPollingCopying",
113  "WaitChunk_runEnd",
114  "WaitChunk_noFile",
115  "WaitChunk_newFile",
116  "WaitChunk_newFileWaitThreadCopying",
117  "WaitChunk_newFileWaitThread",
118  "WaitChunk_newFileWaitChunkCopying",
119  "WaitChunk_newFileWaitChunk",
120  "inSupThrottled",
121  "inThrottled"};
122 
// Path name used as a stream's ministate whenever no path is being tracked:
// it is the initial ministate of every stream and is restored at stream
// begin/end of lumi and after each event. The address of this object (not a
// copy of the string) is what is stored in ministate_ and registered with each
// stream's path encoder.
123  const std::string FastMonitoringService::nopath_ = "NoPath";
124 
126  : MicroStateService(iPS, reg),
127  fmt_(new FastMonitoringThread()),
128  nStreams_(0) //until initialized
129  ,
130  sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
131  fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
132  fastName_("fastmoni"),
133  slowName_("slowmoni"),
134  filePerFwkStream_(iPS.getUntrackedParameter<bool>("filePerFwkStream", false)),
135  totalEventsProcessed_(0) {
136  reg.watchPreallocate(this, &FastMonitoringService::preallocate); //receiving information on number of threads
138 
143 
147 
152 
154 
155  reg.watchPreEvent(this, &FastMonitoringService::preEvent); //stream
157 
158  reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent); //source (with streamID of requestor)
160 
163 
166 
170 
171  //find microstate definition path (required by the module)
172  struct stat statbuf;
173  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
174  std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
175  if (stat(microstatePath.c_str(), &statbuf)) {
176  microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
177  if (stat(microstatePath.c_str(), &statbuf)) {
178  microstatePath = microstateBaseSuffix;
179  if (stat(microstatePath.c_str(), &statbuf))
180  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
181  }
182  }
183  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
184  }
185 
187 
190  desc.setComment("Service for File-based DAQ monitoring and event accounting");
191  desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
192  desc.addUntracked<unsigned int>("fastMonIntervals", 2)
193  ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
194  desc.addUntracked<bool>("filePerFwkStream", false)
195  ->setComment("Switches on monitoring output per framework stream");
196  desc.setAllowAnything();
197  descriptions.add("FastMonitoringService", desc);
198  }
199 
201  Json::Value legendaVector(Json::arrayValue);
202  for (int i = 0; i < fmt_->m_data.encPath_[0].current_; i++)
203  legendaVector.append(Json::Value(*(static_cast<const std::string*>(fmt_->m_data.encPath_[0].decode(i)))));
204  Json::Value valReserved(nReservedPaths);
205  Json::Value pathLegend;
206  pathLegend["names"] = legendaVector;
207  pathLegend["reserved"] = valReserved;
209  return writer.write(pathLegend);
210  }
211 
213  Json::Value legendaVector(Json::arrayValue);
214  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
215  legendaVector.append(
216  Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
217  //duplicate modules adding a list for acquire states (not all modules actually have it)
218  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
219  legendaVector.append(Json::Value(
220  (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
221  Json::Value valReserved(nReservedModules);
222  Json::Value valSpecial(nSpecialModules);
223  Json::Value valOutputModules(nOutputModules_);
224  Json::Value moduleLegend;
225  moduleLegend["names"] = legendaVector;
226  moduleLegend["reserved"] = valReserved;
227  moduleLegend["special"] = valSpecial;
228  moduleLegend["output"] = valOutputModules;
230  return writer.write(moduleLegend);
231  }
232 
234  Json::Value legendaVector(Json::arrayValue);
235  for (int i = 0; i < FastMonState::inCOUNT; i++)
236  legendaVector.append(Json::Value(inputStateNames[i]));
237  Json::Value moduleLegend;
238  moduleLegend["names"] = legendaVector;
240  return writer.write(moduleLegend);
241  }
242 
244  nStreams_ = bounds.maxNumberOfStreams();
245  nThreads_ = bounds.maxNumberOfThreads();
246  //this should already be >=1
247  if (nStreams_ == 0)
248  nStreams_ = 1;
249  if (nThreads_ == 0)
250  nThreads_ = 1;
251  }
252 
254  edm::ProcessContext const& pc) {
255  // FIND RUN DIRECTORY
256  // The run dir should be set via the configuration of EvFDaqDirector
257 
258  if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
259  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
260  }
261  std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
262  workingDirectory_ = runDirectory_ = runDirectory;
263  workingDirectory_ /= "mon";
264 
265  if (!std::filesystem::is_directory(workingDirectory_)) {
266  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
267  std::filesystem::create_directories(workingDirectory_);
268  if (!std::filesystem::is_directory(workingDirectory_))
269  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
270  << ". No monitoring data will be written.";
271  }
272 
273  std::ostringstream fastFileName;
274 
275  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
277  fast /= fastFileName.str();
278  fastPath_ = fast.string();
279  if (filePerFwkStream_)
280  for (unsigned int i = 0; i < nStreams_; i++) {
281  std::ostringstream fastFileNameTid;
282  fastFileNameTid << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << "_tid" << i
283  << ".fast";
285  fastTid /= fastFileNameTid.str();
286  fastPathList_.push_back(fastTid.string());
287  }
288 
289  std::ostringstream moduleLegFile;
290  std::ostringstream moduleLegFileJson;
291  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
292  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
293  moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
294  moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
295 
296  std::ostringstream pathLegFile;
297  std::ostringstream pathLegFileJson;
298  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
299  pathLegendFile_ = (workingDirectory_ / pathLegFile.str()).string();
300  pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
301  pathLegendFileJson_ = (workingDirectory_ / pathLegFileJson.str()).string();
302 
303  std::ostringstream inputLegFileJson;
304  inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
305  inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
306 
307  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
308  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
309 
310  /*
311  * initialize the fast monitor with:
312  * vector of pointers to monitorable parameters
313  * path to definition
314  *
315  */
316 
317  fmt_->m_data.macrostate_ = FastMonState::sInit;
318 
319  for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
320  fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(reservedMicroStateNames + i));
321  fmt_->m_data.encModule_.completeReservedWithDummies();
322 
323  for (unsigned int i = 0; i < nStreams_; i++) {
324  fmt_->m_data.ministate_.emplace_back(&nopath_);
325  fmt_->m_data.microstate_.emplace_back(&reservedMicroStateNames[FastMonState::mInvalid]);
326  fmt_->m_data.microstateAcqFlag_.push_back(0);
327 
328  //for synchronization
329  streamCounterUpdating_.push_back(new std::atomic<bool>(false));
330 
331  //path (mini) state
332  fmt_->m_data.encPath_.emplace_back(0);
333  fmt_->m_data.encPath_[i].update(static_cast<const void*>(&nopath_));
334 
335  for (auto& path : pathsInfo.paths()) {
336  fmt_->m_data.encPath_[i].updatePreinit(path);
337  }
338  for (auto& endPath : pathsInfo.endPaths()) {
339  fmt_->m_data.encPath_[i].updatePreinit(endPath);
340  }
341  }
342  //for (unsigned int i=0;i<nThreads_;i++)
343  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
344 
345  //initial size until we detect number of bins
346  fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
347  fmt_->m_data.microstateBins_ = 0;
348  fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
349  fmt_->m_data.ministateBins_ = fmt_->m_data.encPath_[0].vecsize();
350 
351  lastGlobalLumi_ = 0;
352  isInitTransition_ = true;
353  lumiFromSource_ = 0;
354 
355  //startup monitoring
356  fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
357  fmt_->jsonMonitor_->setNStreams(nStreams_);
358  fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nStreams_, threadIDAvailable_ ? nThreads_ : 0);
359  monInit_.store(false, std::memory_order_release);
360  if (sleepTime_ > 0)
362 
363  //this definition needs: #include "tbb/compat/thread"
364  //however this would result in the TBB implementation replacing std::thread
365  //(both supposedly call pthread_self())
366  //number of threads created in process could be obtained from /proc,
367  //assuming that all posix threads are true kernel threads capable of running in parallel
368 
369  //#if TBB_IMPLEMENT_CPP0X
371  //threadIDAvailable_=true;
372  //#endif
373  }
374 
378  context = " FromThisContext ";
380  context = " FromAnotherContext";
382  context = " FromExternalSignal";
383  edm::LogWarning("FastMonitoringService")
384  << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
385  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
386  std::lock_guard<std::mutex> lock(fmt_->monlock_);
387  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
388  }
389 
393  context = " FromThisContext ";
395  context = " FromAnotherContext";
397  context = " FromExternalSignal";
398  edm::LogWarning("FastMonitoringService")
399  << " GLOBAL "
400  << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
401  std::lock_guard<std::mutex> lock(fmt_->monlock_);
403  }
404 
408  context = " FromThisContext ";
410  context = " FromAnotherContext";
412  context = " FromExternalSignal";
413  edm::LogWarning("FastMonitoringService") << " SOURCE "
414  << "earlyTermination -: " << context;
415  std::lock_guard<std::mutex> lock(fmt_->monlock_);
416  exception_detected_ = true;
417  }
418 
420  if (!ls)
421  exception_detected_ = true;
422  else
423  exceptionInLS_.push_back(ls);
424  }
425 
427 
428  //new output module name is stream
430  std::lock_guard<std::mutex> lock(fmt_->monlock_);
431  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
432 
433  //build a map of modules keyed by their module description address
434  //here we need to treat output modules in a special way so they can be easily singled out
435  if (desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" ||
436  desc.moduleName() == "EvFOutputModule" || desc.moduleName() == "EventStreamFileWriter" ||
437  desc.moduleName() == "PoolOutputModule") {
438  fmt_->m_data.encModule_.updateReserved((void*)&desc);
439  nOutputModules_++;
440  } else
441  fmt_->m_data.encModule_.update((void*)&desc);
442  }
443 
445  std::string&& moduleLegStrJson = makeModuleLegendaJson();
446  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
447 
448  std::string inputLegendStrJson = makeInputLegendaJson();
449  FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
450 
451  std::string pathLegendStrJson = makePathLegendaJson();
452  FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
453 
454  fmt_->m_data.macrostate_ = FastMonState::sJobReady;
455 
456  //update number of entries in module histogram
457  std::lock_guard<std::mutex> lock(fmt_->monlock_);
458  //double the size to add post-acquire states
459  fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
460  }
461 
463  fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
464  fmt_->stop();
465  }
466 
468  fmt_->m_data.macrostate_ = FastMonState::sRunning;
469  isInitTransition_ = false;
470  }
471 
473  timeval lumiStartTime;
474  gettimeofday(&lumiStartTime, nullptr);
475  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
476  lastGlobalLumi_ = newLumi;
477 
478  std::lock_guard<std::mutex> lock(fmt_->monlock_);
479  lumiStartTime_[newLumi] = lumiStartTime;
480  }
481 
483  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
484  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
485  timeval lumiStopTime;
486  gettimeofday(&lumiStopTime, nullptr);
487 
488  std::lock_guard<std::mutex> lock(fmt_->monlock_);
489 
490  // Compute throughput
491  timeval stt = lumiStartTime_[lumi];
492  lumiStartTime_.erase(lumi);
493  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
494  unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
495  accuSize_.erase(lumi);
496  double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
497  //store to registered variable
498  fmt_->m_data.fastThroughputJ_.value() = throughput;
499 
500  //update
501  doSnapshot(lumi, true);
502 
503  //retrieve one result we need (todo: sanity check if it's found)
504  IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
505  if (!lumiProcessedJptr)
506  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
507  processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
508 
509  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
510  bool exception_detected = exception_detected_;
511  for (auto ex : exceptionInLS_)
512  if (lumi == ex)
513  exception_detected = true;
514 
515  if (edm::shutdown_flag || exception_detected) {
516  edm::LogInfo("FastMonitoringService")
517  << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
518  << " events were processed in LUMI " << lumi;
519  //this will prevent output modules from producing json file for possibly incomplete lumi
520  processedEventsPerLumi_[lumi].first = 0;
521  processedEventsPerLumi_[lumi].second = true;
522  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
523  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
524  return;
525  }
526 
527  if (inputSource_) {
528  auto sourceReport = inputSource_->getEventReport(lumi, true);
529  if (sourceReport.first) {
530  if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
531  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
532  << ", events(processed):" << processedEventsPerLumi_[lumi].first
533  << " events(source):" << sourceReport.second;
534  }
535  }
536  }
537  edm::LogInfo("FastMonitoringService")
538  << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
539  << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
540  delete lumiProcessedJptr;
541 
542  //full global and stream merge&output for this lumi
543 
544  // create file name for slow monitoring file
545  bool output = sleepTime_ > 0;
546  if (filePerFwkStream_) {
547  std::stringstream slowFileNameStem;
548  slowFileNameStem << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
549  << std::setw(5) << getpid();
551  slow /= slowFileNameStem.str();
552  fmt_->jsonMonitor_->outputFullJSONs(slow.string(), ".jsn", lumi, output);
553  } else {
554  std::stringstream slowFileName;
555  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
556  << std::setw(5) << getpid() << ".jsn";
558  slow /= slowFileName.str();
559  //full global and stream merge and JSON write for this lumi
560  fmt_->jsonMonitor_->outputFullJSON(slow.string(), lumi, output);
561  }
562  fmt_->jsonMonitor_->discardCollected(lumi); //we don't do further updates for this lumi
563  }
564 
566  std::lock_guard<std::mutex> lock(fmt_->monlock_);
567  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
568  //LS monitoring snapshot with input source data has been taken in previous callback
569  avgLeadTime_.erase(lumi);
571  lockStatsDuringLumi_.erase(lumi);
572 
573  //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
575  }
576 
578  unsigned int sid = sc.streamID().value();
579 
580  std::lock_guard<std::mutex> lock(fmt_->monlock_);
581  fmt_->m_data.streamLumi_[sid] = sc.eventID().luminosityBlock();
582 
583  //reset collected values for this stream
584  *(fmt_->m_data.processed_[sid]) = 0;
585 
586  fmt_->m_data.ministate_[sid] = &nopath_;
587  fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mBoL];
588  }
589 
591  fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mIdle];
592  }
593 
595  unsigned int sid = sc.streamID().value();
596  std::lock_guard<std::mutex> lock(fmt_->monlock_);
597 
598  //update processed count to be complete at this time
599  //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
600  fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sid);
601  //reset this in case stream does not get notified of next lumi (we keep processed events only)
602  fmt_->m_data.ministate_[sid] = &nopath_;
603  fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mEoL];
604  }
606  fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkEoL];
607  }
608 
610  fmt_->m_data.ministate_[sc.streamID()] = &(pc.pathName());
611  }
612 
614 
616  fmt_->m_data.microstate_[sc.streamID()] = &reservedMicroStateNames[FastMonState::mIdle];
617 
618  fmt_->m_data.ministate_[sc.streamID()] = &nopath_;
619 
620  (*(fmt_->m_data.processed_[sc.streamID()]))++;
621 
622  //fast path counter (events accumulated in a run)
623  unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
624  fmt_->m_data.fastPathProcessedJ_ = res + 1;
625  }
626 
628  fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mInput];
629  }
630 
632  fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mFwkOvhSrc];
633  }
634 
636  edm::ModuleCallingContext const& mcc) {
637  fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
638  }
639 
641  edm::ModuleCallingContext const& mcc) {
642  //fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
643  fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 1;
644  }
645 
647  fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
648  fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 0;
649  }
650 
652  fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkOvhMod];
653  }
654 
655  //FUNCTIONS CALLED FROM OUTSIDE
656 
657  //this is for old-fashioned service that is not thread safe and can block other streams
658  //(we assume the worst case - everything is blocked)
660  for (unsigned int i = 0; i < nStreams_; i++)
661  fmt_->m_data.microstate_[i] = &reservedMicroStateNames[m];
662  }
663 
664  //this is for services that are multithreading-enabled or rarely block other streams
666  fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[m];
667  }
668 
669  //from source
670  void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
671  std::lock_guard<std::mutex> lock(fmt_->monlock_);
672 
673  if (accuSize_.find(lumi) == accuSize_.end())
674  accuSize_[lumi] = fileSize;
675  else
676  accuSize_[lumi] += fileSize;
677 
680  else
682  }
683 
685  gettimeofday(&fileLookStart_, nullptr);
686  /*
687  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
688  << fileLookStart_.tv_usec / 1000.0 << std::endl;
689  */
690  }
691 
693  gettimeofday(&fileLookStop_, nullptr);
694  /*
695  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
696  << fileLookStop_.tv_usec / 1000.0 << std::endl;
697  */
698  std::lock_guard<std::mutex> lock(fmt_->monlock_);
699 
700  if (lumi > lumiFromSource_) {
702  leadTimes_.clear();
703  }
704  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
705  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
706  // add this to lead times for this lumi
707  leadTimes_.push_back((double)elapsedTime);
708 
709  // recompute average lead time for this lumi
710  if (leadTimes_.size() == 1)
712  else {
713  double totTime = 0;
714  for (unsigned int i = 0; i < leadTimes_.size(); i++)
715  totTime += leadTimes_[i];
716  avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
717  }
718  }
719 
720  void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
721  std::lock_guard<std::mutex> lock(fmt_->monlock_);
722  lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
723  }
724 
725  //for the output module
726  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
727  std::lock_guard<std::mutex> lock(fmt_->monlock_);
728 
729  auto it = processedEventsPerLumi_.find(lumi);
730  if (it != processedEventsPerLumi_.end()) {
731  unsigned int proc = it->second.first;
732  if (abortFlag)
733  *abortFlag = it->second.second;
734  return proc;
735  } else {
736  throw cms::Exception("FastMonitoringService")
737  << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
738  << lumi;
739  return 0;
740  }
741  }
742 
743  //for the output module
745  std::lock_guard<std::mutex> lock(fmt_->monlock_);
746 
747  auto it = processedEventsPerLumi_.find(lumi);
748  if (it != processedEventsPerLumi_.end()) {
749  unsigned int abortFlag = it->second.second;
750  return abortFlag;
751  } else {
752  throw cms::Exception("FastMonitoringService")
753  << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
754  << lumi;
755  return false;
756  }
757  }
758 
759  // the function to be called in the thread. Thread completes when function returns.
761  monInit_.exchange(true, std::memory_order_acquire);
762  while (!fmt_->m_stoprequest) {
763  std::vector<std::vector<unsigned int>> lastEnc;
764  {
765  std::lock_guard<std::mutex> lock(fmt_->monlock_);
766 
767  doSnapshot(lastGlobalLumi_, false);
768 
769  lastEnc.emplace_back(fmt_->m_data.ministateEncoded_);
770  lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
771 
773  if (filePerFwkStream_) {
774  std::vector<std::string> CSVv;
775  for (unsigned int i = 0; i < nStreams_; i++) {
776  CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
777  }
778  fmt_->monlock_.unlock();
779  for (unsigned int i = 0; i < nStreams_; i++) {
780  if (!CSVv[i].empty())
781  fmt_->jsonMonitor_->outputCSV(fastPathList_[i], CSVv[i]);
782  }
783  } else {
784  std::string CSV = fmt_->jsonMonitor_->getCSVString();
785  //release mutex before writing out fast path file
786  fmt_->monlock_.unlock();
787  if (!CSV.empty())
788  fmt_->jsonMonitor_->outputCSV(fastPath_, CSV);
789  }
790  }
791  snapCounter_++;
792  }
793 
794  std::stringstream accum;
795  std::function<void(std::vector<unsigned int>)> f = [&](std::vector<unsigned int> p) {
796  for (unsigned int i = 0; i < nStreams_; i++) {
797  if (i == 0)
798  accum << "[" << p[i] << ",";
799  else if (i <= nStreams_ - 1)
800  accum << p[i] << ",";
801  else
802  accum << p[i] << "]";
803  }
804  };
805 
806  accum << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
807  f(lastEnc[0]);
808  accum << " us=";
809  f(lastEnc[1]);
810  accum << " is=" << inputStateNames[inputState_] << " iss=" << inputStateNames[inputSupervisorState_];
811  edm::LogInfo("FastMonitoringService") << accum.str();
812 
813  ::sleep(sleepTime_);
814  }
815  }
816 
817  void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
818  // update macrostate
819  fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
820 
821  std::vector<const void*> microstateCopy(fmt_->m_data.microstate_.begin(), fmt_->m_data.microstate_.end());
822  std::vector<unsigned char> microstateAcqCopy(fmt_->m_data.microstateAcqFlag_.begin(),
823  fmt_->m_data.microstateAcqFlag_.end());
824 
825  if (!isInitTransition_) {
826  auto itd = avgLeadTime_.find(ls);
827  if (itd != avgLeadTime_.end())
828  fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
829  else
830  fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
831 
832  auto iti = filesProcessedDuringLumi_.find(ls);
833  if (iti != filesProcessedDuringLumi_.end())
834  fmt_->m_data.fastFilesProcessedJ_ = iti->second;
835  else
836  fmt_->m_data.fastFilesProcessedJ_ = 0;
837 
838  auto itrd = lockStatsDuringLumi_.find(ls);
839  if (itrd != lockStatsDuringLumi_.end()) {
840  fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
841  fmt_->m_data.fastLockCountJ_ = itrd->second.second;
842  } else {
843  fmt_->m_data.fastLockWaitJ_ = 0.;
844  fmt_->m_data.fastLockCountJ_ = 0.;
845  }
846  }
847 
848  for (unsigned int i = 0; i < nStreams_; i++) {
849  fmt_->m_data.ministateEncoded_[i] = fmt_->m_data.encPath_[i].encodeString(fmt_->m_data.ministate_[i]);
850  if (microstateAcqCopy[i])
851  fmt_->m_data.microstateEncoded_[i] =
852  fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
853  else
854  fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
855  }
856 
857  bool inputStatePerThread = false;
858 
860  switch (inputSupervisorState_) {
862  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
863  break;
865  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
866  break;
868  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
869  break;
871  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
872  break;
874  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
875  break;
877  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
878  break;
880  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
881  break;
883  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
884  break;
886  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
887  break;
889  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
890  break;
892  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
893  break;
896  break;
898  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
899  break;
902  break;
904  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
905  break;
906  default:
907  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
908  }
909  } else if (inputState_ == FastMonState::inWaitChunk) {
910  switch (inputSupervisorState_) {
912  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
913  break;
915  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
916  break;
918  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
919  break;
921  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
922  break;
924  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
925  break;
927  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
928  break;
930  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
931  break;
933  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
934  break;
936  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
937  break;
939  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
940  break;
942  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
943  break;
946  break;
948  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
949  break;
952  break;
954  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
955  break;
956  default:
957  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
958  }
959  } else if (inputState_ == FastMonState::inNoRequest) {
960  inputStatePerThread = true;
961  for (unsigned int i = 0; i < nStreams_; i++) {
962  if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mIdle])
963  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
964  else if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
965  microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
966  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
967  else
968  fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
969  }
970  } else if (inputState_ == FastMonState::inNewLumi) {
971  inputStatePerThread = true;
972  for (unsigned int i = 0; i < nStreams_; i++) {
973  if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
974  microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
975  fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
976  }
978  //apply directly throttled state from supervisor
979  fmt_->m_data.inputState_[0] = inputSupervisorState_;
980  } else
981  fmt_->m_data.inputState_[0] = inputState_;
982 
983  //this is same for all streams
984  if (!inputStatePerThread)
985  for (unsigned int i = 1; i < nStreams_; i++)
986  fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
987 
988  if (isGlobalEOL) { //only update global variables
989  fmt_->jsonMonitor_->snapGlobal(ls);
990  } else
991  fmt_->jsonMonitor_->snap(ls);
992  }
993 
994  //compatibility
996 
998 
999 } //end namespace evf
evf::FastMonitoringService::workingDirectory_
std::filesystem::path workingDirectory_
Definition: FastMonitoringService.h:273
evf::FastMonitoringService::preGlobalEndLumi
void preGlobalEndLumi(edm::GlobalContext const &)
Definition: FastMonitoringService.cc:482
ConfigurationDescriptions.h
edm::GlobalContext::luminosityBlockID
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:60
edm::StreamID
Definition: StreamID.h:30
evf::FastMonState::inSupWaitFreeThreadCopying
Definition: FastMonitoringService.h:107
eostools.ls
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonState::inWaitChunk_lockPolling
Definition: FastMonitoringService.h:140
edm::ActivityRegistry::watchPostStreamEndLumi
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:466
edm::ActivityRegistry::watchPreSourceEarlyTermination
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
Definition: ActivityRegistry.h:517
evf::FastMonitoringService::exception_detected_
bool exception_detected_
Definition: FastMonitoringService.h:287
ModuleCallingContext.h
electrons_cff.bool
bool
Definition: electrons_cff.py:366
edm::TerminationOrigin::ExceptionFromThisContext
mps_fire.i
i
Definition: mps_fire.py:428
evf::FastMonitoringService::lockStatsDuringLumi_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
Definition: FastMonitoringService.h:261
evf::FastMonitoringService::preStreamEndLumi
void preStreamEndLumi(edm::StreamContext const &)
Definition: FastMonitoringService.cc:594
MessageLogger.h
evf::FastMonState::inSupNewFileWaitChunkCopying
Definition: FastMonitoringService.h:115
funct::false
false
Definition: Factorize.h:29
evf::FastMonitoringService::fastMonIntervals_
unsigned int fastMonIntervals_
Definition: FastMonitoringService.h:239
evf::FastMonitoringService::runDirectory_
std::filesystem::path runDirectory_
Definition: FastMonitoringService.h:273
evf::FastMonState::inWaitChunk
Definition: FastMonitoringService.h:92
evf::FastMonitoringService::doSnapshot
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
Definition: FastMonitoringService.cc:817
evf::FastMonState::inSupThrottled
Definition: FastMonitoringService.h:149
evf::FastMonState::inNoRequestWithIdleThreads
Definition: FastMonitoringService.h:99
evf::FastMonitoringService::preSourceEvent
void preSourceEvent(edm::StreamID)
Definition: FastMonitoringService.cc:627
f
double f[11][100]
Definition: MuScleFitUtils.cc:78
evf::nSpecialModules
constexpr int nSpecialModules
Definition: FastMonitoringThread.h:17
evf::FastMonitoringService::getEventsProcessedForLumi
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
Definition: FastMonitoringService.cc:726
convertSQLitetoXML_cfg.output
output
Definition: convertSQLitetoXML_cfg.py:72
evf::FastMonitoringService::processedEventsPerLumi_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
Definition: FastMonitoringService.h:264
evf::FastMonState::inSupNewFileWaitThread
Definition: FastMonitoringService.h:114
Json::arrayValue
array value (ordered list)
Definition: value.h:30
edm::ProcessContext
Definition: ProcessContext.h:27
evf::FastMonState::inNoRequest
Definition: FastMonitoringService.h:98
evf::FastMonState::inSupNewFileWaitChunk
Definition: FastMonitoringService.h:116
edm::TerminationOrigin::ExternalSignal
evf::FastMonitoringService::inputLegendFileJson_
std::string inputLegendFileJson_
Definition: FastMonitoringService.h:283
edm::ActivityRegistry::watchPostGlobalEndLumi
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:426
evf::FastMonitoringService::reservedMicroStateNames
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
Definition: FastMonitoringService.h:158
edm::ActivityRegistry::watchJobFailure
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
Definition: ActivityRegistry.h:178
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
if
if(0==first)
Definition: CAHitNtupletGeneratorKernelsImpl.h:58
to
evf::FastMonitoringService::moduleLegendFileJson_
std::string moduleLegendFileJson_
Definition: FastMonitoringService.h:280
evf::FastMonState::inWaitChunk_noFile
Definition: FastMonitoringService.h:143
cscNeutronWriter_cfi.writer
writer
Definition: cscNeutronWriter_cfi.py:6
evf::FastMonitoringService::fastPath_
std::string fastPath_
Definition: FastMonitoringService.h:242
evf::FastMonitoringService::preSourceEarlyTermination
void preSourceEarlyTermination(edm::TerminationOrigin)
Definition: FastMonitoringService.cc:405
evf::FastMonitoringThread
Definition: FastMonitoringThread.h:119
evf::FastMonState::sRunning
Definition: FastMonitoringService.h:71
evf::nReservedPaths
constexpr int nReservedPaths
Definition: FastMonitoringThread.h:18
evf::FastMonitoringService::exceptionInLS_
std::vector< unsigned int > exceptionInLS_
Definition: FastMonitoringService.h:288
throughputFactor
constexpr double throughputFactor()
Definition: FastMonitoringService.cc:29
edm::StreamID::value
unsigned int value() const
Definition: StreamID.h:43
evf::FastMonitoringService::postBeginJob
void postBeginJob()
Definition: FastMonitoringService.cc:444
edm::ModuleCallingContext::moduleDescription
ModuleDescription const * moduleDescription() const
Definition: ModuleCallingContext.h:50
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
evf::FastMonState::inWaitInput_lockPollingCopying
Definition: FastMonitoringService.h:125
evf::FastMonState::inWaitChunk_newFileWaitThreadCopying
Definition: FastMonitoringService.h:145
edm::EventID::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:39
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
evf::FastMonState::inSupWaitFreeThread
Definition: FastMonitoringService.h:106
evf::FastMonState::sError
Definition: FastMonitoringService.h:76
FileIO.h
FedRawDataInputSource::getEventReport
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: FedRawDataInputSource.cc:1529
edm::ModuleDescription
Definition: ModuleDescription.h:21
evf::FastMonState::sJobReady
Definition: FastMonitoringService.h:69
evf::FastMonState::inWaitChunk_runEnd
Definition: FastMonitoringService.h:142
Utilities.operator
operator
Definition: Utilities.py:24
evf::FastMonitoringService::nThreads_
unsigned int nThreads_
Definition: FastMonitoringService.h:237
evf::FastMonState::inWaitInput_noFile
Definition: FastMonitoringService.h:127
evf::FastMonitoringService::nStreams_
unsigned int nStreams_
Definition: FastMonitoringService.h:236
evf::FastMonState::mFwkEoL
Definition: FastMonitoringService.h:58
edm::ActivityRegistry::watchPostSourceEvent
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:190
evf::FastMonState::inWaitInput_lockPolling
Definition: FastMonitoringService.h:124
ModuleDescription.h
evf::FastMonitoringService::accumulateFileSize
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
Definition: FastMonitoringService.cc:670
visDQMUpload.context
context
Definition: visDQMUpload.py:37
evf::FastMonState::inWaitInput_waitFreeChunk
Definition: FastMonitoringService.h:119
evf::FastMonState::inWaitChunk_fileLimit
Definition: FastMonitoringService.h:134
BXlumiParameters_cfi.lumi
lumi
Definition: BXlumiParameters_cfi.py:6
evf::FastMonitoringService::inputSource_
FedRawDataInputSource * inputSource_
Definition: FastMonitoringService.h:232
evf::FastMonitoringService::postStreamBeginLumi
void postStreamBeginLumi(edm::StreamContext const &)
Definition: FastMonitoringService.cc:590
evf::FastMonState::inNoRequestWithEoLThreads
Definition: FastMonitoringService.h:101
evf::FastMonitoringService::fastPathList_
std::vector< std::string > fastPathList_
Definition: FastMonitoringService.h:289
edm::ActivityRegistry::watchPostEndJob
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
Definition: ActivityRegistry.h:170
edm::ActivityRegistry::watchPreStreamBeginLumi
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:447
evf::FastMonState::inWaitChunk_waitFreeChunkCopying
Definition: FastMonitoringService.h:136
evf::FastMonitoringService::fastName_
std::string fastName_
Definition: FastMonitoringService.h:242
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
edm::StreamContext
Definition: StreamContext.h:31
evf::FastMonitoringService::lumiStartTime_
std::map< unsigned int, timeval > lumiStartTime_
Definition: FastMonitoringService.h:247
evf::FastMonitoringService::preGlobalBeginLumi
void preGlobalBeginLumi(edm::GlobalContext const &)
Definition: FastMonitoringService.cc:472
evf::FastMonState::inWaitInput
Definition: FastMonitoringService.h:86
evf::FastMonState::inNewLumi
Definition: FastMonitoringService.h:87
edm::ActivityRegistry::watchPreBeginJob
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
Definition: ActivityRegistry.h:151
Service.h
visualization-live-secondInstance_cfg.m
m
Definition: visualization-live-secondInstance_cfg.py:78
evf::FastMonState::inWaitChunk_lockPollingCopying
Definition: FastMonitoringService.h:141
evf::FastMonState::mCOUNT
Definition: FastMonitoringService.h:64
evf::FastMonState::inWaitInput_busy
Definition: FastMonitoringService.h:123
edm::ActivityRegistry::watchPreEvent
void watchPreEvent(PreEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:474
evf::FastMonitoringService::fileLookStart_
timeval fileLookStart_
Definition: FastMonitoringService.h:248
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
evf::FastMonitoringService::preStreamBeginLumi
void preStreamBeginLumi(edm::StreamContext const &)
Definition: FastMonitoringService.cc:577
evf::FastMonitoringService::postModuleEvent
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
Definition: FastMonitoringService.cc:651
evf::nReservedModules
constexpr int nReservedModules
Definition: FastMonitoringThread.h:16
evf::FastMonState::inSupFileLimit
Definition: FastMonitoringService.h:103
edm::ActivityRegistry::watchPostBeginJob
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
Definition: ActivityRegistry.h:158
evf::FastMonState::inWaitInput_fileLimit
Definition: FastMonitoringService.h:118
ParameterSetDescription.h
Json::StyledWriter
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:63
evf::FastMonState::inSupBusy
Definition: FastMonitoringService.h:108
evf::FastMonState::inWaitInput_runEnd
Definition: FastMonitoringService.h:126
evf::FastMonState::inWaitInput_waitFreeChunkCopying
Definition: FastMonitoringService.h:120
evf::FastMonState::Microstate
Microstate
Definition: FastMonitoringService.h:53
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
evf::FastMonitoringService::postModuleEventAcquire
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
Definition: FastMonitoringService.cc:640
evf::FastMonState::inWaitChunk_newFileWaitChunk
Definition: FastMonitoringService.h:148
evf::FastMonitoringService::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: FastMonitoringService.cc:188
evf::FastMonitoringService::postGlobalEndLumi
void postGlobalEndLumi(edm::GlobalContext const &)
Definition: FastMonitoringService.cc:565
evf::MicroStateService::MicroStateService
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
Definition: FastMonitoringService.cc:995
evf::FastMonitoringService::getAbortFlagForLumi
bool getAbortFlagForLumi(unsigned int lumi)
Definition: FastMonitoringService.cc:744
UnixSignalHandlers.h
evf::FastMonState::inSupNewFile
Definition: FastMonitoringService.h:112
evf::FastMonitoringService::~FastMonitoringService
~FastMonitoringService() override
Definition: FastMonitoringService.cc:186
edm::GlobalContext
Definition: GlobalContext.h:29
edm::service::SystemBounds
Definition: SystemBounds.h:29
evf::FastMonState::mEoL
Definition: FastMonitoringService.h:62
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:233
edm::ActivityRegistry::watchPostEvent
void watchPostEvent(PostEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:480
edm::ParameterSet
Definition: ParameterSet.h:47
edm::service::SystemBounds::maxNumberOfThreads
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:38
Json::Value::append
Value & append(const Value &value)
Append value to array at the end.
ValidateTausOnZEEFastSim_cff.proc
proc
Definition: ValidateTausOnZEEFastSim_cff.py:6
evf::FastMonitoringService::streamCounterUpdating_
std::vector< std::atomic< bool > * > streamCounterUpdating_
Definition: FastMonitoringService.h:268
edm::PathsAndConsumesOfModulesBase::endPaths
std::vector< std::string > const & endPaths() const
Definition: PathsAndConsumesOfModulesBase.h:40
GlobalContext.h
AlCaHLTBitMon_ParallelJobs.p
def p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
evf::FastMonState::inWaitChunk_busy
Definition: FastMonitoringService.h:139
evf::FastMonState::inSupLockPolling
Definition: FastMonitoringService.h:109
Event.h
evf::FastMonitoringService::preModuleEvent
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
Definition: FastMonitoringService.cc:646
CommonMethods.lock
def lock()
Definition: CommonMethods.py:81
edm::StreamContext::streamID
StreamID const & streamID() const
Definition: StreamContext.h:54
edm::PathContext
Definition: PathContext.h:24
edm::ActivityRegistry::watchPreStreamEndLumi
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:461
evf::FastMonState::inWaitChunk_waitFreeChunk
Definition: FastMonitoringService.h:135
evf::FastMonState::inWaitInput_waitFreeThread
Definition: FastMonitoringService.h:121
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
evf::FastMonitoringService::lumiFromSource_
unsigned int lumiFromSource_
Definition: FastMonitoringService.h:252
edm::ActivityRegistry::watchPreStreamEarlyTermination
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
Definition: ActivityRegistry.h:499
evf::FastMonState::inWaitChunk_waitFreeThread
Definition: FastMonitoringService.h:137
evf::FastMonitoringService::pathLegendFile_
std::string pathLegendFile_
Definition: FastMonitoringService.h:281
evf::FastMonState::MCOUNT
Definition: FastMonitoringService.h:80
evf::FastMonitoringService::avgLeadTime_
std::map< unsigned int, double > avgLeadTime_
Definition: FastMonitoringService.h:256
evf::FastMonitoringService::postSourceEvent
void postSourceEvent(edm::StreamID)
Definition: FastMonitoringService.cc:631
edm::Service
Definition: Service.h:30
evf::FastMonState::inWaitInput_newFile
Definition: FastMonitoringService.h:128
createfilelist.int
int
Definition: createfilelist.py:10
EvFDaqDirector.h
TerminationOrigin
FastMonitoringService.h
evf::FastMonitoringService::monInit_
std::atomic< bool > monInit_
Definition: FastMonitoringService.h:286
edm::LuminosityBlockID::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Definition: LuminosityBlockID.h:42
evf::FastMonitoringService::setMicroState
void setMicroState(FastMonState::Microstate)
Definition: FastMonitoringService.cc:659
evf::FastMonState::inSupWaitFreeChunk
Definition: FastMonitoringService.h:104
edm::service::SystemBounds::maxNumberOfStreams
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
evf::FastMonitoringService::preModuleBeginJob
void preModuleBeginJob(edm::ModuleDescription const &)
Definition: FastMonitoringService.cc:429
evf::FastMonState::inWaitInput_newFileWaitThread
Definition: FastMonitoringService.h:130
evf::FastMonitoringService::lastGlobalLumi_
unsigned int lastGlobalLumi_
Definition: FastMonitoringService.h:250
evf::FastMonitoringService::jobFailure
void jobFailure()
Definition: FastMonitoringService.cc:426
evf::FastMonitoringService::prePathEvent
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
Definition: FastMonitoringService.cc:609
evf::FastMonitoringService::nOutputModules_
unsigned int nOutputModules_
Definition: FastMonitoringService.h:284
evf::FastMonState::mFwkOvhSrc
Definition: FastMonitoringService.h:56
evf::FastMonitoringService::inputState_
std::atomic< FastMonState::InputState > inputState_
Definition: FastMonitoringService.h:233
PathContext.h
res
Definition: Electron.h:6
evf::FastMonitoringService::slowName_
std::string slowName_
Definition: FastMonitoringService.h:242
evf::FastMonitoringService::postGlobalBeginRun
void postGlobalBeginRun(edm::GlobalContext const &)
Definition: FastMonitoringService.cc:467
evf::FastMonitoringService::preEvent
void preEvent(edm::StreamContext const &)
Definition: FastMonitoringService.cc:613
edm::PathContext::pathName
std::string const & pathName() const
Definition: PathContext.h:30
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
FedRawDataInputSource.h
evf::FastMonitoringService::reportLockWait
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
Definition: FastMonitoringService.cc:720
jsoncollector::IntJ::value
long & value()
Definition: JsonMonitorable.h:88
jsoncollector
Definition: DataPoint.h:26
evf::FastMonState::inWaitChunk_waitFreeThreadCopying
Definition: FastMonitoringService.h:138
evf::FastMonitoringService::postEvent
void postEvent(edm::StreamContext const &)
Definition: FastMonitoringService.cc:615
evf::FastMonState::inSupNoFile
Definition: FastMonitoringService.h:111
evf::FastMonState::sJobEnded
Definition: FastMonitoringService.h:75
evf::FastMonitoringService::isInitTransition_
std::atomic< bool > isInitTransition_
Definition: FastMonitoringService.h:251
edm::ActivityRegistry::watchPreGlobalEndLumi
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:421
evf::FastMonitoringService::inputSupervisorState_
std::atomic< FastMonState::InputState > inputSupervisorState_
Definition: FastMonitoringService.h:234
edm::ActivityRegistry::watchPreallocate
void watchPreallocate(Preallocate::slot_type const &iSlot)
Definition: ActivityRegistry.h:144
edm::ActivityRegistry::watchPreGlobalEarlyTermination
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
Definition: ActivityRegistry.h:508
evf::FastMonitoringService::fmt_
std::shared_ptr< FastMonitoringThread > fmt_
Definition: FastMonitoringService.h:229
edm::ActivityRegistry::watchPostStreamBeginLumi
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:454
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
evf::FastMonitoringService::nopath_
static const std::string nopath_
Definition: FastMonitoringService.h:162
evf::FastMonitoringService::fileLookStop_
timeval fileLookStop_
Definition: FastMonitoringService.h:248
edm::ActivityRegistry::watchPreGlobalBeginLumi
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:407
evf::FastMonState::inWaitInput_newFileWaitChunk
Definition: FastMonitoringService.h:132
evf::FastMonitoringService::makeInputLegendaJson
std::string makeInputLegendaJson()
Definition: FastMonitoringService.cc:233
edm::TerminationOrigin::ExceptionFromAnotherContext
edm::ActivityRegistry::watchPreModuleBeginJob
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
Definition: ActivityRegistry.h:749
evf::FastMonitoringService::preGlobalEarlyTermination
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
Definition: FastMonitoringService.cc:390
evf::FastMonitoringService::inputStateNames
static const std::string inputStateNames[FastMonState::inCOUNT]
Definition: FastMonitoringService.h:160
edm::ActivityRegistry::watchPostModuleEvent
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:799
edm::PathsAndConsumesOfModulesBase::paths
std::vector< std::string > const & paths() const
Definition: PathsAndConsumesOfModulesBase.h:39
evf::FastMonState::mBoL
Definition: FastMonitoringService.h:61
edm::PathsAndConsumesOfModulesBase
Definition: PathsAndConsumesOfModulesBase.h:35
evf::FastMonitoringService::accuSize_
std::map< unsigned int, unsigned long > accuSize_
Definition: FastMonitoringService.h:259
relativeConstraints.empty
bool empty
Definition: relativeConstraints.py:46
edm::ActivityRegistry::watchPostModuleEventAcquire
void watchPostModuleEventAcquire(PostModuleEventAcquire::slot_type const &iSlot)
Definition: ActivityRegistry.h:813
evf::FastMonitoringService::makePathLegendaJson
std::string makePathLegendaJson()
Definition: FastMonitoringService.cc:200
Exception
Definition: hltDiff.cc:245
edm::ActivityRegistry::watchPreSourceEvent
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:184
evf::FastMonitoringService::threadIDAvailable_
bool threadIDAvailable_
Definition: FastMonitoringService.h:275
evf
Definition: fillJson.h:27
evf::FastMonitoringService::filePerFwkStream_
bool filePerFwkStream_
Definition: FastMonitoringService.h:243
HiBiasedCentrality_cfi.function
function
Definition: HiBiasedCentrality_cfi.py:4
evf::FastMonitoringService::startedLookingForFile
void startedLookingForFile()
Definition: FastMonitoringService.cc:684
FastMonitoringThread.h
evf::FastMonState::inSupNewFileWaitThreadCopying
Definition: FastMonitoringService.h:113
evf::FastMonitoringService::makeModuleLegendaJson
std::string makeModuleLegendaJson()
Definition: FastMonitoringService.cc:212
evf::FastMonState::mIdle
Definition: FastMonitoringService.h:55
evf::FastMonState::sInit
Definition: FastMonitoringService.h:68
evf::FastMonitoringService::preallocate
void preallocate(edm::service::SystemBounds const &)
Definition: FastMonitoringService.cc:243
evf::FastMonState::inCOUNT
Definition: FastMonitoringService.h:151
evf::FastMonitoringService::setExceptionDetected
void setExceptionDetected(unsigned int ls)
Definition: FastMonitoringService.cc:419
jsoncollector::IntJ
Definition: JsonMonitorable.h:66
funct::void
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
evf::FastMonState::inRunEnd
Definition: FastMonitoringService.h:90
evf::FastMonitoringService::pathLegendFileJson_
std::string pathLegendFileJson_
Definition: FastMonitoringService.h:282
evf::FastMonitoringService::totalEventsProcessed_
std::atomic< unsigned long > totalEventsProcessed_
Definition: FastMonitoringService.h:277
evf::FastMonState::inWaitChunk_newFile
Definition: FastMonitoringService.h:144
SiStripCommissioningSource_FromEDM_cfg.FastMonitoringService
FastMonitoringService
Definition: SiStripCommissioningSource_FromEDM_cfg.py:49
evf::FastMonState::mInvalid
Definition: FastMonitoringService.h:54
evf::FastMonState::inWaitChunk_newFileWaitThread
Definition: FastMonitoringService.h:146
evf::FastMonState::mFwkOvhMod
Definition: FastMonitoringService.h:57
evf::FastMonState::mInput
Definition: FastMonitoringService.h:59
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
evf::FastMonitoringService::fastMicrostateDefPath_
std::string fastMicrostateDefPath_
Definition: FastMonitoringService.h:241
evf::FastMonState::inSupWaitFreeChunkCopying
Definition: FastMonitoringService.h:105
evf::FastMonState::inWaitInput_newFileWaitChunkCopying
Definition: FastMonitoringService.h:131
evf::FastMonitoringService::leadTimes_
std::vector< double > leadTimes_
Definition: FastMonitoringService.h:260
evf::FastMonitoringService::microstateDefPath_
std::string microstateDefPath_
Definition: FastMonitoringService.h:241
edm::ActivityRegistry::watchPrePathEvent
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:486
StreamContext.h
evf::FastMonitoringService::moduleLegendFile_
std::string moduleLegendFile_
Definition: FastMonitoringService.h:279
evf::FastMonitoringService::postStreamEndLumi
void postStreamEndLumi(edm::StreamContext const &)
Definition: FastMonitoringService.cc:605
edm_modernize_messagelogger.stat
stat
Definition: edm_modernize_messagelogger.py:27
evf::FastMonitoringService::filesProcessedDuringLumi_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
Definition: FastMonitoringService.h:257
evf::MicroStateService::~MicroStateService
virtual ~MicroStateService()
Definition: FastMonitoringService.cc:997
evf::FastMonitoringService::preStreamEarlyTermination
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
Definition: FastMonitoringService.cc:375
evf::FastMonitoringService::snapCounter_
unsigned int snapCounter_
Definition: FastMonitoringService.h:240
evf::FastMonitoringService::stoppedLookingForFile
void stoppedLookingForFile(unsigned int lumi)
Definition: FastMonitoringService.cc:692
evf::FastMonitoringService::postEndJob
void postEndJob()
Definition: FastMonitoringService.cc:462
evf::FastMonitoringService::preBeginJob
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
Definition: FastMonitoringService.cc:253
edm::ActivityRegistry::watchPreModuleEventAcquire
void watchPreModuleEventAcquire(PreModuleEventAcquire::slot_type const &iSlot)
Definition: ActivityRegistry.h:805
PathsAndConsumesOfModulesBase.h
lumi
Definition: LumiSectionData.h:20
evf::FastMonState::inSupLockPollingCopying
Definition: FastMonitoringService.h:110
evf::FastMonState::inWaitInput_waitFreeThreadCopying
Definition: FastMonitoringService.h:122
SystemBounds.h
evf::FastMonitoringService::preModuleEventAcquire
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
Definition: FastMonitoringService.cc:635
evf::FastMonitoringService::snapshotRunner
void snapshotRunner()
Definition: FastMonitoringService.cc:760
evf::MicroStateService
Definition: MicroStateService.h:13
evf::FastMonitoringService::sleepTime_
int sleepTime_
Definition: FastMonitoringService.h:238
edm::StreamContext::eventID
EventID const & eventID() const
Definition: StreamContext.h:59
edm::ActivityRegistry::watchPreModuleEvent
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
Definition: ActivityRegistry.h:793
Json::Value
Represents a JSON value.
Definition: value.h:99
evf::FastMonState::inWaitInput_newFileWaitThreadCopying
Definition: FastMonitoringService.h:129
evf::FastMonState::inWaitChunk_newFileWaitChunkCopying
Definition: FastMonitoringService.h:147
edm::ModuleCallingContext
Definition: ModuleCallingContext.h:29