CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
evf::FastMonitoringService Class Reference

#include <FastMonitoringService.h>

Inheritance diagram for evf::FastMonitoringService:
evf::MicroStateService

Classes

struct  Encoding
 

Public Member Functions

void accumulateFileSize (unsigned int lumi, unsigned long fileSize)
 
 FastMonitoringService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
bool getAbortFlagForLumi (unsigned int lumi)
 
unsigned int getEventsProcessedForLumi (unsigned int lumi, bool *abortFlag=0)
 
std::string getRunDirName () const
 
void jobFailure ()
 
std::string makeModuleLegenda ()
 
std::string makePathLegenda ()
 
void postBeginJob ()
 
void postEndJob ()
 
void postEvent (edm::StreamContext const &)
 
void postGlobalBeginRun (edm::GlobalContext const &)
 
void postGlobalEndLumi (edm::GlobalContext const &)
 
void postModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void postSourceEvent (edm::StreamID)
 
void postStreamBeginLumi (edm::StreamContext const &)
 
void postStreamEndLumi (edm::StreamContext const &)
 
void preallocate (edm::service::SystemBounds const &)
 
void preEvent (edm::StreamContext const &)
 
void preGlobalBeginLumi (edm::GlobalContext const &)
 
void preGlobalEarlyTermination (edm::GlobalContext const &, edm::TerminationOrigin)
 
void preGlobalEndLumi (edm::GlobalContext const &)
 
void preModuleBeginJob (edm::ModuleDescription const &)
 
void preModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void prePathEvent (edm::StreamContext const &, edm::PathContext const &)
 
void preSourceEarlyTermination (edm::TerminationOrigin)
 
void preSourceEvent (edm::StreamID)
 
void preStreamBeginLumi (edm::StreamContext const &)
 
void preStreamEarlyTermination (edm::StreamContext const &, edm::TerminationOrigin)
 
void preStreamEndLumi (edm::StreamContext const &)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void reportLockWait (unsigned int ls, double waitTime, unsigned int lockCount)
 
void setExceptionDetected (unsigned int ls)
 
void setMicroState (MicroStateService::Microstate)
 
void setMicroState (edm::StreamID, MicroStateService::Microstate)
 
bool shouldWriteFiles (unsigned int lumi, unsigned int *proc=0)
 
void startedLookingForFile ()
 
void stoppedLookingForFile (unsigned int lumi)
 
 ~FastMonitoringService ()
 
- Public Member Functions inherited from evf::MicroStateService
virtual std::string getMicroState1 ()
 
virtual std::string const & getMicroState2 ()
 
 MicroStateService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
virtual ~MicroStateService ()
 

Static Public Attributes

static const std::string macroStateNames [FastMonitoringThread::MCOUNT]
 
static const std::string nopath_ = "NoPath"
 
- Static Public Attributes inherited from evf::MicroStateService
static const edm::ModuleDescription reservedMicroStateNames [mCOUNT]
 

Private Member Functions

void doSnapshot (const unsigned int ls, const bool isGlobalEOL)
 
void doStreamEOLSnapshot (const unsigned int ls, const unsigned int streamID)
 
void dowork ()
 

Private Attributes

std::map< unsigned int, unsigned long > accuSize_
 
std::map< unsigned int, double > avgLeadTime_
 
std::vector< std::atomic< bool > * > collectedPathList_
 
bool emptyLumisectionMode_ = false
 
Encoding encModule_
 
std::vector< Encoding > encPath_
 
std::vector< unsigned int > eventCountForPathInit_
 
bool exception_detected_ = false
 
std::vector< unsigned int > exceptionInLS_
 
std::string fastMicrostateDefPath_
 
unsigned int fastMonIntervals_
 
std::string fastName_
 
std::string fastPath_
 
timeval fileLookStart_
 
timeval fileLookStop_
 
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
 
std::vector< unsigned long > firstEventId_
 
FastMonitoringThread fmt_
 
bool isGlobalLumiTransition_
 
unsigned int lastGlobalLumi_
 
std::queue< unsigned int > lastGlobalLumisClosed_
 
std::vector< double > leadTimes_
 
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
 
unsigned int lumiFromSource_
 
std::map< unsigned int, timeval > lumiStartTime_
 
FastMonitoringThread::Macrostate macrostate_
 
std::vector< const void * > microstate_
 
std::string microstateDefPath_
 
std::vector< const void * > ministate_
 
std::string moduleLegendFile_
 
std::atomic< bool > monInit_
 
unsigned int nStreams_
 
unsigned int nThreads_
 
std::string pathLegendFile_
 
bool pathLegendWritten_ = false
 
std::vector< bool > pathNamesReady_
 
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
 
boost::filesystem::path runDirectory_
 
int sleepTime_
 
std::string slowName_
 
unsigned int snapCounter_ = 0
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
std::vector< std::atomic< bool > * > streamCounterUpdating_
 
bool threadIDAvailable_ = false
 
std::vector< const void * > threadMicrostate_
 
std::atomic< unsigned long > totalEventsProcessed_
 
boost::filesystem::path workingDirectory_
 

Additional Inherited Members

- Public Types inherited from evf::MicroStateService
enum  Microstate {
  mInvalid = 0, mFwkOvh, mIdle, mInput,
  mInputDone, mDqm, mEoL, mCOUNT
}
 
- Static Protected Attributes inherited from evf::MicroStateService
static const std::string default_return_ ="NotImplemented"
 

Detailed Description

Definition at line 51 of file FastMonitoringService.h.

Constructor & Destructor Documentation

evf::FastMonitoringService::FastMonitoringService ( const edm::ParameterSet &  iPS,
edm::ActivityRegistry &  reg 
)

Definition at line 32 of file FastMonitoringService.cc.

References jobFailure(), postBeginJob(), postEndJob(), postEvent(), postGlobalEndLumi(), postModuleEvent(), postSourceEvent(), postStreamBeginLumi(), postStreamEndLumi(), preallocate(), preEvent(), preGlobalBeginLumi(), preGlobalEarlyTermination(), preGlobalEndLumi(), preModuleBeginJob(), preModuleEvent(), prePathEvent(), preSourceEarlyTermination(), preSourceEvent(), preStreamBeginLumi(), preStreamEarlyTermination(), preStreamEndLumi(), edm::ActivityRegistry::watchJobFailure(), edm::ActivityRegistry::watchPostBeginJob(), edm::ActivityRegistry::watchPostEndJob(), edm::ActivityRegistry::watchPostEvent(), edm::ActivityRegistry::watchPostGlobalEndLumi(), edm::ActivityRegistry::watchPostModuleEvent(), edm::ActivityRegistry::watchPostSourceEvent(), edm::ActivityRegistry::watchPostStreamBeginLumi(), edm::ActivityRegistry::watchPostStreamEndLumi(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreEvent(), edm::ActivityRegistry::watchPreGlobalBeginLumi(), edm::ActivityRegistry::watchPreGlobalEarlyTermination(), edm::ActivityRegistry::watchPreGlobalEndLumi(), edm::ActivityRegistry::watchPreModuleBeginJob(), edm::ActivityRegistry::watchPreModuleEvent(), edm::ActivityRegistry::watchPrePathEvent(), edm::ActivityRegistry::watchPreSourceEarlyTermination(), edm::ActivityRegistry::watchPreSourceEvent(), edm::ActivityRegistry::watchPreStreamBeginLumi(), edm::ActivityRegistry::watchPreStreamEarlyTermination(), and edm::ActivityRegistry::watchPreStreamEndLumi().

33  :
34  MicroStateService(iPS,reg)
35  ,encModule_(33)
36  ,nStreams_(0)//until initialized
37  ,sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1))
38  ,fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 1))
39  ,microstateDefPath_(iPS.getUntrackedParameter<std::string> ("microstateDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/microstatedef.jsd"))
41  ,fastName_(iPS.getUntrackedParameter<std::string>("fastName", "fastmoni"))
42  ,slowName_(iPS.getUntrackedParameter<std::string>("slowName", "slowmoni"))
44  {
45  reg.watchPreallocate(this, &FastMonitoringService::preallocate);//receiving information on number of threads
47 
51 
55 
60 
62 
65 
66  reg.watchPreSourceEvent(this,&FastMonitoringService::preSourceEvent);//source (with streamID of requestor)
68 
71 
75  }
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
T getUntrackedParameter(std::string const &, T const &) const
void watchPreEvent(PreEvent::slot_type const &iSlot)
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
void preGlobalBeginLumi(edm::GlobalContext const &)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
void watchPostEvent(PostEvent::slot_type const &iSlot)
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
void preGlobalEndLumi(edm::GlobalContext const &)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void preModuleBeginJob(edm::ModuleDescription const &)
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
void preStreamEndLumi(edm::StreamContext const &)
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postStreamBeginLumi(edm::StreamContext const &)
void postStreamEndLumi(edm::StreamContext const &)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
evf::FastMonitoringService::~FastMonitoringService ( )

Definition at line 78 of file FastMonitoringService.cc.

79  {
80  }

Member Function Documentation

void evf::FastMonitoringService::accumulateFileSize ( unsigned int  lumi,
unsigned long  fileSize 
)

Definition at line 548 of file FastMonitoringService.cc.

References accuSize_, filesProcessedDuringLumi_, fmt_, CommonMethods::lock(), fjr2json::lumi, and evf::FastMonitoringThread::monlock_.

Referenced by evf::EvFDaqDirector::bumpFile().

548  {
549  std::lock_guard<std::mutex> lock(fmt_.monlock_);
550 
551  if (accuSize_.find(lumi)==accuSize_.end()) accuSize_[lumi] = fileSize;
552  else accuSize_[lumi] += fileSize;
553 
556  else
558  }
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void evf::FastMonitoringService::doSnapshot ( const unsigned int  ls,
const bool  isGlobalEOL 
)
private

Definition at line 632 of file FastMonitoringService.cc.

References avgLeadTime_, encModule_, evf::FastMonitoringService::Encoding::encode(), encPath_, evf::FastMonitoringThread::MonitorData::fastAvgLeadTimeJ_, evf::FastMonitoringThread::MonitorData::fastFilesProcessedJ_, evf::FastMonitoringThread::MonitorData::fastLockCountJ_, evf::FastMonitoringThread::MonitorData::fastLockWaitJ_, evf::FastMonitoringThread::MonitorData::fastMacrostateJ_, filesProcessedDuringLumi_, fmt_, i, isGlobalLumiTransition_, evf::FastMonitoringThread::jsonMonitor_, lockStatsDuringLumi_, evf::FastMonitoringThread::m_data, macrostate_, microstate_, evf::FastMonitoringThread::MonitorData::microstateEncoded_, ministate_, evf::FastMonitoringThread::MonitorData::ministateEncoded_, and nStreams_.

Referenced by dowork(), and preGlobalEndLumi().

632  {
633  // update macrostate
635 
636  //update these unless in the midst of a global transition
638 
639  auto itd = avgLeadTime_.find(ls);
640  if (itd != avgLeadTime_.end())
641  fmt_.m_data.fastAvgLeadTimeJ_ = itd->second;
642  else fmt_.m_data.fastAvgLeadTimeJ_=0.;
643 
644  auto iti = filesProcessedDuringLumi_.find(ls);
645  if (iti != filesProcessedDuringLumi_.end())
646  fmt_.m_data.fastFilesProcessedJ_ = iti->second;
648 
649  auto itrd = lockStatsDuringLumi_.find(ls);
650  if (itrd != lockStatsDuringLumi_.end()) {
651  fmt_.m_data.fastLockWaitJ_ = itrd->second.first;
652  fmt_.m_data.fastLockCountJ_ = itrd->second.second;
653  }
654  else {
657  }
658 
659  }
660  else return;
661 
662  //capture latest mini/microstate of streams
663  for (unsigned int i=0;i<nStreams_;i++) {
666  }
667  //for (unsigned int i=0;i<nThreads_;i++)
668  // fmt_.m_data.threadMicrostateEncoded_[i] = encModule_.encode(threadMicrostate_[i]);
669 
670  if (isGlobalEOL)
671  {//only update global variables
672  fmt_.jsonMonitor_->snapGlobal(ls);
673  }
674  else
675  fmt_.jsonMonitor_->snap(ls);
676  }
int i
Definition: DBlmapReader.cc:9
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::map< unsigned int, double > avgLeadTime_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< const void * > microstate_
std::vector< unsigned int > microstateEncoded_
FastMonitoringThread::Macrostate macrostate_
std::unique_ptr< FastMonitor > jsonMonitor_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
std::vector< unsigned int > ministateEncoded_
void evf::FastMonitoringService::doStreamEOLSnapshot ( const unsigned int  ls,
const unsigned int  streamID 
)
inline private

Definition at line 168 of file FastMonitoringService.h.

References fmt_, and evf::FastMonitoringThread::jsonMonitor_.

Referenced by preStreamEndLumi().

168  {
169  //pick up only event count here
170  fmt_.jsonMonitor_->snapStreamAtomic(ls,streamID);
171  }
std::unique_ptr< FastMonitor > jsonMonitor_
void evf::FastMonitoringService::dowork ( )
inline private

Definition at line 173 of file FastMonitoringService.h.

References doSnapshot(), encModule_, evf::FastMonitoringService::Encoding::encode(), encPath_, evf::FastMonitoringThread::MonitorData::fastMacrostateJ_, fastMonIntervals_, fastPath_, fmt_, evf::FastMonitoringThread::jsonMonitor_, lastGlobalLumi_, CommonMethods::lock(), evf::FastMonitoringThread::m_data, evf::FastMonitoringThread::m_stoprequest, microstate_, ministate_, monInit_, evf::FastMonitoringThread::monlock_, sleepTime_, snapCounter_, AlCaHLTBitMon_QueryRunRegistry::string, and jsoncollector::IntJ::value().

Referenced by preallocate().

173  { // the function to be called in the thread. Thread completes when function returns.
174  monInit_.exchange(true,std::memory_order_acquire);
175  while (!fmt_.m_stoprequest) {
176  edm::LogInfo("FastMonitoringService") << "Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
177  << " ms=" << encPath_[0].encode(ministate_[0])
178  << " us=" << encModule_.encode(microstate_[0]) << std::endl;
179 
180  {
181  std::lock_guard<std::mutex> lock(fmt_.monlock_);
182 
184 
186  std::string CSV = fmt_.jsonMonitor_->getCSVString();
187  //release mutex before writing out fast path file
188  fmt_.monlock_.unlock();
189  if (CSV.size())
190  fmt_.jsonMonitor_->outputCSV(fastPath_,CSV);
191  }
192 
193  snapCounter_++;
194 
195  }
196  ::sleep(sleepTime_);
197  }
198  }
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
std::vector< const void * > microstate_
std::atomic< bool > m_stoprequest
std::unique_ptr< FastMonitor > jsonMonitor_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
bool evf::FastMonitoringService::getAbortFlagForLumi ( unsigned int  lumi)

Definition at line 618 of file FastMonitoringService.cc.

References edm::hlt::Exception, fmt_, CommonMethods::lock(), fjr2json::lumi, evf::FastMonitoringThread::monlock_, and processedEventsPerLumi_.

Referenced by shouldWriteFiles().

618  {
619  std::lock_guard<std::mutex> lock(fmt_.monlock_);
620 
621  auto it = processedEventsPerLumi_.find(lumi);
622  if (it!=processedEventsPerLumi_.end()) {
623  unsigned int abortFlag = it->second.second;
624  return abortFlag;
625  }
626  else {
627  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "<<lumi;
628  return 0;
629  }
630  }
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
unsigned int evf::FastMonitoringService::getEventsProcessedForLumi ( unsigned int  lumi,
bool *  abortFlag = 0 
)

Definition at line 602 of file FastMonitoringService.cc.

References edm::hlt::Exception, fmt_, CommonMethods::lock(), fjr2json::lumi, evf::FastMonitoringThread::monlock_, proc, and processedEventsPerLumi_.

Referenced by DQMFileSaver::fillJson(), dqm::DQMFileSaverBase::fillJson(), DQMFileSaver::saveForFilterUnit(), and shouldWriteFiles().

602  {
603  std::lock_guard<std::mutex> lock(fmt_.monlock_);
604 
605  auto it = processedEventsPerLumi_.find(lumi);
606  if (it!=processedEventsPerLumi_.end()) {
607  unsigned int proc = it->second.first;
608  if (abortFlag) *abortFlag=it->second.second;
609  return proc;
610  }
611  else {
612  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "<<lumi;
613  return 0;
614  }
615  }
TrainProcessor *const proc
Definition: MVATrainer.cc:101
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::string evf::FastMonitoringService::getRunDirName ( ) const
inline

Definition at line 162 of file FastMonitoringService.h.

References runDirectory_.

162 { return runDirectory_.stem().string(); }
boost::filesystem::path runDirectory_
void evf::FastMonitoringService::jobFailure ( )
std::string evf::FastMonitoringService::makeModuleLegenda ( )

Definition at line 94 of file FastMonitoringService.cc.

References evf::FastMonitoringService::Encoding::current_, evf::FastMonitoringService::Encoding::decode(), encModule_, and i.

Referenced by postBeginJob().

95  {
96  std::ostringstream ost;
97  for(int i = 0; i < encModule_.current_; i++)
98  ost<<i<<"="<<((const edm::ModuleDescription *)(encModule_.decode(i)))->moduleLabel()<<" ";
99  return ost.str();
100  }
int i
Definition: DBlmapReader.cc:9
const void * decode(unsigned int index)
std::string evf::FastMonitoringService::makePathLegenda ( )

Definition at line 83 of file FastMonitoringService.cc.

References edm::decode(), encPath_, i, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by prePathEvent().

83  {
84  //taken from first stream
85  std::ostringstream ost;
86  for(int i = 0;
87  i < encPath_[0].current_;
88  i++)
89  ost<<i<<"="<<*((std::string *)(encPath_[0].decode(i)))<<" ";
90  return ost.str();
91  }
int i
Definition: DBlmapReader.cc:9
bool decode(bool &, std::string const &)
Definition: types.cc:62
std::vector< Encoding > encPath_
void evf::FastMonitoringService::postBeginJob ( )

Definition at line 275 of file FastMonitoringService.cc.

References encModule_, fmt_, CommonMethods::lock(), evf::FastMonitoringThread::m_data, macrostate_, makeModuleLegenda(), evf::FastMonitoringThread::MonitorData::microstateBins_, moduleLegendFile_, evf::FastMonitoringThread::monlock_, evf::FastMonitoringThread::sJobReady, AlCaHLTBitMon_QueryRunRegistry::string, and evf::FastMonitoringService::Encoding::vecsize().

Referenced by FastMonitoringService().

276  {
277  std::string && moduleLegStr = makeModuleLegenda();
278  FileIO::writeStringToFile(moduleLegendFile_, moduleLegStr);
279 
281 
282  //update number of entries in module histogram
283  std::lock_guard<std::mutex> lock(fmt_.monlock_);
285  }
FastMonitoringThread::Macrostate macrostate_
void evf::FastMonitoringService::postEndJob ( )
void evf::FastMonitoringService::postEvent ( edm::StreamContext const &  sc)

Definition at line 483 of file FastMonitoringService.cc.

References eventCountForPathInit_, evf::FastMonitoringThread::MonitorData::fastPathProcessedJ_, fmt_, evf::FastMonitoringThread::m_data, microstate_, evf::MicroStateService::mIdle, ministate_, nopath_, evf::FastMonitoringThread::MonitorData::processed_, evf::MicroStateService::reservedMicroStateNames, streamCounterUpdating_, edm::StreamContext::streamID(), and totalEventsProcessed_.

Referenced by FastMonitoringService().

484  {
485  microstate_[sc.streamID()] = &reservedMicroStateNames[mIdle];
486 
487  ministate_[sc.streamID()] = &nopath_;
488 
489  #if ATOMIC_LEVEL>=2
490  //use atomic flag to make sure end of lumi sees this
491  streamCounterUpdating_[sc.streamID()]->store(true,std::memory_order_release);
492  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_release);
493  streamCounterUpdating_[sc.streamID()]->store(false,std::memory_order_release);
494 
495  #elif ATOMIC_LEVEL==1
496  //writes are atomic, we assume writes propagate to memory before stream EOL snap
497  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_relaxed);
498 
499  #elif ATOMIC_LEVEL==0 //default
500  (*(fmt_.m_data.processed_[sc.streamID()]))++;
501  #endif
502  eventCountForPathInit_[sc.streamID()]++;
503 
504  //fast path counter (events accumulated in a run)
505  unsigned long res = totalEventsProcessed_.fetch_add(1,std::memory_order_relaxed);
507  //fmt_.m_data.fastPathProcessedJ_ = totalEventsProcessed_.load(std::memory_order_relaxed);
508  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< AtomicMonUInt * > processed_
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
static const std::string nopath_
std::vector< const void * > microstate_
std::atomic< unsigned long > totalEventsProcessed_
std::vector< const void * > ministate_
void evf::FastMonitoringService::postGlobalBeginRun ( edm::GlobalContext const &  gc)
void evf::FastMonitoringService::postGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 400 of file FastMonitoringService.cc.

References lastGlobalLumisClosed_, edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by FastMonitoringService().

401  {
402  //mark closed lumis (still keep map entries until next one)
403  lastGlobalLumisClosed_.push(gc.luminosityBlockID().luminosityBlock());
404  }
std::queue< unsigned int > lastGlobalLumisClosed_
void evf::FastMonitoringService::postModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 525 of file FastMonitoringService.cc.

References evf::MicroStateService::mFwkOvh, microstate_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

526  {
527  //microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
528  microstate_[sc.streamID().value()] = &reservedMicroStateNames[mFwkOvh];
529  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
void evf::FastMonitoringService::postSourceEvent ( edm::StreamID  sid)

Definition at line 515 of file FastMonitoringService.cc.

References evf::MicroStateService::mFwkOvh, microstate_, evf::MicroStateService::reservedMicroStateNames, and edm::StreamID::value().

Referenced by FastMonitoringService().

516  {
518  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
unsigned int value() const
Definition: StreamID.h:46
void evf::FastMonitoringService::postStreamBeginLumi ( edm::StreamContext const &  sc)
void evf::FastMonitoringService::postStreamEndLumi ( edm::StreamContext const &  sc)
void evf::FastMonitoringService::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 103 of file FastMonitoringService.cc.

References collectedPathList_, evf::FastMonitoringService::Encoding::completeReservedWithDummies(), dowork(), emptyLumisectionMode_, encModule_, encPath_, eventCountForPathInit_, edm::hlt::Exception, fastMicrostateDefPath_, fastName_, fastPath_, firstEventId_, fmt_, i, isGlobalLumiTransition_, evf::FastMonitoringThread::jsonMonitor_, lastGlobalLumi_, LogDebug, lumiFromSource_, evf::FastMonitoringThread::m_data, macrostate_, evf::FastMonitoringThread::MonitorData::macrostateBins_, edm::service::SystemBounds::maxNumberOfStreams(), edm::service::SystemBounds::maxNumberOfThreads(), evf::FastMonitoringThread::MCOUNT, evf::MicroStateService::mCOUNT, microstate_, evf::FastMonitoringThread::MonitorData::microstateBins_, microstateDefPath_, ministate_, evf::FastMonitoringThread::MonitorData::ministateBins_, evf::MicroStateService::mInvalid, moduleLegendFile_, monInit_, nopath_, nStreams_, nThreads_, cppFunctionSkipper::operator, cmsHarvester::path, pathLegendFile_, evf::FastMonitoringThread::MonitorData::registerVariables(), evf::MicroStateService::reservedMicroStateNames, evf::FastMonitoringThread::resetFastMonitor(), runDirectory_, evf::FastMonitoringThread::sInit, evf::FastMonitoringThread::start(), streamCounterUpdating_, threadIDAvailable_, evf::FastMonitoringService::Encoding::updateReserved(), and workingDirectory_.

Referenced by FastMonitoringService().

104  {
105 
106  // FIND RUN DIRECTORY
107  // The run dir should be set via the configuration of EvFDaqDirector
108 
109  if (edm::Service<evf::EvFDaqDirector>().operator->()==nullptr)
110  {
111  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
112 
113  }
114  emptyLumisectionMode_ = edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode();
115  boost::filesystem::path runDirectory(edm::Service<evf::EvFDaqDirector>()->baseRunDir());
116  workingDirectory_ = runDirectory_ = runDirectory;
117  workingDirectory_ /= "mon";
118 
119  if ( !boost::filesystem::is_directory(workingDirectory_)) {
120  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string() ;
121  boost::filesystem::create_directories(workingDirectory_);
122  if ( !boost::filesystem::is_directory(workingDirectory_))
123  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
124  << ". No monitoring data will be written.";
125  }
126 
127  std::ostringstream fastFileName;
128 
129  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
131  fast /= fastFileName.str();
132  fastPath_ = fast.string();
133 
134  std::ostringstream moduleLegFile;
135  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
136  moduleLegendFile_ = (workingDirectory_/moduleLegFile.str()).string();
137 
138  std::ostringstream pathLegFile;
139  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
140  pathLegendFile_ = (workingDirectory_/pathLegFile.str()).string();
141 
142  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: "
144  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
145 
146  nStreams_=bounds.maxNumberOfStreams();
147  nThreads_=bounds.maxNumberOfThreads();
148 
149  //this should already be >=1
150  if (nStreams_==0) nStreams_=1;
151  if (nThreads_==0) nThreads_=1;
152 
153  /*
154  * initialize the fast monitor with:
155  * vector of pointers to monitorable parameters
156  * path to definition
157  *
158  */
159 
161 
162  for(unsigned int i = 0; i < (mCOUNT); i++)
165 
166  for (unsigned int i=0;i<nStreams_;i++) {
167  ministate_.push_back(&nopath_);
169 
170  //for synchronization
171  streamCounterUpdating_.push_back(new std::atomic<bool>(0));
172 
173  //path (mini) state
174  encPath_.emplace_back(0);
175  encPath_[i].update((void*)&nopath_);
176  eventCountForPathInit_.push_back(0);
177  firstEventId_.push_back(0);
178  collectedPathList_.push_back(new std::atomic<bool>(0));
179 
180  }
181  //for (unsigned int i=0;i<nThreads_;i++)
182  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
183 
184  //initial size until we detect number of bins
188 
189  lastGlobalLumi_=0;
191  lumiFromSource_=0;
192 
193  //startup monitoring
195  fmt_.jsonMonitor_->setNStreams(nStreams_);
197  monInit_.store(false,std::memory_order_release);
199 
200  //this definition needs: #include "tbb/compat/thread"
201  //however this would results in TBB imeplementation replacing std::thread
202  //(both supposedly call pthread_self())
203  //number of threads created in process could be obtained from /proc,
204  //assuming that all posix threads are true kernel threads capable of running in parallel
205 
206  //#if TBB_IMPLEMENT_CPP0X
208  //threadIDAvailable_=true;
209  //#endif
210 
211  }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
boost::filesystem::path runDirectory_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
std::vector< std::atomic< bool > * > collectedPathList_
tuple path
else: Piece not in the list, fine.
static const std::string nopath_
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
std::vector< unsigned long > firstEventId_
std::vector< const void * > microstate_
void registerVariables(FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
FastMonitoringThread::Macrostate macrostate_
boost::filesystem::path workingDirectory_
std::unique_ptr< FastMonitor > jsonMonitor_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
void evf::FastMonitoringService::preEvent ( edm::StreamContext const &  sc)

Definition at line 479 of file FastMonitoringService.cc.

Referenced by FastMonitoringService().

480  {
481  }
void evf::FastMonitoringService::preGlobalBeginLumi ( edm::GlobalContext const &  gc)

Definition at line 298 of file FastMonitoringService.cc.

References accuSize_, avgLeadTime_, filesProcessedDuringLumi_, fmt_, isGlobalLumiTransition_, lastGlobalLumi_, lastGlobalLumisClosed_, CommonMethods::lock(), lockStatsDuringLumi_, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, evf::FastMonitoringThread::monlock_, and processedEventsPerLumi_.

Referenced by FastMonitoringService().

299  {
300 
301  timeval lumiStartTime;
302  gettimeofday(&lumiStartTime, 0);
303  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
304 
305  std::lock_guard<std::mutex> lock(fmt_.monlock_);
306 
307  lumiStartTime_[newLumi]=lumiStartTime;
308  while (!lastGlobalLumisClosed_.empty()) {
309  //wipe out old map entries as they aren't needed and slow down access
310  unsigned int oldLumi = lastGlobalLumisClosed_.back();
312  lumiStartTime_.erase(oldLumi);
313  avgLeadTime_.erase(oldLumi);
314  filesProcessedDuringLumi_.erase(oldLumi);
315  accuSize_.erase(oldLumi);
316  lockStatsDuringLumi_.erase(oldLumi);
317  processedEventsPerLumi_.erase(oldLumi);
318  }
319  lastGlobalLumi_= newLumi;
321  }
std::map< unsigned int, timeval > lumiStartTime_
std::map< unsigned int, unsigned long > accuSize_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::queue< unsigned int > lastGlobalLumisClosed_
std::map< unsigned int, double > avgLeadTime_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void evf::FastMonitoringService::preGlobalEarlyTermination ( edm::GlobalContext const &  gc,
edm::TerminationOrigin  to 
)

Definition at line 226 of file FastMonitoringService.cc.

References edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, CommonMethods::lock(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), evf::FastMonitoringThread::monlock_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FastMonitoringService().

227  {
228  std::string context;
229  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
230  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
231  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
232  edm::LogInfo("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
233  << gc.luminosityBlockID().luminosityBlock() << " " << context;
234  std::lock_guard<std::mutex> lock(fmt_.monlock_);
235  exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
236  //exception_detected_=true;
237  }
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 323 of file FastMonitoringService.cc.

References accuSize_, doSnapshot(), edm::hlt::Exception, exception_detected_, exceptionInLS_, evf::FastMonitoringThread::MonitorData::fastThroughputJ_, fmt_, isGlobalLumiTransition_, evf::FastMonitoringThread::jsonMonitor_, CommonMethods::lock(), LogDebug, fjr2json::lumi, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, evf::FastMonitoringThread::m_data, evf::FastMonitoringThread::monlock_, cmsHarvester::path, processedEventsPerLumi_, edm::shutdown_flag, slowName_, sourceEventsReport_, throughputFactor(), jsoncollector::IntJ::value(), jsoncollector::DoubleJ::value(), and workingDirectory_.

Referenced by FastMonitoringService().

324  {
325  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
326  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: "
327  << lumi;
328  timeval lumiStopTime;
329  gettimeofday(&lumiStopTime, 0);
330 
331  std::lock_guard<std::mutex> lock(fmt_.monlock_);
332 
333  // Compute throughput
334  timeval stt = lumiStartTime_[lumi];
335  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec)*1000000
336  + (lumiStopTime.tv_usec - stt.tv_usec);
337  unsigned long accuSize = accuSize_.find(lumi)==accuSize_.end() ? 0 : accuSize_[lumi];
338  double throughput = throughputFactor()* double(accuSize) / double(usecondsForLumi);
339  //store to registered variable
340  fmt_.m_data.fastThroughputJ_.value() = throughput;
341 
342  //update
343  doSnapshot(lumi,true);
344 
345  // create file name for slow monitoring file
346  std::stringstream slowFileName;
347  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
348  << lumi << "_pid" << std::setfill('0')
349  << std::setw(5) << getpid() << ".jsn";
350  boost::filesystem::path slow = workingDirectory_;
351  slow /= slowFileName.str();
352 
353  //retrieve one result we need (todo: sanity check if it's found)
354  IntJ *lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_.jsonMonitor_->getMergedIntJForLumi("Processed",lumi));
355  if (!lumiProcessedJptr)
356  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
357  processedEventsPerLumi_[lumi] = std::pair<unsigned int,bool>(lumiProcessedJptr->value(),false);
358 
359  {
360  auto itr = sourceEventsReport_.find(lumi);
361  if (itr==sourceEventsReport_.end()) {
362  //check if exception has been thrown (in case of Global/Stream early termination, for this LS)
363  bool exception_detected = exception_detected_;
364  for (auto ex : exceptionInLS_)
365  if (lumi == ex) exception_detected=true;
366 
367  if (edm::shutdown_flag || exception_detected) {
368  edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
369  << processedEventsPerLumi_[lumi].first << " events were processed in LUMI " << lumi;
370  //this will prevent output modules from producing json file for possibly incomplete lumi
371  processedEventsPerLumi_[lumi].first=0;
372  processedEventsPerLumi_[lumi].second=true;
373  return;
374  }
375  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
376  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
377  }
378  else {
379  if (itr->second!=processedEventsPerLumi_[lumi].first) {
380  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
381  << lumi
382  << ", events(processed):" << processedEventsPerLumi_[lumi].first
383  << " events(source):" << itr->second;
384  }
385  sourceEventsReport_.erase(itr);
386  }
387  }
388  edm::LogInfo("FastMonitoringService") << "Statistics for lumisection -: lumi = " << lumi << " events = "
389  << lumiProcessedJptr->value() << " time = " << usecondsForLumi/1000000
390  << " size = " << accuSize << " thr = " << throughput;
391  delete lumiProcessedJptr;
392 
393  //full global and stream merge&output for this lumi
394  fmt_.jsonMonitor_->outputFullJSON(slow.string(),lumi);//full global and stream merge and JSON write for this lumi
395  fmt_.jsonMonitor_->discardCollected(lumi);//we don't do further updates for this lumi
396 
397  isGlobalLumiTransition_=true;
398  }
#define LogDebug(id)
std::map< unsigned int, timeval > lumiStartTime_
double throughputFactor()
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
volatile std::atomic< bool > shutdown_flag
tuple path
else: Piece not in the list, fine.
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
boost::filesystem::path workingDirectory_
std::map< unsigned int, unsigned int > sourceEventsReport_
std::unique_ptr< FastMonitor > jsonMonitor_
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preModuleBeginJob ( edm::ModuleDescription const &  desc)

Definition at line 261 of file FastMonitoringService.cc.

References encModule_, fmt_, CommonMethods::lock(), edm::ModuleDescription::moduleName(), evf::FastMonitoringThread::monlock_, evf::FastMonitoringService::Encoding::update(), and evf::FastMonitoringService::Encoding::updateReserved().

Referenced by FastMonitoringService().

262  {
263  std::lock_guard<std::mutex> lock(fmt_.monlock_);
264  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
265 
266  //build a map of modules keyed by their module description address
267  //here we need to treat output modules in a special way so they can be easily singled out
268  if(desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" || desc.moduleName() == "EvFOutputModule" ||
269  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule")
270  encModule_.updateReserved((void*)&desc);
271  else
272  encModule_.update((void*)&desc);
273  }
void evf::FastMonitoringService::preModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 520 of file FastMonitoringService.cc.

References microstate_, edm::ModuleCallingContext::moduleDescription(), edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

521  {
522  microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
523  }
std::vector< const void * > microstate_
void evf::FastMonitoringService::prePathEvent ( edm::StreamContext const &  sc,
edm::PathContext const &  pc 
)

Definition at line 446 of file FastMonitoringService.cc.

References collectedPathList_, encPath_, edm::EventID::event(), eventCountForPathInit_, edm::StreamContext::eventID(), firstEventId_, fmt_, CommonMethods::lock(), evf::FastMonitoringThread::m_data, makePathLegenda(), ministate_, evf::FastMonitoringThread::MonitorData::ministateBins_, evf::FastMonitoringThread::monlock_, pathLegendFile_, pathLegendWritten_, edm::PathContext::pathName(), edm::StreamContext::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, and unlikely.

Referenced by FastMonitoringService().

447  {
448  //make sure that all path names are retrieved before allowing ministate to change
449  //hack: assume memory is synchronized after ~50 events seen by each stream
450  if (unlikely(eventCountForPathInit_[sc.streamID()]<50) && false==collectedPathList_[sc.streamID()]->load(std::memory_order_acquire))
451  {
452  //protection between stream threads, as well as the service monitoring thread
453  std::lock_guard<std::mutex> lock(fmt_.monlock_);
454 
455  if (firstEventId_[sc.streamID()]==0)
456  firstEventId_[sc.streamID()]=sc.eventID().event();
457  if (sc.eventID().event()==firstEventId_[sc.streamID()])
458  {
459  encPath_[sc.streamID()].update((void*)&pc.pathName());
460  return;
461  }
462  else {
463  //finished collecting path names
464  collectedPathList_[sc.streamID()]->store(true,std::memory_order_seq_cst);
465  fmt_.m_data.ministateBins_=encPath_[sc.streamID()].vecsize();
466  if (!pathLegendWritten_) {
467  std::string pathLegendStr = makePathLegenda();
468  FileIO::writeStringToFile(pathLegendFile_, pathLegendStr);
469  pathLegendWritten_=true;
470  }
471  }
472  }
473  else {
474  ministate_[sc.streamID()] = &(pc.pathName());
475  }
476  }
std::vector< unsigned int > eventCountForPathInit_
#define unlikely(x)
std::vector< std::atomic< bool > * > collectedPathList_
std::vector< unsigned long > firstEventId_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
void evf::FastMonitoringService::preSourceEarlyTermination ( edm::TerminationOrigin  to)
void evf::FastMonitoringService::preSourceEvent ( edm::StreamID  sid)

Definition at line 510 of file FastMonitoringService.cc.

References microstate_, evf::MicroStateService::mInput, evf::MicroStateService::reservedMicroStateNames, and edm::StreamID::value().

Referenced by FastMonitoringService().

511  {
512  microstate_[sid.value()] = &reservedMicroStateNames[mInput];
513  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
unsigned int value() const
Definition: StreamID.h:46
void evf::FastMonitoringService::preStreamBeginLumi ( edm::StreamContext const &  sc)

Definition at line 406 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), fmt_, CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::FastMonitoringThread::m_data, evf::MicroStateService::mFwkOvh, microstate_, ministate_, evf::FastMonitoringThread::monlock_, nopath_, evf::FastMonitoringThread::MonitorData::processed_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), evf::FastMonitoringThread::MonitorData::streamLumi_, and edm::StreamID::value().

Referenced by FastMonitoringService().

407  {
408  unsigned int sid = sc.streamID().value();
409  std::lock_guard<std::mutex> lock(fmt_.monlock_);
410  fmt_.m_data.streamLumi_[sid] = sc.eventID().luminosityBlock();
411 
412  //reset collected values for this stream
413  *(fmt_.m_data.processed_[sid])=0;
414 
415  ministate_[sid]=&nopath_;
416  microstate_[sid]=&reservedMicroStateNames[mFwkOvh];
417  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< AtomicMonUInt * > processed_
static const std::string nopath_
std::vector< const void * > microstate_
std::vector< const void * > ministate_
std::vector< unsigned int > streamLumi_
void evf::FastMonitoringService::preStreamEarlyTermination ( edm::StreamContext const &  sc,
edm::TerminationOrigin  to 
)

Definition at line 213 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::FastMonitoringThread::monlock_, edm::StreamContext::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::StreamID::value().

Referenced by FastMonitoringService().

214  {
215  std::string context;
216  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
217  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
218  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
219  edm::LogInfo("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
220  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
221  std::lock_guard<std::mutex> lock(fmt_.monlock_);
222  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
223  //exception_detected_=true;
224  }
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preStreamEndLumi ( edm::StreamContext const &  sc)

Definition at line 424 of file FastMonitoringService.cc.

References doStreamEOLSnapshot(), edm::StreamContext::eventID(), fmt_, svgfig::load(), CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::MicroStateService::mEoL, microstate_, ministate_, evf::FastMonitoringThread::monlock_, nopath_, evf::MicroStateService::reservedMicroStateNames, streamCounterUpdating_, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

425  {
426  unsigned int sid = sc.streamID().value();
427  std::lock_guard<std::mutex> lock(fmt_.monlock_);
428 
429  #if ATOMIC_LEVEL>=2
430  //spinlock to make sure we are not still updating event counter somewhere
431  while (streamCounterUpdating_[sid]->load(std::memory_order_acquire)) {}
432  #endif
433 
434  //update processed count to be complete at this time
435  doStreamEOLSnapshot(sc.eventID().luminosityBlock(),sid);
436  //reset this in case stream does not get notified of next lumi (we keep processed events only)
437  ministate_[sid]=&nopath_;
438  microstate_[sid]=&reservedMicroStateNames[mEoL];
439  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< std::atomic< bool > * > streamCounterUpdating_
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
static const std::string nopath_
def load
Definition: svgfig.py:546
std::vector< const void * > microstate_
std::vector< const void * > ministate_
void evf::FastMonitoringService::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)

Definition at line 678 of file FastMonitoringService.cc.

References fmt_, CommonMethods::lock(), evf::FastMonitoringThread::monlock_, and sourceEventsReport_.

Referenced by FedRawDataInputSource::checkNextEvent(), and FedRawDataInputSource::getNextEvent().

679  {
680 
681  std::lock_guard<std::mutex> lock(fmt_.monlock_);
682  auto itr = sourceEventsReport_.find(lumi);
683  if (itr!=sourceEventsReport_.end())
684  itr->second+=events;
685  else
686  sourceEventsReport_[lumi]=events;
687 
688  }
tuple lumi
Definition: fjr2json.py:35
tuple events
Definition: patZpeak.py:19
std::map< unsigned int, unsigned int > sourceEventsReport_
void evf::FastMonitoringService::reportLockWait ( unsigned int  ls,
double  waitTime,
unsigned int  lockCount 
)

Definition at line 594 of file FastMonitoringService.cc.

References fmt_, CommonMethods::lock(), lockStatsDuringLumi_, python.rootplot.utilities::ls(), and evf::FastMonitoringThread::monlock_.

Referenced by FedRawDataInputSource::readSupervisor().

595  {
596  std::lock_guard<std::mutex> lock(fmt_.monlock_);
597  lockStatsDuringLumi_[ls]=std::pair<double,unsigned int>(waitTime,lockCount);
598 
599  }
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
void evf::FastMonitoringService::setExceptionDetected ( unsigned int  ls)

Definition at line 250 of file FastMonitoringService.cc.

References exception_detected_, and exceptionInLS_.

Referenced by FedRawDataInputSource::getNextEvent().

250  {
251  if (!ls) exception_detected_=true;
252  else exceptionInLS_.push_back(ls);
253  }
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::setMicroState ( MicroStateService::Microstate  m)
virtual

Implements evf::MicroStateService.

Definition at line 535 of file FastMonitoringService.cc.

References i, microstate_, nStreams_, and evf::MicroStateService::reservedMicroStateNames.

536  {
537  for (unsigned int i=0;i<nStreams_;i++)
538  microstate_[i] = &reservedMicroStateNames[m];
539  }
int i
Definition: DBlmapReader.cc:9
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
void evf::FastMonitoringService::setMicroState ( edm::StreamID  sid,
MicroStateService::Microstate  m 
)
virtual
bool evf::FastMonitoringService::shouldWriteFiles ( unsigned int  lumi,
unsigned int *  proc = 0 
)
inline

Definition at line 156 of file FastMonitoringService.h.

References emptyLumisectionMode_, getAbortFlagForLumi(), getEventsProcessedForLumi(), and proc.

Referenced by DQMFileSaver::globalEndLuminosityBlock(), and TriggerJSONMonitoring::globalEndLuminosityBlockSummary().

157  {
158  unsigned int processed = getEventsProcessedForLumi(lumi);
159  if (proc) *proc = processed;
160  return !getAbortFlagForLumi(lumi) && (processed || emptyLumisectionMode_);
161  }
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
TrainProcessor *const proc
Definition: MVATrainer.cc:101
tuple lumi
Definition: fjr2json.py:35
bool getAbortFlagForLumi(unsigned int lumi)
void evf::FastMonitoringService::startedLookingForFile ( )

Definition at line 560 of file FastMonitoringService.cc.

References fileLookStart_.

Referenced by FedRawDataInputSource::readSupervisor().

560  {
561  gettimeofday(&fileLookStart_, 0);
562  /*
563  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
564  << fileLookStart_.tv_usec / 1000.0 << std::endl;
565  */
566  }
void evf::FastMonitoringService::stoppedLookingForFile ( unsigned int  lumi)

Definition at line 568 of file FastMonitoringService.cc.

References avgLeadTime_, fileLookStart_, fileLookStop_, fmt_, i, leadTimes_, CommonMethods::lock(), fjr2json::lumi, lumiFromSource_, and evf::FastMonitoringThread::monlock_.

Referenced by FedRawDataInputSource::readSupervisor().

568  {
569  gettimeofday(&fileLookStop_, 0);
570  /*
571  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
572  << fileLookStop_.tv_usec / 1000.0 << std::endl;
573  */
574  std::lock_guard<std::mutex> lock(fmt_.monlock_);
575 
576  if (lumi>lumiFromSource_) {
577  lumiFromSource_=lumi;
578  leadTimes_.clear();
579  }
580  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
581  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
582  // add this to lead times for this lumi
583  leadTimes_.push_back((double)elapsedTime);
584 
585  // recompute average lead time for this lumi
586  if (leadTimes_.size() == 1) avgLeadTime_[lumi] = leadTimes_[0];
587  else {
588  double totTime = 0;
589  for (unsigned int i = 0; i < leadTimes_.size(); i++) totTime += leadTimes_[i];
590  avgLeadTime_[lumi] = 0.001*(totTime / leadTimes_.size());
591  }
592  }
int i
Definition: DBlmapReader.cc:9
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, double > avgLeadTime_
std::vector< double > leadTimes_

Member Data Documentation

std::map<unsigned int, unsigned long> evf::FastMonitoringService::accuSize_
private
std::map<unsigned int, double> evf::FastMonitoringService::avgLeadTime_
private

Definition at line 233 of file FastMonitoringService.h.

Referenced by doSnapshot(), preGlobalBeginLumi(), and stoppedLookingForFile().

std::vector<std::atomic<bool>*> evf::FastMonitoringService::collectedPathList_
private

Definition at line 248 of file FastMonitoringService.h.

Referenced by preallocate(), and prePathEvent().

bool evf::FastMonitoringService::emptyLumisectionMode_ = false
private

Definition at line 267 of file FastMonitoringService.h.

Referenced by preallocate(), and shouldWriteFiles().

Encoding evf::FastMonitoringService::encModule_
private
std::vector<Encoding> evf::FastMonitoringService::encPath_
private
std::vector<unsigned int> evf::FastMonitoringService::eventCountForPathInit_
private

Definition at line 249 of file FastMonitoringService.h.

Referenced by postEvent(), preallocate(), and prePathEvent().

bool evf::FastMonitoringService::exception_detected_ = false
private
std::vector<unsigned int> evf::FastMonitoringService::exceptionInLS_
private
std::string evf::FastMonitoringService::fastMicrostateDefPath_
private

Definition at line 210 of file FastMonitoringService.h.

Referenced by preallocate().

unsigned int evf::FastMonitoringService::fastMonIntervals_
private

Definition at line 208 of file FastMonitoringService.h.

Referenced by dowork().

std::string evf::FastMonitoringService::fastName_
private

Definition at line 211 of file FastMonitoringService.h.

Referenced by preallocate().

std::string evf::FastMonitoringService::fastPath_
private

Definition at line 211 of file FastMonitoringService.h.

Referenced by dowork(), and preallocate().

timeval evf::FastMonitoringService::fileLookStart_
private

Definition at line 216 of file FastMonitoringService.h.

Referenced by startedLookingForFile(), and stoppedLookingForFile().

timeval evf::FastMonitoringService::fileLookStop_
private

Definition at line 216 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

std::map<unsigned int, unsigned int> evf::FastMonitoringService::filesProcessedDuringLumi_
private

Definition at line 234 of file FastMonitoringService.h.

Referenced by accumulateFileSize(), doSnapshot(), and preGlobalBeginLumi().

std::vector<unsigned long> evf::FastMonitoringService::firstEventId_
private

Definition at line 247 of file FastMonitoringService.h.

Referenced by preallocate(), and prePathEvent().

FastMonitoringThread evf::FastMonitoringService::fmt_
private
bool evf::FastMonitoringService::isGlobalLumiTransition_
private
unsigned int evf::FastMonitoringService::lastGlobalLumi_
private

Definition at line 218 of file FastMonitoringService.h.

Referenced by dowork(), preallocate(), and preGlobalBeginLumi().

std::queue<unsigned int> evf::FastMonitoringService::lastGlobalLumisClosed_
private

Definition at line 219 of file FastMonitoringService.h.

Referenced by postGlobalEndLumi(), and preGlobalBeginLumi().

std::vector<double> evf::FastMonitoringService::leadTimes_
private

Definition at line 237 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

std::map<unsigned int, std::pair<double,unsigned int> > evf::FastMonitoringService::lockStatsDuringLumi_
private

Definition at line 238 of file FastMonitoringService.h.

Referenced by doSnapshot(), preGlobalBeginLumi(), and reportLockWait().

unsigned int evf::FastMonitoringService::lumiFromSource_
private

Definition at line 221 of file FastMonitoringService.h.

Referenced by preallocate(), and stoppedLookingForFile().

std::map<unsigned int, timeval> evf::FastMonitoringService::lumiStartTime_
private

Definition at line 215 of file FastMonitoringService.h.

Referenced by preGlobalBeginLumi(), and preGlobalEndLumi().

FastMonitoringThread::Macrostate evf::FastMonitoringService::macrostate_
private
const std::string evf::FastMonitoringService::macroStateNames
static
Initial value:
=
{"Init","JobReady","RunGiven","Running",
"Stopping","Done","JobEnded","Error","ErrorEnded","End",
"Invalid"}

Definition at line 108 of file FastMonitoringService.h.

std::vector<const void*> evf::FastMonitoringService::microstate_
private
std::string evf::FastMonitoringService::microstateDefPath_
private

Definition at line 210 of file FastMonitoringService.h.

Referenced by preallocate().

std::vector<const void*> evf::FastMonitoringService::ministate_
private
std::string evf::FastMonitoringService::moduleLegendFile_
private

Definition at line 260 of file FastMonitoringService.h.

Referenced by postBeginJob(), and preallocate().

std::atomic<bool> evf::FastMonitoringService::monInit_
private

Definition at line 264 of file FastMonitoringService.h.

Referenced by dowork(), and preallocate().

const std::string evf::FastMonitoringService::nopath_ = "NoPath"
static
unsigned int evf::FastMonitoringService::nStreams_
private

Definition at line 205 of file FastMonitoringService.h.

Referenced by doSnapshot(), preallocate(), and setMicroState().

unsigned int evf::FastMonitoringService::nThreads_
private

Definition at line 206 of file FastMonitoringService.h.

Referenced by preallocate().

std::string evf::FastMonitoringService::pathLegendFile_
private

Definition at line 261 of file FastMonitoringService.h.

Referenced by preallocate(), and prePathEvent().

bool evf::FastMonitoringService::pathLegendWritten_ = false
private

Definition at line 262 of file FastMonitoringService.h.

Referenced by prePathEvent().

std::vector<bool> evf::FastMonitoringService::pathNamesReady_
private

Definition at line 250 of file FastMonitoringService.h.

std::map<unsigned int, std::pair<unsigned int,bool> > evf::FastMonitoringService::processedEventsPerLumi_
private
boost::filesystem::path evf::FastMonitoringService::runDirectory_
private

Definition at line 252 of file FastMonitoringService.h.

Referenced by getRunDirName(), and preallocate().

int evf::FastMonitoringService::sleepTime_
private

Definition at line 207 of file FastMonitoringService.h.

Referenced by dowork().

std::string evf::FastMonitoringService::slowName_
private

Definition at line 211 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi().

unsigned int evf::FastMonitoringService::snapCounter_ = 0
private

Definition at line 209 of file FastMonitoringService.h.

Referenced by dowork().

std::map<unsigned int,unsigned int> evf::FastMonitoringService::sourceEventsReport_
private

Definition at line 254 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi(), and reportEventsThisLumiInSource().

std::vector<std::atomic<bool>*> evf::FastMonitoringService::streamCounterUpdating_
private

Definition at line 245 of file FastMonitoringService.h.

Referenced by postEvent(), preallocate(), and preStreamEndLumi().

bool evf::FastMonitoringService::threadIDAvailable_ = false
private

Definition at line 256 of file FastMonitoringService.h.

Referenced by preallocate().

std::vector<const void*> evf::FastMonitoringService::threadMicrostate_
private

Definition at line 229 of file FastMonitoringService.h.

std::atomic<unsigned long> evf::FastMonitoringService::totalEventsProcessed_
private

Definition at line 258 of file FastMonitoringService.h.

Referenced by postEvent().

boost::filesystem::path evf::FastMonitoringService::workingDirectory_
private

Definition at line 252 of file FastMonitoringService.h.

Referenced by preallocate(), and preGlobalEndLumi().