CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
evf::FastMonitoringService Class Reference

#include <FastMonitoringService.h>

Inheritance diagram for evf::FastMonitoringService:
evf::MicroStateService

Public Member Functions

void accumulateFileSize (unsigned int lumi, unsigned long fileSize)
 
bool exceptionDetected () const
 
 FastMonitoringService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
bool getAbortFlagForLumi (unsigned int lumi)
 
unsigned int getEventsProcessedForLumi (unsigned int lumi, bool *abortFlag=nullptr)
 
std::string getRunDirName () const
 
bool isExceptionOnData (unsigned int ls)
 
void jobFailure ()
 
std::string makeInputLegendaJson ()
 
std::string makeModuleLegendaJson ()
 
std::string makePathLegendaJson ()
 
void postBeginJob ()
 
void postEndJob ()
 
void postEvent (edm::StreamContext const &)
 
void postGlobalBeginRun (edm::GlobalContext const &)
 
void postGlobalEndLumi (edm::GlobalContext const &)
 
void postModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void postModuleEventAcquire (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void postSourceEvent (edm::StreamID)
 
void postStreamBeginLumi (edm::StreamContext const &)
 
void postStreamEndLumi (edm::StreamContext const &)
 
void preallocate (edm::service::SystemBounds const &)
 
void preBeginJob (edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
 
void preEvent (edm::StreamContext const &)
 
void preGlobalBeginLumi (edm::GlobalContext const &)
 
void preGlobalEarlyTermination (edm::GlobalContext const &, edm::TerminationOrigin)
 
void preGlobalEndLumi (edm::GlobalContext const &)
 
void preModuleBeginJob (edm::ModuleDescription const &)
 
void preModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void preModuleEventAcquire (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void prePathEvent (edm::StreamContext const &, edm::PathContext const &)
 
void preSourceEarlyTermination (edm::TerminationOrigin)
 
void preSourceEvent (edm::StreamID)
 
void preStreamBeginLumi (edm::StreamContext const &)
 
void preStreamEarlyTermination (edm::StreamContext const &, edm::TerminationOrigin)
 
void preStreamEndLumi (edm::StreamContext const &)
 
void reportLockWait (unsigned int ls, double waitTime, unsigned int lockCount)
 
void setExceptionDetected (unsigned int ls)
 
void setInputSource (FedRawDataInputSource *inputSource)
 
void setInputSource (DAQSource *inputSource)
 
void setInState (FastMonState::InputState inputState)
 
void setInStateSup (FastMonState::InputState inputState)
 
void setMicroState (FastMonState::Microstate)
 
void setMicroState (edm::StreamID, FastMonState::Microstate)
 
bool shouldWriteFiles (unsigned int lumi, unsigned int *proc=nullptr)
 
void startedLookingForFile ()
 
void stoppedLookingForFile (unsigned int lumi)
 
 ~FastMonitoringService () override
 
- Public Member Functions inherited from evf::MicroStateService
 MicroStateService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
virtual ~MicroStateService ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 

Static Public Attributes

static const std::string inputStateNames [FastMonState::inCOUNT]
 
static const std::string macroStateNames [FastMonState::MCOUNT]
 
static const std::string nopath_ = "NoPath"
 
static const edm::ModuleDescription reservedMicroStateNames [FastMonState::mCOUNT]
 

Private Member Functions

void doSnapshot (const unsigned int ls, const bool isGlobalEOL)
 
void snapshotRunner ()
 

Private Attributes

std::map< unsigned int, unsigned long > accuSize_
 
std::map< unsigned int, double > avgLeadTime_
 
std::vector< std::atomic< bool > * > collectedPathList_
 
DAQSource * daqInputSource_ = nullptr
 
bool exception_detected_ = false
 
std::vector< unsigned int > exceptionInLS_
 
std::string fastMicrostateDefPath_
 
unsigned int fastMonIntervals_
 
std::string fastName_
 
std::string fastPath_
 
std::vector< std::string > fastPathList_
 
timeval fileLookStart_
 
timeval fileLookStop_
 
bool filePerFwkStream_
 
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
 
std::shared_ptr< FastMonitoringThread > fmt_
 
std::atomic< bool > has_data_exception_ = false
 
std::atomic< bool > has_source_exception_ = false
 
std::string inputLegendFileJson_
 
FedRawDataInputSource * inputSource_ = nullptr
 
std::atomic< FastMonState::InputState > inputState_ {FastMonState::InputState::inInit}
 
std::atomic< FastMonState::InputState > inputSupervisorState_ {FastMonState::InputState::inInit}
 
std::atomic< bool > isInitTransition_
 
unsigned int lastGlobalLumi_
 
std::vector< double > leadTimes_
 
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
 
unsigned int lumiFromSource_
 
std::map< unsigned int, timeval > lumiStartTime_
 
std::string microstateDefPath_
 
std::string moduleLegendFile_
 
std::string moduleLegendFileJson_
 
std::atomic< bool > monInit_
 
unsigned int nOutputModules_ = 0
 
unsigned int nStreams_
 
unsigned int nThreads_
 
std::string pathLegendFile_
 
std::string pathLegendFileJson_
 
std::vector< bool > pathNamesReady_
 
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
 
std::filesystem::path runDirectory_
 
int sleepTime_
 
std::string slowName_
 
unsigned int snapCounter_ = 0
 
std::vector< std::atomic< bool > * > streamCounterUpdating_
 
bool threadIDAvailable_ = false
 
std::atomic< unsigned long > totalEventsProcessed_
 
bool verbose_ = false
 
std::filesystem::path workingDirectory_
 

Detailed Description

Definition at line 156 of file FastMonitoringService.h.

Constructor & Destructor Documentation

◆ FastMonitoringService()

evf::FastMonitoringService::FastMonitoringService ( const edm::ParameterSet &  iPS,
edm::ActivityRegistry &  reg 
)

Definition at line 126 of file FastMonitoringService.cc.

References Exception, fastMicrostateDefPath_, jobFailure(), microstateDefPath_, postBeginJob(), postEndJob(), postEvent(), postGlobalEndLumi(), postModuleEvent(), postModuleEventAcquire(), postSourceEvent(), postStreamBeginLumi(), postStreamEndLumi(), preallocate(), preBeginJob(), preEvent(), preGlobalBeginLumi(), preGlobalEarlyTermination(), preGlobalEndLumi(), preModuleBeginJob(), preModuleEvent(), preModuleEventAcquire(), prePathEvent(), preSourceEarlyTermination(), preSourceEvent(), preStreamBeginLumi(), preStreamEarlyTermination(), preStreamEndLumi(), edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, edm::ActivityRegistry::watchJobFailure(), edm::ActivityRegistry::watchPostBeginJob(), edm::ActivityRegistry::watchPostEndJob(), edm::ActivityRegistry::watchPostEvent(), edm::ActivityRegistry::watchPostGlobalEndLumi(), edm::ActivityRegistry::watchPostModuleEvent(), edm::ActivityRegistry::watchPostModuleEventAcquire(), edm::ActivityRegistry::watchPostSourceEvent(), edm::ActivityRegistry::watchPostStreamBeginLumi(), edm::ActivityRegistry::watchPostStreamEndLumi(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreBeginJob(), edm::ActivityRegistry::watchPreEvent(), edm::ActivityRegistry::watchPreGlobalBeginLumi(), edm::ActivityRegistry::watchPreGlobalEarlyTermination(), edm::ActivityRegistry::watchPreGlobalEndLumi(), edm::ActivityRegistry::watchPreModuleBeginJob(), edm::ActivityRegistry::watchPreModuleEvent(), edm::ActivityRegistry::watchPreModuleEventAcquire(), edm::ActivityRegistry::watchPrePathEvent(), edm::ActivityRegistry::watchPreSourceEarlyTermination(), edm::ActivityRegistry::watchPreSourceEvent(), edm::ActivityRegistry::watchPreStreamBeginLumi(), edm::ActivityRegistry::watchPreStreamEarlyTermination(), and edm::ActivityRegistry::watchPreStreamEndLumi().

127  : MicroStateService(iPS, reg),
128  fmt_(new FastMonitoringThread()),
129  nStreams_(0) //until initialized
130  ,
131  sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
132  fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
133  fastName_("fastmoni"),
134  slowName_("slowmoni"),
135  filePerFwkStream_(iPS.getUntrackedParameter<bool>("filePerFwkStream", false)),
137  verbose_(iPS.getUntrackedParameter<bool>("verbose")) {
138  reg.watchPreallocate(this, &FastMonitoringService::preallocate); //receiving information on number of threads
140 
145 
149 
154 
156 
157  reg.watchPreEvent(this, &FastMonitoringService::preEvent); //stream
159 
160  reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent); //source (with streamID of requestor)
162 
165 
168 
172 
173  //find microstate definition path (required by the module)
174  struct stat statbuf;
175  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
176  std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
177  if (stat(microstatePath.c_str(), &statbuf)) {
178  microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
179  if (stat(microstatePath.c_str(), &statbuf)) {
180  microstatePath = microstateBaseSuffix;
181  if (stat(microstatePath.c_str(), &statbuf))
182  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
183  }
184  }
185  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
186  }
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
void watchPreEvent(PreEvent::slot_type const &iSlot)
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void watchPreModuleEventAcquire(PreModuleEventAcquire::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
void preGlobalBeginLumi(edm::GlobalContext const &)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
void watchPostEvent(PostEvent::slot_type const &iSlot)
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
void preGlobalEndLumi(edm::GlobalContext const &)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
T getUntrackedParameter(std::string const &, T const &) const
void preModuleBeginJob(edm::ModuleDescription const &)
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
void preStreamEndLumi(edm::StreamContext const &)
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
void watchPostModuleEventAcquire(PostModuleEventAcquire::slot_type const &iSlot)
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postStreamBeginLumi(edm::StreamContext const &)
void postStreamEndLumi(edm::StreamContext const &)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
std::shared_ptr< FastMonitoringThread > fmt_
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal

◆ ~FastMonitoringService()

evf::FastMonitoringService::~FastMonitoringService ( )
override

Definition at line 188 of file FastMonitoringService.cc.

188 {}

Member Function Documentation

◆ accumulateFileSize()

void evf::FastMonitoringService::accumulateFileSize ( unsigned int  lumi,
unsigned long  fileSize 
)

Definition at line 697 of file FastMonitoringService.cc.

References accuSize_, filesProcessedDuringLumi_, fmt_, CommonMethods::lock(), and BXlumiParameters_cfi::lumi.

Referenced by evf::EvFDaqDirector::bumpFile().

697  {
698  std::lock_guard<std::mutex> lock(fmt_->monlock_);
699 
700  if (accuSize_.find(lumi) == accuSize_.end())
701  accuSize_[lumi] = fileSize;
702  else
703  accuSize_[lumi] += fileSize;
704 
707  else
709  }
std::map< unsigned int, unsigned long > accuSize_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::shared_ptr< FastMonitoringThread > fmt_

◆ doSnapshot()

void evf::FastMonitoringService::doSnapshot ( const unsigned int  ls,
const bool  isGlobalEOL 
)
private

Definition at line 846 of file FastMonitoringService.cc.

References avgLeadTime_, filesProcessedDuringLumi_, fmt_, mps_fire::i, caHitNtupletGeneratorKernels::if(), evf::FastMonState::inNewLumi, evf::FastMonState::inNoRequest, evf::FastMonState::inNoRequestWithEoLThreads, evf::FastMonState::inNoRequestWithIdleThreads, inputState_, inputSupervisorState_, evf::FastMonState::inRunEnd, evf::FastMonState::inSupBusy, evf::FastMonState::inSupFileLimit, evf::FastMonState::inSupLockPolling, evf::FastMonState::inSupLockPollingCopying, evf::FastMonState::inSupNewFile, evf::FastMonState::inSupNewFileWaitChunk, evf::FastMonState::inSupNewFileWaitChunkCopying, evf::FastMonState::inSupNewFileWaitThread, evf::FastMonState::inSupNewFileWaitThreadCopying, evf::FastMonState::inSupNoFile, evf::FastMonState::inSupThrottled, evf::FastMonState::inSupWaitFreeChunk, evf::FastMonState::inSupWaitFreeChunkCopying, evf::FastMonState::inSupWaitFreeThread, evf::FastMonState::inSupWaitFreeThreadCopying, evf::FastMonState::inWaitChunk, evf::FastMonState::inWaitChunk_busy, evf::FastMonState::inWaitChunk_fileLimit, evf::FastMonState::inWaitChunk_lockPolling, evf::FastMonState::inWaitChunk_lockPollingCopying, evf::FastMonState::inWaitChunk_newFile, evf::FastMonState::inWaitChunk_newFileWaitChunk, evf::FastMonState::inWaitChunk_newFileWaitChunkCopying, evf::FastMonState::inWaitChunk_newFileWaitThread, evf::FastMonState::inWaitChunk_newFileWaitThreadCopying, evf::FastMonState::inWaitChunk_noFile, evf::FastMonState::inWaitChunk_runEnd, evf::FastMonState::inWaitChunk_waitFreeChunk, evf::FastMonState::inWaitChunk_waitFreeChunkCopying, evf::FastMonState::inWaitChunk_waitFreeThread, evf::FastMonState::inWaitChunk_waitFreeThreadCopying, evf::FastMonState::inWaitInput, evf::FastMonState::inWaitInput_busy, evf::FastMonState::inWaitInput_fileLimit, evf::FastMonState::inWaitInput_lockPolling, evf::FastMonState::inWaitInput_lockPollingCopying, evf::FastMonState::inWaitInput_newFile, evf::FastMonState::inWaitInput_newFileWaitChunk, 
evf::FastMonState::inWaitInput_newFileWaitChunkCopying, evf::FastMonState::inWaitInput_newFileWaitThread, evf::FastMonState::inWaitInput_newFileWaitThreadCopying, evf::FastMonState::inWaitInput_noFile, evf::FastMonState::inWaitInput_runEnd, evf::FastMonState::inWaitInput_waitFreeChunk, evf::FastMonState::inWaitInput_waitFreeChunkCopying, evf::FastMonState::inWaitInput_waitFreeThread, evf::FastMonState::inWaitInput_waitFreeThreadCopying, isInitTransition_, lockStatsDuringLumi_, eostools::ls(), evf::FastMonState::mEoL, evf::FastMonState::mFwkEoL, evf::FastMonState::mIdle, nStreams_, and reservedMicroStateNames.

Referenced by preGlobalEndLumi(), and snapshotRunner().

846  {
847  // update macrostate
848  fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
849 
850  std::vector<const void*> microstateCopy(fmt_->m_data.microstate_.begin(), fmt_->m_data.microstate_.end());
851  std::vector<unsigned char> microstateAcqCopy(fmt_->m_data.microstateAcqFlag_.begin(),
852  fmt_->m_data.microstateAcqFlag_.end());
853 
854  if (!isInitTransition_) {
855  auto itd = avgLeadTime_.find(ls);
856  if (itd != avgLeadTime_.end())
857  fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
858  else
859  fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
860 
861  auto iti = filesProcessedDuringLumi_.find(ls);
862  if (iti != filesProcessedDuringLumi_.end())
863  fmt_->m_data.fastFilesProcessedJ_ = iti->second;
864  else
865  fmt_->m_data.fastFilesProcessedJ_ = 0;
866 
867  auto itrd = lockStatsDuringLumi_.find(ls);
868  if (itrd != lockStatsDuringLumi_.end()) {
869  fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
870  fmt_->m_data.fastLockCountJ_ = itrd->second.second;
871  } else {
872  fmt_->m_data.fastLockWaitJ_ = 0.;
873  fmt_->m_data.fastLockCountJ_ = 0.;
874  }
875  }
876 
877  for (unsigned int i = 0; i < nStreams_; i++) {
878  fmt_->m_data.ministateEncoded_[i] = fmt_->m_data.encPath_[i].encodeString(fmt_->m_data.ministate_[i]);
879  if (microstateAcqCopy[i])
880  fmt_->m_data.microstateEncoded_[i] =
881  fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
882  else
883  fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
884  }
885 
886  bool inputStatePerThread = false;
887 
889  switch (inputSupervisorState_) {
891  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
892  break;
894  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
895  break;
897  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
898  break;
900  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
901  break;
903  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
904  break;
906  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
907  break;
909  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
910  break;
912  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
913  break;
915  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
916  break;
918  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
919  break;
921  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
922  break;
925  break;
927  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
928  break;
931  break;
933  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
934  break;
935  default:
936  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
937  }
938  } else if (inputState_ == FastMonState::inWaitChunk) {
939  switch (inputSupervisorState_) {
941  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
942  break;
944  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
945  break;
947  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
948  break;
950  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
951  break;
953  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
954  break;
956  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
957  break;
959  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
960  break;
962  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
963  break;
965  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
966  break;
968  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
969  break;
971  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
972  break;
975  break;
977  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
978  break;
981  break;
983  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
984  break;
985  default:
986  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
987  }
988  } else if (inputState_ == FastMonState::inNoRequest) {
989  inputStatePerThread = true;
990  for (unsigned int i = 0; i < nStreams_; i++) {
991  if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mIdle])
992  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
993  else if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
994  microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
995  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
996  else
997  fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
998  }
999  } else if (inputState_ == FastMonState::inNewLumi) {
1000  inputStatePerThread = true;
1001  for (unsigned int i = 0; i < nStreams_; i++) {
1002  if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
1003  microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
1004  fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1005  }
1007  //apply directly throttled state from supervisor
1008  fmt_->m_data.inputState_[0] = inputSupervisorState_;
1009  } else
1010  fmt_->m_data.inputState_[0] = inputState_;
1011 
1012  //this is same for all streams
1013  if (!inputStatePerThread)
1014  for (unsigned int i = 1; i < nStreams_; i++)
1015  fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1016 
1017  if (isGlobalEOL) { //only update global variables
1018  fmt_->jsonMonitor_->snapGlobal(ls);
1019  } else
1020  fmt_->jsonMonitor_->snap(ls);
1021  }
std::atomic< FastMonState::InputState > inputState_
std::atomic< bool > isInitTransition_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
std::map< unsigned int, double > avgLeadTime_
std::atomic< FastMonState::InputState > inputSupervisorState_
def ls(path, rec=False)
Definition: eostools.py:349
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::shared_ptr< FastMonitoringThread > fmt_

◆ exceptionDetected()

bool evf::FastMonitoringService::exceptionDetected ( ) const

Definition at line 434 of file FastMonitoringService.cc.

References has_data_exception_, and has_source_exception_.

Referenced by DAQSource::~DAQSource(), and FedRawDataInputSource::~FedRawDataInputSource().

434  {
435  return has_source_exception_.load() || has_data_exception_.load();
436  }
std::atomic< bool > has_data_exception_
std::atomic< bool > has_source_exception_

◆ fillDescriptions()

void evf::FastMonitoringService::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 190 of file FastMonitoringService.cc.

References edm::ConfigurationDescriptions::add(), and submitPVResolutionJobs::desc.

190  {
192  desc.setComment("Service for File-based DAQ monitoring and event accounting");
193  desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
194  desc.addUntracked<unsigned int>("fastMonIntervals", 2)
195  ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
196  desc.addUntracked<bool>("filePerFwkStream", false)
197  ->setComment("Switches on monitoring output per framework stream");
198  desc.addUntracked<bool>("verbose", false)->setComment("Set to use LogInfo messages from the monitoring thread");
199  desc.setAllowAnything();
200  descriptions.add("FastMonitoringService", desc);
201  }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ getAbortFlagForLumi()

bool evf::FastMonitoringService::getAbortFlagForLumi ( unsigned int  lumi)

Definition at line 771 of file FastMonitoringService.cc.

References Exception, fmt_, CommonMethods::lock(), BXlumiParameters_cfi::lumi, and processedEventsPerLumi_.

Referenced by shouldWriteFiles().

771  {
772  std::lock_guard<std::mutex> lock(fmt_->monlock_);
773 
774  auto it = processedEventsPerLumi_.find(lumi);
775  if (it != processedEventsPerLumi_.end()) {
776  unsigned int abortFlag = it->second.second;
777  return abortFlag;
778  } else {
779  throw cms::Exception("FastMonitoringService")
780  << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
781  << lumi;
782  return false;
783  }
784  }
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::shared_ptr< FastMonitoringThread > fmt_

◆ getEventsProcessedForLumi()

unsigned int evf::FastMonitoringService::getEventsProcessedForLumi ( unsigned int  lumi,
bool *  abortFlag = nullptr 
)

Definition at line 753 of file FastMonitoringService.cc.

References Exception, fmt_, CommonMethods::lock(), BXlumiParameters_cfi::lumi, ValidateTausOnZEEFastSim_cff::proc, and processedEventsPerLumi_.

Referenced by dqm::DQMFileSaverPB::fillJson(), dqmfilesaver::fillJson(), evf::EvFOutputModule::globalEndLuminosityBlock(), evf::GlobalEvFOutputModule::globalEndLuminosityBlock(), dqm::DQMFileSaverPB::saveLumi(), and shouldWriteFiles().

753  {
754  std::lock_guard<std::mutex> lock(fmt_->monlock_);
755 
756  auto it = processedEventsPerLumi_.find(lumi);
757  if (it != processedEventsPerLumi_.end()) {
758  unsigned int proc = it->second.first;
759  if (abortFlag)
760  *abortFlag = it->second.second;
761  return proc;
762  } else {
763  throw cms::Exception("FastMonitoringService")
764  << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
765  << lumi;
766  return 0;
767  }
768  }
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::shared_ptr< FastMonitoringThread > fmt_

◆ getRunDirName()

std::string evf::FastMonitoringService::getRunDirName ( ) const
inline

Definition at line 221 of file FastMonitoringService.h.

References runDirectory_.

221 { return runDirectory_.stem().string(); }
std::filesystem::path runDirectory_

◆ isExceptionOnData()

bool evf::FastMonitoringService::isExceptionOnData ( unsigned int  ls)

Definition at line 438 of file FastMonitoringService.cc.

References exceptionInLS_, fmt_, has_data_exception_, has_source_exception_, CommonMethods::lock(), and eostools::ls().

Referenced by evf::EvFDaqDirector::preGlobalEndLumi(), FedRawDataInputSource::read(), DAQSource::read(), DAQSource::~DAQSource(), and FedRawDataInputSource::~FedRawDataInputSource().

438  {
439  if (!has_data_exception_.load())
440  return false;
441  if (has_source_exception_.load())
442  return true;
443  std::lock_guard<std::mutex> lock(fmt_->monlock_);
444  for (auto ex : exceptionInLS_) {
445  if (ls == ex)
446  return true;
447  }
448  return false;
449  }
std::atomic< bool > has_data_exception_
def ls(path, rec=False)
Definition: eostools.py:349
std::atomic< bool > has_source_exception_
std::shared_ptr< FastMonitoringThread > fmt_
std::vector< unsigned int > exceptionInLS_

◆ jobFailure()

void evf::FastMonitoringService::jobFailure ( )

Definition at line 451 of file FastMonitoringService.cc.

References fmt_, and evf::FastMonState::sError.

Referenced by FastMonitoringService().

451 { fmt_->m_data.macrostate_ = FastMonState::sError; }
std::shared_ptr< FastMonitoringThread > fmt_

◆ makeInputLegendaJson()

std::string evf::FastMonitoringService::makeInputLegendaJson ( )

Definition at line 236 of file FastMonitoringService.cc.

References Json::Value::append(), Json::arrayValue, mps_fire::i, evf::FastMonState::inCOUNT, inputStateNames, and convertToRaw::writer.

Referenced by postBeginJob().

236  {
237  Json::Value legendaVector(Json::arrayValue);
238  for (int i = 0; i < FastMonState::inCOUNT; i++)
239  legendaVector.append(Json::Value(inputStateNames[i]));
240  Json::Value moduleLegend;
241  moduleLegend["names"] = legendaVector;
243  return writer.write(moduleLegend);
244  }
static const std::string inputStateNames[FastMonState::inCOUNT]
Represents a JSON value.
Definition: value.h:99
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:63
array value (ordered list)
Definition: value.h:30

◆ makeModuleLegendaJson()

std::string evf::FastMonitoringService::makeModuleLegendaJson ( )

Definition at line 215 of file FastMonitoringService.cc.

References Json::Value::append(), Json::arrayValue, fmt_, mps_fire::i, nOutputModules_, evf::nReservedModules, evf::nSpecialModules, and convertToRaw::writer.

Referenced by postBeginJob().

215  {
216  Json::Value legendaVector(Json::arrayValue);
217  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
218  legendaVector.append(
219  Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
220  //duplicate modules adding a list for acquire states (not all modules actually have it)
221  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
222  legendaVector.append(Json::Value(
223  (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
224  Json::Value valReserved(nReservedModules);
225  Json::Value valSpecial(nSpecialModules);
226  Json::Value valOutputModules(nOutputModules_);
227  Json::Value moduleLegend;
228  moduleLegend["names"] = legendaVector;
229  moduleLegend["reserved"] = valReserved;
230  moduleLegend["special"] = valSpecial;
231  moduleLegend["output"] = valOutputModules;
233  return writer.write(moduleLegend);
234  }
Represents a JSON value.
Definition: value.h:99
constexpr int nSpecialModules
constexpr int nReservedModules
std::shared_ptr< FastMonitoringThread > fmt_
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:63
array value (ordered list)
Definition: value.h:30

◆ makePathLegendaJson()

std::string evf::FastMonitoringService::makePathLegendaJson ( )

Definition at line 203 of file FastMonitoringService.cc.

References Json::Value::append(), Json::arrayValue, fmt_, mps_fire::i, evf::nReservedPaths, and convertToRaw::writer.

Referenced by postBeginJob().

203  {
204  Json::Value legendaVector(Json::arrayValue);
205  for (int i = 0; i < fmt_->m_data.encPath_[0].current_; i++)
206  legendaVector.append(Json::Value(*(static_cast<const std::string*>(fmt_->m_data.encPath_[0].decode(i)))));
207  Json::Value valReserved(nReservedPaths);
208  Json::Value pathLegend;
209  pathLegend["names"] = legendaVector;
210  pathLegend["reserved"] = valReserved;
212  return writer.write(pathLegend);
213  }
Represents a JSON value.
Definition: value.h:99
constexpr int nReservedPaths
std::shared_ptr< FastMonitoringThread > fmt_
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:63
array value (ordered list)
Definition: value.h:30

◆ postBeginJob()

void evf::FastMonitoringService::postBeginJob ( )

Definition at line 469 of file FastMonitoringService.cc.

References fmt_, inputLegendFileJson_, CommonMethods::lock(), makeInputLegendaJson(), makeModuleLegendaJson(), makePathLegendaJson(), moduleLegendFileJson_, pathLegendFileJson_, evf::FastMonState::sJobReady, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FastMonitoringService().

469  {
470  std::string&& moduleLegStrJson = makeModuleLegendaJson();
471  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
472 
473  std::string inputLegendStrJson = makeInputLegendaJson();
474  FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
475 
476  std::string pathLegendStrJson = makePathLegendaJson();
477  FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
478 
479  fmt_->m_data.macrostate_ = FastMonState::sJobReady;
480 
481  //update number of entries in module histogram
482  std::lock_guard<std::mutex> lock(fmt_->monlock_);
483  //double the size to add post-acquire states
484  fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
485  }
std::shared_ptr< FastMonitoringThread > fmt_

◆ postEndJob()

void evf::FastMonitoringService::postEndJob ( )

Definition at line 487 of file FastMonitoringService.cc.

References fmt_, and evf::FastMonState::sJobEnded.

Referenced by FastMonitoringService().

487  {
488  fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
489  fmt_->stop();
490  }
std::shared_ptr< FastMonitoringThread > fmt_

◆ postEvent()

void evf::FastMonitoringService::postEvent ( edm::StreamContext const &  sc)

Definition at line 642 of file FastMonitoringService.cc.

References fmt_, evf::FastMonState::mIdle, nopath_, reservedMicroStateNames, edm::StreamContext::streamID(), and totalEventsProcessed_.

Referenced by FastMonitoringService().

642  {
643  fmt_->m_data.microstate_[sc.streamID()] = &reservedMicroStateNames[FastMonState::mIdle];
644 
645  fmt_->m_data.ministate_[sc.streamID()] = &nopath_;
646 
647  (*(fmt_->m_data.processed_[sc.streamID()]))++;
648 
649  //fast path counter (events accumulated in a run)
650  unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
651  fmt_->m_data.fastPathProcessedJ_ = res + 1;
652  }
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
Definition: Electron.h:6
static const std::string nopath_
std::atomic< unsigned long > totalEventsProcessed_
std::shared_ptr< FastMonitoringThread > fmt_

◆ postGlobalBeginRun()

void evf::FastMonitoringService::postGlobalBeginRun ( edm::GlobalContext const &  gc)

Definition at line 492 of file FastMonitoringService.cc.

References fmt_, isInitTransition_, and evf::FastMonState::sRunning.

492  {
493  fmt_->m_data.macrostate_ = FastMonState::sRunning;
494  isInitTransition_ = false;
495  }
std::atomic< bool > isInitTransition_
std::shared_ptr< FastMonitoringThread > fmt_

◆ postGlobalEndLumi()

void evf::FastMonitoringService::postGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 592 of file FastMonitoringService.cc.

References avgLeadTime_, filesProcessedDuringLumi_, fmt_, CommonMethods::lock(), lockStatsDuringLumi_, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), and processedEventsPerLumi_.

Referenced by FastMonitoringService().

592  {
593  std::lock_guard<std::mutex> lock(fmt_->monlock_);
594  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
595  //LS monitoring snapshot with input source data has been taken in previous callback
596  avgLeadTime_.erase(lumi);
598  lockStatsDuringLumi_.erase(lumi);
599 
600  //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
602  }
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::map< unsigned int, double > avgLeadTime_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::shared_ptr< FastMonitoringThread > fmt_

◆ postModuleEvent()

void evf::FastMonitoringService::postModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 678 of file FastMonitoringService.cc.

References fmt_, evf::FastMonState::mFwkOvhMod, reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

678  {
679  fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkOvhMod];
680  }
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
std::shared_ptr< FastMonitoringThread > fmt_

◆ postModuleEventAcquire()

void evf::FastMonitoringService::postModuleEventAcquire ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 667 of file FastMonitoringService.cc.

References fmt_, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

668  {
669  //fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
670  fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 1;
671  }
std::shared_ptr< FastMonitoringThread > fmt_

◆ postSourceEvent()

void evf::FastMonitoringService::postSourceEvent ( edm::StreamID  sid)

Definition at line 658 of file FastMonitoringService.cc.

References fmt_, evf::FastMonState::mFwkOvhSrc, reservedMicroStateNames, and edm::StreamID::value().

Referenced by FastMonitoringService().

658  {
659  fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mFwkOvhSrc];
660  }
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
unsigned int value() const
Definition: StreamID.h:43
std::shared_ptr< FastMonitoringThread > fmt_

◆ postStreamBeginLumi()

void evf::FastMonitoringService::postStreamBeginLumi ( edm::StreamContext const &  sc)

Definition at line 617 of file FastMonitoringService.cc.

References fmt_, evf::FastMonState::mIdle, reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

617  {
618  fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mIdle];
619  }
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
std::shared_ptr< FastMonitoringThread > fmt_

◆ postStreamEndLumi()

void evf::FastMonitoringService::postStreamEndLumi ( edm::StreamContext const &  sc)

Definition at line 632 of file FastMonitoringService.cc.

References fmt_, evf::FastMonState::mFwkEoL, reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

632  {
633  fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkEoL];
634  }
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
std::shared_ptr< FastMonitoringThread > fmt_

◆ preallocate()

void evf::FastMonitoringService::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 246 of file FastMonitoringService.cc.

References nStreams_, and nThreads_.

Referenced by FastMonitoringService().

246  {
247  nStreams_ = bounds.maxNumberOfStreams();
248  nThreads_ = bounds.maxNumberOfThreads();
249  //this should already be >=1
250  if (nStreams_ == 0)
251  nStreams_ = 1;
252  if (nThreads_ == 0)
253  nThreads_ = 1;
254  }

◆ preBeginJob()

void evf::FastMonitoringService::preBeginJob ( edm::PathsAndConsumesOfModulesBase const &  pathsInfo,
edm::ProcessContext const &  pc 
)

Definition at line 256 of file FastMonitoringService.cc.

References edm::PathsAndConsumesOfModulesBase::endPaths(), Exception, fastMicrostateDefPath_, fastName_, fastPath_, fastPathList_, filePerFwkStream_, fmt_, mps_fire::i, evf::FastMonState::inCOUNT, inputLegendFileJson_, isInitTransition_, lastGlobalLumi_, LogDebug, lumiFromSource_, evf::FastMonState::mCOUNT, evf::FastMonState::MCOUNT, microstateDefPath_, evf::FastMonState::mInvalid, moduleLegendFile_, moduleLegendFileJson_, monInit_, nopath_, nStreams_, nThreads_, Utilities::operator, castor_dqm_sourceclient_file_cfg::path, pathLegendFile_, pathLegendFileJson_, edm::PathsAndConsumesOfModulesBase::paths(), reservedMicroStateNames, runDirectory_, evf::FastMonState::sInit, sleepTime_, snapshotRunner(), streamCounterUpdating_, threadIDAvailable_, and workingDirectory_.

Referenced by FastMonitoringService().

257  {
258  // FIND RUN DIRECTORY
259  // The run dir should be set via the configuration of EvFDaqDirector
260 
261  if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
262  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
263  }
264  std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
265  workingDirectory_ = runDirectory_ = runDirectory;
266  workingDirectory_ /= "mon";
267 
268  if (!std::filesystem::is_directory(workingDirectory_)) {
269  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
270  std::filesystem::create_directories(workingDirectory_);
271  if (!std::filesystem::is_directory(workingDirectory_))
272  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
273  << ". No monitoring data will be written.";
274  }
275 
276  std::ostringstream fastFileName;
277 
278  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
280  fast /= fastFileName.str();
281  fastPath_ = fast.string();
282  if (filePerFwkStream_)
283  for (unsigned int i = 0; i < nStreams_; i++) {
284  std::ostringstream fastFileNameTid;
285  fastFileNameTid << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << "_tid" << i
286  << ".fast";
288  fastTid /= fastFileNameTid.str();
289  fastPathList_.push_back(fastTid.string());
290  }
291 
292  std::ostringstream moduleLegFile;
293  std::ostringstream moduleLegFileJson;
294  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
295  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
296  moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
297  moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
298 
299  std::ostringstream pathLegFile;
300  std::ostringstream pathLegFileJson;
301  pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
302  pathLegendFile_ = (workingDirectory_ / pathLegFile.str()).string();
303  pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
304  pathLegendFileJson_ = (workingDirectory_ / pathLegFileJson.str()).string();
305 
306  std::ostringstream inputLegFileJson;
307  inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
308  inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
309 
310  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
311  //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
312 
313  /*
314  * initialize the fast monitor with:
315  * vector of pointers to monitorable parameters
316  * path to definition
317  *
318  */
319 
320  fmt_->m_data.macrostate_ = FastMonState::sInit;
321 
322  for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
323  fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(reservedMicroStateNames + i));
324  fmt_->m_data.encModule_.completeReservedWithDummies();
325 
326  for (unsigned int i = 0; i < nStreams_; i++) {
327  fmt_->m_data.ministate_.emplace_back(&nopath_);
328  fmt_->m_data.microstate_.emplace_back(&reservedMicroStateNames[FastMonState::mInvalid]);
329  fmt_->m_data.microstateAcqFlag_.push_back(0);
330 
331  //for synchronization
332  streamCounterUpdating_.push_back(new std::atomic<bool>(false));
333 
334  //path (mini) state
335  fmt_->m_data.encPath_.emplace_back(0);
336  fmt_->m_data.encPath_[i].update(static_cast<const void*>(&nopath_));
337 
338  for (auto& path : pathsInfo.paths()) {
339  fmt_->m_data.encPath_[i].updatePreinit(path);
340  }
341  for (auto& endPath : pathsInfo.endPaths()) {
342  fmt_->m_data.encPath_[i].updatePreinit(endPath);
343  }
344  }
345  //for (unsigned int i=0;i<nThreads_;i++)
346  // threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
347 
348  //initial size until we detect number of bins
349  fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
350  fmt_->m_data.microstateBins_ = 0;
351  fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
352  fmt_->m_data.ministateBins_ = fmt_->m_data.encPath_[0].vecsize();
353 
354  lastGlobalLumi_ = 0;
355  isInitTransition_ = true;
356  lumiFromSource_ = 0;
357 
358  //startup monitoring
359  fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
360  fmt_->jsonMonitor_->setNStreams(nStreams_);
361  fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nStreams_, threadIDAvailable_ ? nThreads_ : 0);
362  monInit_.store(false, std::memory_order_release);
363  if (sleepTime_ > 0)
365 
366  //this definition needs: #include "tbb/compat/thread"
367  //however this would result in the TBB implementation replacing std::thread
368  //(both supposedly call pthread_self())
369  //number of threads created in process could be obtained from /proc,
370  //assuming that all posix threads are true kernel threads capable of running in parallel
371 
372  //#if TBB_IMPLEMENT_CPP0X
374  //threadIDAvailable_=true;
375  //#endif
376  }
std::atomic< bool > isInitTransition_
std::filesystem::path workingDirectory_
std::vector< std::atomic< bool > * > streamCounterUpdating_
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
static const std::string nopath_
std::filesystem::path runDirectory_
std::vector< std::string > fastPathList_
std::shared_ptr< FastMonitoringThread > fmt_
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ preEvent()

void evf::FastMonitoringService::preEvent ( edm::StreamContext const &  sc)

Definition at line 640 of file FastMonitoringService.cc.

Referenced by FastMonitoringService().

640 {}

◆ preGlobalBeginLumi()

void evf::FastMonitoringService::preGlobalBeginLumi ( edm::GlobalContext const &  gc)

Definition at line 497 of file FastMonitoringService.cc.

References fmt_, lastGlobalLumi_, CommonMethods::lock(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), and lumiStartTime_.

Referenced by FastMonitoringService().

497  {
498  timeval lumiStartTime;
499  gettimeofday(&lumiStartTime, nullptr);
500  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
501  lastGlobalLumi_ = newLumi;
502 
503  std::lock_guard<std::mutex> lock(fmt_->monlock_);
504  lumiStartTime_[newLumi] = lumiStartTime;
505  }
std::map< unsigned int, timeval > lumiStartTime_
std::shared_ptr< FastMonitoringThread > fmt_

◆ preGlobalEarlyTermination()

void evf::FastMonitoringService::preGlobalEarlyTermination ( edm::GlobalContext const &  gc,
edm::TerminationOrigin  to 
)

Definition at line 394 of file FastMonitoringService.cc.

References visDQMUpload::context, edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, has_data_exception_, CommonMethods::lock(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FastMonitoringService().

394  {
397  context = " FromThisContext ";
399  context = " FromAnotherContext";
401  context = " FromExternalSignal";
402  edm::LogWarning("FastMonitoringService")
403  << " GLOBAL "
404  << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
405  std::lock_guard<std::mutex> lock(fmt_->monlock_);
406  exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
407  has_data_exception_.store(true);
408  }
std::atomic< bool > has_data_exception_
std::shared_ptr< FastMonitoringThread > fmt_
Log< level::Warning, false > LogWarning
std::vector< unsigned int > exceptionInLS_

◆ preGlobalEndLumi()

void evf::FastMonitoringService::preGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 507 of file FastMonitoringService.cc.

References accuSize_, daqInputSource_, doSnapshot(), Exception, exception_detected_, exceptionInLS_, filePerFwkStream_, fmt_, DAQSource::getEventReport(), FedRawDataInputSource::getEventReport(), inputSource_, CommonMethods::lock(), LogDebug, BXlumiParameters_cfi::lumi, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, convertSQLitetoXML_cfg::output, castor_dqm_sourceclient_file_cfg::path, processedEventsPerLumi_, edm::shutdown_flag, sleepTime_, slowName_, throughputFactor(), jsoncollector::IntJ::value(), and workingDirectory_.

Referenced by FastMonitoringService().

507  {
508  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
509  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
510  timeval lumiStopTime;
511  gettimeofday(&lumiStopTime, nullptr);
512 
513  std::lock_guard<std::mutex> lock(fmt_->monlock_);
514 
515  // Compute throughput
516  timeval stt = lumiStartTime_[lumi];
517  lumiStartTime_.erase(lumi);
518  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
519  unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
520  accuSize_.erase(lumi);
521  double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
522  //store to registered variable
523  fmt_->m_data.fastThroughputJ_.value() = throughput;
524 
525  //update
526  doSnapshot(lumi, true);
527 
528  //retrieve one result we need (todo: sanity check if it's found)
529  IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
530  if (!lumiProcessedJptr)
531  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
532  processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
533 
534  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
535  bool exception_detected = exception_detected_;
536  for (auto ex : exceptionInLS_)
537  if (lumi == ex)
538  exception_detected = true;
539 
540  if (edm::shutdown_flag || exception_detected) {
541  edm::LogInfo("FastMonitoringService")
542  << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
543  << " events were processed in LUMI " << lumi;
544  //this will prevent output modules from producing json file for possibly incomplete lumi
545  processedEventsPerLumi_[lumi].first = 0;
546  processedEventsPerLumi_[lumi].second = true;
547  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
548  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
549  return;
550  }
551 
552  if (inputSource_ || daqInputSource_) {
553  auto sourceReport =
555  if (sourceReport.first) {
556  if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
557  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
558  << ", events(processed):" << processedEventsPerLumi_[lumi].first
559  << " events(source):" << sourceReport.second;
560  }
561  }
562  }
563 
564  edm::LogInfo("FastMonitoringService")
565  << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
566  << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
567  delete lumiProcessedJptr;
568 
569  //full global and stream merge&output for this lumi
570 
571  // create file name for slow monitoring file
572  bool output = sleepTime_ > 0;
573  if (filePerFwkStream_) {
574  std::stringstream slowFileNameStem;
575  slowFileNameStem << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
576  << std::setw(5) << getpid();
578  slow /= slowFileNameStem.str();
579  fmt_->jsonMonitor_->outputFullJSONs(slow.string(), ".jsn", lumi, output);
580  } else {
581  std::stringstream slowFileName;
582  slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
583  << std::setw(5) << getpid() << ".jsn";
585  slow /= slowFileName.str();
586  //full global and stream merge and JSON write for this lumi
587  fmt_->jsonMonitor_->outputFullJSON(slow.string(), lumi, output);
588  }
589  fmt_->jsonMonitor_->discardCollected(lumi); //we don't do further updates for this lumi
590  }
std::map< unsigned int, timeval > lumiStartTime_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
std::map< unsigned int, unsigned long > accuSize_
std::filesystem::path workingDirectory_
volatile std::atomic< bool > shutdown_flag
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: DAQSource.cc:1378
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
constexpr double throughputFactor()
Log< level::Info, false > LogInfo
FedRawDataInputSource * inputSource_
Definition: output.py:1
std::shared_ptr< FastMonitoringThread > fmt_
std::vector< unsigned int > exceptionInLS_
#define LogDebug(id)

◆ preModuleBeginJob()

void evf::FastMonitoringService::preModuleBeginJob ( edm::ModuleDescription const &  desc)

Definition at line 454 of file FastMonitoringService.cc.

References submitPVResolutionJobs::desc, fmt_, CommonMethods::lock(), and nOutputModules_.

Referenced by FastMonitoringService().

454  {
455  std::lock_guard<std::mutex> lock(fmt_->monlock_);
456  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
457 
458  //build a map of modules keyed by their module description address
459  //here we need to treat output modules in a special way so they can be easily singled out
460  if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
461  desc.moduleName() == "EvFOutputModule" || desc.moduleName() == "EventStreamFileWriter" ||
462  desc.moduleName() == "PoolOutputModule") {
463  fmt_->m_data.encModule_.updateReserved((void*)&desc);
464  nOutputModules_++;
465  } else
466  fmt_->m_data.encModule_.update((void*)&desc);
467  }
std::shared_ptr< FastMonitoringThread > fmt_

◆ preModuleEvent()

void evf::FastMonitoringService::preModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 673 of file FastMonitoringService.cc.

References fmt_, edm::ModuleCallingContext::moduleDescription(), edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

673  {
674  fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
675  fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 0;
676  }
std::shared_ptr< FastMonitoringThread > fmt_

◆ preModuleEventAcquire()

void evf::FastMonitoringService::preModuleEventAcquire ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 662 of file FastMonitoringService.cc.

References fmt_, edm::ModuleCallingContext::moduleDescription(), edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

663  {
664  fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
665  }
std::shared_ptr< FastMonitoringThread > fmt_

◆ prePathEvent()

void evf::FastMonitoringService::prePathEvent ( edm::StreamContext const &  sc,
edm::PathContext const &  pc 
)

Definition at line 636 of file FastMonitoringService.cc.

References fmt_, edm::PathContext::pathName(), and edm::StreamContext::streamID().

Referenced by FastMonitoringService().

636  {
637  fmt_->m_data.ministate_[sc.streamID()] = &(pc.pathName());
638  }
std::shared_ptr< FastMonitoringThread > fmt_

◆ preSourceEarlyTermination()

void evf::FastMonitoringService::preSourceEarlyTermination ( edm::TerminationOrigin  to)

Definition at line 410 of file FastMonitoringService.cc.

References visDQMUpload::context, exception_detected_, edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, edm::ExternalSignal, fmt_, has_data_exception_, has_source_exception_, CommonMethods::lock(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FastMonitoringService().

410  {
413  context = " FromThisContext ";
415  context = " FromAnotherContext";
417  context = " FromExternalSignal";
418  edm::LogWarning("FastMonitoringService") << " SOURCE "
419  << "earlyTermination -: " << context;
420  std::lock_guard<std::mutex> lock(fmt_->monlock_);
421  exception_detected_ = true;
422  has_source_exception_.store(true);
423  has_data_exception_.store(true);
424  }
std::atomic< bool > has_data_exception_
std::atomic< bool > has_source_exception_
std::shared_ptr< FastMonitoringThread > fmt_
Log< level::Warning, false > LogWarning

◆ preSourceEvent()

void evf::FastMonitoringService::preSourceEvent ( edm::StreamID  sid)

Definition at line 654 of file FastMonitoringService.cc.

References fmt_, evf::FastMonState::mInput, reservedMicroStateNames, and edm::StreamID::value().

Referenced by FastMonitoringService().

654  {
655  fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mInput];
656  }
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
unsigned int value() const
Definition: StreamID.h:43
std::shared_ptr< FastMonitoringThread > fmt_

◆ preStreamBeginLumi()

void evf::FastMonitoringService::preStreamBeginLumi ( edm::StreamContext const &  sc)

Definition at line 604 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), fmt_, CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::FastMonState::mBoL, nopath_, reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

604  {
605  unsigned int sid = sc.streamID().value();
606 
607  std::lock_guard<std::mutex> lock(fmt_->monlock_);
608  fmt_->m_data.streamLumi_[sid] = sc.eventID().luminosityBlock();
609 
610  //reset collected values for this stream
611  *(fmt_->m_data.processed_[sid]) = 0;
612 
613  fmt_->m_data.ministate_[sid] = &nopath_;
614  fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mBoL];
615  }
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
static const std::string nopath_
std::shared_ptr< FastMonitoringThread > fmt_

◆ preStreamEarlyTermination()

void evf::FastMonitoringService::preStreamEarlyTermination ( edm::StreamContext const &  sc,
edm::TerminationOrigin  to 
)

Definition at line 378 of file FastMonitoringService.cc.

References visDQMUpload::context, edm::StreamContext::eventID(), edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, has_data_exception_, CommonMethods::lock(), edm::EventID::luminosityBlock(), edm::StreamContext::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::StreamID::value().

Referenced by FastMonitoringService().

378  {
381  context = " FromThisContext ";
383  context = " FromAnotherContext";
385  context = " FromExternalSignal";
386  edm::LogWarning("FastMonitoringService")
387  << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
388  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
389  std::lock_guard<std::mutex> lock(fmt_->monlock_);
390  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
391  has_data_exception_.store(true);
392  }
std::atomic< bool > has_data_exception_
std::shared_ptr< FastMonitoringThread > fmt_
Log< level::Warning, false > LogWarning
std::vector< unsigned int > exceptionInLS_

◆ preStreamEndLumi()

void evf::FastMonitoringService::preStreamEndLumi ( edm::StreamContext const &  sc)

Definition at line 621 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), fmt_, CommonMethods::lock(), edm::EventID::luminosityBlock(), evf::FastMonState::mEoL, nopath_, reservedMicroStateNames, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

621  {
622  unsigned int sid = sc.streamID().value();
623  std::lock_guard<std::mutex> lock(fmt_->monlock_);
624 
625  //update processed count to be complete at this time
626  //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
627  fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sid);
628  //reset this in case stream does not get notified of next lumi (we keep processed events only)
629  fmt_->m_data.ministate_[sid] = &nopath_;
630  fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mEoL];
631  }
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
static const std::string nopath_
std::shared_ptr< FastMonitoringThread > fmt_

◆ reportLockWait()

void evf::FastMonitoringService::reportLockWait ( unsigned int  ls,
double  waitTime,
unsigned int  lockCount 
)

Definition at line 747 of file FastMonitoringService.cc.

References fmt_, CommonMethods::lock(), lockStatsDuringLumi_, and eostools::ls().

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

747  {
748  std::lock_guard<std::mutex> lock(fmt_->monlock_);
749  lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
750  }
def ls(path, rec=False)
Definition: eostools.py:349
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::shared_ptr< FastMonitoringThread > fmt_

◆ setExceptionDetected()

void evf::FastMonitoringService::setExceptionDetected ( unsigned int  ls)

Definition at line 426 of file FastMonitoringService.cc.

References exception_detected_, exceptionInLS_, fmt_, CommonMethods::lock(), and eostools::ls().

Referenced by FedRawDataInputSource::getNextEvent(), and DAQSource::getNextEventFromDataBlock().

426  {
427  std::lock_guard<std::mutex> lock(fmt_->monlock_);
428  if (!ls)
429  exception_detected_ = true;
430  else
431  exceptionInLS_.push_back(ls);
432  }
def ls(path, rec=False)
Definition: eostools.py:349
std::shared_ptr< FastMonitoringThread > fmt_
std::vector< unsigned int > exceptionInLS_

◆ setInputSource() [1/2]

void evf::FastMonitoringService::setInputSource ( FedRawDataInputSource inputSource)
inline

Definition at line 222 of file FastMonitoringService.h.

References inputSource_.

Referenced by DAQSource::DAQSource(), and FedRawDataInputSource::FedRawDataInputSource().

222 { inputSource_ = inputSource; }
FedRawDataInputSource * inputSource_

◆ setInputSource() [2/2]

void evf::FastMonitoringService::setInputSource ( DAQSource inputSource)
inline

Definition at line 223 of file FastMonitoringService.h.

References daqInputSource_.

223 { daqInputSource_ = inputSource; }

◆ setInState()

void evf::FastMonitoringService::setInState ( FastMonState::InputState  inputState)
inline

◆ setInStateSup()

void evf::FastMonitoringService::setInStateSup ( FastMonState::InputState  inputState)
inline

◆ setMicroState() [1/2]

void evf::FastMonitoringService::setMicroState ( FastMonState::Microstate  m)

Definition at line 686 of file FastMonitoringService.cc.

References fmt_, mps_fire::i, visualization-live-secondInstance_cfg::m, nStreams_, and reservedMicroStateNames.

686  {
687  for (unsigned int i = 0; i < nStreams_; i++)
688  fmt_->m_data.microstate_[i] = &reservedMicroStateNames[m];
689  }
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
std::shared_ptr< FastMonitoringThread > fmt_

◆ setMicroState() [2/2]

void evf::FastMonitoringService::setMicroState ( edm::StreamID  sid,
FastMonState::Microstate  m 
)

Definition at line 692 of file FastMonitoringService.cc.

References fmt_, visualization-live-secondInstance_cfg::m, and reservedMicroStateNames.

692  {
693  fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[m];
694  }
static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT]
std::shared_ptr< FastMonitoringThread > fmt_

◆ shouldWriteFiles()

bool evf::FastMonitoringService::shouldWriteFiles ( unsigned int  lumi,
unsigned int *  proc = nullptr 
)
inline

Definition at line 215 of file FastMonitoringService.h.

References getAbortFlagForLumi(), getEventsProcessedForLumi(), and ValidateTausOnZEEFastSim_cff::proc.

Referenced by L1TriggerJSONMonitoring::globalEndLuminosityBlockSummary(), and HLTriggerJSONMonitoring::globalEndLuminosityBlockSummary().

215  {
216  unsigned int processed = getEventsProcessedForLumi(lumi);
217  if (proc)
218  *proc = processed;
219  return !getAbortFlagForLumi(lumi);
220  }
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
bool getAbortFlagForLumi(unsigned int lumi)

◆ snapshotRunner()

void evf::FastMonitoringService::snapshotRunner ( )
private

Definition at line 787 of file FastMonitoringService.cc.

References doSnapshot(), relativeConstraints::empty, f, fastMonIntervals_, fastPath_, fastPathList_, filePerFwkStream_, fmt_, mps_fire::i, inputState_, inputStateNames, inputSupervisorState_, lastGlobalLumi_, CommonMethods::lock(), monInit_, mps_check::msg, nStreams_, AlCaHLTBitMon_ParallelJobs::p, sleepTime_, snapCounter_, AlCaHLTBitMon_QueryRunRegistry::string, and verbose_.

Referenced by preBeginJob().

787  {
788  monInit_.exchange(true, std::memory_order_acquire);
789  while (!fmt_->m_stoprequest) {
790  std::vector<std::vector<unsigned int>> lastEnc;
791  {
792  std::unique_lock<std::mutex> lock(fmt_->monlock_);
793 
794  doSnapshot(lastGlobalLumi_, false);
795 
796  lastEnc.emplace_back(fmt_->m_data.ministateEncoded_);
797  lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
798 
799  if (fastMonIntervals_ && (snapCounter_ % fastMonIntervals_) == 0) {
800  if (filePerFwkStream_) {
801  std::vector<std::string> CSVv;
802  for (unsigned int i = 0; i < nStreams_; i++) {
803  CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
804  }
805  // release mutex before writing out fast path file
806  lock.release()->unlock();
807  for (unsigned int i = 0; i < nStreams_; i++) {
808  if (!CSVv[i].empty())
809  fmt_->jsonMonitor_->outputCSV(fastPathList_[i], CSVv[i]);
810  }
811  } else {
812  std::string CSV = fmt_->jsonMonitor_->getCSVString();
813  // release mutex before writing out fast path file
814  lock.release()->unlock();
815  if (!CSV.empty())
816  fmt_->jsonMonitor_->outputCSV(fastPath_, CSV);
817  }
818  }
819  snapCounter_++;
820  }
821 
822  if (verbose_) {
823  edm::LogInfo msg("FastMonitoringService");
824  auto f = [&](std::vector<unsigned int> const& p) {
825  for (unsigned int i = 0; i < nStreams_; i++) {
826  if (i == 0)
827  msg << "[" << p[i] << ",";
828  else if (i <= nStreams_ - 1)
829  msg << p[i] << ",";
830  else
831  msg << p[i] << "]";
832  }
833  };
834 
835  msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
836  f(lastEnc[0]);
837  msg << " us=";
838  f(lastEnc[1]);
840  }
841 
842  ::sleep(sleepTime_);
843  }
844  }
std::atomic< FastMonState::InputState > inputState_
static const std::string inputStateNames[FastMonState::inCOUNT]
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
std::atomic< FastMonState::InputState > inputSupervisorState_
double f[11][100]
std::vector< std::string > fastPathList_
tuple msg
Definition: mps_check.py:286
std::shared_ptr< FastMonitoringThread > fmt_

◆ startedLookingForFile()

void evf::FastMonitoringService::startedLookingForFile ( )

Definition at line 711 of file FastMonitoringService.cc.

References fileLookStart_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

711  {
712  gettimeofday(&fileLookStart_, nullptr);
713  /*
714  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
715  << fileLookStart_.tv_usec / 1000.0 << std::endl;
716  */
717  }

◆ stoppedLookingForFile()

void evf::FastMonitoringService::stoppedLookingForFile ( unsigned int  lumi)

Definition at line 719 of file FastMonitoringService.cc.

References avgLeadTime_, fileLookStart_, fileLookStop_, fmt_, mps_fire::i, leadTimes_, CommonMethods::lock(), BXlumiParameters_cfi::lumi, and lumiFromSource_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

719  {
720  gettimeofday(&fileLookStop_, nullptr);
721  /*
722  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
723  << fileLookStop_.tv_usec / 1000.0 << std::endl;
724  */
725  std::lock_guard<std::mutex> lock(fmt_->monlock_);
726 
727  if (lumi > lumiFromSource_) {
728  lumiFromSource_ = lumi;
729  leadTimes_.clear();
730  }
731  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
732  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
733  // add this to lead times for this lumi
734  leadTimes_.push_back((double)elapsedTime);
735 
736  // recompute average lead time for this lumi
737  if (leadTimes_.size() == 1)
739  else {
740  double totTime = 0;
741  for (unsigned int i = 0; i < leadTimes_.size(); i++)
742  totTime += leadTimes_[i];
743  avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
744  }
745  }
std::map< unsigned int, double > avgLeadTime_
std::vector< double > leadTimes_
std::shared_ptr< FastMonitoringThread > fmt_

Member Data Documentation

◆ accuSize_

std::map<unsigned int, unsigned long> evf::FastMonitoringService::accuSize_
private

Definition at line 264 of file FastMonitoringService.h.

Referenced by accumulateFileSize(), and preGlobalEndLumi().

◆ avgLeadTime_

std::map<unsigned int, double> evf::FastMonitoringService::avgLeadTime_
private

Definition at line 261 of file FastMonitoringService.h.

Referenced by doSnapshot(), postGlobalEndLumi(), and stoppedLookingForFile().

◆ collectedPathList_

std::vector<std::atomic<bool>*> evf::FastMonitoringService::collectedPathList_
private

Definition at line 275 of file FastMonitoringService.h.

◆ daqInputSource_

DAQSource* evf::FastMonitoringService::daqInputSource_ = nullptr
private

Definition at line 237 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi(), and setInputSource().

◆ exception_detected_

bool evf::FastMonitoringService::exception_detected_ = false
private

◆ exceptionInLS_

std::vector<unsigned int> evf::FastMonitoringService::exceptionInLS_
private

◆ fastMicrostateDefPath_

std::string evf::FastMonitoringService::fastMicrostateDefPath_
private

Definition at line 246 of file FastMonitoringService.h.

Referenced by FastMonitoringService(), and preBeginJob().

◆ fastMonIntervals_

unsigned int evf::FastMonitoringService::fastMonIntervals_
private

Definition at line 244 of file FastMonitoringService.h.

Referenced by snapshotRunner().

◆ fastName_

std::string evf::FastMonitoringService::fastName_
private

Definition at line 247 of file FastMonitoringService.h.

Referenced by preBeginJob().

◆ fastPath_

std::string evf::FastMonitoringService::fastPath_
private

Definition at line 247 of file FastMonitoringService.h.

Referenced by preBeginJob(), and snapshotRunner().

◆ fastPathList_

std::vector<std::string> evf::FastMonitoringService::fastPathList_
private

Definition at line 296 of file FastMonitoringService.h.

Referenced by preBeginJob(), and snapshotRunner().

◆ fileLookStart_

timeval evf::FastMonitoringService::fileLookStart_
private

Definition at line 253 of file FastMonitoringService.h.

Referenced by startedLookingForFile(), and stoppedLookingForFile().

◆ fileLookStop_

timeval evf::FastMonitoringService::fileLookStop_
private

Definition at line 253 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

◆ filePerFwkStream_

bool evf::FastMonitoringService::filePerFwkStream_
private

Definition at line 248 of file FastMonitoringService.h.

Referenced by preBeginJob(), preGlobalEndLumi(), and snapshotRunner().

◆ filesProcessedDuringLumi_

std::map<unsigned int, unsigned int> evf::FastMonitoringService::filesProcessedDuringLumi_
private

Definition at line 262 of file FastMonitoringService.h.

Referenced by accumulateFileSize(), doSnapshot(), and postGlobalEndLumi().

◆ fmt_

std::shared_ptr<FastMonitoringThread> evf::FastMonitoringService::fmt_
private

◆ has_data_exception_

std::atomic<bool> evf::FastMonitoringService::has_data_exception_ = false
private

◆ has_source_exception_

std::atomic<bool> evf::FastMonitoringService::has_source_exception_ = false
private

◆ inputLegendFileJson_

std::string evf::FastMonitoringService::inputLegendFileJson_
private

Definition at line 288 of file FastMonitoringService.h.

Referenced by postBeginJob(), and preBeginJob().

◆ inputSource_

FedRawDataInputSource* evf::FastMonitoringService::inputSource_ = nullptr
private

Definition at line 236 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi(), and setInputSource().

◆ inputState_

std::atomic<FastMonState::InputState> evf::FastMonitoringService::inputState_ {FastMonState::InputState::inInit}
private

Definition at line 238 of file FastMonitoringService.h.

Referenced by doSnapshot(), setInState(), and snapshotRunner().

◆ inputStateNames

const std::string evf::FastMonitoringService::inputStateNames
static

Definition at line 161 of file FastMonitoringService.h.

Referenced by makeInputLegendaJson(), and snapshotRunner().

◆ inputSupervisorState_

std::atomic<FastMonState::InputState> evf::FastMonitoringService::inputSupervisorState_ {FastMonState::InputState::inInit}
private

Definition at line 239 of file FastMonitoringService.h.

Referenced by doSnapshot(), setInStateSup(), and snapshotRunner().

◆ isInitTransition_

std::atomic<bool> evf::FastMonitoringService::isInitTransition_
private

Definition at line 256 of file FastMonitoringService.h.

Referenced by doSnapshot(), postGlobalBeginRun(), and preBeginJob().

◆ lastGlobalLumi_

unsigned int evf::FastMonitoringService::lastGlobalLumi_
private

Definition at line 255 of file FastMonitoringService.h.

Referenced by preBeginJob(), preGlobalBeginLumi(), and snapshotRunner().

◆ leadTimes_

std::vector<double> evf::FastMonitoringService::leadTimes_
private

Definition at line 265 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

◆ lockStatsDuringLumi_

std::map<unsigned int, std::pair<double, unsigned int> > evf::FastMonitoringService::lockStatsDuringLumi_
private

Definition at line 266 of file FastMonitoringService.h.

Referenced by doSnapshot(), postGlobalEndLumi(), and reportLockWait().

◆ lumiFromSource_

unsigned int evf::FastMonitoringService::lumiFromSource_
private

Definition at line 257 of file FastMonitoringService.h.

Referenced by preBeginJob(), and stoppedLookingForFile().

◆ lumiStartTime_

std::map<unsigned int, timeval> evf::FastMonitoringService::lumiStartTime_
private

Definition at line 252 of file FastMonitoringService.h.

Referenced by preGlobalBeginLumi(), and preGlobalEndLumi().

◆ macroStateNames

const std::string evf::FastMonitoringService::macroStateNames
static
Initial value:
= {"Init",
"JobReady",
"RunGiven",
"Running",
"Stopping",
"Done",
"JobEnded",
"Error",
"ErrorEnded",
"End",
"Invalid"}

Definition at line 160 of file FastMonitoringService.h.

◆ microstateDefPath_

std::string evf::FastMonitoringService::microstateDefPath_
private

Definition at line 246 of file FastMonitoringService.h.

Referenced by FastMonitoringService(), and preBeginJob().

◆ moduleLegendFile_

std::string evf::FastMonitoringService::moduleLegendFile_
private

Definition at line 284 of file FastMonitoringService.h.

Referenced by preBeginJob().

◆ moduleLegendFileJson_

std::string evf::FastMonitoringService::moduleLegendFileJson_
private

Definition at line 285 of file FastMonitoringService.h.

Referenced by postBeginJob(), and preBeginJob().

◆ monInit_

std::atomic<bool> evf::FastMonitoringService::monInit_
private

Definition at line 291 of file FastMonitoringService.h.

Referenced by preBeginJob(), and snapshotRunner().

◆ nopath_

const std::string evf::FastMonitoringService::nopath_ = "NoPath"
static

◆ nOutputModules_

unsigned int evf::FastMonitoringService::nOutputModules_ = 0
private

Definition at line 289 of file FastMonitoringService.h.

Referenced by makeModuleLegendaJson(), and preModuleBeginJob().

◆ nStreams_

unsigned int evf::FastMonitoringService::nStreams_
private

◆ nThreads_

unsigned int evf::FastMonitoringService::nThreads_
private

Definition at line 242 of file FastMonitoringService.h.

Referenced by preallocate(), and preBeginJob().

◆ pathLegendFile_

std::string evf::FastMonitoringService::pathLegendFile_
private

Definition at line 286 of file FastMonitoringService.h.

Referenced by preBeginJob().

◆ pathLegendFileJson_

std::string evf::FastMonitoringService::pathLegendFileJson_
private

Definition at line 287 of file FastMonitoringService.h.

Referenced by postBeginJob(), and preBeginJob().

◆ pathNamesReady_

std::vector<bool> evf::FastMonitoringService::pathNamesReady_
private

Definition at line 276 of file FastMonitoringService.h.

◆ processedEventsPerLumi_

std::map<unsigned int, std::pair<unsigned int, bool> > evf::FastMonitoringService::processedEventsPerLumi_
private

◆ reservedMicroStateNames

const edm::ModuleDescription evf::FastMonitoringService::reservedMicroStateNames
static
Initial value:
= {
edm::ModuleDescription("Dummy", "Invalid"),
edm::ModuleDescription("Dummy", "Idle"),
edm::ModuleDescription("Dummy", "FwkOvhSrc"),
edm::ModuleDescription("Dummy", "FwkOvhMod"),
edm::ModuleDescription("Dummy", "FwkEoL"),
edm::ModuleDescription("Dummy", "Input"),
edm::ModuleDescription("Dummy", "DQM"),
edm::ModuleDescription("Dummy", "BoL"),
edm::ModuleDescription("Dummy", "EoL"),
edm::ModuleDescription("Dummy", "GlobalEoL")}

Definition at line 159 of file FastMonitoringService.h.

Referenced by doSnapshot(), postEvent(), postModuleEvent(), postSourceEvent(), postStreamBeginLumi(), postStreamEndLumi(), preBeginJob(), preSourceEvent(), preStreamBeginLumi(), preStreamEndLumi(), and setMicroState().

◆ runDirectory_

std::filesystem::path evf::FastMonitoringService::runDirectory_
private

Definition at line 278 of file FastMonitoringService.h.

Referenced by getRunDirName(), and preBeginJob().

◆ sleepTime_

int evf::FastMonitoringService::sleepTime_
private

Definition at line 243 of file FastMonitoringService.h.

Referenced by preBeginJob(), preGlobalEndLumi(), and snapshotRunner().

◆ slowName_

std::string evf::FastMonitoringService::slowName_
private

Definition at line 247 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi().

◆ snapCounter_

unsigned int evf::FastMonitoringService::snapCounter_ = 0
private

Definition at line 245 of file FastMonitoringService.h.

Referenced by snapshotRunner().

◆ streamCounterUpdating_

std::vector<std::atomic<bool>*> evf::FastMonitoringService::streamCounterUpdating_
private

Definition at line 273 of file FastMonitoringService.h.

Referenced by preBeginJob().

◆ threadIDAvailable_

bool evf::FastMonitoringService::threadIDAvailable_ = false
private

Definition at line 280 of file FastMonitoringService.h.

Referenced by preBeginJob().

◆ totalEventsProcessed_

std::atomic<unsigned long> evf::FastMonitoringService::totalEventsProcessed_
private

Definition at line 282 of file FastMonitoringService.h.

Referenced by postEvent().

◆ verbose_

bool evf::FastMonitoringService::verbose_ = false
private

Definition at line 298 of file FastMonitoringService.h.

Referenced by snapshotRunner().

◆ workingDirectory_

std::filesystem::path evf::FastMonitoringService::workingDirectory_
private

Definition at line 278 of file FastMonitoringService.h.

Referenced by preBeginJob(), and preGlobalEndLumi().