CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
evf::FastMonitoringService Class Reference

#include <FastMonitoringService.h>

Inheritance diagram for evf::FastMonitoringService:
evf::MicroStateService

Classes

struct  Encoding
 

Public Member Functions

void accumulateFileSize (unsigned int lumi, unsigned long fileSize)
 
 FastMonitoringService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
unsigned int getEventsProcessedForLumi (unsigned int lumi)
 
std::string getRunDirName () const
 
void jobFailure ()
 
std::string makeModuleLegenda ()
 
std::string makePathLegenda ()
 
void postBeginJob ()
 
void postEndJob ()
 
void postEvent (edm::StreamContext const &)
 
void postGlobalBeginRun (edm::GlobalContext const &)
 
void postGlobalEndLumi (edm::GlobalContext const &)
 
void postModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void postSourceEvent (edm::StreamID)
 
void postStreamBeginLumi (edm::StreamContext const &)
 
void postStreamEndLumi (edm::StreamContext const &)
 
void preallocate (edm::service::SystemBounds const &)
 
void preEvent (edm::StreamContext const &)
 
void preGlobalBeginLumi (edm::GlobalContext const &)
 
void preGlobalEarlyTermination (edm::GlobalContext const &, edm::TerminationOrigin)
 
void preGlobalEndLumi (edm::GlobalContext const &)
 
void preModuleBeginJob (edm::ModuleDescription const &)
 
void preModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void prePathEvent (edm::StreamContext const &, edm::PathContext const &)
 
void preSourceEarlyTermination (edm::TerminationOrigin)
 
void preSourceEvent (edm::StreamID)
 
void preStreamBeginLumi (edm::StreamContext const &)
 
void preStreamEarlyTermination (edm::StreamContext const &, edm::TerminationOrigin)
 
void preStreamEndLumi (edm::StreamContext const &)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void setMicroState (MicroStateService::Microstate)
 
void setMicroState (edm::StreamID, MicroStateService::Microstate)
 
void startedLookingForFile ()
 
void stoppedLookingForFile (unsigned int lumi)
 
 ~FastMonitoringService ()
 
- Public Member Functions inherited from evf::MicroStateService
virtual std::string getMicroState1 ()
 
virtual std::string const & getMicroState2 ()
 
 MicroStateService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
virtual ~MicroStateService ()
 

Static Public Attributes

static const std::string macroStateNames [FastMonitoringThread::MCOUNT]
 
static const std::string nopath_ = "NoPath"
 
- Static Public Attributes inherited from evf::MicroStateService
static const edm::ModuleDescription reservedMicroStateNames [mCOUNT]
 

Private Member Functions

void doSnapshot (const unsigned int ls, const bool isGlobalEOL)
 
void doStreamEOLSnapshot (const unsigned int ls, const unsigned int streamID)
 
void dowork ()
 

Private Attributes

std::map< unsigned int,
unsigned long > 
accuSize_
 
std::map< unsigned int, double > avgLeadTime_
 
std::vector< std::atomic< bool > * > collectedPathList_
 
Encoding encModule_
 
std::vector< Encoding > encPath_
 
std::vector< unsigned int > eventCountForPathInit_
 
bool exception_detected_ = false
 
std::vector< unsigned int > exceptionInLS_
 
std::string fastMicrostateDefPath_
 
unsigned int fastMonIntervals_
 
std::string fastName_
 
std::string fastPath_
 
timeval fileLookStart_
 
timeval fileLookStop_
 
std::map< unsigned int,
unsigned int > 
filesProcessedDuringLumi_
 
std::vector< unsigned long > firstEventId_
 
FastMonitoringThread fmt_
 
bool isGlobalLumiTransition_
 
unsigned int lastGlobalLumi_
 
std::queue< unsigned int > lastGlobalLumisClosed_
 
std::vector< double > leadTimes_
 
unsigned int lumiFromSource_
 
std::map< unsigned int, timeval > lumiStartTime_
 
FastMonitoringThread::Macrostate macrostate_
 
std::vector< const void * > microstate_
 
std::string microstateDefPath_
 
std::vector< const void * > ministate_
 
std::string moduleLegendFile_
 
std::atomic< bool > monInit_
 
unsigned int nStreams_
 
unsigned int nThreads_
 
std::string pathLegendFile_
 
bool pathLegendWritten_ = false
 
std::vector< bool > pathNamesReady_
 
std::map< unsigned int,
unsigned int > 
processedEventsPerLumi_
 
boost::filesystem::path runDirectory_
 
int sleepTime_
 
std::string slowName_
 
unsigned int snapCounter_ = 0
 
std::map< unsigned int,
unsigned int > 
sourceEventsReport_
 
std::vector< std::atomic< bool > * > streamCounterUpdating_
 
bool threadIDAvailable_ = false
 
std::vector< const void * > threadMicrostate_
 
std::atomic< unsigned long > totalEventsProcessed_
 
boost::filesystem::path workingDirectory_
 

Additional Inherited Members

- Public Types inherited from evf::MicroStateService
enum  Microstate {
  mInvalid = 0, mFwkOvh, mIdle, mInput,
  mInputDone, mDqm, mEoL, mCOUNT
}
 
- Static Protected Attributes inherited from evf::MicroStateService
static const std::string default_return_ ="NotImplemented"
 

Detailed Description

Definition at line 51 of file FastMonitoringService.h.

Constructor & Destructor Documentation

evf::FastMonitoringService::FastMonitoringService ( const edm::ParameterSet &  iPS,
edm::ActivityRegistry &  reg 
)

Definition at line 32 of file FastMonitoringService.cc.

References jobFailure(), postBeginJob(), postEndJob(), postEvent(), postGlobalEndLumi(), postModuleEvent(), postSourceEvent(), postStreamBeginLumi(), postStreamEndLumi(), preallocate(), preEvent(), preGlobalBeginLumi(), preGlobalEarlyTermination(), preGlobalEndLumi(), preModuleBeginJob(), preModuleEvent(), prePathEvent(), preSourceEarlyTermination(), preSourceEvent(), preStreamBeginLumi(), preStreamEarlyTermination(), preStreamEndLumi(), edm::ActivityRegistry::watchJobFailure(), edm::ActivityRegistry::watchPostBeginJob(), edm::ActivityRegistry::watchPostEndJob(), edm::ActivityRegistry::watchPostEvent(), edm::ActivityRegistry::watchPostGlobalEndLumi(), edm::ActivityRegistry::watchPostModuleEvent(), edm::ActivityRegistry::watchPostSourceEvent(), edm::ActivityRegistry::watchPostStreamBeginLumi(), edm::ActivityRegistry::watchPostStreamEndLumi(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreEvent(), edm::ActivityRegistry::watchPreGlobalBeginLumi(), edm::ActivityRegistry::watchPreGlobalEarlyTermination(), edm::ActivityRegistry::watchPreGlobalEndLumi(), edm::ActivityRegistry::watchPreModuleBeginJob(), edm::ActivityRegistry::watchPreModuleEvent(), edm::ActivityRegistry::watchPrePathEvent(), edm::ActivityRegistry::watchPreSourceEarlyTermination(), edm::ActivityRegistry::watchPreSourceEvent(), edm::ActivityRegistry::watchPreStreamBeginLumi(), edm::ActivityRegistry::watchPreStreamEarlyTermination(), and edm::ActivityRegistry::watchPreStreamEndLumi().

33  :
34  MicroStateService(iPS,reg)
35  ,encModule_(33)
36  ,nStreams_(0)//until initialized
37  ,sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1))
38  ,fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 1))
39  ,microstateDefPath_(iPS.getUntrackedParameter<std::string> ("microstateDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/microstatedef.jsd"))
41  ,fastName_(iPS.getUntrackedParameter<std::string>("fastName", "fastmoni"))
42  ,slowName_(iPS.getUntrackedParameter<std::string>("slowName", "slowmoni"))
44  {
45  reg.watchPreallocate(this, &FastMonitoringService::preallocate);//receiving information on number of threads
47 
51 
55 
60 
62 
65 
66  reg.watchPreSourceEvent(this,&FastMonitoringService::preSourceEvent);//source (with streamID of requestor)
68 
71 
75  }
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
T getUntrackedParameter(std::string const &, T const &) const
void watchPreEvent(PreEvent::slot_type const &iSlot)
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
void preGlobalBeginLumi(edm::GlobalContext const &)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
void watchPostEvent(PostEvent::slot_type const &iSlot)
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
void preGlobalEndLumi(edm::GlobalContext const &)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void preModuleBeginJob(edm::ModuleDescription const &)
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
void preStreamEndLumi(edm::StreamContext const &)
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postStreamBeginLumi(edm::StreamContext const &)
void postStreamEndLumi(edm::StreamContext const &)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
evf::FastMonitoringService::~FastMonitoringService ( )

Definition at line 78 of file FastMonitoringService.cc.

79  {
80  }

Member Function Documentation

void evf::FastMonitoringService::accumulateFileSize ( unsigned int  lumi,
unsigned long  fileSize 
)

Definition at line 540 of file FastMonitoringService.cc.

References accuSize_, filesProcessedDuringLumi_, fmt_, fff_deleter::lock(), fjr2json::lumi, and evf::FastMonitoringThread::monlock_.

Referenced by evf::EvFDaqDirector::bumpFile().

540  {
541  std::lock_guard<std::mutex> lock(fmt_.monlock_);
542 
543  if (accuSize_.find(lumi)==accuSize_.end()) accuSize_[lumi] = fileSize;
544  else accuSize_[lumi] += fileSize;
545 
548  else
550  }
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void evf::FastMonitoringService::doSnapshot ( const unsigned int  ls,
const bool  isGlobalEOL 
)
private

Definition at line 602 of file FastMonitoringService.cc.

References avgLeadTime_, encModule_, evf::FastMonitoringService::Encoding::encode(), encPath_, evf::FastMonitoringThread::MonitorData::fastAvgLeadTimeJ_, evf::FastMonitoringThread::MonitorData::fastFilesProcessedJ_, evf::FastMonitoringThread::MonitorData::fastMacrostateJ_, filesProcessedDuringLumi_, fmt_, i, isGlobalLumiTransition_, evf::FastMonitoringThread::jsonMonitor_, evf::FastMonitoringThread::m_data, macrostate_, microstate_, evf::FastMonitoringThread::MonitorData::microstateEncoded_, ministate_, evf::FastMonitoringThread::MonitorData::ministateEncoded_, and nStreams_.

Referenced by dowork(), and preGlobalEndLumi().

602  {
603  // update macrostate
605 
606  //update these unless in the midst of a global transition
608 
609  auto itd = avgLeadTime_.find(ls);
610  if (itd != avgLeadTime_.end())
611  fmt_.m_data.fastAvgLeadTimeJ_ = itd->second;
612  else fmt_.m_data.fastAvgLeadTimeJ_=0.;
613 
614  auto iti = filesProcessedDuringLumi_.find(ls);
615  if (iti != filesProcessedDuringLumi_.end())
616  fmt_.m_data.fastFilesProcessedJ_ = iti->second;
618  }
619  else return;
620 
621  //capture latest mini/microstate of streams
622  for (unsigned int i=0;i<nStreams_;i++) {
625  }
626  //for (unsigned int i=0;i<nThreads_;i++)
627  // fmt_.m_data.threadMicrostateEncoded_[i] = encModule_.encode(threadMicrostate_[i]);
628 
629  if (isGlobalEOL)
630  {//only update global variables
631  fmt_.jsonMonitor_->snapGlobal(ls);
632  }
633  else
634  fmt_.jsonMonitor_->snap(ls);
635  }
int i
Definition: DBlmapReader.cc:9
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::map< unsigned int, double > avgLeadTime_
std::vector< const void * > microstate_
std::vector< unsigned int > microstateEncoded_
FastMonitoringThread::Macrostate macrostate_
std::unique_ptr< FastMonitor > jsonMonitor_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
std::vector< unsigned int > ministateEncoded_
void evf::FastMonitoringService::doStreamEOLSnapshot ( const unsigned int  ls,
const unsigned int  streamID 
)
inline private

Definition at line 159 of file FastMonitoringService.h.

References fmt_, and evf::FastMonitoringThread::jsonMonitor_.

Referenced by preStreamEndLumi().

159  {
160  //pick up only event count here
161  fmt_.jsonMonitor_->snapStreamAtomic(ls,streamID);
162  }
std::unique_ptr< FastMonitor > jsonMonitor_
void evf::FastMonitoringService::dowork ( )
inline private

Definition at line 164 of file FastMonitoringService.h.

References doSnapshot(), encModule_, evf::FastMonitoringService::Encoding::encode(), encPath_, evf::FastMonitoringThread::MonitorData::fastMacrostateJ_, fastMonIntervals_, fastPath_, fmt_, evf::FastMonitoringThread::jsonMonitor_, lastGlobalLumi_, fff_deleter::lock(), evf::FastMonitoringThread::m_data, evf::FastMonitoringThread::m_stoprequest, microstate_, ministate_, monInit_, evf::FastMonitoringThread::monlock_, sleepTime_, snapCounter_, AlCaHLTBitMon_QueryRunRegistry::string, and jsoncollector::IntJ::value().

Referenced by preallocate().

164  { // the function to be called in the thread. Thread completes when function returns.
165  monInit_.exchange(true,std::memory_order_acquire);
166  while (!fmt_.m_stoprequest) {
167  edm::LogInfo("FastMonitoringService") << "Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
168  << " ms=" << encPath_[0].encode(ministate_[0])
169  << " us=" << encModule_.encode(microstate_[0]) << std::endl;
170 
171  {
172  std::lock_guard<std::mutex> lock(fmt_.monlock_);
173 
175 
177  std::string CSV = fmt_.jsonMonitor_->getCSVString();
178  //release mutex before writing out fast path file
179  fmt_.monlock_.unlock();
180  if (CSV.size())
181  fmt_.jsonMonitor_->outputCSV(fastPath_,CSV);
182  }
183 
184  snapCounter_++;
185 
186  }
187  ::sleep(sleepTime_);
188  }
189  }
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
std::vector< const void * > microstate_
std::atomic< bool > m_stoprequest
std::unique_ptr< FastMonitor > jsonMonitor_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
unsigned int evf::FastMonitoringService::getEventsProcessedForLumi ( unsigned int  lumi)

Definition at line 587 of file FastMonitoringService.cc.

References edm::hlt::Exception, fmt_, fff_deleter::lock(), fjr2json::lumi, evf::FastMonitoringThread::monlock_, proc, and processedEventsPerLumi_.

Referenced by DQMFileSaver::fillJson(), and DQMFileSaver::globalEndLuminosityBlock().

587  {
588  std::lock_guard<std::mutex> lock(fmt_.monlock_);
589 
590  auto it = processedEventsPerLumi_.find(lumi);
591  if (it!=processedEventsPerLumi_.end()) {
592  unsigned int proc = it->second;
593  return proc;
594  }
595  else {
596  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "<<lumi;
597  return 0;
598  }
599  }
TrainProcessor *const proc
Definition: MVATrainer.cc:101
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, unsigned int > processedEventsPerLumi_
std::string evf::FastMonitoringService::getRunDirName ( ) const
inline

Definition at line 153 of file FastMonitoringService.h.

References runDirectory_.

153 { return runDirectory_.stem().string(); }
boost::filesystem::path runDirectory_
void evf::FastMonitoringService::jobFailure ( )
std::string evf::FastMonitoringService::makeModuleLegenda ( )

Definition at line 94 of file FastMonitoringService.cc.

References evf::FastMonitoringService::Encoding::current_, evf::FastMonitoringService::Encoding::decode(), encModule_, and i.

Referenced by postBeginJob().

95  {
96  std::ostringstream ost;
97  for(int i = 0; i < encModule_.current_; i++)
98  ost<<i<<"="<<((const edm::ModuleDescription *)(encModule_.decode(i)))->moduleLabel()<<" ";
99  return ost.str();
100  }
int i
Definition: DBlmapReader.cc:9
const void * decode(unsigned int index)
std::string evf::FastMonitoringService::makePathLegenda ( )

Definition at line 83 of file FastMonitoringService.cc.

References edm::decode(), encPath_, i, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by prePathEvent().

83  {
84  //taken from first stream
85  std::ostringstream ost;
86  for(int i = 0;
87  i < encPath_[0].current_;
88  i++)
89  ost<<i<<"="<<*((std::string *)(encPath_[0].decode(i)))<<" ";
90  return ost.str();
91  }
int i
Definition: DBlmapReader.cc:9
bool decode(bool &, std::string const &)
Definition: types.cc:62
std::vector< Encoding > encPath_
void evf::FastMonitoringService::postBeginJob ( )

Definition at line 269 of file FastMonitoringService.cc.

References encModule_, fmt_, fff_deleter::lock(), evf::FastMonitoringThread::m_data, macrostate_, makeModuleLegenda(), evf::FastMonitoringThread::MonitorData::microstateBins_, moduleLegendFile_, evf::FastMonitoringThread::monlock_, evf::FastMonitoringThread::sJobReady, AlCaHLTBitMon_QueryRunRegistry::string, and evf::FastMonitoringService::Encoding::vecsize().

Referenced by FastMonitoringService().

270  {
271  std::string && moduleLegStr = makeModuleLegenda();
272  FileIO::writeStringToFile(moduleLegendFile_, moduleLegStr);
273 
275 
276  //update number of entries in module histogram
277  std::lock_guard<std::mutex> lock(fmt_.monlock_);
279  }
FastMonitoringThread::Macrostate macrostate_
void evf::FastMonitoringService::postEndJob ( )
void evf::FastMonitoringService::postEvent ( edm::StreamContext const &  sc)

Definition at line 475 of file FastMonitoringService.cc.

References eventCountForPathInit_, evf::FastMonitoringThread::MonitorData::fastPathProcessedJ_, fmt_, evf::FastMonitoringThread::m_data, microstate_, evf::MicroStateService::mIdle, ministate_, nopath_, evf::FastMonitoringThread::MonitorData::processed_, evf::MicroStateService::reservedMicroStateNames, streamCounterUpdating_, edm::StreamContext::streamID(), and totalEventsProcessed_.

Referenced by FastMonitoringService().

476  {
477  microstate_[sc.streamID()] = &reservedMicroStateNames[mIdle];
478 
479  ministate_[sc.streamID()] = &nopath_;
480 
481  #if ATOMIC_LEVEL>=2
482  //use atomic flag to make sure end of lumi sees this
483  streamCounterUpdating_[sc.streamID()]->store(true,std::memory_order_release);
484  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_release);
485  streamCounterUpdating_[sc.streamID()]->store(false,std::memory_order_release);
486 
487  #elif ATOMIC_LEVEL==1
488  //writes are atomic, we assume writes propagate to memory before stream EOL snap
489  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_relaxed);
490 
491  #elif ATOMIC_LEVEL==0 //default
492  (*(fmt_.m_data.processed_[sc.streamID()]))++;
493  #endif
494  eventCountForPathInit_[sc.streamID()]++;
495 
496  //fast path counter (events accumulated in a run)
497  unsigned long res = totalEventsProcessed_.fetch_add(1,std::memory_order_relaxed);
499  //fmt_.m_data.fastPathProcessedJ_ = totalEventsProcessed_.load(std::memory_order_relaxed);
500  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< AtomicMonUInt * > processed_
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
static const std::string nopath_
std::vector< const void * > microstate_
std::atomic< unsigned long > totalEventsProcessed_
std::vector< const void * > ministate_
void evf::FastMonitoringService::postGlobalBeginRun ( edm::GlobalContext const &  gc)
void evf::FastMonitoringService::postGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 392 of file FastMonitoringService.cc.

References lastGlobalLumisClosed_, edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by FastMonitoringService().

393  {
394  //mark closed lumis (still keep map entries until next one)
395  lastGlobalLumisClosed_.push(gc.luminosityBlockID().luminosityBlock());
396  }
std::queue< unsigned int > lastGlobalLumisClosed_
void evf::FastMonitoringService::postModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 517 of file FastMonitoringService.cc.

References evf::MicroStateService::mFwkOvh, microstate_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

518  {
519  //microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
520  microstate_[sc.streamID().value()] = &reservedMicroStateNames[mFwkOvh];
521  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
void evf::FastMonitoringService::postSourceEvent ( edm::StreamID  sid)

Definition at line 507 of file FastMonitoringService.cc.

References evf::MicroStateService::mFwkOvh, microstate_, evf::MicroStateService::reservedMicroStateNames, and edm::StreamID::value().

Referenced by FastMonitoringService().

508  {
510  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
unsigned int value() const
Definition: StreamID.h:46
void evf::FastMonitoringService::postStreamBeginLumi ( edm::StreamContext const &  sc)
void evf::FastMonitoringService::postStreamEndLumi ( edm::StreamContext const &  sc)
void evf::FastMonitoringService::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 103 of file FastMonitoringService.cc.

References collectedPathList_, evf::FastMonitoringService::Encoding::completeReservedWithDummies(), dowork(), encModule_, encPath_, eventCountForPathInit_, edm::hlt::Exception, fastMicrostateDefPath_, fastName_, fastPath_, firstEventId_, fmt_, i, isGlobalLumiTransition_, evf::FastMonitoringThread::jsonMonitor_, lastGlobalLumi_, LogDebug, lumiFromSource_, evf::FastMonitoringThread::m_data, macrostate_, evf::FastMonitoringThread::MonitorData::macrostateBins_, edm::service::SystemBounds::maxNumberOfStreams(), edm::service::SystemBounds::maxNumberOfThreads(), evf::FastMonitoringThread::MCOUNT, evf::MicroStateService::mCOUNT, microstate_, evf::FastMonitoringThread::MonitorData::microstateBins_, microstateDefPath_, ministate_, evf::FastMonitoringThread::MonitorData::ministateBins_, evf::MicroStateService::mInvalid, moduleLegendFile_, monInit_, nopath_, nStreams_, nThreads_, cppFunctionSkipper::operator, cmsHarvester::path, pathLegendFile_, evf::FastMonitoringThread::MonitorData::registerVariables(), evf::MicroStateService::reservedMicroStateNames, evf::FastMonitoringThread::resetFastMonitor(), runDirectory_, evf::FastMonitoringThread::sInit, evf::FastMonitoringThread::start(), streamCounterUpdating_, threadIDAvailable_, evf::FastMonitoringService::Encoding::updateReserved(), and workingDirectory_.

Referenced by FastMonitoringService().

104  {
105 
106  // FIND RUN DIRECTORY
107  // The run dir should be set via the configuration of EvFDaqDirector
108 
109  if (edm::Service<evf::EvFDaqDirector>().operator->()==nullptr)
110  {
111  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
112 
113  }
114  boost::filesystem::path runDirectory(edm::Service<evf::EvFDaqDirector>()->baseRunDir());
115  workingDirectory_ = runDirectory_ = runDirectory;
116  workingDirectory_ /= "mon";
117 
118  if ( !boost::filesystem::is_directory(workingDirectory_)) {
119  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string() ;
120  boost::filesystem::create_directories(workingDirectory_);
121  if ( !boost::filesystem::is_directory(workingDirectory_))
122  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
123  << ". No monitoring data will be written.";
124  }
125 
126  std::ostringstream fastFileName;
127 
128  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
130  fast /= fastFileName.str();
131  fastPath_ = fast.string();
132 
133  std::ostringstream moduleLegFile;
134  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
135  moduleLegendFile_ = (workingDirectory_/moduleLegFile.str()).string();
136 
137  std::ostringstream pathLegFile;
138  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
139  pathLegendFile_ = (workingDirectory_/pathLegFile.str()).string();
140 
141  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: "
143  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
144 
145  nStreams_=bounds.maxNumberOfStreams();
146  nThreads_=bounds.maxNumberOfThreads();
147 
148  //this should already be >=1
149  if (nStreams_==0) nStreams_=1;
150  if (nThreads_==0) nThreads_=1;
151 
152  /*
153  * initialize the fast monitor with:
154  * vector of pointers to monitorable parameters
155  * path to definition
156  *
157  */
158 
160 
161  for(unsigned int i = 0; i < (mCOUNT); i++)
164 
165  for (unsigned int i=0;i<nStreams_;i++) {
166  ministate_.push_back(&nopath_);
168 
169  //for synchronization
170  streamCounterUpdating_.push_back(new std::atomic<bool>(0));
171 
172  //path (mini) state
173  encPath_.emplace_back(0);
174  encPath_[i].update((void*)&nopath_);
175  eventCountForPathInit_.push_back(0);
176  firstEventId_.push_back(0);
177  collectedPathList_.push_back(new std::atomic<bool>(0));
178 
179  }
180  //for (unsigned int i=0;i<nThreads_;i++)
181  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
182 
183  //initial size until we detect number of bins
187 
188  lastGlobalLumi_=0;
190  lumiFromSource_=0;
191 
192  //startup monitoring
194  fmt_.jsonMonitor_->setNStreams(nStreams_);
196  monInit_.store(false,std::memory_order_release);
198 
199  //this definition needs: #include "tbb/compat/thread"
200  //however this would results in TBB imeplementation replacing std::thread
201  //(both supposedly call pthread_self())
202  //number of threads created in process could be obtained from /proc,
203  //assuming that all posix threads are true kernel threads capable of running in parallel
204 
205  //#if TBB_IMPLEMENT_CPP0X
207  //threadIDAvailable_=true;
208  //#endif
209 
210  }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
boost::filesystem::path runDirectory_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
std::vector< std::atomic< bool > * > collectedPathList_
tuple path
else: Piece not in the list, fine.
static const std::string nopath_
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
std::vector< unsigned long > firstEventId_
std::vector< const void * > microstate_
void registerVariables(FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
FastMonitoringThread::Macrostate macrostate_
boost::filesystem::path workingDirectory_
std::unique_ptr< FastMonitor > jsonMonitor_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
void evf::FastMonitoringService::preEvent ( edm::StreamContext const &  sc)

Definition at line 471 of file FastMonitoringService.cc.

Referenced by FastMonitoringService().

472  {
473  }
void evf::FastMonitoringService::preGlobalBeginLumi ( edm::GlobalContext const &  gc)

Definition at line 292 of file FastMonitoringService.cc.

References accuSize_, avgLeadTime_, filesProcessedDuringLumi_, fmt_, isGlobalLumiTransition_, lastGlobalLumi_, lastGlobalLumisClosed_, fff_deleter::lock(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, evf::FastMonitoringThread::monlock_, and processedEventsPerLumi_.

Referenced by FastMonitoringService().

293  {
294 
295  timeval lumiStartTime;
296  gettimeofday(&lumiStartTime, 0);
297  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
298 
299  std::lock_guard<std::mutex> lock(fmt_.monlock_);
300 
301  lumiStartTime_[newLumi]=lumiStartTime;
302  while (!lastGlobalLumisClosed_.empty()) {
303  //wipe out old map entries as they aren't needed and slow down access
304  unsigned int oldLumi = lastGlobalLumisClosed_.back();
306  lumiStartTime_.erase(oldLumi);
307  avgLeadTime_.erase(oldLumi);
308  filesProcessedDuringLumi_.erase(oldLumi);
309  accuSize_.erase(oldLumi);
310  processedEventsPerLumi_.erase(oldLumi);
311  }
312  lastGlobalLumi_= newLumi;
314  }
std::map< unsigned int, timeval > lumiStartTime_
std::map< unsigned int, unsigned long > accuSize_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::queue< unsigned int > lastGlobalLumisClosed_
std::map< unsigned int, double > avgLeadTime_
std::map< unsigned int, unsigned int > processedEventsPerLumi_
void evf::FastMonitoringService::preGlobalEarlyTermination ( edm::GlobalContext const &  gc,
edm::TerminationOrigin  to 
)

Definition at line 225 of file FastMonitoringService.cc.

References edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, fff_deleter::lock(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), evf::FastMonitoringThread::monlock_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FastMonitoringService().

226  {
227  std::string context;
228  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
229  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
230  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
231  edm::LogInfo("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
232  << gc.luminosityBlockID().luminosityBlock() << " " << context;
233  std::lock_guard<std::mutex> lock(fmt_.monlock_);
234  exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
235  //exception_detected_=true;
236  }
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 316 of file FastMonitoringService.cc.

References accuSize_, doSnapshot(), edm::hlt::Exception, exception_detected_, exceptionInLS_, evf::FastMonitoringThread::MonitorData::fastThroughputJ_, fmt_, isGlobalLumiTransition_, evf::FastMonitoringThread::jsonMonitor_, fff_deleter::lock(), LogDebug, fjr2json::lumi, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, evf::FastMonitoringThread::m_data, evf::FastMonitoringThread::monlock_, cmsHarvester::path, processedEventsPerLumi_, edm::shutdown_flag, slowName_, sourceEventsReport_, throughputFactor(), jsoncollector::IntJ::value(), jsoncollector::DoubleJ::value(), and workingDirectory_.

Referenced by FastMonitoringService().

// Body of preGlobalEndLumi(gc): at the end of a global luminosity block it computes the
// throughput, takes a final snapshot, cross-checks the processed-event count against the
// count reported by the source and writes the per-lumi JSON file.
// NOTE(review): this listing skips source lines 343, 364 and 389 (the numbering jumps),
// so the statements on those lines are not visible here.
317  {
318  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
319  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: "
320  << lumi;
// Stop time is sampled before monlock_ is acquired.
321  timeval lumiStopTime;
322  gettimeofday(&lumiStopTime, 0);
323 
// monlock_ is held from here until the function returns or throws.
324  std::lock_guard<std::mutex> lock(fmt_.monlock_);
325 
326  // Compute throughput
// NOTE(review): operator[] default-inserts a zeroed timeval when no start time was
// recorded for this lumi, which would make the elapsed time below count from time zero.
327  timeval stt = lumiStartTime_[lumi];
328  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec)*1000000
329  + (lumiStopTime.tv_usec - stt.tv_usec);
// accuSize_ is looked up with find() first so that a missing entry yields 0 instead of
// being inserted.
330  unsigned long accuSize = accuSize_.find(lumi)==accuSize_.end() ? 0 : accuSize_[lumi];
// NOTE(review): if usecondsForLumi is 0 this floating-point division yields inf or NaN.
331  double throughput = throughputFactor()* double(accuSize) / double(usecondsForLumi);
332  //store to registered variable
333  fmt_.m_data.fastThroughputJ_.value() = throughput;
334 
335  //update
336  doSnapshot(lumi,true);
337 
338  // create file name for slow monitoring file
// Resulting name: <slowName_>_ls<lumi, zero-padded to 4>_pid<pid, zero-padded to 5>.jsn
339  std::stringstream slowFileName;
340  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
341  << lumi << "_pid" << std::setfill('0')
342  << std::setw(5) << getpid() << ".jsn";
// NOTE(review): `slow` is declared on source line 343, which this listing omits;
// presumably a boost::filesystem::path based on workingDirectory_ - confirm in the .cc file.
344  slow /= slowFileName.str();
345 
346  //retrieve one result we need (todo: sanity check if it's found)
347  IntJ *lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_.jsonMonitor_->getMergedIntJForLumi("Processed",lumi));
348  if (!lumiProcessedJptr)
349  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
350  processedEventsPerLumi_[lumi] = lumiProcessedJptr->value();
351 
352  {
353  auto itr = sourceEventsReport_.find(lumi);
// Case 1: the source never reported an event count for this lumi.
354  if (itr==sourceEventsReport_.end()) {
355  //check if exception has been thrown (in case of Global/Stream early termination, for this LS)
356  bool exception_detected = exception_detected_;
357  for (auto ex : exceptionInLS_)
358  if (lumi == ex) exception_detected=true;
359 
360  if (edm::shutdown_flag || exception_detected) {
361  edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
362  << processedEventsPerLumi_[lumi] << " events were processed in LUMI " << lumi;
363  //this will prevent output modules from producing json file for possibly incomplete lumi
// NOTE(review): source line 364 is not shown in this listing.
// NOTE(review): this return skips `delete lumiProcessedJptr` on line 383; if the pointer
// is owned by this function (as that delete suggests) it is leaked on this path.
365  return;
366  }
367  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
368  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
369  }
// Case 2: the source reported a count; it must equal the processed count.
370  else {
371  if (itr->second!=processedEventsPerLumi_[lumi]) {
// NOTE(review): throwing here also bypasses `delete lumiProcessedJptr` on line 383.
372  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
373  << lumi
374  << ", events(processed):" << processedEventsPerLumi_[lumi]
375  << " events(source):" << itr->second;
376  }
// The source report for this lumi is consumed once it has been verified.
377  sourceEventsReport_.erase(itr);
378  }
379  }
// usecondsForLumi/1000000 is an integer division, so the logged time is in whole seconds.
380  edm::LogInfo("FastMonitoringService") << "Statistics for lumisection -: lumi = " << lumi << " events = "
381  << lumiProcessedJptr->value() << " time = " << usecondsForLumi/1000000
382  << " size = " << accuSize << " thr = " << throughput;
383  delete lumiProcessedJptr;
384 
385  //full global and stream merge&output for this lumi
386  fmt_.jsonMonitor_->outputFullJSON(slow.string(),lumi);//full global and stream merge and JSON write for this lumi
387  fmt_.jsonMonitor_->discardCollected(lumi);//we don't do further updates for this lumi
388 
// NOTE(review): source line 389 is not shown in this listing.
390  }
#define LogDebug(id)
std::map< unsigned int, timeval > lumiStartTime_
double throughputFactor()
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
volatile std::atomic< bool > shutdown_flag
tuple path
else: Piece not in the list, fine.
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
boost::filesystem::path workingDirectory_
std::map< unsigned int, unsigned int > processedEventsPerLumi_
std::map< unsigned int, unsigned int > sourceEventsReport_
std::unique_ptr< FastMonitor > jsonMonitor_
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preModuleBeginJob ( edm::ModuleDescription const &  desc)

Definition at line 255 of file FastMonitoringService.cc.

References encModule_, fmt_, fff_deleter::lock(), edm::ModuleDescription::moduleName(), evf::FastMonitoringThread::monlock_, evf::FastMonitoringService::Encoding::update(), and evf::FastMonitoringService::Encoding::updateReserved().

Referenced by FastMonitoringService().

// Body of preModuleBeginJob(desc): registers the module description with the module
// encoding (encModule_), holding monlock_ for the whole call.
256  {
257  std::lock_guard<std::mutex> lock(fmt_.monlock_);
258  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
259 
260  //build a map of modules keyed by their module description address
261  //here we need to treat output modules in a special way so they can be easily singled out
// Modules whose name matches one of the four literals below go through updateReserved();
// every other module goes through update(). In both cases the key is the address of `desc`.
262  if(desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" ||
263  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule")
264  encModule_.updateReserved((void*)&desc);
265  else
266  encModule_.update((void*)&desc);
267  }
void evf::FastMonitoringService::preModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 512 of file FastMonitoringService.cc.

References microstate_, edm::ModuleCallingContext::moduleDescription(), edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

// Body of preModuleEvent(sc, mcc): stores the value returned by mcc.moduleDescription()
// as the current microstate of the calling stream. No lock is taken here.
513  {
// microstate_ is indexed by the numeric stream id.
514  microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
515  }
std::vector< const void * > microstate_
void evf::FastMonitoringService::prePathEvent ( edm::StreamContext const &  sc,
edm::PathContext const &  pc 
)

Definition at line 438 of file FastMonitoringService.cc.

References collectedPathList_, encPath_, edm::EventID::event(), eventCountForPathInit_, edm::StreamContext::eventID(), firstEventId_, fmt_, fff_deleter::lock(), evf::FastMonitoringThread::m_data, makePathLegenda(), ministate_, evf::FastMonitoringThread::MonitorData::ministateBins_, evf::FastMonitoringThread::monlock_, pathLegendFile_, pathLegendWritten_, edm::PathContext::pathName(), edm::StreamContext::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, and unlikely.

Referenced by FastMonitoringService().

// Body of prePathEvent(sc, pc): while a stream is still collecting its path names the
// path is registered in encPath_; afterwards the stream's ministate_ is set to the path.
439  {
440  //make sure that all path names are retrieved before allowing ministate to change
441  //hack: assume memory is synchronized after ~50 events seen by each stream
// Collection branch: taken only while this stream has counted fewer than 50 events AND
// its collectedPathList_ flag is still false.
442  if (unlikely(eventCountForPathInit_[sc.streamID()]<50) && false==collectedPathList_[sc.streamID()]->load(std::memory_order_acquire))
443  {
444  //protection between stream threads, as well as the service monitoring thread
445  std::lock_guard<std::mutex> lock(fmt_.monlock_);
446 
// Remember the first event number seen by this stream; 0 is treated as "not yet set".
447  if (firstEventId_[sc.streamID()]==0)
448  firstEventId_[sc.streamID()]=sc.eventID().event();
// Still on that first event: record this path (keyed by the address of its name) and
// return without touching ministate_.
449  if (sc.eventID().event()==firstEventId_[sc.streamID()])
450  {
// NOTE(review): the stored key is the address of the object returned by pc.pathName();
// this presumably relies on pathName() returning a reference to long-lived storage - confirm.
451  encPath_[sc.streamID()].update((void*)&pc.pathName());
452  return;
453  }
// A different event number means the first event is over: mark collection as done,
// publish the number of bins and write the path legend file once.
454  else {
455  //finished collecting path names
456  collectedPathList_[sc.streamID()]->store(true,std::memory_order_seq_cst);
457  fmt_.m_data.ministateBins_=encPath_[sc.streamID()].vecsize();
// pathLegendWritten_ is checked and set under monlock_ (acquired on line 445).
458  if (!pathLegendWritten_) {
459  std::string pathLegendStr = makePathLegenda();
460  FileIO::writeStringToFile(pathLegendFile_, pathLegendStr);
461  pathLegendWritten_=true;
462  }
463  }
// Note: the call that finishes collection falls through without updating ministate_.
464  }
// Normal branch: no lock is taken; the ministate is the address of the path name.
465  else {
466  ministate_[sc.streamID()] = &(pc.pathName());
467  }
468  }
std::vector< unsigned int > eventCountForPathInit_
#define unlikely(x)
std::vector< std::atomic< bool > * > collectedPathList_
std::vector< unsigned long > firstEventId_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
void evf::FastMonitoringService::preSourceEarlyTermination ( edm::TerminationOrigin  to)
void evf::FastMonitoringService::preSourceEvent ( edm::StreamID  sid)

Definition at line 502 of file FastMonitoringService.cc.

References microstate_, evf::MicroStateService::mInput, evf::MicroStateService::reservedMicroStateNames, and edm::StreamID::value().

Referenced by FastMonitoringService().

// Body of preSourceEvent(sid).
// NOTE(review): this listing jumps from source line 503 to 505, so the statement on
// line 504 is not shown here; the body should not be read as empty. Consult
// FastMonitoringService.cc for the actual statement.
503  {
505  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
unsigned int value() const
Definition: StreamID.h:46
void evf::FastMonitoringService::preStreamBeginLumi ( edm::StreamContext const &  sc)

Definition at line 398 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), fmt_, fff_deleter::lock(), edm::EventID::luminosityBlock(), evf::FastMonitoringThread::m_data, evf::MicroStateService::mFwkOvh, microstate_, ministate_, evf::FastMonitoringThread::monlock_, nopath_, evf::FastMonitoringThread::MonitorData::processed_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), evf::FastMonitoringThread::MonitorData::streamLumi_, and edm::StreamID::value().

Referenced by FastMonitoringService().

// Body of preStreamBeginLumi(sc): under monlock_, records the luminosity block the
// stream is entering, zeroes the stream's processed counter and resets its ministate.
399  {
400  unsigned int sid = sc.streamID().value();
401  std::lock_guard<std::mutex> lock(fmt_.monlock_);
// Per-stream slot, indexed by the numeric stream id.
402  fmt_.m_data.streamLumi_[sid] = sc.eventID().luminosityBlock();
403 
404  //reset collected values for this stream
405  *(fmt_.m_data.processed_[sid])=0;
406 
// The ministate points at the static nopath_ string until a path starts.
407  ministate_[sid]=&nopath_;
// NOTE(review): source line 408 is not shown in this listing (numbering jumps 407 -> 409).
409  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< AtomicMonUInt * > processed_
static const std::string nopath_
std::vector< const void * > microstate_
std::vector< const void * > ministate_
std::vector< unsigned int > streamLumi_
void evf::FastMonitoringService::preStreamEarlyTermination ( edm::StreamContext const &  sc,
edm::TerminationOrigin  to 
)

Definition at line 212 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, fff_deleter::lock(), edm::EventID::luminosityBlock(), evf::FastMonitoringThread::monlock_, edm::StreamContext::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::StreamID::value().

Referenced by FastMonitoringService().

// Body of preStreamEarlyTermination(sc, to): logs the early termination of a stream
// transition and records the affected luminosity block number in exceptionInLS_.
213  {
// Label describing the termination origin; remains empty for any value of `to`
// other than the three compared below.
214  std::string context;
215  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
216  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
217  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
218  edm::LogInfo("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
219  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
// The log message above is emitted before the lock is taken; only the vector update
// below runs under monlock_.
220  std::lock_guard<std::mutex> lock(fmt_.monlock_);
221  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
// The job-wide exception_detected_ flag is deliberately left untouched here
// (the assignment is commented out); only the per-lumi list is updated.
222  //exception_detected_=true;
223  }
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preStreamEndLumi ( edm::StreamContext const &  sc)

Definition at line 416 of file FastMonitoringService.cc.

References doStreamEOLSnapshot(), edm::StreamContext::eventID(), fmt_, svgfig::load(), fff_deleter::lock(), edm::EventID::luminosityBlock(), evf::MicroStateService::mEoL, microstate_, ministate_, evf::FastMonitoringThread::monlock_, nopath_, evf::MicroStateService::reservedMicroStateNames, streamCounterUpdating_, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

// Body of preStreamEndLumi(sc): under monlock_, takes the stream's end-of-lumi snapshot
// and resets the stream's ministate.
417  {
418  unsigned int sid = sc.streamID().value();
419  std::lock_guard<std::mutex> lock(fmt_.monlock_);
420 
// The busy-wait below is compiled in only when ATOMIC_LEVEL>=2.
// NOTE(review): it spins while monlock_ (taken on line 419) is still held.
421  #if ATOMIC_LEVEL>=2
422  //spinlock to make sure we are not still updating event counter somewhere
423  while (streamCounterUpdating_[sid]->load(std::memory_order_acquire)) {}
424  #endif
425 
426  //update processed count to be complete at this time
427  doStreamEOLSnapshot(sc.eventID().luminosityBlock(),sid);
428  //reset this in case stream does not get notified of next lumi (we keep processed events only)
429  ministate_[sid]=&nopath_;
// NOTE(review): source line 430 is not shown in this listing (numbering jumps 429 -> 431).
431  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< std::atomic< bool > * > streamCounterUpdating_
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
static const std::string nopath_
def load
Definition: svgfig.py:546
std::vector< const void * > microstate_
std::vector< const void * > ministate_
void evf::FastMonitoringService::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)

Definition at line 637 of file FastMonitoringService.cc.

References fmt_, fff_deleter::lock(), evf::FastMonitoringThread::monlock_, and sourceEventsReport_.

Referenced by FedRawDataInputSource::checkNextEvent(), and FedRawDataInputSource::getNextEvent().

// Body of reportEventsThisLumiInSource(lumi, events): under monlock_, accumulates the
// number of events the source reports for a luminosity block in sourceEventsReport_.
638  {
639 
640  std::lock_guard<std::mutex> lock(fmt_.monlock_);
641  auto itr = sourceEventsReport_.find(lumi);
// An existing entry for this lumi is incremented by `events`.
642  if (itr!=sourceEventsReport_.end())
643  itr->second+=events;
644  else
// NOTE(review): the else-branch statement (source line 645) is not shown in this
// listing; presumably it creates the entry for this lumi - confirm in the .cc file.
646 
647  }
tuple lumi
Definition: fjr2json.py:35
tuple events
Definition: patZpeak.py:19
std::map< unsigned int, unsigned int > sourceEventsReport_
void evf::FastMonitoringService::setMicroState ( MicroStateService::Microstate  m)
virtual

Implements evf::MicroStateService.

Definition at line 527 of file FastMonitoringService.cc.

References i, microstate_, nStreams_, and evf::MicroStateService::reservedMicroStateNames.

// Body of setMicroState(m), the overload without a stream id: iterates over all stream
// indices from 0 to nStreams_-1.
528  {
529  for (unsigned int i=0;i<nStreams_;i++)
// NOTE(review): the loop body (source line 530) is not shown in this listing
// (numbering jumps 529 -> 531); consult FastMonitoringService.cc for the statement.
531  }
int i
Definition: DBlmapReader.cc:9
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
void evf::FastMonitoringService::setMicroState ( edm::StreamID  sid,
MicroStateService::Microstate  m 
)
virtual

Implements evf::MicroStateService.

Definition at line 534 of file FastMonitoringService.cc.

References m, microstate_, and evf::MicroStateService::reservedMicroStateNames.

// Body of setMicroState(sid, m), the per-stream overload.
// NOTE(review): this listing jumps from source line 535 to 537, so the statement on
// line 536 is not shown here; the body should not be read as empty.
535  {
537  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
void evf::FastMonitoringService::startedLookingForFile ( )

Definition at line 552 of file FastMonitoringService.cc.

References fileLookStart_.

Referenced by FedRawDataInputSource::readSupervisor().

// Body of startedLookingForFile(): stores the current wall-clock time in fileLookStart_.
// No lock is taken in this function.
552  {
553  gettimeofday(&fileLookStart_, 0);
// The block below is disabled debug output.
554  /*
555  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
556  << fileLookStart_.tv_usec / 1000.0 << std::endl;
557  */
558  }
void evf::FastMonitoringService::stoppedLookingForFile ( unsigned int  lumi)

Definition at line 560 of file FastMonitoringService.cc.

References avgLeadTime_, fileLookStart_, fileLookStop_, fmt_, i, leadTimes_, fff_deleter::lock(), fjr2json::lumi, lumiFromSource_, and evf::FastMonitoringThread::monlock_.

Referenced by FedRawDataInputSource::readSupervisor().

// Body of stoppedLookingForFile(lumi): measures the time elapsed since fileLookStart_
// was set, appends it to leadTimes_ and refreshes avgLeadTime_[lumi].
560  {
// Stop time is sampled before monlock_ is acquired.
561  gettimeofday(&fileLookStop_, 0);
// The block below is disabled debug output.
562  /*
563  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
564  << fileLookStop_.tv_usec / 1000.0 << std::endl;
565  */
566  std::lock_guard<std::mutex> lock(fmt_.monlock_);
567 
// A lumi number greater than lumiFromSource_ discards the samples collected so far.
568  if (lumi>lumiFromSource_) {
// NOTE(review): source line 569 is not shown in this listing; presumably it updates
// lumiFromSource_ to `lumi` - confirm in the .cc file.
570  leadTimes_.clear();
571  }
// Elapsed time in microseconds.
572  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
573  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
574  // add this to lead times for this lumi
575  leadTimes_.push_back((double)elapsedTime);
576 
577  // recompute average lead time for this lumi
// NOTE(review): unit mismatch - with a single sample the raw microsecond value is stored,
// whereas with several samples the mean is multiplied by 0.001. The two branches
// therefore differ by a factor of 1000; confirm which unit consumers expect.
578  if (leadTimes_.size() == 1) avgLeadTime_[lumi] = leadTimes_[0];
579  else {
580  double totTime = 0;
581  for (unsigned int i = 0; i < leadTimes_.size(); i++) totTime += leadTimes_[i];
582  avgLeadTime_[lumi] = 0.001*(totTime / leadTimes_.size());
583  }
584  }
int i
Definition: DBlmapReader.cc:9
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, double > avgLeadTime_
std::vector< double > leadTimes_

Member Data Documentation

std::map<unsigned int, unsigned long> evf::FastMonitoringService::accuSize_
private
std::map<unsigned int, double> evf::FastMonitoringService::avgLeadTime_
private

Definition at line 224 of file FastMonitoringService.h.

Referenced by doSnapshot(), preGlobalBeginLumi(), and stoppedLookingForFile().

std::vector<std::atomic<bool>*> evf::FastMonitoringService::collectedPathList_
private

Definition at line 238 of file FastMonitoringService.h.

Referenced by preallocate(), and prePathEvent().

Encoding evf::FastMonitoringService::encModule_
private
std::vector<Encoding> evf::FastMonitoringService::encPath_
private
std::vector<unsigned int> evf::FastMonitoringService::eventCountForPathInit_
private

Definition at line 239 of file FastMonitoringService.h.

Referenced by postEvent(), preallocate(), and prePathEvent().

bool evf::FastMonitoringService::exception_detected_ = false
private

Definition at line 255 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi(), and preSourceEarlyTermination().

std::vector<unsigned int> evf::FastMonitoringService::exceptionInLS_
private
std::string evf::FastMonitoringService::fastMicrostateDefPath_
private

Definition at line 201 of file FastMonitoringService.h.

Referenced by preallocate().

unsigned int evf::FastMonitoringService::fastMonIntervals_
private

Definition at line 199 of file FastMonitoringService.h.

Referenced by dowork().

std::string evf::FastMonitoringService::fastName_
private

Definition at line 202 of file FastMonitoringService.h.

Referenced by preallocate().

std::string evf::FastMonitoringService::fastPath_
private

Definition at line 202 of file FastMonitoringService.h.

Referenced by dowork(), and preallocate().

timeval evf::FastMonitoringService::fileLookStart_
private

Definition at line 207 of file FastMonitoringService.h.

Referenced by startedLookingForFile(), and stoppedLookingForFile().

timeval evf::FastMonitoringService::fileLookStop_
private

Definition at line 207 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

std::map<unsigned int, unsigned int> evf::FastMonitoringService::filesProcessedDuringLumi_
private

Definition at line 225 of file FastMonitoringService.h.

Referenced by accumulateFileSize(), doSnapshot(), and preGlobalBeginLumi().

std::vector<unsigned long> evf::FastMonitoringService::firstEventId_
private

Definition at line 237 of file FastMonitoringService.h.

Referenced by preallocate(), and prePathEvent().

FastMonitoringThread evf::FastMonitoringService::fmt_
private
bool evf::FastMonitoringService::isGlobalLumiTransition_
private
unsigned int evf::FastMonitoringService::lastGlobalLumi_
private

Definition at line 209 of file FastMonitoringService.h.

Referenced by dowork(), preallocate(), and preGlobalBeginLumi().

std::queue<unsigned int> evf::FastMonitoringService::lastGlobalLumisClosed_
private

Definition at line 210 of file FastMonitoringService.h.

Referenced by postGlobalEndLumi(), and preGlobalBeginLumi().

std::vector<double> evf::FastMonitoringService::leadTimes_
private

Definition at line 228 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

unsigned int evf::FastMonitoringService::lumiFromSource_
private

Definition at line 212 of file FastMonitoringService.h.

Referenced by preallocate(), and stoppedLookingForFile().

std::map<unsigned int, timeval> evf::FastMonitoringService::lumiStartTime_
private

Definition at line 206 of file FastMonitoringService.h.

Referenced by preGlobalBeginLumi(), and preGlobalEndLumi().

FastMonitoringThread::Macrostate evf::FastMonitoringService::macrostate_
private
const std::string evf::FastMonitoringService::macroStateNames
static
Initial value:
=
{"Init","JobReady","RunGiven","Running",
"Stopping","Done","JobEnded","Error","ErrorEnded","End",
"Invalid"}

Definition at line 108 of file FastMonitoringService.h.

std::vector<const void*> evf::FastMonitoringService::microstate_
private
std::string evf::FastMonitoringService::microstateDefPath_
private

Definition at line 201 of file FastMonitoringService.h.

Referenced by preallocate().

std::vector<const void*> evf::FastMonitoringService::ministate_
private
std::string evf::FastMonitoringService::moduleLegendFile_
private

Definition at line 250 of file FastMonitoringService.h.

Referenced by postBeginJob(), and preallocate().

std::atomic<bool> evf::FastMonitoringService::monInit_
private

Definition at line 254 of file FastMonitoringService.h.

Referenced by dowork(), and preallocate().

const std::string evf::FastMonitoringService::nopath_ = "NoPath"
static
unsigned int evf::FastMonitoringService::nStreams_
private

Definition at line 196 of file FastMonitoringService.h.

Referenced by doSnapshot(), preallocate(), and setMicroState().

unsigned int evf::FastMonitoringService::nThreads_
private

Definition at line 197 of file FastMonitoringService.h.

Referenced by preallocate().

std::string evf::FastMonitoringService::pathLegendFile_
private

Definition at line 251 of file FastMonitoringService.h.

Referenced by preallocate(), and prePathEvent().

bool evf::FastMonitoringService::pathLegendWritten_ = false
private

Definition at line 252 of file FastMonitoringService.h.

Referenced by prePathEvent().

std::vector<bool> evf::FastMonitoringService::pathNamesReady_
private

Definition at line 240 of file FastMonitoringService.h.

std::map<unsigned int, unsigned int> evf::FastMonitoringService::processedEventsPerLumi_
private
boost::filesystem::path evf::FastMonitoringService::runDirectory_
private

Definition at line 242 of file FastMonitoringService.h.

Referenced by getRunDirName(), and preallocate().

int evf::FastMonitoringService::sleepTime_
private

Definition at line 198 of file FastMonitoringService.h.

Referenced by dowork().

std::string evf::FastMonitoringService::slowName_
private

Definition at line 202 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi().

unsigned int evf::FastMonitoringService::snapCounter_ = 0
private

Definition at line 200 of file FastMonitoringService.h.

Referenced by dowork().

std::map<unsigned int,unsigned int> evf::FastMonitoringService::sourceEventsReport_
private

Definition at line 244 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi(), and reportEventsThisLumiInSource().

std::vector<std::atomic<bool>*> evf::FastMonitoringService::streamCounterUpdating_
private

Definition at line 235 of file FastMonitoringService.h.

Referenced by postEvent(), preallocate(), and preStreamEndLumi().

bool evf::FastMonitoringService::threadIDAvailable_ = false
private

Definition at line 246 of file FastMonitoringService.h.

Referenced by preallocate().

std::vector<const void*> evf::FastMonitoringService::threadMicrostate_
private

Definition at line 220 of file FastMonitoringService.h.

std::atomic<unsigned long> evf::FastMonitoringService::totalEventsProcessed_
private

Definition at line 248 of file FastMonitoringService.h.

Referenced by postEvent().

boost::filesystem::path evf::FastMonitoringService::workingDirectory_
private

Definition at line 242 of file FastMonitoringService.h.

Referenced by preallocate(), and preGlobalEndLumi().