CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitoringService.cc
Go to the documentation of this file.
2 #include <iostream>
3 
5 #include <iomanip>
6 #include <sys/time.h>
7 
17 
20 using namespace jsoncollector;
21 
22 constexpr double throughputFactor() {return (1000000)/double(1024*1024);}
23 
24 #define NRESERVEDMODULES 33
25 #define NSPECIALMODULES 7
26 #define NRESERVEDPATHS 1
27 
28 namespace evf{
29 
30  const std::string FastMonitoringService::macroStateNames[FastMonitoringThread::MCOUNT] =
31  {"Init","JobReady","RunGiven","Running",
32  "Stopping","Done","JobEnded","Error","ErrorEnded","End",
33  "Invalid"};
34 
35  const std::string FastMonitoringService::nopath_ = "NoPath";
36 
38  edm::ActivityRegistry& reg) :
39  MicroStateService(iPS,reg)
40  ,encModule_(NRESERVEDMODULES)
41  ,nStreams_(0)//until initialized
42  ,sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1))
43  ,fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 1))
44  ,microstateDefPath_(iPS.getUntrackedParameter<std::string> ("microstateDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/microstatedef.jsd"))
45  ,fastMicrostateDefPath_(iPS.getUntrackedParameter<std::string>("fastMicrostateDefPath", microstateDefPath_))
46  ,fastName_(iPS.getUntrackedParameter<std::string>("fastName", "fastmoni"))
47  ,slowName_(iPS.getUntrackedParameter<std::string>("slowName", "slowmoni"))
48  ,totalEventsProcessed_(0)
49  {
50  reg.watchPreallocate(this, &FastMonitoringService::preallocate);//receiving information on number of threads
52 
56 
60 
65 
67 
70 
71  reg.watchPreSourceEvent(this,&FastMonitoringService::preSourceEvent);//source (with streamID of requestor)
73 
76 
80  }
81 
82 
84  {
85  }
86 
87 
89  Json::Value legendaVector(Json::arrayValue);
90  for(int i = 0; i < encPath_[0].current_; i++)
91  legendaVector.append(Json::Value(*((std::string *)(encPath_[0].decode(i)))));
92  Json::Value valReserved(NRESERVEDPATHS);
93  Json::Value pathLegend;
94  pathLegend["names"]=legendaVector;
95  pathLegend["reserved"]=valReserved;
96  Json::StyledWriter writer;
97  return writer.write(pathLegend);
98  }
99 
101  Json::Value legendaVector(Json::arrayValue);
102  for(int i = 0; i < encModule_.current_; i++)
103  legendaVector.append(Json::Value(((const edm::ModuleDescription *)(encModule_.decode(i)))->moduleLabel()));
104  Json::Value valReserved(NRESERVEDMODULES);
105  Json::Value valSpecial(NSPECIALMODULES);
106  Json::Value valOutputModules(nOutputModules_);
107  Json::Value moduleLegend;
108  moduleLegend["names"]=legendaVector;
109  moduleLegend["reserved"]=valReserved;
110  moduleLegend["special"]=valSpecial;
111  moduleLegend["output"]=valOutputModules;
112  Json::StyledWriter writer;
113  return writer.write(moduleLegend);
114  }
115 
117  {
118 
119  // FIND RUN DIRECTORY
120  // The run dir should be set via the configuration of EvFDaqDirector
121 
122  if (edm::Service<evf::EvFDaqDirector>().operator->()==nullptr)
123  {
124  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
125 
126  }
127  emptyLumisectionMode_ = edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode();
128  boost::filesystem::path runDirectory(edm::Service<evf::EvFDaqDirector>()->baseRunDir());
129  workingDirectory_ = runDirectory_ = runDirectory;
130  workingDirectory_ /= "mon";
131 
132  if ( !boost::filesystem::is_directory(workingDirectory_)) {
133  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string() ;
134  boost::filesystem::create_directories(workingDirectory_);
135  if ( !boost::filesystem::is_directory(workingDirectory_))
136  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
137  << ". No monitoring data will be written.";
138  }
139 
140  std::ostringstream fastFileName;
141 
142  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
144  fast /= fastFileName.str();
145  fastPath_ = fast.string();
146 
147  std::ostringstream moduleLegFile;
148  std::ostringstream moduleLegFileJson;
149  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
150  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
151  moduleLegendFile_ = (workingDirectory_/moduleLegFile.str()).string();
152  moduleLegendFileJson_ = (workingDirectory_/moduleLegFileJson.str()).string();
153 
154  std::ostringstream pathLegFile;
155  std::ostringstream pathLegFileJson;
156  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
157  pathLegendFile_ = (workingDirectory_/pathLegFile.str()).string();
158  pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
159  pathLegendFileJson_ = (workingDirectory_/pathLegFileJson.str()).string();
160 
161  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: "
163  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
164 
165  nStreams_=bounds.maxNumberOfStreams();
166  nThreads_=bounds.maxNumberOfThreads();
167 
168  //this should already be >=1
169  if (nStreams_==0) nStreams_=1;
170  if (nThreads_==0) nThreads_=1;
171 
172  /*
173  * initialize the fast monitor with:
174  * vector of pointers to monitorable parameters
175  * path to definition
176  *
177  */
178 
180 
181  for(unsigned int i = 0; i < (mCOUNT); i++)
184 
185  for (unsigned int i=0;i<nStreams_;i++) {
186  ministate_.push_back(&nopath_);
188 
189  //for synchronization
190  streamCounterUpdating_.push_back(new std::atomic<bool>(0));
191 
192  //path (mini) state
193  encPath_.emplace_back(0);
194  encPath_[i].update((void*)&nopath_);
195  eventCountForPathInit_.push_back(0);
196  firstEventId_.push_back(0);
197  collectedPathList_.push_back(new std::atomic<bool>(0));
198 
199  }
200  //for (unsigned int i=0;i<nThreads_;i++)
201  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
202 
203  //initial size until we detect number of bins
207 
208  lastGlobalLumi_=0;
210  lumiFromSource_=0;
211 
212  //startup monitoring
214  fmt_.jsonMonitor_->setNStreams(nStreams_);
216  monInit_.store(false,std::memory_order_release);
218 
219  //this definition needs: #include "tbb/compat/thread"
220  //however this would results in TBB imeplementation replacing std::thread
221  //(both supposedly call pthread_self())
222  //number of threads created in process could be obtained from /proc,
223  //assuming that all posix threads are true kernel threads capable of running in parallel
224 
225  //#if TBB_IMPLEMENT_CPP0X
227  //threadIDAvailable_=true;
228  //#endif
229 
230  }
231 
233  {
234  std::string context;
235  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
236  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
237  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
238  edm::LogInfo("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
239  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
240  std::lock_guard<std::mutex> lock(fmt_.monlock_);
241  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
242  //exception_detected_=true;
243  }
244 
246  {
247  std::string context;
248  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
249  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
250  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
251  edm::LogInfo("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
252  << gc.luminosityBlockID().luminosityBlock() << " " << context;
253  std::lock_guard<std::mutex> lock(fmt_.monlock_);
255  //exception_detected_=true;
256  }
257 
259  {
260  std::string context;
261  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
262  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
263  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
264  edm::LogInfo("FastMonitoringService") << " SOURCE " << "earlyTermination -: " << context;
265  std::lock_guard<std::mutex> lock(fmt_.monlock_);
266  exception_detected_=true;
267  }
268 
270  if (!ls) exception_detected_=true;
271  else exceptionInLS_.push_back(ls);
272  }
273 
275  {
277  }
278 
279  //new output module name is stream
281  {
282  std::lock_guard<std::mutex> lock(fmt_.monlock_);
283  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
284 
285  //build a map of modules keyed by their module description address
286  //here we need to treat output modules in a special way so they can be easily singled out
287  if(desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" || desc.moduleName() == "EvFOutputModule" ||
288  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
289  encModule_.updateReserved((void*)&desc);
290  nOutputModules_++;
291  }
292  else
293  encModule_.update((void*)&desc);
294  }
295 
297  {
298  std::string && moduleLegStrJson = makeModuleLegendaJson();
299  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
300 
302 
303  //update number of entries in module histogram
304  std::lock_guard<std::mutex> lock(fmt_.monlock_);
306  }
307 
309  {
311  fmt_.stop();
312  }
313 
315  {
317  }
318 
320  {
321 
322  timeval lumiStartTime;
323  gettimeofday(&lumiStartTime, 0);
324  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
325 
326  std::lock_guard<std::mutex> lock(fmt_.monlock_);
327 
328  lumiStartTime_[newLumi]=lumiStartTime;
329  while (!lastGlobalLumisClosed_.empty()) {
330  //wipe out old map entries as they aren't needed and slow down access
331  unsigned int oldLumi = lastGlobalLumisClosed_.back();
333  lumiStartTime_.erase(oldLumi);
334  avgLeadTime_.erase(oldLumi);
335  filesProcessedDuringLumi_.erase(oldLumi);
336  accuSize_.erase(oldLumi);
337  lockStatsDuringLumi_.erase(oldLumi);
338  processedEventsPerLumi_.erase(oldLumi);
339  }
340  lastGlobalLumi_= newLumi;
342  }
343 
345  {
346  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
347  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: "
348  << lumi;
349  timeval lumiStopTime;
350  gettimeofday(&lumiStopTime, 0);
351 
352  std::lock_guard<std::mutex> lock(fmt_.monlock_);
353 
354  // Compute throughput
355  timeval stt = lumiStartTime_[lumi];
356  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec)*1000000
357  + (lumiStopTime.tv_usec - stt.tv_usec);
358  unsigned long accuSize = accuSize_.find(lumi)==accuSize_.end() ? 0 : accuSize_[lumi];
359  double throughput = throughputFactor()* double(accuSize) / double(usecondsForLumi);
360  //store to registered variable
361  fmt_.m_data.fastThroughputJ_.value() = throughput;
362 
363  //update
364  doSnapshot(lumi,true);
365 
366  // create file name for slow monitoring file
367  std::stringstream slowFileName;
368  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
369  << lumi << "_pid" << std::setfill('0')
370  << std::setw(5) << getpid() << ".jsn";
372  slow /= slowFileName.str();
373 
374  //retrieve one result we need (todo: sanity check if it's found)
375  IntJ *lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_.jsonMonitor_->getMergedIntJForLumi("Processed",lumi));
376  if (!lumiProcessedJptr)
377  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
378  processedEventsPerLumi_[lumi] = std::pair<unsigned int,bool>(lumiProcessedJptr->value(),false);
379 
380  {
381  auto itr = sourceEventsReport_.find(lumi);
382  if (itr==sourceEventsReport_.end()) {
383  //check if exception has been thrown (in case of Global/Stream early termination, for this LS)
384  bool exception_detected = exception_detected_;
385  for (auto ex : exceptionInLS_)
386  if (lumi == ex) exception_detected=true;
387 
388  if (edm::shutdown_flag || exception_detected) {
389  edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
390  << processedEventsPerLumi_[lumi].first << " events were processed in LUMI " << lumi;
391  //this will prevent output modules from producing json file for possibly incomplete lumi
392  processedEventsPerLumi_[lumi].first=0;
393  processedEventsPerLumi_[lumi].second=true;
394  return;
395  }
396  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
397  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
398  }
399  else {
400  if (itr->second!=processedEventsPerLumi_[lumi].first) {
401  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
402  << lumi
403  << ", events(processed):" << processedEventsPerLumi_[lumi].first
404  << " events(source):" << itr->second;
405  }
406  sourceEventsReport_.erase(itr);
407  }
408  }
409  edm::LogInfo("FastMonitoringService") << "Statistics for lumisection -: lumi = " << lumi << " events = "
410  << lumiProcessedJptr->value() << " time = " << usecondsForLumi/1000000
411  << " size = " << accuSize << " thr = " << throughput;
412  delete lumiProcessedJptr;
413 
414  //full global and stream merge&output for this lumi
415  fmt_.jsonMonitor_->outputFullJSON(slow.string(),lumi);//full global and stream merge and JSON write for this lumi
416  fmt_.jsonMonitor_->discardCollected(lumi);//we don't do further updates for this lumi
417 
419  }
420 
422  {
423  //mark closed lumis (still keep map entries until next one)
425  }
426 
428  {
429  unsigned int sid = sc.streamID().value();
430  std::lock_guard<std::mutex> lock(fmt_.monlock_);
432 
433  //reset collected values for this stream
434  *(fmt_.m_data.processed_[sid])=0;
435 
436  ministate_[sid]=&nopath_;
438  }
439 
441  {
443  }
444 
446  {
447  unsigned int sid = sc.streamID().value();
448  std::lock_guard<std::mutex> lock(fmt_.monlock_);
449 
450  #if ATOMIC_LEVEL>=2
451  //spinlock to make sure we are not still updating event counter somewhere
452  while (streamCounterUpdating_[sid]->load(std::memory_order_acquire)) {}
453  #endif
454 
455  //update processed count to be complete at this time
457  //reset this in case stream does not get notified of next lumi (we keep processed events only)
458  ministate_[sid]=&nopath_;
460  }
462  {
464  }
465 
466 
468  {
469  //make sure that all path names are retrieved before allowing ministate to change
470  //hack: assume memory is synchronized after ~50 events seen by each stream
471  if (unlikely(eventCountForPathInit_[sc.streamID()]<50) && false==collectedPathList_[sc.streamID()]->load(std::memory_order_acquire))
472  {
473  //protection between stream threads, as well as the service monitoring thread
474  std::lock_guard<std::mutex> lock(fmt_.monlock_);
475 
476  if (firstEventId_[sc.streamID()]==0)
477  firstEventId_[sc.streamID()]=sc.eventID().event();
478  if (sc.eventID().event()==firstEventId_[sc.streamID()])
479  {
480  encPath_[sc.streamID()].update((void*)&pc.pathName());
481  return;
482  }
483  else {
484  //finished collecting path names
485  collectedPathList_[sc.streamID()]->store(true,std::memory_order_seq_cst);
486  fmt_.m_data.ministateBins_=encPath_[sc.streamID()].vecsize();
487  if (!pathLegendWritten_) {
488  std::string pathLegendStrJson = makePathLegendaJson();
489  FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
490  pathLegendWritten_=true;
491  }
492  }
493  }
494  else {
495  ministate_[sc.streamID()] = &(pc.pathName());
496  }
497  }
498 
499 
501  {
502  }
503 
505  {
507 
508  ministate_[sc.streamID()] = &nopath_;
509 
510  #if ATOMIC_LEVEL>=2
511  //use atomic flag to make sure end of lumi sees this
512  streamCounterUpdating_[sc.streamID()]->store(true,std::memory_order_release);
513  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_release);
514  streamCounterUpdating_[sc.streamID()]->store(false,std::memory_order_release);
515 
516  #elif ATOMIC_LEVEL==1
517  //writes are atomic, we assume writes propagate to memory before stream EOL snap
518  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_relaxed);
519 
520  #elif ATOMIC_LEVEL==0 //default
521  (*(fmt_.m_data.processed_[sc.streamID()]))++;
522  #endif
524 
525  //fast path counter (events accumulated in a run)
526  unsigned long res = totalEventsProcessed_.fetch_add(1,std::memory_order_relaxed);
528  //fmt_.m_data.fastPathProcessedJ_ = totalEventsProcessed_.load(std::memory_order_relaxed);
529  }
530 
532  {
534  }
535 
537  {
539  }
540 
542  {
543  microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
544  }
545 
547  {
548  //microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
550  }
551 
552  //FUNCTIONS CALLED FROM OUTSIDE
553 
554  //this is for old-fashioned service that is not thread safe and can block other streams
555  //(we assume the worst case - everything is blocked)
557  {
558  for (unsigned int i=0;i<nStreams_;i++)
560  }
561 
562  //this is for services that are multithreading-enabled or rarely blocks other streams
564  {
566  }
567 
568  //from source
569  void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
570  std::lock_guard<std::mutex> lock(fmt_.monlock_);
571 
572  if (accuSize_.find(lumi)==accuSize_.end()) accuSize_[lumi] = fileSize;
573  else accuSize_[lumi] += fileSize;
574 
576  filesProcessedDuringLumi_[lumi] = 1;
577  else
579  }
580 
582  gettimeofday(&fileLookStart_, 0);
583  /*
584  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
585  << fileLookStart_.tv_usec / 1000.0 << std::endl;
586  */
587  }
588 
590  gettimeofday(&fileLookStop_, 0);
591  /*
592  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
593  << fileLookStop_.tv_usec / 1000.0 << std::endl;
594  */
595  std::lock_guard<std::mutex> lock(fmt_.monlock_);
596 
597  if (lumi>lumiFromSource_) {
599  leadTimes_.clear();
600  }
601  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
602  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
603  // add this to lead times for this lumi
604  leadTimes_.push_back((double)elapsedTime);
605 
606  // recompute average lead time for this lumi
607  if (leadTimes_.size() == 1) avgLeadTime_[lumi] = leadTimes_[0];
608  else {
609  double totTime = 0;
610  for (unsigned int i = 0; i < leadTimes_.size(); i++) totTime += leadTimes_[i];
611  avgLeadTime_[lumi] = 0.001*(totTime / leadTimes_.size());
612  }
613  }
614 
615  void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
616  {
617  std::lock_guard<std::mutex> lock(fmt_.monlock_);
618  lockStatsDuringLumi_[ls]=std::pair<double,unsigned int>(waitTime,lockCount);
619 
620  }
621 
622  //for the output module
623  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool * abortFlag) {
624  std::lock_guard<std::mutex> lock(fmt_.monlock_);
625 
626  auto it = processedEventsPerLumi_.find(lumi);
627  if (it!=processedEventsPerLumi_.end()) {
628  unsigned int proc = it->second.first;
629  if (abortFlag) *abortFlag=it->second.second;
630  return proc;
631  }
632  else {
633  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "<<lumi;
634  return 0;
635  }
636  }
637 
638  //for the output module
640  std::lock_guard<std::mutex> lock(fmt_.monlock_);
641 
642  auto it = processedEventsPerLumi_.find(lumi);
643  if (it!=processedEventsPerLumi_.end()) {
644  unsigned int abortFlag = it->second.second;
645  return abortFlag;
646  }
647  else {
648  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "<<lumi;
649  return 0;
650  }
651  }
652 
653  void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
654  // update macrostate
656 
657  //update these unless in the midst of a global transition
659 
660  auto itd = avgLeadTime_.find(ls);
661  if (itd != avgLeadTime_.end())
662  fmt_.m_data.fastAvgLeadTimeJ_ = itd->second;
663  else fmt_.m_data.fastAvgLeadTimeJ_=0.;
664 
665  auto iti = filesProcessedDuringLumi_.find(ls);
666  if (iti != filesProcessedDuringLumi_.end())
667  fmt_.m_data.fastFilesProcessedJ_ = iti->second;
669 
670  auto itrd = lockStatsDuringLumi_.find(ls);
671  if (itrd != lockStatsDuringLumi_.end()) {
672  fmt_.m_data.fastLockWaitJ_ = itrd->second.first;
673  fmt_.m_data.fastLockCountJ_ = itrd->second.second;
674  }
675  else {
678  }
679 
680  }
681  else return;
682 
683  //capture latest mini/microstate of streams
684  for (unsigned int i=0;i<nStreams_;i++) {
687  }
688  //for (unsigned int i=0;i<nThreads_;i++)
689  // fmt_.m_data.threadMicrostateEncoded_[i] = encModule_.encode(threadMicrostate_[i]);
690 
691  if (isGlobalEOL)
692  {//only update global variables
693  fmt_.jsonMonitor_->snapGlobal(ls);
694  }
695  else
696  fmt_.jsonMonitor_->snap(ls);
697  }
698 
700  {
701 
702  std::lock_guard<std::mutex> lock(fmt_.monlock_);
703  auto itr = sourceEventsReport_.find(lumi);
704  if (itr!=sourceEventsReport_.end())
705  itr->second+=events;
706  else
707  sourceEventsReport_[lumi]=events;
708 
709  }
710 } //end namespace evf
711 
#define LogDebug(id)
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::string const & pathName() const
Definition: PathContext.h:37
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:46
EventNumber_t event() const
Definition: EventID.h:41
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
void watchPreEvent(PreEvent::slot_type const &iSlot)
int i
Definition: DBlmapReader.cc:9
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void setExceptionDetected(unsigned int ls)
boost::filesystem::path runDirectory_
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
void preGlobalBeginLumi(edm::GlobalContext const &)
double throughputFactor()
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
TrainProcessor *const proc
Definition: MVATrainer.cc:101
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:52
def ls
Definition: eostools.py:348
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
std::string const & moduleName() const
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
Value & append(const Value &value)
Append value to array at the end.
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
Represents a JSON value.
Definition: value.h:111
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:40
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
#define constexpr
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
#define unlikely(x)
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
void preModuleBeginJob(edm::ModuleDescription const &)
std::vector< std::atomic< bool > * > collectedPathList_
std::queue< unsigned int > lastGlobalLumisClosed_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
static const std::string nopath_
def load
Definition: svgfig.py:546
ModuleDescription const * moduleDescription() const
#define NRESERVEDMODULES
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< unsigned long > firstEventId_
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
virtual std::string write(const Value &root)
Serialize a Value in JSON format.
void postStreamBeginLumi(edm::StreamContext const &)
StreamID const & streamID() const
Definition: StreamContext.h:57
void postStreamEndLumi(edm::StreamContext const &)
std::vector< const void * > microstate_
std::vector< unsigned int > microstateEncoded_
unsigned int value() const
Definition: StreamID.h:46
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
LuminosityBlockNumber_t luminosityBlock() const
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
FastMonitoringThread::Macrostate macrostate_
std::vector< double > leadTimes_
void stoppedLookingForFile(unsigned int lumi)
void setMicroState(MicroStateService::Microstate)
tuple events
Definition: patZpeak.py:19
boost::filesystem::path workingDirectory_
std::map< unsigned int, unsigned int > sourceEventsReport_
std::vector< jsoncollector::AtomicMonUInt * > processed_
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
EventID const & eventID() const
Definition: StreamContext.h:59
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
bool getAbortFlagForLumi(unsigned int lumi)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:65
std::vector< const void * > ministate_
const void * decode(unsigned int index)
std::vector< unsigned int > streamLumi_
#define NRESERVEDPATHS
std::vector< Encoding > encPath_
std::vector< unsigned int > ministateEncoded_
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define NSPECIALMODULES
std::vector< unsigned int > exceptionInLS_
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
array value (ordered list)
Definition: value.h:31