CMS 3D CMS Logo

FastMonitoringService.cc
Go to the documentation of this file.
2 #include <iostream>
3 
5 #include <iomanip>
6 #include <sys/time.h>
7 
18 
21 using namespace jsoncollector;
22 
25 
/// Scale factor used when turning (accumulated size / elapsed microseconds)
/// into the per-lumisection throughput value: 10^6 divided by 2^20.
/// NOTE(review): presumably converts bytes-per-microsecond into MiB-per-second —
/// confirm the unit of the accumulated file size.
constexpr double throughputFactor() {
  return 1000000.0 / (1024.0 * 1024.0);
}
27 
// File-local sizes used by the module/path encodings and reported in the JSON legends.
static constexpr int nReservedModules = 64;  // passed to the module encoding at construction; "reserved" entry of the module legend
static constexpr int nSpecialModules = 10;   // "special" entry of the module legend
static constexpr int nReservedPaths = 1;     // "reserved" entry of the path legend
31 
32 namespace evf{
33 
34  const std::string FastMonitoringService::macroStateNames[FastMonitoringThread::MCOUNT] =
35  {"Init","JobReady","RunGiven","Running",
36  "Stopping","Done","JobEnded","Error","ErrorEnded","End",
37  "Invalid"};
38 
39  const std::string FastMonitoringService::inputStateNames[FastMonitoringThread::inCOUNT] =
40  {"Ignore","Init","WaitInput","NewLumi","NewLumiBusyEndingLS","NewLumiIdleEndingLS","RunEnd","ProcessingFile","WaitChunk","ChunkReceived",
41  "ChecksumEvent","CachedEvent","ReadEvent","ReadCleanup","NoRequest","NoRequestWithIdleThreads",
42  "NoRequestWithGlobalEoL","NoRequestWithEoLThreads",
43  "SupFileLimit", "SupWaitFreeChunk","SupWaitFreeChunkCopying", "SupWaitFreeThread","SupWaitFreeThreadCopying",
44  "SupBusy", "SupLockPolling","SupLockPollingCopying",
45  "SupNoFile", "SupNewFile", "SupNewFileWaitThreadCopying", "SupNewFileWaitThread",
46  "SupNewFileWaitChunkCopying", "SupNewFileWaitChunk",
47  "WaitInput_fileLimit","WaitInput_waitFreeChunk","WaitInput_waitFreeChunkCopying","WaitInput_waitFreeThread","WaitInput_waitFreeThreadCopying",
48  "WaitInput_busy","WaitInput_lockPolling","WaitInput_lockPollingCopying","WaitInput_runEnd",
49  "WaitInput_noFile","WaitInput_newFile","WaitInput_newFileWaitThreadCopying","WaitInput_newFileWaitThread",
50  "WaitInput_newFileWaitChunkCopying","WaitInput_newFileWaitChunk",
51  "WaitChunk_fileLimit","WaitChunk_waitFreeChunk","WaitChunk_waitFreeChunkCopying","WaitChunk_waitFreeThread","WaitChunk_waitFreeThreadCopying",
52  "WaitChunk_busy","WaitChunk_lockPolling","WaitChunk_lockPollingCopying","WaitChunk_runEnd",
53  "WaitChunk_noFile","WaitChunk_newFile","WaitChunk_newFileWaitThreadCopying","WaitChunk_newFileWaitThread",
54  "WaitChunk_newFileWaitChunkCopying","WaitChunk_newFileWaitChunk"
55  };
56 
57 
58  const std::string FastMonitoringService::nopath_ = "NoPath";
59 
60  FastMonitoringService::FastMonitoringService(const edm::ParameterSet& iPS,
61  edm::ActivityRegistry& reg) :
62  MicroStateService(iPS,reg)
63  ,encModule_(nReservedModules)
64  ,nStreams_(0)//until initialized
65  ,sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1))
66  ,fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2))
67  ,fastName_("fastmoni")
68  ,slowName_("slowmoni")
69  ,filePerFwkStream_(iPS.getUntrackedParameter<bool>("filePerFwkStream", false))
70  ,totalEventsProcessed_(0)
71  {
72  reg.watchPreallocate(this, &FastMonitoringService::preallocate);//receiving information on number of threads
74 
79 
83 
88 
90 
93 
94  reg.watchPreSourceEvent(this,&FastMonitoringService::preSourceEvent);//source (with streamID of requestor)
96 
99 
103 
104  //find microstate definition path (required by the module)
105  struct stat statbuf;
106  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
107  std::string microstatePath = std::string(getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
108  if (stat(microstatePath.c_str(), &statbuf)) {
109  microstatePath = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
110  if (stat(microstatePath.c_str(), &statbuf)) {
111  microstatePath = microstateBaseSuffix;
112  if (stat(microstatePath.c_str(), &statbuf))
113  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
114  }
115  }
116  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
117  }
118 
119 
121  {
122  }
123 
125  {
127  desc.setComment("Service for File-based DAQ monitoring and event accounting");
128  desc.addUntracked<int> ("sleepTime",1)->setComment("Sleep time of the monitoring thread");
129  desc.addUntracked<unsigned int> ("fastMonIntervals",2)->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
130  desc.addUntracked<bool> ("filePerFwkStream", false)->setComment("Switches on monitoring output per framework stream");
131  desc.setAllowAnything();
132  descriptions.add("FastMonitoringService", desc);
133  }
134 
135 
137  Json::Value legendaVector(Json::arrayValue);
138  for(int i = 0; i < encPath_[0].current_; i++)
139  legendaVector.append(Json::Value(*(static_cast<const std::string *>(encPath_[0].decode(i)))));
140  Json::Value valReserved(nReservedPaths);
141  Json::Value pathLegend;
142  pathLegend["names"]=legendaVector;
143  pathLegend["reserved"]=valReserved;
145  return writer.write(pathLegend);
146  }
147 
149  Json::Value legendaVector(Json::arrayValue);
150  for(int i = 0; i < encModule_.current_; i++)
151  legendaVector.append(Json::Value((static_cast<const edm::ModuleDescription *>(encModule_.decode(i)))->moduleLabel()));
152  Json::Value valReserved(nReservedModules);
153  Json::Value valSpecial(nSpecialModules);
154  Json::Value valOutputModules(nOutputModules_);
155  Json::Value moduleLegend;
156  moduleLegend["names"]=legendaVector;
157  moduleLegend["reserved"]=valReserved;
158  moduleLegend["special"]=valSpecial;
159  moduleLegend["output"]=valOutputModules;
161  return writer.write(moduleLegend);
162  }
163 
165  Json::Value legendaVector(Json::arrayValue);
166  for(int i = 0; i < FastMonitoringThread::inCOUNT; i++)
167  legendaVector.append(Json::Value(inputStateNames[i]));
168  Json::Value moduleLegend;
169  moduleLegend["names"]=legendaVector;
171  return writer.write(moduleLegend);
172  }
173 
175  {
176  nStreams_=bounds.maxNumberOfStreams();
177  nThreads_=bounds.maxNumberOfThreads();
178  //this should already be >=1
179  if (nStreams_==0) nStreams_=1;
180  if (nThreads_==0) nThreads_=1;
181  }
182 
184  edm::ProcessContext const& pc)
185  {
186 
187  // FIND RUN DIRECTORY
188  // The run dir should be set via the configuration of EvFDaqDirector
189 
190  if (edm::Service<evf::EvFDaqDirector>().operator->()==nullptr)
191  {
192  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
193 
194  }
195  emptyLumisectionMode_ = edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode();
196  boost::filesystem::path runDirectory(edm::Service<evf::EvFDaqDirector>()->baseRunDir());
197  workingDirectory_ = runDirectory_ = runDirectory;
198  workingDirectory_ /= "mon";
199 
200  if ( !boost::filesystem::is_directory(workingDirectory_)) {
201  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string() ;
202  boost::filesystem::create_directories(workingDirectory_);
203  if ( !boost::filesystem::is_directory(workingDirectory_))
204  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
205  << ". No monitoring data will be written.";
206  }
207 
208  std::ostringstream fastFileName;
209 
210  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
212  fast /= fastFileName.str();
213  fastPath_ = fast.string();
214  if (filePerFwkStream_)
215  for (unsigned int i=0;i<nStreams_;i++) {
216  std::ostringstream fastFileNameTid;
217  fastFileNameTid << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << "_tid" << i << ".fast";
219  fastTid /= fastFileNameTid.str();
220  fastPathList_.push_back(fastTid.string());
221  }
222 
223  std::ostringstream moduleLegFile;
224  std::ostringstream moduleLegFileJson;
225  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
226  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
227  moduleLegendFile_ = (workingDirectory_/moduleLegFile.str()).string();
228  moduleLegendFileJson_ = (workingDirectory_/moduleLegFileJson.str()).string();
229 
230  std::ostringstream pathLegFile;
231  std::ostringstream pathLegFileJson;
232  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
233  pathLegendFile_ = (workingDirectory_/pathLegFile.str()).string();
234  pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
235  pathLegendFileJson_ = (workingDirectory_/pathLegFileJson.str()).string();
236 
237  std::ostringstream inputLegFileJson;
238  inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
239  inputLegendFileJson_ = (workingDirectory_/inputLegFileJson.str()).string();
240 
241  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: "
243  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
244 
245  /*
246  * initialize the fast monitor with:
247  * vector of pointers to monitorable parameters
248  * path to definition
249  *
250  */
251 
253 
254  for(unsigned int i = 0; i < (mCOUNT); i++)
255  encModule_.updateReserved(static_cast<const void*>(reservedMicroStateNames+i));
257 
258  for (unsigned int i=0;i<nStreams_;i++) {
259  ministate_.emplace_back(&nopath_);
261 
262  //for synchronization
263  streamCounterUpdating_.push_back(new std::atomic<bool>(0));
264 
265  //path (mini) state
266  encPath_.emplace_back(0);
267  encPath_[i].update(static_cast<const void*>(&nopath_));
268  eventCountForPathInit_.push_back(0);
269  firstEventId_.push_back(0);
270  collectedPathList_.push_back(new std::atomic<bool>(0));
271 
272  }
273  //for (unsigned int i=0;i<nThreads_;i++)
274  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
275 
276  //initial size until we detect number of bins
281 
282  lastGlobalLumi_=0;
284  isInitTransition_=true;
285  lumiFromSource_=0;
286 
287  //startup monitoring
289  fmt_.jsonMonitor_->setNStreams(nStreams_);
291  monInit_.store(false,std::memory_order_release);
293 
294  //this definition needs: #include "tbb/compat/thread"
295  //however this would result in the TBB implementation replacing std::thread
296  //(both supposedly call pthread_self())
297  //number of threads created in process could be obtained from /proc,
298  //assuming that all posix threads are true kernel threads capable of running in parallel
299 
300  //#if TBB_IMPLEMENT_CPP0X
302  //threadIDAvailable_=true;
303  //#endif
304 
305  }
306 
308  {
309  std::string context;
310  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
311  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
312  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
313  edm::LogWarning("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
314  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
315  std::lock_guard<std::mutex> lock(fmt_.monlock_);
316  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
317  }
318 
320  {
321  std::string context;
322  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
323  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
324  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
325  edm::LogWarning("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
326  << gc.luminosityBlockID().luminosityBlock() << " " << context;
327  std::lock_guard<std::mutex> lock(fmt_.monlock_);
329  }
330 
332  {
333  std::string context;
334  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
335  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
336  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
337  edm::LogWarning("FastMonitoringService") << " SOURCE " << "earlyTermination -: " << context;
338  std::lock_guard<std::mutex> lock(fmt_.monlock_);
339  exception_detected_=true;
340  }
341 
343  if (!ls) exception_detected_=true;
344  else exceptionInLS_.push_back(ls);
345  }
346 
348  {
350  }
351 
352  //new output module name is stream
354  {
355  std::lock_guard<std::mutex> lock(fmt_.monlock_);
356  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
357 
358  //build a map of modules keyed by their module description address
359  //here we need to treat output modules in a special way so they can be easily singled out
360  if(desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" || desc.moduleName() == "EvFOutputModule" ||
361  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
362  encModule_.updateReserved((void*)&desc);
363  nOutputModules_++;
364  }
365  else
366  encModule_.update((void*)&desc);
367  }
368 
370  {
371  std::string && moduleLegStrJson = makeModuleLegendaJson();
372  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
373 
374  std::string inputLegendStrJson = makeInputLegendaJson();
375  FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
376 
378 
379  //update number of entries in module histogram
380  std::lock_guard<std::mutex> lock(fmt_.monlock_);
382  }
383 
385  {
387  fmt_.stop();
388  }
389 
391  {
393  isInitTransition_=false;
394  }
395 
397  {
398  timeval lumiStartTime;
399  gettimeofday(&lumiStartTime, 0);
400  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
401 
402  std::lock_guard<std::mutex> lock(fmt_.monlock_);
403 
404  lumiStartTime_[newLumi]=lumiStartTime;
405  while (!lastGlobalLumisClosed_.empty()) {
406  //wipe out old map entries as they aren't needed and slow down access
407  unsigned int oldLumi = lastGlobalLumisClosed_.back();
409  lumiStartTime_.erase(oldLumi);
410  avgLeadTime_.erase(oldLumi);
411  filesProcessedDuringLumi_.erase(oldLumi);
412  accuSize_.erase(oldLumi);
413  lockStatsDuringLumi_.erase(oldLumi);
414  processedEventsPerLumi_.erase(oldLumi);
415  }
416  lastGlobalLumi_= newLumi;
418  }
419 
421  {
422  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
423  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: "
424  << lumi;
425  timeval lumiStopTime;
426  gettimeofday(&lumiStopTime, 0);
427 
428  std::lock_guard<std::mutex> lock(fmt_.monlock_);
429 
430  // Compute throughput
431  timeval stt = lumiStartTime_[lumi];
432  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec)*1000000
433  + (lumiStopTime.tv_usec - stt.tv_usec);
434  unsigned long accuSize = accuSize_.find(lumi)==accuSize_.end() ? 0 : accuSize_[lumi];
435  double throughput = throughputFactor()* double(accuSize) / double(usecondsForLumi);
436  //store to registered variable
437  fmt_.m_data.fastThroughputJ_.value() = throughput;
438 
439  //update
440  doSnapshot(lumi,true);
441 
442  //retrieve one result we need (todo: sanity check if it's found)
443  IntJ *lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_.jsonMonitor_->getMergedIntJForLumi("Processed",lumi));
444  if (!lumiProcessedJptr)
445  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
446  processedEventsPerLumi_[lumi] = std::pair<unsigned int,bool>(lumiProcessedJptr->value(),false);
447 
448  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
449  bool exception_detected = exception_detected_;
450  for (auto ex : exceptionInLS_)
451  if (lumi == ex) exception_detected=true;
452 
453  if (edm::shutdown_flag || exception_detected) {
454  edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
455  << processedEventsPerLumi_[lumi].first << " events were processed in LUMI " << lumi;
456  //this will prevent output modules from producing json file for possibly incomplete lumi
457  processedEventsPerLumi_[lumi].first=0;
458  processedEventsPerLumi_[lumi].second=true;
459  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
460  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
461  return;
462 
463  }
464 
465  if (inputSource_) {
466  auto sourceReport = inputSource_->getEventReport(lumi, true);
467  if (sourceReport.first) {
468  if (sourceReport.second!=processedEventsPerLumi_[lumi].first) {
469  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
470  << lumi
471  << ", events(processed):" << processedEventsPerLumi_[lumi].first
472  << " events(source):" << sourceReport.second;
473  }
474  }
475  }
476  edm::LogInfo("FastMonitoringService") << "Statistics for lumisection -: lumi = " << lumi << " events = "
477  << lumiProcessedJptr->value() << " time = " << usecondsForLumi/1000000
478  << " size = " << accuSize << " thr = " << throughput;
479  delete lumiProcessedJptr;
480 
481  //full global and stream merge&output for this lumi
482 
483  // create file name for slow monitoring file
484  if (filePerFwkStream_) {
485  std::stringstream slowFileNameStem;
486  slowFileNameStem << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
487  << lumi << "_pid" << std::setfill('0')
488  << std::setw(5) << getpid();
490  slow /= slowFileNameStem.str();
491  fmt_.jsonMonitor_->outputFullJSONs(slow.string(),".jsn",lumi);
492  }
493  else {
494  std::stringstream slowFileName;
495  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
496  << lumi << "_pid" << std::setfill('0')
497  << std::setw(5) << getpid() << ".jsn";
499  slow /= slowFileName.str();
500  fmt_.jsonMonitor_->outputFullJSON(slow.string(),lumi);//full global and stream merge and JSON write for this lumi
501  }
502  fmt_.jsonMonitor_->discardCollected(lumi);//we don't do further updates for this lumi
503 
505  }
506 
508  {
509  std::lock_guard<std::mutex> lock(fmt_.monlock_);
510  //mark closed lumis (still keep map entries until next one)
512  }
513 
515  {
516  unsigned int sid = sc.streamID().value();
517 
518  std::lock_guard<std::mutex> lock(fmt_.monlock_);
520 
521  //reset collected values for this stream
522  *(fmt_.m_data.processed_[sid])=0;
523 
524  ministate_[sid]=&nopath_;
526  }
527 
529  {
531  }
532 
534  {
535  unsigned int sid = sc.streamID().value();
536  std::lock_guard<std::mutex> lock(fmt_.monlock_);
537 
538  #if ATOMIC_LEVEL>=2
539  //spinlock to make sure we are not still updating event counter somewhere
540  while (streamCounterUpdating_[sid]->load(std::memory_order_acquire)) {}
541  #endif
542 
543  //update processed count to be complete at this time
545  //reset this in case stream does not get notified of next lumi (we keep processed events only)
546  ministate_[sid]=&nopath_;
548  }
550  {
552  }
553 
554 
556  {
557  //make sure that all path names are retrieved before allowing ministate to change
558  //hack: assume memory is synchronized after ~50 events seen by each stream
559  if (unlikely(eventCountForPathInit_[sc.streamID()]<50) && false==collectedPathList_[sc.streamID()]->load(std::memory_order_acquire))
560  {
561  //protection between stream threads, as well as the service monitoring thread
562  std::lock_guard<std::mutex> lock(fmt_.monlock_);
563 
564  if (firstEventId_[sc.streamID()]==0)
565  firstEventId_[sc.streamID()]=sc.eventID().event();
566  if (sc.eventID().event()==firstEventId_[sc.streamID()])
567  {
568  encPath_[sc.streamID()].update((void*)&pc.pathName());
569  return;
570  }
571  else {
572  //finished collecting path names
573  collectedPathList_[sc.streamID()]->store(true,std::memory_order_seq_cst);
574  fmt_.m_data.ministateBins_=encPath_[sc.streamID()].vecsize();
575  if (!pathLegendWritten_) {
576  std::string pathLegendStrJson = makePathLegendaJson();
577  FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
578  pathLegendWritten_=true;
579  }
580  }
581  }
582  else {
583  ministate_[sc.streamID()] = &(pc.pathName());
584  }
585  }
586 
587 
589  {
590  }
591 
593  {
595 
596  ministate_[sc.streamID()] = &nopath_;
597 
598  #if ATOMIC_LEVEL>=2
599  //use atomic flag to make sure end of lumi sees this
600  streamCounterUpdating_[sc.streamID()]->store(true,std::memory_order_release);
601  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_release);
602  streamCounterUpdating_[sc.streamID()]->store(false,std::memory_order_release);
603 
604  #elif ATOMIC_LEVEL==1
605  //writes are atomic, we assume writes propagate to memory before stream EOL snap
606  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_relaxed);
607 
608  #elif ATOMIC_LEVEL==0 //default
609  (*(fmt_.m_data.processed_[sc.streamID()]))++;
610  #endif
611  eventCountForPathInit_[sc.streamID()].m_value++;
612 
613  //fast path counter (events accumulated in a run)
614  unsigned long res = totalEventsProcessed_.fetch_add(1,std::memory_order_relaxed);
616  //fmt_.m_data.fastPathProcessedJ_ = totalEventsProcessed_.load(std::memory_order_relaxed);
617  }
618 
620  {
622  }
623 
625  {
627  }
628 
630  {
631  microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
632  }
633 
635  {
636  //microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
638  }
639 
640  //FUNCTIONS CALLED FROM OUTSIDE
641 
642  //this is for old-fashioned service that is not thread safe and can block other streams
643  //(we assume the worst case - everything is blocked)
645  {
646  for (unsigned int i=0;i<nStreams_;i++)
648  }
649 
650  //this is for services that are multithreading-enabled or rarely block other streams
652  {
654  }
655 
656  //from source
657  void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
658  std::lock_guard<std::mutex> lock(fmt_.monlock_);
659 
660  if (accuSize_.find(lumi)==accuSize_.end()) accuSize_[lumi] = fileSize;
661  else accuSize_[lumi] += fileSize;
662 
664  filesProcessedDuringLumi_[lumi] = 1;
665  else
667  }
668 
670  gettimeofday(&fileLookStart_, 0);
671  /*
672  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
673  << fileLookStart_.tv_usec / 1000.0 << std::endl;
674  */
675  }
676 
678  gettimeofday(&fileLookStop_, 0);
679  /*
680  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
681  << fileLookStop_.tv_usec / 1000.0 << std::endl;
682  */
683  std::lock_guard<std::mutex> lock(fmt_.monlock_);
684 
685  if (lumi>lumiFromSource_) {
687  leadTimes_.clear();
688  }
689  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
690  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
691  // add this to lead times for this lumi
692  leadTimes_.push_back((double)elapsedTime);
693 
694  // recompute average lead time for this lumi
695  if (leadTimes_.size() == 1) avgLeadTime_[lumi] = leadTimes_[0];
696  else {
697  double totTime = 0;
698  for (unsigned int i = 0; i < leadTimes_.size(); i++) totTime += leadTimes_[i];
699  avgLeadTime_[lumi] = 0.001*(totTime / leadTimes_.size());
700  }
701  }
702 
703  void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
704  {
705  std::lock_guard<std::mutex> lock(fmt_.monlock_);
706  lockStatsDuringLumi_[ls]=std::pair<double,unsigned int>(waitTime,lockCount);
707 
708  }
709 
710  //for the output module
711  unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool * abortFlag) {
712  std::lock_guard<std::mutex> lock(fmt_.monlock_);
713 
714  auto it = processedEventsPerLumi_.find(lumi);
715  if (it!=processedEventsPerLumi_.end()) {
716  unsigned int proc = it->second.first;
717  if (abortFlag) *abortFlag=it->second.second;
718  return proc;
719  }
720  else {
721  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "<<lumi;
722  return 0;
723  }
724  }
725 
726  //for the output module
728  std::lock_guard<std::mutex> lock(fmt_.monlock_);
729 
730  auto it = processedEventsPerLumi_.find(lumi);
731  if (it!=processedEventsPerLumi_.end()) {
732  unsigned int abortFlag = it->second.second;
733  return abortFlag;
734  }
735  else {
736  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "<<lumi;
737  return 0;
738  }
739  }
740 
741  void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
742  // update macrostate
744 
745  std::vector<const void*> microstateCopy(microstate_.begin(),microstate_.end());
746 
747  //update these unless in the midst of a global transition
749 
750  auto itd = avgLeadTime_.find(ls);
751  if (itd != avgLeadTime_.end())
752  fmt_.m_data.fastAvgLeadTimeJ_ = itd->second;
753  else fmt_.m_data.fastAvgLeadTimeJ_=0.;
754 
755  auto iti = filesProcessedDuringLumi_.find(ls);
756  if (iti != filesProcessedDuringLumi_.end())
757  fmt_.m_data.fastFilesProcessedJ_ = iti->second;
759 
760  auto itrd = lockStatsDuringLumi_.find(ls);
761  if (itrd != lockStatsDuringLumi_.end()) {
762  fmt_.m_data.fastLockWaitJ_ = itrd->second.first;
763  fmt_.m_data.fastLockCountJ_ = itrd->second.second;
764  }
765  else {
768  }
769  }
770  else {
772  for (unsigned int i=0;i<nStreams_;i++) {
773  if (microstateCopy[i]==&reservedMicroStateNames[mFwkEoL]) {
774  microstateCopy[i]=&reservedMicroStateNames[mGlobEoL];
775  }
776  }
777  }
778 
779  for (unsigned int i=0;i<nStreams_;i++) {
781  fmt_.m_data.microstateEncoded_[i] = encModule_.encode(microstateCopy[i]);
782  }
783 
784  bool inputStatePerThread=false;
785 
787  switch (inputSupervisorState_) {
790  break;
793  break;
796  break;
799  break;
802  break;
805  break;
808  break;
811  break;
814  break;
817  break;
820  break;
823  break;
826  break;
829  break;
832  break;
833  default:
835  }
836  }
838 
839  switch (inputSupervisorState_) {
842  break;
845  break;
848  break;
851  break;
854  break;
857  break;
860  break;
863  break;
866  break;
869  break;
872  break;
875  break;
878  break;
881  break;
884  break;
885  default:
887  }
888  }
892  else {
893  inputStatePerThread=true;
894  for (unsigned int i=0;i<nStreams_;i++) {
895  if (microstateCopy[i]==&reservedMicroStateNames[mIdle])
897  else if (microstateCopy[i]==&reservedMicroStateNames[mEoL] ||
898  microstateCopy[i]==&reservedMicroStateNames[mFwkEoL] ||
899  microstateCopy[i]==&reservedMicroStateNames[mGlobEoL])
901  else
903  }
904  }
905  }
907  inputStatePerThread=true;
908  for (unsigned int i=0;i<nStreams_;i++) {
909  if (microstateCopy[i]==&reservedMicroStateNames[mEoL] ||
910  microstateCopy[i]==&reservedMicroStateNames[mFwkEoL] ||
911  microstateCopy[i]==&reservedMicroStateNames[mGlobEoL])
913  else if (microstateCopy[i]==&reservedMicroStateNames[mIdle])
915  else
917  }
918  }
919  else
921 
922  //this is same for all streams
923  if (!inputStatePerThread)
924  for (unsigned int i=1;i<nStreams_;i++)
926 
927  if (isGlobalEOL)
928  {//only update global variables
929  fmt_.jsonMonitor_->snapGlobal(ls);
930  }
931  else
932  fmt_.jsonMonitor_->snap(ls);
933  }
934 
935 } //end namespace evf
936 
#define LogDebug(id)
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
std::string const & pathName() const
Definition: PathContext.h:37
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:46
EventNumber_t event() const
Definition: EventID.h:41
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
void watchPreEvent(PreEvent::slot_type const &iSlot)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
std::atomic< bool > isInitTransition_
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
boost::filesystem::path runDirectory_
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
void preGlobalBeginLumi(edm::GlobalContext const &)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
void setAllowAnything()
allow any parameter label/value pairs
double throughputFactor()
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
TrainProcessor *const proc
Definition: MVATrainer.cc:101
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
std::atomic< FastMonitoringThread::InputState > inputSupervisorState_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
static const int nReservedPaths
volatile std::atomic< bool > shutdown_flag
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:52
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
std::string const & moduleName() const
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
Value & append(const Value &value)
Append value to array at the end.
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
Represents a JSON value.
Definition: value.h:111
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:40
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
#define constexpr
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
Definition: Electron.h:4
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
#define unlikely(x)
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
void preModuleBeginJob(edm::ModuleDescription const &)
static const std::string inputStateNames[FastMonitoringThread::inCOUNT]
void setComment(std::string const &value)
std::vector< std::atomic< bool > * > collectedPathList_
std::queue< unsigned int > lastGlobalLumisClosed_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
static const std::string nopath_
ModuleDescription const * moduleDescription() const
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
static const int nReservedModules
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< unsigned long > firstEventId_
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
virtual std::string write(const Value &root)
Serialize a Value in JSON format.
std::vector< std::string > fastPathList_
std::vector< ContainableAtomic< unsigned int > > eventCountForPathInit_
void postStreamBeginLumi(edm::StreamContext const &)
std::atomic< FastMonitoringThread::InputState > inputState_
StreamID const & streamID() const
Definition: StreamContext.h:57
def ls(path, rec=False)
Definition: eostools.py:348
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< unsigned int > microstateEncoded_
unsigned int value() const
Definition: StreamID.h:46
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
std::atomic< FastMonitoringThread::Macrostate > macrostate_
def load(fileName)
Definition: svgfig.py:546
static const int nSpecialModules
std::vector< ContainableAtomic< const void * > > ministate_
LuminosityBlockNumber_t luminosityBlock() const
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::vector< double > leadTimes_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void stoppedLookingForFile(unsigned int lumi)
void setMicroState(MicroStateService::Microstate)
boost::filesystem::path workingDirectory_
if(dp >Float(M_PI)) dp-
std::vector< jsoncollector::AtomicMonUInt * > processed_
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
EventID const & eventID() const
Definition: StreamContext.h:59
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
bool getAbortFlagForLumi(unsigned int lumi)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:65
const void * decode(unsigned int index)
std::vector< unsigned int > streamLumi_
std::vector< Encoding > encPath_
std::vector< unsigned int > inputState_
std::vector< unsigned int > ministateEncoded_
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
array value (ordered list)
Definition: value.h:31