CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Static Private Member Functions | Private Attributes
evf::FastMonitoringService Class Reference

#include <FastMonitoringService.h>

Inheritance diagram for evf::FastMonitoringService:
evf::MicroStateService

Public Member Functions

void accumulateFileSize (unsigned int lumi, unsigned long fileSize)
 
bool exceptionDetected () const
 
 FastMonitoringService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
bool getAbortFlagForLumi (unsigned int lumi)
 
unsigned int getEventsProcessedForLumi (unsigned int lumi, bool *abortFlag=nullptr)
 
std::string getRunDirName () const
 
bool isExceptionOnData (unsigned int ls)
 
void jobFailure ()
 
std::string makeInputLegendaJson ()
 
std::string makeModuleLegendaJson ()
 
void postBeginJob ()
 
void postEndJob ()
 
void postEvent (edm::StreamContext const &)
 
void postGlobalBeginRun (edm::GlobalContext const &)
 
void postGlobalEndLumi (edm::GlobalContext const &)
 
void postModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void postModuleEventAcquire (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void postSourceEvent (edm::StreamID)
 
void postStreamBeginLumi (edm::StreamContext const &)
 
void postStreamEndLumi (edm::StreamContext const &)
 
void preallocate (edm::service::SystemBounds const &)
 
void preBeginJob (edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
 
void preEvent (edm::StreamContext const &)
 
void preGlobalBeginLumi (edm::GlobalContext const &)
 
void preGlobalEarlyTermination (edm::GlobalContext const &, edm::TerminationOrigin)
 
void preGlobalEndLumi (edm::GlobalContext const &)
 
void preModuleBeginJob (edm::ModuleDescription const &)
 
void preModuleEvent (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void preModuleEventAcquire (edm::StreamContext const &, edm::ModuleCallingContext const &)
 
void prePathEvent (edm::StreamContext const &, edm::PathContext const &)
 
void preSourceEarlyTermination (edm::TerminationOrigin)
 
void preSourceEvent (edm::StreamID)
 
void preStreamBeginLumi (edm::StreamContext const &)
 
void preStreamEarlyTermination (edm::StreamContext const &, edm::TerminationOrigin)
 
void preStreamEndLumi (edm::StreamContext const &)
 
void reportLockWait (unsigned int ls, double waitTime, unsigned int lockCount)
 
void setExceptionDetected (unsigned int ls)
 
void setInputSource (FedRawDataInputSource *inputSource)
 
void setInputSource (DAQSource *inputSource)
 
void setInState (FastMonState::InputState inputState)
 
void setInStateSup (FastMonState::InputState inputState)
 
void setTMicrostate (FastMonState::Microstate m)
 
bool shouldWriteFiles (unsigned int lumi, unsigned int *proc=nullptr)
 
void startedLookingForFile ()
 
void stoppedLookingForFile (unsigned int lumi)
 
 ~FastMonitoringService () override
 
- Public Member Functions inherited from evf::MicroStateService
 MicroStateService (const edm::ParameterSet &, edm::ActivityRegistry &)
 
virtual ~MicroStateService ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static unsigned int getTID ()
 

Static Public Attributes

static const std::string inputStateNames [FastMonState::inCOUNT]
 
static const std::string macroStateNames [FastMonState::MCOUNT]
 
static const edm::ModuleDescription specialMicroStateNames [FastMonState::mCOUNT]
 

Private Member Functions

void doSnapshot (const unsigned int ls, const bool isGlobalEOL)
 
void snapshotRunner ()
 

Static Private Member Functions

static unsigned int getSID (edm::StreamContext const &sc)
 
static unsigned int getSID (edm::StreamID const &sid)
 

Private Attributes

std::map< unsigned int, unsigned long > accuSize_
 
std::map< unsigned int, double > avgLeadTime_
 
std::unique_ptr< ConcurrencyTrackerct_
 
DAQSourcedaqInputSource_ = nullptr
 
bool exception_detected_ = false
 
std::vector< unsigned int > exceptionInLS_
 
std::string fastMicrostateDefPath_
 
unsigned int fastMonIntervals_
 
std::string fastName_
 
std::string fastPath_
 
timeval fileLookStart_
 
timeval fileLookStop_
 
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
 
std::unique_ptr< FastMonitoringThreadfmt_
 
std::atomic< bool > has_data_exception_ = false
 
std::atomic< bool > has_source_exception_ = false
 
std::string inputLegendFileJson_
 
FedRawDataInputSourceinputSource_ = nullptr
 
std::atomic< FastMonState::InputStateinputState_ {FastMonState::InputState::inInit}
 
std::atomic< FastMonState::InputStateinputSupervisorState_ {FastMonState::InputState::inInit}
 
std::atomic< bool > isInitTransition_
 
unsigned int lastGlobalLumi_
 
std::vector< double > leadTimes_
 
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
 
unsigned int lumiFromSource_
 
std::map< unsigned int, timeval > lumiStartTime_
 
std::vector< ContainableAtomic< const void * > > microstate_
 
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
 
std::string microstateDefPath_
 
std::string moduleLegendFile_
 
std::string moduleLegendFileJson_
 
std::atomic< bool > monInit_
 
unsigned int nMonThreads_ = 0
 
unsigned int nOutputModules_ = 0
 
unsigned int nStreams_ = 0
 
unsigned int nThreads_ = 0
 
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
 
std::filesystem::path runDirectory_
 
int sleepTime_
 
unsigned int snapCounter_ = 0
 
std::vector< std::atomic< bool > * > streamCounterUpdating_
 
bool tbbConcurrencyTracker_
 
bool tbbMonitoringMode_
 
bool threadIDAvailable_ = false
 
std::vector< ContainableAtomic< const void * > > tmicrostate_
 
std::vector< ContainableAtomic< unsigned char > > tmicrostateAcqFlag_
 
std::atomic< unsigned long > totalEventsProcessed_
 
bool verbose_ = false
 
std::filesystem::path workingDirectory_
 

Detailed Description

Definition at line 167 of file FastMonitoringService.h.

Constructor & Destructor Documentation

◆ FastMonitoringService()

evf::FastMonitoringService::FastMonitoringService ( const edm::ParameterSet iPS,
edm::ActivityRegistry reg 
)

Definition at line 197 of file FastMonitoringService.cc.

References Exception, fastMicrostateDefPath_, jobFailure(), microstateDefPath_, postBeginJob(), postEndJob(), postEvent(), postGlobalEndLumi(), postModuleEvent(), postModuleEventAcquire(), postSourceEvent(), postStreamBeginLumi(), postStreamEndLumi(), preallocate(), preBeginJob(), preEvent(), preGlobalBeginLumi(), preGlobalEarlyTermination(), preGlobalEndLumi(), preModuleBeginJob(), preModuleEvent(), preModuleEventAcquire(), preSourceEarlyTermination(), preSourceEvent(), preStreamBeginLumi(), preStreamEarlyTermination(), preStreamEndLumi(), edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, edm::ActivityRegistry::watchJobFailure(), edm::ActivityRegistry::watchPostBeginJob(), edm::ActivityRegistry::watchPostEndJob(), edm::ActivityRegistry::watchPostEvent(), edm::ActivityRegistry::watchPostGlobalEndLumi(), edm::ActivityRegistry::watchPostModuleEvent(), edm::ActivityRegistry::watchPostModuleEventAcquire(), edm::ActivityRegistry::watchPostSourceEvent(), edm::ActivityRegistry::watchPostStreamBeginLumi(), edm::ActivityRegistry::watchPostStreamEndLumi(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreBeginJob(), edm::ActivityRegistry::watchPreEvent(), edm::ActivityRegistry::watchPreGlobalBeginLumi(), edm::ActivityRegistry::watchPreGlobalEarlyTermination(), edm::ActivityRegistry::watchPreGlobalEndLumi(), edm::ActivityRegistry::watchPreModuleBeginJob(), edm::ActivityRegistry::watchPreModuleEvent(), edm::ActivityRegistry::watchPreModuleEventAcquire(), edm::ActivityRegistry::watchPreSourceEarlyTermination(), edm::ActivityRegistry::watchPreSourceEvent(), edm::ActivityRegistry::watchPreStreamBeginLumi(), edm::ActivityRegistry::watchPreStreamEarlyTermination(), and edm::ActivityRegistry::watchPreStreamEndLumi().

198  : MicroStateService(iPS, reg),
199  fmt_(new FastMonitoringThread()),
200  tbbMonitoringMode_(iPS.getUntrackedParameter<bool>("tbbMonitoringMode", true)),
201  tbbConcurrencyTracker_(iPS.getUntrackedParameter<bool>("tbbConcurrencyTracker", true) && tbbMonitoringMode_),
202  sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
203  fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
204  fastName_("fastmoni"),
206  verbose_(iPS.getUntrackedParameter<bool>("verbose")) {
207  reg.watchPreallocate(this, &FastMonitoringService::preallocate); //receiving information on number of threads
209 
214 
218 
223 
224  reg.watchPreEvent(this, &FastMonitoringService::preEvent); //stream
226 
227  //readEvent (not getNextItemType)
228  reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent); //source (with streamID of requestor)
230 
233 
236 
240 
241  //find microstate definition path (required by the module)
242  struct stat statbuf;
243  std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
244  std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
245  if (stat(microstatePath.c_str(), &statbuf)) {
246  microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
247  if (stat(microstatePath.c_str(), &statbuf)) {
248  microstatePath = microstateBaseSuffix;
249  if (stat(microstatePath.c_str(), &statbuf))
250  throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
251  }
252  }
253  fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
254  }
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
void watchPreEvent(PreEvent::slot_type const &iSlot)
void postModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void watchPreModuleEventAcquire(PreModuleEventAcquire::slot_type const &iSlot)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
void preGlobalBeginLumi(edm::GlobalContext const &)
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
void postEvent(edm::StreamContext const &)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
void watchPostEvent(PostEvent::slot_type const &iSlot)
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
void preGlobalEndLumi(edm::GlobalContext const &)
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
std::unique_ptr< FastMonitoringThread > fmt_
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
T getUntrackedParameter(std::string const &, T const &) const
void preModuleBeginJob(edm::ModuleDescription const &)
MicroStateService(const edm::ParameterSet &, edm::ActivityRegistry &)
void preStreamEndLumi(edm::StreamContext const &)
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
void watchPostModuleEventAcquire(PostModuleEventAcquire::slot_type const &iSlot)
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
void preModuleEventAcquire(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void postStreamBeginLumi(edm::StreamContext const &)
void postStreamEndLumi(edm::StreamContext const &)
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void preEvent(edm::StreamContext const &)
void preSourceEarlyTermination(edm::TerminationOrigin)
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal

◆ ~FastMonitoringService()

evf::FastMonitoringService::~FastMonitoringService ( )
override

Definition at line 256 of file FastMonitoringService.cc.

256 {}

Member Function Documentation

◆ accumulateFileSize()

void evf::FastMonitoringService::accumulateFileSize ( unsigned int  lumi,
unsigned long  fileSize 
)

Definition at line 717 of file FastMonitoringService.cc.

References accuSize_, filesProcessedDuringLumi_, fmt_, CommonMethods::lock(), and BXlumiParameters_cfi::lumi.

Referenced by evf::EvFDaqDirector::bumpFile().

717  {
718  std::lock_guard<std::mutex> lock(fmt_->monlock_);
719 
720  if (accuSize_.find(lumi) == accuSize_.end())
721  accuSize_[lumi] = fileSize;
722  else
723  accuSize_[lumi] += fileSize;
724 
727  else
729  }
std::map< unsigned int, unsigned long > accuSize_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::unique_ptr< FastMonitoringThread > fmt_

◆ doSnapshot()

void evf::FastMonitoringService::doSnapshot ( const unsigned int  ls,
const bool  isGlobalEOL 
)
private

Definition at line 859 of file FastMonitoringService.cc.

References avgLeadTime_, ct_, filesProcessedDuringLumi_, fmt_, evf::getmEoL(), evf::getmFwk(), evf::getmFwkEoL(), evf::getmIdle(), mps_fire::i, ALPAKA_ACCELERATOR_NAMESPACE::caPixelDoublets::if(), evf::FastMonState::inIgnore, evf::FastMonState::inNewLumi, evf::FastMonState::inNoRequest, evf::FastMonState::inNoRequestWithEoLThreads, evf::FastMonState::inNoRequestWithIdleThreads, inputState_, inputSupervisorState_, evf::FastMonState::inRunEnd, evf::FastMonState::inSupBusy, evf::FastMonState::inSupFileLimit, evf::FastMonState::inSupLockPolling, evf::FastMonState::inSupLockPollingCopying, evf::FastMonState::inSupNewFile, evf::FastMonState::inSupNewFileWaitChunk, evf::FastMonState::inSupNewFileWaitChunkCopying, evf::FastMonState::inSupNewFileWaitThread, evf::FastMonState::inSupNewFileWaitThreadCopying, evf::FastMonState::inSupNoFile, evf::FastMonState::inSupThrottled, evf::FastMonState::inSupWaitFreeChunk, evf::FastMonState::inSupWaitFreeChunkCopying, evf::FastMonState::inSupWaitFreeThread, evf::FastMonState::inSupWaitFreeThreadCopying, evf::FastMonState::inWaitChunk, evf::FastMonState::inWaitChunk_busy, evf::FastMonState::inWaitChunk_fileLimit, evf::FastMonState::inWaitChunk_lockPolling, evf::FastMonState::inWaitChunk_lockPollingCopying, evf::FastMonState::inWaitChunk_newFile, evf::FastMonState::inWaitChunk_newFileWaitChunk, evf::FastMonState::inWaitChunk_newFileWaitChunkCopying, evf::FastMonState::inWaitChunk_newFileWaitThread, evf::FastMonState::inWaitChunk_newFileWaitThreadCopying, evf::FastMonState::inWaitChunk_noFile, evf::FastMonState::inWaitChunk_runEnd, evf::FastMonState::inWaitChunk_waitFreeChunk, evf::FastMonState::inWaitChunk_waitFreeChunkCopying, evf::FastMonState::inWaitChunk_waitFreeThread, evf::FastMonState::inWaitChunk_waitFreeThreadCopying, evf::FastMonState::inWaitInput, evf::FastMonState::inWaitInput_busy, evf::FastMonState::inWaitInput_fileLimit, evf::FastMonState::inWaitInput_lockPolling, evf::FastMonState::inWaitInput_lockPollingCopying, evf::FastMonState::inWaitInput_newFile, evf::FastMonState::inWaitInput_newFileWaitChunk, evf::FastMonState::inWaitInput_newFileWaitChunkCopying, evf::FastMonState::inWaitInput_newFileWaitThread, evf::FastMonState::inWaitInput_newFileWaitThreadCopying, evf::FastMonState::inWaitInput_noFile, evf::FastMonState::inWaitInput_runEnd, evf::FastMonState::inWaitInput_waitFreeChunk, evf::FastMonState::inWaitInput_waitFreeChunkCopying, evf::FastMonState::inWaitInput_waitFreeThread, evf::FastMonState::inWaitInput_waitFreeThreadCopying, isInitTransition_, lockStatsDuringLumi_, eostools::ls(), microstate_, microstateAcqFlag_, nMonThreads_, nStreams_, nThreads_, tmicrostate_, and tmicrostateAcqFlag_.

Referenced by preGlobalEndLumi(), and snapshotRunner().

859  {
860  // update macrostate
861  fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
862 
863  std::vector<const void*> microstateCopy(microstate_.begin(), microstate_.end());
864  std::vector<const void*> tmicrostateCopy(tmicrostate_.begin(), tmicrostate_.end());
865  std::vector<unsigned char> microstateAcqCopy(microstateAcqFlag_.begin(), microstateAcqFlag_.end());
866  std::vector<unsigned char> tmicrostateAcqCopy(tmicrostateAcqFlag_.begin(), tmicrostateAcqFlag_.end());
867 
868  if (!isInitTransition_) {
869  auto itd = avgLeadTime_.find(ls);
870  if (itd != avgLeadTime_.end())
871  fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
872  else
873  fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
874 
875  auto iti = filesProcessedDuringLumi_.find(ls);
876  if (iti != filesProcessedDuringLumi_.end())
877  fmt_->m_data.fastFilesProcessedJ_ = iti->second;
878  else
879  fmt_->m_data.fastFilesProcessedJ_ = 0;
880 
881  auto itrd = lockStatsDuringLumi_.find(ls);
882  if (itrd != lockStatsDuringLumi_.end()) {
883  fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
884  fmt_->m_data.fastLockCountJ_ = itrd->second.second;
885  } else {
886  fmt_->m_data.fastLockWaitJ_ = 0.;
887  fmt_->m_data.fastLockCountJ_ = 0.;
888  }
889  }
890 
891  for (unsigned int i = 0; i < nThreads_; i++) {
892  if (tmicrostateCopy[i] == getmIdle() && ct_->isThreadActive(i)) {
893  //overhead if thread is running
894  tmicrostateCopy[i] = getmFwk();
895  }
896  if (tmicrostateAcqCopy[i])
897  fmt_->m_data.tmicrostateEncoded_[i] =
898  fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
899  else
900  fmt_->m_data.tmicrostateEncoded_[i] = fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
901  }
902 
903  for (unsigned int i = 0; i < nStreams_; i++) {
904  if (microstateAcqCopy[i])
905  fmt_->m_data.microstateEncoded_[i] =
906  fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
907  else
908  fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
909  }
910 
911  bool inputStatePerThread = false;
912 
914  switch (inputSupervisorState_) {
916  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
917  break;
919  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
920  break;
922  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
923  break;
925  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
926  break;
928  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
929  break;
931  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
932  break;
934  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
935  break;
937  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
938  break;
940  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
941  break;
943  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
944  break;
946  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
947  break;
950  break;
952  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
953  break;
956  break;
958  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
959  break;
960  default:
961  fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
962  }
963  } else if (inputState_ == FastMonState::inWaitChunk) {
964  switch (inputSupervisorState_) {
966  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
967  break;
969  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
970  break;
972  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
973  break;
975  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
976  break;
978  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
979  break;
981  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
982  break;
984  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
985  break;
987  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
988  break;
990  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
991  break;
993  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
994  break;
996  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
997  break;
1000  break;
1002  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
1003  break;
1006  break;
1008  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
1009  break;
1010  default:
1011  fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
1012  }
1013  } else if (inputState_ == FastMonState::inNoRequest) {
1014  inputStatePerThread = true;
1015  for (unsigned int i = 0; i < nMonThreads_; i++) {
1016  if (i >= nStreams_)
1017  fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1018  else if (microstateCopy[i] == getmIdle())
1019  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
1020  else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1021  fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
1022  else
1023  fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
1024  }
1025  } else if (inputState_ == FastMonState::inNewLumi) {
1026  inputStatePerThread = true;
1027  for (unsigned int i = 0; i < nMonThreads_; i++) {
1028  if (i >= nStreams_)
1029  fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1030  else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1031  fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1032  }
1034  //apply directly throttled state from supervisor
1035  fmt_->m_data.inputState_[0] = inputSupervisorState_;
1036  } else
1037  fmt_->m_data.inputState_[0] = inputState_;
1038 
1039  //this is same for all streams
1040  if (!inputStatePerThread)
1041  for (unsigned int i = 1; i < nMonThreads_; i++)
1042  fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1043 
1044  if (isGlobalEOL) { //only update global variables
1045  fmt_->jsonMonitor_->snapGlobal(ls);
1046  } else
1047  fmt_->jsonMonitor_->snap(ls);
1048  }
constexpr edm::ModuleDescription const * getmFwkEoL()
std::atomic< FastMonState::InputState > inputState_
std::vector< ContainableAtomic< const void * > > microstate_
constexpr edm::ModuleDescription const * getmIdle()
std::atomic< bool > isInitTransition_
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::unique_ptr< FastMonitoringThread > fmt_
constexpr edm::ModuleDescription const * getmFwk()
constexpr edm::ModuleDescription const * getmEoL()
std::vector< ContainableAtomic< unsigned char > > tmicrostateAcqFlag_
std::map< unsigned int, double > avgLeadTime_
std::atomic< FastMonState::InputState > inputSupervisorState_
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
std::unique_ptr< ConcurrencyTracker > ct_
def ls(path, rec=False)
Definition: eostools.py:349
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
std::vector< ContainableAtomic< const void * > > tmicrostate_

◆ exceptionDetected()

bool evf::FastMonitoringService::exceptionDetected ( ) const

Definition at line 457 of file FastMonitoringService.cc.

References has_data_exception_, and has_source_exception_.

Referenced by DAQSource::~DAQSource(), and FedRawDataInputSource::~FedRawDataInputSource().

457  {
458  return has_source_exception_.load() || has_data_exception_.load();
459  }
std::atomic< bool > has_data_exception_
std::atomic< bool > has_source_exception_

◆ fillDescriptions()

void evf::FastMonitoringService::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 258 of file FastMonitoringService.cc.

References edm::ConfigurationDescriptions::add(), and submitPVResolutionJobs::desc.

258  {
260  desc.setComment("Service for File-based DAQ monitoring and event accounting");
261  desc.addUntracked<bool>("tbbMonitoringMode", true)
262  ->setComment("Monitor individual module processing per TBB thread instead of stream");
263  desc.addUntracked<bool>("tbbConcurrencyTracker", true)
264  ->setComment("Monitor TBB thread activity to flag microstate as real idle or overhead/other");
265  desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
266  desc.addUntracked<unsigned int>("fastMonIntervals", 2)
267  ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
268  desc.addUntracked<bool>("filePerFwkStream", true) //obsolete
269  ->setComment("Switches on monitoring output per framework stream");
270  desc.addUntracked<bool>("verbose", false)->setComment("Set to use LogInfo messages from the monitoring thread");
271  desc.setAllowAnything();
272  descriptions.add("FastMonitoringService", desc);
273  }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ getAbortFlagForLumi()

bool evf::FastMonitoringService::getAbortFlagForLumi ( unsigned int  lumi)

Definition at line 795 of file FastMonitoringService.cc.

References Exception, fmt_, ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, CommonMethods::lock(), BXlumiParameters_cfi::lumi, and processedEventsPerLumi_.

Referenced by shouldWriteFiles().

795  {
796  std::lock_guard<std::mutex> lock(fmt_->monlock_);
797 
798  auto it = processedEventsPerLumi_.find(lumi);
799  if (it != processedEventsPerLumi_.end()) {
800  unsigned int abortFlag = it->second.second;
801  return abortFlag;
802  } else {
803  throw cms::Exception("FastMonitoringService")
804  << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
805  << lumi;
806  return false;
807  }
808  }
std::unique_ptr< FastMonitoringThread > fmt_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_

◆ getEventsProcessedForLumi()

unsigned int evf::FastMonitoringService::getEventsProcessedForLumi ( unsigned int  lumi,
bool *  abortFlag = nullptr 
)

Definition at line 777 of file FastMonitoringService.cc.

References Exception, fmt_, ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, CommonMethods::lock(), BXlumiParameters_cfi::lumi, ValidateTausOnZEEFastSim_cff::proc, and processedEventsPerLumi_.

Referenced by dqm::DQMFileSaverPB::fillJson(), dqmfilesaver::fillJson(), evf::GlobalEvFOutputModule::globalEndLuminosityBlock(), dqm::DQMFileSaverPB::saveLumi(), and shouldWriteFiles().

777  {
778  std::lock_guard<std::mutex> lock(fmt_->monlock_);
779 
780  auto it = processedEventsPerLumi_.find(lumi);
781  if (it != processedEventsPerLumi_.end()) {
782  unsigned int proc = it->second.first;
783  if (abortFlag)
784  *abortFlag = it->second.second;
785  return proc;
786  } else {
787  throw cms::Exception("FastMonitoringService")
788  << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
789  << lumi;
790  return 0;
791  }
792  }
std::unique_ptr< FastMonitoringThread > fmt_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_

◆ getRunDirName()

std::string evf::FastMonitoringService::getRunDirName ( ) const
inline

Definition at line 226 of file FastMonitoringService.h.

References runDirectory_.

226 { return runDirectory_.stem().string(); }
std::filesystem::path runDirectory_

◆ getSID() [1/2]

static unsigned int evf::FastMonitoringService::getSID ( edm::StreamContext const &  sc)
inlinestaticprivate

◆ getSID() [2/2]

static unsigned int evf::FastMonitoringService::getSID ( edm::StreamID const &  sid)
inlinestaticprivate

Definition at line 243 of file FastMonitoringService.h.

References edm::StreamID::value().

243 { return sid.value(); }

◆ getTID()

static unsigned int evf::FastMonitoringService::getTID ( )
inlinestatic

Definition at line 234 of file FastMonitoringService.h.

Referenced by postModuleEvent(), postModuleEventAcquire(), postSourceEvent(), preModuleEvent(), preModuleEventAcquire(), and preSourceEvent().

234 { return tbb::this_task_arena::current_thread_index(); }

◆ isExceptionOnData()

bool evf::FastMonitoringService::isExceptionOnData ( unsigned int  ls)

Definition at line 461 of file FastMonitoringService.cc.

References exceptionInLS_, fmt_, has_data_exception_, has_source_exception_, CommonMethods::lock(), and eostools::ls().

Referenced by evf::EvFDaqDirector::preGlobalEndLumi(), FedRawDataInputSource::read(), DAQSource::read(), DAQSource::~DAQSource(), and FedRawDataInputSource::~FedRawDataInputSource().

461  {
462  if (!has_data_exception_.load())
463  return false;
464  if (has_source_exception_.load())
465  return true;
466  std::lock_guard<std::mutex> lock(fmt_->monlock_);
467  for (auto ex : exceptionInLS_) {
468  if (ls == ex)
469  return true;
470  }
471  return false;
472  }
std::atomic< bool > has_data_exception_
std::unique_ptr< FastMonitoringThread > fmt_
def ls(path, rec=False)
Definition: eostools.py:349
std::atomic< bool > has_source_exception_
std::vector< unsigned int > exceptionInLS_

◆ jobFailure()

void evf::FastMonitoringService::jobFailure ( )

Definition at line 474 of file FastMonitoringService.cc.

References fmt_, and evf::FastMonState::sError.

Referenced by FastMonitoringService().

474 { fmt_->m_data.macrostate_ = FastMonState::sError; }
std::unique_ptr< FastMonitoringThread > fmt_

◆ makeInputLegendaJson()

std::string evf::FastMonitoringService::makeInputLegendaJson ( )

Definition at line 296 of file FastMonitoringService.cc.

References jsoncollector::Json::Value::append(), jsoncollector::Json::arrayValue, mps_fire::i, evf::FastMonState::inCOUNT, inputStateNames, and convertToRaw::writer.

Referenced by postBeginJob().

296  {
297  Json::Value legendaVector(Json::arrayValue);
298  for (int i = 0; i < FastMonState::inCOUNT; i++)
299  legendaVector.append(Json::Value(inputStateNames[i]));
300  Json::Value moduleLegend;
301  moduleLegend["names"] = legendaVector;
303  return writer.write(moduleLegend);
304  }
static const std::string inputStateNames[FastMonState::inCOUNT]
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:64
array value (ordered list)
Definition: value.h:32
Represents a JSON value.
Definition: value.h:101

◆ makeModuleLegendaJson()

std::string evf::FastMonitoringService::makeModuleLegendaJson ( )

Definition at line 275 of file FastMonitoringService.cc.

References jsoncollector::Json::Value::append(), jsoncollector::Json::arrayValue, fmt_, mps_fire::i, nOutputModules_, evf::nReservedModules, evf::nSpecialModules, and convertToRaw::writer.

Referenced by postBeginJob().

275  {
276  Json::Value legendaVector(Json::arrayValue);
277  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
278  legendaVector.append(
279  Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
280  //duplicate modules adding a list for acquire states (not all modules actually have it)
281  for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
282  legendaVector.append(Json::Value(
283  (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
284  Json::Value valReserved(nReservedModules);
285  Json::Value valSpecial(nSpecialModules);
286  Json::Value valOutputModules(nOutputModules_);
287  Json::Value moduleLegend;
288  moduleLegend["names"] = legendaVector;
289  moduleLegend["reserved"] = valReserved;
290  moduleLegend["special"] = valSpecial;
291  moduleLegend["output"] = valOutputModules;
293  return writer.write(moduleLegend);
294  }
std::unique_ptr< FastMonitoringThread > fmt_
constexpr int nSpecialModules
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:64
array value (ordered list)
Definition: value.h:32
constexpr int nReservedModules
Represents a JSON value.
Definition: value.h:101

◆ postBeginJob()

void evf::FastMonitoringService::postBeginJob ( )

Definition at line 491 of file FastMonitoringService.cc.

References fmt_, inputLegendFileJson_, CommonMethods::lock(), makeInputLegendaJson(), makeModuleLegendaJson(), moduleLegendFileJson_, evf::FastMonState::sJobReady, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FastMonitoringService().

491  {
492  std::string&& moduleLegStrJson = makeModuleLegendaJson();
493  FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
494 
495  std::string inputLegendStrJson = makeInputLegendaJson();
496  FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
497 
498  fmt_->m_data.macrostate_ = FastMonState::sJobReady;
499 
500  //update number of entries in module histogram
501  std::lock_guard<std::mutex> lock(fmt_->monlock_);
502  //double the size to add post-acquire states
503  fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
504  }
std::unique_ptr< FastMonitoringThread > fmt_

◆ postEndJob()

void evf::FastMonitoringService::postEndJob ( )

Definition at line 506 of file FastMonitoringService.cc.

References fmt_, and evf::FastMonState::sJobEnded.

Referenced by FastMonitoringService().

506  {
507  fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
508  fmt_->stop();
509  }
std::unique_ptr< FastMonitoringThread > fmt_

◆ postEvent()

void evf::FastMonitoringService::postEvent ( edm::StreamContext const &  sc)

Definition at line 641 of file FastMonitoringService.cc.

References fmt_, evf::getmIdle(), microstate_, edm::StreamContext::streamID(), totalEventsProcessed_, and edm::StreamID::value().

Referenced by FastMonitoringService().

641  {
642  (*(fmt_->m_data.processed_[sc.streamID().value()]))++;
643  //fast path counter (events accumulated in a run)
644  unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
645  fmt_->m_data.fastPathProcessedJ_ = res + 1;
646 
647  microstate_[sc.streamID().value()] = getmIdle();
648  }
std::vector< ContainableAtomic< const void * > > microstate_
constexpr edm::ModuleDescription const * getmIdle()
std::unique_ptr< FastMonitoringThread > fmt_
Definition: Electron.h:6
std::atomic< unsigned long > totalEventsProcessed_

◆ postGlobalBeginRun()

void evf::FastMonitoringService::postGlobalBeginRun ( edm::GlobalContext const &  gc)

Definition at line 511 of file FastMonitoringService.cc.

References fmt_, isInitTransition_, and evf::FastMonState::sRunning.

511  {
512  fmt_->m_data.macrostate_ = FastMonState::sRunning;
513  isInitTransition_ = false;
514  }
std::atomic< bool > isInitTransition_
std::unique_ptr< FastMonitoringThread > fmt_

◆ postGlobalEndLumi()

void evf::FastMonitoringService::postGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 598 of file FastMonitoringService.cc.

References avgLeadTime_, filesProcessedDuringLumi_, fmt_, CommonMethods::lock(), lockStatsDuringLumi_, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), and processedEventsPerLumi_.

Referenced by FastMonitoringService().

598  {
599  std::lock_guard<std::mutex> lock(fmt_->monlock_);
600  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
601  //LS monitoring snapshot with input source data has been taken in previous callback
602  avgLeadTime_.erase(lumi);
604  lockStatsDuringLumi_.erase(lumi);
605 
606  //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
608  }
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
std::unique_ptr< FastMonitoringThread > fmt_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::map< unsigned int, double > avgLeadTime_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_

◆ postModuleEvent()

void evf::FastMonitoringService::postModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 706 of file FastMonitoringService.cc.

References evf::getmFwkOvhMod(), evf::getmIdle(), getSID(), getTID(), microstate_, nThreads_, tbbMonitoringMode_, and tmicrostate_.

Referenced by FastMonitoringService().

706  {
708  if (!tbbMonitoringMode_)
709  return;
710  auto tid = getTID();
711  if (tid >= nThreads_)
712  return;
713  tmicrostate_[tid] = getmIdle();
714  }
constexpr edm::ModuleDescription const * getmFwkOvhMod()
std::vector< ContainableAtomic< const void * > > microstate_
constexpr edm::ModuleDescription const * getmIdle()
static unsigned int getSID(edm::StreamContext const &sc)
std::vector< ContainableAtomic< const void * > > tmicrostate_

◆ postModuleEventAcquire()

void evf::FastMonitoringService::postModuleEventAcquire ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 683 of file FastMonitoringService.cc.

References evf::getmFwkOvhMod(), evf::getmIdle(), getSID(), getTID(), microstate_, microstateAcqFlag_, nThreads_, tbbMonitoringMode_, tmicrostate_, and tmicrostateAcqFlag_.

Referenced by FastMonitoringService().

684  {
686  microstateAcqFlag_[getSID(sc)] = 0;
687  if (!tbbMonitoringMode_)
688  return;
689  auto tid = getTID();
690  if (tid >= nThreads_)
691  return;
692  tmicrostate_[tid] = getmIdle();
693  tmicrostateAcqFlag_[tid] = 0;
694  }
constexpr edm::ModuleDescription const * getmFwkOvhMod()
std::vector< ContainableAtomic< const void * > > microstate_
constexpr edm::ModuleDescription const * getmIdle()
std::vector< ContainableAtomic< unsigned char > > tmicrostateAcqFlag_
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
static unsigned int getSID(edm::StreamContext const &sc)
std::vector< ContainableAtomic< const void * > > tmicrostate_

◆ postSourceEvent()

void evf::FastMonitoringService::postSourceEvent ( edm::StreamID  sid)

Definition at line 660 of file FastMonitoringService.cc.

References evf::getmFwkOvhSrc(), evf::getmIdle(), getSID(), getTID(), microstate_, nThreads_, tbbMonitoringMode_, and tmicrostate_.

Referenced by FastMonitoringService().

660  {
661  microstate_[getSID(sid)] = getmFwkOvhSrc();
662  if (!tbbMonitoringMode_)
663  return;
664  auto tid = getTID();
665  if (tid >= nThreads_)
666  return;
667  tmicrostate_[tid] = getmIdle();
668  }
constexpr edm::ModuleDescription const * getmFwkOvhSrc()
std::vector< ContainableAtomic< const void * > > microstate_
constexpr edm::ModuleDescription const * getmIdle()
static unsigned int getSID(edm::StreamContext const &sc)
std::vector< ContainableAtomic< const void * > > tmicrostate_

◆ postStreamBeginLumi()

void evf::FastMonitoringService::postStreamBeginLumi ( edm::StreamContext const &  sc)

Definition at line 620 of file FastMonitoringService.cc.

References evf::getmIdle(), microstate_, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

620  {
621  microstate_[sc.streamID().value()] = getmIdle();
622  }
std::vector< ContainableAtomic< const void * > > microstate_
constexpr edm::ModuleDescription const * getmIdle()

◆ postStreamEndLumi()

void evf::FastMonitoringService::postStreamEndLumi ( edm::StreamContext const &  sc)

Definition at line 633 of file FastMonitoringService.cc.

References evf::getmFwkEoL(), microstate_, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

633  {
634  microstate_[sc.streamID().value()] = getmFwkEoL();
635  }
constexpr edm::ModuleDescription const * getmFwkEoL()
std::vector< ContainableAtomic< const void * > > microstate_

◆ preallocate()

void evf::FastMonitoringService::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 306 of file FastMonitoringService.cc.

References ct_, SiStripPI::max, nMonThreads_, nStreams_, and nThreads_.

Referenced by FastMonitoringService().

306  {
307  nStreams_ = bounds.maxNumberOfStreams();
308  nThreads_ = bounds.maxNumberOfThreads();
309  //this should already be >=1
310  if (nStreams_ == 0)
311  nStreams_ = 1;
312  if (nThreads_ == 0)
313  nThreads_ = 1;
315  ct_ = std::make_unique<ConcurrencyTracker>(nThreads_);
316  //start concurrency tracking
317  }
std::unique_ptr< ConcurrencyTracker > ct_

◆ preBeginJob()

void evf::FastMonitoringService::preBeginJob ( edm::PathsAndConsumesOfModulesBase const &  ,
edm::ProcessContext const &  pc 
)

Definition at line 319 of file FastMonitoringService.cc.

References ct_, Exception, fastMicrostateDefPath_, fastName_, fastPath_, fmt_, evf::getmInvalid(), mps_fire::i, evf::FastMonState::inCOUNT, inputLegendFileJson_, isInitTransition_, lastGlobalLumi_, LogDebug, lumiFromSource_, evf::FastMonState::mCOUNT, evf::FastMonState::MCOUNT, microstate_, microstateAcqFlag_, microstateDefPath_, moduleLegendFile_, moduleLegendFileJson_, monInit_, nMonThreads_, nStreams_, nThreads_, Utilities::operator, castor_dqm_sourceclient_file_cfg::path, runDirectory_, evf::FastMonState::sInit, sleepTime_, snapshotRunner(), specialMicroStateNames, streamCounterUpdating_, tbbConcurrencyTracker_, tmicrostate_, tmicrostateAcqFlag_, and workingDirectory_.

Referenced by FastMonitoringService().

319  {
320  // FIND RUN DIRECTORY
321  // The run dir should be set via the configuration of EvFDaqDirector
323  ct_->activate();
324 
325  if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
326  throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
327  }
328  std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
329  workingDirectory_ = runDirectory_ = runDirectory;
330  workingDirectory_ /= "mon";
331 
332  if (!std::filesystem::is_directory(workingDirectory_)) {
333  LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
334  std::filesystem::create_directories(workingDirectory_);
335  if (!std::filesystem::is_directory(workingDirectory_))
336  edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
337  << ". No monitoring data will be written.";
338  }
339 
340  std::ostringstream fastFileName;
341 
342  fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
344  fast /= fastFileName.str();
345  fastPath_ = fast.string();
346 
347  std::ostringstream moduleLegFile;
348  std::ostringstream moduleLegFileJson;
349  moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
350  moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
351  moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
352  moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
353 
354  std::ostringstream inputLegFileJson;
355  inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
356  inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
357 
358  LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
359 
360  /*
361  * initialize the fast monitor with:
362  * vector of pointers to monitorable parameters
363  * path to definition
364  *
365  */
366 
367  fmt_->m_data.macrostate_ = FastMonState::sInit;
368 
369  for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
370  fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(specialMicroStateNames + i));
371  fmt_->m_data.encModule_.completeReservedWithDummies();
372 
373  for (unsigned int i = 0; i < nMonThreads_; i++) {
374  microstate_.emplace_back(getmInvalid());
375  microstateAcqFlag_.push_back(0);
376  tmicrostate_.emplace_back(getmInvalid());
377  tmicrostateAcqFlag_.push_back(0);
378 
379  //for synchronization
380  streamCounterUpdating_.push_back(new std::atomic<bool>(false));
381  }
382 
383  //initial size until we detect number of bins
384  fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
385  fmt_->m_data.microstateBins_ = 0;
386  fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
387 
388  lastGlobalLumi_ = 0;
389  isInitTransition_ = true;
390  lumiFromSource_ = 0;
391 
392  //startup monitoring
393  fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
394  fmt_->jsonMonitor_->setNStreams(nMonThreads_);
395  fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nMonThreads_, nStreams_, nThreads_);
396  monInit_.store(false, std::memory_order_release);
397  if (sleepTime_ > 0)
399  }
constexpr edm::ModuleDescription const * getmInvalid()
std::vector< ContainableAtomic< const void * > > microstate_
std::atomic< bool > isInitTransition_
std::filesystem::path workingDirectory_
std::vector< std::atomic< bool > * > streamCounterUpdating_
std::unique_ptr< FastMonitoringThread > fmt_
std::vector< ContainableAtomic< unsigned char > > tmicrostateAcqFlag_
std::filesystem::path runDirectory_
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
std::unique_ptr< ConcurrencyTracker > ct_
Log< level::Warning, false > LogWarning
static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT]
#define LogDebug(id)
std::vector< ContainableAtomic< const void * > > tmicrostate_

◆ preEvent()

void evf::FastMonitoringService::preEvent ( edm::StreamContext const &  sc)

Definition at line 637 of file FastMonitoringService.cc.

References evf::getmEvent(), microstate_, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

637  {
638  microstate_[sc.streamID().value()] = getmEvent();
639  }
std::vector< ContainableAtomic< const void * > > microstate_
constexpr edm::ModuleDescription const * getmEvent()

◆ preGlobalBeginLumi()

void evf::FastMonitoringService::preGlobalBeginLumi ( edm::GlobalContext const &  gc)

Definition at line 516 of file FastMonitoringService.cc.

References fmt_, evf::getmIdle(), evf::getmInvalid(), mps_fire::i, lastGlobalLumi_, CommonMethods::lock(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, nThreads_, tbbMonitoringMode_, and tmicrostate_.

Referenced by FastMonitoringService().

516  {
517  timeval lumiStartTime;
518  gettimeofday(&lumiStartTime, nullptr);
519  unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
520  lastGlobalLumi_ = newLumi;
521 
522  std::lock_guard<std::mutex> lock(fmt_->monlock_);
523  lumiStartTime_[newLumi] = lumiStartTime;
524  //reset all states to idle
525  if (tbbMonitoringMode_)
526  for (unsigned i = 0; i < nThreads_; i++)
527  if (tmicrostate_[i] == getmInvalid())
528  tmicrostate_[i] = getmIdle();
529  }
constexpr edm::ModuleDescription const * getmInvalid()
constexpr edm::ModuleDescription const * getmIdle()
std::map< unsigned int, timeval > lumiStartTime_
std::unique_ptr< FastMonitoringThread > fmt_
std::vector< ContainableAtomic< const void * > > tmicrostate_

◆ preGlobalEarlyTermination()

void evf::FastMonitoringService::preGlobalEarlyTermination ( edm::GlobalContext const &  gc,
edm::TerminationOrigin  to 
)

Definition at line 417 of file FastMonitoringService.cc.

References visDQMUpload::context, edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, has_data_exception_, CommonMethods::lock(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FastMonitoringService().

417  {
420  context = " FromThisContext ";
422  context = " FromAnotherContext";
424  context = " FromExternalSignal";
425  edm::LogWarning("FastMonitoringService")
426  << " GLOBAL "
427  << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
428  std::lock_guard<std::mutex> lock(fmt_->monlock_);
429  exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
430  has_data_exception_.store(true);
431  }
std::atomic< bool > has_data_exception_
std::unique_ptr< FastMonitoringThread > fmt_
Log< level::Warning, false > LogWarning
std::vector< unsigned int > exceptionInLS_

◆ preGlobalEndLumi()

void evf::FastMonitoringService::preGlobalEndLumi ( edm::GlobalContext const &  gc)

Definition at line 531 of file FastMonitoringService.cc.

References accuSize_, daqInputSource_, doSnapshot(), Exception, exception_detected_, exceptionInLS_, fmt_, DAQSource::getEventReport(), FedRawDataInputSource::getEventReport(), inputSource_, CommonMethods::lock(), LogDebug, BXlumiParameters_cfi::lumi, edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), lumiStartTime_, processedEventsPerLumi_, edm::shutdown_flag, throughputFactor(), and jsoncollector::IntJ::value().

Referenced by FastMonitoringService().

531  {
532  unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
533  LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
534  timeval lumiStopTime;
535  gettimeofday(&lumiStopTime, nullptr);
536 
537  std::lock_guard<std::mutex> lock(fmt_->monlock_);
538 
539  // Compute throughput
540  timeval stt = lumiStartTime_[lumi];
541  lumiStartTime_.erase(lumi);
542  unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
543  unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
544  accuSize_.erase(lumi);
545  double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
546  //store to registered variable
547  fmt_->m_data.fastThroughputJ_.value() = throughput;
548 
549  //update
550  doSnapshot(lumi, true);
551 
552  //retrieve one result we need (todo: sanity check if it's found)
553  IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
554  if (!lumiProcessedJptr)
555  throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
556  processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
557 
558  //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
559  bool exception_detected = exception_detected_;
560  for (auto ex : exceptionInLS_)
561  if (lumi == ex)
562  exception_detected = true;
563 
564  if (edm::shutdown_flag || exception_detected) {
565  edm::LogInfo("FastMonitoringService")
566  << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
567  << " events were processed in LUMI " << lumi;
568  //this will prevent output modules from producing json file for possibly incomplete lumi
569  processedEventsPerLumi_[lumi].first = 0;
570  processedEventsPerLumi_[lumi].second = true;
571  //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
572  //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
573  return;
574  }
575 
576  if (inputSource_ || daqInputSource_) {
577  auto sourceReport =
579  if (sourceReport.first) {
580  if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
581  throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
582  << ", events(processed):" << processedEventsPerLumi_[lumi].first
583  << " events(source):" << sourceReport.second;
584  }
585  }
586  }
587 
588  edm::LogInfo("FastMonitoringService")
589  << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
590  << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
591  delete lumiProcessedJptr;
592 
593  //full global and stream merge (will be used by output modules), output from this service is deprecated
594  fmt_->jsonMonitor_->outputFullJSON("dummy", lumi, false);
595  fmt_->jsonMonitor_->discardCollected(lumi); //we don't do further updates for this lumi
596  }
std::map< unsigned int, timeval > lumiStartTime_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
std::map< unsigned int, unsigned long > accuSize_
volatile std::atomic< bool > shutdown_flag
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: DAQSource.cc:1390
std::unique_ptr< FastMonitoringThread > fmt_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
constexpr double throughputFactor()
Log< level::Info, false > LogInfo
FedRawDataInputSource * inputSource_
std::vector< unsigned int > exceptionInLS_
#define LogDebug(id)

◆ preModuleBeginJob()

void evf::FastMonitoringService::preModuleBeginJob ( edm::ModuleDescription const &  desc)

Definition at line 477 of file FastMonitoringService.cc.

References submitPVResolutionJobs::desc, fmt_, CommonMethods::lock(), and nOutputModules_.

Referenced by FastMonitoringService().

477  {
478  std::lock_guard<std::mutex> lock(fmt_->monlock_);
479  //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
480 
481  //build a map of modules keyed by their module description address
482  //here we need to treat output modules in a special way so they can be easily singled out
483  if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
484  desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
485  fmt_->m_data.encModule_.updateReserved((void*)&desc);
486  nOutputModules_++;
487  } else
488  fmt_->m_data.encModule_.update((void*)&desc);
489  }
std::unique_ptr< FastMonitoringThread > fmt_

◆ preModuleEvent()

void evf::FastMonitoringService::preModuleEvent ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 696 of file FastMonitoringService.cc.

References getSID(), getTID(), microstate_, edm::ModuleCallingContext::moduleDescription(), nThreads_, tbbMonitoringMode_, and tmicrostate_.

Referenced by FastMonitoringService().

696  {
697  microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
698  if (!tbbMonitoringMode_)
699  return;
700  auto tid = getTID();
701  if (tid >= nThreads_)
702  return;
703  tmicrostate_[tid] = (void*)(mcc.moduleDescription());
704  }
std::vector< ContainableAtomic< const void * > > microstate_
static unsigned int getSID(edm::StreamContext const &sc)
std::vector< ContainableAtomic< const void * > > tmicrostate_

◆ preModuleEventAcquire()

void evf::FastMonitoringService::preModuleEventAcquire ( edm::StreamContext const &  sc,
edm::ModuleCallingContext const &  mcc 
)

Definition at line 670 of file FastMonitoringService.cc.

References getSID(), getTID(), microstate_, microstateAcqFlag_, edm::ModuleCallingContext::moduleDescription(), nThreads_, tbbMonitoringMode_, tmicrostate_, and tmicrostateAcqFlag_.

Referenced by FastMonitoringService().

671  {
672  microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
673  microstateAcqFlag_[getSID(sc)] = 1;
674  if (!tbbMonitoringMode_)
675  return;
676  auto tid = getTID();
677  if (tid >= nThreads_)
678  return;
679  tmicrostate_[tid] = (void*)(mcc.moduleDescription());
680  tmicrostateAcqFlag_[tid] = 1;
681  }
std::vector< ContainableAtomic< const void * > > microstate_
std::vector< ContainableAtomic< unsigned char > > tmicrostateAcqFlag_
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
static unsigned int getSID(edm::StreamContext const &sc)
std::vector< ContainableAtomic< const void * > > tmicrostate_

◆ prePathEvent()

void evf::FastMonitoringService::prePathEvent ( edm::StreamContext const &  ,
edm::PathContext const &   
)

◆ preSourceEarlyTermination()

void evf::FastMonitoringService::preSourceEarlyTermination ( edm::TerminationOrigin  to)

Definition at line 433 of file FastMonitoringService.cc.

References visDQMUpload::context, exception_detected_, edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, edm::ExternalSignal, fmt_, has_data_exception_, has_source_exception_, CommonMethods::lock(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FastMonitoringService().

433  {
436  context = " FromThisContext ";
438  context = " FromAnotherContext";
440  context = " FromExternalSignal";
441  edm::LogWarning("FastMonitoringService") << " SOURCE "
442  << "earlyTermination -: " << context;
443  std::lock_guard<std::mutex> lock(fmt_->monlock_);
444  exception_detected_ = true;
445  has_source_exception_.store(true);
446  has_data_exception_.store(true);
447  }
std::atomic< bool > has_data_exception_
std::unique_ptr< FastMonitoringThread > fmt_
std::atomic< bool > has_source_exception_
Log< level::Warning, false > LogWarning

◆ preSourceEvent()

void evf::FastMonitoringService::preSourceEvent ( edm::StreamID  sid)

Definition at line 650 of file FastMonitoringService.cc.

References evf::getmInput(), getSID(), getTID(), microstate_, nThreads_, tbbMonitoringMode_, and tmicrostate_.

Referenced by FastMonitoringService().

650  {
651  microstate_[getSID(sid)] = getmInput();
652  if (!tbbMonitoringMode_)
653  return;
654  auto tid = getTID();
655  if (tid >= nThreads_)
656  return;
657  tmicrostate_[tid] = getmInput();
658  }
std::vector< ContainableAtomic< const void * > > microstate_
static unsigned int getSID(edm::StreamContext const &sc)
constexpr edm::ModuleDescription const * getmInput()
std::vector< ContainableAtomic< const void * > > tmicrostate_

◆ preStreamBeginLumi()

void evf::FastMonitoringService::preStreamBeginLumi ( edm::StreamContext const &  sc)

Definition at line 610 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), fmt_, evf::getmBoL(), CommonMethods::lock(), edm::EventID::luminosityBlock(), microstate_, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

610  {
611  std::lock_guard<std::mutex> lock(fmt_->monlock_);
612  fmt_->m_data.streamLumi_[sc.streamID().value()] = sc.eventID().luminosityBlock();
613 
614  //reset collected values for this stream
615  *(fmt_->m_data.processed_[sc.streamID().value()]) = 0;
616 
617  microstate_[sc.streamID().value()] = getmBoL();
618  }
std::vector< ContainableAtomic< const void * > > microstate_
constexpr edm::ModuleDescription const * getmBoL()
std::unique_ptr< FastMonitoringThread > fmt_

◆ preStreamEarlyTermination()

void evf::FastMonitoringService::preStreamEarlyTermination ( edm::StreamContext const &  sc,
edm::TerminationOrigin  to 
)

Definition at line 401 of file FastMonitoringService.cc.

References visDQMUpload::context, edm::StreamContext::eventID(), edm::ExceptionFromAnotherContext, edm::ExceptionFromThisContext, exceptionInLS_, edm::ExternalSignal, fmt_, has_data_exception_, CommonMethods::lock(), edm::EventID::luminosityBlock(), edm::StreamContext::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::StreamID::value().

Referenced by FastMonitoringService().

401  {
404  context = " FromThisContext ";
406  context = " FromAnotherContext";
408  context = " FromExternalSignal";
409  edm::LogWarning("FastMonitoringService")
410  << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
411  << " LS:" << sc.eventID().luminosityBlock() << " " << context;
412  std::lock_guard<std::mutex> lock(fmt_->monlock_);
413  exceptionInLS_.push_back(sc.eventID().luminosityBlock());
414  has_data_exception_.store(true);
415  }
std::atomic< bool > has_data_exception_
std::unique_ptr< FastMonitoringThread > fmt_
Log< level::Warning, false > LogWarning
std::vector< unsigned int > exceptionInLS_

◆ preStreamEndLumi()

void evf::FastMonitoringService::preStreamEndLumi ( edm::StreamContext const &  sc)

Definition at line 624 of file FastMonitoringService.cc.

References edm::StreamContext::eventID(), fmt_, evf::getmEoL(), CommonMethods::lock(), edm::EventID::luminosityBlock(), microstate_, edm::StreamContext::streamID(), and edm::StreamID::value().

Referenced by FastMonitoringService().

624  {
625  std::lock_guard<std::mutex> lock(fmt_->monlock_);
626  //update processed count to be complete at this time
627  //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
628  fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sc.streamID().value());
629  //reset this in case stream does not get notified of next lumi (we keep processed events only)
630  microstate_[sc.streamID().value()] = getmEoL();
631  }
std::vector< ContainableAtomic< const void * > > microstate_
std::unique_ptr< FastMonitoringThread > fmt_
constexpr edm::ModuleDescription const * getmEoL()

◆ reportLockWait()

void evf::FastMonitoringService::reportLockWait ( unsigned int  ls,
double  waitTime,
unsigned int  lockCount 
)

Definition at line 767 of file FastMonitoringService.cc.

References fmt_, CommonMethods::lock(), lockStatsDuringLumi_, and eostools::ls().

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

767  {
768  std::lock_guard<std::mutex> lock(fmt_->monlock_);
769  lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
770  }
std::unique_ptr< FastMonitoringThread > fmt_
def ls(path, rec=False)
Definition: eostools.py:349
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_

◆ setExceptionDetected()

void evf::FastMonitoringService::setExceptionDetected ( unsigned int  ls)

Definition at line 449 of file FastMonitoringService.cc.

References exception_detected_, exceptionInLS_, fmt_, CommonMethods::lock(), and eostools::ls().

Referenced by FedRawDataInputSource::getNextEvent(), and DAQSource::getNextEventFromDataBlock().

449  {
450  std::lock_guard<std::mutex> lock(fmt_->monlock_);
451  if (!ls)
452  exception_detected_ = true;
453  else
454  exceptionInLS_.push_back(ls);
455  }
std::unique_ptr< FastMonitoringThread > fmt_
def ls(path, rec=False)
Definition: eostools.py:349
std::vector< unsigned int > exceptionInLS_

◆ setInputSource() [1/2]

void evf::FastMonitoringService::setInputSource ( FedRawDataInputSource inputSource)
inline

Definition at line 227 of file FastMonitoringService.h.

References inputSource_.

Referenced by DAQSource::DAQSource(), and FedRawDataInputSource::FedRawDataInputSource().

227 { inputSource_ = inputSource; }
FedRawDataInputSource * inputSource_

◆ setInputSource() [2/2]

void evf::FastMonitoringService::setInputSource ( DAQSource inputSource)
inline

Definition at line 228 of file FastMonitoringService.h.

References daqInputSource_.

228 { daqInputSource_ = inputSource; }

◆ setInState()

void evf::FastMonitoringService::setInState ( FastMonState::InputState  inputState)
inline

◆ setInStateSup()

void evf::FastMonitoringService::setInStateSup ( FastMonState::InputState  inputState)
inline

◆ setTMicrostate()

void evf::FastMonitoringService::setTMicrostate ( FastMonState::Microstate  m)

Definition at line 772 of file FastMonitoringService.cc.

References visualization-live-secondInstance_cfg::m, specialMicroStateNames, and tmicrostate_.

Referenced by IdleSourceSentry::IdleSourceSentry(), and IdleSourceSentry::~IdleSourceSentry().

772  {
773  tmicrostate_[tbb::this_task_arena::current_thread_index()] = &specialMicroStateNames[m];
774  }
static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT]
std::vector< ContainableAtomic< const void * > > tmicrostate_

◆ shouldWriteFiles()

bool evf::FastMonitoringService::shouldWriteFiles ( unsigned int  lumi,
unsigned int *  proc = nullptr 
)
inline

Definition at line 220 of file FastMonitoringService.h.

References getAbortFlagForLumi(), getEventsProcessedForLumi(), and ValidateTausOnZEEFastSim_cff::proc.

Referenced by L1TriggerJSONMonitoring::globalEndLuminosityBlockSummary(), and HLTriggerJSONMonitoring::globalEndLuminosityBlockSummary().

220  {
221  unsigned int processed = getEventsProcessedForLumi(lumi);
222  if (proc)
223  *proc = processed;
224  return !getAbortFlagForLumi(lumi);
225  }
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
bool getAbortFlagForLumi(unsigned int lumi)

◆ snapshotRunner()

void evf::FastMonitoringService::snapshotRunner ( )
private

Definition at line 811 of file FastMonitoringService.cc.

References doSnapshot(), f, fastMonIntervals_, fastPath_, fmt_, mps_fire::i, inputState_, inputStateNames, inputSupervisorState_, lastGlobalLumi_, CommonMethods::lock(), monInit_, mps_check::msg, nMonThreads_, AlCaHLTBitMon_ParallelJobs::p, sleepTime_, snapCounter_, and verbose_.

Referenced by preBeginJob().

811  {
812  monInit_.exchange(true, std::memory_order_acquire);
813  while (!fmt_->m_stoprequest) {
814  std::vector<std::vector<unsigned int>> lastEnc;
815  {
816  std::unique_lock<std::mutex> lock(fmt_->monlock_);
817 
818  doSnapshot(lastGlobalLumi_, false);
819 
820  lastEnc.emplace_back(fmt_->m_data.tmicrostateEncoded_);
821  lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
822 
824  std::vector<std::string> CSVv;
825  for (unsigned int i = 0; i < nMonThreads_; i++) {
826  CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
827  }
828  // release mutex before writing out fast path file
829  lock.release()->unlock();
830  fmt_->jsonMonitor_->outputCSV(fastPath_, CSVv);
831  }
832  snapCounter_++;
833  }
834 
835  if (verbose_) {
836  edm::LogInfo msg("FastMonitoringService");
837  auto f = [&](std::vector<unsigned int> const& p) {
838  for (unsigned int i = 0; i < nMonThreads_; i++) {
839  if (i == 0)
840  msg << "[" << p[i] << ",";
841  else if (i <= nMonThreads_ - 1)
842  msg << p[i] << ",";
843  else
844  msg << p[i] << "]";
845  }
846  };
847 
848  msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
849  f(lastEnc[0]);
850  msg << " us=";
851  f(lastEnc[1]);
853  }
854 
855  ::sleep(sleepTime_);
856  }
857  }
std::atomic< FastMonState::InputState > inputState_
static const std::string inputStateNames[FastMonState::inCOUNT]
std::unique_ptr< FastMonitoringThread > fmt_
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
std::atomic< FastMonState::InputState > inputSupervisorState_
double f[11][100]
tuple msg
Definition: mps_check.py:286

◆ startedLookingForFile()

void evf::FastMonitoringService::startedLookingForFile ( )

Definition at line 731 of file FastMonitoringService.cc.

References fileLookStart_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

731  {
732  gettimeofday(&fileLookStart_, nullptr);
733  /*
734  std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
735  << fileLookStart_.tv_usec / 1000.0 << std::endl;
736  */
737  }

◆ stoppedLookingForFile()

void evf::FastMonitoringService::stoppedLookingForFile ( unsigned int  lumi)

Definition at line 739 of file FastMonitoringService.cc.

References avgLeadTime_, fileLookStart_, fileLookStop_, fmt_, mps_fire::i, leadTimes_, CommonMethods::lock(), BXlumiParameters_cfi::lumi, and lumiFromSource_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

739  {
740  gettimeofday(&fileLookStop_, nullptr);
741  /*
742  std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
743  << fileLookStop_.tv_usec / 1000.0 << std::endl;
744  */
745  std::lock_guard<std::mutex> lock(fmt_->monlock_);
746 
747  if (lumi > lumiFromSource_) {
749  leadTimes_.clear();
750  }
751  unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000 // sec to us
752  + (fileLookStop_.tv_usec - fileLookStart_.tv_usec); // us
753  // add this to lead times for this lumi
754  leadTimes_.push_back((double)elapsedTime);
755 
756  // recompute average lead time for this lumi
757  if (leadTimes_.size() == 1)
759  else {
760  double totTime = 0;
761  for (unsigned int i = 0; i < leadTimes_.size(); i++)
762  totTime += leadTimes_[i];
763  avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
764  }
765  }
std::unique_ptr< FastMonitoringThread > fmt_
std::map< unsigned int, double > avgLeadTime_
std::vector< double > leadTimes_

Member Data Documentation

◆ accuSize_

std::map<unsigned int, unsigned long> evf::FastMonitoringService::accuSize_
private

Definition at line 280 of file FastMonitoringService.h.

Referenced by accumulateFileSize(), and preGlobalEndLumi().

◆ avgLeadTime_

std::map<unsigned int, double> evf::FastMonitoringService::avgLeadTime_
private

Definition at line 277 of file FastMonitoringService.h.

Referenced by doSnapshot(), postGlobalEndLumi(), and stoppedLookingForFile().

◆ ct_

std::unique_ptr<ConcurrencyTracker> evf::FastMonitoringService::ct_
private

Definition at line 247 of file FastMonitoringService.h.

Referenced by doSnapshot(), preallocate(), and preBeginJob().

◆ daqInputSource_

DAQSource* evf::FastMonitoringService::daqInputSource_ = nullptr
private

Definition at line 251 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi(), and setInputSource().

◆ exception_detected_

bool evf::FastMonitoringService::exception_detected_ = false
private

◆ exceptionInLS_

std::vector<unsigned int> evf::FastMonitoringService::exceptionInLS_
private

◆ fastMicrostateDefPath_

std::string evf::FastMonitoringService::fastMicrostateDefPath_
private

Definition at line 263 of file FastMonitoringService.h.

Referenced by FastMonitoringService(), and preBeginJob().

◆ fastMonIntervals_

unsigned int evf::FastMonitoringService::fastMonIntervals_
private

Definition at line 261 of file FastMonitoringService.h.

Referenced by snapshotRunner().

◆ fastName_

std::string evf::FastMonitoringService::fastName_
private

Definition at line 264 of file FastMonitoringService.h.

Referenced by preBeginJob().

◆ fastPath_

std::string evf::FastMonitoringService::fastPath_
private

Definition at line 264 of file FastMonitoringService.h.

Referenced by preBeginJob(), and snapshotRunner().

◆ fileLookStart_

timeval evf::FastMonitoringService::fileLookStart_
private

Definition at line 269 of file FastMonitoringService.h.

Referenced by startedLookingForFile(), and stoppedLookingForFile().

◆ fileLookStop_

timeval evf::FastMonitoringService::fileLookStop_
private

Definition at line 269 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

◆ filesProcessedDuringLumi_

std::map<unsigned int, unsigned int> evf::FastMonitoringService::filesProcessedDuringLumi_
private

Definition at line 278 of file FastMonitoringService.h.

Referenced by accumulateFileSize(), doSnapshot(), and postGlobalEndLumi().

◆ fmt_

std::unique_ptr<FastMonitoringThread> evf::FastMonitoringService::fmt_
private

◆ has_data_exception_

std::atomic<bool> evf::FastMonitoringService::has_data_exception_ = false
private

◆ has_source_exception_

std::atomic<bool> evf::FastMonitoringService::has_source_exception_ = false
private

◆ inputLegendFileJson_

std::string evf::FastMonitoringService::inputLegendFileJson_
private

Definition at line 299 of file FastMonitoringService.h.

Referenced by postBeginJob(), and preBeginJob().

◆ inputSource_

FedRawDataInputSource* evf::FastMonitoringService::inputSource_ = nullptr
private

Definition at line 250 of file FastMonitoringService.h.

Referenced by preGlobalEndLumi(), and setInputSource().

◆ inputState_

std::atomic<FastMonState::InputState> evf::FastMonitoringService::inputState_ {FastMonState::InputState::inInit}
private

Definition at line 252 of file FastMonitoringService.h.

Referenced by doSnapshot(), setInState(), and snapshotRunner().

◆ inputStateNames

const std::string evf::FastMonitoringService::inputStateNames
static

Definition at line 172 of file FastMonitoringService.h.

Referenced by makeInputLegendaJson(), and snapshotRunner().

◆ inputSupervisorState_

std::atomic<FastMonState::InputState> evf::FastMonitoringService::inputSupervisorState_ {FastMonState::InputState::inInit}
private

Definition at line 253 of file FastMonitoringService.h.

Referenced by doSnapshot(), setInStateSup(), and snapshotRunner().

◆ isInitTransition_

std::atomic<bool> evf::FastMonitoringService::isInitTransition_
private

Definition at line 272 of file FastMonitoringService.h.

Referenced by doSnapshot(), postGlobalBeginRun(), and preBeginJob().

◆ lastGlobalLumi_

unsigned int evf::FastMonitoringService::lastGlobalLumi_
private

Definition at line 271 of file FastMonitoringService.h.

Referenced by preBeginJob(), preGlobalBeginLumi(), and snapshotRunner().

◆ leadTimes_

std::vector<double> evf::FastMonitoringService::leadTimes_
private

Definition at line 281 of file FastMonitoringService.h.

Referenced by stoppedLookingForFile().

◆ lockStatsDuringLumi_

std::map<unsigned int, std::pair<double, unsigned int> > evf::FastMonitoringService::lockStatsDuringLumi_
private

Definition at line 282 of file FastMonitoringService.h.

Referenced by doSnapshot(), postGlobalEndLumi(), and reportLockWait().

◆ lumiFromSource_

unsigned int evf::FastMonitoringService::lumiFromSource_
private

Definition at line 273 of file FastMonitoringService.h.

Referenced by preBeginJob(), and stoppedLookingForFile().

◆ lumiStartTime_

std::map<unsigned int, timeval> evf::FastMonitoringService::lumiStartTime_
private

Definition at line 268 of file FastMonitoringService.h.

Referenced by preGlobalBeginLumi(), and preGlobalEndLumi().

◆ macroStateNames

const std::string evf::FastMonitoringService::macroStateNames
static
Initial value:
= {"Init",
"JobReady",
"RunGiven",
"Running",
"Stopping",
"Done",
"JobEnded",
"Error",
"ErrorEnded",
"End",
"Invalid"}

Definition at line 171 of file FastMonitoringService.h.

◆ microstate_

std::vector<ContainableAtomic<const void*> > evf::FastMonitoringService::microstate_
private

◆ microstateAcqFlag_

std::vector<ContainableAtomic<unsigned char> > evf::FastMonitoringService::microstateAcqFlag_
private

◆ microstateDefPath_

std::string evf::FastMonitoringService::microstateDefPath_
private

Definition at line 263 of file FastMonitoringService.h.

Referenced by FastMonitoringService(), and preBeginJob().

◆ moduleLegendFile_

std::string evf::FastMonitoringService::moduleLegendFile_
private

Definition at line 297 of file FastMonitoringService.h.

Referenced by preBeginJob().

◆ moduleLegendFileJson_

std::string evf::FastMonitoringService::moduleLegendFileJson_
private

Definition at line 298 of file FastMonitoringService.h.

Referenced by postBeginJob(), and preBeginJob().

◆ monInit_

std::atomic<bool> evf::FastMonitoringService::monInit_
private

Definition at line 302 of file FastMonitoringService.h.

Referenced by preBeginJob(), and snapshotRunner().

◆ nMonThreads_

unsigned int evf::FastMonitoringService::nMonThreads_ = 0
private

Definition at line 256 of file FastMonitoringService.h.

Referenced by doSnapshot(), preallocate(), preBeginJob(), and snapshotRunner().

◆ nOutputModules_

unsigned int evf::FastMonitoringService::nOutputModules_ = 0
private

Definition at line 300 of file FastMonitoringService.h.

Referenced by makeModuleLegendaJson(), and preModuleBeginJob().

◆ nStreams_

unsigned int evf::FastMonitoringService::nStreams_ = 0
private

Definition at line 255 of file FastMonitoringService.h.

Referenced by doSnapshot(), preallocate(), and preBeginJob().

◆ nThreads_

unsigned int evf::FastMonitoringService::nThreads_ = 0
private

◆ processedEventsPerLumi_

std::map<unsigned int, std::pair<unsigned int, bool> > evf::FastMonitoringService::processedEventsPerLumi_
private

◆ runDirectory_

std::filesystem::path evf::FastMonitoringService::runDirectory_
private

Definition at line 291 of file FastMonitoringService.h.

Referenced by getRunDirName(), and preBeginJob().

◆ sleepTime_

int evf::FastMonitoringService::sleepTime_
private

Definition at line 260 of file FastMonitoringService.h.

Referenced by preBeginJob(), and snapshotRunner().

◆ snapCounter_

unsigned int evf::FastMonitoringService::snapCounter_ = 0
private

Definition at line 262 of file FastMonitoringService.h.

Referenced by snapshotRunner().

◆ specialMicroStateNames

const edm::ModuleDescription evf::FastMonitoringService::specialMicroStateNames
static
Initial value:
= {
edm::ModuleDescription("Dummy", "Invalid"),
edm::ModuleDescription("Dummy", "Idle"),
edm::ModuleDescription("Dummy", "FwkOvhSrc"),
edm::ModuleDescription("Dummy", "FwkOvhMod"),
edm::ModuleDescription("Dummy", "FwkEoL"),
edm::ModuleDescription("Dummy", "Input"),
edm::ModuleDescription("Dummy", "DQM"),
edm::ModuleDescription("Dummy", "BoL"),
edm::ModuleDescription("Dummy", "EoL"),
edm::ModuleDescription("Dummy", "GlobalEoL"),
edm::ModuleDescription("Dummy", "Fwk"),
edm::ModuleDescription("Dummy", "IdleSource"),
edm::ModuleDescription("Dummy", "Event"),
edm::ModuleDescription("Dummy", "Ignore")}

Definition at line 170 of file FastMonitoringService.h.

Referenced by preBeginJob(), and setTMicrostate().

◆ streamCounterUpdating_

std::vector<std::atomic<bool>*> evf::FastMonitoringService::streamCounterUpdating_
private

Definition at line 289 of file FastMonitoringService.h.

Referenced by preBeginJob().

◆ tbbConcurrencyTracker_

bool evf::FastMonitoringService::tbbConcurrencyTracker_
private

Definition at line 259 of file FastMonitoringService.h.

Referenced by preBeginJob().

◆ tbbMonitoringMode_

bool evf::FastMonitoringService::tbbMonitoringMode_
private

◆ threadIDAvailable_

bool evf::FastMonitoringService::threadIDAvailable_ = false
private

Definition at line 293 of file FastMonitoringService.h.

◆ tmicrostate_

std::vector<ContainableAtomic<const void*> > evf::FastMonitoringService::tmicrostate_
private

◆ tmicrostateAcqFlag_

std::vector<ContainableAtomic<unsigned char> > evf::FastMonitoringService::tmicrostateAcqFlag_
private

◆ totalEventsProcessed_

std::atomic<unsigned long> evf::FastMonitoringService::totalEventsProcessed_
private

Definition at line 295 of file FastMonitoringService.h.

Referenced by postEvent().

◆ verbose_

bool evf::FastMonitoringService::verbose_ = false
private

Definition at line 315 of file FastMonitoringService.h.

Referenced by snapshotRunner().

◆ workingDirectory_

std::filesystem::path evf::FastMonitoringService::workingDirectory_
private

Definition at line 291 of file FastMonitoringService.h.

Referenced by preBeginJob().