CMS 3D CMS Logo

FastMonitoringService.cc
Go to the documentation of this file.
3 
9 //#include "FWCore/ServiceRegistry/interface/PathContext.h"
16 
20 
23 
24 #include <iostream>
25 #include <iomanip>
26 #include <sys/time.h>
27 
28 using namespace jsoncollector;
29 
// Scale factor that turns (size / microseconds) into (size in MiB) per second:
// 1,000,000 microseconds per second divided by 1024*1024 units per MiB.
// NOTE(review): the size passed alongside it is the accumulated file size, presumably in bytes.
constexpr double throughputFactor() {
  constexpr double kMicrosecondsPerSecond = 1000000.0;
  constexpr double kUnitsPerMiB = 1024.0 * 1024.0;
  return kMicrosecondsPerSecond / kUnitsPerMiB;
}
31 
32 namespace evf {
33 
// Pseudo module descriptions that stand in for framework microstates (idle, input, end-of-lumi, ...).
// Entry i corresponds to FastMonState microstate i. The accessors that follow return
// &specialMicroStateNames[FastMonState::mX], and the job setup code registers each address, in array
// order for i < FastMonState::mCOUNT, as a reserved entry of encModule_ (updateReserved loop).
// Keep the entry order in sync with the FastMonState microstate enumeration (declared outside this file).
34  const edm::ModuleDescription FastMonitoringService::specialMicroStateNames[FastMonState::mCOUNT] = {
35  edm::ModuleDescription("Dummy", "Invalid"),
36  edm::ModuleDescription("Dummy", "Idle"),
37  edm::ModuleDescription("Dummy", "FwkOvhSrc"),
38  edm::ModuleDescription("Dummy", "FwkOvhMod"), //set post produce, analyze or filter
39  edm::ModuleDescription("Dummy", "FwkEoL"),
40  edm::ModuleDescription("Dummy", "Input"),
41  edm::ModuleDescription("Dummy", "DQM"),
42  edm::ModuleDescription("Dummy", "BoL"),
43  edm::ModuleDescription("Dummy", "EoL"),
44  edm::ModuleDescription("Dummy", "GlobalEoL"),
45  edm::ModuleDescription("Dummy", "Fwk"),
46  edm::ModuleDescription("Dummy", "IdleSource"),
47  edm::ModuleDescription("Dummy", "Event"),
48  edm::ModuleDescription("Dummy", "Ignore")};
49 
51  return &FastMonitoringService::specialMicroStateNames[FastMonState::mInvalid];
52  }
54  return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdle];
55  }
57  return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhSrc];
58  }
60  return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhMod];
61  }
63  return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkEoL];
64  }
66  return &FastMonitoringService::specialMicroStateNames[FastMonState::mInput];
67  }
69  return &FastMonitoringService::specialMicroStateNames[FastMonState::mDqm];
70  }
72  return &FastMonitoringService::specialMicroStateNames[FastMonState::mBoL];
73  }
75  return &FastMonitoringService::specialMicroStateNames[FastMonState::mEoL];
76  }
78  return &FastMonitoringService::specialMicroStateNames[FastMonState::mGlobEoL];
79  }
81  return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwk];
82  }
84  return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdleSource];
85  }
87  return &FastMonitoringService::specialMicroStateNames[FastMonState::mEvent];
88  }
90  return &FastMonitoringService::specialMicroStateNames[FastMonState::mIgnore];
91  }
92 
// Display names of the FastMonState macrostates (the code sets sInit, sJobReady, sRunning, sJobEnded, ...).
// NOTE(review): presumably indexed by the macrostate enum value (MCOUNT entries), so the order must
// match the FastMonState macrostate enumeration declared outside this file. Keep the two in sync.
93  const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
94  "JobReady",
95  "RunGiven",
96  "Running",
97  "Stopping",
98  "Done",
99  "JobEnded",
100  "Error",
101  "ErrorEnded",
102  "End",
103  "Invalid"};
104 
// Names of the FastMonState input states. makeInputLegendaJson() appends them in array order to the
// "names" list of the input-state legend (inputlegend_pid*.jsn), so position i labels input-state value i.
// Keep the order in sync with the FastMonState input-state enumeration (declared outside this file).
// NOTE(review): the WaitInput_* / WaitChunk_* entries appear to be the combined states doSnapshot()
// reports when the source waits while the input supervisor is in the matching Sup* state. Confirm there.
105  const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
106  "Ignore",
107  "Init",
108  "WaitInput",
109  "NewLumi",
110  "NewLumiBusyEndingLS",
111  "NewLumiIdleEndingLS",
112  "RunEnd",
113  "ProcessingFile",
114  "WaitChunk",
115  "ChunkReceived",
116  "ChecksumEvent",
117  "CachedEvent",
118  "ReadEvent",
119  "ReadCleanup",
120  "NoRequest",
121  "NoRequestWithIdleThreads",
122  "NoRequestWithGlobalEoL",
123  "NoRequestWithEoLThreads",
124  "SupFileLimit",
125  "SupWaitFreeChunk",
126  "SupWaitFreeChunkCopying",
127  "SupWaitFreeThread",
128  "SupWaitFreeThreadCopying",
129  "SupBusy",
130  "SupLockPolling",
131  "SupLockPollingCopying",
132  "SupNoFile",
133  "SupNewFile",
134  "SupNewFileWaitThreadCopying",
135  "SupNewFileWaitThread",
136  "SupNewFileWaitChunkCopying",
137  "SupNewFileWaitChunk",
138  "WaitInput_fileLimit",
139  "WaitInput_waitFreeChunk",
140  "WaitInput_waitFreeChunkCopying",
141  "WaitInput_waitFreeThread",
142  "WaitInput_waitFreeThreadCopying",
143  "WaitInput_busy",
144  "WaitInput_lockPolling",
145  "WaitInput_lockPollingCopying",
146  "WaitInput_runEnd",
147  "WaitInput_noFile",
148  "WaitInput_newFile",
149  "WaitInput_newFileWaitThreadCopying",
150  "WaitInput_newFileWaitThread",
151  "WaitInput_newFileWaitChunkCopying",
152  "WaitInput_newFileWaitChunk",
153  "WaitChunk_fileLimit",
154  "WaitChunk_waitFreeChunk",
155  "WaitChunk_waitFreeChunkCopying",
156  "WaitChunk_waitFreeThread",
157  "WaitChunk_waitFreeThreadCopying",
158  "WaitChunk_busy",
159  "WaitChunk_lockPolling",
160  "WaitChunk_lockPollingCopying",
161  "WaitChunk_runEnd",
162  "WaitChunk_noFile",
163  "WaitChunk_newFile",
164  "WaitChunk_newFileWaitThreadCopying",
165  "WaitChunk_newFileWaitThread",
166  "WaitChunk_newFileWaitChunkCopying",
167  "WaitChunk_newFileWaitChunk",
168  "inSupThrottled",
169  "inThrottled"};
170 
171  class ConcurrencyTracker : public tbb::task_scheduler_observer {
172  std::atomic<int> num_threads;
173  unsigned max_threads;
174  std::vector<ContainableAtomic<unsigned int>> threadactive_;
175 
176  public:
177  ConcurrencyTracker(unsigned num_expected)
178  : num_threads(), max_threads(num_expected), threadactive_(num_expected, 0) {
179  //set array to if it will not be used
180  //for (unsigned i=0;i<num_expected;i++) threadactive_.push_back(0);
181  }
182  void activate() { observe(true); }
183  void on_scheduler_entry(bool) override {
184  ++num_threads;
185  threadactive_[tbb::this_task_arena::current_thread_index()] = 1;
186  }
187 
188  void on_scheduler_exit(bool) override {
189  --num_threads;
190  threadactive_[tbb::this_task_arena::current_thread_index()] = 0;
191  }
192 
193  bool isThreadActive(unsigned index) { return threadactive_[index] == 1; }
194  int get_concurrency() { return num_threads; }
195  };
196 
198  : fmt_(new FastMonitoringThread()),
199  tbbMonitoringMode_(iPS.getUntrackedParameter<bool>("tbbMonitoringMode", true)),
200  tbbConcurrencyTracker_(iPS.getUntrackedParameter<bool>("tbbConcurrencyTracker", true) && tbbMonitoringMode_),
201  sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
202  fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
203  fastName_("fastmoni"),
204  totalEventsProcessed_(0),
205  verbose_(iPS.getUntrackedParameter<bool>("verbose")) {
206  reg.watchPreallocate(this, &FastMonitoringService::preallocate); //receiving information on number of threads
208 
213 
217 
222 
223  reg.watchPreEvent(this, &FastMonitoringService::preEvent); //stream
225 
226  //readEvent (not getNextItemType)
227  reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent); //source (with streamID of requestor)
229 
232 
235 
239 
240  //find microstate definition path (required by the module)
241  struct stat statbuf;
242  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
243  std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
244  if (stat(microstatePath.c_str(), &statbuf)) {
245  microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
246  if (stat(microstatePath.c_str(), &statbuf)) {
247  microstatePath = microstateBaseSuffix;
248  if (stat(microstatePath.c_str(), &statbuf))
249  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
250  }
251  }
252  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
253  }
254 
256 
259  desc.setComment("Service for File-based DAQ monitoring and event accounting");
260  desc.addUntracked<bool>("tbbMonitoringMode", true)
261  ->setComment("Monitor individual module processing per TBB thread instead of stream");
262  desc.addUntracked<bool>("tbbConcurrencyTracker", true)
263  ->setComment("Monitor TBB thread activity to flag microstate as real idle or overhead/other");
264  desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
265  desc.addUntracked<unsigned int>("fastMonIntervals", 2)
266  ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
267  desc.addUntracked<bool>("filePerFwkStream", true) //obsolete
268  ->setComment("Switches on monitoring output per framework stream");
269  desc.addUntracked<bool>("verbose", false)->setComment("Set to use LogInfo messages from the monitoring thread");
270  desc.setAllowAnything();
271  descriptions.add("FastMonitoringService", desc);
272  }
273 
275  Json::Value legendaVector(Json::arrayValue);
276  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
277  legendaVector.append(
278  Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
279  //duplicate modules adding a list for acquire states (not all modules actually have it)
280  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
281  legendaVector.append(Json::Value(
282  (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
283  Json::Value valReserved(nReservedModules);
284  Json::Value valSpecial(nSpecialModules);
285  Json::Value valOutputModules(nOutputModules_);
286  Json::Value moduleLegend;
287  moduleLegend["names"] = legendaVector;
288  moduleLegend["reserved"] = valReserved;
289  moduleLegend["special"] = valSpecial;
290  moduleLegend["output"] = valOutputModules;
292  return writer.write(moduleLegend);
293  }
294 
296  Json::Value legendaVector(Json::arrayValue);
297  for (int i = 0; i < FastMonState::inCOUNT; i++)
298  legendaVector.append(Json::Value(inputStateNames[i]));
299  Json::Value moduleLegend;
300  moduleLegend["names"] = legendaVector;
302  return writer.write(moduleLegend);
303  }
304 
306  nStreams_ = bounds.maxNumberOfStreams();
307  nThreads_ = bounds.maxNumberOfThreads();
308  //this should already be >=1
309  if (nStreams_ == 0)
310  nStreams_ = 1;
311  if (nThreads_ == 0)
312  nThreads_ = 1;
314  ct_ = std::make_unique<ConcurrencyTracker>(nThreads_);
315  //start concurrency tracking
316  }
317 
319  // FIND RUN DIRECTORY
320  // The run dir should be set via the configuration of EvFDaqDirector
322  ct_->activate();
323 
324  if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
325  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
326  }
327  std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
328  workingDirectory_ = runDirectory_ = runDirectory;
329  workingDirectory_ /= "mon";
330 
331  if (!std::filesystem::is_directory(workingDirectory_)) {
332  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
333  std::filesystem::create_directories(workingDirectory_);
334  if (!std::filesystem::is_directory(workingDirectory_))
335  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
336  << ". No monitoring data will be written.";
337  }
338 
339  std::ostringstream fastFileName;
340 
341  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
343  fast /= fastFileName.str();
344  fastPath_ = fast.string();
345 
346  std::ostringstream moduleLegFile;
347  std::ostringstream moduleLegFileJson;
348  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
349  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
350  moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
351  moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
352 
353  std::ostringstream inputLegFileJson;
354  inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
355  inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
356 
357  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
358 
359  /*
360  * initialize the fast monitor with:
361  * vector of pointers to monitorable parameters
362  * path to definition
363  *
364  */
365 
366  fmt_->m_data.macrostate_ = FastMonState::sInit;
367 
368  for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
369  fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(specialMicroStateNames + i));
370  fmt_->m_data.encModule_.completeReservedWithDummies();
371 
372  for (unsigned int i = 0; i < nMonThreads_; i++) {
373  microstate_.emplace_back(getmInvalid());
374  microstateAcqFlag_.push_back(0);
375  tmicrostate_.emplace_back(getmInvalid());
376  tmicrostateAcqFlag_.push_back(0);
377 
378  //for synchronization
379  streamCounterUpdating_.push_back(new std::atomic<bool>(false));
380  }
381 
382  //initial size until we detect number of bins
383  fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
384  fmt_->m_data.microstateBins_ = 0;
385  fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
386 
387  lastGlobalLumi_ = 0;
388  isInitTransition_ = true;
389  lumiFromSource_ = 0;
390 
391  //startup monitoring
392  fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
393  fmt_->jsonMonitor_->setNStreams(nMonThreads_);
394  fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nMonThreads_, nStreams_, nThreads_);
395  monInit_.store(false, std::memory_order_release);
396  if (sleepTime_ > 0)
398  }
399 
403  context = " FromThisContext ";
405  context = " FromAnotherContext";
407  context = " FromExternalSignal";
408  edm::LogWarning("FastMonitoringService")
409  << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
410  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
411  std::lock_guard<std::mutex> lock(fmt_->monlock_);
412  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
413  has_data_exception_.store(true);
414  }
415 
419  context = " FromThisContext ";
421  context = " FromAnotherContext";
423  context = " FromExternalSignal";
424  edm::LogWarning("FastMonitoringService")
425  << " GLOBAL "
426  << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
427  std::lock_guard<std::mutex> lock(fmt_->monlock_);
429  has_data_exception_.store(true);
430  }
431 
435  context = " FromThisContext ";
437  context = " FromAnotherContext";
439  context = " FromExternalSignal";
440  edm::LogWarning("FastMonitoringService") << " SOURCE "
441  << "earlyTermination -: " << context;
442  std::lock_guard<std::mutex> lock(fmt_->monlock_);
443  exception_detected_ = true;
444  has_source_exception_.store(true);
445  has_data_exception_.store(true);
446  }
447 
449  std::lock_guard<std::mutex> lock(fmt_->monlock_);
450  if (!ls)
451  exception_detected_ = true;
452  else
453  exceptionInLS_.push_back(ls);
454  }
455 
457  return has_source_exception_.load() || has_data_exception_.load();
458  }
459 
461  if (!has_data_exception_.load())
462  return false;
463  if (has_source_exception_.load())
464  return true;
465  std::lock_guard<std::mutex> lock(fmt_->monlock_);
466  for (auto ex : exceptionInLS_) {
467  if (ls == ex)
468  return true;
469  }
470  return false;
471  }
472 
474 
475  //new output module name is stream
477  std::lock_guard<std::mutex> lock(fmt_->monlock_);
478  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
479 
480  //build a map of modules keyed by their module description address
481  //here we need to treat output modules in a special way so they can be easily singled out
482  if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
483  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
484  fmt_->m_data.encModule_.updateReserved((void*)&desc);
485  nOutputModules_++;
486  } else
487  fmt_->m_data.encModule_.update((void*)&desc);
488  }
489 
491  std::string&& moduleLegStrJson = makeModuleLegendaJson();
492  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
493 
494  std::string inputLegendStrJson = makeInputLegendaJson();
495  FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
496 
497  fmt_->m_data.macrostate_ = FastMonState::sJobReady;
498 
499  //update number of entries in module histogram
500  std::lock_guard<std::mutex> lock(fmt_->monlock_);
501  //double the size to add post-acquire states
502  fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
503  }
504 
506  fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
507  fmt_->stop();
508  }
509 
511  fmt_->m_data.macrostate_ = FastMonState::sRunning;
512  isInitTransition_ = false;
513  }
514 
516  timeval lumiStartTime;
517  gettimeofday(&lumiStartTime, nullptr);
518  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
519  lastGlobalLumi_ = newLumi;
520 
521  std::lock_guard<std::mutex> lock(fmt_->monlock_);
522  lumiStartTime_[newLumi] = lumiStartTime;
523  //reset all states to idle
524  if (tbbMonitoringMode_)
525  for (unsigned i = 0; i < nThreads_; i++)
526  if (tmicrostate_[i] == getmInvalid())
527  tmicrostate_[i] = getmIdle();
528  }
529 
531  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
532  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
533  timeval lumiStopTime;
534  gettimeofday(&lumiStopTime, nullptr);
535 
536  std::lock_guard<std::mutex> lock(fmt_->monlock_);
537 
538  // Compute throughput
539  timeval stt = lumiStartTime_[lumi];
540  lumiStartTime_.erase(lumi);
541  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
542  unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
543  accuSize_.erase(lumi);
544  double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
545  //store to registered variable
546  fmt_->m_data.fastThroughputJ_.value() = throughput;
547 
548  //update
549  doSnapshot(lumi, true);
550 
551  //retrieve one result we need (todo: sanity check if it's found)
552  IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
553  if (!lumiProcessedJptr)
554  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
555  processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
556 
557  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
558  bool exception_detected = exception_detected_;
559  for (auto ex : exceptionInLS_)
560  if (lumi == ex)
561  exception_detected = true;
562 
563  if (edm::shutdown_flag || exception_detected) {
564  edm::LogInfo("FastMonitoringService")
565  << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
566  << " events were processed in LUMI " << lumi;
567  //this will prevent output modules from producing json file for possibly incomplete lumi
568  processedEventsPerLumi_[lumi].first = 0;
569  processedEventsPerLumi_[lumi].second = true;
570  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
571  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
572  return;
573  }
574 
575  if (inputSource_ || daqInputSource_) {
576  auto sourceReport =
578  if (sourceReport.first) {
579  if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
580  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
581  << ", events(processed):" << processedEventsPerLumi_[lumi].first
582  << " events(source):" << sourceReport.second;
583  }
584  }
585  }
586 
587  edm::LogInfo("FastMonitoringService")
588  << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
589  << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
590  delete lumiProcessedJptr;
591 
592  //full global and stream merge (will be used by output modules), output from this service is deprecated
593  fmt_->jsonMonitor_->outputFullJSON("dummy", lumi, false);
594  fmt_->jsonMonitor_->discardCollected(lumi); //we don't do further updates for this lumi
595  }
596 
598  std::lock_guard<std::mutex> lock(fmt_->monlock_);
599  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
600  //LS monitoring snapshot with input source data has been taken in previous callback
601  avgLeadTime_.erase(lumi);
603  lockStatsDuringLumi_.erase(lumi);
604 
605  //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
607  }
608 
610  std::lock_guard<std::mutex> lock(fmt_->monlock_);
611  fmt_->m_data.streamLumi_[sc.streamID().value()] = sc.eventID().luminosityBlock();
612 
613  //reset collected values for this stream
614  *(fmt_->m_data.processed_[sc.streamID().value()]) = 0;
615 
616  microstate_[sc.streamID().value()] = getmBoL();
617  }
618 
620  microstate_[sc.streamID().value()] = getmIdle();
621  }
622 
624  std::lock_guard<std::mutex> lock(fmt_->monlock_);
625  //update processed count to be complete at this time
626  //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
627  fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sc.streamID().value());
628  //reset this in case stream does not get notified of next lumi (we keep processed events only)
629  microstate_[sc.streamID().value()] = getmEoL();
630  }
631 
633  microstate_[sc.streamID().value()] = getmFwkEoL();
634  }
635 
637  microstate_[sc.streamID().value()] = getmEvent();
638  }
639 
641  (*(fmt_->m_data.processed_[sc.streamID().value()]))++;
642  //fast path counter (events accumulated in a run)
643  unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
644  fmt_->m_data.fastPathProcessedJ_ = res + 1;
645 
646  microstate_[sc.streamID().value()] = getmIdle();
647  }
648 
650  microstate_[getSID(sid)] = getmInput();
651  if (!tbbMonitoringMode_)
652  return;
653  auto tid = getTID();
654  if (tid >= nThreads_)
655  return;
656  tmicrostate_[tid] = getmInput();
657  }
658 
660  microstate_[getSID(sid)] = getmFwkOvhSrc();
661  if (!tbbMonitoringMode_)
662  return;
663  auto tid = getTID();
664  if (tid >= nThreads_)
665  return;
666  tmicrostate_[tid] = getmIdle();
667  }
668 
670  edm::ModuleCallingContext const& mcc) {
671  microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
672  microstateAcqFlag_[getSID(sc)] = 1;
673  if (!tbbMonitoringMode_)
674  return;
675  auto tid = getTID();
676  if (tid >= nThreads_)
677  return;
678  tmicrostate_[tid] = (void*)(mcc.moduleDescription());
679  tmicrostateAcqFlag_[tid] = 1;
680  }
681 
683  edm::ModuleCallingContext const& mcc) {
685  microstateAcqFlag_[getSID(sc)] = 0;
686  if (!tbbMonitoringMode_)
687  return;
688  auto tid = getTID();
689  if (tid >= nThreads_)
690  return;
691  tmicrostate_[tid] = getmIdle();
692  tmicrostateAcqFlag_[tid] = 0;
693  }
694 
696  microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
697  if (!tbbMonitoringMode_)
698  return;
699  auto tid = getTID();
700  if (tid >= nThreads_)
701  return;
702  tmicrostate_[tid] = (void*)(mcc.moduleDescription());
703  }
704 
707  if (!tbbMonitoringMode_)
708  return;
709  auto tid = getTID();
710  if (tid >= nThreads_)
711  return;
712  tmicrostate_[tid] = getmIdle();
713  }
714 
715  //from source
716  void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
717  std::lock_guard<std::mutex> lock(fmt_->monlock_);
718 
719  if (accuSize_.find(lumi) == accuSize_.end())
720  accuSize_[lumi] = fileSize;
721  else
722  accuSize_[lumi] += fileSize;
723 
726  else
728  }
729 
731  gettimeofday(&fileLookStart_, nullptr);
732  /*
733  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
734  << fileLookStart_.tv_usec / 1000.0 << std::endl;
735  */
736  }
737 
739  gettimeofday(&fileLookStop_, nullptr);
740  /*
741  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
742  << fileLookStop_.tv_usec / 1000.0 << std::endl;
743  */
744  std::lock_guard<std::mutex> lock(fmt_->monlock_);
745 
746  if (lumi > lumiFromSource_) {
748  leadTimes_.clear();
749  }
750  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
751  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
752  // add this to lead times for this lumi
753  leadTimes_.push_back((double)elapsedTime);
754 
755  // recompute average lead time for this lumi
756  if (leadTimes_.size() == 1)
758  else {
759  double totTime = 0;
760  for (unsigned int i = 0; i < leadTimes_.size(); i++)
761  totTime += leadTimes_[i];
762  avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
763  }
764  }
765 
766  void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
767  std::lock_guard<std::mutex> lock(fmt_->monlock_);
768  lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
769  }
770 
772  tmicrostate_[tbb::this_task_arena::current_thread_index()] = &specialMicroStateNames[m];
773  }
774 
775  //for the output module
776  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
777  std::lock_guard<std::mutex> lock(fmt_->monlock_);
778 
779  auto it = processedEventsPerLumi_.find(lumi);
780  if (it != processedEventsPerLumi_.end()) {
781  unsigned int proc = it->second.first;
782  if (abortFlag)
783  *abortFlag = it->second.second;
784  return proc;
785  } else {
786  throw cms::Exception("FastMonitoringService")
787  << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
788  << lumi;
789  return 0;
790  }
791  }
792 
793  //for the output module
795  std::lock_guard<std::mutex> lock(fmt_->monlock_);
796 
797  auto it = processedEventsPerLumi_.find(lumi);
798  if (it != processedEventsPerLumi_.end()) {
799  unsigned int abortFlag = it->second.second;
800  return abortFlag;
801  } else {
802  throw cms::Exception("FastMonitoringService")
803  << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
804  << lumi;
805  return false;
806  }
807  }
808 
809  // the function to be called in the thread. Thread completes when function returns.
811  monInit_.exchange(true, std::memory_order_acquire);
812  while (!fmt_->m_stoprequest) {
813  std::vector<std::vector<unsigned int>> lastEnc;
814  {
815  std::unique_lock<std::mutex> lock(fmt_->monlock_);
816 
817  doSnapshot(lastGlobalLumi_, false);
818 
819  lastEnc.emplace_back(fmt_->m_data.tmicrostateEncoded_);
820  lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
821 
823  std::vector<std::string> CSVv;
824  for (unsigned int i = 0; i < nMonThreads_; i++) {
825  CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
826  }
827  // release mutex before writing out fast path file
828  lock.release()->unlock();
829  fmt_->jsonMonitor_->outputCSV(fastPath_, CSVv);
830  }
831  snapCounter_++;
832  }
833 
834  if (verbose_) {
835  edm::LogInfo msg("FastMonitoringService");
836  auto f = [&](std::vector<unsigned int> const& p) {
837  for (unsigned int i = 0; i < nMonThreads_; i++) {
838  if (i == 0)
839  msg << "[" << p[i] << ",";
840  else if (i <= nMonThreads_ - 1)
841  msg << p[i] << ",";
842  else
843  msg << p[i] << "]";
844  }
845  };
846 
847  msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
848  f(lastEnc[0]);
849  msg << " us=";
850  f(lastEnc[1]);
852  }
853 
854  ::sleep(sleepTime_);
855  }
856  }
857 
858  void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
859  // update macrostate
860  fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
861 
862  std::vector<const void*> microstateCopy(microstate_.begin(), microstate_.end());
863  std::vector<const void*> tmicrostateCopy(tmicrostate_.begin(), tmicrostate_.end());
864  std::vector<unsigned char> microstateAcqCopy(microstateAcqFlag_.begin(), microstateAcqFlag_.end());
865  std::vector<unsigned char> tmicrostateAcqCopy(tmicrostateAcqFlag_.begin(), tmicrostateAcqFlag_.end());
866 
867  if (!isInitTransition_) {
868  auto itd = avgLeadTime_.find(ls);
869  if (itd != avgLeadTime_.end())
870  fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
871  else
872  fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
873 
874  auto iti = filesProcessedDuringLumi_.find(ls);
875  if (iti != filesProcessedDuringLumi_.end())
876  fmt_->m_data.fastFilesProcessedJ_ = iti->second;
877  else
878  fmt_->m_data.fastFilesProcessedJ_ = 0;
879 
880  auto itrd = lockStatsDuringLumi_.find(ls);
881  if (itrd != lockStatsDuringLumi_.end()) {
882  fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
883  fmt_->m_data.fastLockCountJ_ = itrd->second.second;
884  } else {
885  fmt_->m_data.fastLockWaitJ_ = 0.;
886  fmt_->m_data.fastLockCountJ_ = 0.;
887  }
888  }
889 
890  for (unsigned int i = 0; i < nThreads_; i++) {
891  if (tmicrostateCopy[i] == getmIdle() && ct_->isThreadActive(i)) {
892  //overhead if thread is running
893  tmicrostateCopy[i] = getmFwk();
894  }
895  if (tmicrostateAcqCopy[i])
896  fmt_->m_data.tmicrostateEncoded_[i] =
897  fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
898  else
899  fmt_->m_data.tmicrostateEncoded_[i] = fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
900  }
901 
902  for (unsigned int i = 0; i < nStreams_; i++) {
903  if (microstateAcqCopy[i])
904  fmt_->m_data.microstateEncoded_[i] =
905  fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
906  else
907  fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
908  }
909 
910  bool inputStatePerThread = false;
911 
913  switch (inputSupervisorState_) {
915  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
916  break;
918  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
919  break;
921  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
922  break;
924  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
925  break;
927  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
928  break;
930  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
931  break;
933  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
934  break;
936  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
937  break;
939  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
940  break;
942  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
943  break;
945  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
946  break;
949  break;
951  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
952  break;
955  break;
957  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
958  break;
959  default:
960  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
961  }
962  } else if (inputState_ == FastMonState::inWaitChunk) {
963  switch (inputSupervisorState_) {
965  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
966  break;
968  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
969  break;
971  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
972  break;
974  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
975  break;
977  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
978  break;
980  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
981  break;
983  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
984  break;
986  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
987  break;
989  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
990  break;
992  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
993  break;
995  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
996  break;
999  break;
1001  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
1002  break;
1005  break;
1007  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
1008  break;
1009  default:
1010  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
1011  }
1012  } else if (inputState_ == FastMonState::inNoRequest) {
1013  inputStatePerThread = true;
1014  for (unsigned int i = 0; i < nMonThreads_; i++) {
1015  if (i >= nStreams_)
1016  fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1017  else if (microstateCopy[i] == getmIdle())
1018  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
1019  else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1020  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
1021  else
1022  fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
1023  }
1024  } else if (inputState_ == FastMonState::inNewLumi) {
1025  inputStatePerThread = true;
1026  for (unsigned int i = 0; i < nMonThreads_; i++) {
1027  if (i >= nStreams_)
1028  fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1029  else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1030  fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1031  }
1033  //apply directly throttled state from supervisor
1034  fmt_->m_data.inputState_[0] = inputSupervisorState_;
1035  } else
1036  fmt_->m_data.inputState_[0] = inputState_;
1037 
1038  //this is same for all streams
1039  if (!inputStatePerThread)
1040  for (unsigned int i = 1; i < nMonThreads_; i++)
1041  fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1042 
1043  if (isGlobalEOL) { //only update global variables
1044  fmt_->jsonMonitor_->snapGlobal(ls);
1045  } else
1046  fmt_->jsonMonitor_->snap(ls);
1047  }
1048 
1049 } //end namespace evf
std::atomic< bool > has_data_exception_
constexpr edm::ModuleDescription const * getmFwkEoL()
std::atomic< FastMonState::InputState > inputState_
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
Definition: fillJson.h:27
static const std::string inputStateNames[FastMonState::inCOUNT]
constexpr edm::ModuleDescription const * getmInvalid()
constexpr edm::ModuleDescription const * getmFwkOvhSrc()
void watchPreEvent(PreEvent::slot_type const &iSlot)
constexpr edm::ModuleDescription const * getmFwkOvhMod()
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
LuminosityBlockNumber_t luminosityBlock() const
std::vector< ContainableAtomic< const void * > > microstate_
constexpr edm::ModuleDescription const * getmIdle()
std::atomic< bool > isInitTransition_
void watchPreallocate(Preallocate::slot_type const &iSlot)
constexpr edm::ModuleDescription const * getmBoL()
void setExceptionDetected(unsigned int ls)
void on_scheduler_exit(bool) override
void watchPreModuleEventAcquire(PreModuleEventAcquire::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
constexpr edm::ModuleDescription const * getmIgnore()
void preGlobalBeginLumi(edm::GlobalContext const &)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
std::filesystem::path workingDirectory_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: DAQSource.cc:1390
void preGlobalEndLumi(edm::GlobalContext const &)
std::vector< ContainableAtomic< unsigned int > > threadactive_
ModuleDescription const * moduleDescription() const noexcept
bool isExceptionOnData(unsigned int ls)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
std::unique_ptr< FastMonitoringThread > fmt_
constexpr edm::ModuleDescription const * getmFwk()
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
Definition: Electron.h:6
constexpr int nSpecialModules
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:39
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
ConcurrencyTracker(unsigned num_expected)
void preModuleBeginJob(edm::ModuleDescription const &)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
constexpr edm::ModuleDescription const * getmEoL()
void on_scheduler_entry(bool) override
constexpr edm::ModuleDescription const * getmGlobEoL()
void preStreamEndLumi(edm::StreamContext const &)
std::vector< ContainableAtomic< unsigned char > > tmicrostateAcqFlag_
std::map< unsigned int, double > avgLeadTime_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:64
std::atomic< FastMonState::InputState > inputSupervisorState_
StreamID const & streamID() const
Definition: StreamContext.h:55
constexpr double throughputFactor()
void watchPostModuleEventAcquire(PostModuleEventAcquire::slot_type const &iSlot)
array value (ordered list)
Definition: value.h:32
std::filesystem::path runDirectory_
double f[11][100]
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:66
void postStreamBeginLumi(edm::StreamContext const &)
static unsigned int getSID(edm::StreamContext const &sc)
std::unique_ptr< ConcurrencyTracker > ct_
Log< level::Info, false > LogInfo
def ls(path, rec=False)
Definition: eostools.py:349
void postStreamEndLumi(edm::StreamContext const &)
constexpr edm::ModuleDescription const * getmInput()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
Value & append(const Value &value)
Append value to array at the end.
void setTMicrostate(FastMonState::Microstate m)
constexpr int nReservedModules
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:286
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void stoppedLookingForFile(unsigned int lumi)
std::atomic< bool > has_source_exception_
EventID const & eventID() const
Definition: StreamContext.h:60
constexpr edm::ModuleDescription const * getmEvent()
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
unsigned int value() const
Definition: StreamID.h:43
bool getAbortFlagForLumi(unsigned int lumi)
bool isThreadActive(unsigned index)
Log< level::Warning, false > LogWarning
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT]
Represents a JSON value.
Definition: value.h:101
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
JSON (JavaScript Object Notation).
Definition: DataPoint.h:26
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
constexpr edm::ModuleDescription const * getmIdleSource()
#define LogDebug(id)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
constexpr edm::ModuleDescription const * getmDqm()
std::vector< ContainableAtomic< const void * > > tmicrostate_