CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitoringService.cc
Go to the documentation of this file.
2 #include <iostream>
3 
5 #include <iomanip>
6 #include <sys/time.h>
7 
17 
20 
23 
/// Scale factor 10^6 / 2^20 applied to a "size per microsecond" ratio.
/// Multiplying an accumulated size divided by an elapsed time in microseconds
/// by this factor gives the rate in units of 2^20 size units per second
/// (i.e. MiB/s if the accumulated size is in bytes -- presumably it is; confirm
/// against the code that reports file sizes).
/// Kept as a single return statement so it stays a valid C++11 constexpr function.
constexpr double throughputFactor() {
  return 1.0e6 / static_cast<double>(1024 * 1024);
}
25 
26 #define NRESERVEDMODULES 33
27 #define NSPECIALMODULES 7
28 #define NRESERVEDPATHS 1
29 
30 namespace evf{
31 
33  {"Init","JobReady","RunGiven","Running",
34  "Stopping","Done","JobEnded","Error","ErrorEnded","End",
35  "Invalid"};
36 
38 
40  edm::ActivityRegistry& reg) :
41  MicroStateService(iPS,reg)
42  ,encModule_(NRESERVEDMODULES)
43  ,nStreams_(0)//until initialized
44  ,sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1))
45  ,fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2))
46  ,fastName_("fastmoni")
47  ,slowName_("slowmoni")
48  ,totalEventsProcessed_(0)
49  {
50  reg.watchPreallocate(this, &FastMonitoringService::preallocate);//receiving information on number of threads
52 
56 
60 
65 
67 
70 
71  reg.watchPreSourceEvent(this,&FastMonitoringService::preSourceEvent);//source (with streamID of requestor)
73 
76 
80 
81  //find microstate definition path (required by the module)
82  struct stat statbuf;
83  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
84  std::string microstatePath = std::string(getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
85  if (stat(microstatePath.c_str(), &statbuf)) {
86  microstatePath = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
87  if (stat(microstatePath.c_str(), &statbuf)) {
88  microstatePath = microstateBaseSuffix;
89  if (stat(microstatePath.c_str(), &statbuf))
90  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
91  }
92  }
93  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
94  }
95 
96 
98  {
99  }
100 
102  {
104  desc.setComment("Service for File-based DAQ monitoring and event accounting");
105  desc.addUntracked<int> ("sleepTime",1)->setComment("Sleep time of the monitoring thread");
106  desc.addUntracked<unsigned int> ("fastMonIntervals",2)->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
107  desc.setAllowAnything();
108  descriptions.add("FastMonitoringService", desc);
109  }
110 
111 
113  Json::Value legendaVector(Json::arrayValue);
114  for(int i = 0; i < encPath_[0].current_; i++)
115  legendaVector.append(Json::Value(*((std::string *)(encPath_[0].decode(i)))));
116  Json::Value valReserved(NRESERVEDPATHS);
117  Json::Value pathLegend;
118  pathLegend["names"]=legendaVector;
119  pathLegend["reserved"]=valReserved;
120  Json::StyledWriter writer;
121  return writer.write(pathLegend);
122  }
123 
125  Json::Value legendaVector(Json::arrayValue);
126  for(int i = 0; i < encModule_.current_; i++)
127  legendaVector.append(Json::Value(((const edm::ModuleDescription *)(encModule_.decode(i)))->moduleLabel()));
128  Json::Value valReserved(NRESERVEDMODULES);
129  Json::Value valSpecial(NSPECIALMODULES);
130  Json::Value valOutputModules(nOutputModules_);
131  Json::Value moduleLegend;
132  moduleLegend["names"]=legendaVector;
133  moduleLegend["reserved"]=valReserved;
134  moduleLegend["special"]=valSpecial;
135  moduleLegend["output"]=valOutputModules;
136  Json::StyledWriter writer;
137  return writer.write(moduleLegend);
138  }
139 
141  {
142 
143  // FIND RUN DIRECTORY
144  // The run dir should be set via the configuration of EvFDaqDirector
145 
146  if (edm::Service<evf::EvFDaqDirector>().operator->()==nullptr)
147  {
148  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
149 
150  }
151  emptyLumisectionMode_ = edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode();
152  boost::filesystem::path runDirectory(edm::Service<evf::EvFDaqDirector>()->baseRunDir());
153  workingDirectory_ = runDirectory_ = runDirectory;
154  workingDirectory_ /= "mon";
155 
156  if ( !boost::filesystem::is_directory(workingDirectory_)) {
157  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string() ;
158  boost::filesystem::create_directories(workingDirectory_);
159  if ( !boost::filesystem::is_directory(workingDirectory_))
160  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
161  << ". No monitoring data will be written.";
162  }
163 
164  std::ostringstream fastFileName;
165 
166  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
168  fast /= fastFileName.str();
169  fastPath_ = fast.string();
170 
171  std::ostringstream moduleLegFile;
172  std::ostringstream moduleLegFileJson;
173  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
174  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
175  moduleLegendFile_ = (workingDirectory_/moduleLegFile.str()).string();
176  moduleLegendFileJson_ = (workingDirectory_/moduleLegFileJson.str()).string();
177 
178  std::ostringstream pathLegFile;
179  std::ostringstream pathLegFileJson;
180  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
181  pathLegendFile_ = (workingDirectory_/pathLegFile.str()).string();
182  pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
183  pathLegendFileJson_ = (workingDirectory_/pathLegFileJson.str()).string();
184 
185  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: "
187  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
188 
189  nStreams_=bounds.maxNumberOfStreams();
190  nThreads_=bounds.maxNumberOfThreads();
191 
192  //this should already be >=1
193  if (nStreams_==0) nStreams_=1;
194  if (nThreads_==0) nThreads_=1;
195 
196  /*
197  * initialize the fast monitor with:
198  * vector of pointers to monitorable parameters
199  * path to definition
200  *
201  */
202 
204 
205  for(unsigned int i = 0; i < (mCOUNT); i++)
208 
209  for (unsigned int i=0;i<nStreams_;i++) {
210  ministate_.push_back(&nopath_);
212 
213  //for synchronization
214  streamCounterUpdating_.push_back(new std::atomic<bool>(0));
215 
216  //path (mini) state
217  encPath_.emplace_back(0);
218  encPath_[i].update((void*)&nopath_);
219  eventCountForPathInit_.push_back(0);
220  firstEventId_.push_back(0);
221  collectedPathList_.push_back(new std::atomic<bool>(0));
222 
223  }
224  //for (unsigned int i=0;i<nThreads_;i++)
225  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
226 
227  //initial size until we detect number of bins
231 
232  lastGlobalLumi_=0;
234  lumiFromSource_=0;
235 
236  //startup monitoring
238  fmt_.jsonMonitor_->setNStreams(nStreams_);
240  monInit_.store(false,std::memory_order_release);
242 
243  //this definition needs: #include "tbb/compat/thread"
244  //however this would result in the TBB implementation replacing std::thread
245  //(both supposedly call pthread_self())
246  //number of threads created in process could be obtained from /proc,
247  //assuming that all posix threads are true kernel threads capable of running in parallel
248 
249  //#if TBB_IMPLEMENT_CPP0X
251  //threadIDAvailable_=true;
252  //#endif
253 
254  }
255 
257  {
258  std::string context;
259  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
260  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
261  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
262  edm::LogWarning("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
263  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
264  std::lock_guard<std::mutex> lock(fmt_.monlock_);
265  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
266  //exception_detected_=true;
267  }
268 
270  {
271  std::string context;
272  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
273  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
274  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
275  edm::LogWarning("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
276  << gc.luminosityBlockID().luminosityBlock() << " " << context;
277  std::lock_guard<std::mutex> lock(fmt_.monlock_);
279  //exception_detected_=true;
280  }
281 
283  {
284  std::string context;
285  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
286  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
287  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
288  edm::LogWarning("FastMonitoringService") << " SOURCE " << "earlyTermination -: " << context;
289  std::lock_guard<std::mutex> lock(fmt_.monlock_);
290  exception_detected_=true;
291  }
292 
294  if (!ls) exception_detected_=true;
295  else exceptionInLS_.push_back(ls);
296  }
297 
299  {
301  }
302 
303  //new output module name is stream
305  {
306  std::lock_guard<std::mutex> lock(fmt_.monlock_);
307  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
308 
309  //build a map of modules keyed by their module description address
310  //here we need to treat output modules in a special way so they can be easily singled out
311  if(desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" || desc.moduleName() == "EvFOutputModule" ||
312  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
313  encModule_.updateReserved((void*)&desc);
314  nOutputModules_++;
315  }
316  else
317  encModule_.update((void*)&desc);
318  }
319 
321  {
322  std::string && moduleLegStrJson = makeModuleLegendaJson();
323  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
324 
326 
327  //update number of entries in module histogram
328  std::lock_guard<std::mutex> lock(fmt_.monlock_);
330  }
331 
333  {
335  fmt_.stop();
336  }
337 
339  {
341  }
342 
344  {
345 
346  timeval lumiStartTime;
347  gettimeofday(&lumiStartTime, 0);
348  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
349 
350  std::lock_guard<std::mutex> lock(fmt_.monlock_);
351 
352  lumiStartTime_[newLumi]=lumiStartTime;
353  while (!lastGlobalLumisClosed_.empty()) {
354  //wipe out old map entries as they aren't needed and slow down access
355  unsigned int oldLumi = lastGlobalLumisClosed_.back();
357  lumiStartTime_.erase(oldLumi);
358  avgLeadTime_.erase(oldLumi);
359  filesProcessedDuringLumi_.erase(oldLumi);
360  accuSize_.erase(oldLumi);
361  lockStatsDuringLumi_.erase(oldLumi);
362  processedEventsPerLumi_.erase(oldLumi);
363  }
364  lastGlobalLumi_= newLumi;
366  }
367 
369  {
370  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
371  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: "
372  << lumi;
373  timeval lumiStopTime;
374  gettimeofday(&lumiStopTime, 0);
375 
376  std::lock_guard<std::mutex> lock(fmt_.monlock_);
377 
378  // Compute throughput
379  timeval stt = lumiStartTime_[lumi];
380  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec)*1000000
381  + (lumiStopTime.tv_usec - stt.tv_usec);
382  unsigned long accuSize = accuSize_.find(lumi)==accuSize_.end() ? 0 : accuSize_[lumi];
383  double throughput = throughputFactor()* double(accuSize) / double(usecondsForLumi);
384  //store to registered variable
385  fmt_.m_data.fastThroughputJ_.value() = throughput;
386 
387  //update
388  doSnapshot(lumi,true);
389 
390  // create file name for slow monitoring file
391  std::stringstream slowFileName;
392  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
393  << lumi << "_pid" << std::setfill('0')
394  << std::setw(5) << getpid() << ".jsn";
396  slow /= slowFileName.str();
397 
398  //retrieve one result we need (todo: sanity check if it's found)
399  IntJ *lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_.jsonMonitor_->getMergedIntJForLumi("Processed",lumi));
400  if (!lumiProcessedJptr)
401  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
402  processedEventsPerLumi_[lumi] = std::pair<unsigned int,bool>(lumiProcessedJptr->value(),false);
403 
404  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
405  bool exception_detected = exception_detected_;
406  for (auto ex : exceptionInLS_)
407  if (lumi == ex) exception_detected=true;
408 
409  if (edm::shutdown_flag || exception_detected) {
410  edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
411  << processedEventsPerLumi_[lumi].first << " events were processed in LUMI " << lumi;
412  //this will prevent output modules from producing json file for possibly incomplete lumi
413  processedEventsPerLumi_[lumi].first=0;
414  processedEventsPerLumi_[lumi].second=true;
415  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
416  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
417  return;
418 
419  }
420 
421  auto itr = sourceEventsReport_.find(lumi);
422  if (itr!=sourceEventsReport_.end()) {
423  if (itr->second!=processedEventsPerLumi_[lumi].first) {
424  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
425  << lumi
426  << ", events(processed):" << processedEventsPerLumi_[lumi].first
427  << " events(source):" << itr->second;
428  }
429  sourceEventsReport_.erase(itr);
430  }
431  edm::LogInfo("FastMonitoringService") << "Statistics for lumisection -: lumi = " << lumi << " events = "
432  << lumiProcessedJptr->value() << " time = " << usecondsForLumi/1000000
433  << " size = " << accuSize << " thr = " << throughput;
434  delete lumiProcessedJptr;
435 
436  //full global and stream merge&output for this lumi
437  fmt_.jsonMonitor_->outputFullJSON(slow.string(),lumi);//full global and stream merge and JSON write for this lumi
438  fmt_.jsonMonitor_->discardCollected(lumi);//we don't do further updates for this lumi
439 
441  }
442 
444  {
445  //mark closed lumis (still keep map entries until next one)
447  }
448 
450  {
451  unsigned int sid = sc.streamID().value();
452  std::lock_guard<std::mutex> lock(fmt_.monlock_);
454 
455  //reset collected values for this stream
456  *(fmt_.m_data.processed_[sid])=0;
457 
458  ministate_[sid]=&nopath_;
460  }
461 
463  {
465  }
466 
468  {
469  unsigned int sid = sc.streamID().value();
470  std::lock_guard<std::mutex> lock(fmt_.monlock_);
471 
472  #if ATOMIC_LEVEL>=2
473  //spinlock to make sure we are not still updating event counter somewhere
474  while (streamCounterUpdating_[sid]->load(std::memory_order_acquire)) {}
475  #endif
476 
477  //update processed count to be complete at this time
479  //reset this in case stream does not get notified of next lumi (we keep processed events only)
480  ministate_[sid]=&nopath_;
482  }
484  {
486  }
487 
488 
490  {
491  //make sure that all path names are retrieved before allowing ministate to change
492  //hack: assume memory is synchronized after ~50 events seen by each stream
493  if (unlikely(eventCountForPathInit_[sc.streamID()]<50) && false==collectedPathList_[sc.streamID()]->load(std::memory_order_acquire))
494  {
495  //protection between stream threads, as well as the service monitoring thread
496  std::lock_guard<std::mutex> lock(fmt_.monlock_);
497 
498  if (firstEventId_[sc.streamID()]==0)
499  firstEventId_[sc.streamID()]=sc.eventID().event();
500  if (sc.eventID().event()==firstEventId_[sc.streamID()])
501  {
502  encPath_[sc.streamID()].update((void*)&pc.pathName());
503  return;
504  }
505  else {
506  //finished collecting path names
507  collectedPathList_[sc.streamID()]->store(true,std::memory_order_seq_cst);
508  fmt_.m_data.ministateBins_=encPath_[sc.streamID()].vecsize();
509  if (!pathLegendWritten_) {
510  std::string pathLegendStrJson = makePathLegendaJson();
511  FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
512  pathLegendWritten_=true;
513  }
514  }
515  }
516  else {
517  ministate_[sc.streamID()] = &(pc.pathName());
518  }
519  }
520 
521 
523  {
524  }
525 
527  {
529 
530  ministate_[sc.streamID()] = &nopath_;
531 
532  #if ATOMIC_LEVEL>=2
533  //use atomic flag to make sure end of lumi sees this
534  streamCounterUpdating_[sc.streamID()]->store(true,std::memory_order_release);
535  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_release);
536  streamCounterUpdating_[sc.streamID()]->store(false,std::memory_order_release);
537 
538  #elif ATOMIC_LEVEL==1
539  //writes are atomic, we assume writes propagate to memory before stream EOL snap
540  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_relaxed);
541 
542  #elif ATOMIC_LEVEL==0 //default
543  (*(fmt_.m_data.processed_[sc.streamID()]))++;
544  #endif
546 
547  //fast path counter (events accumulated in a run)
548  unsigned long res = totalEventsProcessed_.fetch_add(1,std::memory_order_relaxed);
550  //fmt_.m_data.fastPathProcessedJ_ = totalEventsProcessed_.load(std::memory_order_relaxed);
551  }
552 
554  {
556  }
557 
559  {
561  }
562 
564  {
565  microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
566  }
567 
569  {
570  //microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
572  }
573 
574  //FUNCTIONS CALLED FROM OUTSIDE
575 
576  //this is for old-fashioned service that is not thread safe and can block other streams
577  //(we assume the worst case - everything is blocked)
579  {
580  for (unsigned int i=0;i<nStreams_;i++)
582  }
583 
584  //this is for services that are multithreading-enabled or rarely block other streams
586  {
588  }
589 
590  //from source
591  void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
592  std::lock_guard<std::mutex> lock(fmt_.monlock_);
593 
594  if (accuSize_.find(lumi)==accuSize_.end()) accuSize_[lumi] = fileSize;
595  else accuSize_[lumi] += fileSize;
596 
598  filesProcessedDuringLumi_[lumi] = 1;
599  else
601  }
602 
604  gettimeofday(&fileLookStart_, 0);
605  /*
606  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
607  << fileLookStart_.tv_usec / 1000.0 << std::endl;
608  */
609  }
610 
612  gettimeofday(&fileLookStop_, 0);
613  /*
614  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
615  << fileLookStop_.tv_usec / 1000.0 << std::endl;
616  */
617  std::lock_guard<std::mutex> lock(fmt_.monlock_);
618 
619  if (lumi>lumiFromSource_) {
621  leadTimes_.clear();
622  }
623  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
624  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
625  // add this to lead times for this lumi
626  leadTimes_.push_back((double)elapsedTime);
627 
628  // recompute average lead time for this lumi
629  if (leadTimes_.size() == 1) avgLeadTime_[lumi] = leadTimes_[0];
630  else {
631  double totTime = 0;
632  for (unsigned int i = 0; i < leadTimes_.size(); i++) totTime += leadTimes_[i];
633  avgLeadTime_[lumi] = 0.001*(totTime / leadTimes_.size());
634  }
635  }
636 
637  void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
638  {
639  std::lock_guard<std::mutex> lock(fmt_.monlock_);
640  lockStatsDuringLumi_[ls]=std::pair<double,unsigned int>(waitTime,lockCount);
641 
642  }
643 
644  //for the output module
645  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool * abortFlag) {
646  std::lock_guard<std::mutex> lock(fmt_.monlock_);
647 
648  auto it = processedEventsPerLumi_.find(lumi);
649  if (it!=processedEventsPerLumi_.end()) {
650  unsigned int proc = it->second.first;
651  if (abortFlag) *abortFlag=it->second.second;
652  return proc;
653  }
654  else {
655  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "<<lumi;
656  return 0;
657  }
658  }
659 
660  //for the output module
662  std::lock_guard<std::mutex> lock(fmt_.monlock_);
663 
664  auto it = processedEventsPerLumi_.find(lumi);
665  if (it!=processedEventsPerLumi_.end()) {
666  unsigned int abortFlag = it->second.second;
667  return abortFlag;
668  }
669  else {
670  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "<<lumi;
671  return 0;
672  }
673  }
674 
675  void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
676  // update macrostate
678 
679  //update these unless in the midst of a global transition
681 
682  auto itd = avgLeadTime_.find(ls);
683  if (itd != avgLeadTime_.end())
684  fmt_.m_data.fastAvgLeadTimeJ_ = itd->second;
685  else fmt_.m_data.fastAvgLeadTimeJ_=0.;
686 
687  auto iti = filesProcessedDuringLumi_.find(ls);
688  if (iti != filesProcessedDuringLumi_.end())
689  fmt_.m_data.fastFilesProcessedJ_ = iti->second;
691 
692  auto itrd = lockStatsDuringLumi_.find(ls);
693  if (itrd != lockStatsDuringLumi_.end()) {
694  fmt_.m_data.fastLockWaitJ_ = itrd->second.first;
695  fmt_.m_data.fastLockCountJ_ = itrd->second.second;
696  }
697  else {
700  }
701 
702  }
703  else return;
704 
705  //capture latest mini/microstate of streams
706  for (unsigned int i=0;i<nStreams_;i++) {
709  }
710  //for (unsigned int i=0;i<nThreads_;i++)
711  // fmt_.m_data.threadMicrostateEncoded_[i] = encModule_.encode(threadMicrostate_[i]);
712 
713  if (isGlobalEOL)
714  {//only update global variables
715  fmt_.jsonMonitor_->snapGlobal(ls);
716  }
717  else
718  fmt_.jsonMonitor_->snap(ls);
719  }
720 
722  {
723 
724  std::lock_guard<std::mutex> lock(fmt_.monlock_);
725  auto itr = sourceEventsReport_.find(lumi);
726  if (itr!=sourceEventsReport_.end())
727  itr->second+=events;
728  else
729  sourceEventsReport_[lumi]=events;
730 
731  }
732 } //end namespace evf
733 
#define LogDebug(id)
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::string const & pathName() const
Definition: PathContext.h:37
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:46
EventNumber_t event() const
Definition: EventID.h:41
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
void watchPreEvent(PreEvent::slot_type const &iSlot)
int i
Definition: DBlmapReader.cc:9
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< AtomicMonUInt * > processed_
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
boost::filesystem::path runDirectory_
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
void preGlobalBeginLumi(edm::GlobalContext const &)
void setAllowAnything()
allow any parameter label/value pairs
double throughputFactor()
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
TrainProcessor *const proc
Definition: MVATrainer.cc:101
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:52
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
std::string const & moduleName() const
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
Value & append(const Value &value)
Append value to array at the end.
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
Represents a JSON value.
Definition: value.h:111
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:40
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
#define constexpr
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
#define unlikely(x)
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
void preModuleBeginJob(edm::ModuleDescription const &)
void setComment(std::string const &value)
std::vector< std::atomic< bool > * > collectedPathList_
tuple path
else: Piece not in the list, fine.
std::queue< unsigned int > lastGlobalLumisClosed_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
static const std::string nopath_
def load
Definition: svgfig.py:546
ModuleDescription const * moduleDescription() const
#define NRESERVEDMODULES
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< unsigned long > firstEventId_
static const std::string macroStateNames[FastMonitoringThread::MCOUNT]
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
virtual std::string write(const Value &root)
Serialize a Value in JSON format.
void postStreamBeginLumi(edm::StreamContext const &)
StreamID const & streamID() const
Definition: StreamContext.h:57
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< const void * > microstate_
std::vector< unsigned int > microstateEncoded_
unsigned int value() const
Definition: StreamID.h:46
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void preStreamBeginLumi(edm::StreamContext const &)
void registerVariables(FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
std::atomic< unsigned long > totalEventsProcessed_
LuminosityBlockNumber_t luminosityBlock() const
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
FastMonitoringThread::Macrostate macrostate_
std::vector< double > leadTimes_
void stoppedLookingForFile(unsigned int lumi)
void setMicroState(MicroStateService::Microstate)
tuple events
Definition: patZpeak.py:19
boost::filesystem::path workingDirectory_
std::map< unsigned int, unsigned int > sourceEventsReport_
std::unique_ptr< FastMonitor > jsonMonitor_
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
EventID const & eventID() const
Definition: StreamContext.h:59
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
bool getAbortFlagForLumi(unsigned int lumi)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:65
std::vector< const void * > ministate_
const void * decode(unsigned int index)
std::vector< unsigned int > streamLumi_
#define NRESERVEDPATHS
std::vector< Encoding > encPath_
std::vector< unsigned int > ministateEncoded_
FastMonitoringService(const edm::ParameterSet &, edm::ActivityRegistry &)
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define NSPECIALMODULES
std::vector< unsigned int > exceptionInLS_
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
array value (ordered list)
Definition: value.h:31