CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitoringService.cc
Go to the documentation of this file.
2 #include <iostream>
3 
5 #include <iomanip>
6 #include <sys/time.h>
7 
17 
20 
// Scale factor 10^6 / 2^20 applied to the per-lumisection throughput figure.
// It multiplies (accumulated size / elapsed microseconds); presumably this
// converts bytes-per-microsecond into MiB-per-second -- the unit of the
// accumulated size is not stated in this file, so confirm against the source.
constexpr double throughputFactor() {
  return 1000000.0 / (1024.0 * 1024.0);
}
22 
23 namespace evf{
24 
26  {"Init","JobReady","RunGiven","Running",
27  "Stopping","Done","JobEnded","Error","ErrorEnded","End",
28  "Invalid"};
29 
31 
33  edm::ActivityRegistry& reg) :
34  MicroStateService(iPS,reg)
35  ,encModule_(33)
36  ,nStreams_(0)//until initialized
37  ,sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1))
38  ,fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 1))
39  ,microstateDefPath_(iPS.getUntrackedParameter<std::string> ("microstateDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/microstatedef.jsd"))
40  ,fastMicrostateDefPath_(iPS.getUntrackedParameter<std::string>("fastMicrostateDefPath", microstateDefPath_))
41  ,fastName_(iPS.getUntrackedParameter<std::string>("fastName", "fastmoni"))
42  ,slowName_(iPS.getUntrackedParameter<std::string>("slowName", "slowmoni"))
43  ,totalEventsProcessed_(0)
44  {
45  reg.watchPreallocate(this, &FastMonitoringService::preallocate);//receiving information on number of threads
47 
51 
55 
60 
62 
65 
66  reg.watchPreSourceEvent(this,&FastMonitoringService::preSourceEvent);//source (with streamID of requestor)
68 
71 
75  }
76 
77 
79  {
80  }
81 
82 
84  //taken from first stream
85  std::ostringstream ost;
86  for(int i = 0;
87  i < encPath_[0].current_;
88  i++)
89  ost<<i<<"="<<*((std::string *)(encPath_[0].decode(i)))<<" ";
90  return ost.str();
91  }
92 
93 
95  {
96  std::ostringstream ost;
97  for(int i = 0; i < encModule_.current_; i++)
98  ost<<i<<"="<<((const edm::ModuleDescription *)(encModule_.decode(i)))->moduleLabel()<<" ";
99  return ost.str();
100  }
101 
102 
104  {
105 
106  // FIND RUN DIRECTORY
107  // The run dir should be set via the configuration of EvFDaqDirector
108 
109  if (edm::Service<evf::EvFDaqDirector>().operator->()==nullptr)
110  {
111  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
112 
113  }
114  emptyLumisectionMode_ = edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode();
115  boost::filesystem::path runDirectory(edm::Service<evf::EvFDaqDirector>()->baseRunDir());
116  workingDirectory_ = runDirectory_ = runDirectory;
117  workingDirectory_ /= "mon";
118 
119  if ( !boost::filesystem::is_directory(workingDirectory_)) {
120  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string() ;
121  boost::filesystem::create_directories(workingDirectory_);
122  if ( !boost::filesystem::is_directory(workingDirectory_))
123  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
124  << ". No monitoring data will be written.";
125  }
126 
127  std::ostringstream fastFileName;
128 
129  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
131  fast /= fastFileName.str();
132  fastPath_ = fast.string();
133 
134  std::ostringstream moduleLegFile;
135  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
136  moduleLegendFile_ = (workingDirectory_/moduleLegFile.str()).string();
137 
138  std::ostringstream pathLegFile;
139  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
140  pathLegendFile_ = (workingDirectory_/pathLegFile.str()).string();
141 
142  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: "
144  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
145 
146  nStreams_=bounds.maxNumberOfStreams();
147  nThreads_=bounds.maxNumberOfThreads();
148 
149  //this should already be >=1
150  if (nStreams_==0) nStreams_=1;
151  if (nThreads_==0) nThreads_=1;
152 
153  /*
154  * initialize the fast monitor with:
155  * vector of pointers to monitorable parameters
156  * path to definition
157  *
158  */
159 
161 
162  for(unsigned int i = 0; i < (mCOUNT); i++)
165 
166  for (unsigned int i=0;i<nStreams_;i++) {
167  ministate_.push_back(&nopath_);
169 
170  //for synchronization
171  streamCounterUpdating_.push_back(new std::atomic<bool>(0));
172 
173  //path (mini) state
174  encPath_.emplace_back(0);
175  encPath_[i].update((void*)&nopath_);
176  eventCountForPathInit_.push_back(0);
177  firstEventId_.push_back(0);
178  collectedPathList_.push_back(new std::atomic<bool>(0));
179 
180  }
181  //for (unsigned int i=0;i<nThreads_;i++)
182  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
183 
184  //initial size until we detect number of bins
188 
189  lastGlobalLumi_=0;
191  lumiFromSource_=0;
192 
193  //startup monitoring
195  fmt_.jsonMonitor_->setNStreams(nStreams_);
197  monInit_.store(false,std::memory_order_release);
199 
200  //this definition needs: #include "tbb/compat/thread"
201  //however this would result in the TBB implementation replacing std::thread
202  //(both supposedly call pthread_self())
203  //number of threads created in process could be obtained from /proc,
204  //assuming that all posix threads are true kernel threads capable of running in parallel
205 
206  //#if TBB_IMPLEMENT_CPP0X
208  //threadIDAvailable_=true;
209  //#endif
210 
211  }
212 
214  {
215  std::string context;
216  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
217  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
218  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
219  edm::LogInfo("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
220  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
221  std::lock_guard<std::mutex> lock(fmt_.monlock_);
222  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
223  //exception_detected_=true;
224  }
225 
227  {
228  std::string context;
229  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
230  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
231  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
232  edm::LogInfo("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
233  << gc.luminosityBlockID().luminosityBlock() << " " << context;
234  std::lock_guard<std::mutex> lock(fmt_.monlock_);
236  //exception_detected_=true;
237  }
238 
240  {
241  std::string context;
242  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
243  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
244  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
245  edm::LogInfo("FastMonitoringService") << " SOURCE " << "earlyTermination -: " << context;
246  std::lock_guard<std::mutex> lock(fmt_.monlock_);
247  exception_detected_=true;
248  }
249 
251  if (!ls) exception_detected_=true;
252  else exceptionInLS_.push_back(ls);
253  }
254 
256  {
258  }
259 
260  //new output module name is stream
262  {
263  std::lock_guard<std::mutex> lock(fmt_.monlock_);
264  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
265 
266  //build a map of modules keyed by their module description address
267  //here we need to treat output modules in a special way so they can be easily singled out
268  if(desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" || desc.moduleName() == "EvFOutputModule" ||
269  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule")
270  encModule_.updateReserved((void*)&desc);
271  else
272  encModule_.update((void*)&desc);
273  }
274 
276  {
277  std::string && moduleLegStr = makeModuleLegenda();
278  FileIO::writeStringToFile(moduleLegendFile_, moduleLegStr);
279 
281 
282  //update number of entries in module histogram
283  std::lock_guard<std::mutex> lock(fmt_.monlock_);
285  }
286 
288  {
290  fmt_.stop();
291  }
292 
294  {
296  }
297 
299  {
300 
301  timeval lumiStartTime;
302  gettimeofday(&lumiStartTime, 0);
303  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
304 
305  std::lock_guard<std::mutex> lock(fmt_.monlock_);
306 
307  lumiStartTime_[newLumi]=lumiStartTime;
308  while (!lastGlobalLumisClosed_.empty()) {
309  //wipe out old map entries as they aren't needed and slow down access
310  unsigned int oldLumi = lastGlobalLumisClosed_.back();
312  lumiStartTime_.erase(oldLumi);
313  avgLeadTime_.erase(oldLumi);
314  filesProcessedDuringLumi_.erase(oldLumi);
315  accuSize_.erase(oldLumi);
316  lockStatsDuringLumi_.erase(oldLumi);
317  processedEventsPerLumi_.erase(oldLumi);
318  }
319  lastGlobalLumi_= newLumi;
321  }
322 
324  {
325  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
326  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: "
327  << lumi;
328  timeval lumiStopTime;
329  gettimeofday(&lumiStopTime, 0);
330 
331  std::lock_guard<std::mutex> lock(fmt_.monlock_);
332 
333  // Compute throughput
334  timeval stt = lumiStartTime_[lumi];
335  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec)*1000000
336  + (lumiStopTime.tv_usec - stt.tv_usec);
337  unsigned long accuSize = accuSize_.find(lumi)==accuSize_.end() ? 0 : accuSize_[lumi];
338  double throughput = throughputFactor()* double(accuSize) / double(usecondsForLumi);
339  //store to registered variable
340  fmt_.m_data.fastThroughputJ_.value() = throughput;
341 
342  //update
343  doSnapshot(lumi,true);
344 
345  // create file name for slow monitoring file
346  std::stringstream slowFileName;
347  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
348  << lumi << "_pid" << std::setfill('0')
349  << std::setw(5) << getpid() << ".jsn";
351  slow /= slowFileName.str();
352 
353  //retrieve one result we need (todo: sanity check if it's found)
354  IntJ *lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_.jsonMonitor_->getMergedIntJForLumi("Processed",lumi));
355  if (!lumiProcessedJptr)
356  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
357  processedEventsPerLumi_[lumi] = std::pair<unsigned int,bool>(lumiProcessedJptr->value(),false);
358 
359  {
360  auto itr = sourceEventsReport_.find(lumi);
361  if (itr==sourceEventsReport_.end()) {
362  //check if exception has been thrown (in case of Global/Stream early termination, for this LS)
363  bool exception_detected = exception_detected_;
364  for (auto ex : exceptionInLS_)
365  if (lumi == ex) exception_detected=true;
366 
367  if (edm::shutdown_flag || exception_detected) {
368  edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
369  << processedEventsPerLumi_[lumi].first << " events were processed in LUMI " << lumi;
370  //this will prevent output modules from producing json file for possibly incomplete lumi
371  processedEventsPerLumi_[lumi].first=0;
372  processedEventsPerLumi_[lumi].second=true;
373  return;
374  }
375  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
376  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
377  }
378  else {
379  if (itr->second!=processedEventsPerLumi_[lumi].first) {
380  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
381  << lumi
382  << ", events(processed):" << processedEventsPerLumi_[lumi].first
383  << " events(source):" << itr->second;
384  }
385  sourceEventsReport_.erase(itr);
386  }
387  }
388  edm::LogInfo("FastMonitoringService") << "Statistics for lumisection -: lumi = " << lumi << " events = "
389  << lumiProcessedJptr->value() << " time = " << usecondsForLumi/1000000
390  << " size = " << accuSize << " thr = " << throughput;
391  delete lumiProcessedJptr;
392 
393  //full global and stream merge&output for this lumi
394  fmt_.jsonMonitor_->outputFullJSON(slow.string(),lumi);//full global and stream merge and JSON write for this lumi
395  fmt_.jsonMonitor_->discardCollected(lumi);//we don't do further updates for this lumi
396 
398  }
399 
401  {
402  //mark closed lumis (still keep map entries until next one)
404  }
405 
407  {
408  unsigned int sid = sc.streamID().value();
409  std::lock_guard<std::mutex> lock(fmt_.monlock_);
411 
412  //reset collected values for this stream
413  *(fmt_.m_data.processed_[sid])=0;
414 
415  ministate_[sid]=&nopath_;
417  }
418 
420  {
422  }
423 
425  {
426  unsigned int sid = sc.streamID().value();
427  std::lock_guard<std::mutex> lock(fmt_.monlock_);
428 
429  #if ATOMIC_LEVEL>=2
430  //spinlock to make sure we are not still updating event counter somewhere
431  while (streamCounterUpdating_[sid]->load(std::memory_order_acquire)) {}
432  #endif
433 
434  //update processed count to be complete at this time
436  //reset this in case stream does not get notified of next lumi (we keep processed events only)
437  ministate_[sid]=&nopath_;
439  }
441  {
443  }
444 
445 
447  {
448  //make sure that all path names are retrieved before allowing ministate to change
449  //hack: assume memory is synchronized after ~50 events seen by each stream
450  if (unlikely(eventCountForPathInit_[sc.streamID()]<50) && false==collectedPathList_[sc.streamID()]->load(std::memory_order_acquire))
451  {
452  //protection between stream threads, as well as the service monitoring thread
453  std::lock_guard<std::mutex> lock(fmt_.monlock_);
454 
455  if (firstEventId_[sc.streamID()]==0)
456  firstEventId_[sc.streamID()]=sc.eventID().event();
457  if (sc.eventID().event()==firstEventId_[sc.streamID()])
458  {
459  encPath_[sc.streamID()].update((void*)&pc.pathName());
460  return;
461  }
462  else {
463  //finished collecting path names
464  collectedPathList_[sc.streamID()]->store(true,std::memory_order_seq_cst);
465  fmt_.m_data.ministateBins_=encPath_[sc.streamID()].vecsize();
466  if (!pathLegendWritten_) {
467  std::string pathLegendStr = makePathLegenda();
468  FileIO::writeStringToFile(pathLegendFile_, pathLegendStr);
469  pathLegendWritten_=true;
470  }
471  }
472  }
473  else {
474  ministate_[sc.streamID()] = &(pc.pathName());
475  }
476  }
477 
478 
480  {
481  }
482 
484  {
486 
487  ministate_[sc.streamID()] = &nopath_;
488 
489  #if ATOMIC_LEVEL>=2
490  //use atomic flag to make sure end of lumi sees this
491  streamCounterUpdating_[sc.streamID()]->store(true,std::memory_order_release);
492  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_release);
493  streamCounterUpdating_[sc.streamID()]->store(false,std::memory_order_release);
494 
495  #elif ATOMIC_LEVEL==1
496  //writes are atomic, we assume writes propagate to memory before stream EOL snap
497  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_relaxed);
498 
499  #elif ATOMIC_LEVEL==0 //default
500  (*(fmt_.m_data.processed_[sc.streamID()]))++;
501  #endif
503 
504  //fast path counter (events accumulated in a run)
505  unsigned long res = totalEventsProcessed_.fetch_add(1,std::memory_order_relaxed);
507  //fmt_.m_data.fastPathProcessedJ_ = totalEventsProcessed_.load(std::memory_order_relaxed);
508  }
509 
511  {
513  }
514 
516  {
518  }
519 
521  {
522  microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
523  }
524 
526  {
527  //microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
529  }
530 
531  //FUNCTIONS CALLED FROM OUTSIDE
532 
533  //this is for old-fashioned service that is not thread safe and can block other streams
534  //(we assume the worst case - everything is blocked)
536  {
537  for (unsigned int i=0;i<nStreams_;i++)
539  }
540 
541  //this is for services that are multithreading-enabled or rarely block other streams
543  {
545  }
546 
547  //from source
548  void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
549  std::lock_guard<std::mutex> lock(fmt_.monlock_);
550 
551  if (accuSize_.find(lumi)==accuSize_.end()) accuSize_[lumi] = fileSize;
552  else accuSize_[lumi] += fileSize;
553 
555  filesProcessedDuringLumi_[lumi] = 1;
556  else
558  }
559 
561  gettimeofday(&fileLookStart_, 0);
562  /*
563  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
564  << fileLookStart_.tv_usec / 1000.0 << std::endl;
565  */
566  }
567 
569  gettimeofday(&fileLookStop_, 0);
570  /*
571  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
572  << fileLookStop_.tv_usec / 1000.0 << std::endl;
573  */
574  std::lock_guard<std::mutex> lock(fmt_.monlock_);
575 
576  if (lumi>lumiFromSource_) {
578  leadTimes_.clear();
579  }
580  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
581  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
582  // add this to lead times for this lumi
583  leadTimes_.push_back((double)elapsedTime);
584 
585  // recompute average lead time for this lumi
586  if (leadTimes_.size() == 1) avgLeadTime_[lumi] = leadTimes_[0];
587  else {
588  double totTime = 0;
589  for (unsigned int i = 0; i < leadTimes_.size(); i++) totTime += leadTimes_[i];
590  avgLeadTime_[lumi] = 0.001*(totTime / leadTimes_.size());
591  }
592  }
593 
594  void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
595  {
596  std::lock_guard<std::mutex> lock(fmt_.monlock_);
597  lockStatsDuringLumi_[ls]=std::pair<double,unsigned int>(waitTime,lockCount);
598 
599  }
600 
601  //for the output module
602  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool * abortFlag) {
603  std::lock_guard<std::mutex> lock(fmt_.monlock_);
604 
605  auto it = processedEventsPerLumi_.find(lumi);
606  if (it!=processedEventsPerLumi_.end()) {
607  unsigned int proc = it->second.first;
608  if (abortFlag) *abortFlag=it->second.second;
609  return proc;
610  }
611  else {
612  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "<<lumi;
613  return 0;
614  }
615  }
616 
617  //for the output module
619  std::lock_guard<std::mutex> lock(fmt_.monlock_);
620 
621  auto it = processedEventsPerLumi_.find(lumi);
622  if (it!=processedEventsPerLumi_.end()) {
623  unsigned int abortFlag = it->second.second;
624  return abortFlag;
625  }
626  else {
627  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "<<lumi;
628  return 0;
629  }
630  }
631 
632  void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
633  // update macrostate
635 
636  //update these unless in the midst of a global transition
638 
639  auto itd = avgLeadTime_.find(ls);
640  if (itd != avgLeadTime_.end())
641  fmt_.m_data.fastAvgLeadTimeJ_ = itd->second;
642  else fmt_.m_data.fastAvgLeadTimeJ_=0.;
643 
644  auto iti = filesProcessedDuringLumi_.find(ls);
645  if (iti != filesProcessedDuringLumi_.end())
646  fmt_.m_data.fastFilesProcessedJ_ = iti->second;
648 
649  auto itrd = lockStatsDuringLumi_.find(ls);
650  if (itrd != lockStatsDuringLumi_.end()) {
651  fmt_.m_data.fastLockWaitJ_ = itrd->second.first;
652  fmt_.m_data.fastLockCountJ_ = itrd->second.second;
653  }
654  else {
657  }
658 
659  }
660  else return;
661 
662  //capture latest mini/microstate of streams
663  for (unsigned int i=0;i<nStreams_;i++) {
666  }
667  //for (unsigned int i=0;i<nThreads_;i++)
668  // fmt_.m_data.threadMicrostateEncoded_[i] = encModule_.encode(threadMicrostate_[i]);
669 
670  if (isGlobalEOL)
671  {//only update global variables
672  fmt_.jsonMonitor_->snapGlobal(ls);
673  }
674  else
675  fmt_.jsonMonitor_->snap(ls);
676  }
677 
679  {
680 
681  std::lock_guard<std::mutex> lock(fmt_.monlock_);
682  auto itr = sourceEventsReport_.find(lumi);
683  if (itr!=sourceEventsReport_.end())
684  itr->second+=events;
685  else
686  sourceEventsReport_[lumi]=events;
687 
688  }
689 } //end namespace evf
690 
#define LogDebug(id)
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::string const & pathName() const
Definition: PathContext.h:37
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:46
EventNumber_t event() const
Definition: EventID.h:41
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
void watchPreEvent(PreEvent::slot_type const &iSlot)
int i
Definition: DBlmapReader.cc:9
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< AtomicMonUInt * > processed_
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void setExceptionDetected(unsigned int ls)
boost::filesystem::path runDirectory_
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
void preGlobalBeginLumi(edm::GlobalContext const &)
double throughputFactor()
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
TrainProcessor *const proc
Definition: MVATrainer.cc:101
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:52
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
std::string const & moduleName() const
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:40
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
#define constexpr
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
#define unlikely(x)
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
void preModuleBeginJob(edm::ModuleDescription const &)
std::vector< std::atomic< bool > * > collectedPathList_
tuple path
else: Piece not in the list, fine.
std::queue< unsigned int > lastGlobalLumisClosed_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
static const std::string nopath_
def load
Definition: svgfig.py:546
ModuleDescription const * moduleDescription() const
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
bool decode(bool &, std::string const &)
Definition: types.cc:62
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< unsigned long > firstEventId_
static const std::string macroStateNames[FastMonitoringThread::MCOUNT]
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postStreamBeginLumi(edm::StreamContext const &)
StreamID const & streamID() const
Definition: StreamContext.h:57
void postStreamEndLumi(edm::StreamContext const &)
std::vector< const void * > microstate_
std::vector< unsigned int > microstateEncoded_
unsigned int value() const
Definition: StreamID.h:46
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void preStreamBeginLumi(edm::StreamContext const &)
void registerVariables(FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
std::atomic< unsigned long > totalEventsProcessed_
LuminosityBlockNumber_t luminosityBlock() const
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
FastMonitoringThread::Macrostate macrostate_
std::vector< double > leadTimes_
void stoppedLookingForFile(unsigned int lumi)
void setMicroState(MicroStateService::Microstate)
tuple events
Definition: patZpeak.py:19
boost::filesystem::path workingDirectory_
std::map< unsigned int, unsigned int > sourceEventsReport_
std::unique_ptr< FastMonitor > jsonMonitor_
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
EventID const & eventID() const
Definition: StreamContext.h:59
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
bool getAbortFlagForLumi(unsigned int lumi)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::vector< const void * > ministate_
const void * decode(unsigned int index)
std::vector< unsigned int > streamLumi_
std::vector< Encoding > encPath_
std::vector< unsigned int > ministateEncoded_
FastMonitoringService(const edm::ParameterSet &, edm::ActivityRegistry &)
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal