CMS 3D CMS Logo

FastMonitoringService.cc
Go to the documentation of this file.
3 
9 //#include "FWCore/ServiceRegistry/interface/PathContext.h"
16 
20 
23 
24 #include <iostream>
25 #include <iomanip>
26 #include <sys/time.h>
27 
28 using namespace jsoncollector;
29 
// Scale factor applied to (accumulated size / elapsed microseconds) when computing the
// per-lumisection throughput: multiplying by 1e6 turns a per-microsecond rate into a
// per-second rate, and dividing by 1024*1024 expresses the size in MiB (assuming the
// accumulated size is in bytes).
constexpr double throughputFactor() {
  constexpr double kMicrosecondsPerSecond = 1.0e6;
  constexpr double kBytesPerMiB = 1024.0 * 1024.0;
  return kMicrosecondsPerSecond / kBytesPerMiB;
}
31 
32 namespace evf {
33 
34  const edm::ModuleDescription FastMonitoringService::specialMicroStateNames[FastMonState::mCOUNT] = {
35  edm::ModuleDescription("Dummy", "Invalid"),
36  edm::ModuleDescription("Dummy", "Idle"),
37  edm::ModuleDescription("Dummy", "FwkOvhSrc"),
38  edm::ModuleDescription("Dummy", "FwkOvhMod"), //set post produce, analyze or filter
39  edm::ModuleDescription("Dummy", "FwkEoL"),
40  edm::ModuleDescription("Dummy", "Input"),
41  edm::ModuleDescription("Dummy", "DQM"),
42  edm::ModuleDescription("Dummy", "BoL"),
43  edm::ModuleDescription("Dummy", "EoL"),
44  edm::ModuleDescription("Dummy", "GlobalEoL"),
45  edm::ModuleDescription("Dummy", "Fwk"),
46  edm::ModuleDescription("Dummy", "IdleSource"),
47  edm::ModuleDescription("Dummy", "Event"),
48  edm::ModuleDescription("Dummy", "Ignore")};
49 
50  constexpr edm::ModuleDescription const* getmInvalid() {
51  return &FastMonitoringService::specialMicroStateNames[FastMonState::mInvalid];
52  }
53  constexpr edm::ModuleDescription const* getmIdle() {
54  return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdle];
55  }
57  return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhSrc];
58  }
60  return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhMod];
61  }
62  constexpr edm::ModuleDescription const* getmFwkEoL() {
63  return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkEoL];
64  }
65  constexpr edm::ModuleDescription const* getmInput() {
66  return &FastMonitoringService::specialMicroStateNames[FastMonState::mInput];
67  }
68  constexpr edm::ModuleDescription const* getmDqm() {
69  return &FastMonitoringService::specialMicroStateNames[FastMonState::mDqm];
70  }
71  constexpr edm::ModuleDescription const* getmBoL() {
72  return &FastMonitoringService::specialMicroStateNames[FastMonState::mBoL];
73  }
74  constexpr edm::ModuleDescription const* getmEoL() {
75  return &FastMonitoringService::specialMicroStateNames[FastMonState::mEoL];
76  }
77  constexpr edm::ModuleDescription const* getmGlobEoL() {
78  return &FastMonitoringService::specialMicroStateNames[FastMonState::mGlobEoL];
79  }
80  constexpr edm::ModuleDescription const* getmFwk() {
81  return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwk];
82  }
84  return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdleSource];
85  }
86  constexpr edm::ModuleDescription const* getmEvent() {
87  return &FastMonitoringService::specialMicroStateNames[FastMonState::mEvent];
88  }
89  constexpr edm::ModuleDescription const* getmIgnore() {
90  return &FastMonitoringService::specialMicroStateNames[FastMonState::mIgnore];
91  }
92 
93  const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
94  "JobReady",
95  "RunGiven",
96  "Running",
97  "Stopping",
98  "Done",
99  "JobEnded",
100  "Error",
101  "ErrorEnded",
102  "End",
103  "Invalid"};
104 
105  const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
106  "Ignore",
107  "Init",
108  "WaitInput",
109  "NewLumi",
110  "NewLumiBusyEndingLS",
111  "NewLumiIdleEndingLS",
112  "RunEnd",
113  "ProcessingFile",
114  "WaitChunk",
115  "ChunkReceived",
116  "ChecksumEvent",
117  "CachedEvent",
118  "ReadEvent",
119  "ReadCleanup",
120  "NoRequest",
121  "NoRequestWithIdleThreads",
122  "NoRequestWithGlobalEoL",
123  "NoRequestWithEoLThreads",
124  "SupFileLimit",
125  "SupWaitFreeChunk",
126  "SupWaitFreeChunkCopying",
127  "SupWaitFreeThread",
128  "SupWaitFreeThreadCopying",
129  "SupBusy",
130  "SupLockPolling",
131  "SupLockPollingCopying",
132  "SupNoFile",
133  "SupNewFile",
134  "SupNewFileWaitThreadCopying",
135  "SupNewFileWaitThread",
136  "SupNewFileWaitChunkCopying",
137  "SupNewFileWaitChunk",
138  "WaitInput_fileLimit",
139  "WaitInput_waitFreeChunk",
140  "WaitInput_waitFreeChunkCopying",
141  "WaitInput_waitFreeThread",
142  "WaitInput_waitFreeThreadCopying",
143  "WaitInput_busy",
144  "WaitInput_lockPolling",
145  "WaitInput_lockPollingCopying",
146  "WaitInput_runEnd",
147  "WaitInput_noFile",
148  "WaitInput_newFile",
149  "WaitInput_newFileWaitThreadCopying",
150  "WaitInput_newFileWaitThread",
151  "WaitInput_newFileWaitChunkCopying",
152  "WaitInput_newFileWaitChunk",
153  "WaitChunk_fileLimit",
154  "WaitChunk_waitFreeChunk",
155  "WaitChunk_waitFreeChunkCopying",
156  "WaitChunk_waitFreeThread",
157  "WaitChunk_waitFreeThreadCopying",
158  "WaitChunk_busy",
159  "WaitChunk_lockPolling",
160  "WaitChunk_lockPollingCopying",
161  "WaitChunk_runEnd",
162  "WaitChunk_noFile",
163  "WaitChunk_newFile",
164  "WaitChunk_newFileWaitThreadCopying",
165  "WaitChunk_newFileWaitThread",
166  "WaitChunk_newFileWaitChunkCopying",
167  "WaitChunk_newFileWaitChunk",
168  "inSupThrottled",
169  "inThrottled"};
170 
171  class ConcurrencyTracker : public tbb::task_scheduler_observer {
172  std::atomic<int> num_threads;
173  unsigned max_threads;
174  std::vector<ContainableAtomic<unsigned int>> threadactive_;
175 
176  public:
177  ConcurrencyTracker(unsigned num_expected)
178  : num_threads(), max_threads(num_expected), threadactive_(num_expected, 0) {
179  //set array to if it will not be used
180  //for (unsigned i=0;i<num_expected;i++) threadactive_.push_back(0);
181  }
182  void activate() { observe(true); }
183  void on_scheduler_entry(bool) override {
184  ++num_threads;
185  threadactive_[tbb::this_task_arena::current_thread_index()] = 1;
186  }
187 
188  void on_scheduler_exit(bool) override {
189  --num_threads;
190  threadactive_[tbb::this_task_arena::current_thread_index()] = 0;
191  }
192 
193  bool isThreadActive(unsigned index) { return threadactive_[index] == 1; }
194  int get_concurrency() { return num_threads; }
195  };
196 
198  : MicroStateService(iPS, reg),
199  fmt_(new FastMonitoringThread()),
200  tbbMonitoringMode_(iPS.getUntrackedParameter<bool>("tbbMonitoringMode", true)),
201  tbbConcurrencyTracker_(iPS.getUntrackedParameter<bool>("tbbConcurrencyTracker", true) && tbbMonitoringMode_),
202  sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
203  fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
204  fastName_("fastmoni"),
205  totalEventsProcessed_(0),
206  verbose_(iPS.getUntrackedParameter<bool>("verbose")) {
207  reg.watchPreallocate(this, &FastMonitoringService::preallocate); //receiving information on number of threads
209 
214 
218 
223 
224  reg.watchPreEvent(this, &FastMonitoringService::preEvent); //stream
226 
227  //readEvent (not getNextItemType)
228  reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent); //source (with streamID of requestor)
230 
233 
236 
240 
241  //find microstate definition path (required by the module)
242  struct stat statbuf;
243  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
244  std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
245  if (stat(microstatePath.c_str(), &statbuf)) {
246  microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
247  if (stat(microstatePath.c_str(), &statbuf)) {
248  microstatePath = microstateBaseSuffix;
249  if (stat(microstatePath.c_str(), &statbuf))
250  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
251  }
252  }
253  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
254  }
255 
257 
260  desc.setComment("Service for File-based DAQ monitoring and event accounting");
261  desc.addUntracked<bool>("tbbMonitoringMode", true)
262  ->setComment("Monitor individual module processing per TBB thread instead of stream");
263  desc.addUntracked<bool>("tbbConcurrencyTracker", true)
264  ->setComment("Monitor TBB thread activity to flag microstate as real idle or overhead/other");
265  desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
266  desc.addUntracked<unsigned int>("fastMonIntervals", 2)
267  ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
268  desc.addUntracked<bool>("filePerFwkStream", true) //obsolete
269  ->setComment("Switches on monitoring output per framework stream");
270  desc.addUntracked<bool>("verbose", false)->setComment("Set to use LogInfo messages from the monitoring thread");
271  desc.setAllowAnything();
272  descriptions.add("FastMonitoringService", desc);
273  }
274 
276  Json::Value legendaVector(Json::arrayValue);
277  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
278  legendaVector.append(
279  Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
280  //duplicate modules adding a list for acquire states (not all modules actually have it)
281  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
282  legendaVector.append(Json::Value(
283  (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
284  Json::Value valReserved(nReservedModules);
285  Json::Value valSpecial(nSpecialModules);
286  Json::Value valOutputModules(nOutputModules_);
287  Json::Value moduleLegend;
288  moduleLegend["names"] = legendaVector;
289  moduleLegend["reserved"] = valReserved;
290  moduleLegend["special"] = valSpecial;
291  moduleLegend["output"] = valOutputModules;
293  return writer.write(moduleLegend);
294  }
295 
297  Json::Value legendaVector(Json::arrayValue);
298  for (int i = 0; i < FastMonState::inCOUNT; i++)
299  legendaVector.append(Json::Value(inputStateNames[i]));
300  Json::Value moduleLegend;
301  moduleLegend["names"] = legendaVector;
303  return writer.write(moduleLegend);
304  }
305 
307  nStreams_ = bounds.maxNumberOfStreams();
308  nThreads_ = bounds.maxNumberOfThreads();
309  //this should already be >=1
310  if (nStreams_ == 0)
311  nStreams_ = 1;
312  if (nThreads_ == 0)
313  nThreads_ = 1;
315  ct_ = std::make_unique<ConcurrencyTracker>(nThreads_);
316  //start concurrency tracking
317  }
318 
320  // FIND RUN DIRECTORY
321  // The run dir should be set via the configuration of EvFDaqDirector
323  ct_->activate();
324 
325  if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
326  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
327  }
328  std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
329  workingDirectory_ = runDirectory_ = runDirectory;
330  workingDirectory_ /= "mon";
331 
332  if (!std::filesystem::is_directory(workingDirectory_)) {
333  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
334  std::filesystem::create_directories(workingDirectory_);
335  if (!std::filesystem::is_directory(workingDirectory_))
336  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
337  << ". No monitoring data will be written.";
338  }
339 
340  std::ostringstream fastFileName;
341 
342  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
344  fast /= fastFileName.str();
345  fastPath_ = fast.string();
346 
347  std::ostringstream moduleLegFile;
348  std::ostringstream moduleLegFileJson;
349  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
350  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
351  moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
352  moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
353 
354  std::ostringstream inputLegFileJson;
355  inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
356  inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
357 
358  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
359 
360  /*
361  * initialize the fast monitor with:
362  * vector of pointers to monitorable parameters
363  * path to definition
364  *
365  */
366 
367  fmt_->m_data.macrostate_ = FastMonState::sInit;
368 
369  for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
370  fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(specialMicroStateNames + i));
371  fmt_->m_data.encModule_.completeReservedWithDummies();
372 
373  for (unsigned int i = 0; i < nMonThreads_; i++) {
374  microstate_.emplace_back(getmInvalid());
375  microstateAcqFlag_.push_back(0);
376  tmicrostate_.emplace_back(getmInvalid());
377  tmicrostateAcqFlag_.push_back(0);
378 
379  //for synchronization
380  streamCounterUpdating_.push_back(new std::atomic<bool>(false));
381  }
382 
383  //initial size until we detect number of bins
384  fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
385  fmt_->m_data.microstateBins_ = 0;
386  fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
387 
388  lastGlobalLumi_ = 0;
389  isInitTransition_ = true;
390  lumiFromSource_ = 0;
391 
392  //startup monitoring
393  fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
394  fmt_->jsonMonitor_->setNStreams(nMonThreads_);
395  fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nMonThreads_, nStreams_, nThreads_);
396  monInit_.store(false, std::memory_order_release);
397  if (sleepTime_ > 0)
399  }
400 
404  context = " FromThisContext ";
406  context = " FromAnotherContext";
408  context = " FromExternalSignal";
409  edm::LogWarning("FastMonitoringService")
410  << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
411  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
412  std::lock_guard<std::mutex> lock(fmt_->monlock_);
413  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
414  has_data_exception_.store(true);
415  }
416 
420  context = " FromThisContext ";
422  context = " FromAnotherContext";
424  context = " FromExternalSignal";
425  edm::LogWarning("FastMonitoringService")
426  << " GLOBAL "
427  << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
428  std::lock_guard<std::mutex> lock(fmt_->monlock_);
430  has_data_exception_.store(true);
431  }
432 
436  context = " FromThisContext ";
438  context = " FromAnotherContext";
440  context = " FromExternalSignal";
441  edm::LogWarning("FastMonitoringService") << " SOURCE "
442  << "earlyTermination -: " << context;
443  std::lock_guard<std::mutex> lock(fmt_->monlock_);
444  exception_detected_ = true;
445  has_source_exception_.store(true);
446  has_data_exception_.store(true);
447  }
448 
450  std::lock_guard<std::mutex> lock(fmt_->monlock_);
451  if (!ls)
452  exception_detected_ = true;
453  else
454  exceptionInLS_.push_back(ls);
455  }
456 
458  return has_source_exception_.load() || has_data_exception_.load();
459  }
460 
462  if (!has_data_exception_.load())
463  return false;
464  if (has_source_exception_.load())
465  return true;
466  std::lock_guard<std::mutex> lock(fmt_->monlock_);
467  for (auto ex : exceptionInLS_) {
468  if (ls == ex)
469  return true;
470  }
471  return false;
472  }
473 
475 
476  //new output module name is stream
478  std::lock_guard<std::mutex> lock(fmt_->monlock_);
479  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
480 
481  //build a map of modules keyed by their module description address
482  //here we need to treat output modules in a special way so they can be easily singled out
483  if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
484  desc.moduleName() == "EvFOutputModule" || desc.moduleName() == "EventStreamFileWriter" ||
485  desc.moduleName() == "PoolOutputModule") {
486  fmt_->m_data.encModule_.updateReserved((void*)&desc);
487  nOutputModules_++;
488  } else
489  fmt_->m_data.encModule_.update((void*)&desc);
490  }
491 
493  std::string&& moduleLegStrJson = makeModuleLegendaJson();
494  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
495 
496  std::string inputLegendStrJson = makeInputLegendaJson();
497  FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
498 
499  fmt_->m_data.macrostate_ = FastMonState::sJobReady;
500 
501  //update number of entries in module histogram
502  std::lock_guard<std::mutex> lock(fmt_->monlock_);
503  //double the size to add post-acquire states
504  fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
505  }
506 
508  fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
509  fmt_->stop();
510  }
511 
513  fmt_->m_data.macrostate_ = FastMonState::sRunning;
514  isInitTransition_ = false;
515  }
516 
518  timeval lumiStartTime;
519  gettimeofday(&lumiStartTime, nullptr);
520  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
521  lastGlobalLumi_ = newLumi;
522 
523  std::lock_guard<std::mutex> lock(fmt_->monlock_);
524  lumiStartTime_[newLumi] = lumiStartTime;
525  //reset all states to idle
526  if (tbbMonitoringMode_)
527  for (unsigned i = 0; i < nThreads_; i++)
528  if (tmicrostate_[i] == getmInvalid())
529  tmicrostate_[i] = getmIdle();
530  }
531 
533  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
534  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
535  timeval lumiStopTime;
536  gettimeofday(&lumiStopTime, nullptr);
537 
538  std::lock_guard<std::mutex> lock(fmt_->monlock_);
539 
540  // Compute throughput
541  timeval stt = lumiStartTime_[lumi];
542  lumiStartTime_.erase(lumi);
543  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
544  unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
545  accuSize_.erase(lumi);
546  double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
547  //store to registered variable
548  fmt_->m_data.fastThroughputJ_.value() = throughput;
549 
550  //update
551  doSnapshot(lumi, true);
552 
553  //retrieve one result we need (todo: sanity check if it's found)
554  IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
555  if (!lumiProcessedJptr)
556  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
557  processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
558 
559  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
560  bool exception_detected = exception_detected_;
561  for (auto ex : exceptionInLS_)
562  if (lumi == ex)
563  exception_detected = true;
564 
565  if (edm::shutdown_flag || exception_detected) {
566  edm::LogInfo("FastMonitoringService")
567  << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
568  << " events were processed in LUMI " << lumi;
569  //this will prevent output modules from producing json file for possibly incomplete lumi
570  processedEventsPerLumi_[lumi].first = 0;
571  processedEventsPerLumi_[lumi].second = true;
572  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
573  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
574  return;
575  }
576 
577  if (inputSource_ || daqInputSource_) {
578  auto sourceReport =
580  if (sourceReport.first) {
581  if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
582  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
583  << ", events(processed):" << processedEventsPerLumi_[lumi].first
584  << " events(source):" << sourceReport.second;
585  }
586  }
587  }
588 
589  edm::LogInfo("FastMonitoringService")
590  << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
591  << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
592  delete lumiProcessedJptr;
593 
594  //full global and stream merge (will be used by output modules), output from this service is deprecated
595  fmt_->jsonMonitor_->outputFullJSON("dummy", lumi, false);
596  fmt_->jsonMonitor_->discardCollected(lumi); //we don't do further updates for this lumi
597  }
598 
600  std::lock_guard<std::mutex> lock(fmt_->monlock_);
601  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
602  //LS monitoring snapshot with input source data has been taken in previous callback
603  avgLeadTime_.erase(lumi);
605  lockStatsDuringLumi_.erase(lumi);
606 
607  //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
609  }
610 
612  std::lock_guard<std::mutex> lock(fmt_->monlock_);
613  fmt_->m_data.streamLumi_[sc.streamID().value()] = sc.eventID().luminosityBlock();
614 
615  //reset collected values for this stream
616  *(fmt_->m_data.processed_[sc.streamID().value()]) = 0;
617 
618  microstate_[sc.streamID().value()] = getmBoL();
619  }
620 
622  microstate_[sc.streamID().value()] = getmIdle();
623  }
624 
626  std::lock_guard<std::mutex> lock(fmt_->monlock_);
627  //update processed count to be complete at this time
628  //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
629  fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sc.streamID().value());
630  //reset this in case stream does not get notified of next lumi (we keep processed events only)
631  microstate_[sc.streamID().value()] = getmEoL();
632  }
633 
635  microstate_[sc.streamID().value()] = getmFwkEoL();
636  }
637 
639  microstate_[sc.streamID().value()] = getmEvent();
640  }
641 
643  (*(fmt_->m_data.processed_[sc.streamID().value()]))++;
644  //fast path counter (events accumulated in a run)
645  unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
646  fmt_->m_data.fastPathProcessedJ_ = res + 1;
647 
648  microstate_[sc.streamID().value()] = getmIdle();
649  }
650 
652  microstate_[getSID(sid)] = getmInput();
653  if (!tbbMonitoringMode_)
654  return;
656  }
657 
659  microstate_[getSID(sid)] = getmFwkOvhSrc();
660  if (!tbbMonitoringMode_)
661  return;
663  }
664 
666  edm::ModuleCallingContext const& mcc) {
667  microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
668  microstateAcqFlag_[getSID(sc)] = 1;
669  if (!tbbMonitoringMode_)
670  return;
671  tmicrostate_[getTID()] = (void*)(mcc.moduleDescription());
673  }
674 
676  edm::ModuleCallingContext const& mcc) {
678  microstateAcqFlag_[getSID(sc)] = 0;
679  if (!tbbMonitoringMode_)
680  return;
683  }
684 
686  microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
687  if (!tbbMonitoringMode_)
688  return;
689  tmicrostate_[getTID()] = (void*)(mcc.moduleDescription());
690  }
691 
694  if (!tbbMonitoringMode_)
695  return;
697  }
698 
699  //from source
700  void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
701  std::lock_guard<std::mutex> lock(fmt_->monlock_);
702 
703  if (accuSize_.find(lumi) == accuSize_.end())
704  accuSize_[lumi] = fileSize;
705  else
706  accuSize_[lumi] += fileSize;
707 
710  else
712  }
713 
715  gettimeofday(&fileLookStart_, nullptr);
716  /*
717  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
718  << fileLookStart_.tv_usec / 1000.0 << std::endl;
719  */
720  }
721 
723  gettimeofday(&fileLookStop_, nullptr);
724  /*
725  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
726  << fileLookStop_.tv_usec / 1000.0 << std::endl;
727  */
728  std::lock_guard<std::mutex> lock(fmt_->monlock_);
729 
730  if (lumi > lumiFromSource_) {
732  leadTimes_.clear();
733  }
734  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
735  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
736  // add this to lead times for this lumi
737  leadTimes_.push_back((double)elapsedTime);
738 
739  // recompute average lead time for this lumi
740  if (leadTimes_.size() == 1)
742  else {
743  double totTime = 0;
744  for (unsigned int i = 0; i < leadTimes_.size(); i++)
745  totTime += leadTimes_[i];
746  avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
747  }
748  }
749 
750  void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
751  std::lock_guard<std::mutex> lock(fmt_->monlock_);
752  lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
753  }
754 
756  tmicrostate_[tbb::this_task_arena::current_thread_index()] = &specialMicroStateNames[m];
757  }
758 
759  //for the output module
760  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
761  std::lock_guard<std::mutex> lock(fmt_->monlock_);
762 
763  auto it = processedEventsPerLumi_.find(lumi);
764  if (it != processedEventsPerLumi_.end()) {
765  unsigned int proc = it->second.first;
766  if (abortFlag)
767  *abortFlag = it->second.second;
768  return proc;
769  } else {
770  throw cms::Exception("FastMonitoringService")
771  << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
772  << lumi;
773  return 0;
774  }
775  }
776 
777  //for the output module
779  std::lock_guard<std::mutex> lock(fmt_->monlock_);
780 
781  auto it = processedEventsPerLumi_.find(lumi);
782  if (it != processedEventsPerLumi_.end()) {
783  unsigned int abortFlag = it->second.second;
784  return abortFlag;
785  } else {
786  throw cms::Exception("FastMonitoringService")
787  << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
788  << lumi;
789  return false;
790  }
791  }
792 
793  // the function to be called in the thread. Thread completes when function returns.
795  monInit_.exchange(true, std::memory_order_acquire);
796  while (!fmt_->m_stoprequest) {
797  std::vector<std::vector<unsigned int>> lastEnc;
798  {
799  std::unique_lock<std::mutex> lock(fmt_->monlock_);
800 
801  doSnapshot(lastGlobalLumi_, false);
802 
803  lastEnc.emplace_back(fmt_->m_data.tmicrostateEncoded_);
804  lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
805 
807  std::vector<std::string> CSVv;
808  for (unsigned int i = 0; i < nMonThreads_; i++) {
809  CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
810  }
811  // release mutex before writing out fast path file
812  lock.release()->unlock();
813  fmt_->jsonMonitor_->outputCSV(fastPath_, CSVv);
814  }
815  snapCounter_++;
816  }
817 
818  if (verbose_) {
819  edm::LogInfo msg("FastMonitoringService");
820  auto f = [&](std::vector<unsigned int> const& p) {
821  for (unsigned int i = 0; i < nMonThreads_; i++) {
822  if (i == 0)
823  msg << "[" << p[i] << ",";
824  else if (i <= nMonThreads_ - 1)
825  msg << p[i] << ",";
826  else
827  msg << p[i] << "]";
828  }
829  };
830 
831  msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
832  f(lastEnc[0]);
833  msg << " us=";
834  f(lastEnc[1]);
836  }
837 
838  ::sleep(sleepTime_);
839  }
840  }
841 
842  void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
843  // update macrostate
844  fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
845 
846  std::vector<const void*> microstateCopy(microstate_.begin(), microstate_.end());
847  std::vector<const void*> tmicrostateCopy(tmicrostate_.begin(), tmicrostate_.end());
848  std::vector<unsigned char> microstateAcqCopy(microstateAcqFlag_.begin(), microstateAcqFlag_.end());
849  std::vector<unsigned char> tmicrostateAcqCopy(tmicrostateAcqFlag_.begin(), tmicrostateAcqFlag_.end());
850 
851  if (!isInitTransition_) {
852  auto itd = avgLeadTime_.find(ls);
853  if (itd != avgLeadTime_.end())
854  fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
855  else
856  fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
857 
858  auto iti = filesProcessedDuringLumi_.find(ls);
859  if (iti != filesProcessedDuringLumi_.end())
860  fmt_->m_data.fastFilesProcessedJ_ = iti->second;
861  else
862  fmt_->m_data.fastFilesProcessedJ_ = 0;
863 
864  auto itrd = lockStatsDuringLumi_.find(ls);
865  if (itrd != lockStatsDuringLumi_.end()) {
866  fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
867  fmt_->m_data.fastLockCountJ_ = itrd->second.second;
868  } else {
869  fmt_->m_data.fastLockWaitJ_ = 0.;
870  fmt_->m_data.fastLockCountJ_ = 0.;
871  }
872  }
873 
874  for (unsigned int i = 0; i < nThreads_; i++) {
875  if (tmicrostateCopy[i] == getmIdle() && ct_->isThreadActive(i)) {
876  //overhead if thread is running
877  tmicrostateCopy[i] = getmFwk();
878  }
879  if (tmicrostateAcqCopy[i])
880  fmt_->m_data.tmicrostateEncoded_[i] =
881  fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
882  else
883  fmt_->m_data.tmicrostateEncoded_[i] = fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
884  }
885 
886  for (unsigned int i = 0; i < nStreams_; i++) {
887  if (microstateAcqCopy[i])
888  fmt_->m_data.microstateEncoded_[i] =
889  fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
890  else
891  fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
892  }
893 
894  bool inputStatePerThread = false;
895 
897  switch (inputSupervisorState_) {
899  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
900  break;
902  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
903  break;
905  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
906  break;
908  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
909  break;
911  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
912  break;
914  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
915  break;
917  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
918  break;
920  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
921  break;
923  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
924  break;
926  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
927  break;
929  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
930  break;
933  break;
935  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
936  break;
939  break;
941  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
942  break;
943  default:
944  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
945  }
946  } else if (inputState_ == FastMonState::inWaitChunk) {
947  switch (inputSupervisorState_) {
949  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
950  break;
952  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
953  break;
955  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
956  break;
958  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
959  break;
961  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
962  break;
964  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
965  break;
967  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
968  break;
970  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
971  break;
973  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
974  break;
976  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
977  break;
979  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
980  break;
983  break;
985  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
986  break;
989  break;
991  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
992  break;
993  default:
994  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
995  }
996  } else if (inputState_ == FastMonState::inNoRequest) {
997  inputStatePerThread = true;
998  for (unsigned int i = 0; i < nMonThreads_; i++) {
999  if (i >= nStreams_)
1000  fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1001  else if (microstateCopy[i] == getmIdle())
1002  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
1003  else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1004  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
1005  else
1006  fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
1007  }
1008  } else if (inputState_ == FastMonState::inNewLumi) {
1009  inputStatePerThread = true;
1010  for (unsigned int i = 0; i < nMonThreads_; i++) {
1011  if (i >= nStreams_)
1012  fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1013  else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1014  fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1015  }
1017  //apply directly throttled state from supervisor
1018  fmt_->m_data.inputState_[0] = inputSupervisorState_;
1019  } else
1020  fmt_->m_data.inputState_[0] = inputState_;
1021 
1022  //this is same for all streams
1023  if (!inputStatePerThread)
1024  for (unsigned int i = 1; i < nMonThreads_; i++)
1025  fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1026 
1027  if (isGlobalEOL) { //only update global variables
1028  fmt_->jsonMonitor_->snapGlobal(ls);
1029  } else
1030  fmt_->jsonMonitor_->snap(ls);
1031  }
1032 
1033  //compatibility
1035 
1037 
1038 } //end namespace evf
std::atomic< bool > has_data_exception_
constexpr edm::ModuleDescription const * getmFwkEoL()
std::atomic< FastMonState::InputState > inputState_
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
Definition: fillJson.h:27
static const std::string inputStateNames[FastMonState::inCOUNT]
constexpr edm::ModuleDescription const * getmInvalid()
constexpr edm::ModuleDescription const * getmFwkOvhSrc()
void watchPreEvent(PreEvent::slot_type const &iSlot)
ModuleDescription const * moduleDescription() const
constexpr edm::ModuleDescription const * getmFwkOvhMod()
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
LuminosityBlockNumber_t luminosityBlock() const
std::vector< ContainableAtomic< const void * > > microstate_
constexpr edm::ModuleDescription const * getmIdle()
std::atomic< bool > isInitTransition_
void watchPreallocate(Preallocate::slot_type const &iSlot)
constexpr edm::ModuleDescription const * getmBoL()
void setExceptionDetected(unsigned int ls)
void on_scheduler_exit(bool) override
void watchPreModuleEventAcquire(PreModuleEventAcquire::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
constexpr edm::ModuleDescription const * getmIgnore()
void preGlobalBeginLumi(edm::GlobalContext const &)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
std::filesystem::path workingDirectory_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
Value & append(const Value &value)
Append value to array at the end.
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: DAQSource.cc:1388
void preGlobalEndLumi(edm::GlobalContext const &)
Represents a JSON value.
Definition: value.h:99
std::vector< ContainableAtomic< unsigned int > > threadactive_
bool isExceptionOnData(unsigned int ls)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
std::unique_ptr< FastMonitoringThread > fmt_
constexpr edm::ModuleDescription const * getmFwk()
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
Definition: Electron.h:6
constexpr int nSpecialModules
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:39
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
ConcurrencyTracker(unsigned num_expected)
void preModuleBeginJob(edm::ModuleDescription const &)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
constexpr edm::ModuleDescription const * getmEoL()
void on_scheduler_entry(bool) override
constexpr edm::ModuleDescription const * getmGlobEoL()
void preStreamEndLumi(edm::StreamContext const &)
std::vector< ContainableAtomic< unsigned char > > tmicrostateAcqFlag_
std::map< unsigned int, double > avgLeadTime_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
std::atomic< FastMonState::InputState > inputSupervisorState_
StreamID const & streamID() const
Definition: StreamContext.h:55
constexpr double throughputFactor()
void watchPostModuleEventAcquire(PostModuleEventAcquire::slot_type const &iSlot)
std::filesystem::path runDirectory_
double f[11][100]
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:62
void postStreamBeginLumi(edm::StreamContext const &)
static unsigned int getSID(edm::StreamContext const &sc)
std::unique_ptr< ConcurrencyTracker > ct_
Log< level::Info, false > LogInfo
def ls(path, rec=False)
Definition: eostools.py:349
void postStreamEndLumi(edm::StreamContext const &)
constexpr edm::ModuleDescription const * getmInput()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
void setTMicrostate(FastMonState::Microstate m)
constexpr int nReservedModules
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:286
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void stoppedLookingForFile(unsigned int lumi)
std::atomic< bool > has_source_exception_
EventID const & eventID() const
Definition: StreamContext.h:60
constexpr edm::ModuleDescription const * getmEvent()
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
unsigned int value() const
Definition: StreamID.h:43
bool getAbortFlagForLumi(unsigned int lumi)
bool isThreadActive(unsigned index)
Log< level::Warning, false > LogWarning
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT]
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:63
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
constexpr edm::ModuleDescription const * getmIdleSource()
#define LogDebug(id)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
constexpr edm::ModuleDescription const * getmDqm()
array value (ordered list)
Definition: value.h:30
std::vector< ContainableAtomic< const void * > > tmicrostate_