1 #ifndef EvFFastMonitoringService_H 2 #define EvFFastMonitoringService_H 1 13 #include "boost/filesystem.hpp" 23 #include <unordered_map> 61 m_value.store(iValue, std::memory_order_relaxed);
64 operator T() {
return m_value.load(std::memory_order_relaxed); }
71 Encoding(
unsigned int res) : reserved_(res), current_(reserved_), currentReserved_(0) {
78 delete[] dummiesForReserved_;
82 std::unordered_map<const void*, int>::const_iterator it = quickReference_.find(add);
83 return (it != quickReference_.end()) ? (*it).second : 0;
88 quickReference_[
add] =
i;
89 if (decoder_.size() <=
i)
90 decoder_.push_back(add);
92 decoder_[currentReserved_] =
add;
95 fillReserved(add, currentReserved_);
99 for (
unsigned int i = currentReserved_;
i < reserved_;
i++)
100 fillReserved(dummiesForReserved_ +
i,
i);
104 quickReference_[
add] = current_;
105 decoder_.push_back(add);
108 unsigned int vecsize() {
return decoder_.size(); }
119 static const std::string macroStateNames[FastMonitoringThread::MCOUNT];
120 static const std::string inputStateNames[FastMonitoringThread::inCOUNT];
159 void setExceptionDetected(
unsigned int ls);
165 void accumulateFileSize(
unsigned int lumi,
unsigned long fileSize);
166 void startedLookingForFile();
167 void stoppedLookingForFile(
unsigned int lumi);
168 void reportLockWait(
unsigned int ls,
double waitTime,
unsigned int lockCount);
169 unsigned int getEventsProcessedForLumi(
unsigned int lumi,
bool* abortFlag =
nullptr);
170 bool getAbortFlagForLumi(
unsigned int lumi);
172 unsigned int processed = getEventsProcessedForLumi(lumi);
175 return !getAbortFlagForLumi(lumi);
183 void doSnapshot(
const unsigned int ls,
const bool isGlobalEOL);
187 fmt_.jsonMonitor_->snapStreamAtomic(ls, streamID);
191 monInit_.exchange(
true, std::memory_order_acquire);
192 while (!fmt_.m_stoprequest) {
194 <<
"Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
195 <<
" ms=" << encPath_[0].encode(ministate_[0]) <<
" us=" << encModule_.encode(microstate_[0])
196 <<
" is=" << inputStateNames[inputState_] <<
" iss=" << inputStateNames[inputSupervisorState_] << std::endl;
199 std::lock_guard<std::mutex>
lock(fmt_.monlock_);
201 doSnapshot(lastGlobalLumi_,
false);
203 if (fastMonIntervals_ && (snapCounter_ % fastMonIntervals_) == 0) {
204 if (filePerFwkStream_) {
205 std::vector<std::string> CSVv;
206 for (
unsigned int i = 0;
i < nStreams_;
i++) {
207 CSVv.push_back(fmt_.jsonMonitor_->getCSVString((
int)
i));
209 fmt_.monlock_.unlock();
210 for (
unsigned int i = 0;
i < nStreams_;
i++) {
212 fmt_.jsonMonitor_->outputCSV(fastPathList_[
i], CSVv[i]);
215 std::string CSV = fmt_.jsonMonitor_->getCSVString();
217 fmt_.monlock_.unlock();
219 fmt_.jsonMonitor_->outputCSV(fastPath_, CSV);
234 std::atomic<FastMonitoringThread::InputState> inputState_{FastMonitoringThread::InputState::inInit};
235 std::atomic<FastMonitoringThread::InputState> inputSupervisorState_{FastMonitoringThread::InputState::inInit};
241 unsigned int snapCounter_ = 0;
286 bool threadIDAvailable_ =
false;
295 bool pathLegendWritten_ =
false;
296 unsigned int nOutputModules_ = 0;
299 bool exception_detected_ =
false;
int encode(const void *add)
Encoding(unsigned int res)
std::string pathLegendFileJson_
ContainableAtomic(ContainableAtomic< T > const &iOther)
std::vector< ContainableAtomic< const void * > > microstate_
std::vector< const void * > decoder_
std::string pathLegendFile_
ContainableAtomic(T iValue)
std::atomic< bool > isInitTransition_
std::map< unsigned int, timeval > lumiStartTime_
void fillReserved(const void *add, unsigned int i)
std::string getRunDirName() const
std::map< unsigned int, unsigned long > accuSize_
FastMonitoringThread fmt_
void setInState(FastMonitoringThread::InputState inputState)
std::vector< std::atomic< bool > * > streamCounterUpdating_
bool shouldWriteFiles(unsigned int lumi, unsigned int *proc=0)
void setInStateSup(FastMonitoringThread::InputState inputState)
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
ContainableAtomic< T > & operator=(const void *iValue)
unsigned int lastGlobalLumi_
std::string inputLegendFileJson_
unsigned int fastMonIntervals_
std::vector< bool > pathNamesReady_
std::atomic< bool > monInit_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
void updateReserved(const void *add)
unsigned int lumiFromSource_
std::vector< std::atomic< bool > * > collectedPathList_
std::string microstateDefPath_
std::unordered_map< const void *, int > quickReference_
std::map< unsigned int, double > avgLeadTime_
std::string moduleLegendFile_
static const std::string nopath_
std::vector< ContainableAtomic< const void * > > threadMicrostate_
std::string moduleLegendFileJson_
std::vector< unsigned long > firstEventId_
std::vector< std::string > fastPathList_
std::vector< ContainableAtomic< unsigned int > > eventCountForPathInit_
void completeReservedWithDummies()
void setInputSource(FedRawDataInputSource *inputSource)
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::atomic< unsigned long > totalEventsProcessed_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::atomic< FastMonitoringThread::Macrostate > macrostate_
std::vector< ContainableAtomic< const void * > > ministate_
std::vector< double > leadTimes_
boost::filesystem::path workingDirectory_
const void * decode(unsigned int index)
void update(const void *add)
std::vector< Encoding > encPath_
edm::ModuleDescription * dummiesForReserved_
std::vector< unsigned int > exceptionInLS_