CMS 3D CMS Logo

FastMonitoringService.cc
Go to the documentation of this file.
3 
9 //#include "FWCore/ServiceRegistry/interface/PathContext.h"
16 
20 
23 
24 #include <iostream>
25 #include <iomanip>
26 #include <sys/time.h>
27 
28 using namespace jsoncollector;
29 
// Conversion constant 10^6 / 2^20: multiplying a (size / microseconds) ratio by it
// yields (size / 2^20) per second.
// NOTE(review): presumably used to turn bytes per microsecond into MiB/s - confirm at the call site.
constexpr double throughputFactor() {
  constexpr double microsecondsPerSecond = 1000000.0;
  constexpr double sizeDivisor = 1024.0 * 1024.0;
  return microsecondsPerSecond / sizeDivisor;
}
31 
32 namespace evf {
33 
// Placeholder module descriptions standing in for the reserved ("special") microstates.
// The table has FastMonState::mCOUNT entries and the getm*() accessors in this file index it
// with the FastMonState microstate enumerators (mInvalid, mIdle, ...).
// NOTE(review): the entries are presumably listed in enumerator order - confirm against FastMonState.
34  const edm::ModuleDescription FastMonitoringService::specialMicroStateNames[FastMonState::mCOUNT] = {
35  edm::ModuleDescription("Dummy", "Invalid"),
36  edm::ModuleDescription("Dummy", "Idle"),
37  edm::ModuleDescription("Dummy", "FwkOvhSrc"),
38  edm::ModuleDescription("Dummy", "FwkOvhMod"), //set post produce, analyze or filter
39  edm::ModuleDescription("Dummy", "FwkEoL"),
40  edm::ModuleDescription("Dummy", "Input"),
41  edm::ModuleDescription("Dummy", "DQM"),
42  edm::ModuleDescription("Dummy", "BoL"),
43  edm::ModuleDescription("Dummy", "EoL"),
44  edm::ModuleDescription("Dummy", "GlobalEoL"),
45  edm::ModuleDescription("Dummy", "Fwk"),
46  edm::ModuleDescription("Dummy", "IdleSource"),
47  edm::ModuleDescription("Dummy", "Event"),
48  edm::ModuleDescription("Dummy", "Ignore")};
49 
51  return &FastMonitoringService::specialMicroStateNames[FastMonState::mInvalid];
52  }
54  return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdle];
55  }
57  return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhSrc];
58  }
60  return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhMod];
61  }
63  return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkEoL];
64  }
66  return &FastMonitoringService::specialMicroStateNames[FastMonState::mInput];
67  }
69  return &FastMonitoringService::specialMicroStateNames[FastMonState::mDqm];
70  }
72  return &FastMonitoringService::specialMicroStateNames[FastMonState::mBoL];
73  }
75  return &FastMonitoringService::specialMicroStateNames[FastMonState::mEoL];
76  }
78  return &FastMonitoringService::specialMicroStateNames[FastMonState::mGlobEoL];
79  }
81  return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwk];
82  }
84  return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdleSource];
85  }
87  return &FastMonitoringService::specialMicroStateNames[FastMonState::mEvent];
88  }
90  return &FastMonitoringService::specialMicroStateNames[FastMonState::mIgnore];
91  }
92 
// Display names for the macrostates; the array is sized by FastMonState::MCOUNT.
// NOTE(review): presumably ordered like the FastMonState macrostate enumerators
// (sInit, sJobReady, ...) - confirm against the enum declaration.
93  const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
94  "JobReady",
95  "RunGiven",
96  "Running",
97  "Stopping",
98  "Done",
99  "JobEnded",
100  "Error",
101  "ErrorEnded",
102  "End",
103  "Invalid"};
104 
// Display names for the input-source states; the array is sized by FastMonState::inCOUNT.
// The input legend JSON produced in this file appends these strings in index order
// (loop from 0 to FastMonState::inCOUNT), so position i names input state i.
// NOTE(review): presumably ordered like the FastMonState input-state enumerators - confirm.
105  const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
106  "Ignore",
107  "Init",
108  "WaitInput",
109  "NewLumi",
110  "NewLumiBusyEndingLS",
111  "NewLumiIdleEndingLS",
112  "RunEnd",
113  "ProcessingFile",
114  "WaitChunk",
115  "ChunkReceived",
116  "ChecksumEvent",
117  "CachedEvent",
118  "ReadEvent",
119  "ReadCleanup",
120  "NoRequest",
121  "NoRequestWithIdleThreads",
122  "NoRequestWithGlobalEoL",
123  "NoRequestWithEoLThreads",
124  "SupFileLimit",
125  "SupWaitFreeChunk",
126  "SupWaitFreeChunkCopying",
127  "SupWaitFreeThread",
128  "SupWaitFreeThreadCopying",
129  "SupBusy",
130  "SupLockPolling",
131  "SupLockPollingCopying",
132  "SupNoFile",
133  "SupNewFile",
134  "SupNewFileWaitThreadCopying",
135  "SupNewFileWaitThread",
136  "SupNewFileWaitChunkCopying",
137  "SupNewFileWaitChunk",
138  "WaitInput_fileLimit",
139  "WaitInput_waitFreeChunk",
140  "WaitInput_waitFreeChunkCopying",
141  "WaitInput_waitFreeThread",
142  "WaitInput_waitFreeThreadCopying",
143  "WaitInput_busy",
144  "WaitInput_lockPolling",
145  "WaitInput_lockPollingCopying",
146  "WaitInput_runEnd",
147  "WaitInput_noFile",
148  "WaitInput_newFile",
149  "WaitInput_newFileWaitThreadCopying",
150  "WaitInput_newFileWaitThread",
151  "WaitInput_newFileWaitChunkCopying",
152  "WaitInput_newFileWaitChunk",
153  "WaitChunk_fileLimit",
154  "WaitChunk_waitFreeChunk",
155  "WaitChunk_waitFreeChunkCopying",
156  "WaitChunk_waitFreeThread",
157  "WaitChunk_waitFreeThreadCopying",
158  "WaitChunk_busy",
159  "WaitChunk_lockPolling",
160  "WaitChunk_lockPollingCopying",
161  "WaitChunk_runEnd",
162  "WaitChunk_noFile",
163  "WaitChunk_newFile",
164  "WaitChunk_newFileWaitThreadCopying",
165  "WaitChunk_newFileWaitThread",
166  "WaitChunk_newFileWaitChunkCopying",
167  "WaitChunk_newFileWaitChunk",
168  "inSupThrottled",
169  "inThrottled"};
170 
171  class ConcurrencyTracker : public tbb::task_scheduler_observer {
172  std::atomic<int> num_threads;
173  unsigned max_threads;
174  std::vector<ContainableAtomic<unsigned int>> threadactive_;
175 
176  public:
177  ConcurrencyTracker(unsigned num_expected)
178  : num_threads(), max_threads(num_expected), threadactive_(num_expected, 0) {
179  //set array to if it will not be used
180  //for (unsigned i=0;i<num_expected;i++) threadactive_.push_back(0);
181  }
182  void activate() { observe(true); }
183  void on_scheduler_entry(bool) override {
184  ++num_threads;
185  threadactive_[tbb::this_task_arena::current_thread_index()] = 1;
186  }
187 
188  void on_scheduler_exit(bool) override {
189  --num_threads;
190  threadactive_[tbb::this_task_arena::current_thread_index()] = 0;
191  }
192 
193  bool isThreadActive(unsigned index) { return threadactive_[index] == 1; }
194  int get_concurrency() { return num_threads; }
195  };
196 
198  : MicroStateService(iPS, reg),
199  fmt_(new FastMonitoringThread()),
200  tbbMonitoringMode_(iPS.getUntrackedParameter<bool>("tbbMonitoringMode", true)),
201  tbbConcurrencyTracker_(iPS.getUntrackedParameter<bool>("tbbConcurrencyTracker", true) && tbbMonitoringMode_),
202  sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
203  fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
204  fastName_("fastmoni"),
205  totalEventsProcessed_(0),
206  verbose_(iPS.getUntrackedParameter<bool>("verbose")) {
207  reg.watchPreallocate(this, &FastMonitoringService::preallocate); //receiving information on number of threads
209 
214 
218 
223 
224  reg.watchPreEvent(this, &FastMonitoringService::preEvent); //stream
226 
227  //readEvent (not getNextItemType)
228  reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent); //source (with streamID of requestor)
230 
233 
236 
240 
241  //find microstate definition path (required by the module)
242  struct stat statbuf;
243  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
244  std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
245  if (stat(microstatePath.c_str(), &statbuf)) {
246  microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
247  if (stat(microstatePath.c_str(), &statbuf)) {
248  microstatePath = microstateBaseSuffix;
249  if (stat(microstatePath.c_str(), &statbuf))
250  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
251  }
252  }
253  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
254  }
255 
257 
260  desc.setComment("Service for File-based DAQ monitoring and event accounting");
261  desc.addUntracked<bool>("tbbMonitoringMode", true)
262  ->setComment("Monitor individual module processing per TBB thread instead of stream");
263  desc.addUntracked<bool>("tbbConcurrencyTracker", true)
264  ->setComment("Monitor TBB thread activity to flag microstate as real idle or overhead/other");
265  desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
266  desc.addUntracked<unsigned int>("fastMonIntervals", 2)
267  ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
268  desc.addUntracked<bool>("filePerFwkStream", true) //obsolete
269  ->setComment("Switches on monitoring output per framework stream");
270  desc.addUntracked<bool>("verbose", false)->setComment("Set to use LogInfo messages from the monitoring thread");
271  desc.setAllowAnything();
272  descriptions.add("FastMonitoringService", desc);
273  }
274 
276  Json::Value legendaVector(Json::arrayValue);
277  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
278  legendaVector.append(
279  Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
280  //duplicate modules adding a list for acquire states (not all modules actually have it)
281  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
282  legendaVector.append(Json::Value(
283  (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
284  Json::Value valReserved(nReservedModules);
285  Json::Value valSpecial(nSpecialModules);
286  Json::Value valOutputModules(nOutputModules_);
287  Json::Value moduleLegend;
288  moduleLegend["names"] = legendaVector;
289  moduleLegend["reserved"] = valReserved;
290  moduleLegend["special"] = valSpecial;
291  moduleLegend["output"] = valOutputModules;
293  return writer.write(moduleLegend);
294  }
295 
297  Json::Value legendaVector(Json::arrayValue);
298  for (int i = 0; i < FastMonState::inCOUNT; i++)
299  legendaVector.append(Json::Value(inputStateNames[i]));
300  Json::Value moduleLegend;
301  moduleLegend["names"] = legendaVector;
303  return writer.write(moduleLegend);
304  }
305 
307  nStreams_ = bounds.maxNumberOfStreams();
308  nThreads_ = bounds.maxNumberOfThreads();
309  //this should already be >=1
310  if (nStreams_ == 0)
311  nStreams_ = 1;
312  if (nThreads_ == 0)
313  nThreads_ = 1;
315  ct_ = std::make_unique<ConcurrencyTracker>(nThreads_);
316  //start concurrency tracking
317  }
318 
320  // FIND RUN DIRECTORY
321  // The run dir should be set via the configuration of EvFDaqDirector
323  ct_->activate();
324 
325  if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
326  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
327  }
328  std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
329  workingDirectory_ = runDirectory_ = runDirectory;
330  workingDirectory_ /= "mon";
331 
332  if (!std::filesystem::is_directory(workingDirectory_)) {
333  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
334  std::filesystem::create_directories(workingDirectory_);
335  if (!std::filesystem::is_directory(workingDirectory_))
336  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
337  << ". No monitoring data will be written.";
338  }
339 
340  std::ostringstream fastFileName;
341 
342  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
344  fast /= fastFileName.str();
345  fastPath_ = fast.string();
346 
347  std::ostringstream moduleLegFile;
348  std::ostringstream moduleLegFileJson;
349  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
350  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
351  moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
352  moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
353 
354  std::ostringstream inputLegFileJson;
355  inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
356  inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
357 
358  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
359 
360  /*
361  * initialize the fast monitor with:
362  * vector of pointers to monitorable parameters
363  * path to definition
364  *
365  */
366 
367  fmt_->m_data.macrostate_ = FastMonState::sInit;
368 
369  for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
370  fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(specialMicroStateNames + i));
371  fmt_->m_data.encModule_.completeReservedWithDummies();
372 
373  for (unsigned int i = 0; i < nMonThreads_; i++) {
374  microstate_.emplace_back(getmInvalid());
375  microstateAcqFlag_.push_back(0);
376  tmicrostate_.emplace_back(getmInvalid());
377  tmicrostateAcqFlag_.push_back(0);
378 
379  //for synchronization
380  streamCounterUpdating_.push_back(new std::atomic<bool>(false));
381  }
382 
383  //initial size until we detect number of bins
384  fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
385  fmt_->m_data.microstateBins_ = 0;
386  fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
387 
388  lastGlobalLumi_ = 0;
389  isInitTransition_ = true;
390  lumiFromSource_ = 0;
391 
392  //startup monitoring
393  fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
394  fmt_->jsonMonitor_->setNStreams(nMonThreads_);
395  fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nMonThreads_, nStreams_, nThreads_);
396  monInit_.store(false, std::memory_order_release);
397  if (sleepTime_ > 0)
399  }
400 
404  context = " FromThisContext ";
406  context = " FromAnotherContext";
408  context = " FromExternalSignal";
409  edm::LogWarning("FastMonitoringService")
410  << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
411  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
412  std::lock_guard<std::mutex> lock(fmt_->monlock_);
413  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
414  has_data_exception_.store(true);
415  }
416 
420  context = " FromThisContext ";
422  context = " FromAnotherContext";
424  context = " FromExternalSignal";
425  edm::LogWarning("FastMonitoringService")
426  << " GLOBAL "
427  << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
428  std::lock_guard<std::mutex> lock(fmt_->monlock_);
430  has_data_exception_.store(true);
431  }
432 
436  context = " FromThisContext ";
438  context = " FromAnotherContext";
440  context = " FromExternalSignal";
441  edm::LogWarning("FastMonitoringService") << " SOURCE "
442  << "earlyTermination -: " << context;
443  std::lock_guard<std::mutex> lock(fmt_->monlock_);
444  exception_detected_ = true;
445  has_source_exception_.store(true);
446  has_data_exception_.store(true);
447  }
448 
450  std::lock_guard<std::mutex> lock(fmt_->monlock_);
451  if (!ls)
452  exception_detected_ = true;
453  else
454  exceptionInLS_.push_back(ls);
455  }
456 
458  return has_source_exception_.load() || has_data_exception_.load();
459  }
460 
462  if (!has_data_exception_.load())
463  return false;
464  if (has_source_exception_.load())
465  return true;
466  std::lock_guard<std::mutex> lock(fmt_->monlock_);
467  for (auto ex : exceptionInLS_) {
468  if (ls == ex)
469  return true;
470  }
471  return false;
472  }
473 
475 
476  //new output module name is stream
478  std::lock_guard<std::mutex> lock(fmt_->monlock_);
479  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
480 
481  //build a map of modules keyed by their module description address
482  //here we need to treat output modules in a special way so they can be easily singled out
483  if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
484  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
485  fmt_->m_data.encModule_.updateReserved((void*)&desc);
486  nOutputModules_++;
487  } else
488  fmt_->m_data.encModule_.update((void*)&desc);
489  }
490 
492  std::string&& moduleLegStrJson = makeModuleLegendaJson();
493  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
494 
495  std::string inputLegendStrJson = makeInputLegendaJson();
496  FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
497 
498  fmt_->m_data.macrostate_ = FastMonState::sJobReady;
499 
500  //update number of entries in module histogram
501  std::lock_guard<std::mutex> lock(fmt_->monlock_);
502  //double the size to add post-acquire states
503  fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
504  }
505 
507  fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
508  fmt_->stop();
509  }
510 
512  fmt_->m_data.macrostate_ = FastMonState::sRunning;
513  isInitTransition_ = false;
514  }
515 
517  timeval lumiStartTime;
518  gettimeofday(&lumiStartTime, nullptr);
519  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
520  lastGlobalLumi_ = newLumi;
521 
522  std::lock_guard<std::mutex> lock(fmt_->monlock_);
523  lumiStartTime_[newLumi] = lumiStartTime;
524  //reset all states to idle
525  if (tbbMonitoringMode_)
526  for (unsigned i = 0; i < nThreads_; i++)
527  if (tmicrostate_[i] == getmInvalid())
528  tmicrostate_[i] = getmIdle();
529  }
530 
532  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
533  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
534  timeval lumiStopTime;
535  gettimeofday(&lumiStopTime, nullptr);
536 
537  std::lock_guard<std::mutex> lock(fmt_->monlock_);
538 
539  // Compute throughput
540  timeval stt = lumiStartTime_[lumi];
541  lumiStartTime_.erase(lumi);
542  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
543  unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
544  accuSize_.erase(lumi);
545  double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
546  //store to registered variable
547  fmt_->m_data.fastThroughputJ_.value() = throughput;
548 
549  //update
550  doSnapshot(lumi, true);
551 
552  //retrieve one result we need (todo: sanity check if it's found)
553  IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
554  if (!lumiProcessedJptr)
555  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
556  processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
557 
558  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
559  bool exception_detected = exception_detected_;
560  for (auto ex : exceptionInLS_)
561  if (lumi == ex)
562  exception_detected = true;
563 
564  if (edm::shutdown_flag || exception_detected) {
565  edm::LogInfo("FastMonitoringService")
566  << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
567  << " events were processed in LUMI " << lumi;
568  //this will prevent output modules from producing json file for possibly incomplete lumi
569  processedEventsPerLumi_[lumi].first = 0;
570  processedEventsPerLumi_[lumi].second = true;
571  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
572  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
573  return;
574  }
575 
576  if (inputSource_ || daqInputSource_) {
577  auto sourceReport =
579  if (sourceReport.first) {
580  if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
581  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
582  << ", events(processed):" << processedEventsPerLumi_[lumi].first
583  << " events(source):" << sourceReport.second;
584  }
585  }
586  }
587 
588  edm::LogInfo("FastMonitoringService")
589  << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
590  << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
591  delete lumiProcessedJptr;
592 
593  //full global and stream merge (will be used by output modules), output from this service is deprecated
594  fmt_->jsonMonitor_->outputFullJSON("dummy", lumi, false);
595  fmt_->jsonMonitor_->discardCollected(lumi); //we don't do further updates for this lumi
596  }
597 
599  std::lock_guard<std::mutex> lock(fmt_->monlock_);
600  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
601  //LS monitoring snapshot with input source data has been taken in previous callback
602  avgLeadTime_.erase(lumi);
604  lockStatsDuringLumi_.erase(lumi);
605 
606  //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
608  }
609 
611  std::lock_guard<std::mutex> lock(fmt_->monlock_);
612  fmt_->m_data.streamLumi_[sc.streamID().value()] = sc.eventID().luminosityBlock();
613 
614  //reset collected values for this stream
615  *(fmt_->m_data.processed_[sc.streamID().value()]) = 0;
616 
617  microstate_[sc.streamID().value()] = getmBoL();
618  }
619 
621  microstate_[sc.streamID().value()] = getmIdle();
622  }
623 
625  std::lock_guard<std::mutex> lock(fmt_->monlock_);
626  //update processed count to be complete at this time
627  //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
628  fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sc.streamID().value());
629  //reset this in case stream does not get notified of next lumi (we keep processed events only)
630  microstate_[sc.streamID().value()] = getmEoL();
631  }
632 
634  microstate_[sc.streamID().value()] = getmFwkEoL();
635  }
636 
638  microstate_[sc.streamID().value()] = getmEvent();
639  }
640 
642  (*(fmt_->m_data.processed_[sc.streamID().value()]))++;
643  //fast path counter (events accumulated in a run)
644  unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
645  fmt_->m_data.fastPathProcessedJ_ = res + 1;
646 
647  microstate_[sc.streamID().value()] = getmIdle();
648  }
649 
651  microstate_[getSID(sid)] = getmInput();
652  if (!tbbMonitoringMode_)
653  return;
654  auto tid = getTID();
655  if (tid >= nThreads_)
656  return;
657  tmicrostate_[tid] = getmInput();
658  }
659 
661  microstate_[getSID(sid)] = getmFwkOvhSrc();
662  if (!tbbMonitoringMode_)
663  return;
664  auto tid = getTID();
665  if (tid >= nThreads_)
666  return;
667  tmicrostate_[tid] = getmIdle();
668  }
669 
671  edm::ModuleCallingContext const& mcc) {
672  microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
673  microstateAcqFlag_[getSID(sc)] = 1;
674  if (!tbbMonitoringMode_)
675  return;
676  auto tid = getTID();
677  if (tid >= nThreads_)
678  return;
679  tmicrostate_[tid] = (void*)(mcc.moduleDescription());
680  tmicrostateAcqFlag_[tid] = 1;
681  }
682 
684  edm::ModuleCallingContext const& mcc) {
686  microstateAcqFlag_[getSID(sc)] = 0;
687  if (!tbbMonitoringMode_)
688  return;
689  auto tid = getTID();
690  if (tid >= nThreads_)
691  return;
692  tmicrostate_[tid] = getmIdle();
693  tmicrostateAcqFlag_[tid] = 0;
694  }
695 
697  microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
698  if (!tbbMonitoringMode_)
699  return;
700  auto tid = getTID();
701  if (tid >= nThreads_)
702  return;
703  tmicrostate_[tid] = (void*)(mcc.moduleDescription());
704  }
705 
708  if (!tbbMonitoringMode_)
709  return;
710  auto tid = getTID();
711  if (tid >= nThreads_)
712  return;
713  tmicrostate_[tid] = getmIdle();
714  }
715 
716  //from source
717  void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
718  std::lock_guard<std::mutex> lock(fmt_->monlock_);
719 
720  if (accuSize_.find(lumi) == accuSize_.end())
721  accuSize_[lumi] = fileSize;
722  else
723  accuSize_[lumi] += fileSize;
724 
727  else
729  }
730 
732  gettimeofday(&fileLookStart_, nullptr);
733  /*
734  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
735  << fileLookStart_.tv_usec / 1000.0 << std::endl;
736  */
737  }
738 
740  gettimeofday(&fileLookStop_, nullptr);
741  /*
742  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
743  << fileLookStop_.tv_usec / 1000.0 << std::endl;
744  */
745  std::lock_guard<std::mutex> lock(fmt_->monlock_);
746 
747  if (lumi > lumiFromSource_) {
749  leadTimes_.clear();
750  }
751  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
752  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
753  // add this to lead times for this lumi
754  leadTimes_.push_back((double)elapsedTime);
755 
756  // recompute average lead time for this lumi
757  if (leadTimes_.size() == 1)
759  else {
760  double totTime = 0;
761  for (unsigned int i = 0; i < leadTimes_.size(); i++)
762  totTime += leadTimes_[i];
763  avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
764  }
765  }
766 
767  void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
768  std::lock_guard<std::mutex> lock(fmt_->monlock_);
769  lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
770  }
771 
773  tmicrostate_[tbb::this_task_arena::current_thread_index()] = &specialMicroStateNames[m];
774  }
775 
776  //for the output module
777  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
778  std::lock_guard<std::mutex> lock(fmt_->monlock_);
779 
780  auto it = processedEventsPerLumi_.find(lumi);
781  if (it != processedEventsPerLumi_.end()) {
782  unsigned int proc = it->second.first;
783  if (abortFlag)
784  *abortFlag = it->second.second;
785  return proc;
786  } else {
787  throw cms::Exception("FastMonitoringService")
788  << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
789  << lumi;
790  return 0;
791  }
792  }
793 
794  //for the output module
796  std::lock_guard<std::mutex> lock(fmt_->monlock_);
797 
798  auto it = processedEventsPerLumi_.find(lumi);
799  if (it != processedEventsPerLumi_.end()) {
800  unsigned int abortFlag = it->second.second;
801  return abortFlag;
802  } else {
803  throw cms::Exception("FastMonitoringService")
804  << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
805  << lumi;
806  return false;
807  }
808  }
809 
810  // the function to be called in the thread. Thread completes when function returns.
812  monInit_.exchange(true, std::memory_order_acquire);
813  while (!fmt_->m_stoprequest) {
814  std::vector<std::vector<unsigned int>> lastEnc;
815  {
816  std::unique_lock<std::mutex> lock(fmt_->monlock_);
817 
818  doSnapshot(lastGlobalLumi_, false);
819 
820  lastEnc.emplace_back(fmt_->m_data.tmicrostateEncoded_);
821  lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
822 
824  std::vector<std::string> CSVv;
825  for (unsigned int i = 0; i < nMonThreads_; i++) {
826  CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
827  }
828  // release mutex before writing out fast path file
829  lock.release()->unlock();
830  fmt_->jsonMonitor_->outputCSV(fastPath_, CSVv);
831  }
832  snapCounter_++;
833  }
834 
835  if (verbose_) {
836  edm::LogInfo msg("FastMonitoringService");
837  auto f = [&](std::vector<unsigned int> const& p) {
838  for (unsigned int i = 0; i < nMonThreads_; i++) {
839  if (i == 0)
840  msg << "[" << p[i] << ",";
841  else if (i <= nMonThreads_ - 1)
842  msg << p[i] << ",";
843  else
844  msg << p[i] << "]";
845  }
846  };
847 
848  msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
849  f(lastEnc[0]);
850  msg << " us=";
851  f(lastEnc[1]);
853  }
854 
855  ::sleep(sleepTime_);
856  }
857  }
858 
859  void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
860  // update macrostate
861  fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
862 
863  std::vector<const void*> microstateCopy(microstate_.begin(), microstate_.end());
864  std::vector<const void*> tmicrostateCopy(tmicrostate_.begin(), tmicrostate_.end());
865  std::vector<unsigned char> microstateAcqCopy(microstateAcqFlag_.begin(), microstateAcqFlag_.end());
866  std::vector<unsigned char> tmicrostateAcqCopy(tmicrostateAcqFlag_.begin(), tmicrostateAcqFlag_.end());
867 
868  if (!isInitTransition_) {
869  auto itd = avgLeadTime_.find(ls);
870  if (itd != avgLeadTime_.end())
871  fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
872  else
873  fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
874 
875  auto iti = filesProcessedDuringLumi_.find(ls);
876  if (iti != filesProcessedDuringLumi_.end())
877  fmt_->m_data.fastFilesProcessedJ_ = iti->second;
878  else
879  fmt_->m_data.fastFilesProcessedJ_ = 0;
880 
881  auto itrd = lockStatsDuringLumi_.find(ls);
882  if (itrd != lockStatsDuringLumi_.end()) {
883  fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
884  fmt_->m_data.fastLockCountJ_ = itrd->second.second;
885  } else {
886  fmt_->m_data.fastLockWaitJ_ = 0.;
887  fmt_->m_data.fastLockCountJ_ = 0.;
888  }
889  }
890 
891  for (unsigned int i = 0; i < nThreads_; i++) {
892  if (tmicrostateCopy[i] == getmIdle() && ct_->isThreadActive(i)) {
893  //overhead if thread is running
894  tmicrostateCopy[i] = getmFwk();
895  }
896  if (tmicrostateAcqCopy[i])
897  fmt_->m_data.tmicrostateEncoded_[i] =
898  fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
899  else
900  fmt_->m_data.tmicrostateEncoded_[i] = fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
901  }
902 
903  for (unsigned int i = 0; i < nStreams_; i++) {
904  if (microstateAcqCopy[i])
905  fmt_->m_data.microstateEncoded_[i] =
906  fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
907  else
908  fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
909  }
910 
911  bool inputStatePerThread = false;
912 
914  switch (inputSupervisorState_) {
916  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
917  break;
919  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
920  break;
922  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
923  break;
925  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
926  break;
928  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
929  break;
931  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
932  break;
934  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
935  break;
937  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
938  break;
940  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
941  break;
943  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
944  break;
946  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
947  break;
950  break;
952  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
953  break;
956  break;
958  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
959  break;
960  default:
961  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
962  }
963  } else if (inputState_ == FastMonState::inWaitChunk) {
964  switch (inputSupervisorState_) {
966  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
967  break;
969  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
970  break;
972  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
973  break;
975  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
976  break;
978  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
979  break;
981  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
982  break;
984  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
985  break;
987  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
988  break;
990  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
991  break;
993  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
994  break;
996  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
997  break;
1000  break;
1002  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
1003  break;
1006  break;
1008  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
1009  break;
1010  default:
1011  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
1012  }
1013  } else if (inputState_ == FastMonState::inNoRequest) {
1014  inputStatePerThread = true;
1015  for (unsigned int i = 0; i < nMonThreads_; i++) {
1016  if (i >= nStreams_)
1017  fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1018  else if (microstateCopy[i] == getmIdle())
1019  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
1020  else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1021  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
1022  else
1023  fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
1024  }
1025  } else if (inputState_ == FastMonState::inNewLumi) {
1026  inputStatePerThread = true;
1027  for (unsigned int i = 0; i < nMonThreads_; i++) {
1028  if (i >= nStreams_)
1029  fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1030  else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1031  fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1032  }
1034  //apply directly throttled state from supervisor
1035  fmt_->m_data.inputState_[0] = inputSupervisorState_;
1036  } else
1037  fmt_->m_data.inputState_[0] = inputState_;
1038 
1039  //this is same for all streams
1040  if (!inputStatePerThread)
1041  for (unsigned int i = 1; i < nMonThreads_; i++)
1042  fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1043 
1044  if (isGlobalEOL) { //only update global variables
1045  fmt_->jsonMonitor_->snapGlobal(ls);
1046  } else
1047  fmt_->jsonMonitor_->snap(ls);
1048  }
1049 
1050  //compatibility
1052 
1054 
1055 } //end namespace evf
std::atomic< bool > has_data_exception_
constexpr edm::ModuleDescription const * getmFwkEoL()
std::atomic< FastMonState::InputState > inputState_
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
Definition: fillJson.h:27
static const std::string inputStateNames[FastMonState::inCOUNT]
constexpr edm::ModuleDescription const * getmInvalid()
constexpr edm::ModuleDescription const * getmFwkOvhSrc()
void watchPreEvent(PreEvent::slot_type const &iSlot)
ModuleDescription const * moduleDescription() const
constexpr edm::ModuleDescription const * getmFwkOvhMod()
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
LuminosityBlockNumber_t luminosityBlock() const
std::vector< ContainableAtomic< const void * > > microstate_
constexpr edm::ModuleDescription const * getmIdle()
std::atomic< bool > isInitTransition_
void watchPreallocate(Preallocate::slot_type const &iSlot)
constexpr edm::ModuleDescription const * getmBoL()
void setExceptionDetected(unsigned int ls)
void on_scheduler_exit(bool) override
void watchPreModuleEventAcquire(PreModuleEventAcquire::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
constexpr edm::ModuleDescription const * getmIgnore()
void preGlobalBeginLumi(edm::GlobalContext const &)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
std::filesystem::path workingDirectory_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: DAQSource.cc:1390
void preGlobalEndLumi(edm::GlobalContext const &)
std::vector< ContainableAtomic< unsigned int > > threadactive_
bool isExceptionOnData(unsigned int ls)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
std::unique_ptr< FastMonitoringThread > fmt_
constexpr edm::ModuleDescription const * getmFwk()
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
Definition: Electron.h:6
constexpr int nSpecialModules
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:39
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
ConcurrencyTracker(unsigned num_expected)
void preModuleBeginJob(edm::ModuleDescription const &)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
constexpr edm::ModuleDescription const * getmEoL()
void on_scheduler_entry(bool) override
constexpr edm::ModuleDescription const * getmGlobEoL()
void preStreamEndLumi(edm::StreamContext const &)
std::vector< ContainableAtomic< unsigned char > > tmicrostateAcqFlag_
std::map< unsigned int, double > avgLeadTime_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:64
std::atomic< FastMonState::InputState > inputSupervisorState_
StreamID const & streamID() const
Definition: StreamContext.h:55
constexpr double throughputFactor()
void watchPostModuleEventAcquire(PostModuleEventAcquire::slot_type const &iSlot)
array value (ordered list)
Definition: value.h:32
std::filesystem::path runDirectory_
double f[11][100]
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:62
void postStreamBeginLumi(edm::StreamContext const &)
static unsigned int getSID(edm::StreamContext const &sc)
std::unique_ptr< ConcurrencyTracker > ct_
Log< level::Info, false > LogInfo
def ls(path, rec=False)
Definition: eostools.py:349
void postStreamEndLumi(edm::StreamContext const &)
constexpr edm::ModuleDescription const * getmInput()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
Value & append(const Value &value)
Append value to array at the end.
void setTMicrostate(FastMonState::Microstate m)
constexpr int nReservedModules
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:286
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void stoppedLookingForFile(unsigned int lumi)
std::atomic< bool > has_source_exception_
EventID const & eventID() const
Definition: StreamContext.h:60
constexpr edm::ModuleDescription const * getmEvent()
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
unsigned int value() const
Definition: StreamID.h:43
bool getAbortFlagForLumi(unsigned int lumi)
bool isThreadActive(unsigned index)
Log< level::Warning, false > LogWarning
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT]
Represents a JSON value.
Definition: value.h:101
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
JSON (JavaScript Object Notation).
Definition: DataPoint.h:26
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
constexpr edm::ModuleDescription const * getmIdleSource()
#define LogDebug(id)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
constexpr edm::ModuleDescription const * getmDqm()
std::vector< ContainableAtomic< const void * > > tmicrostate_