CMS 3D CMS Logo

FastMonitoringThread.h
Go to the documentation of this file.
1 #ifndef EVF_FASTMONITORINGTHREAD
2 #define EVF_FASTMONITORINGTHREAD
3 
5 
6 #include <iostream>
7 #include <vector>
8 #include <thread>
9 #include <mutex>
10 
11 namespace evf {
12 
14 
16  public:
17  // a copy of the Framework/EventProcessor states
18  enum Macrostate {
19  sInit = 0,
32  };
33 
34  enum InputState {
35  inIgnore = 0,
53  //supervisor thread and worker threads state
68  //combined with inWaitInput
84  //combined with inWaitChunk
101  };
102 
103  struct MonitorData {
104  //fastpath global monitorables
112 
113  unsigned int varIndexThrougput_;
114 
115  //per stream
116  std::vector<unsigned int> microstateEncoded_;
117  std::vector<unsigned int> ministateEncoded_;
118  std::vector<jsoncollector::AtomicMonUInt*> processed_;
120  std::vector<unsigned int> threadMicrostateEncoded_;
121  std::vector<unsigned int> inputState_;
122 
123  //tracking luminosity of a stream
124  std::vector<unsigned int> streamLumi_;
125 
126  //N bins for histograms
127  unsigned int macrostateBins_;
128  unsigned int ministateBins_;
129  unsigned int microstateBins_;
130  unsigned int inputstateBins_;
131 
132  //unsigned int prescaleindex_; // ditto
133 
135  fastMacrostateJ_ = FastMonitoringThread::sInit;
136  fastThroughputJ_ = 0;
137  fastAvgLeadTimeJ_ = 0;
138  fastFilesProcessedJ_ = 0;
139  fastLockWaitJ_ = 0;
140  fastLockCountJ_ = 0;
141  fastMacrostateJ_.setName("Macrostate");
142  fastThroughputJ_.setName("Throughput");
143  fastAvgLeadTimeJ_.setName("AverageLeadTime");
144  fastFilesProcessedJ_.setName("FilesProcessed");
145  fastLockWaitJ_.setName("LockWaitUs");
146  fastLockCountJ_.setName("LockCount");
147 
148  fastPathProcessedJ_ = 0;
149  fastPathProcessedJ_.setName("Processed");
150  }
151 
152  //to be called after fast monitor is constructed
// Registers this struct's monitorables with `fm` and sizes the per-stream / per-thread vectors.
//   fm       - monitor that receives all registrations and the final commit() call
//   nStreams - number of entries created in processed_, streamLumi_, microstateEncoded_,
//              ministateEncoded_ and inputState_
//   nThreads - number of entries created in threadMicrostateEncoded_
153  void registerVariables(jsoncollector::FastMonitor* fm, unsigned int nStreams, unsigned int nThreads) {
154  //tell FM to track these global variables(for fast and slow monitoring)
// only the macrostate is registered with a bin-count pointer; the others pass none
155  fm->registerGlobalMonitorable(&fastMacrostateJ_, true, &macrostateBins_);
156  fm->registerGlobalMonitorable(&fastThroughputJ_, false);
157  fm->registerGlobalMonitorable(&fastAvgLeadTimeJ_, false);
158  fm->registerGlobalMonitorable(&fastFilesProcessedJ_, false);
159  fm->registerGlobalMonitorable(&fastLockWaitJ_, false);
160  fm->registerGlobalMonitorable(&fastLockCountJ_, false);
161 
// one zeroed counter and one zero lumi entry are appended per stream
162  for (unsigned int i = 0; i < nStreams; i++) {
// NOTE(review): the declaration of `p` (listing line 163) is not visible in this listing;
// presumably it allocates one jsoncollector::AtomicMonUInt per stream - confirm against the full header.
164  *p = 0;
165  processed_.push_back(p);
166  streamLumi_.push_back(0);
167  }
168 
// per-stream vectors are sized by nStreams, the thread microstate vector by nThreads
169  microstateEncoded_.resize(nStreams);
170  ministateEncoded_.resize(nStreams);
171  threadMicrostateEncoded_.resize(nThreads);
172  inputState_.resize(nStreams);
// zero every entry; 0 is inIgnore in the InputState enum
173  for (unsigned int j = 0; j < inputState_.size(); j++)
174  inputState_[j] = 0;
175 
176  //tell FM to track these int vectors
177  fm->registerStreamMonitorableUIntVec("Ministate", &ministateEncoded_, true, &ministateBins_);
178 
// "Microstate" is backed by the per-stream vector when nThreads <= nStreams,
// otherwise by the per-thread vector; both use microstateBins_
179  if (nThreads <= nStreams) //no overlapping in module execution per stream
180  fm->registerStreamMonitorableUIntVec("Microstate", &microstateEncoded_, true, &microstateBins_);
181  else
182  fm->registerStreamMonitorableUIntVec("Microstate", &threadMicrostateEncoded_, true, &microstateBins_);
183 
// the atomic counters are registered without a bin count (nullptr)
184  fm->registerStreamMonitorableUIntVecAtomic("Processed", &processed_, false, nullptr);
185 
186  //input source state tracking (not stream, but other than first item in vector is set to Ignore state)
187  fm->registerStreamMonitorableUIntVec("Inputstate", &inputState_, true, &inputstateBins_);
188 
189  //global cumulative event counter is used for fast path
190  fm->registerFastGlobalMonitorable(&fastPathProcessedJ_);
191 
192  //provide vector with updated per stream lumis and let it finish initialization
193  fm->commit(&streamLumi_);
194  }
195  };
196 
197  //constructor
199 
200  void resetFastMonitor(std::string const& microStateDefPath, std::string const& fastMicroStateDefPath) {
201  std::string defGroup = "data";
202  jsonMonitor_.reset(new jsoncollector::FastMonitor(microStateDefPath, defGroup, false));
203  if (!fastMicroStateDefPath.empty())
204  jsonMonitor_->addFastPathDefinition(fastMicroStateDefPath, defGroup, false);
205  }
206 
208  assert(!m_thread);
209  m_thread = std::make_shared<std::thread>(fp, cp);
210  }
211  void stop() {
212  assert(m_thread);
213  m_stoprequest = true;
214  m_thread->join();
215  }
216 
217  private:
218  std::atomic<bool> m_stoprequest;
219  std::shared_ptr<std::thread> m_thread;
222 
223  std::unique_ptr<jsoncollector::FastMonitor> jsonMonitor_;
224 
225  friend class FastMonitoringService;
226  };
227 } //end namespace evf
228 #endif
void registerFastGlobalMonitorable(JsonMonitorable *newMonitorable)
Definition: FastMonitor.cc:80
static boost::mutex mutex
Definition: Proxy.cc:9
Definition: fillJson.h:27
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
void registerStreamMonitorableUIntVecAtomic(std::string const &name, std::vector< AtomicMonUInt * > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:100
std::shared_ptr< std::thread > m_thread
void registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:66
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
std::vector< unsigned int > threadMicrostateEncoded_
virtual void setName(std::string name)
void registerStreamMonitorableUIntVec(std::string const &name, std::vector< unsigned int > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:87
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
void commit(std::vector< unsigned int > *streamLumisPtr)
Definition: FastMonitor.cc:115
unsigned int AtomicMonUInt
Definition: DataPoint.h:31
std::vector< unsigned int > microstateEncoded_
std::atomic< bool > m_stoprequest
std::vector< jsoncollector::AtomicMonUInt * > processed_
std::vector< unsigned int > ministateEncoded_