CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
evf::FastMonitoringService Class Reference

#include <FastMonitoringService.h>

Inheritance diagram for evf::FastMonitoringService:
evf::MicroStateService

Classes

struct  Encoding
 

Public Member Functions

void accumulateFileSize (unsigned int lumi, unsigned long fileSize)
 
 FastMonitoringService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
bool getAbortFlagForLumi (unsigned int lumi)
 
unsigned int getEventsProcessedForLumi (unsigned int lumi, bool *abortFlag=0)
 
std::string getRunDirName () const
 
void jobFailure ()
 
std::string makeModuleLegendaJson ()
 
std::string makePathLegendaJson ()
 
void postBeginJob ()
 
void postEndJob ()
 
void postEvent (edm::StreamContext const &)
 
void postGlobalBeginRun (edm::GlobalContext const &)
 
void postGlobalEndLumi (edm::GlobalContext const &)
 
void postModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void postSourceEvent (edm::StreamID)
 
void postStreamBeginLumi (edm::StreamContext const &)
 
void postStreamEndLumi (edm::StreamContext const &)
 
void preallocate (edm::service::SystemBounds const &)
 
void preEvent (edm::StreamContext const &)
 
void preGlobalBeginLumi (edm::GlobalContext const &)
 
void preGlobalEarlyTermination (edm::GlobalContext const &, edm::TerminationOrigin)
 
void preGlobalEndLumi (edm::GlobalContext const &)
 
void preModuleBeginJob (edm::ModuleDescription const &)
 
void preModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void prePathEvent (edm::StreamContext const &, edm::PathContext const &)
 
void preSourceEarlyTermination (edm::TerminationOrigin)
 
void preSourceEvent (edm::StreamID)
 
void preStreamBeginLumi (edm::StreamContext const &)
 
void preStreamEarlyTermination (edm::StreamContext const &, edm::TerminationOrigin)
 
void preStreamEndLumi (edm::StreamContext const &)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void reportLockWait (unsigned int ls, double waitTime, unsigned int lockCount)
 
void setExceptionDetected (unsigned int ls)
 
void setMicroState (MicroStateService::Microstate)
 
void setMicroState (edm::StreamID, MicroStateService::Microstate)
 
bool shouldWriteFiles (unsigned int lumi, unsigned int *proc=0)
 
void startedLookingForFile ()
 
void stoppedLookingForFile (unsigned int lumi)
 
 ~FastMonitoringService ()
 
- Public Member Functions inherited from evf::MicroStateService
virtual std::string getMicroState1 ()
 
virtual std::string const & getMicroState2 ()
 
 MicroStateService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
virtual ~MicroStateService ()
 

Static Public Attributes

static const std::string macroStateNames [FastMonitoringThread::MCOUNT]
 
static const std::string nopath_ = "NoPath"
 
- Static Public Attributes inherited from evf::MicroStateService
static const edm::ModuleDescription reservedMicroStateNames [mCOUNT]
 

Private Member Functions

void doSnapshot (const unsigned int ls, const bool isGlobalEOL)
 
void doStreamEOLSnapshot (const unsigned int ls, const unsigned int streamID)
 
void dowork ()
 

Private Attributes

std::map< unsigned int, unsigned long > accuSize_
 
std::map< unsigned int, double > avgLeadTime_
 
std::vector< std::atomic< bool > * > collectedPathList_
 
bool emptyLumisectionMode_ = false
 
Encoding encModule_
 
std::vector< Encoding > encPath_
 
std::vector< unsigned int > eventCountForPathInit_
 
bool exception_detected_ = false
 
std::vector< unsigned int > exceptionInLS_
 
std::string fastMicrostateDefPath_
 
unsigned int fastMonIntervals_
 
std::string fastName_
 
std::string fastPath_
 
timeval fileLookStart_
 
timeval fileLookStop_
 
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
 
std::vector< unsigned long > firstEventId_
 
FastMonitoringThread fmt_
 
bool isGlobalLumiTransition_
 
unsigned int lastGlobalLumi_
 
std::queue< unsigned int > lastGlobalLumisClosed_
 
std::vector< double > leadTimes_
 
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
 
unsigned int lumiFromSource_
 
std::map< unsigned int, timeval > lumiStartTime_
 
FastMonitoringThread::Macrostate macrostate_
 
std::vector< const void * > microstate_
 
std::string microstateDefPath_
 
std::vector< const void * > ministate_
 
std::string moduleLegendFile_
 
std::string moduleLegendFileJson_
 
std::atomic< bool > monInit_
 
unsigned int nOutputModules_ =0
 
unsigned int nStreams_
 
unsigned int nThreads_
 
std::string pathLegendFile_
 
std::string pathLegendFileJson_
 
bool pathLegendWritten_ = false
 
std::vector< bool > pathNamesReady_
 
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
 
boost::filesystem::path runDirectory_
 
int sleepTime_
 
std::string slowName_
 
unsigned int snapCounter_ = 0
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
std::vector< std::atomic< bool > * > streamCounterUpdating_
 
bool threadIDAvailable_ = false
 
std::vector< const void * > threadMicrostate_
 
std::atomic< unsigned long > totalEventsProcessed_
 
boost::filesystem::path workingDirectory_
 

Additional Inherited Members

- Public Types inherited from evf::MicroStateService
enum  Microstate {
  mInvalid = 0, mFwkOvh, mIdle, mInput,
  mInputDone, mDqm, mEoL, mCOUNT
}
 
- Static Protected Attributes inherited from evf::MicroStateService
static const std::string default_return_ ="NotImplemented"
 

Detailed Description

Definition at line 51 of file FastMonitoringService.h.

Constructor & Destructor Documentation

evf::FastMonitoringService::FastMonitoringService ( const edm::ParameterSet &  iPS,
edm::ActivityRegistry &  reg 
)

Definition at line 37 of file FastMonitoringService.cc.

References jobFailure(), postBeginJob(), postEndJob(), postEvent(), postGlobalEndLumi(), postModuleEvent(), postSourceEvent(), postStreamBeginLumi(), postStreamEndLumi(), preallocate(), preEvent(), preGlobalBeginLumi(), preGlobalEarlyTermination(), preGlobalEndLumi(), preModuleBeginJob(), preModuleEvent(), prePathEvent(), preSourceEarlyTermination(), preSourceEvent(), preStreamBeginLumi(), preStreamEarlyTermination(), preStreamEndLumi(), edm::ActivityRegistry::watchJobFailure(), edm::ActivityRegistry::watchPostBeginJob(), edm::ActivityRegistry::watchPostEndJob(), edm::ActivityRegistry::watchPostEvent(), edm::ActivityRegistry::watchPostGlobalEndLumi(), edm::ActivityRegistry::watchPostModuleEvent(), edm::ActivityRegistry::watchPostSourceEvent(), edm::ActivityRegistry::watchPostStreamBeginLumi(), edm::ActivityRegistry::watchPostStreamEndLumi(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreEvent(), edm::ActivityRegistry::watchPreGlobalBeginLumi(), edm::ActivityRegistry::watchPreGlobalEarlyTermination(), edm::ActivityRegistry::watchPreGlobalEndLumi(), edm::ActivityRegistry::watchPreModuleBeginJob(), edm::ActivityRegistry::watchPreModuleEvent(), edm::ActivityRegistry::watchPrePathEvent(), edm::ActivityRegistry::watchPreSourceEarlyTermination(), edm::ActivityRegistry::watchPreSourceEvent(), edm::ActivityRegistry::watchPreStreamBeginLumi(), edm::ActivityRegistry::watchPreStreamEarlyTermination(), and edm::ActivityRegistry::watchPreStreamEndLumi().

38  :
39  MicroStateService(iPS,reg)
41  ,nStreams_(0)//until initialized
42  ,sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1))
43  ,fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 1))
44  ,microstateDefPath_(iPS.getUntrackedParameter<std::string> ("microstateDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/microstatedef.jsd"))
46  ,fastName_(iPS.getUntrackedParameter<std::string>("fastName", "fastmoni"))
47  ,slowName_(iPS.getUntrackedParameter<std::string>("slowName", "slowmoni"))
49  {
50  reg.watchPreallocate(this, &FastMonitoringService::preallocate);//receiving information on number of threads
52 
56 
60 
65 
67 
70 
71  reg.watchPreSourceEvent(this,&FastMonitoringService::preSourceEvent);//source (with streamID of requestor)
73 
76 
80  }
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
T getUntrackedParameter(std::string const &, T const &) const
void watchPreEvent(PreEvent::slot_type const &iSlot)
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
void preGlobalBeginLumi(edm::GlobalContext const &)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
void watchPostEvent(PostEvent::slot_type const &iSlot)
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
void preGlobalEndLumi(edm::GlobalContext const &)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void preModuleBeginJob(edm::ModuleDescription const &)
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
void preStreamEndLumi(edm::StreamContext const &)
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
#define NRESERVEDMODULES
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postStreamBeginLumi(edm::StreamContext const &)
void postStreamEndLumi(edm::StreamContext const &)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
evf::FastMonitoringService::~FastMonitoringService ( )

Definition at line 83 of file FastMonitoringService.cc.

84  {
85  }

Member Function Documentation

void evf::FastMonitoringService::accumulateFileSize ( unsigned int  lumi,
unsigned long  fileSize 
)

Definition at line 569 of file FastMonitoringService.cc.

References accuSize_, filesProcessedDuringLumi_, fmt_, CommonMethods::lock(), fjr2json::lumi, and evf::FastMonitoringThread::monlock_.

Referenced by evf::EvFDaqDirector::bumpFile().

569  {
570  std::lock_guard<std::mutex> lock(fmt_.monlock_);
571 
572  if (accuSize_.find(lumi)==accuSize_.end()) accuSize_[lumi] = fileSize;
573  else accuSize_[lumi] += fileSize;
574 
577  else
579  }
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void evf::FastMonitoringService::doSnapshot ( const unsigned int  ls,
const bool  isGlobalEOL 
)
private

Definition at line 653 of file FastMonitoringService.cc.

References avgLeadTime_, encModule_, evf::FastMonitoringService::Encoding::encode(), encPath_, evf::FastMonitoringThread::MonitorData::fastAvgLeadTimeJ_, evf::FastMonitoringThread::MonitorData::fastFilesProcessedJ_, evf::FastMonitoringThread::MonitorData::fastLockCountJ_, evf::FastMonitoringThread::MonitorData::fastLockWaitJ_, evf::FastMonitoringThread::MonitorData::fastMacrostateJ_, filesProcessedDuringLumi_, fmt_, i, isGlobalLumiTransition_, evf::FastMonitoringThread::jsonMonitor_, lockStatsDuringLumi_, evf::FastMonitoringThread::m_data, macrostate_, microstate_, evf::FastMonitoringThread::MonitorData::microstateEncoded_, ministate_, evf::FastMonitoringThread::MonitorData::ministateEncoded_, and nStreams_.

Referenced by dowork(), and preGlobalEndLumi().

653  {
654  // update macrostate
656 
657  //update these unless in the midst of a global transition
659 
660  auto itd = avgLeadTime_.find(ls);
661  if (itd != avgLeadTime_.end())
662  fmt_.m_data.fastAvgLeadTimeJ_ = itd->second;
663  else fmt_.m_data.fastAvgLeadTimeJ_=0.;
664 
665  auto iti = filesProcessedDuringLumi_.find(ls);
666  if (iti != filesProcessedDuringLumi_.end())
667  fmt_.m_data.fastFilesProcessedJ_ = iti->second;
669 
670  auto itrd = lockStatsDuringLumi_.find(ls);
671  if (itrd != lockStatsDuringLumi_.end()) {
672  fmt_.m_data.fastLockWaitJ_ = itrd->second.first;
673  fmt_.m_data.fastLockCountJ_ = itrd->second.second;
674  }
675  else {
678  }
679 
680  }
681  else return;
682 
683  //capture latest mini/microstate of streams
684  for (unsigned int i=0;i<nStreams_;i++) {
687  }
688  //for (unsigned int i=0;i<nThreads_;i++)
689  // fmt_.m_data.threadMicrostateEncoded_[i] = encModule_.encode(threadMicrostate_[i]);
690 
691  if (isGlobalEOL)
692  {//only update global variables
693  fmt_.jsonMonitor_->snapGlobal(ls);
694  }
695  else
696  fmt_.jsonMonitor_->snap(ls);
697  }
int i
Definition: DBlmapReader.cc:9
def ls
Definition: eostools.py:348
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::map< unsigned int, double > avgLeadTime_
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::vector< const void * > microstate_
std::vector< unsigned int > microstateEncoded_
FastMonitoringThread::Macrostate macrostate_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
std::vector< unsigned int > ministateEncoded_
void evf::FastMonitoringService::doStreamEOLSnapshot ( const unsigned int  ls,
const unsigned int  streamID 
)
inline private

Definition at line 168 of file FastMonitoringService.h.

References fmt_, and evf::FastMonitoringThread::jsonMonitor_.

Referenced by preStreamEndLumi().

168  {
169  //pick up only event count here
170  fmt_.jsonMonitor_->snapStreamAtomic(ls,streamID);
171  }
def ls
Definition: eostools.py:348
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
void evf::FastMonitoringService::dowork ( )
inlineprivate

Definition at line 173 of file FastMonitoringService.h.

References doSnapshot(), encModule_, evf::FastMonitoringService::Encoding::encode(), encPath_, evf::FastMonitoringThread::MonitorData::fastMacrostateJ_, fastMonIntervals_, fastPath_, fmt_, evf::FastMonitoringThread::jsonMonitor_, lastGlobalLumi_, CommonMethods::lock(), evf::FastMonitoringThread::m_data, evf::FastMonitoringThread::m_stoprequest, microstate_, ministate_, monInit_, evf::FastMonitoringThread::monlock_, sleepTime_, snapCounter_, AlCaHLTBitMon_QueryRunRegistry::string, and jsoncollector::IntJ::value().

Referenced by preallocate().

173  { // the function to be called in the thread. Thread completes when function returns.
174  monInit_.exchange(true,std::memory_order_acquire);
175  while (!fmt_.m_stoprequest) {
176  edm::LogInfo("FastMonitoringService") << "Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
177  << " ms=" << encPath_[0].encode(ministate_[0])
178  << " us=" << encModule_.encode(microstate_[0]) << std::endl;
179 
180  {
181  std::lock_guard<std::mutex> lock(fmt_.monlock_);
182 
184 
186  std::string CSV = fmt_.jsonMonitor_->getCSVString();
187  //release mutex before writing out fast path file
188  fmt_.monlock_.unlock();
189  if (CSV.size())
190  fmt_.jsonMonitor_->outputCSV(fastPath_,CSV);
191  }
192 
193  snapCounter_++;
194 
195  }
196  ::sleep(sleepTime_);
197  }
198  }
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
std::vector< const void * > microstate_
std::atomic< bool > m_stoprequest
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
bool evf::FastMonitoringService::getAbortFlagForLumi ( unsigned int  lumi)

Definition at line 639 of file FastMonitoringService.cc.

References Exception, fmt_, CommonMethods::lock(), fjr2json::lumi, evf::FastMonitoringThread::monlock_, and processedEventsPerLumi_.

Referenced by shouldWriteFiles().

639  {
640  std::lock_guard<std::mutex> lock(fmt_.monlock_);
641 
642  auto it = processedEventsPerLumi_.find(lumi);
643  if (it!=processedEventsPerLumi_.end()) {
644  unsigned int abortFlag = it->second.second;
645  return abortFlag;
646  }
647  else {
648  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "<<lumi;
649  return 0;
650  }
651  }
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
unsigned int evf::FastMonitoringService::getEventsProcessedForLumi ( unsigned int  lumi,
bool *  abortFlag = 0 
)

Definition at line 623 of file FastMonitoringService.cc.

References Exception, fmt_, CommonMethods::lock(), fjr2json::lumi, evf::FastMonitoringThread::monlock_, proc, and processedEventsPerLumi_.

Referenced by dqm::DQMFileSaverPB::fillJson(), DQMFileSaver::fillJson(), DQMFileSaver::saveForFilterUnit(), dqm::DQMFileSaverPB::saveLumi(), and shouldWriteFiles().

623  {
624  std::lock_guard<std::mutex> lock(fmt_.monlock_);
625 
626  auto it = processedEventsPerLumi_.find(lumi);
627  if (it!=processedEventsPerLumi_.end()) {
628  unsigned int proc = it->second.first;
629  if (abortFlag) *abortFlag=it->second.second;
630  return proc;
631  }
632  else {
633  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "<<lumi;
634  return 0;
635  }
636  }
TrainProcessor *const proc
Definition: MVATrainer.cc:101
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::string evf::FastMonitoringService::getRunDirName ( ) const
inline

Definition at line 162 of file FastMonitoringService.h.

References runDirectory_.

162 { return runDirectory_.stem().string(); }
boost::filesystem::path runDirectory_
void evf::FastMonitoringService::jobFailure ( )
std::string evf::FastMonitoringService::makeModuleLegendaJson ( )

Definition at line 100 of file FastMonitoringService.cc.

References Json::Value::append(), Json::arrayValue, evf::FastMonitoringService::Encoding::current_, evf::FastMonitoringService::Encoding::decode(), encModule_, i, nOutputModules_, NRESERVEDMODULES, NSPECIALMODULES, and Json::StyledWriter::write().

Referenced by postBeginJob().

100  {
101  Json::Value legendaVector(Json::arrayValue);
102  for(int i = 0; i < encModule_.current_; i++)
103  legendaVector.append(Json::Value(((const edm::ModuleDescription *)(encModule_.decode(i)))->moduleLabel()));
104  Json::Value valReserved(NRESERVEDMODULES);
105  Json::Value valSpecial(NSPECIALMODULES);
106  Json::Value valOutputModules(nOutputModules_);
107  Json::Value moduleLegend;
108  moduleLegend["names"]=legendaVector;
109  moduleLegend["reserved"]=valReserved;
110  moduleLegend["special"]=valSpecial;
111  moduleLegend["output"]=valOutputModules;
112  Json::StyledWriter writer;
113  return writer.write(moduleLegend);
114  }
int i
Definition: DBlmapReader.cc:9
Represents a JSON value.
Definition: value.h:111
#define NRESERVEDMODULES
virtual std::string write(const Value &root)
Serialize a Value in JSON format.
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:65
const void * decode(unsigned int index)
#define NSPECIALMODULES
array value (ordered list)
Definition: value.h:31
std::string evf::FastMonitoringService::makePathLegendaJson ( )

Definition at line 88 of file FastMonitoringService.cc.

References Json::Value::append(), Json::arrayValue, encPath_, i, NRESERVEDPATHS, AlCaHLTBitMon_QueryRunRegistry::string, and Json::StyledWriter::write().

Referenced by prePathEvent().

88  {
89  Json::Value legendaVector(Json::arrayValue);
90  for(int i = 0; i < encPath_[0].current_; i++)
91  legendaVector.append(Json::Value(*((std::string *)(encPath_[0].decode(i)))));
92  Json::Value valReserved(NRESERVEDPATHS);
93  Json::Value pathLegend;
94  pathLegend["names"]=legendaVector;
95  pathLegend["reserved"]=valReserved;
96  Json::StyledWriter writer;
97  return writer.write(pathLegend);
98  }
int i
Definition: DBlmapReader.cc:9
Represents a JSON value.
Definition: value.h:111
virtual std::string write(const Value &root)
Serialize a Value in JSON format.
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:65
#define NRESERVEDPATHS
std::vector< Encoding > encPath_
array value (ordered list)
Definition: value.h:31
void evf::FastMonitoringService::postBeginJob ( )

Definition at line 296 of file FastMonitoringService.cc.

References encModule_, fmt_, CommonMethods::lock(), evf::FastMonitoringThread::m_data, macrostate_, makeModuleLegendaJson(), evf::FastMonitoringThread::MonitorData::microstateBins_, moduleLegendFileJson_, evf::FastMonitoringThread::monlock_, evf::FastMonitoringThread::sJobReady, AlCaHLTBitMon_QueryRunRegistry::string, and evf::FastMonitoringService::Encoding::vecsize().

Referenced by FastMonitoringService().

297  {
298  std::string && moduleLegStrJson = makeModuleLegendaJson();
299  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
300 
302 
303  //update number of entries in module histogram
304  std::lock_guard<std::mutex> lock(fmt_.monlock_);
306  }
FastMonitoringThread::Macrostate macrostate_
void evf::FastMonitoringService::postEndJob ( )
void evf::FastMonitoringService::postEvent ( edm::StreamContext const &  sc)

Definition at line 504 of file FastMonitoringService.cc.

References eventCountForPathInit_, evf::FastMonitoringThread::MonitorData::fastPathProcessedJ_, fmt_, evf::FastMonitoringThread::m_data, microstate_, evf::MicroStateService::mIdle, ministate_, nopath_, evf::FastMonitoringThread::MonitorData::processed_, evf::MicroStateService::reservedMicroStateNames, streamCounterUpdating_, edm::StreamContext::streamID(), and totalEventsProcessed_.

Referenced by FastMonitoringService().

505  {
506  microstate_[sc.streamID()] = &reservedMicroStateNames[mIdle];
507 
508  ministate_[sc.streamID()] = &nopath_;
509 
510  #if ATOMIC_LEVEL>=2
511  //use atomic flag to make sure end of lumi sees this
512  streamCounterUpdating_[sc.streamID()]->store(true,std::memory_order_release);
513  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_release);
514  streamCounterUpdating_[sc.streamID()]->store(false,std::memory_order_release);
515 
516  #elif ATOMIC_LEVEL==1
517  //writes are atomic, we assume writes propagate to memory before stream EOL snap
518  fmt_.m_data.processed_[sc.streamID()]->fetch_add(1,std::memory_order_relaxed);
519 
520  #elif ATOMIC_LEVEL==0 //default
521  (*(fmt_.m_data.processed_[sc.streamID()]))++;
522  #endif
523  eventCountForPathInit_[sc.streamID()]++;
524 
525  //fast path counter (events accumulated in a run)
526  unsigned long res = totalEventsProcessed_.fetch_add(1,std::memory_order_relaxed);
528  //fmt_.m_data.fastPathProcessedJ_ = totalEventsProcessed_.load(std::memory_order_relaxed);
529  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
static const std::string nopath_
std::vector< const void * > microstate_
std::atomic< unsigned long > totalEventsProcessed_
std::vector< jsoncollector::AtomicMonUInt * > processed_
std::vector< const void * > ministate_
void evf::FastMonitoringService::postGlobalBeginRun ( edm::GlobalContext const &  gc)
void evf::FastMonitoringService::postGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 421 of file FastMonitoringService.cc.

References lastGlobalLumisClosed_, edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by FastMonitoringService().

422  {
423  //mark closed lumis (still keep map entries until next one)
424  lastGlobalLumisClosed_.push(gc.luminosityBlockID().luminosityBlock());
425  }
std::queue< unsigned int > lastGlobalLumisClosed_
void evf::FastMonitoringService::postModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 546 of file FastMonitoringService.cc.

References evf::MicroStateService::mFwkOvh, microstate_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

547  {
548  //microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
549  microstate_[sc.streamID().value()] = &reservedMicroStateNames[mFwkOvh];
550  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
void evf::FastMonitoringService::postSourceEvent ( edm::StreamID  sid)

Definition at line 536 of file FastMonitoringService.cc.

References evf::MicroStateService::mFwkOvh, microstate_, evf::MicroStateService::reservedMicroStateNames, and edm::StreamID::value().

Referenced by FastMonitoringService().

537  {
539  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
unsigned int value() const
Definition: StreamID.h:46
void evf::FastMonitoringService::postStreamBeginLumi ( edm::StreamContext const &  sc)
void evf::FastMonitoringService::postStreamEndLumi ( edm::StreamContext const &  sc)
void evf::FastMonitoringService::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 116 of file FastMonitoringService.cc.

References collectedPathList_, evf::FastMonitoringService::Encoding::completeReservedWithDummies(), dowork(), emptyLumisectionMode_, encModule_, encPath_, eventCountForPathInit_, Exception, fastMicrostateDefPath_, fastName_, fastPath_, firstEventId_, fmt_, i, isGlobalLumiTransition_, evf::FastMonitoringThread::jsonMonitor_, lastGlobalLumi_, LogDebug, lumiFromSource_, evf::FastMonitoringThread::m_data, macrostate_, evf::FastMonitoringThread::MonitorData::macrostateBins_, edm::service::SystemBounds::maxNumberOfStreams(), edm::service::SystemBounds::maxNumberOfThreads(), evf::FastMonitoringThread::MCOUNT, evf::MicroStateService::mCOUNT, microstate_, evf::FastMonitoringThread::MonitorData::microstateBins_, microstateDefPath_, ministate_, evf::FastMonitoringThread::MonitorData::ministateBins_, evf::MicroStateService::mInvalid, moduleLegendFile_, moduleLegendFileJson_, monInit_, nopath_, nStreams_, nThreads_, cppFunctionSkipper::operator, fed_dqm_sourceclient-live_cfg::path, pathLegendFile_, pathLegendFileJson_, evf::FastMonitoringThread::MonitorData::registerVariables(), evf::MicroStateService::reservedMicroStateNames, evf::FastMonitoringThread::resetFastMonitor(), runDirectory_, evf::FastMonitoringThread::sInit, evf::FastMonitoringThread::start(), streamCounterUpdating_, threadIDAvailable_, evf::FastMonitoringService::Encoding::updateReserved(), and workingDirectory_.

Referenced by FastMonitoringService().

117  {
118 
119  // FIND RUN DIRECTORY
120  // The run dir should be set via the configuration of EvFDaqDirector
121 
122  if (edm::Service<evf::EvFDaqDirector>().operator->()==nullptr)
123  {
124  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
125 
126  }
127  emptyLumisectionMode_ = edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode();
128  boost::filesystem::path runDirectory(edm::Service<evf::EvFDaqDirector>()->baseRunDir());
129  workingDirectory_ = runDirectory_ = runDirectory;
130  workingDirectory_ /= "mon";
131 
132  if ( !boost::filesystem::is_directory(workingDirectory_)) {
133  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string() ;
134  boost::filesystem::create_directories(workingDirectory_);
135  if ( !boost::filesystem::is_directory(workingDirectory_))
136  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
137  << ". No monitoring data will be written.";
138  }
139 
140  std::ostringstream fastFileName;
141 
142  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
144  fast /= fastFileName.str();
145  fastPath_ = fast.string();
146 
147  std::ostringstream moduleLegFile;
148  std::ostringstream moduleLegFileJson;
149  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
150  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
151  moduleLegendFile_ = (workingDirectory_/moduleLegFile.str()).string();
152  moduleLegendFileJson_ = (workingDirectory_/moduleLegFileJson.str()).string();
153 
154  std::ostringstream pathLegFile;
155  std::ostringstream pathLegFileJson;
156  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
157  pathLegendFile_ = (workingDirectory_/pathLegFile.str()).string();
158  pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
159  pathLegendFileJson_ = (workingDirectory_/pathLegFileJson.str()).string();
160 
161  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: "
163  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
164 
165  nStreams_=bounds.maxNumberOfStreams();
166  nThreads_=bounds.maxNumberOfThreads();
167 
168  //this should already be >=1
169  if (nStreams_==0) nStreams_=1;
170  if (nThreads_==0) nThreads_=1;
171 
172  /*
173  * initialize the fast monitor with:
174  * vector of pointers to monitorable parameters
175  * path to definition
176  *
177  */
178 
180 
181  for(unsigned int i = 0; i < (mCOUNT); i++)
184 
185  for (unsigned int i=0;i<nStreams_;i++) {
186  ministate_.push_back(&nopath_);
188 
189  //for synchronization
190  streamCounterUpdating_.push_back(new std::atomic<bool>(0));
191 
192  //path (mini) state
193  encPath_.emplace_back(0);
194  encPath_[i].update((void*)&nopath_);
195  eventCountForPathInit_.push_back(0);
196  firstEventId_.push_back(0);
197  collectedPathList_.push_back(new std::atomic<bool>(0));
198 
199  }
200  //for (unsigned int i=0;i<nThreads_;i++)
201  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
202 
203  //initial size until we detect number of bins
207 
208  lastGlobalLumi_=0;
210  lumiFromSource_=0;
211 
212  //startup monitoring
214  fmt_.jsonMonitor_->setNStreams(nStreams_);
216  monInit_.store(false,std::memory_order_release);
218 
219  //this definition needs: #include "tbb/compat/thread"
220  //however this would result in the TBB implementation replacing std::thread
221  //(both supposedly call pthread_self())
222  //number of threads created in process could be obtained from /proc,
223  //assuming that all posix threads are true kernel threads capable of running in parallel
224 
225  //#if TBB_IMPLEMENT_CPP0X
227  //threadIDAvailable_=true;
228  //#endif
229 
230  }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
boost::filesystem::path runDirectory_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
std::vector< std::atomic< bool > * > collectedPathList_
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
static const std::string nopath_
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
std::vector< unsigned long > firstEventId_
std::vector< const void * > microstate_
FastMonitoringThread::Macrostate macrostate_
boost::filesystem::path workingDirectory_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
void evf::FastMonitoringService::preEvent ( edm::StreamContext const &  sc)

Definition at line 500 of file FastMonitoringService.cc.

Referenced by FastMonitoringService().

501  {
     // Empty body: this hook performs no work.
502  }
void evf::FastMonitoringService::preGlobalBeginLumi ( edm::GlobalContext const &  gc)

Definition at line 319 of file FastMonitoringService.cc.

References accuSize_, avgLeadTime_, filesProcessedDuringLumi_, fmt_, isGlobalLumiTransition_, lastGlobalLumi_, lastGlobalLumisClosed_, CommonMethods::lock(), lockStatsDuringLumi_, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, evf::FastMonitoringThread::monlock_, and processedEventsPerLumi_.

Referenced by FastMonitoringService().

320  {
321 
322  timeval lumiStartTime;
323  gettimeofday(&lumiStartTime, 0);
324  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
325 
326  std::lock_guard<std::mutex> lock(fmt_.monlock_);
327 
328  lumiStartTime_[newLumi]=lumiStartTime;
329  while (!lastGlobalLumisClosed_.empty()) {
330  //wipe out old map entries as they aren't needed and slow down access
331  unsigned int oldLumi = lastGlobalLumisClosed_.back();
333  lumiStartTime_.erase(oldLumi);
334  avgLeadTime_.erase(oldLumi);
335  filesProcessedDuringLumi_.erase(oldLumi);
336  accuSize_.erase(oldLumi);
337  lockStatsDuringLumi_.erase(oldLumi);
338  processedEventsPerLumi_.erase(oldLumi);
339  }
340  lastGlobalLumi_= newLumi;
342  }
std::map< unsigned int, timeval > lumiStartTime_
std::map< unsigned int, unsigned long > accuSize_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::queue< unsigned int > lastGlobalLumisClosed_
std::map< unsigned int, double > avgLeadTime_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void evf::FastMonitoringService::preGlobalEarlyTermination ( edm::GlobalContext const &  gc,
edm::TerminationOrigin  to 
)

Definition at line 245 of file FastMonitoringService.cc.

References edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, CommonMethods::lock(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), evf::FastMonitoringThread::monlock_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FastMonitoringService().

246  {
     // Logs an early termination seen at global scope and records the
     // luminosity block number taken from gc in exceptionInLS_.
     //   gc - global context; only its luminosity block number is read here
     //   to - termination origin, used only to build the log label below
247  std::string context;
     // Map the origin to a label for the log message; context stays empty
     // for any origin value not listed here.
248  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
249  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
250  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
251  edm::LogInfo("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
252  << gc.luminosityBlockID().luminosityBlock() << " " << context;
     // The append is done while holding fmt_.monlock_; preGlobalEndLumi()
     // scans exceptionInLS_ for the lumi that is ending.
253  std::lock_guard<std::mutex> lock(fmt_.monlock_);
254  exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
     // The process-wide flag is deliberately left untouched here (line kept commented out).
255  //exception_detected_=true;
256  }
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 344 of file FastMonitoringService.cc.

References accuSize_, doSnapshot(), Exception, exception_detected_, exceptionInLS_, evf::FastMonitoringThread::MonitorData::fastThroughputJ_, fmt_, isGlobalLumiTransition_, evf::FastMonitoringThread::jsonMonitor_, CommonMethods::lock(), LogDebug, fjr2json::lumi, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, evf::FastMonitoringThread::m_data, evf::FastMonitoringThread::monlock_, fed_dqm_sourceclient-live_cfg::path, processedEventsPerLumi_, edm::shutdown_flag, slowName_, sourceEventsReport_, throughputFactor(), jsoncollector::IntJ::value(), jsoncollector::DoubleJ::value(), and workingDirectory_.

Referenced by FastMonitoringService().

345  {
346  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
347  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: "
348  << lumi;
349  timeval lumiStopTime;
350  gettimeofday(&lumiStopTime, 0);
351 
352  std::lock_guard<std::mutex> lock(fmt_.monlock_);
353 
354  // Compute throughput
355  timeval stt = lumiStartTime_[lumi];
356  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec)*1000000
357  + (lumiStopTime.tv_usec - stt.tv_usec);
358  unsigned long accuSize = accuSize_.find(lumi)==accuSize_.end() ? 0 : accuSize_[lumi];
359  double throughput = throughputFactor()* double(accuSize) / double(usecondsForLumi);
360  //store to registered variable
361  fmt_.m_data.fastThroughputJ_.value() = throughput;
362 
363  //update
364  doSnapshot(lumi,true);
365 
366  // create file name for slow monitoring file
367  std::stringstream slowFileName;
368  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
369  << lumi << "_pid" << std::setfill('0')
370  << std::setw(5) << getpid() << ".jsn";
372  slow /= slowFileName.str();
373 
374  //retrieve one result we need (todo: sanity check if it's found)
375  IntJ *lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_.jsonMonitor_->getMergedIntJForLumi("Processed",lumi));
376  if (!lumiProcessedJptr)
377  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
378  processedEventsPerLumi_[lumi] = std::pair<unsigned int,bool>(lumiProcessedJptr->value(),false);
379 
380  {
381  auto itr = sourceEventsReport_.find(lumi);
382  if (itr==sourceEventsReport_.end()) {
383  //check if exception has been thrown (in case of Global/Stream early termination, for this LS)
384  bool exception_detected = exception_detected_;
385  for (auto ex : exceptionInLS_)
386  if (lumi == ex) exception_detected=true;
387 
388  if (edm::shutdown_flag || exception_detected) {
389  edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
390  << processedEventsPerLumi_[lumi].first << " events were processed in LUMI " << lumi;
391  //this will prevent output modules from producing json file for possibly incomplete lumi
392  processedEventsPerLumi_[lumi].first=0;
393  processedEventsPerLumi_[lumi].second=true;
394  return;
395  }
396  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
397  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
398  }
399  else {
400  if (itr->second!=processedEventsPerLumi_[lumi].first) {
401  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
402  << lumi
403  << ", events(processed):" << processedEventsPerLumi_[lumi].first
404  << " events(source):" << itr->second;
405  }
406  sourceEventsReport_.erase(itr);
407  }
408  }
409  edm::LogInfo("FastMonitoringService") << "Statistics for lumisection -: lumi = " << lumi << " events = "
410  << lumiProcessedJptr->value() << " time = " << usecondsForLumi/1000000
411  << " size = " << accuSize << " thr = " << throughput;
412  delete lumiProcessedJptr;
413 
414  //full global and stream merge&output for this lumi
415  fmt_.jsonMonitor_->outputFullJSON(slow.string(),lumi);//full global and stream merge and JSON write for this lumi
416  fmt_.jsonMonitor_->discardCollected(lumi);//we don't do further updates for this lumi
417 
419  }
#define LogDebug(id)
std::map< unsigned int, timeval > lumiStartTime_
double throughputFactor()
std::map< unsigned int, unsigned long > accuSize_
tuple lumi
Definition: fjr2json.py:35
volatile std::atomic< bool > shutdown_flag
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
boost::filesystem::path workingDirectory_
std::map< unsigned int, unsigned int > sourceEventsReport_
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preModuleBeginJob ( edm::ModuleDescription const &  desc)

Definition at line 280 of file FastMonitoringService.cc.

References encModule_, fmt_, CommonMethods::lock(), edm::ModuleDescription::moduleName(), evf::FastMonitoringThread::monlock_, nOutputModules_, evf::FastMonitoringService::Encoding::update(), and evf::FastMonitoringService::Encoding::updateReserved().

Referenced by FastMonitoringService().

281  {
     // Registers the address of the module description in encModule_ while
     // holding fmt_.monlock_.
     //   desc - module description; its address is the key, its moduleName()
     //          decides which registration call is used
282  std::lock_guard<std::mutex> lock(fmt_.monlock_);
283  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
284 
285  //build a map of modules keyed by their module description address
286  //here we need to treat output modules in a special way so they can be easily singled out
     // Modules whose name matches one of the five literals below are registered
     // through updateReserved() and counted in nOutputModules_.
287  if(desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" || desc.moduleName() == "EvFOutputModule" ||
288  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
289  encModule_.updateReserved((void*)&desc);
290  nOutputModules_++;
291  }
     // Every other module goes through the regular update() call.
292  else
293  encModule_.update((void*)&desc);
294  }
void evf::FastMonitoringService::preModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 541 of file FastMonitoringService.cc.

References microstate_, edm::ModuleCallingContext::moduleDescription(), edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

542  {
     // Stores the module description pointer from the calling context as the
     // microstate entry for this stream, indexed by the stream ID value.
     // No lock is taken in this function.
543  microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
544  }
std::vector< const void * > microstate_
void evf::FastMonitoringService::prePathEvent ( edm::StreamContext const &  sc,
edm::PathContext const &  pc 
)

Definition at line 467 of file FastMonitoringService.cc.

References collectedPathList_, encPath_, edm::EventID::event(), eventCountForPathInit_, edm::StreamContext::eventID(), firstEventId_, fmt_, CommonMethods::lock(), evf::FastMonitoringThread::m_data, makePathLegendaJson(), ministate_, evf::FastMonitoringThread::MonitorData::ministateBins_, evf::FastMonitoringThread::monlock_, pathLegendFileJson_, pathLegendWritten_, edm::PathContext::pathName(), edm::StreamContext::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, and unlikely.

Referenced by FastMonitoringService().

468  {
     // Two modes, selected per stream:
     //  * collection: while the stream's event count is below 50 and its
     //    collectedPathList_ flag is still false, path-name addresses are
     //    registered in encPath_ for that stream;
     //  * normal: otherwise the stream's ministate_ entry is pointed at the
     //    current path name.
     //   sc - stream context (stream ID and event ID are read)
     //   pc - path context (only pathName() is used, by address)
469  //make sure that all path names are retrieved before allowing ministate to change
470  //hack: assume memory is synchronized after ~50 events seen by each stream
471  if (unlikely(eventCountForPathInit_[sc.streamID()]<50) && false==collectedPathList_[sc.streamID()]->load(std::memory_order_acquire))
472  {
473  //protection between stream threads, as well as the service monitoring thread
474  std::lock_guard<std::mutex> lock(fmt_.monlock_);
475 
     // Remember the first event number seen on this stream; a stored value of 0
     // is treated as "not yet set".
476  if (firstEventId_[sc.streamID()]==0)
477  firstEventId_[sc.streamID()]=sc.eventID().event();
     // Still on that first event: register this path and leave without
     // touching ministate_.
478  if (sc.eventID().event()==firstEventId_[sc.streamID()])
479  {
480  encPath_[sc.streamID()].update((void*)&pc.pathName());
481  return;
482  }
483  else {
     // A different event number means the first event is over for this stream:
     // mark collection as done and publish the bin count from this stream's encoding.
484  //finished collecting path names
485  collectedPathList_[sc.streamID()]->store(true,std::memory_order_seq_cst);
486  fmt_.m_data.ministateBins_=encPath_[sc.streamID()].vecsize();
     // The path legend file is written only once, guarded by pathLegendWritten_
     // (checked and set under the lock taken above).
487  if (!pathLegendWritten_) {
488  std::string pathLegendStrJson = makePathLegendaJson();
489  FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
490  pathLegendWritten_=true;
491  }
492  }
493  }
494  else {
     // Normal mode: no lock is taken for this store.
495  ministate_[sc.streamID()] = &(pc.pathName());
496  }
497  }
std::vector< unsigned int > eventCountForPathInit_
#define unlikely(x)
std::vector< std::atomic< bool > * > collectedPathList_
std::vector< unsigned long > firstEventId_
std::vector< const void * > ministate_
std::vector< Encoding > encPath_
void evf::FastMonitoringService::preSourceEarlyTermination ( edm::TerminationOrigin  to)
void evf::FastMonitoringService::preSourceEvent ( edm::StreamID  sid)

Definition at line 531 of file FastMonitoringService.cc.

References microstate_, evf::MicroStateService::mInput, evf::MicroStateService::reservedMicroStateNames, and edm::StreamID::value().

Referenced by FastMonitoringService().

532  {
534  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
unsigned int value() const
Definition: StreamID.h:46
void evf::FastMonitoringService::preStreamBeginLumi ( edm::StreamContext const &  sc)

Definition at line 427 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), fmt_, CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::FastMonitoringThread::m_data, evf::MicroStateService::mFwkOvh, microstate_, ministate_, evf::FastMonitoringThread::monlock_, nopath_, evf::FastMonitoringThread::MonitorData::processed_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), evf::FastMonitoringThread::MonitorData::streamLumi_, and edm::StreamID::value().

Referenced by FastMonitoringService().

428  {
429  unsigned int sid = sc.streamID().value();
430  std::lock_guard<std::mutex> lock(fmt_.monlock_);
431  fmt_.m_data.streamLumi_[sid] = sc.eventID().luminosityBlock();
432 
433  //reset collected values for this stream
434  *(fmt_.m_data.processed_[sid])=0;
435 
436  ministate_[sid]=&nopath_;
438  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
static const std::string nopath_
std::vector< const void * > microstate_
std::vector< jsoncollector::AtomicMonUInt * > processed_
std::vector< const void * > ministate_
std::vector< unsigned int > streamLumi_
void evf::FastMonitoringService::preStreamEarlyTermination ( edm::StreamContext const &  sc,
edm::TerminationOrigin  to 
)

Definition at line 232 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::FastMonitoringThread::monlock_, edm::StreamContext::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::StreamID::value().

Referenced by FastMonitoringService().

233  {
     // Logs an early termination seen on a stream and records the luminosity
     // block of the stream's current event ID in exceptionInLS_.
     //   sc - stream context (stream ID value and event ID are read)
     //   to - termination origin, used only to build the log label below
234  std::string context;
     // Map the origin to a label for the log message; context stays empty
     // for any origin value not listed here.
235  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
236  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
237  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
238  edm::LogInfo("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
239  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
     // The append is done while holding fmt_.monlock_; preGlobalEndLumi()
     // scans exceptionInLS_ for the lumi that is ending.
240  std::lock_guard<std::mutex> lock(fmt_.monlock_);
241  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
     // The process-wide flag is deliberately left untouched here (line kept commented out).
242  //exception_detected_=true;
243  }
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preStreamEndLumi ( edm::StreamContext const &  sc)

Definition at line 445 of file FastMonitoringService.cc.

References doStreamEOLSnapshot(), edm::StreamContext::eventID(), fmt_, svgfig::load(), CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::MicroStateService::mEoL, microstate_, ministate_, evf::FastMonitoringThread::monlock_, nopath_, evf::MicroStateService::reservedMicroStateNames, streamCounterUpdating_, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

446  {
447  unsigned int sid = sc.streamID().value();
448  std::lock_guard<std::mutex> lock(fmt_.monlock_);
449 
450  #if ATOMIC_LEVEL>=2
451  //spinlock to make sure we are not still updating event counter somewhere
452  while (streamCounterUpdating_[sid]->load(std::memory_order_acquire)) {}
453  #endif
454 
455  //update processed count to be complete at this time
456  doStreamEOLSnapshot(sc.eventID().luminosityBlock(),sid);
457  //reset this in case stream does not get notified of next lumi (we keep processed events only)
458  ministate_[sid]=&nopath_;
460  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< std::atomic< bool > * > streamCounterUpdating_
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
static const std::string nopath_
def load
Definition: svgfig.py:546
std::vector< const void * > microstate_
std::vector< const void * > ministate_
void evf::FastMonitoringService::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)

Definition at line 699 of file FastMonitoringService.cc.

References fmt_, CommonMethods::lock(), evf::FastMonitoringThread::monlock_, and sourceEventsReport_.

Referenced by FedRawDataInputSource::checkNextEvent(), and FedRawDataInputSource::getNextEvent().

700  {
701 
702  std::lock_guard<std::mutex> lock(fmt_.monlock_);
703  auto itr = sourceEventsReport_.find(lumi);
704  if (itr!=sourceEventsReport_.end())
705  itr->second+=events;
706  else
708 
709  }
tuple lumi
Definition: fjr2json.py:35
tuple events
Definition: patZpeak.py:19
std::map< unsigned int, unsigned int > sourceEventsReport_
void evf::FastMonitoringService::reportLockWait ( unsigned int  ls,
double  waitTime,
unsigned int  lockCount 
)

Definition at line 615 of file FastMonitoringService.cc.

References fmt_, CommonMethods::lock(), lockStatsDuringLumi_, eostools::ls(), and evf::FastMonitoringThread::monlock_.

Referenced by FedRawDataInputSource::readSupervisor().

616  {
     // Stores the (waitTime, lockCount) pair for lumisection ls in
     // lockStatsDuringLumi_ while holding fmt_.monlock_. The assignment
     // replaces any pair previously stored for the same ls.
617  std::lock_guard<std::mutex> lock(fmt_.monlock_);
618  lockStatsDuringLumi_[ls]=std::pair<double,unsigned int>(waitTime,lockCount);
619 
620  }
def ls
Definition: eostools.py:348
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
void evf::FastMonitoringService::setExceptionDetected ( unsigned int  ls)

Definition at line 269 of file FastMonitoringService.cc.

References exception_detected_, and exceptionInLS_.

Referenced by FedRawDataInputSource::getNextEvent().

269  {
     // ls == 0 sets the exception_detected_ flag; any other value is appended
     // to exceptionInLS_ as a lumisection in which an exception occurred.
     // NOTE(review): unlike preGlobalEarlyTermination() and
     // preStreamEarlyTermination(), this does not take fmt_.monlock_ before
     // modifying exceptionInLS_ - confirm that callers cannot race with them.
270  if (!ls) exception_detected_=true;
271  else exceptionInLS_.push_back(ls);
272  }
def ls
Definition: eostools.py:348
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::setMicroState ( MicroStateService::Microstate  m)
virtual

Implements evf::MicroStateService.

Definition at line 556 of file FastMonitoringService.cc.

References i, microstate_, nStreams_, and evf::MicroStateService::reservedMicroStateNames.

557  {
558  for (unsigned int i=0;i<nStreams_;i++)
560  }
int i
Definition: DBlmapReader.cc:9
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< const void * > microstate_
void evf::FastMonitoringService::setMicroState ( edm::StreamID  sid,
MicroStateService::Microstate  m 
)
virtual
bool evf::FastMonitoringService::shouldWriteFiles ( unsigned int  lumi,
unsigned int *  proc = 0 
)
inline

Definition at line 156 of file FastMonitoringService.h.

References emptyLumisectionMode_, getAbortFlagForLumi(), getEventsProcessedForLumi(), and proc.

Referenced by DQMFileSaver::globalEndLuminosityBlock(), and TriggerJSONMonitoring::globalEndLuminosityBlockSummary().

157  {
     // Returns true when the lumi is not flagged as aborted and either at least
     // one event was processed or emptyLumisectionMode_ is set.
     //   lumi - lumisection to query
     //   proc - optional output; when non-null it receives the processed-event count
158  unsigned int processed = getEventsProcessedForLumi(lumi);
159  if (proc) *proc = processed;
160  return !getAbortFlagForLumi(lumi) && (processed || emptyLumisectionMode_);
161  }
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
TrainProcessor *const proc
Definition: MVATrainer.cc:101
tuple lumi
Definition: fjr2json.py:35
bool getAbortFlagForLumi(unsigned int lumi)
void evf::FastMonitoringService::startedLookingForFile ( )

Definition at line 581 of file FastMonitoringService.cc.

References fileLookStart_.

Referenced by FedRawDataInputSource::readSupervisor().

581  {
     // Records the current time of day in fileLookStart_;
     // stoppedLookingForFile() subtracts it from fileLookStop_ to get the elapsed time.
582  gettimeofday(&fileLookStart_, 0);
583  /*
584  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
585  << fileLookStart_.tv_usec / 1000.0 << std::endl;
586  */
587  }
void evf::FastMonitoringService::stoppedLookingForFile ( unsigned int  lumi)

Definition at line 589 of file FastMonitoringService.cc.

References avgLeadTime_, fileLookStart_, fileLookStop_, fmt_, i, leadTimes_, CommonMethods::lock(), fjr2json::lumi, lumiFromSource_, and evf::FastMonitoringThread::monlock_.

Referenced by FedRawDataInputSource::readSupervisor().

589  {
590  gettimeofday(&fileLookStop_, 0);
591  /*
592  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
593  << fileLookStop_.tv_usec / 1000.0 << std::endl;
594  */
595  std::lock_guard<std::mutex> lock(fmt_.monlock_);
596 
597  if (lumi>lumiFromSource_) {
599  leadTimes_.clear();
600  }
601  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
602  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
603  // add this to lead times for this lumi
604  leadTimes_.push_back((double)elapsedTime);
605 
606  // recompute average lead time for this lumi
607  if (leadTimes_.size() == 1) avgLeadTime_[lumi] = leadTimes_[0];
608  else {
609  double totTime = 0;
610  for (unsigned int i = 0; i < leadTimes_.size(); i++) totTime += leadTimes_[i];
611  avgLeadTime_[lumi] = 0.001*(totTime / leadTimes_.size());
612  }
613  }
int i
Definition: DBlmapReader.cc:9
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, double > avgLeadTime_
std::vector< double > leadTimes_

Member Data Documentation

std::map<unsigned int, unsigned long> evf::FastMonitoringService::accuSize_
private
std::map<unsigned int, double> evf::FastMonitoringService::avgLeadTime_
private

Definition at line 233 of file FastMonitoringService.h.

Referenced by doSnapshot(), preGlobalBeginLumi(), and stoppedLookingForFile().

std::vector<std::atomic<bool>*> evf::FastMonitoringService::collectedPathList_
private

Definition at line 248 of file FastMonitoringService.h.

Referenced by preallocate(), and prePathEvent().

bool evf::FastMonitoringService::emptyLumisectionMode_ = false
private

Definition at line 270 of file FastMonitoringService.h.

Referenced by preallocate(), and shouldWriteFiles().

Encoding evf::FastMonitoringService::encModule_
private
std::vector<Encoding> evf::FastMonitoringService::encPath_
private
std::vector<unsigned int> evf::FastMonitoringService::eventCountForPathInit_
private

Definition at line 249 of file FastMonitoringService.h.

Referenced by postEvent(), preallocate(), and prePathEvent().

bool evf::FastMonitoringService::exception_detected_ = false
private
std::vector<unsigned int> evf::FastMonitoringService::exceptionInLS_
private
std::string evf::FastMonitoringService::fastMicrostateDefPath_
private

Definition at line 210 of file FastMonitoringService.h.

Referenced by preallocate().

unsigned int evf::FastMonitoringService::fastMonIntervals_
private

Definition at line 208 of file FastMonitoringService.h.

Referenced by dowork().

std::string evf::FastMonitoringService::fastName_
private

Definition at line 211 of file FastMonitoringService.h.

Referenced by preallocate().

std::string evf::FastMonitoringService::fastPath_
private

Definition at line 211 of file FastMonitoringService.h.

Referenced by dowork(), and preallocate().

timeval evf::FastMonitoringService::fileLookStart_
private

Definition at line 216 of file FastMonitoringService.h.

Referenced by startedLookingForFile(), and stoppedLookingForFile().

timeval evf::FastMonitoringService::fileLookStop_
private

Definition at line 216 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

std::map<unsigned int, unsigned int> evf::FastMonitoringService::filesProcessedDuringLumi_
private

Definition at line 234 of file FastMonitoringService.h.

Referenced by accumulateFileSize(), doSnapshot(), and preGlobalBeginLumi().

std::vector<unsigned long> evf::FastMonitoringService::firstEventId_
private

Definition at line 247 of file FastMonitoringService.h.

Referenced by preallocate(), and prePathEvent().

FastMonitoringThread evf::FastMonitoringService::fmt_
private
bool evf::FastMonitoringService::isGlobalLumiTransition_
private
unsigned int evf::FastMonitoringService::lastGlobalLumi_
private

Definition at line 218 of file FastMonitoringService.h.

Referenced by dowork(), preallocate(), and preGlobalBeginLumi().

std::queue<unsigned int> evf::FastMonitoringService::lastGlobalLumisClosed_
private

Definition at line 219 of file FastMonitoringService.h.

Referenced by postGlobalEndLumi(), and preGlobalBeginLumi().

std::vector<double> evf::FastMonitoringService::leadTimes_
private

Definition at line 237 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

std::map<unsigned int, std::pair<double,unsigned int> > evf::FastMonitoringService::lockStatsDuringLumi_
private

Definition at line 238 of file FastMonitoringService.h.

Referenced by doSnapshot(), preGlobalBeginLumi(), and reportLockWait().

unsigned int evf::FastMonitoringService::lumiFromSource_
private

Definition at line 221 of file FastMonitoringService.h.

Referenced by preallocate(), and stoppedLookingForFile().

std::map<unsigned int, timeval> evf::FastMonitoringService::lumiStartTime_
private

Definition at line 215 of file FastMonitoringService.h.

Referenced by preGlobalBeginLumi(), and preGlobalEndLumi().

FastMonitoringThread::Macrostate evf::FastMonitoringService::macrostate_
private
const std::string evf::FastMonitoringService::macroStateNames
static
Initial value:
=
{"Init","JobReady","RunGiven","Running",
"Stopping","Done","JobEnded","Error","ErrorEnded","End",
"Invalid"}

Definition at line 108 of file FastMonitoringService.h.

std::vector<const void*> evf::FastMonitoringService::microstate_
private
std::string evf::FastMonitoringService::microstateDefPath_
private

Definition at line 210 of file FastMonitoringService.h.

Referenced by preallocate().

std::vector<const void*> evf::FastMonitoringService::ministate_
private
std::string evf::FastMonitoringService::moduleLegendFile_
private

Definition at line 260 of file FastMonitoringService.h.

Referenced by preallocate().

std::string evf::FastMonitoringService::moduleLegendFileJson_
private

Definition at line 261 of file FastMonitoringService.h.

Referenced by postBeginJob(), and preallocate().

std::atomic<bool> evf::FastMonitoringService::monInit_
private

Definition at line 267 of file FastMonitoringService.h.

Referenced by dowork(), and preallocate().

const std::string evf::FastMonitoringService::nopath_ = "NoPath"
static
unsigned int evf::FastMonitoringService::nOutputModules_ =0
private

Definition at line 265 of file FastMonitoringService.h.

Referenced by makeModuleLegendaJson(), and preModuleBeginJob().

unsigned int evf::FastMonitoringService::nStreams_
private

Definition at line 205 of file FastMonitoringService.h.

Referenced by doSnapshot(), preallocate(), and setMicroState().

unsigned int evf::FastMonitoringService::nThreads_
private

Definition at line 206 of file FastMonitoringService.h.

Referenced by preallocate().

std::string evf::FastMonitoringService::pathLegendFile_
private

Definition at line 262 of file FastMonitoringService.h.

Referenced by preallocate().

std::string evf::FastMonitoringService::pathLegendFileJson_
private

Definition at line 263 of file FastMonitoringService.h.

Referenced by preallocate(), and prePathEvent().

bool evf::FastMonitoringService::pathLegendWritten_ = false
private

Definition at line 264 of file FastMonitoringService.h.

Referenced by prePathEvent().

std::vector<bool> evf::FastMonitoringService::pathNamesReady_
private

Definition at line 250 of file FastMonitoringService.h.

std::map<unsigned int, std::pair<unsigned int,bool> > evf::FastMonitoringService::processedEventsPerLumi_
private
boost::filesystem::path evf::FastMonitoringService::runDirectory_
private

Definition at line 252 of file FastMonitoringService.h.

Referenced by getRunDirName(), and preallocate().

int evf::FastMonitoringService::sleepTime_
private

Definition at line 207 of file FastMonitoringService.h.

Referenced by dowork().

std::string evf::FastMonitoringService::slowName_
private

Definition at line 211 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi().

unsigned int evf::FastMonitoringService::snapCounter_ = 0
private

Definition at line 209 of file FastMonitoringService.h.

Referenced by dowork().

std::map<unsigned int,unsigned int> evf::FastMonitoringService::sourceEventsReport_
private

Definition at line 254 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi(), and reportEventsThisLumiInSource().

std::vector<std::atomic<bool>*> evf::FastMonitoringService::streamCounterUpdating_
private

Definition at line 245 of file FastMonitoringService.h.

Referenced by postEvent(), preallocate(), and preStreamEndLumi().

bool evf::FastMonitoringService::threadIDAvailable_ = false
private

Definition at line 256 of file FastMonitoringService.h.

Referenced by preallocate().

std::vector<const void*> evf::FastMonitoringService::threadMicrostate_
private

Definition at line 229 of file FastMonitoringService.h.

std::atomic<unsigned long> evf::FastMonitoringService::totalEventsProcessed_
private

Definition at line 258 of file FastMonitoringService.h.

Referenced by postEvent().

boost::filesystem::path evf::FastMonitoringService::workingDirectory_
private

Definition at line 252 of file FastMonitoringService.h.

Referenced by preallocate(), and preGlobalEndLumi().