CMS 3D CMS Logo

FastMonitoringThread.h
Go to the documentation of this file.
1 #ifndef EVF_FASTMONITORINGTHREAD
2 #define EVF_FASTMONITORINGTHREAD
3 
6 
7 #include <iostream>
8 #include <memory>
9 
10 #include <vector>
11 #include <thread>
12 #include <mutex>
13 
14 namespace evf {
15 
16  //namespace FastMonState {
17  // enum Macrostate;
18  //}
19 
21 
22  template <typename T>
23  struct ContainableAtomic {
25  ContainableAtomic(T iValue) : m_value(iValue) {}
28  m_value.store(iValue, std::memory_order_relaxed);
29  return *this;
30  }
31  operator T() { return m_value.load(std::memory_order_relaxed); }
32 
33  std::atomic<T> m_value;
34  };
35 
36  struct FastMonEncoding {
38  if (reserved_)
40  // completeReservedWithDummies();
41  }
43  if (reserved_)
44  delete[] dummiesForReserved_;
45  }
46  //trick: only encode state when sending it over (i.e. every sec)
47  int encode(const void* add) const {
48  std::unordered_map<const void*, int>::const_iterator it = quickReference_.find(add);
49  return (it != quickReference_.end()) ? (*it).second : 0;
50  }
51 
52  //This allows the path list to be initialized in beginJob, although the strings used later are not at the same
53  //memory address. The path address lookup is therefore updated when a snapshot (encode) is taken.
54  //With this we can remove the ugly path legend update in preEventPath, but we will still need a check
55  //that some event has been processed (any path will do).
57  std::unordered_map<const void*, int>::const_iterator it = quickReference_.find((void*)add);
58  if (it == quickReference_.end()) {
59  //try to match by string content (encode only used
60  auto it = quickReferencePreinit_.find(*add);
61  if (it == quickReferencePreinit_.end())
62  return 0;
63  else {
64  //overwrite pointer in decoder and add to reference
65  decoder_[(*it).second] = (void*)add;
66  quickReference_[(void*)add] = (*it).second;
68  return encode((void*)add);
69  }
70  }
71  return (*it).second;
72  }
73 
74  const void* decode(unsigned int index) { return decoder_[index]; }
75  void fillReserved(const void* add, unsigned int i) {
76  // translation_[*name]=current_;
78  if (decoder_.size() <= i)
79  decoder_.push_back(add);
80  else
82  }
83  void updateReserved(const void* add) {
86  }
88  for (unsigned int i = currentReserved_; i < reserved_; i++)
90  }
91  void update(const void* add) {
92  // translation_[*name]=current_;
94  decoder_.push_back(add);
95  current_++;
96  }
97 
98  void updatePreinit(std::string const& add) {
99  // translation_[*name]=current_;
101  decoder_.push_back((void*)&add);
102  current_++;
103  }
104 
105  unsigned int vecsize() { return decoder_.size(); }
106  std::unordered_map<const void*, int> quickReference_;
107  std::unordered_map<std::string, int> quickReferencePreinit_;
108  std::vector<const void*> decoder_;
109  unsigned int reserved_;
110  int current_;
113  };
114 
116  public:
117  struct MonitorData {
118  //fastpath global monitorables
126 
127  unsigned int varIndexThrougput_;
128 
129  //per stream
130  std::vector<unsigned int> tmicrostateEncoded_;
131  std::vector<unsigned int> microstateEncoded_;
132  std::vector<jsoncollector::AtomicMonUInt*> processed_;
134  std::vector<unsigned int> inputState_;
135 
136  //tracking luminosity of a stream
137  std::vector<unsigned int> streamLumi_;
138 
139  //N bins for histograms
140  unsigned int macrostateBins_;
141  unsigned int microstateBins_;
142  unsigned int inputstateBins_;
143 
144  //global state
145  std::atomic<FastMonState::Macrostate> macrostate_;
146 
148  std::vector<FastMonEncoding> encPath_;
149 
150  //unsigned int prescaleindex_; // ditto
151 
154  fastThroughputJ_ = 0;
155  fastAvgLeadTimeJ_ = 0;
157  fastLockWaitJ_ = 0;
158  fastLockCountJ_ = 0;
159  fastMacrostateJ_.setName("Macrostate");
160  fastThroughputJ_.setName("Throughput");
161  fastAvgLeadTimeJ_.setName("AverageLeadTime");
162  fastFilesProcessedJ_.setName("FilesProcessed");
163  fastLockWaitJ_.setName("LockWaitUs");
164  fastLockCountJ_.setName("LockCount");
165 
167  fastPathProcessedJ_.setName("Processed");
168  }
169 
170  //to be called after fast monitor is constructed
172  unsigned nMaxSlices,
173  unsigned nMaxStreams,
174  unsigned nMaxThreads) {
175  //tell FM to track these global variables(for fast and slow monitoring)
182 
183  for (unsigned int i = 0; i < nMaxSlices; i++) {
185  *p = 0;
186  processed_.push_back(p);
187  streamLumi_.push_back(0);
188  }
189 
190  tmicrostateEncoded_.resize(nMaxSlices, FastMonState::mInvalid);
191  for (unsigned int i = nMaxThreads; i < nMaxSlices; i++) {
193  }
194  microstateEncoded_.resize(nMaxSlices, FastMonState::mInvalid);
195  inputState_.resize(nMaxSlices, FastMonState::inInit);
196  for (unsigned int i = nMaxStreams; i < nMaxSlices; i++) {
199  }
200  //for (unsigned int j = 0; j < nMaxStreams; j++)
201  // inputState_[j] = 0;
202 
203  //tell FM to track these int vectors
205 
207 
208  fm->registerStreamMonitorableUIntVecAtomic("Processed", &processed_, false, nullptr);
209 
210  //input source state tracking (not stream, but other than first item in vector is set to Ignore state)
212 
213  //global cumulative event counter is used for fast path
215 
216  //provide vector with updated per stream lumis and let it finish initialization
217  fm->commit(&streamLumi_);
218  }
219  };
220 
221  //constructor
223 
224  void resetFastMonitor(std::string const& microStateDefPath, std::string const& fastMicroStateDefPath) {
225  std::string defGroup = "data";
226  jsonMonitor_ = std::make_unique<jsoncollector::FastMonitor>(microStateDefPath, defGroup, false);
227  if (!fastMicroStateDefPath.empty())
228  jsonMonitor_->addFastPathDefinition(fastMicroStateDefPath, defGroup, false);
229  }
230 
232  assert(!m_thread);
233  m_thread = std::make_shared<std::thread>(fp, cp);
234  }
235  void stop() {
236  if (m_thread.get()) {
237  m_stoprequest = true;
238  m_thread->join();
239  m_thread.reset();
240  }
241  }
242 
244 
245  private:
246  std::atomic<bool> m_stoprequest;
247  std::shared_ptr<std::thread> m_thread;
250 
251  std::unique_ptr<jsoncollector::FastMonitor> jsonMonitor_;
252 
253  friend class FastMonitoringService;
254  };
255 } //end namespace evf
256 #endif
void registerFastGlobalMonitorable(JsonMonitorable *newMonitorable)
Definition: FastMonitor.cc:81
Definition: fillJson.h:27
std::vector< const void * > decoder_
void update(const void *add)
ContainableAtomic(ContainableAtomic< T > const &iOther)
void registerStreamMonitorableUIntVecAtomic(std::string const &name, std::vector< AtomicMonUInt *> *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:101
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
const void * decode(unsigned int index)
std::atomic< FastMonState::Macrostate > macrostate_
void updatePreinit(std::string const &add)
edm::ModuleDescription * dummiesForReserved_
static std::mutex mutex
Definition: Proxy.cc:8
std::unordered_map< std::string, int > quickReferencePreinit_
std::shared_ptr< std::thread > m_thread
int encodeString(const std::string *add)
assert(be >=bs)
Definition: Electron.h:6
std::vector< unsigned int > tmicrostateEncoded_
void registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:67
virtual void setName(std::string name)
void registerStreamMonitorableUIntVec(std::string const &name, std::vector< unsigned int > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:88
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
void registerVariables(jsoncollector::FastMonitor *fm, unsigned nMaxSlices, unsigned nMaxStreams, unsigned nMaxThreads)
void commit(std::vector< unsigned int > *streamLumisPtr)
Definition: FastMonitor.cc:116
unsigned int AtomicMonUInt
Definition: DataPoint.h:31
std::vector< unsigned int > microstateEncoded_
def load(fileName)
Definition: svgfig.py:547
constexpr int nReservedModules
void updateReserved(const void *add)
std::vector< FastMonEncoding > encPath_
void fillReserved(const void *add, unsigned int i)
void add(std::map< std::string, TH1 *> &h, TH1 *hist)
std::atomic< bool > m_stoprequest
std::vector< jsoncollector::AtomicMonUInt * > processed_
FastMonEncoding(unsigned int res)
int encode(const void *add) const
long double T
std::unordered_map< const void *, int > quickReference_
def cp(fromDir, toDir, listOfFiles, overwrite=False, smallList=False)
ContainableAtomic< T > & operator=(T iValue)