CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
evf::FastMonitoringService Class Reference

#include <FastMonitoringService.h>

Inheritance diagram for evf::FastMonitoringService:
evf::MicroStateService

Classes

struct  Encoding
 

Public Member Functions

void accumulateFileSize (unsigned int lumi, unsigned long fileSize)
 
 FastMonitoringService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
unsigned int getEventsProcessedForLumi (unsigned int lumi)
 
std::string getRunDirName () const
 
void jobFailure ()
 
std::string makeModuleLegenda ()
 
std::string makePathLegenda ()
 
void postBeginJob ()
 
void postEndJob ()
 
void postEvent (edm::StreamContext const &)
 
void postGlobalBeginRun (edm::GlobalContext const &)
 
void postGlobalEndLumi (edm::GlobalContext const &)
 
void postModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void postSourceEvent (edm::StreamID)
 
void postStreamBeginLumi (edm::StreamContext const &)
 
void postStreamEndLumi (edm::StreamContext const &)
 
void preallocate (edm::service::SystemBounds const &)
 
void preEvent (edm::StreamContext const &)
 
void preGlobalBeginLumi (edm::GlobalContext const &)
 
void preGlobalEarlyTermination (edm::GlobalContext const &, edm::TerminationOrigin)
 
void preGlobalEndLumi (edm::GlobalContext const &)
 
void preModuleBeginJob (edm::ModuleDescription const &)
 
void preModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void prePathEvent (edm::StreamContext const &, edm::PathContext const &)
 
void preSourceEarlyTermination (edm::TerminationOrigin)
 
void preSourceEvent (edm::StreamID)
 
void preStreamBeginLumi (edm::StreamContext const &)
 
void preStreamEarlyTermination (edm::StreamContext const &, edm::TerminationOrigin)
 
void preStreamEndLumi (edm::StreamContext const &)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void reportLockWait (unsigned int ls, double waitTime, unsigned int lockCount)
 
void setExceptionDetected (unsigned int ls)
 
void setMicroState (MicroStateService::Microstate)
 
void setMicroState (edm::StreamID, MicroStateService::Microstate)
 
void startedLookingForFile ()
 
void stoppedLookingForFile (unsigned int lumi)
 
 ~FastMonitoringService ()
 
- Public Member Functions inherited from evf::MicroStateService
virtual std::string getMicroState1 ()
 
virtual std::string const & getMicroState2 ()
 
 MicroStateService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
virtual ~MicroStateService ()
 

Static Public Attributes

static const std::string macroStateNames [FastMonitoringThread::MCOUNT]
 
static const std::string nopath_ = "NoPath"
 
- Static Public Attributes inherited from evf::MicroStateService
static const edm::ModuleDescription reservedMicroStateNames [mCOUNT]
 

Private Member Functions

void doSnapshot (const unsigned int ls, const bool isGlobalEOL)
 
void doStreamEOLSnapshot (const unsigned int ls, const unsigned int streamID)
 
void dowork ()
 

Private Attributes

std::map< unsigned int, unsigned long > accuSize_
 
std::map< unsigned int, double > avgLeadTime_
 
std::vector< std::atomic< bool > * > collectedPathList_
 
Encoding encModule_
 
std::vector< Encoding > encPath_
 
std::vector< unsigned int > eventCountForPathInit_
 
bool exception_detected_ = false
 
std::vector< unsigned int > exceptionInLS_
 
std::string fastMicrostateDefPath_
 
unsigned int fastMonIntervals_
 
std::string fastName_
 
std::string fastPath_
 
timeval fileLookStart_
 
timeval fileLookStop_
 
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
 
std::vector< unsigned long > firstEventId_
 
FastMonitoringThread fmt_
 
bool isGlobalLumiTransition_
 
unsigned int lastGlobalLumi_
 
std::queue< unsigned int > lastGlobalLumisClosed_
 
std::vector< double > leadTimes_
 
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
 
unsigned int lumiFromSource_
 
std::map< unsigned int, timeval > lumiStartTime_
 
FastMonitoringThread::Macrostate macrostate_
 
std::vector< const void * > microstate_
 
std::string microstateDefPath_
 
std::vector< const void * > ministate_
 
std::string moduleLegendFile_
 
std::atomic< bool > monInit_
 
unsigned int nStreams_
 
unsigned int nThreads_
 
std::string pathLegendFile_
 
bool pathLegendWritten_ = false
 
std::vector< bool > pathNamesReady_
 
std::map< unsigned int, unsigned int > processedEventsPerLumi_
 
boost::filesystem::path runDirectory_
 
int sleepTime_
 
std::string slowName_
 
unsigned int snapCounter_ = 0
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
std::vector< std::atomic< bool > * > streamCounterUpdating_
 
bool threadIDAvailable_ = false
 
std::vector< const void * > threadMicrostate_
 
std::atomic< unsigned long > totalEventsProcessed_
 
boost::filesystem::path workingDirectory_
 

Additional Inherited Members

- Public Types inherited from evf::MicroStateService
enum  Microstate {
  mInvalid = 0, mFwkOvh, mIdle, mInput,
  mInputDone, mDqm, mEoL, mCOUNT
}
 
- Static Protected Attributes inherited from evf::MicroStateService
static const std::string default_return_ ="NotImplemented"
 

Detailed Description

Definition at line 51 of file FastMonitoringService.h.

Constructor & Destructor Documentation

evf::FastMonitoringService::FastMonitoringService ( const edm::ParameterSet &  iPS,
edm::ActivityRegistry &  reg 
)

Definition at line 32 of file FastMonitoringService.cc.

References jobFailure(), postBeginJob(), postEndJob(), postEvent(), postGlobalEndLumi(), postModuleEvent(), postSourceEvent(), postStreamBeginLumi(), postStreamEndLumi(), preallocate(), preEvent(), preGlobalBeginLumi(), preGlobalEarlyTermination(), preGlobalEndLumi(), preModuleBeginJob(), preModuleEvent(), prePathEvent(), preSourceEarlyTermination(), preSourceEvent(), preStreamBeginLumi(), preStreamEarlyTermination(), preStreamEndLumi(), edm::ActivityRegistry::watchJobFailure(), edm::ActivityRegistry::watchPostBeginJob(), edm::ActivityRegistry::watchPostEndJob(), edm::ActivityRegistry::watchPostEvent(), edm::ActivityRegistry::watchPostGlobalEndLumi(), edm::ActivityRegistry::watchPostModuleEvent(), edm::ActivityRegistry::watchPostSourceEvent(), edm::ActivityRegistry::watchPostStreamBeginLumi(), edm::ActivityRegistry::watchPostStreamEndLumi(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreEvent(), edm::ActivityRegistry::watchPreGlobalBeginLumi(), edm::ActivityRegistry::watchPreGlobalEarlyTermination(), edm::ActivityRegistry::watchPreGlobalEndLumi(), edm::ActivityRegistry::watchPreModuleBeginJob(), edm::ActivityRegistry::watchPreModuleEvent(), edm::ActivityRegistry::watchPrePathEvent(), edm::ActivityRegistry::watchPreSourceEarlyTermination(), edm::ActivityRegistry::watchPreSourceEvent(), edm::ActivityRegistry::watchPreStreamBeginLumi(), edm::ActivityRegistry::watchPreStreamEarlyTermination(), and edm::ActivityRegistry::watchPreStreamEndLumi().

33  :
34  MicroStateService(iPS,reg)
35  ,encModule_(33)
36  ,nStreams_(0)//until initialized
37  ,sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1))
38  ,fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 1))
39  ,microstateDefPath_(iPS.getUntrackedParameter<std::string> ("microstateDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/microstatedef.jsd"))
41  ,fastName_(iPS.getUntrackedParameter<std::string>("fastName", "fastmoni"))
42  ,slowName_(iPS.getUntrackedParameter<std::string>("slowName", "slowmoni"))
44  {
45  reg.watchPreallocate(this, &FastMonitoringService::preallocate);//receiving information on number of threads
47 
51 
55 
60 
62 
65 
66  reg.watchPreSourceEvent(this,&FastMonitoringService::preSourceEvent);//source (with streamID of requestor)
68 
71 
75  }
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
T getUntrackedParameter(std::string const &, T const &) const
void watchPreEvent(PreEvent::slot_type const &iSlot)
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
void preGlobalBeginLumi(edm::GlobalContext const &)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
void watchPostEvent(PostEvent::slot_type const &iSlot)
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
void preGlobalEndLumi(edm::GlobalContext const &)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void preModuleBeginJob(edm::ModuleDescription const &)
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
void preStreamEndLumi(edm::StreamContext const &)
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postStreamBeginLumi(edm::StreamContext const &)
void postStreamEndLumi(edm::StreamContext const &)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
evf::FastMonitoringService::~FastMonitoringService ( )

Definition at line 78 of file FastMonitoringService.cc.

79  {
80  }

Member Function Documentation

void evf::FastMonitoringService::accumulateFileSize ( unsigned int  lumi,
unsigned long  fileSize 
)

Definition at line 546 of file FastMonitoringService.cc.

References accuSize_, filesProcessedDuringLumi_, fmt_, CommonMethods::lock(), fjr2json::lumi, and evf::FastMonitoringThread::monlock_.

Referenced by evf::EvFDaqDirector::bumpFile().

546  {
547  std::lock_guard<std::mutex> lock(fmt_.monlock_);
548 
549  if (accuSize_.find(lumi)==accuSize_.end()) accuSize_[lumi] = fileSize;
550  else accuSize_[lumi] += fileSize;
551 
554  else
556  }
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void evf::FastMonitoringService::doSnapshot ( const unsigned int  ls,
const bool  isGlobalEOL 
)
private

Definition at line 615 of file FastMonitoringService.cc.

References avgLeadTime_, encModule_, evf::FastMonitoringService::Encoding::encode(), encPath_, evf::FastMonitoringThread::MonitorData::fastAvgLeadTimeJ_, evf::FastMonitoringThread::MonitorData::fastFilesProcessedJ_, evf::FastMonitoringThread::MonitorData::fastLockCountJ_, evf::FastMonitoringThread::MonitorData::fastLockWaitJ_, evf::FastMonitoringThread::MonitorData::fastMacrostateJ_, filesProcessedDuringLumi_, fmt_, i, isGlobalLumiTransition_, evf::FastMonitoringThread::jsonMonitor_, lockStatsDuringLumi_, evf::FastMonitoringThread::m_data, macrostate_, microstate_, evf::FastMonitoringThread::MonitorData::microstateEncoded_, ministate_, evf::FastMonitoringThread::MonitorData::ministateEncoded_, and nStreams_.

Referenced by dowork(), and preGlobalEndLumi().

615  {
616  // update macrostate
618 
619  //update these unless in the midst of a global transition
621 
622  auto itd = avgLeadTime_.find(ls);
623  if (itd != avgLeadTime_.end())
624  fmt_.m_data.fastAvgLeadTimeJ_ = itd->second;
625  else fmt_.m_data.fastAvgLeadTimeJ_=0.;
626 
627  auto iti = filesProcessedDuringLumi_.find(ls);
628  if (iti != filesProcessedDuringLumi_.end())
629  fmt_.m_data.fastFilesProcessedJ_ = iti->second;
631 
632  auto itrd = lockStatsDuringLumi_.find(ls);
633  if (itrd != lockStatsDuringLumi_.end()) {
634  fmt_.m_data.fastLockWaitJ_ = itrd->second.first;
635  fmt_.m_data.fastLockCountJ_ = itrd->second.second;
636  }
637  else {
640  }
641 
642  }
643  else return;
644 
645  //capture latest mini/microstate of streams
646  for (unsigned int i=0;i<nStreams_;i++) {
649  }
650  //for (unsigned int i=0;i<nThreads_;i++)
651  // fmt_.m_data.threadMicrostateEncoded_[i] = encModule_.encode(threadMicrostate_[i]);
652 
653  if (isGlobalEOL)
654  {//only update global variables
655  fmt_.jsonMonitor_->snapGlobal(ls);
656  }
657  else
658  fmt_.jsonMonitor_->snap(ls);
659  }
int i
Definition: DBlmapReader.cc:9
def ls
Definition: eostools.py:346
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::map< unsigned int, double > avgLeadTime_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< const void * > microstate_
std::vector< unsigned int > microstateEncoded_
FastMonitoringThread::Macrostate macrostate_
std::unique_ptr< FastMonitor > jsonMonitor_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
std::vector< unsigned int > ministateEncoded_
void evf::FastMonitoringService::doStreamEOLSnapshot ( const unsigned int  ls,
const unsigned int  streamID 
)
inline private

Definition at line 161 of file FastMonitoringService.h.

References fmt_, and evf::FastMonitoringThread::jsonMonitor_.

Referenced by preStreamEndLumi().

161  {
162  //pick up only event count here
163  fmt_.jsonMonitor_->snapStreamAtomic(ls,streamID);
164  }
def ls
Definition: eostools.py:346
std::unique_ptr< FastMonitor > jsonMonitor_
void evf::FastMonitoringService::dowork ( )
inline private

Definition at line 166 of file FastMonitoringService.h.

References doSnapshot(), encModule_, evf::FastMonitoringService::Encoding::encode(), encPath_, evf::FastMonitoringThread::MonitorData::fastMacrostateJ_, fastMonIntervals_, fastPath_, fmt_, evf::FastMonitoringThread::jsonMonitor_, lastGlobalLumi_, CommonMethods::lock(), evf::FastMonitoringThread::m_data, evf::FastMonitoringThread::m_stoprequest, microstate_, ministate_, monInit_, evf::FastMonitoringThread::monlock_, sleepTime_, snapCounter_, AlCaHLTBitMon_QueryRunRegistry::string, and jsoncollector::IntJ::value().

Referenced by preallocate().

166  { // the function to be called in the thread. Thread completes when function returns.
167  monInit_.exchange(true,std::memory_order_acquire);
168  while (!fmt_.m_stoprequest) {
169  edm::LogInfo("FastMonitoringService") << "Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
170  << " ms=" << encPath_[0].encode(ministate_[0])
171  << " us=" << encModule_.encode(microstate_[0]) << std::endl;
172 
173  {
174  std::lock_guard<std::mutex> lock(fmt_.monlock_);
175 
177 
179  std::string CSV = fmt_.jsonMonitor_->getCSVString();
180  //release mutex before writing out fast path file
181  fmt_.monlock_.unlock();
182  if (CSV.size())
183  fmt_.jsonMonitor_->outputCSV(fastPath_,CSV);
184  }
185 
186  snapCounter_++;
187 
188  }
189  ::sleep(sleepTime_);
190  }
191  }
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
std::vector< const void * > microstate_
std::atomic< bool > m_stoprequest
std::unique_ptr< FastMonitor > jsonMonitor_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
unsigned int evf::FastMonitoringService::getEventsProcessedForLumi ( unsigned int  lumi)

Definition at line 600 of file FastMonitoringService.cc.

References edm::hlt::Exception, fmt_, CommonMethods::lock(), fjr2json::lumi, evf::FastMonitoringThread::monlock_, proc, and processedEventsPerLumi_.

Referenced by DQMFileSaver::fillJson(), DQMFileSaver::globalEndLuminosityBlock(), and TriggerJSONMonitoring::globalEndLuminosityBlockSummary().

600  {
601  std::lock_guard<std::mutex> lock(fmt_.monlock_);
602 
603  auto it = processedEventsPerLumi_.find(lumi);
604  if (it!=processedEventsPerLumi_.end()) {
605  unsigned int proc = it->second;
606  return proc;
607  }
608  else {
609  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "<<lumi;
610  return 0;
611  }
612  }
TrainProcessor *const proc
Definition: MVATrainer.cc:101
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, unsigned int > processedEventsPerLumi_
std::string evf::FastMonitoringService::getRunDirName ( ) const
inline

Definition at line 155 of file FastMonitoringService.h.

References runDirectory_.

155 { return runDirectory_.stem().string(); }
boost::filesystem::path runDirectory_
void evf::FastMonitoringService::jobFailure ( )
std::string evf::FastMonitoringService::makeModuleLegenda ( )

Definition at line 94 of file FastMonitoringService.cc.

References evf::FastMonitoringService::Encoding::current_, evf::FastMonitoringService::Encoding::decode(), encModule_, and i.

Referenced by postBeginJob().

95  {
96  std::ostringstream ost;
97  for(int i = 0; i < encModule_.current_; i++)
98  ost<<i<<"="<<((const edm::ModuleDescription *)(encModule_.decode(i)))->moduleLabel()<<" ";
99  return ost.str();
100  }
int i
Definition: DBlmapReader.cc:9
const void * decode(unsigned int index)
std::string evf::FastMonitoringService::makePathLegenda ( )

Definition at line 83 of file FastMonitoringService.cc.

References edm::decode(), encPath_, i, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by prePathEvent().

83  {
84  //taken from first stream
85  std::ostringstream ost;
86  for(int i = 0;
87  i < encPath_[0].current_;
88  i++)
89  ost<<i<<"="<<*((std::string *)(encPath_[0].decode(i)))<<" ";
90  return ost.str();
91  }
int i
Definition: DBlmapReader.cc:9
bool decode(bool &, std::string const &)
Definition: types.cc:62
std::vector< Encoding > encPath_
void evf::FastMonitoringService::postBeginJob ( )

Definition at line 274 of file FastMonitoringService.cc.

References encModule_, fmt_, CommonMethods::lock(), evf::FastMonitoringThread::m_data, macrostate_, makeModuleLegenda(), evf::FastMonitoringThread::MonitorData::microstateBins_, moduleLegendFile_, evf::FastMonitoringThread::monlock_, evf::FastMonitoringThread::sJobReady, AlCaHLTBitMon_QueryRunRegistry::string, and evf::FastMonitoringService::Encoding::vecsize().

Referenced by FastMonitoringService().

275  {
276  std::string && moduleLegStr = makeModuleLegenda();
277  FileIO::writeStringToFile(moduleLegendFile_, moduleLegStr);
278 
280 
281  //update number of entries in module histogram
282  std::lock_guard<std::mutex> lock(fmt_.monlock_);
284  }
FastMonitoringThread::Macrostate macrostate_
void evf::FastMonitoringService::postEndJob ( )
void evf::FastMonitoringService::postEvent ( edm::StreamContext const &  sc)

Definition at line 481 of file FastMonitoringService.cc.

References eventCountForPathInit_, evf::FastMonitoringThread::MonitorData::fastPathProcessedJ_, fmt_, evf::FastMonitoringThread::m_data, microstate_, evf::MicroStateService::mIdle, ministate_, nopath_, evf::FastMonitoringThread::MonitorData::processed_, evf::MicroStateService::reservedMicroStateNames, streamCounterUpdating_, edm::StreamContext::streamID(), and totalEventsProcessed_.

Referenced by FastMonitoringService().

482  {
483  microstate_[sc.streamID()] = &reservedMicroStateNames[mIdle];
484 
485  ministate_[sc.streamID()] = &nopath_;
486 
487  #if ATOMIC_LEVEL>=2
488  //use atomic flag to make sure end of lumi sees this
489  streamCounterUpdating_[sc.streamID()]->store(true,std::memory_order_release);
490  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_release);
491  streamCounterUpdating_[sc.streamID()]->store(false,std::memory_order_release);
492 
493  #elif ATOMIC_LEVEL==1
494  //writes are atomic, we assume writes propagate to memory before stream EOL snap
495  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_relaxed);
496 
497  #elif ATOMIC_LEVEL==0 //default
498  (*(fmt_.m_data.processed_[sc.streamID()]))++;
499  #endif
500  eventCountForPathInit_[sc.streamID()]++;
501 
502  //fast path counter (events accumulated in a run)
503  unsigned long res = totalEventsProcessed_.fetch_add(1,std::memory_order_relaxed);
505  //fmt_.m_data.fastPathProcessedJ_ = totalEventsProcessed_.load(std::memory_order_relaxed);
506  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< AtomicMonUInt * > processed_
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
static const std::string nopath_
std::vector< const void * > microstate_
std::atomic< unsigned long > totalEventsProcessed_
std::vector< const void * > ministate_
void evf::FastMonitoringService::postGlobalBeginRun ( edm::GlobalContext const &  gc)
void evf::FastMonitoringService::postGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 398 of file FastMonitoringService.cc.

References lastGlobalLumisClosed_, edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by FastMonitoringService().

399  {
400  //mark closed lumis (still keep map entries until next one)
401  lastGlobalLumisClosed_.push(gc.luminosityBlockID().luminosityBlock());
402  }
std::queue< unsigned int > lastGlobalLumisClosed_
void evf::FastMonitoringService::postModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 523 of file FastMonitoringService.cc.

References evf::MicroStateService::mFwkOvh, microstate_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

524  {
525  //microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
526  microstate_[sc.streamID().value()] = &reservedMicroStateNames[mFwkOvh];
527  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
void evf::FastMonitoringService::postSourceEvent ( edm::StreamID  sid)

Definition at line 513 of file FastMonitoringService.cc.

References evf::MicroStateService::mFwkOvh, microstate_, evf::MicroStateService::reservedMicroStateNames, and edm::StreamID::value().

Referenced by FastMonitoringService().

514  {
516  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
unsigned int value() const
Definition: StreamID.h:46
void evf::FastMonitoringService::postStreamBeginLumi ( edm::StreamContext const &  sc)
void evf::FastMonitoringService::postStreamEndLumi ( edm::StreamContext const &  sc)
void evf::FastMonitoringService::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 103 of file FastMonitoringService.cc.

References collectedPathList_, evf::FastMonitoringService::Encoding::completeReservedWithDummies(), dowork(), encModule_, encPath_, eventCountForPathInit_, edm::hlt::Exception, fastMicrostateDefPath_, fastName_, fastPath_, firstEventId_, fmt_, i, isGlobalLumiTransition_, evf::FastMonitoringThread::jsonMonitor_, lastGlobalLumi_, LogDebug, lumiFromSource_, evf::FastMonitoringThread::m_data, macrostate_, evf::FastMonitoringThread::MonitorData::macrostateBins_, edm::service::SystemBounds::maxNumberOfStreams(), edm::service::SystemBounds::maxNumberOfThreads(), evf::FastMonitoringThread::MCOUNT, evf::MicroStateService::mCOUNT, microstate_, evf::FastMonitoringThread::MonitorData::microstateBins_, microstateDefPath_, ministate_, evf::FastMonitoringThread::MonitorData::ministateBins_, evf::MicroStateService::mInvalid, moduleLegendFile_, monInit_, nopath_, nStreams_, nThreads_, cppFunctionSkipper::operator, cmsHarvester::path, pathLegendFile_, evf::FastMonitoringThread::MonitorData::registerVariables(), evf::MicroStateService::reservedMicroStateNames, evf::FastMonitoringThread::resetFastMonitor(), runDirectory_, evf::FastMonitoringThread::sInit, evf::FastMonitoringThread::start(), streamCounterUpdating_, threadIDAvailable_, evf::FastMonitoringService::Encoding::updateReserved(), and workingDirectory_.

Referenced by FastMonitoringService().

104  {
105 
106  // FIND RUN DIRECTORY
107  // The run dir should be set via the configuration of EvFDaqDirector
108 
109  if (edm::Service<evf::EvFDaqDirector>().operator->()==nullptr)
110  {
111  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
112 
113  }
114  boost::filesystem::path runDirectory(edm::Service<evf::EvFDaqDirector>()->baseRunDir());
115  workingDirectory_ = runDirectory_ = runDirectory;
116  workingDirectory_ /= "mon";
117 
118  if ( !boost::filesystem::is_directory(workingDirectory_)) {
119  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string() ;
120  boost::filesystem::create_directories(workingDirectory_);
121  if ( !boost::filesystem::is_directory(workingDirectory_))
122  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
123  << ". No monitoring data will be written.";
124  }
125 
126  std::ostringstream fastFileName;
127 
128  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
130  fast /= fastFileName.str();
131  fastPath_ = fast.string();
132 
133  std::ostringstream moduleLegFile;
134  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
135  moduleLegendFile_ = (workingDirectory_/moduleLegFile.str()).string();
136 
137  std::ostringstream pathLegFile;
138  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
139  pathLegendFile_ = (workingDirectory_/pathLegFile.str()).string();
140 
141  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: "
143  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
144 
145  nStreams_=bounds.maxNumberOfStreams();
146  nThreads_=bounds.maxNumberOfThreads();
147 
148  //this should already be >=1
149  if (nStreams_==0) nStreams_=1;
150  if (nThreads_==0) nThreads_=1;
151 
152  /*
153  * initialize the fast monitor with:
154  * vector of pointers to monitorable parameters
155  * path to definition
156  *
157  */
158 
160 
161  for(unsigned int i = 0; i < (mCOUNT); i++)
164 
165  for (unsigned int i=0;i<nStreams_;i++) {
166  ministate_.push_back(&nopath_);
168 
169  //for synchronization
170  streamCounterUpdating_.push_back(new std::atomic<bool>(0));
171 
172  //path (mini) state
173  encPath_.emplace_back(0);
174  encPath_[i].update((void*)&nopath_);
175  eventCountForPathInit_.push_back(0);
176  firstEventId_.push_back(0);
177  collectedPathList_.push_back(new std::atomic<bool>(0));
178 
179  }
180  //for (unsigned int i=0;i<nThreads_;i++)
181  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
182 
183  //initial size until we detect number of bins
187 
188  lastGlobalLumi_=0;
190  lumiFromSource_=0;
191 
192  //startup monitoring
194  fmt_.jsonMonitor_->setNStreams(nStreams_);
196  monInit_.store(false,std::memory_order_release);
198 
199  //this definition needs: #include "tbb/compat/thread"
200  //however this would results in TBB imeplementation replacing std::thread
201  //(both supposedly call pthread_self())
202  //number of threads created in process could be obtained from /proc,
203  //assuming that all posix threads are true kernel threads capable of running in parallel
204 
205  //#if TBB_IMPLEMENT_CPP0X
207  //threadIDAvailable_=true;
208  //#endif
209 
210  }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
boost::filesystem::path runDirectory_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
std::vector< std::atomic< bool > * > collectedPathList_
tuple path
else: Piece not in the list, fine.
static const std::string nopath_
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
std::vector< unsigned long > firstEventId_
std::vector< const void * > microstate_
void registerVariables(FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
FastMonitoringThread::Macrostate macrostate_
boost::filesystem::path workingDirectory_
std::unique_ptr< FastMonitor > jsonMonitor_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
void evf::FastMonitoringService::preEvent ( edm::StreamContext const &  sc)

Definition at line 477 of file FastMonitoringService.cc.

Referenced by FastMonitoringService().

478  {
479  }
void evf::FastMonitoringService::preGlobalBeginLumi ( edm::GlobalContext const &  gc)

Definition at line 297 of file FastMonitoringService.cc.

References accuSize_, avgLeadTime_, filesProcessedDuringLumi_, fmt_, isGlobalLumiTransition_, lastGlobalLumi_, lastGlobalLumisClosed_, CommonMethods::lock(), lockStatsDuringLumi_, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, evf::FastMonitoringThread::monlock_, and processedEventsPerLumi_.

Referenced by FastMonitoringService().

298  {
299 
300  timeval lumiStartTime;
301  gettimeofday(&lumiStartTime, 0);
302  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
303 
304  std::lock_guard<std::mutex> lock(fmt_.monlock_);
305 
306  lumiStartTime_[newLumi]=lumiStartTime;
307  while (!lastGlobalLumisClosed_.empty()) {
308  //wipe out old map entries as they aren't needed and slow down access
309  unsigned int oldLumi = lastGlobalLumisClosed_.back();
311  lumiStartTime_.erase(oldLumi);
312  avgLeadTime_.erase(oldLumi);
313  filesProcessedDuringLumi_.erase(oldLumi);
314  accuSize_.erase(oldLumi);
315  lockStatsDuringLumi_.erase(oldLumi);
316  processedEventsPerLumi_.erase(oldLumi);
317  }
318  lastGlobalLumi_= newLumi;
320  }
std::map< unsigned int, timeval > lumiStartTime_
std::map< unsigned int, unsigned long > accuSize_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::queue< unsigned int > lastGlobalLumisClosed_
std::map< unsigned int, double > avgLeadTime_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::map< unsigned int, unsigned int > processedEventsPerLumi_
void evf::FastMonitoringService::preGlobalEarlyTermination ( edm::GlobalContext const &  gc,
edm::TerminationOrigin  to 
)

Definition at line 225 of file FastMonitoringService.cc.

References edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, CommonMethods::lock(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), evf::FastMonitoringThread::monlock_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FastMonitoringService().

226  {
227  std::string context;
228  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
229  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
230  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
231  edm::LogInfo("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
232  << gc.luminosityBlockID().luminosityBlock() << " " << context;
233  std::lock_guard<std::mutex> lock(fmt_.monlock_);
234  exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
235  //exception_detected_=true;
236  }
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 322 of file FastMonitoringService.cc.

References accuSize_, doSnapshot(), edm::hlt::Exception, exception_detected_, exceptionInLS_, evf::FastMonitoringThread::MonitorData::fastThroughputJ_, fmt_, isGlobalLumiTransition_, evf::FastMonitoringThread::jsonMonitor_, CommonMethods::lock(), LogDebug, fjr2json::lumi, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, evf::FastMonitoringThread::m_data, evf::FastMonitoringThread::monlock_, cmsHarvester::path, processedEventsPerLumi_, edm::shutdown_flag, slowName_, sourceEventsReport_, throughputFactor(), jsoncollector::IntJ::value(), jsoncollector::DoubleJ::value(), and workingDirectory_.

Referenced by FastMonitoringService().

323  {
324  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
325  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: "
326  << lumi;
327  timeval lumiStopTime;
328  gettimeofday(&lumiStopTime, 0);
329 
330  std::lock_guard<std::mutex> lock(fmt_.monlock_);
331 
332  // Compute throughput
333  timeval stt = lumiStartTime_[lumi];
334  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec)*1000000
335  + (lumiStopTime.tv_usec - stt.tv_usec);
336  unsigned long accuSize = accuSize_.find(lumi)==accuSize_.end() ? 0 : accuSize_[lumi];
337  double throughput = throughputFactor()* double(accuSize) / double(usecondsForLumi);
338  //store to registered variable
339  fmt_.m_data.fastThroughputJ_.value() = throughput;
340 
341  //update
342  doSnapshot(lumi,true);
343 
344  // create file name for slow monitoring file
345  std::stringstream slowFileName;
346  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
347  << lumi << "_pid" << std::setfill('0')
348  << std::setw(5) << getpid() << ".jsn";
349  boost::filesystem::path slow = workingDirectory_;
350  slow /= slowFileName.str();
351 
352  //retrieve one result we need (todo: sanity check if it's found)
353  IntJ *lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_.jsonMonitor_->getMergedIntJForLumi("Processed",lumi));
354  if (!lumiProcessedJptr)
355  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
356  processedEventsPerLumi_[lumi] = lumiProcessedJptr->value();
357 
358  {
359  auto itr = sourceEventsReport_.find(lumi);
360  if (itr==sourceEventsReport_.end()) {
361  //check if exception has been thrown (in case of Global/Stream early termination, for this LS)
362  bool exception_detected = exception_detected_;
363  for (auto ex : exceptionInLS_)
364  if (lumi == ex) exception_detected=true;
365 
366  if (edm::shutdown_flag || exception_detected) {
367  edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
368  << processedEventsPerLumi_[lumi] << " events were processed in LUMI " << lumi;
369  //this will prevent output modules from producing json file for possibly incomplete lumi
371  return;
372  }
373  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
374  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
375  }
376  else {
377  if (itr->second!=processedEventsPerLumi_[lumi]) {
378  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
379  << lumi
380  << ", events(processed):" << processedEventsPerLumi_[lumi]
381  << " events(source):" << itr->second;
382  }
383  sourceEventsReport_.erase(itr);
384  }
385  }
386  edm::LogInfo("FastMonitoringService") << "Statistics for lumisection -: lumi = " << lumi << " events = "
387  << lumiProcessedJptr->value() << " time = " << usecondsForLumi/1000000
388  << " size = " << accuSize << " thr = " << throughput;
389  delete lumiProcessedJptr;
390 
391  //full global and stream merge&output for this lumi
392  fmt_.jsonMonitor_->outputFullJSON(slow.string(),lumi);//full global and stream merge and JSON write for this lumi
393  fmt_.jsonMonitor_->discardCollected(lumi);//we don't do further updates for this lumi
394 
396  }
#define LogDebug(id)
std::map< unsigned int, timeval > lumiStartTime_
double throughputFactor()
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
volatile std::atomic< bool > shutdown_flag
tuple path
else: Piece not in the list, fine.
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
boost::filesystem::path workingDirectory_
std::map< unsigned int, unsigned int > processedEventsPerLumi_
std::map< unsigned int, unsigned int > sourceEventsReport_
std::unique_ptr< FastMonitor > jsonMonitor_
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preModuleBeginJob ( edm::ModuleDescription const &  desc)

Definition at line 260 of file FastMonitoringService.cc.

References encModule_, fmt_, CommonMethods::lock(), edm::ModuleDescription::moduleName(), evf::FastMonitoringThread::monlock_, evf::FastMonitoringService::Encoding::update(), and evf::FastMonitoringService::Encoding::updateReserved().

Referenced by FastMonitoringService().

261  {
262  std::lock_guard<std::mutex> lock(fmt_.monlock_);
263  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
264 
265  //build a map of modules keyed by their module description address
266  //here we need to treat output modules in a special way so they can be easily singled out
267  if(desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" ||
268  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule")
269  encModule_.updateReserved((void*)&desc);
270  else
271  encModule_.update((void*)&desc);
272  }
void evf::FastMonitoringService::preModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 518 of file FastMonitoringService.cc.

References microstate_, edm::ModuleCallingContext::moduleDescription(), edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

519  {
520  microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
521  }
std::vector< const void * > microstate_
void evf::FastMonitoringService::prePathEvent ( edm::StreamContext const &  sc,
edm::PathContext const &  pc 
)

Definition at line 444 of file FastMonitoringService.cc.

References collectedPathList_, encPath_, edm::EventID::event(), eventCountForPathInit_, edm::StreamContext::eventID(), firstEventId_, fmt_, CommonMethods::lock(), evf::FastMonitoringThread::m_data, makePathLegenda(), ministate_, evf::FastMonitoringThread::MonitorData::ministateBins_, evf::FastMonitoringThread::monlock_, pathLegendFile_, pathLegendWritten_, edm::PathContext::pathName(), edm::StreamContext::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, and unlikely.

Referenced by FastMonitoringService().

445  {
446  //make sure that all path names are retrieved before allowing ministate to change
447  //hack: assume memory is synchronized after ~50 events seen by each stream
448  if (unlikely(eventCountForPathInit_[sc.streamID()]<50) && false==collectedPathList_[sc.streamID()]->load(std::memory_order_acquire))
449  {
450  //protection between stream threads, as well as the service monitoring thread
451  std::lock_guard<std::mutex> lock(fmt_.monlock_);
452 
453  if (firstEventId_[sc.streamID()]==0)
454  firstEventId_[sc.streamID()]=sc.eventID().event();
455  if (sc.eventID().event()==firstEventId_[sc.streamID()])
456  {
457  encPath_[sc.streamID()].update((void*)&pc.pathName());
458  return;
459  }
460  else {
461  //finished collecting path names
462  collectedPathList_[sc.streamID()]->store(true,std::memory_order_seq_cst);
463  fmt_.m_data.ministateBins_=encPath_[sc.streamID()].vecsize();
464  if (!pathLegendWritten_) {
465  std::string pathLegendStr = makePathLegenda();
466  FileIO::writeStringToFile(pathLegendFile_, pathLegendStr);
467  pathLegendWritten_=true;
468  }
469  }
470  }
471  else {
472  ministate_[sc.streamID()] = &(pc.pathName());
473  }
474  }
std::vector< unsigned int > eventCountForPathInit_
#define unlikely(x)
std::vector< std::atomic< bool > * > collectedPathList_
std::vector< unsigned long > firstEventId_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
void evf::FastMonitoringService::preSourceEarlyTermination ( edm::TerminationOrigin  to)
void evf::FastMonitoringService::preSourceEvent ( edm::StreamID  sid)

Definition at line 508 of file FastMonitoringService.cc.

References microstate_, evf::MicroStateService::mInput, evf::MicroStateService::reservedMicroStateNames, and edm::StreamID::value().

Referenced by FastMonitoringService().

509  {
510  microstate_[sid.value()] = &reservedMicroStateNames[mInput];
511  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
unsigned int value() const
Definition: StreamID.h:46
void evf::FastMonitoringService::preStreamBeginLumi ( edm::StreamContext const &  sc)

Definition at line 404 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), fmt_, CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::FastMonitoringThread::m_data, evf::MicroStateService::mFwkOvh, microstate_, ministate_, evf::FastMonitoringThread::monlock_, nopath_, evf::FastMonitoringThread::MonitorData::processed_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), evf::FastMonitoringThread::MonitorData::streamLumi_, and edm::StreamID::value().

Referenced by FastMonitoringService().

405  {
406  unsigned int sid = sc.streamID().value();
407  std::lock_guard<std::mutex> lock(fmt_.monlock_);
408  fmt_.m_data.streamLumi_[sid] = sc.eventID().luminosityBlock();
409 
410  //reset collected values for this stream
411  *(fmt_.m_data.processed_[sid])=0;
412 
413  ministate_[sid]=&nopath_;
414  microstate_[sid]=&reservedMicroStateNames[mFwkOvh];
415  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< AtomicMonUInt * > processed_
static const std::string nopath_
std::vector< const void * > microstate_
std::vector< const void * > ministate_
std::vector< unsigned int > streamLumi_
void evf::FastMonitoringService::preStreamEarlyTermination ( edm::StreamContext const &  sc,
edm::TerminationOrigin  to 
)

Definition at line 212 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::FastMonitoringThread::monlock_, edm::StreamContext::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::StreamID::value().

Referenced by FastMonitoringService().

213  {
214  std::string context;
215  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
216  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
217  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
218  edm::LogInfo("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
219  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
220  std::lock_guard<std::mutex> lock(fmt_.monlock_);
221  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
222  //exception_detected_=true;
223  }
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preStreamEndLumi ( edm::StreamContext const &  sc)

Definition at line 422 of file FastMonitoringService.cc.

References doStreamEOLSnapshot(), edm::StreamContext::eventID(), fmt_, svgfig::load(), CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::MicroStateService::mEoL, microstate_, ministate_, evf::FastMonitoringThread::monlock_, nopath_, evf::MicroStateService::reservedMicroStateNames, streamCounterUpdating_, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

423  {
424  unsigned int sid = sc.streamID().value();
425  std::lock_guard<std::mutex> lock(fmt_.monlock_);
426 
427  #if ATOMIC_LEVEL>=2
428  //spinlock to make sure we are not still updating event counter somewhere
429  while (streamCounterUpdating_[sid]->load(std::memory_order_acquire)) {}
430  #endif
431 
432  //update processed count to be complete at this time
433  doStreamEOLSnapshot(sc.eventID().luminosityBlock(),sid);
434  //reset this in case stream does not get notified of next lumi (we keep processed events only)
435  ministate_[sid]=&nopath_;
436  microstate_[sid]=&reservedMicroStateNames[mEoL];
437  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< std::atomic< bool > * > streamCounterUpdating_
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
static const std::string nopath_
def load
Definition: svgfig.py:546
std::vector< const void * > microstate_
std::vector< const void * > ministate_
void evf::FastMonitoringService::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)

Definition at line 661 of file FastMonitoringService.cc.

References fmt_, CommonMethods::lock(), evf::FastMonitoringThread::monlock_, and sourceEventsReport_.

Referenced by FedRawDataInputSource::checkNextEvent(), and FedRawDataInputSource::getNextEvent().

662  {
663 
664  std::lock_guard<std::mutex> lock(fmt_.monlock_);
665  auto itr = sourceEventsReport_.find(lumi);
666  if (itr!=sourceEventsReport_.end())
667  itr->second+=events;
668  else
669  sourceEventsReport_[lumi]=events;
670 
671  }
tuple lumi
Definition: fjr2json.py:35
tuple events
Definition: patZpeak.py:19
std::map< unsigned int, unsigned int > sourceEventsReport_
void evf::FastMonitoringService::reportLockWait ( unsigned int  ls,
double  waitTime,
unsigned int  lockCount 
)

Definition at line 592 of file FastMonitoringService.cc.

References fmt_, CommonMethods::lock(), lockStatsDuringLumi_, eostools::ls(), evf::FastMonitoringThread::monlock_, and edmStreamStallGrapher::waitTime.

Referenced by FedRawDataInputSource::readSupervisor().

593  {
594  std::lock_guard<std::mutex> lock(fmt_.monlock_);
595  lockStatsDuringLumi_[ls]=std::pair<double,unsigned int>(waitTime,lockCount);
596 
597  }
def ls
Definition: eostools.py:346
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
void evf::FastMonitoringService::setExceptionDetected ( unsigned int  ls)

Definition at line 249 of file FastMonitoringService.cc.

References exception_detected_, and exceptionInLS_.

Referenced by FedRawDataInputSource::getNextEvent().

249  {
250  if (!ls) exception_detected_=true;
251  else exceptionInLS_.push_back(ls);
252  }
def ls
Definition: eostools.py:346
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::setMicroState ( MicroStateService::Microstate  m)
virtual

Implements evf::MicroStateService.

Definition at line 533 of file FastMonitoringService.cc.

References i, microstate_, nStreams_, and evf::MicroStateService::reservedMicroStateNames.

534  {
535  for (unsigned int i=0;i<nStreams_;i++)
536  microstate_[i] = &reservedMicroStateNames[m];
537  }
int i
Definition: DBlmapReader.cc:9
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
void evf::FastMonitoringService::setMicroState ( edm::StreamID  sid,
MicroStateService::Microstate  m 
)
virtual

Implements evf::MicroStateService.

Definition at line 540 of file FastMonitoringService.cc.

References contentValuesFiles::m, microstate_, and evf::MicroStateService::reservedMicroStateNames.

541  {
542  microstate_[sid] = &reservedMicroStateNames[m];
543  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
void evf::FastMonitoringService::startedLookingForFile ( )

Definition at line 558 of file FastMonitoringService.cc.

References fileLookStart_.

Referenced by FedRawDataInputSource::readSupervisor().

558  {
559  gettimeofday(&fileLookStart_, 0);
560  /*
561  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
562  << fileLookStart_.tv_usec / 1000.0 << std::endl;
563  */
564  }
void evf::FastMonitoringService::stoppedLookingForFile ( unsigned int  lumi)

Definition at line 566 of file FastMonitoringService.cc.

References avgLeadTime_, fileLookStart_, fileLookStop_, fmt_, i, leadTimes_, CommonMethods::lock(), fjr2json::lumi, lumiFromSource_, and evf::FastMonitoringThread::monlock_.

Referenced by FedRawDataInputSource::readSupervisor().

566  {
567  gettimeofday(&fileLookStop_, 0);
568  /*
569  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
570  << fileLookStop_.tv_usec / 1000.0 << std::endl;
571  */
572  std::lock_guard<std::mutex> lock(fmt_.monlock_);
573 
574  if (lumi>lumiFromSource_) {
575  lumiFromSource_=lumi;
576  leadTimes_.clear();
577  }
578  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
579  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
580  // add this to lead times for this lumi
581  leadTimes_.push_back((double)elapsedTime);
582 
583  // recompute average lead time for this lumi
584  if (leadTimes_.size() == 1) avgLeadTime_[lumi] = leadTimes_[0];
585  else {
586  double totTime = 0;
587  for (unsigned int i = 0; i < leadTimes_.size(); i++) totTime += leadTimes_[i];
588  avgLeadTime_[lumi] = 0.001*(totTime / leadTimes_.size());
589  }
590  }
int i
Definition: DBlmapReader.cc:9
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, double > avgLeadTime_
std::vector< double > leadTimes_

Member Data Documentation

std::map<unsigned int, unsigned long> evf::FastMonitoringService::accuSize_
private
std::map<unsigned int, double> evf::FastMonitoringService::avgLeadTime_
private

Definition at line 226 of file FastMonitoringService.h.

Referenced by doSnapshot(), preGlobalBeginLumi(), and stoppedLookingForFile().

std::vector<std::atomic<bool>*> evf::FastMonitoringService::collectedPathList_
private

Definition at line 241 of file FastMonitoringService.h.

Referenced by preallocate(), and prePathEvent().

Encoding evf::FastMonitoringService::encModule_
private
std::vector<Encoding> evf::FastMonitoringService::encPath_
private
std::vector<unsigned int> evf::FastMonitoringService::eventCountForPathInit_
private

Definition at line 242 of file FastMonitoringService.h.

Referenced by postEvent(), preallocate(), and prePathEvent().

bool evf::FastMonitoringService::exception_detected_ = false
private
std::vector<unsigned int> evf::FastMonitoringService::exceptionInLS_
private
std::string evf::FastMonitoringService::fastMicrostateDefPath_
private

Definition at line 203 of file FastMonitoringService.h.

Referenced by preallocate().

unsigned int evf::FastMonitoringService::fastMonIntervals_
private

Definition at line 201 of file FastMonitoringService.h.

Referenced by dowork().

std::string evf::FastMonitoringService::fastName_
private

Definition at line 204 of file FastMonitoringService.h.

Referenced by preallocate().

std::string evf::FastMonitoringService::fastPath_
private

Definition at line 204 of file FastMonitoringService.h.

Referenced by dowork(), and preallocate().

timeval evf::FastMonitoringService::fileLookStart_
private

Definition at line 209 of file FastMonitoringService.h.

Referenced by startedLookingForFile(), and stoppedLookingForFile().

timeval evf::FastMonitoringService::fileLookStop_
private

Definition at line 209 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

std::map<unsigned int, unsigned int> evf::FastMonitoringService::filesProcessedDuringLumi_
private

Definition at line 227 of file FastMonitoringService.h.

Referenced by accumulateFileSize(), doSnapshot(), and preGlobalBeginLumi().

std::vector<unsigned long> evf::FastMonitoringService::firstEventId_
private

Definition at line 240 of file FastMonitoringService.h.

Referenced by preallocate(), and prePathEvent().

FastMonitoringThread evf::FastMonitoringService::fmt_
private
bool evf::FastMonitoringService::isGlobalLumiTransition_
private
unsigned int evf::FastMonitoringService::lastGlobalLumi_
private

Definition at line 211 of file FastMonitoringService.h.

Referenced by dowork(), preallocate(), and preGlobalBeginLumi().

std::queue<unsigned int> evf::FastMonitoringService::lastGlobalLumisClosed_
private

Definition at line 212 of file FastMonitoringService.h.

Referenced by postGlobalEndLumi(), and preGlobalBeginLumi().

std::vector<double> evf::FastMonitoringService::leadTimes_
private

Definition at line 230 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

std::map<unsigned int, std::pair<double,unsigned int> > evf::FastMonitoringService::lockStatsDuringLumi_
private

Definition at line 231 of file FastMonitoringService.h.

Referenced by doSnapshot(), preGlobalBeginLumi(), and reportLockWait().

unsigned int evf::FastMonitoringService::lumiFromSource_
private

Definition at line 214 of file FastMonitoringService.h.

Referenced by preallocate(), and stoppedLookingForFile().

std::map<unsigned int, timeval> evf::FastMonitoringService::lumiStartTime_
private

Definition at line 208 of file FastMonitoringService.h.

Referenced by preGlobalBeginLumi(), and preGlobalEndLumi().

FastMonitoringThread::Macrostate evf::FastMonitoringService::macrostate_
private
const std::string evf::FastMonitoringService::macroStateNames
static
Initial value:
=
{"Init","JobReady","RunGiven","Running",
"Stopping","Done","JobEnded","Error","ErrorEnded","End",
"Invalid"}

Definition at line 108 of file FastMonitoringService.h.

std::vector<const void*> evf::FastMonitoringService::microstate_
private
std::string evf::FastMonitoringService::microstateDefPath_
private

Definition at line 203 of file FastMonitoringService.h.

Referenced by preallocate().

std::vector<const void*> evf::FastMonitoringService::ministate_
private
std::string evf::FastMonitoringService::moduleLegendFile_
private

Definition at line 253 of file FastMonitoringService.h.

Referenced by postBeginJob(), and preallocate().

std::atomic<bool> evf::FastMonitoringService::monInit_
private

Definition at line 257 of file FastMonitoringService.h.

Referenced by dowork(), and preallocate().

const std::string evf::FastMonitoringService::nopath_ = "NoPath"
static
unsigned int evf::FastMonitoringService::nStreams_
private

Definition at line 198 of file FastMonitoringService.h.

Referenced by doSnapshot(), preallocate(), and setMicroState().

unsigned int evf::FastMonitoringService::nThreads_
private

Definition at line 199 of file FastMonitoringService.h.

Referenced by preallocate().

std::string evf::FastMonitoringService::pathLegendFile_
private

Definition at line 254 of file FastMonitoringService.h.

Referenced by preallocate(), and prePathEvent().

bool evf::FastMonitoringService::pathLegendWritten_ = false
private

Definition at line 255 of file FastMonitoringService.h.

Referenced by prePathEvent().

std::vector<bool> evf::FastMonitoringService::pathNamesReady_
private

Definition at line 243 of file FastMonitoringService.h.

std::map<unsigned int, unsigned int> evf::FastMonitoringService::processedEventsPerLumi_
private
boost::filesystem::path evf::FastMonitoringService::runDirectory_
private

Definition at line 245 of file FastMonitoringService.h.

Referenced by getRunDirName(), and preallocate().

int evf::FastMonitoringService::sleepTime_
private

Definition at line 200 of file FastMonitoringService.h.

Referenced by dowork().

std::string evf::FastMonitoringService::slowName_
private

Definition at line 204 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi().

unsigned int evf::FastMonitoringService::snapCounter_ = 0
private

Definition at line 202 of file FastMonitoringService.h.

Referenced by dowork().

std::map<unsigned int,unsigned int> evf::FastMonitoringService::sourceEventsReport_
private

Definition at line 247 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi(), and reportEventsThisLumiInSource().

std::vector<std::atomic<bool>*> evf::FastMonitoringService::streamCounterUpdating_
private

Definition at line 238 of file FastMonitoringService.h.

Referenced by postEvent(), preallocate(), and preStreamEndLumi().

bool evf::FastMonitoringService::threadIDAvailable_ = false
private

Definition at line 249 of file FastMonitoringService.h.

Referenced by preallocate().

std::vector<const void*> evf::FastMonitoringService::threadMicrostate_
private

Definition at line 222 of file FastMonitoringService.h.

std::atomic<unsigned long> evf::FastMonitoringService::totalEventsProcessed_
private

Definition at line 251 of file FastMonitoringService.h.

Referenced by postEvent().

boost::filesystem::path evf::FastMonitoringService::workingDirectory_
private

Definition at line 245 of file FastMonitoringService.h.

Referenced by preallocate(), and preGlobalEndLumi().