CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitoringService.cc
Go to the documentation of this file.
2 #include <iostream>
3 
5 #include <iomanip>
6 #include <sys/time.h>
7 
18 
21 using namespace jsoncollector;
22 
25 
// Conversion factor applied to (accumulated size / elapsed microseconds) when
// the per-lumisection throughput is computed: 10^6 scales microseconds to
// seconds and 1024 * 1024 scales the size down by 2^20.
// NOTE(review): this yields MiB/s only if the accumulated size is in bytes — confirm with the input source.
constexpr double throughputFactor() { return 1000000 / double(1024 * 1024); }
27 
// Sizing constants written into the JSON legends as the "reserved" and
// "special" entries; nReservedModules also seeds the module encoder.
static const int nReservedModules = 64;
static const int nSpecialModules = 10;
static const int nReservedPaths = 1;
31 
32 namespace evf {
33 
34  const std::string FastMonitoringService::macroStateNames[FastMonitoringThread::MCOUNT] = {"Init",
35  "JobReady",
36  "RunGiven",
37  "Running",
38  "Stopping",
39  "Done",
40  "JobEnded",
41  "Error",
42  "ErrorEnded",
43  "End",
44  "Invalid"};
45 
46  const std::string FastMonitoringService::inputStateNames[FastMonitoringThread::inCOUNT] = {
47  "Ignore",
48  "Init",
49  "WaitInput",
50  "NewLumi",
51  "NewLumiBusyEndingLS",
52  "NewLumiIdleEndingLS",
53  "RunEnd",
54  "ProcessingFile",
55  "WaitChunk",
56  "ChunkReceived",
57  "ChecksumEvent",
58  "CachedEvent",
59  "ReadEvent",
60  "ReadCleanup",
61  "NoRequest",
62  "NoRequestWithIdleThreads",
63  "NoRequestWithGlobalEoL",
64  "NoRequestWithEoLThreads",
65  "SupFileLimit",
66  "SupWaitFreeChunk",
67  "SupWaitFreeChunkCopying",
68  "SupWaitFreeThread",
69  "SupWaitFreeThreadCopying",
70  "SupBusy",
71  "SupLockPolling",
72  "SupLockPollingCopying",
73  "SupNoFile",
74  "SupNewFile",
75  "SupNewFileWaitThreadCopying",
76  "SupNewFileWaitThread",
77  "SupNewFileWaitChunkCopying",
78  "SupNewFileWaitChunk",
79  "WaitInput_fileLimit",
80  "WaitInput_waitFreeChunk",
81  "WaitInput_waitFreeChunkCopying",
82  "WaitInput_waitFreeThread",
83  "WaitInput_waitFreeThreadCopying",
84  "WaitInput_busy",
85  "WaitInput_lockPolling",
86  "WaitInput_lockPollingCopying",
87  "WaitInput_runEnd",
88  "WaitInput_noFile",
89  "WaitInput_newFile",
90  "WaitInput_newFileWaitThreadCopying",
91  "WaitInput_newFileWaitThread",
92  "WaitInput_newFileWaitChunkCopying",
93  "WaitInput_newFileWaitChunk",
94  "WaitChunk_fileLimit",
95  "WaitChunk_waitFreeChunk",
96  "WaitChunk_waitFreeChunkCopying",
97  "WaitChunk_waitFreeThread",
98  "WaitChunk_waitFreeThreadCopying",
99  "WaitChunk_busy",
100  "WaitChunk_lockPolling",
101  "WaitChunk_lockPollingCopying",
102  "WaitChunk_runEnd",
103  "WaitChunk_noFile",
104  "WaitChunk_newFile",
105  "WaitChunk_newFileWaitThreadCopying",
106  "WaitChunk_newFileWaitThread",
107  "WaitChunk_newFileWaitChunkCopying",
108  "WaitChunk_newFileWaitChunk"};
109 
110  const std::string FastMonitoringService::nopath_ = "NoPath";
111 
113  : MicroStateService(iPS, reg),
114  encModule_(nReservedModules),
115  nStreams_(0) //until initialized
116  ,
117  sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
118  fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
119  fastName_("fastmoni"),
120  slowName_("slowmoni"),
121  filePerFwkStream_(iPS.getUntrackedParameter<bool>("filePerFwkStream", false)),
122  totalEventsProcessed_(0) {
123  reg.watchPreallocate(this, &FastMonitoringService::preallocate); //receiving information on number of threads
125 
130 
134 
139 
141 
142  reg.watchPreEvent(this, &FastMonitoringService::preEvent); //stream
144 
145  reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent); //source (with streamID of requestor)
147 
148  reg.watchPreModuleEvent(this, &FastMonitoringService::preModuleEvent); //should be stream
150 
154 
155  //find microstate definition path (required by the module)
156  struct stat statbuf;
157  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
158  std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
159  if (stat(microstatePath.c_str(), &statbuf)) {
160  microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
161  if (stat(microstatePath.c_str(), &statbuf)) {
162  microstatePath = microstateBaseSuffix;
163  if (stat(microstatePath.c_str(), &statbuf))
164  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
165  }
166  }
167  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
168  }
169 
171 
174  desc.setComment("Service for File-based DAQ monitoring and event accounting");
175  desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
176  desc.addUntracked<unsigned int>("fastMonIntervals", 2)
177  ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
178  desc.addUntracked<bool>("filePerFwkStream", false)
179  ->setComment("Switches on monitoring output per framework stream");
180  desc.setAllowAnything();
181  descriptions.add("FastMonitoringService", desc);
182  }
183 
185  Json::Value legendaVector(Json::arrayValue);
186  for (int i = 0; i < encPath_[0].current_; i++)
187  legendaVector.append(Json::Value(*(static_cast<const std::string*>(encPath_[0].decode(i)))));
188  Json::Value valReserved(nReservedPaths);
189  Json::Value pathLegend;
190  pathLegend["names"] = legendaVector;
191  pathLegend["reserved"] = valReserved;
193  return writer.write(pathLegend);
194  }
195 
197  Json::Value legendaVector(Json::arrayValue);
198  for (int i = 0; i < encModule_.current_; i++)
199  legendaVector.append(
200  Json::Value((static_cast<const edm::ModuleDescription*>(encModule_.decode(i)))->moduleLabel()));
201  Json::Value valReserved(nReservedModules);
202  Json::Value valSpecial(nSpecialModules);
203  Json::Value valOutputModules(nOutputModules_);
204  Json::Value moduleLegend;
205  moduleLegend["names"] = legendaVector;
206  moduleLegend["reserved"] = valReserved;
207  moduleLegend["special"] = valSpecial;
208  moduleLegend["output"] = valOutputModules;
210  return writer.write(moduleLegend);
211  }
212 
214  Json::Value legendaVector(Json::arrayValue);
215  for (int i = 0; i < FastMonitoringThread::inCOUNT; i++)
216  legendaVector.append(Json::Value(inputStateNames[i]));
217  Json::Value moduleLegend;
218  moduleLegend["names"] = legendaVector;
220  return writer.write(moduleLegend);
221  }
222 
224  nStreams_ = bounds.maxNumberOfStreams();
225  nThreads_ = bounds.maxNumberOfThreads();
226  //this should already be >=1
227  if (nStreams_ == 0)
228  nStreams_ = 1;
229  if (nThreads_ == 0)
230  nThreads_ = 1;
231  }
232 
234  // FIND RUN DIRECTORY
235  // The run dir should be set via the configuration of EvFDaqDirector
236 
237  if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
238  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
239  }
240  boost::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
241  workingDirectory_ = runDirectory_ = runDirectory;
242  workingDirectory_ /= "mon";
243 
244  if (!boost::filesystem::is_directory(workingDirectory_)) {
245  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
246  boost::filesystem::create_directories(workingDirectory_);
247  if (!boost::filesystem::is_directory(workingDirectory_))
248  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
249  << ". No monitoring data will be written.";
250  }
251 
252  std::ostringstream fastFileName;
253 
254  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
256  fast /= fastFileName.str();
257  fastPath_ = fast.string();
258  if (filePerFwkStream_)
259  for (unsigned int i = 0; i < nStreams_; i++) {
260  std::ostringstream fastFileNameTid;
261  fastFileNameTid << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << "_tid" << i
262  << ".fast";
264  fastTid /= fastFileNameTid.str();
265  fastPathList_.push_back(fastTid.string());
266  }
267 
268  std::ostringstream moduleLegFile;
269  std::ostringstream moduleLegFileJson;
270  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
271  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
272  moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
273  moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
274 
275  std::ostringstream pathLegFile;
276  std::ostringstream pathLegFileJson;
277  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
278  pathLegendFile_ = (workingDirectory_ / pathLegFile.str()).string();
279  pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
280  pathLegendFileJson_ = (workingDirectory_ / pathLegFileJson.str()).string();
281 
282  std::ostringstream inputLegFileJson;
283  inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
284  inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
285 
286  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
287  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
288 
289  /*
290  * initialize the fast monitor with:
291  * vector of pointers to monitorable parameters
292  * path to definition
293  *
294  */
295 
297 
298  for (unsigned int i = 0; i < (mCOUNT); i++)
299  encModule_.updateReserved(static_cast<const void*>(reservedMicroStateNames + i));
301 
302  for (unsigned int i = 0; i < nStreams_; i++) {
303  ministate_.emplace_back(&nopath_);
305 
306  //for synchronization
307  streamCounterUpdating_.push_back(new std::atomic<bool>(false));
308 
309  //path (mini) state
310  encPath_.emplace_back(0);
311  encPath_[i].update(static_cast<const void*>(&nopath_));
312  eventCountForPathInit_.push_back(0);
313  firstEventId_.push_back(0);
314  collectedPathList_.push_back(new std::atomic<bool>(false));
315  }
316  //for (unsigned int i=0;i<nThreads_;i++)
317  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
318 
319  //initial size until we detect number of bins
324 
325  lastGlobalLumi_ = 0;
326  isInitTransition_ = true;
327  lumiFromSource_ = 0;
328 
329  //startup monitoring
331  fmt_.jsonMonitor_->setNStreams(nStreams_);
333  monInit_.store(false, std::memory_order_release);
335 
336  //this definition needs: #include "tbb/compat/thread"
337  //however this would result in the TBB implementation replacing std::thread
338  //(both supposedly call pthread_self())
339  //number of threads created in process could be obtained from /proc,
340  //assuming that all posix threads are true kernel threads capable of running in parallel
341 
342  //#if TBB_IMPLEMENT_CPP0X
344  //threadIDAvailable_=true;
345  //#endif
346  }
347 
349  std::string context;
351  context = " FromThisContext ";
353  context = " FromAnotherContext";
355  context = " FromExternalSignal";
356  edm::LogWarning("FastMonitoringService")
357  << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
358  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
359  std::lock_guard<std::mutex> lock(fmt_.monlock_);
360  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
361  }
362 
364  std::string context;
366  context = " FromThisContext ";
368  context = " FromAnotherContext";
370  context = " FromExternalSignal";
371  edm::LogWarning("FastMonitoringService")
372  << " GLOBAL "
373  << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
374  std::lock_guard<std::mutex> lock(fmt_.monlock_);
376  }
377 
379  std::string context;
381  context = " FromThisContext ";
383  context = " FromAnotherContext";
385  context = " FromExternalSignal";
386  edm::LogWarning("FastMonitoringService") << " SOURCE "
387  << "earlyTermination -: " << context;
388  std::lock_guard<std::mutex> lock(fmt_.monlock_);
389  exception_detected_ = true;
390  }
391 
393  if (!ls)
394  exception_detected_ = true;
395  else
396  exceptionInLS_.push_back(ls);
397  }
398 
400 
401  //new output module name is stream
403  std::lock_guard<std::mutex> lock(fmt_.monlock_);
404  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
405 
406  //build a map of modules keyed by their module description address
407  //here we need to treat output modules in a special way so they can be easily singled out
408  if (desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" ||
409  desc.moduleName() == "EvFOutputModule" || desc.moduleName() == "EventStreamFileWriter" ||
410  desc.moduleName() == "PoolOutputModule") {
411  encModule_.updateReserved((void*)&desc);
412  nOutputModules_++;
413  } else
414  encModule_.update((void*)&desc);
415  }
416 
418  std::string&& moduleLegStrJson = makeModuleLegendaJson();
419  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
420 
421  std::string inputLegendStrJson = makeInputLegendaJson();
422  FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
423 
425 
426  //update number of entries in module histogram
427  std::lock_guard<std::mutex> lock(fmt_.monlock_);
429  }
430 
433  fmt_.stop();
434  }
435 
438  isInitTransition_ = false;
439  }
440 
442  timeval lumiStartTime;
443  gettimeofday(&lumiStartTime, nullptr);
444  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
445  lastGlobalLumi_ = newLumi;
446 
447  std::lock_guard<std::mutex> lock(fmt_.monlock_);
448  lumiStartTime_[newLumi] = lumiStartTime;
449  }
450 
452  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
453  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
454  timeval lumiStopTime;
455  gettimeofday(&lumiStopTime, nullptr);
456 
457  std::lock_guard<std::mutex> lock(fmt_.monlock_);
458 
459  // Compute throughput
460  timeval stt = lumiStartTime_[lumi];
461  lumiStartTime_.erase(lumi);
462  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
463  unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
464  accuSize_.erase(lumi);
465  double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
466  //store to registered variable
467  fmt_.m_data.fastThroughputJ_.value() = throughput;
468 
469  //update
470  doSnapshot(lumi, true);
471 
472  //retrieve one result we need (todo: sanity check if it's found)
473  IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_.jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
474  if (!lumiProcessedJptr)
475  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
476  processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
477 
478  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
479  bool exception_detected = exception_detected_;
480  for (auto ex : exceptionInLS_)
481  if (lumi == ex)
482  exception_detected = true;
483 
484  if (edm::shutdown_flag || exception_detected) {
485  edm::LogInfo("FastMonitoringService")
486  << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
487  << " events were processed in LUMI " << lumi;
488  //this will prevent output modules from producing json file for possibly incomplete lumi
489  processedEventsPerLumi_[lumi].first = 0;
490  processedEventsPerLumi_[lumi].second = true;
491  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
492  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
493  return;
494  }
495 
496  if (inputSource_) {
497  auto sourceReport = inputSource_->getEventReport(lumi, true);
498  if (sourceReport.first) {
499  if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
500  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
501  << ", events(processed):" << processedEventsPerLumi_[lumi].first
502  << " events(source):" << sourceReport.second;
503  }
504  }
505  }
506  edm::LogInfo("FastMonitoringService")
507  << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
508  << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
509  delete lumiProcessedJptr;
510 
511  //full global and stream merge&output for this lumi
512 
513  // create file name for slow monitoring file
514  if (filePerFwkStream_) {
515  std::stringstream slowFileNameStem;
516  slowFileNameStem << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
517  << std::setw(5) << getpid();
519  slow /= slowFileNameStem.str();
520  fmt_.jsonMonitor_->outputFullJSONs(slow.string(), ".jsn", lumi);
521  } else {
522  std::stringstream slowFileName;
523  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
524  << std::setw(5) << getpid() << ".jsn";
526  slow /= slowFileName.str();
527  fmt_.jsonMonitor_->outputFullJSON(slow.string(),
528  lumi); //full global and stream merge and JSON write for this lumi
529  }
530  fmt_.jsonMonitor_->discardCollected(lumi); //we don't do further updates for this lumi
531  }
532 
534  std::lock_guard<std::mutex> lock(fmt_.monlock_);
535  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
536  //LS monitoring snapshot with input source data has been taken in previous callback
537  avgLeadTime_.erase(lumi);
538  filesProcessedDuringLumi_.erase(lumi);
539  lockStatsDuringLumi_.erase(lumi);
540 
541  //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
542  processedEventsPerLumi_.erase(lumi);
543  }
544 
546  unsigned int sid = sc.streamID().value();
547 
548  std::lock_guard<std::mutex> lock(fmt_.monlock_);
550 
551  //reset collected values for this stream
552  *(fmt_.m_data.processed_[sid]) = 0;
553 
554  ministate_[sid] = &nopath_;
556  }
557 
560  }
561 
563  unsigned int sid = sc.streamID().value();
564  std::lock_guard<std::mutex> lock(fmt_.monlock_);
565 
566  //update processed count to be complete at this time
568  //reset this in case stream does not get notified of next lumi (we keep processed events only)
569  ministate_[sid] = &nopath_;
571  }
574  }
575 
577  //make sure that all path names are retrieved before allowing ministate to change
578  //hack: assume memory is synchronized after ~50 events seen by each stream
579  if (UNLIKELY(eventCountForPathInit_[sc.streamID()] < 50) &&
580  false == collectedPathList_[sc.streamID()]->load(std::memory_order_acquire)) {
581  //protection between stream threads, as well as the service monitoring thread
582  std::lock_guard<std::mutex> lock(fmt_.monlock_);
583 
584  if (firstEventId_[sc.streamID()] == 0)
585  firstEventId_[sc.streamID()] = sc.eventID().event();
586  if (sc.eventID().event() == firstEventId_[sc.streamID()]) {
587  encPath_[sc.streamID()].update((void*)&pc.pathName());
588  return;
589  } else {
590  //finished collecting path names
591  collectedPathList_[sc.streamID()]->store(true, std::memory_order_seq_cst);
592  fmt_.m_data.ministateBins_ = encPath_[sc.streamID()].vecsize();
593  if (!pathLegendWritten_) {
594  std::string pathLegendStrJson = makePathLegendaJson();
595  FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
596  pathLegendWritten_ = true;
597  }
598  }
599  } else {
600  ministate_[sc.streamID()] = &(pc.pathName());
601  }
602  }
603 
605 
608 
609  ministate_[sc.streamID()] = &nopath_;
610 
611  (*(fmt_.m_data.processed_[sc.streamID()]))++;
612  eventCountForPathInit_[sc.streamID()].m_value++;
613 
614  //fast path counter (events accumulated in a run)
615  unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
616  fmt_.m_data.fastPathProcessedJ_ = res + 1;
617  //fmt_.m_data.fastPathProcessedJ_ = totalEventsProcessed_.load(std::memory_order_relaxed);
618  }
619 
622  }
623 
626  }
627 
629  microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
630  }
631 
633  //microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
635  }
636 
637  //FUNCTIONS CALLED FROM OUTSIDE
638 
639  //this is for old-fashioned service that is not thread safe and can block other streams
640  //(we assume the worst case - everything is blocked)
642  for (unsigned int i = 0; i < nStreams_; i++)
644  }
645 
646  //this is for services that are multithreading-enabled or rarely block other streams
649  }
650 
651  //from source
652  void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
653  std::lock_guard<std::mutex> lock(fmt_.monlock_);
654 
655  if (accuSize_.find(lumi) == accuSize_.end())
656  accuSize_[lumi] = fileSize;
657  else
658  accuSize_[lumi] += fileSize;
659 
661  filesProcessedDuringLumi_[lumi] = 1;
662  else
664  }
665 
667  gettimeofday(&fileLookStart_, nullptr);
668  /*
669  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
670  << fileLookStart_.tv_usec / 1000.0 << std::endl;
671  */
672  }
673 
675  gettimeofday(&fileLookStop_, nullptr);
676  /*
677  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
678  << fileLookStop_.tv_usec / 1000.0 << std::endl;
679  */
680  std::lock_guard<std::mutex> lock(fmt_.monlock_);
681 
682  if (lumi > lumiFromSource_) {
684  leadTimes_.clear();
685  }
686  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
687  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
688  // add this to lead times for this lumi
689  leadTimes_.push_back((double)elapsedTime);
690 
691  // recompute average lead time for this lumi
692  if (leadTimes_.size() == 1)
693  avgLeadTime_[lumi] = leadTimes_[0];
694  else {
695  double totTime = 0;
696  for (unsigned int i = 0; i < leadTimes_.size(); i++)
697  totTime += leadTimes_[i];
698  avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
699  }
700  }
701 
702  void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
703  std::lock_guard<std::mutex> lock(fmt_.monlock_);
704  lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
705  }
706 
707  //for the output module
708  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
709  std::lock_guard<std::mutex> lock(fmt_.monlock_);
710 
711  auto it = processedEventsPerLumi_.find(lumi);
712  if (it != processedEventsPerLumi_.end()) {
713  unsigned int proc = it->second.first;
714  if (abortFlag)
715  *abortFlag = it->second.second;
716  return proc;
717  } else {
718  throw cms::Exception("FastMonitoringService")
719  << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
720  << lumi;
721  return 0;
722  }
723  }
724 
725  //for the output module
727  std::lock_guard<std::mutex> lock(fmt_.monlock_);
728 
729  auto it = processedEventsPerLumi_.find(lumi);
730  if (it != processedEventsPerLumi_.end()) {
731  unsigned int abortFlag = it->second.second;
732  return abortFlag;
733  } else {
734  throw cms::Exception("FastMonitoringService")
735  << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
736  << lumi;
737  return false;
738  }
739  }
740 
741  void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
742  // update macrostate
744 
745  std::vector<const void*> microstateCopy(microstate_.begin(), microstate_.end());
746 
747  if (!isInitTransition_) {
748  auto itd = avgLeadTime_.find(ls);
749  if (itd != avgLeadTime_.end())
750  fmt_.m_data.fastAvgLeadTimeJ_ = itd->second;
751  else
753 
754  auto iti = filesProcessedDuringLumi_.find(ls);
755  if (iti != filesProcessedDuringLumi_.end())
756  fmt_.m_data.fastFilesProcessedJ_ = iti->second;
757  else
759 
760  auto itrd = lockStatsDuringLumi_.find(ls);
761  if (itrd != lockStatsDuringLumi_.end()) {
762  fmt_.m_data.fastLockWaitJ_ = itrd->second.first;
763  fmt_.m_data.fastLockCountJ_ = itrd->second.second;
764  } else {
767  }
768  }
769 
770  for (unsigned int i = 0; i < nStreams_; i++) {
772  fmt_.m_data.microstateEncoded_[i] = encModule_.encode(microstateCopy[i]);
773  }
774 
775  bool inputStatePerThread = false;
776 
778  switch (inputSupervisorState_) {
781  break;
784  break;
787  break;
790  break;
793  break;
796  break;
799  break;
802  break;
805  break;
808  break;
811  break;
814  break;
817  break;
820  break;
823  break;
824  default:
826  }
828  switch (inputSupervisorState_) {
831  break;
834  break;
837  break;
840  break;
843  break;
846  break;
849  break;
852  break;
855  break;
858  break;
861  break;
864  break;
867  break;
870  break;
873  break;
874  default:
876  }
878  inputStatePerThread = true;
879  for (unsigned int i = 0; i < nStreams_; i++) {
880  if (microstateCopy[i] == &reservedMicroStateNames[mIdle])
882  else if (microstateCopy[i] == &reservedMicroStateNames[mEoL] ||
883  microstateCopy[i] == &reservedMicroStateNames[mFwkEoL])
885  else
887  }
889  inputStatePerThread = true;
890  for (unsigned int i = 0; i < nStreams_; i++) {
891  if (microstateCopy[i] == &reservedMicroStateNames[mEoL] ||
892  microstateCopy[i] == &reservedMicroStateNames[mFwkEoL])
894  }
895  } else
897 
898  //this is same for all streams
899  if (!inputStatePerThread)
900  for (unsigned int i = 1; i < nStreams_; i++)
902 
903  if (isGlobalEOL) { //only update global variables
904  fmt_.jsonMonitor_->snapGlobal(ls);
905  } else
906  fmt_.jsonMonitor_->snap(ls);
907  }
908 
909 } //end namespace evf
#define LogDebug(id)
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::string const & pathName() const
Definition: PathContext.h:30
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:38
EventNumber_t event() const
Definition: EventID.h:40
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
Definition: fillJson.h:27
void watchPreEvent(PreEvent::slot_type const &iSlot)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
std::atomic< bool > isInitTransition_
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
boost::filesystem::path runDirectory_
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
void preGlobalBeginLumi(edm::GlobalContext const &)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
void setAllowAnything()
allow any parameter label/value pairs
double throughputFactor()
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
std::atomic< FastMonitoringThread::InputState > inputSupervisorState_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
static const int nReservedPaths
volatile std::atomic< bool > shutdown_flag
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:55
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
std::string const & moduleName() const
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
Value & append(const Value &value)
Append value to array at the end.
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
Represents a JSON value.
Definition: value.h:99
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:39
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
Definition: Electron.h:6
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
void preModuleBeginJob(edm::ModuleDescription const &)
static const std::string inputStateNames[FastMonitoringThread::inCOUNT]
void setMicroState(MicroStateService::Microstate) override
void setComment(std::string const &value)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::vector< std::atomic< bool > * > collectedPathList_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
static const std::string nopath_
ModuleDescription const * moduleDescription() const
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
static const int nReservedModules
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
std::vector< unsigned long > firstEventId_
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::vector< std::string > fastPathList_
std::vector< ContainableAtomic< unsigned int > > eventCountForPathInit_
void postStreamBeginLumi(edm::StreamContext const &)
std::atomic< FastMonitoringThread::InputState > inputState_
StreamID const & streamID() const
Definition: StreamContext.h:54
def ls(path, rec=False)
Definition: eostools.py:349
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< unsigned int > microstateEncoded_
unsigned int value() const
Definition: StreamID.h:42
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::atomic< FastMonitoringThread::Macrostate > macrostate_
static const int nSpecialModules
std::vector< ContainableAtomic< const void * > > ministate_
LuminosityBlockNumber_t luminosityBlock() const
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void stoppedLookingForFile(unsigned int lumi)
std::string write(const Value &root) override
Serialize a Value in JSON format.
boost::filesystem::path workingDirectory_
std::vector< jsoncollector::AtomicMonUInt * > processed_
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
EventID const & eventID() const
Definition: StreamContext.h:59
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
bool getAbortFlagForLumi(unsigned int lumi)
#define UNLIKELY(x)
Definition: Likely.h:21
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:63
const void * decode(unsigned int index)
std::vector< Encoding > encPath_
std::vector< unsigned int > ministateEncoded_
#define constexpr
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
array value (ordered list)
Definition: value.h:30