21 using namespace jsoncollector;
34 const std::string FastMonitoringService::macroStateNames[FastMonitoringThread::MCOUNT] =
35 {
"Init",
"JobReady",
"RunGiven",
"Running",
36 "Stopping",
"Done",
"JobEnded",
"Error",
"ErrorEnded",
"End",
39 const std::string FastMonitoringService::inputStateNames[FastMonitoringThread::inCOUNT] =
40 {
"Ignore",
"Init",
"WaitInput",
"NewLumi",
"RunEnd",
"ProcessingFile",
"WaitChunk",
"ChunkReceived",
41 "ChecksumEvent",
"CachedEvent",
"ReadEvent",
"ReadCleanup",
"NoRequest",
"NoRequestWithIdleThreads",
42 "NoRequestWithIdleAndEoLThreads",
"NoRequestWithGlobalEoL",
"NoRequestWithAllEoLThreads",
"NoRequestWithEoLThreads",
43 "SupFileLimit",
"SupWaitFreeChunk",
"SupWaitFreeChunkCopying",
"SupWaitFreeThread",
"SupWaitFreeThreadCopying",
44 "SupBusy",
"SupLockPolling",
"SupLockPollingCopying",
45 "SupNoFile",
"SupNewFile",
"SupNewFileWaitThreadCopying",
"SupNewFileWaitThread",
46 "SupNewFileWaitChunkCopying",
"SupNewFileWaitChunk",
47 "WaitInput_fileLimit",
"WaitInput_waitFreeChunk",
"WaitInput_waitFreeChunkCopying",
"WaitInput_waitFreeThread",
"WaitInput_waitFreeThreadCopying",
48 "WaitInput_busy",
"WaitInput_lockPolling",
"WaitInput_lockPollingCopying",
"WaitInput_runEnd",
49 "WaitInput_noFile",
"WaitInput_newFile",
"WaitInput_newFileWaitThreadCopying",
"WaitInput_newFileWaitThread",
50 "WaitInput_newFileWaitChunkCopying",
"WaitInput_newFileWaitChunk",
51 "WaitChunk_fileLimit",
"WaitChunk_waitFreeChunk",
"WaitChunk_waitFreeChunkCopying",
"WaitChunk_waitFreeThread",
"WaitChunk_waitFreeThreadCopying",
52 "WaitChunk_busy",
"WaitChunk_lockPolling",
"WaitChunk_lockPollingCopying",
"WaitChunk_runEnd",
53 "WaitChunk_noFile",
"WaitChunk_newFile",
"WaitChunk_newFileWaitThreadCopying",
"WaitChunk_newFileWaitThread",
54 "WaitChunk_newFileWaitChunkCopying",
"WaitChunk_newFileWaitChunk"
58 const std::string FastMonitoringService::nopath_ =
"NoPath";
65 ,sleepTime_(iPS.getUntrackedParameter<int>(
"sleepTime", 1))
66 ,fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>(
"fastMonIntervals", 2))
67 ,fastName_(
"fastmoni")
68 ,slowName_(
"slowmoni")
69 ,filePerFwkStream_(iPS.getUntrackedParameter<bool>(
"filePerFwkStream",
false))
70 ,totalEventsProcessed_(0)
106 std::string microstateBaseSuffix =
"src/EventFilter/Utilities/plugins/microstatedef.jsd";
108 if (stat(microstatePath.c_str(), &statbuf)) {
109 microstatePath =
std::string(getenv(
"CMSSW_RELEASE_BASE")) +
"/" + microstateBaseSuffix;
110 if (stat(microstatePath.c_str(), &statbuf)) {
111 microstatePath = microstateBaseSuffix;
112 if (stat(microstatePath.c_str(), &statbuf))
113 throw cms::Exception(
"FastMonitoringService") <<
"microstate definition file not found";
127 desc.
setComment(
"Service for File-based DAQ monitoring and event accounting");
128 desc.
addUntracked<
int> (
"sleepTime",1)->setComment(
"Sleep time of the monitoring thread");
129 desc.
addUntracked<
unsigned int> (
"fastMonIntervals",2)->setComment(
"Modulo of sleepTime intervals on which fastmon file is written out");
130 desc.
addUntracked<
bool> (
"filePerFwkStream",
false)->setComment(
"Switches on monitoring output per framework stream");
132 descriptions.
add(
"FastMonitoringService", desc);
142 pathLegend[
"names"]=legendaVector;
143 pathLegend[
"reserved"]=valReserved;
145 return writer.
write(pathLegend);
156 moduleLegend[
"names"]=legendaVector;
157 moduleLegend[
"reserved"]=valReserved;
158 moduleLegend[
"special"]=valSpecial;
159 moduleLegend[
"output"]=valOutputModules;
161 return writer.
write(moduleLegend);
169 moduleLegend[
"names"]=legendaVector;
171 return writer.
write(moduleLegend);
192 throw cms::Exception(
"FastMonitoringService") <<
"EvFDaqDirector is not present";
205 <<
". No monitoring data will be written.";
208 std::ostringstream fastFileName;
210 fastFileName <<
fastName_ <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".fast";
212 fast /= fastFileName.str();
216 std::ostringstream fastFileNameTid;
217 fastFileNameTid <<
fastName_ <<
"_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
"_tid" <<
i <<
".fast";
219 fastTid /= fastFileNameTid.str();
223 std::ostringstream moduleLegFile;
224 std::ostringstream moduleLegFileJson;
225 moduleLegFile <<
"microstatelegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".leg";
226 moduleLegFileJson <<
"microstatelegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
230 std::ostringstream pathLegFile;
231 std::ostringstream pathLegFileJson;
232 pathLegFile <<
"pathlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".leg";
234 pathLegFileJson <<
"pathlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
237 std::ostringstream inputLegFileJson;
238 inputLegFileJson <<
"inputlegend_pid" << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
241 LogDebug(
"FastMonitoringService") <<
"Initializing FastMonitor with microstate def path -: "
254 for(
unsigned int i = 0;
i < (
mCOUNT);
i++)
290 monInit_.store(
false,std::memory_order_release);
324 edm::LogWarning(
"FastMonitoringService") <<
" GLOBAL " <<
"earlyTermination -: LS:"
336 edm::LogWarning(
"FastMonitoringService") <<
" SOURCE " <<
"earlyTermination -: " << context;
396 timeval lumiStartTime;
397 gettimeofday(&lumiStartTime, 0);
421 LogDebug(
"FastMonitoringService") <<
"Lumi ended. Writing JSON information. LUMI -: "
423 timeval lumiStopTime;
424 gettimeofday(&lumiStopTime, 0);
430 unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec)*1000000
431 + (lumiStopTime.tv_usec - stt.tv_usec);
433 double throughput =
throughputFactor()* double(accuSize) / double(usecondsForLumi);
442 if (!lumiProcessedJptr)
443 throw cms::Exception(
"FastMonitoringService") <<
"Internal error: got null pointer from FastMonitor";
449 if (lumi == ex) exception_detected=
true;
452 edm::LogInfo(
"FastMonitoringService") <<
"Run interrupted. Skip writing EoL information -: "
465 if (sourceReport.first) {
467 throw cms::Exception(
"FastMonitoringService") <<
"MISMATCH with SOURCE update. LUMI -: "
470 <<
" events(source):" << sourceReport.second;
474 edm::LogInfo(
"FastMonitoringService") <<
"Statistics for lumisection -: lumi = " << lumi <<
" events = "
475 << lumiProcessedJptr->
value() <<
" time = " << usecondsForLumi/1000000
476 <<
" size = " << accuSize <<
" thr = " << throughput;
477 delete lumiProcessedJptr;
483 std::stringstream slowFileNameStem;
484 slowFileNameStem <<
slowName_ <<
"_ls" << std::setfill(
'0') << std::setw(4)
485 << lumi <<
"_pid" << std::setfill(
'0')
486 << std::setw(5) << getpid();
488 slow /= slowFileNameStem.str();
492 std::stringstream slowFileName;
493 slowFileName <<
slowName_ <<
"_ls" << std::setfill(
'0') << std::setw(4)
494 << lumi <<
"_pid" << std::setfill(
'0')
495 << std::setw(5) << getpid() <<
".jsn";
497 slow /= slowFileName.str();
600 #elif ATOMIC_LEVEL==1
604 #elif ATOMIC_LEVEL==0 //default
712 unsigned int proc = it->second.first;
713 if (abortFlag) *abortFlag=it->second.second;
717 throw cms::Exception(
"FastMonitoringService") <<
"output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "<<
lumi;
728 unsigned int abortFlag = it->second.second;
732 throw cms::Exception(
"FastMonitoringService") <<
"output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "<<
lumi;
771 bool anyThreadsIdle=
false;
772 bool anyThreadsEoL=
false;
773 bool allThreadsEoL=
true;
779 else allThreadsEoL=
false;
888 else if (anyThreadsEoL && anyThreadsIdle)
890 else if (anyThreadsIdle)
892 else if (anyThreadsEoL)
894 else if (allThreadsEoL)
int encode(const void *add)
void prePathEvent(edm::StreamContext const &, edm::PathContext const &)
FastMonitoringThread::InputState inputState_
std::string const & pathName() const
unsigned int maxNumberOfThreads() const
std::string pathLegendFileJson_
EventNumber_t event() const
void watchPreStreamEarlyTermination(PreStreamEarlyTermination::slot_type const &iSlot)
void watchPreEvent(PreEvent::slot_type const &iSlot)
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
static const edm::ModuleDescription reservedMicroStateNames[mCOUNT]
std::string pathLegendFile_
void postSourceEvent(edm::StreamID)
std::string makePathLegendaJson()
void watchPrePathEvent(PrePathEvent::slot_type const &iSlot)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void startedLookingForFile()
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
boost::filesystem::path runDirectory_
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void preallocate(edm::service::SystemBounds const &)
std::map< unsigned int, timeval > lumiStartTime_
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
void preGlobalBeginLumi(edm::GlobalContext const &)
void setAllowAnything()
allow any parameter label/value pairs
double throughputFactor()
void watchPreModuleEvent(PreModuleEvent::slot_type const &iSlot)
void postGlobalEndLumi(edm::GlobalContext const &)
TrainProcessor *const proc
void postEvent(edm::StreamContext const &)
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, unsigned long > accuSize_
FastMonitoringThread fmt_
unsigned int microstateBins_
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
void watchPostEvent(PostEvent::slot_type const &iSlot)
jsoncollector::DoubleJ fastThroughputJ_
static const int nReservedPaths
LuminosityBlockID const & luminosityBlockID() const
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
void watchPreGlobalBeginLumi(PreGlobalBeginLumi::slot_type const &iSlot)
unsigned int nOutputModules_
std::string const & moduleName() const
void watchPostModuleEvent(PostModuleEvent::slot_type const &iSlot)
Value & append(const Value &value)
Append value to array at the end.
void watchPostSourceEvent(PostSourceEvent::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
void preGlobalEndLumi(edm::GlobalContext const &)
unsigned int lastGlobalLumi_
FastMonitoringThread::InputState inputSupervisorState_
LuminosityBlockNumber_t luminosityBlock() const
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
void preGlobalEarlyTermination(edm::GlobalContext const &, edm::TerminationOrigin)
void watchPreSourceEarlyTermination(PreSourceEarlyTermination::slot_type const &iSlot)
std::string inputLegendFileJson_
unsigned int inputstateBins_
jsoncollector::IntJ fastMacrostateJ_
void watchJobFailure(JobFailure::slot_type const &iSlot)
convenience function for attaching to signal
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
std::atomic< bool > monInit_
std::string makeModuleLegendaJson()
unsigned int maxNumberOfStreams() const
void preModuleBeginJob(edm::ModuleDescription const &)
static const std::string inputStateNames[FastMonitoringThread::inCOUNT]
void setComment(std::string const &value)
std::string makeInputLegendaJson()
void updateReserved(const void *add)
unsigned int lumiFromSource_
std::vector< std::atomic< bool > * > collectedPathList_
std::queue< unsigned int > lastGlobalLumisClosed_
std::string microstateDefPath_
void preStreamEndLumi(edm::StreamContext const &)
std::map< unsigned int, double > avgLeadTime_
std::string moduleLegendFile_
void watchPostStreamBeginLumi(PostStreamBeginLumi::slot_type const &iSlot)
void doSnapshot(const unsigned int ls, const bool isGlobalEOL)
void preStreamEarlyTermination(edm::StreamContext const &, edm::TerminationOrigin)
void watchPreGlobalEarlyTermination(PreGlobalEarlyTermination::slot_type const &iSlot)
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
jsoncollector::IntJ fastFilesProcessedJ_
jsoncollector::DoubleJ fastAvgLeadTimeJ_
static const std::string nopath_
ModuleDescription const * moduleDescription() const
void resetFastMonitor(std::string const µStateDefPath, std::string const &fastMicroStateDefPath)
std::string fastMicrostateDefPath_
bool emptyLumisectionMode_
static const int nReservedModules
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::string moduleLegendFileJson_
std::vector< unsigned long > firstEventId_
void watchPreModuleBeginJob(PreModuleBeginJob::slot_type const &iSlot)
jsoncollector::DoubleJ fastLockWaitJ_
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
virtual std::string write(const Value &root)
Serialize a Value in JSON format.
std::vector< std::string > fastPathList_
void postStreamBeginLumi(edm::StreamContext const &)
void completeReservedWithDummies()
StreamID const & streamID() const
void preSourceEvent(edm::StreamID)
void postStreamEndLumi(edm::StreamContext const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< const void * > microstate_
std::vector< unsigned int > microstateEncoded_
unsigned int value() const
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void preStreamBeginLumi(edm::StreamContext const &)
std::atomic< unsigned long > totalEventsProcessed_
FedRawDataInputSource * inputSource_
static const int nSpecialModules
LuminosityBlockNumber_t luminosityBlock() const
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
FastMonitoringThread::Macrostate macrostate_
std::vector< double > leadTimes_
jsoncollector::IntJ fastPathProcessedJ_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &pc)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void stoppedLookingForFile(unsigned int lumi)
void setMicroState(MicroStateService::Microstate)
boost::filesystem::path workingDirectory_
jsoncollector::IntJ fastLockCountJ_
std::vector< jsoncollector::AtomicMonUInt * > processed_
void postGlobalBeginRun(edm::GlobalContext const &)
void preEvent(edm::StreamContext const &)
bool isGlobalLumiTransition_
void preSourceEarlyTermination(edm::TerminationOrigin)
EventID const & eventID() const
void watchPreSourceEvent(PreSourceEvent::slot_type const &iSlot)
bool getAbortFlagForLumi(unsigned int lumi)
volatile std::atomic< bool > shutdown_flag false
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
Writes a Value in JSON format in a human friendly way.
std::vector< const void * > ministate_
unsigned int macrostateBins_
const void * decode(unsigned int index)
std::vector< unsigned int > streamLumi_
void update(const void *add)
unsigned int ministateBins_
std::vector< Encoding > encPath_
std::vector< unsigned int > inputState_
std::vector< unsigned int > ministateEncoded_
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::vector< unsigned int > exceptionInLS_
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
array value (ordered list)