// NOTE(review): the integers that start many lines in this excerpt look like line numbers carried over from a source listing rather than code — confirm against the original header.
1 #ifndef EvFFastMonitoringService_H 2 #define EvFFastMonitoringService_H 1 14 #include "boost/filesystem.hpp" 24 #include <unordered_map> 61 Encoding(
unsigned int res): reserved_(res), current_(reserved_), currentReserved_(0)
// Initializer list above: reserved_ takes `res`, current_ starts at reserved_, currentReserved_ starts at 0.
// Frees the dummiesForReserved_ array with delete[]; the enclosing function header is not visible in this excerpt.
70 delete[] dummiesForReserved_;
// Looks up the address `add` in quickReference_.
74 std::unordered_map<const void *, int>::const_iterator it=quickReference_.find(add);
// Returns the int stored for `add` when it is present, otherwise 0.
75 return (it!=quickReference_.end()) ? (*it).second : 0;
// Presumably the body of fillReserved(add, i); the function header is not visible on these lines.
// Records the mapping add -> i in quickReference_.
80 quickReference_[
add]=
i;
// Appends `add` to decoder_ when decoder_ has no element at index i yet.
81 if(decoder_.size()<=
i)
82 decoder_.push_back(add);
// NOTE(review): this store is indexed by currentReserved_, while the map entry above uses i.
// The line joining it to the `if` (listing line 83) is not visible here — confirm the two indices are meant to agree.
84 decoder_[currentReserved_] =
add;
// Registers `add` through fillReserved at the index currentReserved_.
// Any statement that follows (e.g. advancing currentReserved_) is not visible in this excerpt.
87 fillReserved(add,currentReserved_);
// For each index i from currentReserved_ up to (but not including) reserved_,
// registers the address of the i-th element of dummiesForReserved_ under index i via fillReserved.
92 for(
unsigned int i = currentReserved_;
i<reserved_;
i++)
93 fillReserved(dummiesForReserved_+
i,
i);
// Maps `add` to the value of current_ in quickReference_ and appends `add` to decoder_.
// NOTE(review): no change to current_ is visible on these lines — confirm it is advanced on a line missing from this excerpt.
97 quickReference_[
add]=current_;
98 decoder_.push_back(add);
// Returns the number of entries in decoder_; this statement belongs to a separate function whose header is not visible here.
102 return decoder_.size();
// Static const string tables sized by FastMonitoringThread::MCOUNT and FastMonitoringThread::inCOUNT.
// Declared without initializers on these lines.
114 static const std::string macroStateNames[FastMonitoringThread::MCOUNT];
115 static const std::string inputStateNames[FastMonitoringThread::inCOUNT];
// Member function declarations only; no bodies appear on these lines.
154 void setExceptionDetected(
unsigned int ls);
160 void accumulateFileSize(
unsigned int lumi,
unsigned long fileSize);
161 void startedLookingForFile();
162 void stoppedLookingForFile(
unsigned int lumi);
163 void reportLockWait(
unsigned int ls,
double waitTime,
unsigned int lockCount);
// abortFlag is optional and defaults to nullptr.
164 unsigned int getEventsProcessedForLumi(
unsigned int lumi,
bool * abortFlag=
nullptr);
165 bool getAbortFlagForLumi(
unsigned int lumi);
// Fetches the processed-event count for `lumi` (abortFlag left at its default).
168 unsigned int processed = getEventsProcessedForLumi(lumi);
// True only when getAbortFlagForLumi(lumi) is false and either `processed` is non-zero or emptyLumisectionMode_ is set.
170 return !getAbortFlagForLumi(lumi) && (processed || emptyLumisectionMode_);
// Declaration only: takes a lumisection number `ls` and a flag `isGlobalEOL`; no body appears on these lines.
179 void doSnapshot(
const unsigned int ls,
const bool isGlobalEOL);
// Forwards `ls` and `streamID` to fmt_.jsonMonitor_->snapStreamAtomic.
183 fmt_.jsonMonitor_->snapStreamAtomic(ls,streamID);
// Fragment of a loop body; the enclosing function header and all braces that close scopes are missing from this excerpt.
// Sets monInit_ to true with an atomic exchange; the previous value is discarded.
// NOTE(review): with memory_order_acquire the store half of the exchange is not a release operation — confirm whether release/acq_rel was intended.
187 monInit_.exchange(
true,std::memory_order_acquire);
// Repeats until fmt_.m_stoprequest evaluates to true.
188 while (!fmt_.m_stoprequest) {
// Logs a "Current states" line built from fastMacrostateJ_, ministate_[0], microstate_[0], inputState_ and inputSupervisorState_.
189 edm::LogInfo(
"FastMonitoringService") <<
"Current states: Ms=" << fmt_.m_data.fastMacrostateJ_.value()
190 <<
" ms=" << encPath_[0].encode(ministate_[0])
191 <<
" us=" << encModule_.encode(microstate_[0])
192 <<
" is=" << inputStateNames[inputState_]
193 <<
" iss="<< inputStateNames[inputSupervisorState_]
// Locks fmt_.monlock_ for the lifetime of `lock`.
197 std::lock_guard<std::mutex>
lock(fmt_.monlock_);
// Takes a snapshot for lastGlobalLumi_ with isGlobalEOL = false.
199 doSnapshot(lastGlobalLumi_,
false);
// CSV output runs only when fastMonIntervals_ is non-zero and snapCounter_ is a multiple of it.
201 if (fastMonIntervals_ && (snapCounter_%fastMonIntervals_)==0) {
202 if (filePerFwkStream_) {
// Collects one CSV string per stream index before any file output.
203 std::vector<std::string> CSVv;
204 for (
unsigned int i=0;
i<nStreams_;
i++) {
205 CSVv.push_back(fmt_.jsonMonitor_->getCSVString((
int)
i));
// NOTE(review): manual unlock of the mutex that `lock` (a std::lock_guard) also owns.
// If the guard is still in scope here, its destructor would unlock a second time — confirm; std::unique_lock permits an early release.
207 fmt_.monlock_.unlock();
// Writes each collected string to the matching entry of fastPathList_.
208 for (
unsigned int i=0;
i<nStreams_;
i++) {
210 fmt_.jsonMonitor_->outputCSV(fastPathList_[
i],CSVv[i]);
// Single-file case: one CSV string, written to fastPath_ after the mutex is released.
214 std::string CSV = fmt_.jsonMonitor_->getCSVString();
// NOTE(review): same manual-unlock concern as above.
216 fmt_.monlock_.unlock();
218 fmt_.jsonMonitor_->outputCSV(fastPath_,CSV);
// Data members with in-class default initializers: counters start at 0 and flags start as false.
241 unsigned int snapCounter_ = 0;
288 bool threadIDAvailable_ =
false;
297 bool pathLegendWritten_ =
false;
298 unsigned int nOutputModules_ =0;
301 bool exception_detected_ =
false;
303 bool emptyLumisectionMode_ =
false;
// NOTE(review): the lines below carry no terminators or bodies and appear in no particular order.
// They look like an index of member signatures rather than compilable declarations — confirm against the original header.
int encode(const void *add)
FastMonitoringThread::InputState inputState_
Encoding(unsigned int res)
std::string pathLegendFileJson_
std::string pathLegendFile_
std::map< unsigned int, timeval > lumiStartTime_
void fillReserved(const void *add, unsigned int i)
std::string getRunDirName() const
TrainProcessor *const proc
std::map< unsigned int, unsigned long > accuSize_
FastMonitoringThread fmt_
void setInState(FastMonitoringThread::InputState inputState)
std::vector< unsigned int > eventCountForPathInit_
std::vector< std::atomic< bool > * > streamCounterUpdating_
bool shouldWriteFiles(unsigned int lumi, unsigned int *proc=0)
void setInStateSup(FastMonitoringThread::InputState inputState)
void add(const std::vector< const T * > &source, std::vector< const T * > &dest)
void doStreamEOLSnapshot(const unsigned int ls, const unsigned int streamID)
std::map< unsigned int, unsigned int > filesProcessedDuringLumi_
unsigned int lastGlobalLumi_
FastMonitoringThread::InputState inputSupervisorState_
std::string inputLegendFileJson_
unsigned int fastMonIntervals_
std::vector< bool > pathNamesReady_
std::atomic< bool > monInit_
void updateReserved(const void *add)
unsigned int lumiFromSource_
std::vector< std::atomic< bool > * > collectedPathList_
std::queue< unsigned int > lastGlobalLumisClosed_
std::string microstateDefPath_
std::map< unsigned int, double > avgLeadTime_
std::string moduleLegendFile_
static const std::string nopath_
std::vector< const void * > threadMicrostate_
std::unordered_map< const void *, int > quickReference_
std::map< unsigned int, std::pair< double, unsigned int > > lockStatsDuringLumi_
std::string moduleLegendFileJson_
std::vector< unsigned long > firstEventId_
std::vector< const void * > decoder_
std::vector< std::string > fastPathList_
void completeReservedWithDummies()
void setInputSource(FedRawDataInputSource *inputSource)
std::vector< const void * > microstate_
std::map< unsigned int, std::pair< unsigned int, bool > > processedEventsPerLumi_
std::atomic< unsigned long > totalEventsProcessed_
FastMonitoringThread::Macrostate macrostate_
std::vector< double > leadTimes_
boost::filesystem::path workingDirectory_
bool isGlobalLumiTransition_
std::vector< const void * > ministate_
const void * decode(unsigned int index)
void update(const void *add)
std::vector< Encoding > encPath_
edm::ModuleDescription * dummiesForReserved_
std::vector< unsigned int > exceptionInLS_