1 #ifndef EvFFastMonitoringService_H 2 #define EvFFastMonitoringService_H 1 14 #include "boost/filesystem.hpp" 24 #include <unordered_map> 63 m_value.store(iValue,std::memory_order_relaxed);
// Implicit conversion to T: returns the value currently held in m_value,
// read with std::memory_order_relaxed (the load itself imposes no ordering
// on surrounding memory operations).
66 operator T() {
return m_value.load(std::memory_order_relaxed); }
// Constructor: stores res in reserved_, starts current_ at the same value and
// currentReserved_ at 0.
// NOTE(review): current_ is initialised from reserved_, not from res, so this
// relies on reserved_ being declared before current_ in the class; the member
// declaration order is not visible in this excerpt -- confirm.
75 Encoding(
unsigned int res): reserved_(res), current_(reserved_), currentReserved_(0)
// NOTE(review): the delete[] below means this object releases the
// dummiesForReserved_ array itself. Copy/move members are not visible here,
// while Encoding objects are kept in a std::vector (encPath_) -- confirm that
// copying cannot lead to the same array being deleted twice.
84 delete[] dummiesForReserved_;
// Look the address up in quickReference_ and return the integer stored for it.
// An address that is not present in the map yields 0.
// NOTE(review): whether 0 can also be a legitimately assigned code is not
// visible in this excerpt -- callers may be unable to tell "unknown" from
// "code 0"; confirm.
88 std::unordered_map<const void *, int>::const_iterator it=quickReference_.find(add);
89 return (it!=quickReference_.end()) ? (*it).second : 0;
94 quickReference_[
add]=
i;
95 if(decoder_.size()<=
i)
96 decoder_.push_back(add);
98 decoder_[currentReserved_] =
add;
101 fillReserved(add,currentReserved_);
106 for(
unsigned int i = currentReserved_;
i<reserved_;
i++)
107 fillReserved(dummiesForReserved_+
i,
i);
111 quickReference_[
add]=current_;
112 decoder_.push_back(add);
116 return decoder_.size();
128 static const std::string macroStateNames[FastMonitoringThread::MCOUNT];
129 static const std::string inputStateNames[FastMonitoringThread::inCOUNT];
168 void setExceptionDetected(
unsigned int ls);
174 void accumulateFileSize(
unsigned int lumi,
unsigned long fileSize);
175 void startedLookingForFile();
176 void stoppedLookingForFile(
unsigned int lumi);
177 void reportLockWait(
unsigned int ls,
double waitTime,
unsigned int lockCount);
178 unsigned int getEventsProcessedForLumi(
unsigned int lumi,
bool * abortFlag=
nullptr);
179 bool getAbortFlagForLumi(
unsigned int lumi);
// Fetch the processed-event count for this lumi (the abortFlag argument is
// left at its default of nullptr).
182 unsigned int processed = getEventsProcessedForLumi(lumi);
// Result is true only when the lumi's abort flag is not set AND either at
// least one event was processed or emptyLumisectionMode_ is enabled.
184 return !getAbortFlagForLumi(lumi) && (processed || emptyLumisectionMode_);
193 void doSnapshot(
const unsigned int ls,
const bool isGlobalEOL);
197 fmt_.jsonMonitor_->snapStreamAtomic(ls,streamID);
201 monInit_.exchange(
true,std::memory_order_acquire);
202 while (!fmt_.m_stoprequest) {
203 edm::LogInfo(
"FastMonitoringService") <<
"Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
204 <<
" ms=" << encPath_[0].encode(ministate_[0])
205 <<
" us=" << encModule_.encode(microstate_[0])
206 <<
" is=" << inputStateNames[inputState_]
207 <<
" iss="<< inputStateNames[inputSupervisorState_]
// Take fmt_.monlock_ for the snapshot and for collecting the CSV strings.
// NOTE(review): this lock_guard is combined with explicit
// fmt_.monlock_.unlock() calls further down. If those calls execute inside
// the guard's scope (the enclosing braces are not visible in this excerpt),
// the guard's destructor will unlock a mutex this thread no longer owns,
// which is undefined behaviour for std::mutex. A std::unique_lock released
// through its own unlock() would allow the early release safely -- confirm
// against the full function before changing.
211 std::lock_guard<std::mutex>
lock(fmt_.monlock_);
213 doSnapshot(lastGlobalLumi_,
false);
// Fast-path CSV output happens only on every fastMonIntervals_-th snapshot
// (and never when fastMonIntervals_ is 0).
215 if (fastMonIntervals_ && (snapCounter_%fastMonIntervals_)==0) {
216 if (filePerFwkStream_) {
// Per-stream mode: gather one CSV string per stream before the mutex is
// released.
217 std::vector<std::string> CSVv;
218 for (
unsigned int i=0;
i<nStreams_;
i++) {
219 CSVv.push_back(fmt_.jsonMonitor_->getCSVString((
int)
i));
// Manual release of the mutex ahead of the outputCSV calls (see the review
// note at the lock_guard above).
221 fmt_.monlock_.unlock();
222 for (
unsigned int i=0;
i<nStreams_;
i++) {
224 fmt_.jsonMonitor_->outputCSV(fastPathList_[
i],CSVv[i]);
// Single-file mode: one CSV string, written to fastPath_ after the mutex is
// released manually.
228 std::string CSV = fmt_.jsonMonitor_->getCSVString();
230 fmt_.monlock_.unlock();
232 fmt_.jsonMonitor_->outputCSV(fastPath_,CSV);
248 std::atomic<FastMonitoringThread::InputState> inputState_ { FastMonitoringThread::InputState::inInit };
249 std::atomic<FastMonitoringThread::InputState> inputSupervisorState_ { FastMonitoringThread::InputState::inInit };
255 unsigned int snapCounter_ = 0;
302 bool threadIDAvailable_ =
false;
311 bool pathLegendWritten_ =
false;
312 unsigned int nOutputModules_ =0;
315 bool exception_detected_ =
false;
317 bool emptyLumisectionMode_ =
false;
int encode(const void *add)
Encoding(unsigned int res)
std::string pathLegendFileJson_
ContainableAtomic(ContainableAtomic< T > const &iOther)
std::vector< ContainableAtomic< const void * > > microstate_
std::string pathLegendFile_
ContainableAtomic(T iValue)
std::atomic< bool > isInitTransition_
std::map< unsigned int, timeval > lumiStartTime_
void fillReserved(const void *add, unsigned int i)
std::string getRunDirName() const
TrainProcessor *const proc
std::map< unsigned int, unsigned long > accuSize_
FastMonitoringThread fmt_
void setInState(FastMonitoringThread::InputState inputState)
std::vector< std::atomic< bool > * > streamCounterUpdating_
bool shouldWriteFiles(unsigned int lumi, unsigned int *proc=0)
void setInStateSup(FastMonitoringThread::InputState inputState)
void add(const std::vector< const T * > &source, std::vector< const T * > &dest)
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
ContainableAtomic< T > & operator=(const void *iValue)
unsigned int lastGlobalLumi_
std::string inputLegendFileJson_
unsigned int fastMonIntervals_
std::vector< bool > pathNamesReady_
std::atomic< bool > monInit_
void updateReserved(const void *add)
unsigned int lumiFromSource_
std::vector< std::atomic< bool > * > collectedPathList_
std::queue< unsigned int > lastGlobalLumisClosed_
std::string microstateDefPath_
std::map< unsigned int, double > avgLeadTime_
std::string moduleLegendFile_
static const std::string nopath_
std::vector< ContainableAtomic< const void * > > threadMicrostate_
std::unordered_map< const void *, int > quickReference_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::string moduleLegendFileJson_
std::vector< unsigned long > firstEventId_
std::vector< const void * > decoder_
std::vector< std::string > fastPathList_
std::vector< ContainableAtomic< unsigned int > > eventCountForPathInit_
void completeReservedWithDummies()
void setInputSource(FedRawDataInputSource *inputSource)
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::atomic< unsigned long > totalEventsProcessed_
std::atomic< FastMonitoringThread::Macrostate > macrostate_
std::vector< ContainableAtomic< const void * > > ministate_
std::vector< double > leadTimes_
boost::filesystem::path workingDirectory_
bool isGlobalLumiTransition_
const void * decode(unsigned int index)
void update(const void *add)
std::vector< Encoding > encPath_
edm::ModuleDescription * dummiesForReserved_
std::vector< unsigned int > exceptionInLS_