CMS 3D CMS Logo

FastMonitoringService.cc
Go to the documentation of this file.
3 
16 
20 
23 
24 #include <iostream>
25 #include <iomanip>
26 #include <sys/time.h>
27 
28 using namespace jsoncollector;
29 
// Scale factor turning "size units per microsecond" into "(size units / 2^20) per second":
// 10^6 microseconds per second divided by 2^20. The end-of-lumi throughput multiplies it by the
// accumulated input size divided by the lumisection duration in microseconds; assuming the sizes
// reported by the source are bytes, the result is MiB/s.
constexpr double throughputFactor() {
  constexpr double kMicrosecondsPerSecond = 1000000.0;
  constexpr double kUnitsPerMebi = 1024.0 * 1024.0;
  return kMicrosecondsPerSecond / kUnitsPerMebi;
}
31 
32 namespace evf {
33 
// Placeholder module descriptions standing for the reserved (non-module) microstates.
// Per-stream microstate pointers are set to elements of this table, indexed by FastMonState::m*
// values (mIdle, mEoL, mInput, ...), and every element is registered as a reserved entry of
// encModule_ during initialization. The entry order therefore presumably has to mirror the
// FastMonState microstate enumeration (declared elsewhere) -- keep the two in sync.
34  const edm::ModuleDescription FastMonitoringService::reservedMicroStateNames[FastMonState::mCOUNT] = {
35  edm::ModuleDescription("Dummy", "Invalid"),
36  edm::ModuleDescription("Dummy", "Idle"),
37  edm::ModuleDescription("Dummy", "FwkOvhSrc"),
38  edm::ModuleDescription("Dummy", "FwkOvhMod"), //set post produce, analyze or filter
39  edm::ModuleDescription("Dummy", "FwkEoL"),
40  edm::ModuleDescription("Dummy", "Input"),
41  edm::ModuleDescription("Dummy", "DQM"),
42  edm::ModuleDescription("Dummy", "BoL"),
43  edm::ModuleDescription("Dummy", "EoL"),
44  edm::ModuleDescription("Dummy", "GlobalEoL")};
45 
// Display names of the macrostates, one per FastMonState macrostate value (the array is sized by
// FastMonState::MCOUNT). Not read anywhere in the visible part of this file; the entry order
// presumably mirrors the FastMonState macrostate enumeration (declared elsewhere) -- TODO confirm.
46  const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
47  "JobReady",
48  "RunGiven",
49  "Running",
50  "Stopping",
51  "Done",
52  "JobEnded",
53  "Error",
54  "ErrorEnded",
55  "End",
56  "Invalid"};
57 
// Display names of the input-source states (sized by FastMonState::inCOUNT).
// The input legend is built by appending all inCOUNT entries in index order, and the snapshot code
// publishes input states as numeric FastMonState values. This order must therefore match the
// FastMonState input-state enumeration (declared elsewhere) -- keep both in sync when adding a state.
58  const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
59  "Ignore",
60  "Init",
61  "WaitInput",
62  "NewLumi",
63  "NewLumiBusyEndingLS",
64  "NewLumiIdleEndingLS",
65  "RunEnd",
66  "ProcessingFile",
67  "WaitChunk",
68  "ChunkReceived",
69  "ChecksumEvent",
70  "CachedEvent",
71  "ReadEvent",
72  "ReadCleanup",
73  "NoRequest",
74  "NoRequestWithIdleThreads",
75  "NoRequestWithGlobalEoL",
76  "NoRequestWithEoLThreads",
77  "SupFileLimit",
78  "SupWaitFreeChunk",
79  "SupWaitFreeChunkCopying",
80  "SupWaitFreeThread",
81  "SupWaitFreeThreadCopying",
82  "SupBusy",
83  "SupLockPolling",
84  "SupLockPollingCopying",
85  "SupNoFile",
86  "SupNewFile",
87  "SupNewFileWaitThreadCopying",
88  "SupNewFileWaitThread",
89  "SupNewFileWaitChunkCopying",
90  "SupNewFileWaitChunk",
91  "WaitInput_fileLimit",
92  "WaitInput_waitFreeChunk",
93  "WaitInput_waitFreeChunkCopying",
94  "WaitInput_waitFreeThread",
95  "WaitInput_waitFreeThreadCopying",
96  "WaitInput_busy",
97  "WaitInput_lockPolling",
98  "WaitInput_lockPollingCopying",
99  "WaitInput_runEnd",
100  "WaitInput_noFile",
101  "WaitInput_newFile",
102  "WaitInput_newFileWaitThreadCopying",
103  "WaitInput_newFileWaitThread",
104  "WaitInput_newFileWaitChunkCopying",
105  "WaitInput_newFileWaitChunk",
106  "WaitChunk_fileLimit",
107  "WaitChunk_waitFreeChunk",
108  "WaitChunk_waitFreeChunkCopying",
109  "WaitChunk_waitFreeThread",
110  "WaitChunk_waitFreeThreadCopying",
111  "WaitChunk_busy",
112  "WaitChunk_lockPolling",
113  "WaitChunk_lockPollingCopying",
114  "WaitChunk_runEnd",
115  "WaitChunk_noFile",
116  "WaitChunk_newFile",
117  "WaitChunk_newFileWaitThreadCopying",
118  "WaitChunk_newFileWaitThread",
119  "WaitChunk_newFileWaitChunkCopying",
120  "WaitChunk_newFileWaitChunk",
121  "inSupThrottled",
122  "inThrottled"};
123 
// Placeholder "path" used as a stream's ministate whenever it is not inside a path. It is reset at
// stream begin/end of lumi and after every event, and is registered in each stream's path encoder
// before the real paths.
124  const std::string FastMonitoringService::nopath_ = "NoPath";
125 
// FastMonitoringService constructor. Its signature line is missing from this listing. From the
// initializer list it receives a ParameterSet `iPS` and the registry `reg` used to subscribe to
// framework transitions.
// Untracked parameters, with the same defaults as the parameter description:
//   sleepTime=1, fastMonIntervals=2, filePerFwkStream=false.
// A non-positive sleepTime disables monitoring output (checked as `sleepTime_ > 0` elsewhere).
127  : MicroStateService(iPS, reg),
128  fmt_(new FastMonitoringThread()),
// nStreams_ is overwritten once the framework reports its bounds (preallocate)
129  nStreams_(0) //until initialized
130  ,
131  sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
132  fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
133  fastName_("fastmoni"),
134  slowName_("slowmoni"),
135  filePerFwkStream_(iPS.getUntrackedParameter<bool>("filePerFwkStream", false)),
136  totalEventsProcessed_(0) {
// Subscribe to framework transitions. NOTE: most watch* registrations (the blank numbered lines
// below) were dropped from this listing; only three survive.
137  reg.watchPreallocate(this, &FastMonitoringService::preallocate); //receiving information on number of threads
139
144
148
153
155
156  reg.watchPreEvent(this, &FastMonitoringService::preEvent); //stream
158
159  reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent); //source (with streamID of requestor)
161
164
167
171
172  //find microstate definition path (required by the module)
173  struct stat statbuf;
174  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
175  std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
176  if (stat(microstatePath.c_str(), &statbuf)) {
177  microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
178  if (stat(microstatePath.c_str(), &statbuf)) {
179  microstatePath = microstateBaseSuffix;
180  if (stat(microstatePath.c_str(), &statbuf))
181  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
182  }
183  }
184  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
185  }
186 
188 
// Parameter-set description of this service. The function signature and the declaration of `desc`
// are missing from this listing. It declares the three untracked parameters read in the
// constructor, with the same defaults. setAllowAnything() presumably lets undeclared extra
// parameters pass validation.
191  desc.setComment("Service for File-based DAQ monitoring and event accounting");
192  desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
193  desc.addUntracked<unsigned int>("fastMonIntervals", 2)
194  ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
195  desc.addUntracked<bool>("filePerFwkStream", false)
196  ->setComment("Switches on monitoring output per framework stream");
197  desc.setAllowAnything();
198  descriptions.add("FastMonitoringService", desc);
199  }
200 
// Serializes the path legend and returns the JSON text.
// - "names": the path names decoded from stream 0's path encoder (indices 0 .. current_-1);
// - "reserved": nReservedPaths.
// Only stream 0's encoder is read; every stream's encoder is filled with the same paths at
// initialization. (The function signature and the `writer` declaration are missing from this listing.)
202  Json::Value legendaVector(Json::arrayValue);
203  for (int i = 0; i < fmt_->m_data.encPath_[0].current_; i++)
204  legendaVector.append(Json::Value(*(static_cast<const std::string*>(fmt_->m_data.encPath_[0].decode(i)))));
205  Json::Value valReserved(nReservedPaths);
206  Json::Value pathLegend;
207  pathLegend["names"] = legendaVector;
208  pathLegend["reserved"] = valReserved;
210  return writer.write(pathLegend);
211  }
212 
// Serializes the microstate (module) legend and returns the JSON text.
// - "names": every label known to encModule_ (indices 0 .. current_-1), followed by the same labels
//   with an "__ACQ" suffix for the acquire states;
// - "reserved" / "special" / "output": the reserved, special and output-module counts.
// (The function signature and the `writer` declaration are missing from this listing.)
214  Json::Value legendaVector(Json::arrayValue);
215  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
216  legendaVector.append(
217  Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
218  //duplicate modules adding a list for acquire states (not all modules actually have it)
219  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
220  legendaVector.append(Json::Value(
221  (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
222  Json::Value valReserved(nReservedModules);
223  Json::Value valSpecial(nSpecialModules);
224  Json::Value valOutputModules(nOutputModules_);
225  Json::Value moduleLegend;
226  moduleLegend["names"] = legendaVector;
227  moduleLegend["reserved"] = valReserved;
228  moduleLegend["special"] = valSpecial;
229  moduleLegend["output"] = valOutputModules;
231  return writer.write(moduleLegend);
232  }
233 
// Serializes the input-state legend and returns the JSON text. "names" holds all
// FastMonState::inCOUNT entries of inputStateNames in index order.
// (The function signature and the `writer` declaration are missing from this listing.)
235  Json::Value legendaVector(Json::arrayValue);
236  for (int i = 0; i < FastMonState::inCOUNT; i++)
237  legendaVector.append(Json::Value(inputStateNames[i]));
238  Json::Value moduleLegend;
239  moduleLegend["names"] = legendaVector;
241  return writer.write(moduleLegend);
242  }
243 
// Stores the maximum number of streams and threads from the framework's system bounds, forcing
// each to at least 1. (The function signature is missing from this listing; `bounds` is the
// SystemBounds argument.)
245  nStreams_ = bounds.maxNumberOfStreams();
246  nThreads_ = bounds.maxNumberOfThreads();
247  //this should already be >=1
248  if (nStreams_ == 0)
249  nStreams_ = 1;
250  if (nThreads_ == 0)
251  nThreads_ = 1;
252  }
253 
// Job-initialization callback. Its signature is missing from this listing; the visible tail of the
// parameter list is `edm::ProcessContext const& pc`, and the body also uses `pathsInfo`.
// Steps:
// - requires the EvFDaqDirector service and uses <baseRunDir>/mon as the working directory,
//   creating it if needed;
// - builds the fast-monitoring file path(s) and the legend file names, all tagged with the zero-padded pid;
// - registers the reserved microstates, then per stream the initial ministate/microstate, the
//   acquire flag and a path encoder filled with all paths and end paths;
// - resets the monitor with the microstate definition path and registers the monitored variables.
255  edm::ProcessContext const& pc) {
256  // FIND RUN DIRECTORY
257  // The run dir should be set via the configuration of EvFDaqDirector
258
259  if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
260  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
261  }
262  std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
263  workingDirectory_ = runDirectory_ = runDirectory;
264  workingDirectory_ /= "mon";
265
266  if (!std::filesystem::is_directory(workingDirectory_)) {
267  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
// NOTE(review): this is the throwing overload of std::filesystem::create_directories. A failure to
// create the directory therefore surfaces as std::filesystem::filesystem_error instead of reaching
// the "Unable to create" warning below. Use the std::error_code overload if the warning path is intended.
268  std::filesystem::create_directories(workingDirectory_);
269  if (!std::filesystem::is_directory(workingDirectory_))
270  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
271  << ". No monitoring data will be written.";
272  }
273
274  std::ostringstream fastFileName;
275
276  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
// `fast` (and `fastTid` below) are declared on lines missing from this listing, presumably as paths
// under workingDirectory_.
278  fast /= fastFileName.str();
279  fastPath_ = fast.string();
// With filePerFwkStream_, one extra fast file per framework stream ("..._tid<i>.fast")
280  if (filePerFwkStream_)
281  for (unsigned int i = 0; i < nStreams_; i++) {
282  std::ostringstream fastFileNameTid;
283  fastFileNameTid << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << "_tid" << i
284  << ".fast";
286  fastTid /= fastFileNameTid.str();
287  fastPathList_.push_back(fastTid.string());
288  }
289
// Legend file names (microstate, path and input legends) in the working directory
290  std::ostringstream moduleLegFile;
291  std::ostringstream moduleLegFileJson;
292  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
293  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
294  moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
295  moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
296
297  std::ostringstream pathLegFile;
298  std::ostringstream pathLegFileJson;
299  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
300  pathLegendFile_ = (workingDirectory_ / pathLegFile.str()).string();
301  pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
302  pathLegendFileJson_ = (workingDirectory_ / pathLegFileJson.str()).string();
303
304  std::ostringstream inputLegFileJson;
305  inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
306  inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
307
308  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
309  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
310
311  /*
312  * initialize the fast monitor with:
313  * vector of pointers to monitorable parameters
314  * path to definition
315  *
316  */
317
318  fmt_->m_data.macrostate_ = FastMonState::sInit;
319
// Reserved microstates occupy the first entries of the module encoder
320  for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
321  fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(reservedMicroStateNames + i));
322  fmt_->m_data.encModule_.completeReservedWithDummies();
323
// Per-stream state: no path, Invalid microstate, not in acquire, plus a private path encoder
324  for (unsigned int i = 0; i < nStreams_; i++) {
325  fmt_->m_data.ministate_.emplace_back(&nopath_);
326  fmt_->m_data.microstate_.emplace_back(&reservedMicroStateNames[FastMonState::mInvalid]);
327  fmt_->m_data.microstateAcqFlag_.push_back(0);
328
329  //for synchronization
// NOTE(review): raw `new`, and no matching delete is visible in this listing. Confirm the
// destructor frees these, or store them via std::unique_ptr.
330  streamCounterUpdating_.push_back(new std::atomic<bool>(false));
331
332  //path (mini) state
333  fmt_->m_data.encPath_.emplace_back(0);
334  fmt_->m_data.encPath_[i].update(static_cast<const void*>(&nopath_));
335
336  for (auto& path : pathsInfo.paths()) {
337  fmt_->m_data.encPath_[i].updatePreinit(path);
338  }
339  for (auto& endPath : pathsInfo.endPaths()) {
340  fmt_->m_data.encPath_[i].updatePreinit(endPath);
341  }
342  }
343  //for (unsigned int i=0;i<nThreads_;i++)
344  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
345
346  //initial size until we detect number of bins
347  fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
348  fmt_->m_data.microstateBins_ = 0;
349  fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
350  fmt_->m_data.ministateBins_ = fmt_->m_data.encPath_[0].vecsize();
351
352  lastGlobalLumi_ = 0;
353  isInitTransition_ = true;
354  lumiFromSource_ = 0;
355
356  //startup monitoring
357  fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
358  fmt_->jsonMonitor_->setNStreams(nStreams_);
359  fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nStreams_, threadIDAvailable_ ? nThreads_ : 0);
360  monInit_.store(false, std::memory_order_release);
// NOTE: the statement guarded by this `if` is missing from this listing; presumably it starts the
// snapshot thread, so monitoring is only active for a positive sleepTime.
361  if (sleepTime_ > 0)
363
364  //this definition needs: #include "tbb/compat/thread"
365  //however this would result in the TBB implementation replacing std::thread
366  //(both supposedly call pthread_self())
367  //number of threads created in process could be obtained from /proc,
368  //assuming that all posix threads are true kernel threads capable of running in parallel
369
370  //#if TBB_IMPLEMENT_CPP0X
372  //threadIDAvailable_=true;
373  //#endif
374  }
375 
// Stream early-termination handler. The signature, the `context` declaration and the if/else-if
// conditions selecting the context text are missing from this listing.
// It logs a warning naming the stream, event and LS. Then, under monlock_, it records the event's
// LS in exceptionInLS_ and sets has_data_exception_.
// NOTE(review): " FromThisContext " carries a trailing space that the other two variants lack.
379  context = " FromThisContext ";
381  context = " FromAnotherContext";
383  context = " FromExternalSignal";
384  edm::LogWarning("FastMonitoringService")
385  << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
386  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
387  std::lock_guard<std::mutex> lock(fmt_->monlock_);
388  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
389  has_data_exception_.store(true);
390  }
391 
// Global (lumi-level) early-termination handler. The signature, the `context` declaration and the
// context-selecting conditions are missing from this listing.
// It logs a warning with the LS and, under monlock_, sets has_data_exception_. One statement
// between the lock and the flag is also missing; presumably it records the LS in exceptionInLS_,
// as the stream variant does -- confirm against the full source.
395  context = " FromThisContext ";
397  context = " FromAnotherContext";
399  context = " FromExternalSignal";
400  edm::LogWarning("FastMonitoringService")
401  << " GLOBAL "
402  << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
403  std::lock_guard<std::mutex> lock(fmt_->monlock_);
405  has_data_exception_.store(true);
406  }
407 
// Source early-termination handler (signature and context-selecting conditions missing from this
// listing). It logs a warning, then under monlock_ flags the whole job as failed:
// exception_detected_, has_source_exception_ and has_data_exception_ are all set, so every LS is
// treated as affected.
411  context = " FromThisContext ";
413  context = " FromAnotherContext";
415  context = " FromExternalSignal";
416  edm::LogWarning("FastMonitoringService") << " SOURCE "
417  << "earlyTermination -: " << context;
418  std::lock_guard<std::mutex> lock(fmt_->monlock_);
419  exception_detected_ = true;
420  has_source_exception_.store(true);
421  has_data_exception_.store(true);
422  }
423 
// Records an exception reported from outside the service (signature missing from this listing;
// the parameter is `ls`). Under monlock_, ls == 0 flags the whole job (exception_detected_);
// otherwise the LS is appended to exceptionInLS_.
// NOTE(review): unlike the early-termination handlers, this does not set has_data_exception_. The
// per-LS data-exception query returns false early when that flag is clear, so it will not report
// LSs recorded only here; the end-of-lumi handler does scan exceptionInLS_ directly. Verify intended.
425  std::lock_guard<std::mutex> lock(fmt_->monlock_);
426  if (!ls)
427  exception_detected_ = true;
428  else
429  exceptionInLS_.push_back(ls);
430  }
431 
// True if any source or data exception has been flagged. Both flags are read with .load(), so
// monlock_ is not taken. (Signature missing from this listing.)
433  return has_source_exception_.load() || has_data_exception_.load();
434  }
435 
// Reports whether lumisection `ls` is affected by a data exception (signature missing from this listing):
// - false if no data exception was flagged at all;
// - true if the source failed, since that affects every LS;
// - otherwise whether `ls` is listed in exceptionInLS_, searched under monlock_.
437  if (!has_data_exception_.load())
438  return false;
439  if (has_source_exception_.load())
440  return true;
441  std::lock_guard<std::mutex> lock(fmt_->monlock_);
442  for (auto ex : exceptionInLS_) {
443  if (ls == ex)
444  return true;
445  }
446  return false;
447  }
448 
// Module registration at begin-job. The signature is missing from this listing; `desc` is the
// module's ModuleDescription.
// Under monlock_, output-type modules (matched by module type name) become reserved entries of
// encModule_ and are counted in nOutputModules_. All other modules get a regular entry. The key is
// the address of `desc`.
// NOTE(review): `(void*)&desc` casts away const. static_cast<const void*>(&desc) expresses the same
// address without dropping const, matching how the reserved microstates are registered.
450
451  //new output module name is stream
453  std::lock_guard<std::mutex> lock(fmt_->monlock_);
454  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
455
456  //build a map of modules keyed by their module description address
457  //here we need to treat output modules in a special way so they can be easily singled out
458  if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
459  desc.moduleName() == "EvFOutputModule" || desc.moduleName() == "EventStreamFileWriter" ||
460  desc.moduleName() == "PoolOutputModule") {
461  fmt_->m_data.encModule_.updateReserved((void*)&desc);
462  nOutputModules_++;
463  } else
464  fmt_->m_data.encModule_.update((void*)&desc);
465  }
466 
// After begin-job (signature missing from this listing):
// - writes the module, input and path legend JSON files;
// - switches the macrostate to sJobReady;
// - under monlock_, sizes the microstate histogram to twice the number of encoded modules
//   (the second half is for the acquire states).
// NOTE(review): `std::string&&` bound to the returned temporary works through lifetime extension,
// but a plain `std::string` (as used for the other two legends) is clearer.
468  std::string&& moduleLegStrJson = makeModuleLegendaJson();
469  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
470
471  std::string inputLegendStrJson = makeInputLegendaJson();
472  FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
473
474  std::string pathLegendStrJson = makePathLegendaJson();
475  FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
476
477  fmt_->m_data.macrostate_ = FastMonState::sJobReady;
478
479  //update number of entries in module histogram
480  std::lock_guard<std::mutex> lock(fmt_->monlock_);
481  //double the size to add post-acquire states
482  fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
483  }
484 
// Job end (signature missing from this listing): marks the macrostate sJobEnded and calls
// fmt_->stop(). That presumably terminates the snapshot thread, which loops while
// !fmt_->m_stoprequest.
486  fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
487  fmt_->stop();
488  }
489 
// Run-start callback (its signature is missing from this listing, so the exact transition cannot be
// confirmed here). It sets the macrostate to sRunning and clears isInitTransition_, which makes the
// snapshot code start publishing the per-lumi lead-time, file-count and lock statistics.
491  fmt_->m_data.macrostate_ = FastMonState::sRunning;
492  isInitTransition_ = false;
493  }
494 
// Global begin-lumi (signature missing from this listing; `gc` is the global context).
// It remembers the new LS as lastGlobalLumi_, the LS the periodic snapshots are taken for. It also
// stores the LS's wall-clock start time in lumiStartTime_ under monlock_.
// NOTE(review): lastGlobalLumi_ is written before the lock is taken, while the snapshot thread reads
// it under monlock_. This is race-free only if the member is atomic; its declaration is not visible here.
496  timeval lumiStartTime;
497  gettimeofday(&lumiStartTime, nullptr);
498  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
499  lastGlobalLumi_ = newLumi;
500
501  std::lock_guard<std::mutex> lock(fmt_->monlock_);
502  lumiStartTime_[newLumi] = lumiStartTime;
503  }
504 
// Global end-of-lumi handler (signature missing from this listing; `gc` is the global context).
// Under monlock_ it:
// - computes the LS throughput from the accumulated input size and the wall-clock time since the LS started;
// - takes a global snapshot;
// - stores the merged "Processed" count in processedEventsPerLumi_.
// Unless the run is shutting down or an exception was recorded for this LS, it then cross-checks
// that count against the source's report (throwing on mismatch) and writes the slow-monitoring JSON
// for the LS.
506  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
507  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
508  timeval lumiStopTime;
509  gettimeofday(&lumiStopTime, nullptr);
510
511  std::lock_guard<std::mutex> lock(fmt_->monlock_);
512
513  // Compute throughput
// NOTE(review): operator[] default-inserts a zeroed timeval when no start time was recorded for this
// LS, which yields a duration measured from the epoch. A zero duration makes the division below
// produce inf/NaN.
514  timeval stt = lumiStartTime_[lumi];
515  lumiStartTime_.erase(lumi);
516  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
517  unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
518  accuSize_.erase(lumi);
519  double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
520  //store to registered variable
521  fmt_->m_data.fastThroughputJ_.value() = throughput;
522
523  //update
524  doSnapshot(lumi, true);
525
526  //retrieve one result we need (todo: sanity check if it's found)
// NOTE(review): lumiProcessedJptr is treated as owned (it is deleted further down). Both the early
// `return` on interrupted runs and the MISMATCH `throw` skip that delete and leak it. Holding it in a
// std::unique_ptr would cover every exit path.
527  IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
528  if (!lumiProcessedJptr)
529  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
530  processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
531
532  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
533  bool exception_detected = exception_detected_;
534  for (auto ex : exceptionInLS_)
535  if (lumi == ex)
536  exception_detected = true;
537
// Interrupted LS: output modules will see 0 events with the abort flag set. discardCollected() at
// the end of this function is not reached on this path.
538  if (edm::shutdown_flag || exception_detected) {
539  edm::LogInfo("FastMonitoringService")
540  << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
541  << " events were processed in LUMI " << lumi;
542  //this will prevent output modules from producing json file for possibly incomplete lumi
543  processedEventsPerLumi_[lumi].first = 0;
544  processedEventsPerLumi_[lumi].second = true;
545  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
546  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
547  return;
548  }
549
// Cross-check against the input source's own event count (the initializer of sourceReport is on a
// line missing from this listing)
550  if (inputSource_ || daqInputSource_) {
551  auto sourceReport =
553  if (sourceReport.first) {
554  if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
555  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
556  << ", events(processed):" << processedEventsPerLumi_[lumi].first
557  << " events(source):" << sourceReport.second;
558  }
559  }
560  }
561
562  edm::LogInfo("FastMonitoringService")
563  << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
564  << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
565  delete lumiProcessedJptr;
566
567  //full global and stream merge&output for this lumi
568
569  // create file name for slow monitoring file
// Files are only actually written when monitoring is enabled (sleepTime_ > 0)
570  bool output = sleepTime_ > 0;
571  if (filePerFwkStream_) {
572  std::stringstream slowFileNameStem;
573  slowFileNameStem << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
574  << std::setw(5) << getpid();
576  slow /= slowFileNameStem.str();
577  fmt_->jsonMonitor_->outputFullJSONs(slow.string(), ".jsn", lumi, output);
578  } else {
579  std::stringstream slowFileName;
580  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
581  << std::setw(5) << getpid() << ".jsn";
583  slow /= slowFileName.str();
584  //full global and stream merge and JSON write for this lumi
585  fmt_->jsonMonitor_->outputFullJSON(slow.string(), lumi, output);
586  }
587  fmt_->jsonMonitor_->discardCollected(lumi); //we don't do further updates for this lumi
588  }
589 
// Global end-of-lumi cleanup (signature missing from this listing). Under monlock_ it drops the
// per-lumi lead-time and lock statistics of the finished LS. Two further statements are missing
// here; presumably they erase the remaining per-lumi maps (one of them the entry the output module
// already consumed) -- confirm against the full source.
591  std::lock_guard<std::mutex> lock(fmt_->monlock_);
592  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
593  //LS monitoring snapshot with input source data has been taken in previous callback
594  avgLeadTime_.erase(lumi);
596  lockStatsDuringLumi_.erase(lumi);
597
598  //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
600  }
601 
// Stream-level lumisection callbacks (signatures missing from this listing; each receives the stream
// context `sc`).
// 1) Stream begin-lumi: under monlock_, record the stream's current LS, reset its processed-event
//    counter, and mark it "BoL" with no path.
603  unsigned int sid = sc.streamID().value();
604
605  std::lock_guard<std::mutex> lock(fmt_->monlock_);
606  fmt_->m_data.streamLumi_[sid] = sc.eventID().luminosityBlock();
607
608  //reset collected values for this stream
609  *(fmt_->m_data.processed_[sid]) = 0;
610
611  fmt_->m_data.ministate_[sid] = &nopath_;
612  fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mBoL];
613  }
614
// 2) Mark the stream Idle.
616  fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mIdle];
617  }
618
// 3) Stream end-lumi: under monlock_, snapshot the stream's counters for the LS, then mark it "EoL"
//    with no path.
620  unsigned int sid = sc.streamID().value();
621  std::lock_guard<std::mutex> lock(fmt_->monlock_);
622
623  //update processed count to be complete at this time
624  //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
625  fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sid);
626  //reset this in case stream does not get notified of next lumi (we keep processed events only)
627  fmt_->m_data.ministate_[sid] = &nopath_;
628  fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mEoL];
629  }
// 4) Mark the stream as in framework end-of-lumi processing ("FwkEoL").
631  fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkEoL];
632  }
633 
// Path and event callbacks (signatures missing from this listing).
// Path start: the stream's ministate points at the name of the path being run.
// NOTE(review): this stores the address of pc.pathName(). It assumes that string outlives its use
// as ministate (presumably owned by the framework's path objects) -- confirm.
635  fmt_->m_data.ministate_[sc.streamID()] = &(pc.pathName());
636  }
637
639
// Event end: the stream returns to Idle with no path and its processed-event counter is incremented.
// The run-wide event count (fetch_add with relaxed ordering) is then published to fastPathProcessedJ_.
641  fmt_->m_data.microstate_[sc.streamID()] = &reservedMicroStateNames[FastMonState::mIdle];
642
643  fmt_->m_data.ministate_[sc.streamID()] = &nopath_;
644
645  (*(fmt_->m_data.processed_[sc.streamID()]))++;
646
647  //fast path counter (events accumulated in a run)
648  unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
649  fmt_->m_data.fastPathProcessedJ_ = res + 1;
650  }
651 
// Source-event callbacks (signatures missing from this listing; `sid` is the requesting stream).
// Before the source provides an event: the stream's microstate is "Input".
653  fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mInput];
654  }
655
// After the source: microstate "FwkOvhSrc" (presumably framework overhead following the source).
657  fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mFwkOvhSrc];
658  }
659 
// Module-level event callbacks (signatures mostly missing from this listing; each receives the stream
// context `sc`, and most the module calling context `mcc`). A module microstate is the address of
// the module's ModuleDescription.
// NOTE(review): the `(void*)` casts drop const from mcc.moduleDescription(); static_cast<const void*>
// would keep it.
// Sets the stream's microstate to the module being run.
661  edm::ModuleCallingContext const& mcc) {
662  fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
663  }
664
// Marks the stream as inside a module's acquire step (acquire flag = 1). The microstate assignment
// is commented out, so the microstate itself is left unchanged.
666  edm::ModuleCallingContext const& mcc) {
667  //fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
668  fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 1;
669  }
670
// Sets the stream's microstate to the module and clears the acquire flag.
672  fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
673  fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 0;
674  }
675
// Sets the stream's microstate to the "FwkOvhMod" placeholder (set after produce/analyze/filter).
677  fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkOvhMod];
678  }
679 
// Microstate setters meant to be called by other services (signatures missing from this listing).
// Both take a reserved-microstate value `m`, used to index reservedMicroStateNames; the second
// also takes a stream `sid`. Neither takes monlock_.
680  //FUNCTIONS CALLED FROM OUTSIDE
681
682  //this is for old-fashioned service that is not thread safe and can block other streams
683  //(we assume the worst case - everything is blocked)
685  for (unsigned int i = 0; i < nStreams_; i++)
686  fmt_->m_data.microstate_[i] = &reservedMicroStateNames[m];
687  }
688
689  //this is for services that are multithreading-enabled or rarely block other streams
691  fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[m];
692  }
693 
// Called by the input source. Under monlock_, adds fileSize to the total input size of lumisection
// `lumi` (accuSize_, consumed by the end-of-lumi throughput computation).
// The trailing if/else has its condition and first branch missing from this listing. Presumably it
// updates the per-lumi file count (filesProcessedDuringLumi_, read by doSnapshot) -- confirm.
// NOTE(review): accuSize_ is a std::map, whose operator[] value-initializes a missing entry to 0, so
// the find/else pair is equivalent to `accuSize_[lumi] += fileSize;`.
694  //from source
695  void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
696  std::lock_guard<std::mutex> lock(fmt_->monlock_);
697
698  if (accuSize_.find(lumi) == accuSize_.end())
699  accuSize_[lumi] = fileSize;
700  else
701  accuSize_[lumi] += fileSize;
702
705  else
707  }
708 
// Marks the start of a "looking for an input file" interval by storing the current wall-clock time
// in fileLookStart_ (signature missing from this listing; no lock is taken).
// NOTE(review): gettimeofday is not monotonic. A clock adjustment between start and stop corrupts the
// measured lead time; std::chrono::steady_clock would avoid that.
710  gettimeofday(&fileLookStart_, nullptr);
711  /*
712  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
713  << fileLookStart_.tv_usec / 1000.0 << std::endl;
714  */
715  }
716 
// Ends the interval started in fileLookStart_ and folds its duration into the average lead time of
// lumisection `lumi` (avgLeadTime_, in milliseconds), under monlock_. When a newer LS is seen the
// collected lead times are restarted; one statement of that branch, and the single-sample
// assignment, are on lines missing from this listing. (Signature missing.)
// NOTE(review): if the wall clock steps backwards, the signed difference becomes negative and wraps
// when stored in `unsigned long`.
// NOTE(review): the average is recomputed over all stored lead times on every call (O(n)); a running
// sum would make it O(1).
718  gettimeofday(&fileLookStop_, nullptr);
719  /*
720  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
721  << fileLookStop_.tv_usec / 1000.0 << std::endl;
722  */
723  std::lock_guard<std::mutex> lock(fmt_->monlock_);
724
725  if (lumi > lumiFromSource_) {
727  leadTimes_.clear();
728  }
729  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
730  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
731  // add this to lead times for this lumi
732  leadTimes_.push_back((double)elapsedTime);
733
734  // recompute average lead time for this lumi
735  if (leadTimes_.size() == 1)
737  else {
738  double totTime = 0;
739  for (unsigned int i = 0; i < leadTimes_.size(); i++)
740  totTime += leadTimes_[i];
741  avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
742  }
743  }
744 
745  void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
746  std::lock_guard<std::mutex> lock(fmt_->monlock_);
747  lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
748  }
749 
750  //for the output module
751  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
752  std::lock_guard<std::mutex> lock(fmt_->monlock_);
753 
754  auto it = processedEventsPerLumi_.find(lumi);
755  if (it != processedEventsPerLumi_.end()) {
756  unsigned int proc = it->second.first;
757  if (abortFlag)
758  *abortFlag = it->second.second;
759  return proc;
760  } else {
761  throw cms::Exception("FastMonitoringService")
762  << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
763  << lumi;
764  return 0;
765  }
766  }
767 
// For the output module: returns, under monlock_, the abort flag recorded for lumisection `lumi` at
// its end (set when the LS was interrupted). Throws cms::Exception if no entry exists for `lumi`.
// (Signature missing from this listing.)
// NOTE(review): the bool is routed through an `unsigned int` local, and the `return false;` after
// the throw is unreachable.
768  //for the output module
770  std::lock_guard<std::mutex> lock(fmt_->monlock_);
771
772  auto it = processedEventsPerLumi_.find(lumi);
773  if (it != processedEventsPerLumi_.end()) {
774  unsigned int abortFlag = it->second.second;
775  return abortFlag;
776  } else {
777  throw cms::Exception("FastMonitoringService")
778  << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
779  << lumi;
780  return false;
781  }
782  }
783 
// Body of the monitoring thread (signature missing from this listing). Until fmt_->m_stoprequest is
// set, each iteration:
// - takes a snapshot of the current global LS under monlock_;
// - copies the encoded ministates/microstates for logging;
// - when the condition on a line missing from this listing holds (presumably every fastMonIntervals_-th
//   iteration), renders the fast-monitoring CSV while locked and writes it after releasing the lock.
// `lock.release()->unlock()` detaches the mutex from the unique_lock and unlocks it by hand;
// lock.unlock() would have the same effect.
784  // the function to be called in the thread. Thread completes when function returns.
786  monInit_.exchange(true, std::memory_order_acquire);
787  while (!fmt_->m_stoprequest) {
788  std::vector<std::vector<unsigned int>> lastEnc;
789  {
790  std::unique_lock<std::mutex> lock(fmt_->monlock_);
791
792  doSnapshot(lastGlobalLumi_, false);
793
794  lastEnc.emplace_back(fmt_->m_data.ministateEncoded_);
795  lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
796
798  if (filePerFwkStream_) {
799  std::vector<std::string> CSVv;
800  for (unsigned int i = 0; i < nStreams_; i++) {
801  CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
802  }
803  // release mutex before writing out fast path file
804  lock.release()->unlock();
805  for (unsigned int i = 0; i < nStreams_; i++) {
806  if (!CSVv[i].empty())
807  fmt_->jsonMonitor_->outputCSV(fastPathList_[i], CSVv[i]);
808  }
809  } else {
810  std::string CSV = fmt_->jsonMonitor_->getCSVString();
811  // release mutex before writing out fast path file
812  lock.release()->unlock();
813  if (!CSV.empty())
814  fmt_->jsonMonitor_->outputCSV(fastPath_, CSV);
815  }
816  }
817  snapCounter_++;
818  }
819
// Log the states captured above
820  {
821  edm::LogInfo msg("FastMonitoringService");
822  auto f = [&](std::vector<unsigned int> const& p) {
823  for (unsigned int i = 0; i < nStreams_; i++) {
824  if (i == 0)
825  msg << "[" << p[i] << ",";
826  else if (i <= nStreams_ - 1)
827  msg << p[i] << ",";
828  else
829  msg << p[i] << "]";
830  }
831  };
// Log the last published macrostate and the per-stream ministate ("ms") and microstate ("us") codes
// captured under the lock. Then sleep sleepTime_ seconds before the next snapshot. (One line before
// the logging scope closes is missing from this listing.)
// NOTE(review): fastMacrostateJ_ is read here after monlock_ has been released; presumably benign,
// since it is only used for logging.
832
833  msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
834  f(lastEnc[0]);
835  msg << " us=";
836  f(lastEnc[1]);
838  }
839
840  ::sleep(sleepTime_);
841  }
842  }
843 
// Fills the monitored variables from the current service state, then snapshots them for lumisection
// `ls`. With isGlobalEOL it takes only the global snapshot (snapGlobal); otherwise it takes the
// regular one (snap). Every caller visible in this file holds monlock_.
// Steps:
// 1) macrostate, plus a copy of the per-stream microstate pointers and acquire flags;
// 2) once past the init transition, the per-lumi average lead time, file count and lock statistics
//    (0 when absent);
// 3) per-stream encoded ministate and microstate;
// 4) the published input state, refined by the supervisor state or per-stream microstate.
// Many `if`/`case` lines of this function are missing from this listing.
844  void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
845  // update macrostate
846  fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
847
848  std::vector<const void*> microstateCopy(fmt_->m_data.microstate_.begin(), fmt_->m_data.microstate_.end());
849  std::vector<unsigned char> microstateAcqCopy(fmt_->m_data.microstateAcqFlag_.begin(),
850  fmt_->m_data.microstateAcqFlag_.end());
851
852  if (!isInitTransition_) {
853  auto itd = avgLeadTime_.find(ls);
854  if (itd != avgLeadTime_.end())
855  fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
856  else
857  fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
858
859  auto iti = filesProcessedDuringLumi_.find(ls);
860  if (iti != filesProcessedDuringLumi_.end())
861  fmt_->m_data.fastFilesProcessedJ_ = iti->second;
862  else
863  fmt_->m_data.fastFilesProcessedJ_ = 0;
864
865  auto itrd = lockStatsDuringLumi_.find(ls);
866  if (itrd != lockStatsDuringLumi_.end()) {
867  fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
868  fmt_->m_data.fastLockCountJ_ = itrd->second.second;
869  } else {
870  fmt_->m_data.fastLockWaitJ_ = 0.;
871  fmt_->m_data.fastLockCountJ_ = 0.;
872  }
873  }
874
// NOTE(review): acquire states are encoded as microstateBins_ + module index. After begin-job,
// microstateBins_ is set to encModule_.vecsize() * 2, whereas the module legend places the "__ACQ"
// names at offset encModule_.current_. Unless vecsize() differs from current_, acquire codes land
// past the end of the legend and histogram -- verify the intended offset (vecsize() rather than
// microstateBins_?).
875  for (unsigned int i = 0; i < nStreams_; i++) {
876  fmt_->m_data.ministateEncoded_[i] = fmt_->m_data.encPath_[i].encodeString(fmt_->m_data.ministate_[i]);
877  if (microstateAcqCopy[i])
878  fmt_->m_data.microstateEncoded_[i] =
879  fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
880  else
881  fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
882  }
883
// Input state. While the source waits for input or for a chunk, the supervisor state selects the
// matching inWaitInput_* / inWaitChunk_* refinement; the opening `if` and the `case` labels are
// missing from this listing. Unmatched supervisor states fall back to the plain wait state.
884  bool inputStatePerThread = false;
885
887  switch (inputSupervisorState_) {
889  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
890  break;
892  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
893  break;
895  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
896  break;
898  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
899  break;
901  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
902  break;
904  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
905  break;
907  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
908  break;
910  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
911  break;
913  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
914  break;
916  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
917  break;
919  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
920  break;
923  break;
925  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
926  break;
929  break;
931  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
932  break;
933  default:
934  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
935  }
936  } else if (inputState_ == FastMonState::inWaitChunk) {
937  switch (inputSupervisorState_) {
939  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
940  break;
942  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
943  break;
945  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
946  break;
948  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
949  break;
951  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
952  break;
954  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
955  break;
957  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
958  break;
960  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
961  break;
963  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
964  break;
966  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
967  break;
969  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
970  break;
973  break;
975  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
976  break;
979  break;
981  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
982  break;
983  default:
984  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
985  }
// No pending request: classify each stream by its own microstate (idle, at end of lumi, or busy)
986  } else if (inputState_ == FastMonState::inNoRequest) {
987  inputStatePerThread = true;
988  for (unsigned int i = 0; i < nStreams_; i++) {
989  if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mIdle])
990  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
991  else if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
992  microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
993  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
994  else
995  fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
996  }
// NOTE(review): in the new-lumi case only streams at end of lumi are updated. The other streams
// keep whatever input state the previous snapshot left them -- verify that is intended.
997  } else if (inputState_ == FastMonState::inNewLumi) {
998  inputStatePerThread = true;
999  for (unsigned int i = 0; i < nStreams_; i++) {
1000  if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
1001  microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
1002  fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1003  }
1005  //apply directly throttled state from supervisor
1006  fmt_->m_data.inputState_[0] = inputSupervisorState_;
1007  } else
1008  fmt_->m_data.inputState_[0] = inputState_;
1009
1010  //this is same for all streams
1011  if (!inputStatePerThread)
1012  for (unsigned int i = 1; i < nStreams_; i++)
1013  fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1014
1015  if (isGlobalEOL) { //only update global variables
1016  fmt_->jsonMonitor_->snapGlobal(ls);
1017  } else
1018  fmt_->jsonMonitor_->snap(ls);
1019  }
1020 
1021  //compatibility
1023 
1025 
1026 } //end namespace evf
std::atomic< bool > has_data_exception_
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::atomic< FastMonState::InputState > inputState_
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
Definition: fillJson.h:27
static const std::string inputStateNames[FastMonState::inCOUNT]
void watchPreEvent(PreEvent::slot_type const &iSlot)
ModuleDescription const * moduleDescription() const
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
LuminosityBlockNumber_t luminosityBlock() const
std::atomic< bool > isInitTransition_
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void setExceptionDetected(unsigned int ls)
void watchPreModuleEventAcquire(PreModuleEventAcquire::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void preGlobalBeginLumi(edm::GlobalContext const &)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
std::filesystem::path workingDirectory_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
Value & append(const Value &value)
Append value to array at the end.
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: DAQSource.cc:1378
void preGlobalEndLumi(edm::GlobalContext const &)
void setMicroState(FastMonState::Microstate)
Represents a JSON value.
Definition: value.h:99
bool isExceptionOnData(unsigned int ls)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
Definition: Electron.h:6
constexpr int nSpecialModules
std::vector< std::string > const & endPaths() const
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:39
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void preModuleBeginJob(edm::ModuleDescription const &)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
std::atomic< FastMonState::InputState > inputSupervisorState_
StreamID const & streamID() const
Definition: StreamContext.h:55
static const std::string nopath_
constexpr double throughputFactor()
void watchPostModuleEventAcquire(PostModuleEventAcquire::slot_type const &iSlot)
std::filesystem::path runDirectory_
double f[11][100]
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:62
std::vector< std::string > fastPathList_
constexpr int nReservedPaths
void postStreamBeginLumi(edm::StreamContext const &)
Log< level::Info, false > LogInfo
def ls(path, rec=False)
Definition: eostools.py:349
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
constexpr int nReservedModules
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:286
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void stoppedLookingForFile(unsigned int lumi)
std::atomic< bool > has_source_exception_
EventID const & eventID() const
Definition: StreamContext.h:60
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
std::string const & pathName() const
Definition: PathContext.h:30
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
unsigned int value() const
Definition: StreamID.h:43
bool getAbortFlagForLumi(unsigned int lumi)
std::shared_ptr< FastMonitoringThread > fmt_
Log< level::Warning, false > LogWarning
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:63
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
std::vector< std::string > const & paths() const
#define LogDebug(id)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
array value (ordered list)
Definition: value.h:30