CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
evf::FastMonitoringService Class Reference

#include <FastMonitoringService.h>

Inheritance diagram for evf::FastMonitoringService:
evf::MicroStateService

Classes

struct  Encoding
 

Public Member Functions

void accumulateFileSize (unsigned int lumi, unsigned long fileSize)
 
 FastMonitoringService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
bool getAbortFlagForLumi (unsigned int lumi)
 
unsigned int getEventsProcessedForLumi (unsigned int lumi, bool *abortFlag=0)
 
std::string getRunDirName () const
 
void jobFailure ()
 
std::string makeInputLegendaJson ()
 
std::string makeModuleLegendaJson ()
 
std::string makePathLegendaJson ()
 
void postBeginJob ()
 
void postEndJob ()
 
void postEvent (edm::StreamContext const &)
 
void postGlobalBeginRun (edm::GlobalContext const &)
 
void postGlobalEndLumi (edm::GlobalContext const &)
 
void postModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void postSourceEvent (edm::StreamID)
 
void postStreamBeginLumi (edm::StreamContext const &)
 
void postStreamEndLumi (edm::StreamContext const &)
 
void preallocate (edm::service::SystemBounds const &)
 
void preBeginJob (edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
 
void preEvent (edm::StreamContext const &)
 
void preGlobalBeginLumi (edm::GlobalContext const &)
 
void preGlobalEarlyTermination (edm::GlobalContext const &, edm::TerminationOrigin)
 
void preGlobalEndLumi (edm::GlobalContext const &)
 
void preModuleBeginJob (edm::ModuleDescription const &)
 
void preModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void prePathEvent (edm::StreamContext const &, edm::PathContext const &)
 
void preSourceEarlyTermination (edm::TerminationOrigin)
 
void preSourceEvent (edm::StreamID)
 
void preStreamBeginLumi (edm::StreamContext const &)
 
void preStreamEarlyTermination (edm::StreamContext const &, edm::TerminationOrigin)
 
void preStreamEndLumi (edm::StreamContext const &)
 
void reportLockWait (unsigned int ls, double waitTime, unsigned int lockCount)
 
void setExceptionDetected (unsigned int ls)
 
void setInputSource (FedRawDataInputSource *inputSource)
 
void setInState (FastMonitoringThread::InputState inputState)
 
void setInStateSup (FastMonitoringThread::InputState inputState)
 
void setMicroState (MicroStateService::Microstate) override
 
void setMicroState (edm::StreamID, MicroStateService::Microstate) override
 
bool shouldWriteFiles (unsigned int lumi, unsigned int *proc=0)
 
void startedLookingForFile ()
 
void stoppedLookingForFile (unsigned int lumi)
 
 ~FastMonitoringService () override
 
- Public Member Functions inherited from evf::MicroStateService
virtual std::string getMicroState1 ()
 
virtual std::string const & getMicroState2 ()
 
 MicroStateService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
virtual ~MicroStateService ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 

Static Public Attributes

static const std::string inputStateNames [FastMonitoringThread::inCOUNT]
 
static const std::string macroStateNames [FastMonitoringThread::MCOUNT]
 
static const std::string nopath_ = "NoPath"
 
- Static Public Attributes inherited from evf::MicroStateService
static const edm::ModuleDescription reservedMicroStateNames [mCOUNT]
 

Private Member Functions

void doSnapshot (const unsigned int ls, const bool isGlobalEOL)
 
void doStreamEOLSnapshot (const unsigned int ls, const unsigned int streamID)
 
void dowork ()
 

Private Attributes

std::map< unsigned int, unsigned long > accuSize_
 
std::map< unsigned int, double > avgLeadTime_
 
std::vector< std::atomic< bool > * > collectedPathList_
 
Encoding encModule_
 
std::vector< Encoding > encPath_
 
std::vector< ContainableAtomic< unsigned int > > eventCountForPathInit_
 
bool exception_detected_ = false
 
std::vector< unsigned int > exceptionInLS_
 
std::string fastMicrostateDefPath_
 
unsigned int fastMonIntervals_
 
std::string fastName_
 
std::string fastPath_
 
std::vector< std::string > fastPathList_
 
timeval fileLookStart_
 
timeval fileLookStop_
 
bool filePerFwkStream_
 
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
 
std::vector< unsigned long > firstEventId_
 
FastMonitoringThread fmt_
 
std::string inputLegendFileJson_
 
FedRawDataInputSource * inputSource_ = 0
 
std::atomic< FastMonitoringThread::InputState > inputState_ { FastMonitoringThread::InputState::inInit }
 
std::atomic< FastMonitoringThread::InputState > inputSupervisorState_ { FastMonitoringThread::InputState::inInit }
 
std::atomic< bool > isInitTransition_
 
unsigned int lastGlobalLumi_
 
std::vector< double > leadTimes_
 
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
 
unsigned int lumiFromSource_
 
std::map< unsigned int, timeval > lumiStartTime_
 
std::atomic< FastMonitoringThread::Macrostate > macrostate_
 
std::vector< ContainableAtomic< const void * > > microstate_
 
std::string microstateDefPath_
 
std::vector< ContainableAtomic< const void * > > ministate_
 
std::string moduleLegendFile_
 
std::string moduleLegendFileJson_
 
std::atomic< bool > monInit_
 
unsigned int nOutputModules_ =0
 
unsigned int nStreams_
 
unsigned int nThreads_
 
std::string pathLegendFile_
 
std::string pathLegendFileJson_
 
bool pathLegendWritten_ = false
 
std::vector< bool > pathNamesReady_
 
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
 
boost::filesystem::path runDirectory_
 
int sleepTime_
 
std::string slowName_
 
unsigned int snapCounter_ = 0
 
std::vector< std::atomic< bool > * > streamCounterUpdating_
 
bool threadIDAvailable_ = false
 
std::vector< ContainableAtomic< const void * > > threadMicrostate_
 
std::atomic< unsigned long > totalEventsProcessed_
 
boost::filesystem::path workingDirectory_
 

Additional Inherited Members

- Public Types inherited from evf::MicroStateService
enum  Microstate {
  mInvalid = 0, mIdle, mFwkOvhSrc, mFwkOvhMod,
  mFwkEoL, mInput, mDqm, mBoL,
  mEoL, mGlobEoL, mCOUNT
}
 
- Static Protected Attributes inherited from evf::MicroStateService
static const std::string default_return_ ="NotImplemented"
 

Detailed Description

Definition at line 71 of file FastMonitoringService.h.

Constructor & Destructor Documentation

evf::FastMonitoringService::FastMonitoringService ( const edm::ParameterSet &  iPS,
edm::ActivityRegistry &  reg 
)

Definition at line 60 of file FastMonitoringService.cc.

References Exception, fastMicrostateDefPath_, jobFailure(), microstateDefPath_, postBeginJob(), postEndJob(), postEvent(), postGlobalEndLumi(), postModuleEvent(), postSourceEvent(), postStreamBeginLumi(), postStreamEndLumi(), preallocate(), preBeginJob(), preEvent(), preGlobalBeginLumi(), preGlobalEarlyTermination(), preGlobalEndLumi(), preModuleBeginJob(), preModuleEvent(), prePathEvent(), preSourceEarlyTermination(), preSourceEvent(), preStreamBeginLumi(), preStreamEarlyTermination(), preStreamEndLumi(), trackingPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, edm::ActivityRegistry::watchJobFailure(), edm::ActivityRegistry::watchPostBeginJob(), edm::ActivityRegistry::watchPostEndJob(), edm::ActivityRegistry::watchPostEvent(), edm::ActivityRegistry::watchPostGlobalEndLumi(), edm::ActivityRegistry::watchPostModuleEvent(), edm::ActivityRegistry::watchPostSourceEvent(), edm::ActivityRegistry::watchPostStreamBeginLumi(), edm::ActivityRegistry::watchPostStreamEndLumi(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreBeginJob(), edm::ActivityRegistry::watchPreEvent(), edm::ActivityRegistry::watchPreGlobalBeginLumi(), edm::ActivityRegistry::watchPreGlobalEarlyTermination(), edm::ActivityRegistry::watchPreGlobalEndLumi(), edm::ActivityRegistry::watchPreModuleBeginJob(), edm::ActivityRegistry::watchPreModuleEvent(), edm::ActivityRegistry::watchPrePathEvent(), edm::ActivityRegistry::watchPreSourceEarlyTermination(), edm::ActivityRegistry::watchPreSourceEvent(), edm::ActivityRegistry::watchPreStreamBeginLumi(), edm::ActivityRegistry::watchPreStreamEarlyTermination(), and edm::ActivityRegistry::watchPreStreamEndLumi().

61  :
62  MicroStateService(iPS,reg)
64  ,nStreams_(0)//until initialized
65  ,sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1))
66  ,fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2))
67  ,fastName_("fastmoni")
68  ,slowName_("slowmoni")
69  ,filePerFwkStream_(iPS.getUntrackedParameter<bool>("filePerFwkStream", false))
71  {
72  reg.watchPreallocate(this, &FastMonitoringService::preallocate);//receiving information on number of threads
74 
79 
83 
88 
90 
93 
94  reg.watchPreSourceEvent(this,&FastMonitoringService::preSourceEvent);//source (with streamID of requestor)
96 
99 
103 
104  //find microstate definition path (required by the module)
105  struct stat statbuf;
106  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
107  std::string microstatePath = std::string(getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
108  if (stat(microstatePath.c_str(), &statbuf)) {
109  microstatePath = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
110  if (stat(microstatePath.c_str(), &statbuf)) {
111  microstatePath = microstateBaseSuffix;
112  if (stat(microstatePath.c_str(), &statbuf))
113  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
114  }
115  }
116  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
117  }
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
T getUntrackedParameter(std::string const &, T const &) const
void watchPreEvent(PreEvent::slot_type const &iSlot)
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
void preGlobalBeginLumi(edm::GlobalContext const &)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
void watchPostEvent(PostEvent::slot_type const &iSlot)
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
void preGlobalEndLumi(edm::GlobalContext const &)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void preModuleBeginJob(edm::ModuleDescription const &)
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
void preStreamEndLumi(edm::StreamContext const &)
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
static const int nReservedModules
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postStreamBeginLumi(edm::StreamContext const &)
void postStreamEndLumi(edm::StreamContext const &)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
evf::FastMonitoringService::~FastMonitoringService ( )
override

Definition at line 120 of file FastMonitoringService.cc.

121  {
122  }

Member Function Documentation

void evf::FastMonitoringService::accumulateFileSize ( unsigned int  lumi,
unsigned long  fileSize 
)

Definition at line 634 of file FastMonitoringService.cc.

References accuSize_, filesProcessedDuringLumi_, fmt_, CommonMethods::lock(), csvLumiCalc::lumi, and evf::FastMonitoringThread::monlock_.

Referenced by evf::EvFDaqDirector::bumpFile().

634  {
635  std::lock_guard<std::mutex> lock(fmt_.monlock_);
636 
637  if (accuSize_.find(lumi)==accuSize_.end()) accuSize_[lumi] = fileSize;
638  else accuSize_[lumi] += fileSize;
639 
642  else
644  }
std::map< unsigned int, unsigned long > accuSize_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void evf::FastMonitoringService::doSnapshot ( const unsigned int  ls,
const bool  isGlobalEOL 
)
private

Definition at line 718 of file FastMonitoringService.cc.

References avgLeadTime_, encModule_, evf::FastMonitoringService::Encoding::encode(), encPath_, evf::FastMonitoringThread::MonitorData::fastAvgLeadTimeJ_, evf::FastMonitoringThread::MonitorData::fastFilesProcessedJ_, evf::FastMonitoringThread::MonitorData::fastLockCountJ_, evf::FastMonitoringThread::MonitorData::fastLockWaitJ_, evf::FastMonitoringThread::MonitorData::fastMacrostateJ_, filesProcessedDuringLumi_, fmt_, mps_fire::i, reco::if(), evf::FastMonitoringThread::inNewLumi, evf::FastMonitoringThread::inNoRequest, evf::FastMonitoringThread::inNoRequestWithEoLThreads, evf::FastMonitoringThread::inNoRequestWithIdleThreads, evf::FastMonitoringThread::MonitorData::inputState_, inputState_, inputSupervisorState_, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inSupBusy, evf::FastMonitoringThread::inSupFileLimit, evf::FastMonitoringThread::inSupLockPolling, evf::FastMonitoringThread::inSupLockPollingCopying, evf::FastMonitoringThread::inSupNewFile, evf::FastMonitoringThread::inSupNewFileWaitChunk, evf::FastMonitoringThread::inSupNewFileWaitChunkCopying, evf::FastMonitoringThread::inSupNewFileWaitThread, evf::FastMonitoringThread::inSupNewFileWaitThreadCopying, evf::FastMonitoringThread::inSupNoFile, evf::FastMonitoringThread::inSupWaitFreeChunk, evf::FastMonitoringThread::inSupWaitFreeChunkCopying, evf::FastMonitoringThread::inSupWaitFreeThread, evf::FastMonitoringThread::inSupWaitFreeThreadCopying, evf::FastMonitoringThread::inWaitChunk, evf::FastMonitoringThread::inWaitChunk_busy, evf::FastMonitoringThread::inWaitChunk_fileLimit, evf::FastMonitoringThread::inWaitChunk_lockPolling, evf::FastMonitoringThread::inWaitChunk_lockPollingCopying, evf::FastMonitoringThread::inWaitChunk_newFile, evf::FastMonitoringThread::inWaitChunk_newFileWaitChunk, evf::FastMonitoringThread::inWaitChunk_newFileWaitChunkCopying, evf::FastMonitoringThread::inWaitChunk_newFileWaitThread, evf::FastMonitoringThread::inWaitChunk_newFileWaitThreadCopying, 
evf::FastMonitoringThread::inWaitChunk_noFile, evf::FastMonitoringThread::inWaitChunk_runEnd, evf::FastMonitoringThread::inWaitChunk_waitFreeChunk, evf::FastMonitoringThread::inWaitChunk_waitFreeChunkCopying, evf::FastMonitoringThread::inWaitChunk_waitFreeThread, evf::FastMonitoringThread::inWaitChunk_waitFreeThreadCopying, evf::FastMonitoringThread::inWaitInput, evf::FastMonitoringThread::inWaitInput_busy, evf::FastMonitoringThread::inWaitInput_fileLimit, evf::FastMonitoringThread::inWaitInput_lockPolling, evf::FastMonitoringThread::inWaitInput_lockPollingCopying, evf::FastMonitoringThread::inWaitInput_newFile, evf::FastMonitoringThread::inWaitInput_newFileWaitChunk, evf::FastMonitoringThread::inWaitInput_newFileWaitChunkCopying, evf::FastMonitoringThread::inWaitInput_newFileWaitThread, evf::FastMonitoringThread::inWaitInput_newFileWaitThreadCopying, evf::FastMonitoringThread::inWaitInput_noFile, evf::FastMonitoringThread::inWaitInput_runEnd, evf::FastMonitoringThread::inWaitInput_waitFreeChunk, evf::FastMonitoringThread::inWaitInput_waitFreeChunkCopying, evf::FastMonitoringThread::inWaitInput_waitFreeThread, evf::FastMonitoringThread::inWaitInput_waitFreeThreadCopying, isInitTransition_, evf::FastMonitoringThread::jsonMonitor_, lockStatsDuringLumi_, evf::FastMonitoringThread::m_data, macrostate_, evf::MicroStateService::mEoL, evf::MicroStateService::mFwkEoL, microstate_, evf::FastMonitoringThread::MonitorData::microstateEncoded_, evf::MicroStateService::mIdle, ministate_, evf::FastMonitoringThread::MonitorData::ministateEncoded_, nStreams_, and evf::MicroStateService::reservedMicroStateNames.

Referenced by preGlobalEndLumi().

718  {
719  // update macrostate
721 
722  std::vector<const void*> microstateCopy(microstate_.begin(),microstate_.end());
723 
724  if (!isInitTransition_) {
725 
726  auto itd = avgLeadTime_.find(ls);
727  if (itd != avgLeadTime_.end())
728  fmt_.m_data.fastAvgLeadTimeJ_ = itd->second;
729  else fmt_.m_data.fastAvgLeadTimeJ_=0.;
730 
731  auto iti = filesProcessedDuringLumi_.find(ls);
732  if (iti != filesProcessedDuringLumi_.end())
733  fmt_.m_data.fastFilesProcessedJ_ = iti->second;
735 
736  auto itrd = lockStatsDuringLumi_.find(ls);
737  if (itrd != lockStatsDuringLumi_.end()) {
738  fmt_.m_data.fastLockWaitJ_ = itrd->second.first;
739  fmt_.m_data.fastLockCountJ_ = itrd->second.second;
740  }
741  else {
744  }
745  }
746 
747  for (unsigned int i=0;i<nStreams_;i++) {
749  fmt_.m_data.microstateEncoded_[i] = encModule_.encode(microstateCopy[i]);
750  }
751 
752  bool inputStatePerThread=false;
753 
755  switch (inputSupervisorState_) {
758  break;
761  break;
764  break;
767  break;
770  break;
773  break;
776  break;
779  break;
782  break;
785  break;
788  break;
791  break;
794  break;
797  break;
800  break;
801  default:
803  }
804  }
806 
807  switch (inputSupervisorState_) {
810  break;
813  break;
816  break;
819  break;
822  break;
825  break;
828  break;
831  break;
834  break;
837  break;
840  break;
843  break;
846  break;
849  break;
852  break;
853  default:
855  }
856  }
858  inputStatePerThread=true;
859  for (unsigned int i=0;i<nStreams_;i++) {
860  if (microstateCopy[i]==&reservedMicroStateNames[mIdle])
862  else if (microstateCopy[i]==&reservedMicroStateNames[mEoL] ||
863  microstateCopy[i]==&reservedMicroStateNames[mFwkEoL])
865  else
867  }
868  }
870  inputStatePerThread=true;
871  for (unsigned int i=0;i<nStreams_;i++) {
872  if (microstateCopy[i]==&reservedMicroStateNames[mEoL] ||
873  microstateCopy[i]==&reservedMicroStateNames[mFwkEoL])
875  }
876  }
877  else
879 
880  //this is same for all streams
881  if (!inputStatePerThread)
882  for (unsigned int i=1;i<nStreams_;i++)
884 
885  if (isGlobalEOL)
886  {//only update global variables
887  fmt_.jsonMonitor_->snapGlobal(ls);
888  }
889  else
890  fmt_.jsonMonitor_->snap(ls);
891  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
std::atomic< bool > isInitTransition_
std::atomic< FastMonitoringThread::InputState > inputSupervisorState_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::map< unsigned int, double > avgLeadTime_
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::atomic< FastMonitoringThread::InputState > inputState_
def ls(path, rec=False)
Definition: eostools.py:348
std::vector< unsigned int > microstateEncoded_
std::atomic< FastMonitoringThread::Macrostate > macrostate_
std::vector< ContainableAtomic< const void * > > ministate_
if(dp >Float(M_PI)) dp-
std::vector< Encoding > encPath_
std::vector< unsigned int > inputState_
std::vector< unsigned int > ministateEncoded_
void evf::FastMonitoringService::doStreamEOLSnapshot ( const unsigned int  ls,
const unsigned int  streamID 
)
inline private

Definition at line 195 of file FastMonitoringService.h.

Referenced by preStreamEndLumi().

195  {
196  //pick up only event count here
197  fmt_.jsonMonitor_->snapStreamAtomic(ls,streamID);
198  }
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
def ls(path, rec=False)
Definition: eostools.py:348
void evf::FastMonitoringService::dowork ( )
inline private

Definition at line 200 of file FastMonitoringService.h.

References relativeConstraints::empty, mps_fire::i, CommonMethods::lock(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by preBeginJob().

200  { // the function to be called in the thread. Thread completes when function returns.
201  monInit_.exchange(true,std::memory_order_acquire);
202  while (!fmt_.m_stoprequest) {
203  edm::LogInfo("FastMonitoringService") << "Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
204  << " ms=" << encPath_[0].encode(ministate_[0])
205  << " us=" << encModule_.encode(microstate_[0])
206  << " is=" << inputStateNames[inputState_]
208  << std::endl;
209 
210  {
211  std::lock_guard<std::mutex> lock(fmt_.monlock_);
212 
214 
216  if (filePerFwkStream_) {
217  std::vector<std::string> CSVv;
218  for (unsigned int i=0;i<nStreams_;i++) {
219  CSVv.push_back(fmt_.jsonMonitor_->getCSVString((int)i));
220  }
221  fmt_.monlock_.unlock();
222  for (unsigned int i=0;i<nStreams_;i++) {
223  if (!CSVv[i].empty())
224  fmt_.jsonMonitor_->outputCSV(fastPathList_[i],CSVv[i]);
225  }
226  }
227  else {
228  std::string CSV = fmt_.jsonMonitor_->getCSVString();
229  //release mutex before writing out fast path file
230  fmt_.monlock_.unlock();
231  if (!CSV.empty())
232  fmt_.jsonMonitor_->outputCSV(fastPath_,CSV);
233  }
234  }
235 
236  snapCounter_++;
237 
238  }
239  ::sleep(sleepTime_);
240  }
241  }
std::vector< ContainableAtomic< const void * > > microstate_
std::atomic< FastMonitoringThread::InputState > inputSupervisorState_
static const std::string inputStateNames[FastMonitoringThread::inCOUNT]
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
std::vector< std::string > fastPathList_
std::atomic< FastMonitoringThread::InputState > inputState_
std::vector< ContainableAtomic< const void * > > ministate_
std::atomic< bool > m_stoprequest
std::vector< Encoding > encPath_
void evf::FastMonitoringService::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 124 of file FastMonitoringService.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), and edm::ParameterSetDescription::setComment().

125  {
127  desc.setComment("Service for File-based DAQ monitoring and event accounting");
128  desc.addUntracked<int> ("sleepTime",1)->setComment("Sleep time of the monitoring thread");
129  desc.addUntracked<unsigned int> ("fastMonIntervals",2)->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
130  desc.addUntracked<bool> ("filePerFwkStream", false)->setComment("Switches on monitoring output per framework stream");
131  desc.setAllowAnything();
132  descriptions.add("FastMonitoringService", desc);
133  }
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setAllowAnything()
allow any parameter label/value pairs
void setComment(std::string const &value)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
bool evf::FastMonitoringService::getAbortFlagForLumi ( unsigned int  lumi)

Definition at line 704 of file FastMonitoringService.cc.

References Exception, fmt_, CommonMethods::lock(), csvLumiCalc::lumi, evf::FastMonitoringThread::monlock_, and processedEventsPerLumi_.

704  {
705  std::lock_guard<std::mutex> lock(fmt_.monlock_);
706 
707  auto it = processedEventsPerLumi_.find(lumi);
708  if (it!=processedEventsPerLumi_.end()) {
709  unsigned int abortFlag = it->second.second;
710  return abortFlag;
711  }
712  else {
713  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "<<lumi;
714  return false;
715  }
716  }
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
unsigned int evf::FastMonitoringService::getEventsProcessedForLumi ( unsigned int  lumi,
bool *  abortFlag = 0 
)

Definition at line 688 of file FastMonitoringService.cc.

References Exception, fmt_, CommonMethods::lock(), csvLumiCalc::lumi, evf::FastMonitoringThread::monlock_, proc, and processedEventsPerLumi_.

Referenced by evf::RecoEventOutputModuleForFU< Consumer >::endLuminosityBlock(), dqm::DQMFileSaverPB::fillJson(), DQMFileSaver::fillJson(), DQMFileSaver::saveForFilterUnit(), and dqm::DQMFileSaverPB::saveLumi().

688  {
689  std::lock_guard<std::mutex> lock(fmt_.monlock_);
690 
691  auto it = processedEventsPerLumi_.find(lumi);
692  if (it!=processedEventsPerLumi_.end()) {
693  unsigned int proc = it->second.first;
694  if (abortFlag) *abortFlag=it->second.second;
695  return proc;
696  }
697  else {
698  throw cms::Exception("FastMonitoringService") << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "<<lumi;
699  return 0;
700  }
701  }
TrainProcessor *const proc
Definition: MVATrainer.cc:101
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::string evf::FastMonitoringService::getRunDirName ( ) const
inline

Definition at line 186 of file FastMonitoringService.h.

186 { return runDirectory_.stem().string(); }
boost::filesystem::path runDirectory_
void evf::FastMonitoringService::jobFailure ( )

Definition at line 345 of file FastMonitoringService.cc.

References macrostate_, and evf::FastMonitoringThread::sError.

Referenced by FastMonitoringService().

346  {
348  }
std::atomic< FastMonitoringThread::Macrostate > macrostate_
std::string evf::FastMonitoringService::makeInputLegendaJson ( )

Definition at line 164 of file FastMonitoringService.cc.

References Json::Value::append(), Json::arrayValue, mps_fire::i, evf::FastMonitoringThread::inCOUNT, inputStateNames, Json::StyledWriter::write(), and cscNeutronWriter_cfi::writer.

Referenced by postBeginJob().

164  {
165  Json::Value legendaVector(Json::arrayValue);
166  for(int i = 0; i < FastMonitoringThread::inCOUNT; i++)
167  legendaVector.append(Json::Value(inputStateNames[i]));
168  Json::Value moduleLegend;
169  moduleLegend["names"]=legendaVector;
171  return writer.write(moduleLegend);
172  }
Represents a JSON value.
Definition: value.h:111
static const std::string inputStateNames[FastMonitoringThread::inCOUNT]
std::string write(const Value &root) override
Serialize a Value in JSON format.
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:65
array value (ordered list)
Definition: value.h:31
std::string evf::FastMonitoringService::makeModuleLegendaJson ( )

Definition at line 148 of file FastMonitoringService.cc.

References Json::Value::append(), Json::arrayValue, evf::FastMonitoringService::Encoding::current_, evf::FastMonitoringService::Encoding::decode(), encModule_, mps_fire::i, nOutputModules_, nReservedModules, nSpecialModules, Json::StyledWriter::write(), and cscNeutronWriter_cfi::writer.

Referenced by postBeginJob().

148  {
149  Json::Value legendaVector(Json::arrayValue);
150  for(int i = 0; i < encModule_.current_; i++)
151  legendaVector.append(Json::Value((static_cast<const edm::ModuleDescription *>(encModule_.decode(i)))->moduleLabel()));
152  Json::Value valReserved(nReservedModules);
153  Json::Value valSpecial(nSpecialModules);
154  Json::Value valOutputModules(nOutputModules_);
155  Json::Value moduleLegend;
156  moduleLegend["names"]=legendaVector;
157  moduleLegend["reserved"]=valReserved;
158  moduleLegend["special"]=valSpecial;
159  moduleLegend["output"]=valOutputModules;
161  return writer.write(moduleLegend);
162  }
Represents a JSON value.
Definition: value.h:111
static const int nReservedModules
static const int nSpecialModules
std::string write(const Value &root) override
Serialize a Value in JSON format.
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:65
const void * decode(unsigned int index)
array value (ordered list)
Definition: value.h:31
std::string evf::FastMonitoringService::makePathLegendaJson ( )

Definition at line 136 of file FastMonitoringService.cc.

References Json::Value::append(), Json::arrayValue, encPath_, mps_fire::i, nReservedPaths, Json::StyledWriter::write(), and cscNeutronWriter_cfi::writer.

Referenced by prePathEvent().

136  {
137  Json::Value legendaVector(Json::arrayValue);
138  for(int i = 0; i < encPath_[0].current_; i++)
139  legendaVector.append(Json::Value(*(static_cast<const std::string *>(encPath_[0].decode(i)))));
140  Json::Value valReserved(nReservedPaths);
141  Json::Value pathLegend;
142  pathLegend["names"]=legendaVector;
143  pathLegend["reserved"]=valReserved;
145  return writer.write(pathLegend);
146  }
static const int nReservedPaths
Represents a JSON value.
Definition: value.h:111
std::string write(const Value &root) override
Serialize a Value in JSON format.
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:65
std::vector< Encoding > encPath_
array value (ordered list)
Definition: value.h:31
void evf::FastMonitoringService::postBeginJob ( )

Definition at line 367 of file FastMonitoringService.cc.

References encModule_, fmt_, inputLegendFileJson_, CommonMethods::lock(), evf::FastMonitoringThread::m_data, macrostate_, makeInputLegendaJson(), makeModuleLegendaJson(), evf::FastMonitoringThread::MonitorData::microstateBins_, moduleLegendFileJson_, evf::FastMonitoringThread::monlock_, evf::FastMonitoringThread::sJobReady, AlCaHLTBitMon_QueryRunRegistry::string, and evf::FastMonitoringService::Encoding::vecsize().

Referenced by FastMonitoringService().

368  {
369  std::string && moduleLegStrJson = makeModuleLegendaJson();
370  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
371 
372  std::string inputLegendStrJson = makeInputLegendaJson();
373  FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
374 
376 
377  //update number of entries in module histogram
378  std::lock_guard<std::mutex> lock(fmt_.monlock_);
380  }
std::atomic< FastMonitoringThread::Macrostate > macrostate_
void evf::FastMonitoringService::postEndJob ( )
void evf::FastMonitoringService::postEvent ( edm::StreamContext const &  sc)

Definition at line 581 of file FastMonitoringService.cc.

References eventCountForPathInit_, evf::FastMonitoringThread::MonitorData::fastPathProcessedJ_, fmt_, evf::FastMonitoringThread::m_data, microstate_, evf::MicroStateService::mIdle, ministate_, nopath_, evf::FastMonitoringThread::MonitorData::processed_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), and totalEventsProcessed_.

Referenced by FastMonitoringService().

582  {
584 
585  ministate_[sc.streamID()] = &nopath_;
586 
587  (*(fmt_.m_data.processed_[sc.streamID()]))++;
588  eventCountForPathInit_[sc.streamID()].m_value++;
589 
590  //fast path counter (events accumulated in a run)
591  unsigned long res = totalEventsProcessed_.fetch_add(1,std::memory_order_relaxed);
593  //fmt_.m_data.fastPathProcessedJ_ = totalEventsProcessed_.load(std::memory_order_relaxed);
594  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
Definition: Electron.h:6
static const std::string nopath_
std::vector< ContainableAtomic< unsigned int > > eventCountForPathInit_
std::atomic< unsigned long > totalEventsProcessed_
std::vector< ContainableAtomic< const void * > > ministate_
std::vector< jsoncollector::AtomicMonUInt * > processed_
void evf::FastMonitoringService::postGlobalBeginRun ( edm::GlobalContext const &  gc)
void evf::FastMonitoringService::postGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 495 of file FastMonitoringService.cc.

References avgLeadTime_, filesProcessedDuringLumi_, fmt_, CommonMethods::lock(), lockStatsDuringLumi_, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), evf::FastMonitoringThread::monlock_, and processedEventsPerLumi_.

Referenced by FastMonitoringService().

496  {
497  std::lock_guard<std::mutex> lock(fmt_.monlock_);
498  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
499  //LS monitoring snapshot with input source data has been taken in previous callback
500  avgLeadTime_.erase(lumi);
501  filesProcessedDuringLumi_.erase(lumi);
502  lockStatsDuringLumi_.erase(lumi);
503 
504  //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
505  processedEventsPerLumi_.erase(lumi);
506  }
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::map< unsigned int, double > avgLeadTime_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void evf::FastMonitoringService::postModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 611 of file FastMonitoringService.cc.

References evf::MicroStateService::mFwkOvhMod, microstate_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

612  {
613  //microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
614  microstate_[sc.streamID().value()] = &reservedMicroStateNames[mFwkOvhMod];
615  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
void evf::FastMonitoringService::postSourceEvent ( edm::StreamID  sid)

Definition at line 601 of file FastMonitoringService.cc.

References evf::MicroStateService::mFwkOvhSrc, microstate_, evf::MicroStateService::reservedMicroStateNames, and edm::StreamID::value().

Referenced by FastMonitoringService().

602  {
604  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
unsigned int value() const
Definition: StreamID.h:46
void evf::FastMonitoringService::postStreamBeginLumi ( edm::StreamContext const &  sc)

Definition at line 522 of file FastMonitoringService.cc.

References microstate_, evf::MicroStateService::mIdle, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

523  {
524  microstate_[sc.streamID().value()]=&reservedMicroStateNames[mIdle];
525  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
void evf::FastMonitoringService::postStreamEndLumi ( edm::StreamContext const &  sc)

Definition at line 538 of file FastMonitoringService.cc.

References evf::MicroStateService::mFwkEoL, microstate_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

539  {
540  microstate_[sc.streamID().value()]=&reservedMicroStateNames[mFwkEoL];
541  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
void evf::FastMonitoringService::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 174 of file FastMonitoringService.cc.

References edm::service::SystemBounds::maxNumberOfStreams(), edm::service::SystemBounds::maxNumberOfThreads(), nStreams_, and nThreads_.

Referenced by FastMonitoringService().

175  {
176  nStreams_=bounds.maxNumberOfStreams();
177  nThreads_=bounds.maxNumberOfThreads();
178  //this should already be >=1
179  if (nStreams_==0) nStreams_=1;
180  if (nThreads_==0) nThreads_=1;
181  }
void evf::FastMonitoringService::preBeginJob ( edm::PathsAndConsumesOfModulesBase const &  ,
edm::ProcessContext const &  pc 
)

Definition at line 183 of file FastMonitoringService.cc.

References collectedPathList_, evf::FastMonitoringService::Encoding::completeReservedWithDummies(), dowork(), encModule_, encPath_, eventCountForPathInit_, Exception, fastMicrostateDefPath_, fastName_, fastPath_, fastPathList_, filePerFwkStream_, firstEventId_, fmt_, mps_fire::i, evf::FastMonitoringThread::inCOUNT, inputLegendFileJson_, evf::FastMonitoringThread::MonitorData::inputstateBins_, isInitTransition_, evf::FastMonitoringThread::jsonMonitor_, lastGlobalLumi_, LogDebug, lumiFromSource_, evf::FastMonitoringThread::m_data, macrostate_, evf::FastMonitoringThread::MonitorData::macrostateBins_, evf::FastMonitoringThread::MCOUNT, evf::MicroStateService::mCOUNT, microstate_, evf::FastMonitoringThread::MonitorData::microstateBins_, microstateDefPath_, ministate_, evf::FastMonitoringThread::MonitorData::ministateBins_, evf::MicroStateService::mInvalid, moduleLegendFile_, moduleLegendFileJson_, monInit_, nopath_, nStreams_, nThreads_, Utilities::operator, callgraph::path, pathLegendFile_, pathLegendFileJson_, evf::FastMonitoringThread::MonitorData::registerVariables(), evf::MicroStateService::reservedMicroStateNames, evf::FastMonitoringThread::resetFastMonitor(), runDirectory_, evf::FastMonitoringThread::sInit, evf::FastMonitoringThread::start(), streamCounterUpdating_, threadIDAvailable_, evf::FastMonitoringService::Encoding::updateReserved(), and workingDirectory_.

Referenced by FastMonitoringService().

185  {
186 
187  // FIND RUN DIRECTORY
188  // The run dir should be set via the configuration of EvFDaqDirector
189 
190  if (edm::Service<evf::EvFDaqDirector>().operator->()==nullptr)
191  {
192  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
193 
194  }
195  boost::filesystem::path runDirectory(edm::Service<evf::EvFDaqDirector>()->baseRunDir());
196  workingDirectory_ = runDirectory_ = runDirectory;
197  workingDirectory_ /= "mon";
198 
199  if ( !boost::filesystem::is_directory(workingDirectory_)) {
200  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string() ;
201  boost::filesystem::create_directories(workingDirectory_);
202  if ( !boost::filesystem::is_directory(workingDirectory_))
203  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
204  << ". No monitoring data will be written.";
205  }
206 
207  std::ostringstream fastFileName;
208 
209  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
211  fast /= fastFileName.str();
212  fastPath_ = fast.string();
213  if (filePerFwkStream_)
214  for (unsigned int i=0;i<nStreams_;i++) {
215  std::ostringstream fastFileNameTid;
216  fastFileNameTid << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << "_tid" << i << ".fast";
218  fastTid /= fastFileNameTid.str();
219  fastPathList_.push_back(fastTid.string());
220  }
221 
222  std::ostringstream moduleLegFile;
223  std::ostringstream moduleLegFileJson;
224  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
225  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
226  moduleLegendFile_ = (workingDirectory_/moduleLegFile.str()).string();
227  moduleLegendFileJson_ = (workingDirectory_/moduleLegFileJson.str()).string();
228 
229  std::ostringstream pathLegFile;
230  std::ostringstream pathLegFileJson;
231  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
232  pathLegendFile_ = (workingDirectory_/pathLegFile.str()).string();
233  pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
234  pathLegendFileJson_ = (workingDirectory_/pathLegFileJson.str()).string();
235 
236  std::ostringstream inputLegFileJson;
237  inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
238  inputLegendFileJson_ = (workingDirectory_/inputLegFileJson.str()).string();
239 
240  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: "
242  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
243 
244  /*
245  * initialize the fast monitor with:
246  * vector of pointers to monitorable parameters
247  * path to definition
248  *
249  */
250 
252 
253  for(unsigned int i = 0; i < (mCOUNT); i++)
254  encModule_.updateReserved(static_cast<const void*>(reservedMicroStateNames+i));
256 
257  for (unsigned int i=0;i<nStreams_;i++) {
258  ministate_.emplace_back(&nopath_);
260 
261  //for synchronization
262  streamCounterUpdating_.push_back(new std::atomic<bool>(false));
263 
264  //path (mini) state
265  encPath_.emplace_back(0);
266  encPath_[i].update(static_cast<const void*>(&nopath_));
267  eventCountForPathInit_.push_back(0);
268  firstEventId_.push_back(0);
269  collectedPathList_.push_back(new std::atomic<bool>(false));
270 
271  }
272  //for (unsigned int i=0;i<nThreads_;i++)
273  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
274 
275  //initial size until we detect number of bins
280 
281  lastGlobalLumi_=0;
282  isInitTransition_=true;
283  lumiFromSource_=0;
284 
285  //startup monitoring
287  fmt_.jsonMonitor_->setNStreams(nStreams_);
289  monInit_.store(false,std::memory_order_release);
291 
292  //this definition needs: #include "tbb/compat/thread"
293  //however this would result in the TBB implementation replacing std::thread
294  //(both supposedly call pthread_self())
295  //number of threads created in process could be obtained from /proc,
296  //assuming that all posix threads are true kernel threads capable of running in parallel
297 
298  //#if TBB_IMPLEMENT_CPP0X
300  //threadIDAvailable_=true;
301  //#endif
302 
303  }
#define LogDebug(id)
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
std::atomic< bool > isInitTransition_
boost::filesystem::path runDirectory_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
std::vector< std::atomic< bool > * > streamCounterUpdating_
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
std::vector< std::atomic< bool > * > collectedPathList_
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
static const std::string nopath_
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
std::vector< unsigned long > firstEventId_
std::vector< std::string > fastPathList_
std::vector< ContainableAtomic< unsigned int > > eventCountForPathInit_
std::atomic< FastMonitoringThread::Macrostate > macrostate_
std::vector< ContainableAtomic< const void * > > ministate_
boost::filesystem::path workingDirectory_
std::vector< Encoding > encPath_
void evf::FastMonitoringService::preEvent ( edm::StreamContext const &  sc)

Definition at line 577 of file FastMonitoringService.cc.

Referenced by FastMonitoringService().

578  {
579  }
void evf::FastMonitoringService::preGlobalBeginLumi ( edm::GlobalContext const &  gc)

Definition at line 394 of file FastMonitoringService.cc.

References fmt_, lastGlobalLumi_, CommonMethods::lock(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, and evf::FastMonitoringThread::monlock_.

Referenced by FastMonitoringService().

395  {
396  timeval lumiStartTime;
397  gettimeofday(&lumiStartTime, nullptr);
398  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
399  lastGlobalLumi_ = newLumi;
400 
401  std::lock_guard<std::mutex> lock(fmt_.monlock_);
402  lumiStartTime_[newLumi]=lumiStartTime;
403 
404 
405  }
std::map< unsigned int, timeval > lumiStartTime_
void evf::FastMonitoringService::preGlobalEarlyTermination ( edm::GlobalContext const &  gc,
edm::TerminationOrigin  to 
)

Definition at line 317 of file FastMonitoringService.cc.

References edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, CommonMethods::lock(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), evf::FastMonitoringThread::monlock_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FastMonitoringService().

318  {
319  std::string context;
320  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
321  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
322  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
323  edm::LogWarning("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
324  << gc.luminosityBlockID().luminosityBlock() << " " << context;
325  std::lock_guard<std::mutex> lock(fmt_.monlock_);
326  exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
327  }
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 407 of file FastMonitoringService.cc.

References accuSize_, doSnapshot(), Exception, exception_detected_, exceptionInLS_, evf::FastMonitoringThread::MonitorData::fastThroughputJ_, filePerFwkStream_, fmt_, FedRawDataInputSource::getEventReport(), inputSource_, evf::FastMonitoringThread::jsonMonitor_, CommonMethods::lock(), LogDebug, csvLumiCalc::lumi, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, evf::FastMonitoringThread::m_data, evf::FastMonitoringThread::monlock_, callgraph::path, processedEventsPerLumi_, edm::shutdown_flag, slowName_, throughputFactor(), jsoncollector::IntJ::value(), jsoncollector::DoubleJ::value(), and workingDirectory_.

Referenced by FastMonitoringService().

408  {
409  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
410  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: "
411  << lumi;
412  timeval lumiStopTime;
413  gettimeofday(&lumiStopTime, nullptr);
414 
415  std::lock_guard<std::mutex> lock(fmt_.monlock_);
416 
417  // Compute throughput
418  timeval stt = lumiStartTime_[lumi];
419  lumiStartTime_.erase(lumi);
420  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec)*1000000
421  + (lumiStopTime.tv_usec - stt.tv_usec);
422  unsigned long accuSize = accuSize_.find(lumi)==accuSize_.end() ? 0 : accuSize_[lumi];
423  accuSize_.erase(lumi);
424  double throughput = throughputFactor()* double(accuSize) / double(usecondsForLumi);
425  //store to registered variable
426  fmt_.m_data.fastThroughputJ_.value() = throughput;
427 
428  //update
429  doSnapshot(lumi,true);
430 
431  //retrieve one result we need (todo: sanity check if it's found)
432  IntJ *lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_.jsonMonitor_->getMergedIntJForLumi("Processed",lumi));
433  if (!lumiProcessedJptr)
434  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
435  processedEventsPerLumi_[lumi] = std::pair<unsigned int,bool>(lumiProcessedJptr->value(),false);
436 
437  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
438  bool exception_detected = exception_detected_;
439  for (auto ex : exceptionInLS_)
440  if (lumi == ex) exception_detected=true;
441 
442  if (edm::shutdown_flag || exception_detected) {
443  edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
444  << processedEventsPerLumi_[lumi].first << " events were processed in LUMI " << lumi;
445  //this will prevent output modules from producing json file for possibly incomplete lumi
446  processedEventsPerLumi_[lumi].first=0;
447  processedEventsPerLumi_[lumi].second=true;
448  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
449  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
450  return;
451 
452  }
453 
454  if (inputSource_) {
455  auto sourceReport = inputSource_->getEventReport(lumi, true);
456  if (sourceReport.first) {
457  if (sourceReport.second!=processedEventsPerLumi_[lumi].first) {
458  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
459  << lumi
460  << ", events(processed):" << processedEventsPerLumi_[lumi].first
461  << " events(source):" << sourceReport.second;
462  }
463  }
464  }
465  edm::LogInfo("FastMonitoringService") << "Statistics for lumisection -: lumi = " << lumi << " events = "
466  << lumiProcessedJptr->value() << " time = " << usecondsForLumi/1000000
467  << " size = " << accuSize << " thr = " << throughput;
468  delete lumiProcessedJptr;
469 
470  //full global and stream merge&output for this lumi
471 
472  // create file name for slow monitoring file
473  if (filePerFwkStream_) {
474  std::stringstream slowFileNameStem;
475  slowFileNameStem << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
476  << lumi << "_pid" << std::setfill('0')
477  << std::setw(5) << getpid();
479  slow /= slowFileNameStem.str();
480  fmt_.jsonMonitor_->outputFullJSONs(slow.string(),".jsn",lumi);
481  }
482  else {
483  std::stringstream slowFileName;
484  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4)
485  << lumi << "_pid" << std::setfill('0')
486  << std::setw(5) << getpid() << ".jsn";
488  slow /= slowFileName.str();
489  fmt_.jsonMonitor_->outputFullJSON(slow.string(),lumi);//full global and stream merge and JSON write for this lumi
490  }
491  fmt_.jsonMonitor_->discardCollected(lumi);//we don't do further updates for this lumi
492 
493  }
#define LogDebug(id)
std::map< unsigned int, timeval > lumiStartTime_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
double throughputFactor()
std::map< unsigned int, unsigned long > accuSize_
volatile std::atomic< bool > shutdown_flag
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
FedRawDataInputSource * inputSource_
boost::filesystem::path workingDirectory_
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preModuleBeginJob ( edm::ModuleDescription const &  )

Definition at line 351 of file FastMonitoringService.cc.

References encModule_, fmt_, CommonMethods::lock(), edm::ModuleDescription::moduleName(), evf::FastMonitoringThread::monlock_, nOutputModules_, evf::FastMonitoringService::Encoding::update(), and evf::FastMonitoringService::Encoding::updateReserved().

Referenced by FastMonitoringService().

352  {
353  std::lock_guard<std::mutex> lock(fmt_.monlock_);
354  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
355 
356  //build a map of modules keyed by their module description address
357  //here we need to treat output modules in a special way so they can be easily singled out
358  if(desc.moduleName() == "Stream" || desc.moduleName() == "ShmStreamConsumer" || desc.moduleName() == "EvFOutputModule" ||
359  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
360  encModule_.updateReserved((void*)&desc);
361  nOutputModules_++;
362  }
363  else
364  encModule_.update((void*)&desc);
365  }
void evf::FastMonitoringService::preModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 606 of file FastMonitoringService.cc.

References microstate_, edm::ModuleCallingContext::moduleDescription(), edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

607  {
608  microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
609  }
std::vector< ContainableAtomic< const void * > > microstate_
void evf::FastMonitoringService::prePathEvent ( edm::StreamContext const &  sc,
edm::PathContext const &  pc 
)

Definition at line 544 of file FastMonitoringService.cc.

References collectedPathList_, encPath_, edm::EventID::event(), eventCountForPathInit_, edm::StreamContext::eventID(), firstEventId_, fmt_, CommonMethods::lock(), evf::FastMonitoringThread::m_data, makePathLegendaJson(), ministate_, evf::FastMonitoringThread::MonitorData::ministateBins_, evf::FastMonitoringThread::monlock_, pathLegendFileJson_, pathLegendWritten_, edm::PathContext::pathName(), edm::StreamContext::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, and unlikely.

Referenced by FastMonitoringService().

545  {
546  //make sure that all path names are retrieved before allowing ministate to change
547  //hack: assume memory is synchronized after ~50 events seen by each stream
548  if (unlikely(eventCountForPathInit_[sc.streamID()]<50) && false==collectedPathList_[sc.streamID()]->load(std::memory_order_acquire))
549  {
550  //protection between stream threads, as well as the service monitoring thread
551  std::lock_guard<std::mutex> lock(fmt_.monlock_);
552 
553  if (firstEventId_[sc.streamID()]==0)
554  firstEventId_[sc.streamID()]=sc.eventID().event();
555  if (sc.eventID().event()==firstEventId_[sc.streamID()])
556  {
557  encPath_[sc.streamID()].update((void*)&pc.pathName());
558  return;
559  }
560  else {
561  //finished collecting path names
562  collectedPathList_[sc.streamID()]->store(true,std::memory_order_seq_cst);
563  fmt_.m_data.ministateBins_=encPath_[sc.streamID()].vecsize();
564  if (!pathLegendWritten_) {
565  std::string pathLegendStrJson = makePathLegendaJson();
566  FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
567  pathLegendWritten_=true;
568  }
569  }
570  }
571  else {
572  ministate_[sc.streamID()] = &(pc.pathName());
573  }
574  }
#define unlikely(x)
std::vector< std::atomic< bool > * > collectedPathList_
std::vector< unsigned long > firstEventId_
std::vector< ContainableAtomic< unsigned int > > eventCountForPathInit_
std::vector< ContainableAtomic< const void * > > ministate_
std::vector< Encoding > encPath_
void evf::FastMonitoringService::preSourceEarlyTermination ( edm::TerminationOrigin  to)
void evf::FastMonitoringService::preSourceEvent ( edm::StreamID  sid)

Definition at line 596 of file FastMonitoringService.cc.

References microstate_, evf::MicroStateService::mInput, evf::MicroStateService::reservedMicroStateNames, and edm::StreamID::value().

Referenced by FastMonitoringService().

597  {
599  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
unsigned int value() const
Definition: StreamID.h:46
void evf::FastMonitoringService::preStreamBeginLumi ( edm::StreamContext const &  sc)

Definition at line 508 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), fmt_, CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::FastMonitoringThread::m_data, evf::MicroStateService::mBoL, microstate_, ministate_, evf::FastMonitoringThread::monlock_, nopath_, evf::FastMonitoringThread::MonitorData::processed_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), evf::FastMonitoringThread::MonitorData::streamLumi_, and edm::StreamID::value().

Referenced by FastMonitoringService().

509  {
510  unsigned int sid = sc.streamID().value();
511 
512  std::lock_guard<std::mutex> lock(fmt_.monlock_);
513  fmt_.m_data.streamLumi_[sid] = sc.eventID().luminosityBlock();
514 
515  //reset collected values for this stream
516  *(fmt_.m_data.processed_[sid])=0;
517 
518  ministate_[sid]=&nopath_;
520  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
static const std::string nopath_
std::vector< ContainableAtomic< const void * > > ministate_
std::vector< jsoncollector::AtomicMonUInt * > processed_
std::vector< unsigned int > streamLumi_
void evf::FastMonitoringService::preStreamEarlyTermination ( edm::StreamContext const &  sc,
edm::TerminationOrigin  to 
)

Definition at line 305 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::FastMonitoringThread::monlock_, edm::StreamContext::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::StreamID::value().

Referenced by FastMonitoringService().

306  {
307  std::string context;
308  if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
309  if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
310  if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
311  edm::LogWarning("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
312  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
313  std::lock_guard<std::mutex> lock(fmt_.monlock_);
314  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
315  }
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::preStreamEndLumi ( edm::StreamContext const &  sc)

Definition at line 527 of file FastMonitoringService.cc.

References doStreamEOLSnapshot(), edm::StreamContext::eventID(), fmt_, CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::MicroStateService::mEoL, microstate_, ministate_, evf::FastMonitoringThread::monlock_, nopath_, evf::MicroStateService::reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

528  {
529  unsigned int sid = sc.streamID().value();
530  std::lock_guard<std::mutex> lock(fmt_.monlock_);
531 
532  //update processed count to be complete at this time
533  doStreamEOLSnapshot(sc.eventID().luminosityBlock(),sid);
534  //reset this in case stream does not get notified of next lumi (we keep processed events only)
535  ministate_[sid]=&nopath_;
537  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
static const std::string nopath_
std::vector< ContainableAtomic< const void * > > ministate_
void evf::FastMonitoringService::reportLockWait ( unsigned int  ls,
double  waitTime,
unsigned int  lockCount 
)

Definition at line 680 of file FastMonitoringService.cc.

References fmt_, CommonMethods::lock(), lockStatsDuringLumi_, eostools::ls(), and evf::FastMonitoringThread::monlock_.

Referenced by FedRawDataInputSource::readSupervisor().

681  {
682  std::lock_guard<std::mutex> lock(fmt_.monlock_);
683  lockStatsDuringLumi_[ls]=std::pair<double,unsigned int>(waitTime,lockCount);
684 
685  }
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
def ls(path, rec=False)
Definition: eostools.py:348
void evf::FastMonitoringService::setExceptionDetected ( unsigned int  ls)

Definition at line 340 of file FastMonitoringService.cc.

References exception_detected_, and exceptionInLS_.

Referenced by FedRawDataInputSource::getNextEvent().

340  {
341  if (!ls) exception_detected_=true;
342  else exceptionInLS_.push_back(ls);
343  }
def ls(path, rec=False)
Definition: eostools.py:348
std::vector< unsigned int > exceptionInLS_
void evf::FastMonitoringService::setInputSource ( FedRawDataInputSource inputSource)
inline

Definition at line 187 of file FastMonitoringService.h.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

187 {inputSource_=inputSource;}
FedRawDataInputSource * inputSource_
void evf::FastMonitoringService::setInState ( FastMonitoringThread::InputState  inputState)
inline
void evf::FastMonitoringService::setInStateSup ( FastMonitoringThread::InputState  inputState)
inline

Definition at line 189 of file FastMonitoringService.h.

References eostools::ls().

Referenced by FedRawDataInputSource::FedRawDataInputSource(), and FedRawDataInputSource::readSupervisor().

189 {inputSupervisorState_=inputState;}
std::atomic< FastMonitoringThread::InputState > inputSupervisorState_
void evf::FastMonitoringService::setMicroState ( MicroStateService::Microstate  m)
overridevirtual

Implements evf::MicroStateService.

Definition at line 621 of file FastMonitoringService.cc.

References mps_fire::i, microstate_, nStreams_, and evf::MicroStateService::reservedMicroStateNames.

622  {
623  for (unsigned int i=0;i<nStreams_;i++)
625  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
void evf::FastMonitoringService::setMicroState ( edm::StreamID  sid,
MicroStateService::Microstate  m 
)
overridevirtual

Implements evf::MicroStateService.

Definition at line 628 of file FastMonitoringService.cc.

References funct::m, microstate_, and evf::MicroStateService::reservedMicroStateNames.

629  {
631  }
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::vector< ContainableAtomic< const void * > > microstate_
bool evf::FastMonitoringService::shouldWriteFiles ( unsigned int  lumi,
unsigned int *  proc = 0 
)
inline

Definition at line 180 of file FastMonitoringService.h.

References proc.

Referenced by DQMFileSaver::globalEndLuminosityBlock(), L1TriggerJSONMonitoring::globalEndLuminosityBlockSummary(), and HLTriggerJSONMonitoring::globalEndLuminosityBlockSummary().

181  {
182  unsigned int processed = getEventsProcessedForLumi(lumi);
183  if (proc) *proc = processed;
184  return !getAbortFlagForLumi(lumi);
185  }
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
TrainProcessor *const proc
Definition: MVATrainer.cc:101
bool getAbortFlagForLumi(unsigned int lumi)
void evf::FastMonitoringService::startedLookingForFile ( )

Definition at line 646 of file FastMonitoringService.cc.

References fileLookStart_.

Referenced by FedRawDataInputSource::readSupervisor().

646  {
647  gettimeofday(&fileLookStart_, nullptr);
648  /*
649  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
650  << fileLookStart_.tv_usec / 1000.0 << std::endl;
651  */
652  }
void evf::FastMonitoringService::stoppedLookingForFile ( unsigned int  lumi)

Definition at line 654 of file FastMonitoringService.cc.

References avgLeadTime_, fileLookStart_, fileLookStop_, fmt_, mps_fire::i, leadTimes_, CommonMethods::lock(), csvLumiCalc::lumi, lumiFromSource_, and evf::FastMonitoringThread::monlock_.

Referenced by FedRawDataInputSource::readSupervisor().

654  {
655  gettimeofday(&fileLookStop_, nullptr);
656  /*
657  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
658  << fileLookStop_.tv_usec / 1000.0 << std::endl;
659  */
660  std::lock_guard<std::mutex> lock(fmt_.monlock_);
661 
662  if (lumi>lumiFromSource_) {
663  lumiFromSource_=lumi;
664  leadTimes_.clear();
665  }
666  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
667  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
668  // add this to lead times for this lumi
669  leadTimes_.push_back((double)elapsedTime);
670 
671  // recompute average lead time for this lumi
672  if (leadTimes_.size() == 1) avgLeadTime_[lumi] = leadTimes_[0];
673  else {
674  double totTime = 0;
675  for (unsigned int i = 0; i < leadTimes_.size(); i++) totTime += leadTimes_[i];
676  avgLeadTime_[lumi] = 0.001*(totTime / leadTimes_.size());
677  }
678  }
std::map< unsigned int, double > avgLeadTime_
std::vector< double > leadTimes_

Member Data Documentation

std::map<unsigned int, unsigned long> evf::FastMonitoringService::accuSize_
private

Definition at line 282 of file FastMonitoringService.h.

Referenced by accumulateFileSize(), and preGlobalEndLumi().

std::map<unsigned int, double> evf::FastMonitoringService::avgLeadTime_
private

Definition at line 279 of file FastMonitoringService.h.

Referenced by doSnapshot(), postGlobalEndLumi(), and stoppedLookingForFile().

std::vector<std::atomic<bool>*> evf::FastMonitoringService::collectedPathList_
private

Definition at line 294 of file FastMonitoringService.h.

Referenced by preBeginJob(), and prePathEvent().

Encoding evf::FastMonitoringService::encModule_
private
std::vector<Encoding> evf::FastMonitoringService::encPath_
private
std::vector<ContainableAtomic<unsigned int> > evf::FastMonitoringService::eventCountForPathInit_
private

Definition at line 295 of file FastMonitoringService.h.

Referenced by postEvent(), preBeginJob(), and prePathEvent().

bool evf::FastMonitoringService::exception_detected_ = false
private
std::vector<unsigned int> evf::FastMonitoringService::exceptionInLS_
private
std::string evf::FastMonitoringService::fastMicrostateDefPath_
private

Definition at line 256 of file FastMonitoringService.h.

Referenced by FastMonitoringService(), and preBeginJob().

unsigned int evf::FastMonitoringService::fastMonIntervals_
private

Definition at line 254 of file FastMonitoringService.h.

std::string evf::FastMonitoringService::fastName_
private

Definition at line 257 of file FastMonitoringService.h.

Referenced by preBeginJob().

std::string evf::FastMonitoringService::fastPath_
private

Definition at line 257 of file FastMonitoringService.h.

Referenced by preBeginJob().

std::vector<std::string> evf::FastMonitoringService::fastPathList_
private

Definition at line 315 of file FastMonitoringService.h.

Referenced by preBeginJob().

timeval evf::FastMonitoringService::fileLookStart_
private

Definition at line 263 of file FastMonitoringService.h.

Referenced by startedLookingForFile(), and stoppedLookingForFile().

timeval evf::FastMonitoringService::fileLookStop_
private

Definition at line 263 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

bool evf::FastMonitoringService::filePerFwkStream_
private

Definition at line 258 of file FastMonitoringService.h.

Referenced by preBeginJob(), and preGlobalEndLumi().

std::map<unsigned int, unsigned int> evf::FastMonitoringService::filesProcessedDuringLumi_
private

Definition at line 280 of file FastMonitoringService.h.

Referenced by accumulateFileSize(), doSnapshot(), and postGlobalEndLumi().

std::vector<unsigned long> evf::FastMonitoringService::firstEventId_
private

Definition at line 293 of file FastMonitoringService.h.

Referenced by preBeginJob(), and prePathEvent().

FastMonitoringThread evf::FastMonitoringService::fmt_
private
std::string evf::FastMonitoringService::inputLegendFileJson_
private

Definition at line 308 of file FastMonitoringService.h.

Referenced by postBeginJob(), and preBeginJob().

FedRawDataInputSource* evf::FastMonitoringService::inputSource_ = 0
private

Definition at line 247 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi().

std::atomic<FastMonitoringThread::InputState> evf::FastMonitoringService::inputState_ { FastMonitoringThread::InputState::inInit }
private

Definition at line 248 of file FastMonitoringService.h.

Referenced by doSnapshot().

const std::string evf::FastMonitoringService::inputStateNames
static
Initial value:
=
{"Ignore","Init","WaitInput","NewLumi","NewLumiBusyEndingLS","NewLumiIdleEndingLS","RunEnd","ProcessingFile","WaitChunk","ChunkReceived",
"ChecksumEvent","CachedEvent","ReadEvent","ReadCleanup","NoRequest","NoRequestWithIdleThreads",
"NoRequestWithGlobalEoL","NoRequestWithEoLThreads",
"SupFileLimit", "SupWaitFreeChunk","SupWaitFreeChunkCopying", "SupWaitFreeThread","SupWaitFreeThreadCopying",
"SupBusy", "SupLockPolling","SupLockPollingCopying",
"SupNoFile", "SupNewFile", "SupNewFileWaitThreadCopying", "SupNewFileWaitThread",
"SupNewFileWaitChunkCopying", "SupNewFileWaitChunk",
"WaitInput_fileLimit","WaitInput_waitFreeChunk","WaitInput_waitFreeChunkCopying","WaitInput_waitFreeThread","WaitInput_waitFreeThreadCopying",
"WaitInput_busy","WaitInput_lockPolling","WaitInput_lockPollingCopying","WaitInput_runEnd",
"WaitInput_noFile","WaitInput_newFile","WaitInput_newFileWaitThreadCopying","WaitInput_newFileWaitThread",
"WaitInput_newFileWaitChunkCopying","WaitInput_newFileWaitChunk",
"WaitChunk_fileLimit","WaitChunk_waitFreeChunk","WaitChunk_waitFreeChunkCopying","WaitChunk_waitFreeThread","WaitChunk_waitFreeThreadCopying",
"WaitChunk_busy","WaitChunk_lockPolling","WaitChunk_lockPollingCopying","WaitChunk_runEnd",
"WaitChunk_noFile","WaitChunk_newFile","WaitChunk_newFileWaitThreadCopying","WaitChunk_newFileWaitThread",
"WaitChunk_newFileWaitChunkCopying","WaitChunk_newFileWaitChunk"
}

Definition at line 129 of file FastMonitoringService.h.

Referenced by makeInputLegendaJson().

std::atomic<FastMonitoringThread::InputState> evf::FastMonitoringService::inputSupervisorState_ { FastMonitoringThread::InputState::inInit }
private

Definition at line 249 of file FastMonitoringService.h.

Referenced by doSnapshot().

std::atomic<bool> evf::FastMonitoringService::isInitTransition_
private

Definition at line 266 of file FastMonitoringService.h.

Referenced by doSnapshot(), postGlobalBeginRun(), and preBeginJob().

unsigned int evf::FastMonitoringService::lastGlobalLumi_
private

Definition at line 265 of file FastMonitoringService.h.

Referenced by preBeginJob(), and preGlobalBeginLumi().

std::vector<double> evf::FastMonitoringService::leadTimes_
private

Definition at line 283 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

std::map<unsigned int, std::pair<double,unsigned int> > evf::FastMonitoringService::lockStatsDuringLumi_
private

Definition at line 284 of file FastMonitoringService.h.

Referenced by doSnapshot(), postGlobalEndLumi(), and reportLockWait().

unsigned int evf::FastMonitoringService::lumiFromSource_
private

Definition at line 267 of file FastMonitoringService.h.

Referenced by preBeginJob(), and stoppedLookingForFile().

std::map<unsigned int, timeval> evf::FastMonitoringService::lumiStartTime_
private

Definition at line 262 of file FastMonitoringService.h.

Referenced by preGlobalBeginLumi(), and preGlobalEndLumi().

std::atomic<FastMonitoringThread::Macrostate> evf::FastMonitoringService::macrostate_
private
const std::string evf::FastMonitoringService::macroStateNames
static
Initial value:
=
{"Init","JobReady","RunGiven","Running",
"Stopping","Done","JobEnded","Error","ErrorEnded","End",
"Invalid"}

Definition at line 128 of file FastMonitoringService.h.

std::vector<ContainableAtomic<const void*> > evf::FastMonitoringService::microstate_
private
std::string evf::FastMonitoringService::microstateDefPath_
private

Definition at line 256 of file FastMonitoringService.h.

Referenced by FastMonitoringService(), and preBeginJob().

std::vector<ContainableAtomic<const void*> > evf::FastMonitoringService::ministate_
private
std::string evf::FastMonitoringService::moduleLegendFile_
private

Definition at line 304 of file FastMonitoringService.h.

Referenced by preBeginJob().

std::string evf::FastMonitoringService::moduleLegendFileJson_
private

Definition at line 305 of file FastMonitoringService.h.

Referenced by postBeginJob(), and preBeginJob().

std::atomic<bool> evf::FastMonitoringService::monInit_
private

Definition at line 312 of file FastMonitoringService.h.

Referenced by preBeginJob().

const std::string evf::FastMonitoringService::nopath_ = "NoPath"
static
unsigned int evf::FastMonitoringService::nOutputModules_ =0
private

Definition at line 310 of file FastMonitoringService.h.

Referenced by makeModuleLegendaJson(), and preModuleBeginJob().

unsigned int evf::FastMonitoringService::nStreams_
private

Definition at line 251 of file FastMonitoringService.h.

Referenced by doSnapshot(), preallocate(), preBeginJob(), and setMicroState().

unsigned int evf::FastMonitoringService::nThreads_
private

Definition at line 252 of file FastMonitoringService.h.

Referenced by preallocate(), and preBeginJob().

std::string evf::FastMonitoringService::pathLegendFile_
private

Definition at line 306 of file FastMonitoringService.h.

Referenced by preBeginJob().

std::string evf::FastMonitoringService::pathLegendFileJson_
private

Definition at line 307 of file FastMonitoringService.h.

Referenced by preBeginJob(), and prePathEvent().

bool evf::FastMonitoringService::pathLegendWritten_ = false
private

Definition at line 309 of file FastMonitoringService.h.

Referenced by prePathEvent().

std::vector<bool> evf::FastMonitoringService::pathNamesReady_
private

Definition at line 296 of file FastMonitoringService.h.

std::map<unsigned int, std::pair<unsigned int,bool> > evf::FastMonitoringService::processedEventsPerLumi_
private
boost::filesystem::path evf::FastMonitoringService::runDirectory_
private

Definition at line 298 of file FastMonitoringService.h.

Referenced by preBeginJob().

int evf::FastMonitoringService::sleepTime_
private

Definition at line 253 of file FastMonitoringService.h.

std::string evf::FastMonitoringService::slowName_
private

Definition at line 257 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi().

unsigned int evf::FastMonitoringService::snapCounter_ = 0
private

Definition at line 255 of file FastMonitoringService.h.

std::vector<std::atomic<bool>*> evf::FastMonitoringService::streamCounterUpdating_
private

Definition at line 291 of file FastMonitoringService.h.

Referenced by preBeginJob().

bool evf::FastMonitoringService::threadIDAvailable_ = false
private

Definition at line 300 of file FastMonitoringService.h.

Referenced by preBeginJob().

std::vector<ContainableAtomic<const void*> > evf::FastMonitoringService::threadMicrostate_
private

Definition at line 275 of file FastMonitoringService.h.

std::atomic<unsigned long> evf::FastMonitoringService::totalEventsProcessed_
private

Definition at line 302 of file FastMonitoringService.h.

Referenced by postEvent().

boost::filesystem::path evf::FastMonitoringService::workingDirectory_
private

Definition at line 298 of file FastMonitoringService.h.

Referenced by preBeginJob(), and preGlobalEndLumi().