CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitoringService.cc
Go to the documentation of this file.
2 #include <iostream>
3 
5 #include <iomanip>
6 #include <sys/time.h>
7 
18 
21 using namespace jsoncollector;
22 
25 
// Scale factor 10^6 / 2^20. In this file it multiplies
// (accumulated size / elapsed microseconds) to give the per-lumisection
// throughput figure (presumably bytes/us -> MiB/s; confirm the size unit
// against the source that reports file sizes).
constexpr double throughputFactor() {
  return 1000000 / static_cast<double>(1024 * 1024);
}
27 
// Size handed to the module encoding (encModule_) at construction; also
// published as "reserved" in the module legend JSON.
static constexpr int nReservedModules = 64;
// Published as "special" in the module legend JSON.
static constexpr int nSpecialModules = 7;
// Published as "reserved" in the path legend JSON.
static constexpr int nReservedPaths = 1;
31 
32 namespace evf{
33 
34  const std::string FastMonitoringService::macroStateNames[FastMonitoringThread::MCOUNT] =
35  {"Init","JobReady","RunGiven","Running",
36  "Stopping","Done","JobEnded","Error","ErrorEnded","End",
37  "Invalid"};
38 
39  const std::string FastMonitoringService::nopath_ = "NoPath";
40 
41  FastMonitoringService::FastMonitoringService(const edm::ParameterSet& iPS,
42  edm::ActivityRegistry& reg) :
43  MicroStateService(iPS,reg)
44  ,encModule_(nReservedModules)
45  ,nStreams_(0)//until initialized
46  ,sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1))
47  ,fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2))
48  ,fastName_("fastmoni")
49  ,slowName_("slowmoni")
50  ,totalEventsProcessed_(0)
51  {
52  reg.watchPreallocate(this, &FastMonitoringService::preallocate);//receiving information on number of threads
54 
59 
63 
68 
70 
73 
74  reg.watchPreSourceEvent(this,&FastMonitoringService::preSourceEvent);//source (with streamID of requestor)
76 
79 
83 
84  //find microstate definition path (required by the module)
85  struct stat statbuf;
86  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
87  std::string microstatePath = std::string(getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
88  if (stat(microstatePath.c_str(), &statbuf)) {
89  microstatePath = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
90  if (stat(microstatePath.c_str(), &statbuf)) {
91  microstatePath = microstateBaseSuffix;
92  if (stat(microstatePath.c_str(), &statbuf))
93  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
94  }
95  }
96  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
97  }
98 
99 
101  {
102  }
103 
105  {
107  desc.setComment("Service for File-based DAQ monitoring and event accounting");
108  desc.addUntracked<int> ("sleepTime",1)->setComment("Sleep time of the monitoring thread");
109  desc.addUntracked<unsigned int> ("fastMonIntervals",2)->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
110  desc.setAllowAnything();
111  descriptions.add("FastMonitoringService", desc);
112  }
113 
114 
116  Json::Value legendaVector(Json::arrayValue);
117  for(int i = 0; i < encPath_[0].current_; i++)
118  legendaVector.append(Json::Value(*(static_cast<const std::string *>(encPath_[0].decode(i)))));
119  Json::Value valReserved(nReservedPaths);
120  Json::Value pathLegend;
121  pathLegend["names"]=legendaVector;
122  pathLegend["reserved"]=valReserved;
123  Json::StyledWriter writer;
124  return writer.write(pathLegend);
125  }
126 
128  Json::Value legendaVector(Json::arrayValue);
129  for(int i = 0; i < encModule_.current_; i++)
130  legendaVector.append(Json::Value((static_cast<const edm::ModuleDescription *>(encModule_.decode(i)))->moduleLabel()));
131  Json::Value valReserved(nReservedModules);
132  Json::Value valSpecial(nSpecialModules);
133  Json::Value valOutputModules(nOutputModules_);
134  Json::Value moduleLegend;
135  moduleLegend["names"]=legendaVector;
136  moduleLegend["reserved"]=valReserved;
137  moduleLegend["special"]=valSpecial;
138  moduleLegend["output"]=valOutputModules;
139  Json::StyledWriter writer;
140  return writer.write(moduleLegend);
141  }
142 
144  {
145  nStreams_=bounds.maxNumberOfStreams();
146  nThreads_=bounds.maxNumberOfThreads();
147  //this should already be >=1
148  if (nStreams_==0) nStreams_=1;
149  if (nThreads_==0) nThreads_=1;
150  }
151 
153  edm::ProcessContext const& pc)
154  {
155 
156  // FIND RUN DIRECTORY
157  // The run dir should be set via the configuration of EvFDaqDirector
158 
159  if (edm::Service<evf::EvFDaqDirector>().operator->()==nullptr)
160  {
161  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
162 
163  }
164  emptyLumisectionMode_ = edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode();
165  boost::filesystem::path runDirectory(edm::Service<evf::EvFDaqDirector>()->baseRunDir());
166  workingDirectory_ = runDirectory_ = runDirectory;
167  workingDirectory_ /= "mon";
168 
169  if ( !boost::filesystem::is_directory(workingDirectory_)) {
170  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string() ;
171  boost::filesystem::create_directories(workingDirectory_);
172  if ( !boost::filesystem::is_directory(workingDirectory_))
173  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
174  << ". No monitoring data will be written.";
175  }
176 
177  std::ostringstream fastFileName;
178 
179  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
181  fast /= fastFileName.str();
182  fastPath_ = fast.string();
183 
184  std::ostringstream moduleLegFile;
185  std::ostringstream moduleLegFileJson;
186  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
187  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
188  moduleLegendFile_ = (workingDirectory_/moduleLegFile.str()).string();
189  moduleLegendFileJson_ = (workingDirectory_/moduleLegFileJson.str()).string();
190 
191  std::ostringstream pathLegFile;
192  std::ostringstream pathLegFileJson;
193  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
194  pathLegendFile_ = (workingDirectory_/pathLegFile.str()).string();
195  pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
196  pathLegendFileJson_ = (workingDirectory_/pathLegFileJson.str()).string();
197 
198  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: "
200  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
201 
202  /*
203  * initialize the fast monitor with:
204  * vector of pointers to monitorable parameters
205  * path to definition
206  *
207  */
208 
210 
211  for(unsigned int i = 0; i < (mCOUNT); i++)
212  encModule_.updateReserved(static_cast<const void*>(reservedMicroStateNames+i));
214 
215  for (unsigned int i=0;i<nStreams_;i++) {
216  ministate_.push_back(&nopath_);
218 
219  //for synchronization
220  streamCounterUpdating_.push_back(new std::atomic<bool>(0));
221 
222  //path (mini) state
223  encPath_.emplace_back(0);
224  encPath_[i].update(static_cast<const void*>(&nopath_));
225  eventCountForPathInit_.push_back(0);
226  firstEventId_.push_back(0);
227  collectedPathList_.push_back(new std::atomic<bool>(0));
228 
229  }
230  //for (unsigned int i=0;i<nThreads_;i++)
231  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
232 
233  //initial size until we detect number of bins
237 
238  lastGlobalLumi_=0;
240  lumiFromSource_=0;
241 
242  //startup monitoring
244  fmt_.jsonMonitor_->setNStreams(nStreams_);
246  monInit_.store(false,std::memory_order_release);
248 
249  //this definition needs: #include "tbb/compat/thread"
250  //however this would result in the TBB implementation replacing std::thread
251  //(both supposedly call pthread_self())
252  //number of threads created in process could be obtained from /proc,
253  //assuming that all posix threads are true kernel threads capable of running in parallel
254 
255  //#if TBB_IMPLEMENT_CPP0X
257  //threadIDAvailable_=true;
258  //#endif
259 
260  }
261 
263  {
264  std::string context;
265  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
266  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
267  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
268  edm::LogWarning("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
269  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
270  std::lock_guard<std::mutex> lock(fmt_.monlock_);
271  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
272  }
273 
275  {
276  std::string context;
277  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
278  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
279  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
280  edm::LogWarning("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
281  << gc.luminosityBlockID().luminosityBlock() << " " << context;
282  std::lock_guard<std::mutex> lock(fmt_.monlock_);
284  }
285 
287  {
288  std::string context;
289  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
290  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
291  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
292  edm::LogWarning("FastMonitoringService") << " SOURCE " << "earlyTermination -: " << context;
293  std::lock_guard<std::mutex> lock(fmt_.monlock_);
294  exception_detected_=true;
295  }
296 
298  if (!ls) exception_detected_=true;
299  else exceptionInLS_.push_back(ls);
300  }
301 
303  {
305  }
306 
307  //new output module name is stream
309  {
310  std::lock_guard<std::mutex> lock(fmt_.monlock_);
311  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
312 
313  //build a map of modules keyed by their module description address
314  //here we need to treat output modules in a special way so they can be easily singled out
315  if(desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" || desc.moduleName() == "EvFOutputModule" ||
316  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
317  encModule_.updateReserved((void*)&desc);
318  nOutputModules_++;
319  }
320  else
321  encModule_.update((void*)&desc);
322  }
323 
325  {
326  std::string && moduleLegStrJson = makeModuleLegendaJson();
327  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
328 
330 
331  //update number of entries in module histogram
332  std::lock_guard<std::mutex> lock(fmt_.monlock_);
334  }
335 
337  {
339  fmt_.stop();
340  }
341 
343  {
345  }
346 
348  {
349 
350  timeval lumiStartTime;
351  gettimeofday(&lumiStartTime, 0);
352  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
353 
354  std::lock_guard<std::mutex> lock(fmt_.monlock_);
355 
356  lumiStartTime_[newLumi]=lumiStartTime;
357  while (!lastGlobalLumisClosed_.empty()) {
358  //wipe out old map entries as they aren't needed and slow down access
359  unsigned int oldLumi = lastGlobalLumisClosed_.back();
361  lumiStartTime_.erase(oldLumi);
362  avgLeadTime_.erase(oldLumi);
363  filesProcessedDuringLumi_.erase(oldLumi);
364  accuSize_.erase(oldLumi);
365  lockStatsDuringLumi_.erase(oldLumi);
366  processedEventsPerLumi_.erase(oldLumi);
367  }
368  lastGlobalLumi_= newLumi;
370  }
371 
373  {
374  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
375  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: "
376  << lumi;
377  timeval lumiStopTime;
378  gettimeofday(&lumiStopTime, 0);
379 
380  std::lock_guard<std::mutex> lock(fmt_.monlock_);
381 
382  // Compute throughput
383  timeval stt = lumiStartTime_[lumi];
384  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec)*1000000
385  + (lumiStopTime.tv_usec - stt.tv_usec);
386  unsigned long accuSize = accuSize_.find(lumi)==accuSize_.end() ? 0 : accuSize_[lumi];
387  double throughput = throughputFactor()* double(accuSize) / double(usecondsForLumi);
388  //store to registered variable
389  fmt_.m_data.fastThroughputJ_.value() = throughput;
390 
391  //update
392  doSnapshot(lumi,true);
393 
394  // create file name for slow monitoring file
395  std::stringstream slowFileName;
396  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
397  << lumi << "_pid" << std::setfill('0')
398  << std::setw(5) << getpid() << ".jsn";
400  slow /= slowFileName.str();
401 
402  //retrieve one result we need (todo: sanity check if it's found)
403  IntJ *lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_.jsonMonitor_->getMergedIntJForLumi("Processed",lumi));
404  if (!lumiProcessedJptr)
405  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
406  processedEventsPerLumi_[lumi] = std::pair<unsigned int,bool>(lumiProcessedJptr->value(),false);
407 
408  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
409  bool exception_detected = exception_detected_;
410  for (auto ex : exceptionInLS_)
411  if (lumi == ex) exception_detected=true;
412 
413  if (edm::shutdown_flag || exception_detected) {
414  edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
415  << processedEventsPerLumi_[lumi].first << " events were processed in LUMI " << lumi;
416  //this will prevent output modules from producing json file for possibly incomplete lumi
417  processedEventsPerLumi_[lumi].first=0;
418  processedEventsPerLumi_[lumi].second=true;
419  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
420  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
421  return;
422 
423  }
424 
425  if (inputSource_) {
426  auto sourceReport = inputSource_->getEventReport(lumi, true);
427  if (sourceReport.first) {
428  if (sourceReport.second!=processedEventsPerLumi_[lumi].first) {
429  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
430  << lumi
431  << ", events(processed):" << processedEventsPerLumi_[lumi].first
432  << " events(source):" << sourceReport.second;
433  }
434  }
435  }
436  edm::LogInfo("FastMonitoringService") << "Statistics for lumisection -: lumi = " << lumi << " events = "
437  << lumiProcessedJptr->value() << " time = " << usecondsForLumi/1000000
438  << " size = " << accuSize << " thr = " << throughput;
439  delete lumiProcessedJptr;
440 
441  //full global and stream merge&output for this lumi
442  fmt_.jsonMonitor_->outputFullJSON(slow.string(),lumi);//full global and stream merge and JSON write for this lumi
443  fmt_.jsonMonitor_->discardCollected(lumi);//we don't do further updates for this lumi
444 
446  }
447 
449  {
450  //mark closed lumis (still keep map entries until next one)
452  }
453 
455  {
456  unsigned int sid = sc.streamID().value();
457  std::lock_guard<std::mutex> lock(fmt_.monlock_);
459 
460  //reset collected values for this stream
461  *(fmt_.m_data.processed_[sid])=0;
462 
463  ministate_[sid]=&nopath_;
465  }
466 
468  {
470  }
471 
473  {
474  unsigned int sid = sc.streamID().value();
475  std::lock_guard<std::mutex> lock(fmt_.monlock_);
476 
477  #if ATOMIC_LEVEL>=2
478  //spinlock to make sure we are not still updating event counter somewhere
479  while (streamCounterUpdating_[sid]->load(std::memory_order_acquire)) {}
480  #endif
481 
482  //update processed count to be complete at this time
484  //reset this in case stream does not get notified of next lumi (we keep processed events only)
485  ministate_[sid]=&nopath_;
487  }
489  {
491  }
492 
493 
495  {
496  //make sure that all path names are retrieved before allowing ministate to change
497  //hack: assume memory is synchronized after ~50 events seen by each stream
498  if (unlikely(eventCountForPathInit_[sc.streamID()]<50) && false==collectedPathList_[sc.streamID()]->load(std::memory_order_acquire))
499  {
500  //protection between stream threads, as well as the service monitoring thread
501  std::lock_guard<std::mutex> lock(fmt_.monlock_);
502 
503  if (firstEventId_[sc.streamID()]==0)
504  firstEventId_[sc.streamID()]=sc.eventID().event();
505  if (sc.eventID().event()==firstEventId_[sc.streamID()])
506  {
507  encPath_[sc.streamID()].update((void*)&pc.pathName());
508  return;
509  }
510  else {
511  //finished collecting path names
512  collectedPathList_[sc.streamID()]->store(true,std::memory_order_seq_cst);
513  fmt_.m_data.ministateBins_=encPath_[sc.streamID()].vecsize();
514  if (!pathLegendWritten_) {
515  std::string pathLegendStrJson = makePathLegendaJson();
516  FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
517  pathLegendWritten_=true;
518  }
519  }
520  }
521  else {
522  ministate_[sc.streamID()] = &(pc.pathName());
523  }
524  }
525 
526 
528  {
529  }
530 
532  {
534 
535  ministate_[sc.streamID()] = &nopath_;
536 
537  #if ATOMIC_LEVEL>=2
538  //use atomic flag to make sure end of lumi sees this
539  streamCounterUpdating_[sc.streamID()]->store(true,std::memory_order_release);
540  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_release);
541  streamCounterUpdating_[sc.streamID()]->store(false,std::memory_order_release);
542 
543  #elif ATOMIC_LEVEL==1
544  //writes are atomic, we assume writes propagate to memory before stream EOL snap
545  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_relaxed);
546 
547  #elif ATOMIC_LEVEL==0 //default
548  (*(fmt_.m_data.processed_[sc.streamID()]))++;
549  #endif
551 
552  //fast path counter (events accumulated in a run)
553  unsigned long res = totalEventsProcessed_.fetch_add(1,std::memory_order_relaxed);
555  //fmt_.m_data.fastPathProcessedJ_ = totalEventsProcessed_.load(std::memory_order_relaxed);
556  }
557 
559  {
561  }
562 
564  {
566  }
567 
569  {
570  microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
571  }
572 
574  {
575  //microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
577  }
578 
579  //FUNCTIONS CALLED FROM OUTSIDE
580 
581  //this is for old-fashioned service that is not thread safe and can block other streams
582  //(we assume the worst case - everything is blocked)
584  {
585  for (unsigned int i=0;i<nStreams_;i++)
587  }
588 
589  //this is for services that are multithreading-enabled or rarely block other streams
591  {
593  }
594 
595  //from source
596  void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
597  std::lock_guard<std::mutex> lock(fmt_.monlock_);
598 
599  if (accuSize_.find(lumi)==accuSize_.end()) accuSize_[lumi] = fileSize;
600  else accuSize_[lumi] += fileSize;
601 
603  filesProcessedDuringLumi_[lumi] = 1;
604  else
606  }
607 
609  gettimeofday(&fileLookStart_, 0);
610  /*
611  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
612  << fileLookStart_.tv_usec / 1000.0 << std::endl;
613  */
614  }
615 
617  gettimeofday(&fileLookStop_, 0);
618  /*
619  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
620  << fileLookStop_.tv_usec / 1000.0 << std::endl;
621  */
622  std::lock_guard<std::mutex> lock(fmt_.monlock_);
623 
624  if (lumi>lumiFromSource_) {
626  leadTimes_.clear();
627  }
628  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
629  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
630  // add this to lead times for this lumi
631  leadTimes_.push_back((double)elapsedTime);
632 
633  // recompute average lead time for this lumi
634  if (leadTimes_.size() == 1) avgLeadTime_[lumi] = leadTimes_[0];
635  else {
636  double totTime = 0;
637  for (unsigned int i = 0; i < leadTimes_.size(); i++) totTime += leadTimes_[i];
638  avgLeadTime_[lumi] = 0.001*(totTime / leadTimes_.size());
639  }
640  }
641 
642  void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
643  {
644  std::lock_guard<std::mutex> lock(fmt_.monlock_);
645  lockStatsDuringLumi_[ls]=std::pair<double,unsigned int>(waitTime,lockCount);
646 
647  }
648 
649  //for the output module
650  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool * abortFlag) {
651  std::lock_guard<std::mutex> lock(fmt_.monlock_);
652 
653  auto it = processedEventsPerLumi_.find(lumi);
654  if (it!=processedEventsPerLumi_.end()) {
655  unsigned int proc = it->second.first;
656  if (abortFlag) *abortFlag=it->second.second;
657  return proc;
658  }
659  else {
660  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "<<lumi;
661  return 0;
662  }
663  }
664 
665  //for the output module
667  std::lock_guard<std::mutex> lock(fmt_.monlock_);
668 
669  auto it = processedEventsPerLumi_.find(lumi);
670  if (it!=processedEventsPerLumi_.end()) {
671  unsigned int abortFlag = it->second.second;
672  return abortFlag;
673  }
674  else {
675  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "<<lumi;
676  return 0;
677  }
678  }
679 
680  void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
681  // update macrostate
683 
684  //update these unless in the midst of a global transition
686 
687  auto itd = avgLeadTime_.find(ls);
688  if (itd != avgLeadTime_.end())
689  fmt_.m_data.fastAvgLeadTimeJ_ = itd->second;
690  else fmt_.m_data.fastAvgLeadTimeJ_=0.;
691 
692  auto iti = filesProcessedDuringLumi_.find(ls);
693  if (iti != filesProcessedDuringLumi_.end())
694  fmt_.m_data.fastFilesProcessedJ_ = iti->second;
696 
697  auto itrd = lockStatsDuringLumi_.find(ls);
698  if (itrd != lockStatsDuringLumi_.end()) {
699  fmt_.m_data.fastLockWaitJ_ = itrd->second.first;
700  fmt_.m_data.fastLockCountJ_ = itrd->second.second;
701  }
702  else {
705  }
706 
707  }
708  else return;
709 
710  //capture latest mini/microstate of streams
711  for (unsigned int i=0;i<nStreams_;i++) {
714  }
715  //for (unsigned int i=0;i<nThreads_;i++)
716  // fmt_.m_data.threadMicrostateEncoded_[i] = encModule_.encode(threadMicrostate_[i]);
717 
718  if (isGlobalEOL)
719  {//only update global variables
720  fmt_.jsonMonitor_->snapGlobal(ls);
721  }
722  else
723  fmt_.jsonMonitor_->snap(ls);
724  }
725 
726 } //end namespace evf
727 
#define LogDebug(id)
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::string const & pathName() const
Definition: PathContext.h:37
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:46
EventNumber_t event() const
Definition: EventID.h:41
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
void watchPreEvent(PreEvent::slot_type const &iSlot)
int i
Definition: DBlmapReader.cc:9
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
boost::filesystem::path runDirectory_
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
void preGlobalBeginLumi(edm::GlobalContext const &)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
void setAllowAnything()
allow any parameter label/value pairs
double throughputFactor()
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
TrainProcessor *const proc
Definition: MVATrainer.cc:101
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
static const int nReservedPaths
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:52
def ls
Definition: eostools.py:348
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
std::string const & moduleName() const
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
Value & append(const Value &value)
Append value to array at the end.
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
Represents a JSON value.
Definition: value.h:111
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:40
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
#define constexpr
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
#define unlikely(x)
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
void preModuleBeginJob(edm::ModuleDescription const &)
void setComment(std::string const &value)
std::vector< std::atomic< bool > * > collectedPathList_
std::queue< unsigned int > lastGlobalLumisClosed_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
static const std::string nopath_
def load
Definition: svgfig.py:546
ModuleDescription const * moduleDescription() const
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
static const int nReservedModules
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< unsigned long > firstEventId_
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
virtual std::string write(const Value &root)
Serialize a Value in JSON format.
void postStreamBeginLumi(edm::StreamContext const &)
StreamID const & streamID() const
Definition: StreamContext.h:57
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< const void * > microstate_
std::vector< unsigned int > microstateEncoded_
unsigned int value() const
Definition: StreamID.h:46
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
static const int nSpecialModules
LuminosityBlockNumber_t luminosityBlock() const
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
FastMonitoringThread::Macrostate macrostate_
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void stoppedLookingForFile(unsigned int lumi)
void setMicroState(MicroStateService::Microstate)
boost::filesystem::path workingDirectory_
std::vector< jsoncollector::AtomicMonUInt * > processed_
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
EventID const & eventID() const
Definition: StreamContext.h:59
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
bool getAbortFlagForLumi(unsigned int lumi)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:65
std::vector< const void * > ministate_
const void * decode(unsigned int index)
std::vector< unsigned int > streamLumi_
std::vector< Encoding > encPath_
std::vector< unsigned int > ministateEncoded_
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
array value (ordered list)
Definition: value.h:31