CMS 3D CMS Logo

FastMonitoringThread.h
Go to the documentation of this file.
1 #ifndef EVF_FASTMONITORINGTHREAD
2 #define EVF_FASTMONITORINGTHREAD
3 
6 
7 #include <iostream>
8 #include <memory>
9 
10 #include <vector>
11 #include <thread>
12 #include <mutex>
13 
14 namespace evf {
15 
16  constexpr int nReservedModules = 128;
17  constexpr int nSpecialModules = 10;
18  constexpr int nReservedPaths = 1;
19 
// Declares the enum FastMonState::Macrostate without listing any enumerators here.
// NOTE(review): ISO C++ only permits an opaque enum declaration when the enum has a
// fixed underlying type -- confirm this form is accepted by the compilers in use.
20  namespace FastMonState {
21  enum Macrostate;
22  }
23 
25 
26  template <typename T>
29  ContainableAtomic(T iValue) : m_value(iValue) {}
32  m_value.store(iValue, std::memory_order_relaxed);
33  return *this;
34  }
// Implicit conversion to T: reads m_value with relaxed memory ordering.
35  operator T() { return m_value.load(std::memory_order_relaxed); }
36 
37  std::atomic<T> m_value;
38  };
39 
40  struct FastMonEncoding {
42  if (reserved_)
44  // completeReservedWithDummies();
45  }
47  if (reserved_)
48  delete[] dummiesForReserved_;
49  }
50  //trick: only encode state when sending it over (i.e. every sec)
// Look up the integer code stored for address `add` in quickReference_.
// Returns the mapped value, or 0 when the address is not present in the map.
51  int encode(const void* add) const {
52  std::unordered_map<const void*, int>::const_iterator it = quickReference_.find(add);
53  return (it != quickReference_.end()) ? (*it).second : 0;
54  }
55 
56  //this allows the path list to be initialized in beginJob, even though the strings used later are not
57  //at the same memory address. The path address lookup is therefore updated when snapshot (encode) is called.
58  //With this we can remove the ugly path legend update in preEventPath, but we will still need a check
59  //that some event has been processed (any path will do)
61  std::unordered_map<const void*, int>::const_iterator it = quickReference_.find((void*)add);
62  if (it == quickReference_.end()) {
63  //try to match by string content (encode only used
64  auto it = quickReferencePreinit_.find(*add);
65  if (it == quickReferencePreinit_.end())
66  return 0;
67  else {
68  //overwrite pointer in decoder and add to reference
69  decoder_[(*it).second] = (void*)add;
70  quickReference_[(void*)add] = (*it).second;
71  quickReferencePreinit_.erase(it);
72  return encode((void*)add);
73  }
74  }
75  return (*it).second;
76  }
77 
// Return the pointer stored at position `index` of decoder_.
// No bounds check is performed: `index` must be smaller than decoder_.size().
78  const void* decode(unsigned int index) { return decoder_[index]; }
79  void fillReserved(const void* add, unsigned int i) {
80  // translation_[*name]=current_;
82  if (decoder_.size() <= i)
83  decoder_.push_back(add);
84  else
86  }
87  void updateReserved(const void* add) {
90  }
92  for (unsigned int i = currentReserved_; i < reserved_; i++)
94  }
95  void update(const void* add) {
96  // translation_[*name]=current_;
98  decoder_.push_back(add);
99  current_++;
100  }
101 
103  // translation_[*name]=current_;
105  decoder_.push_back((void*)&add);
106  current_++;
107  }
108 
// Number of entries currently held in decoder_, converted to unsigned int.
109  unsigned int vecsize() { return decoder_.size(); }
110  std::unordered_map<const void*, int> quickReference_;
111  std::unordered_map<std::string, int> quickReferencePreinit_;
112  std::vector<const void*> decoder_;
113  unsigned int reserved_;
114  int current_;
117  };
118 
120  public:
121  struct MonitorData {
122  //fastpath global monitorables
130 
131  unsigned int varIndexThrougput_;
132 
133  //per stream
134  std::vector<unsigned int> microstateEncoded_;
135  std::vector<unsigned int> ministateEncoded_;
136  std::vector<jsoncollector::AtomicMonUInt*> processed_;
138  std::vector<unsigned int> threadMicrostateEncoded_;
139  std::vector<unsigned int> inputState_;
140 
141  //tracking luminosity of a stream
142  std::vector<unsigned int> streamLumi_;
143 
144  //N bins for histograms
145  unsigned int macrostateBins_;
146  unsigned int ministateBins_;
147  unsigned int microstateBins_;
148  unsigned int inputstateBins_;
149 
150  //global state
151  std::atomic<FastMonState::Macrostate> macrostate_;
152 
153  //per stream
154  std::vector<ContainableAtomic<const std::string*>> ministate_;
155  std::vector<ContainableAtomic<const void*>> microstate_;
156  std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
157  std::vector<ContainableAtomic<const void*>> threadMicrostate_;
158 
160  std::vector<FastMonEncoding> encPath_;
161 
162  //unsigned int prescaleindex_; // ditto
163 
166  fastThroughputJ_ = 0;
167  fastAvgLeadTimeJ_ = 0;
169  fastLockWaitJ_ = 0;
170  fastLockCountJ_ = 0;
171  fastMacrostateJ_.setName("Macrostate");
172  fastThroughputJ_.setName("Throughput");
173  fastAvgLeadTimeJ_.setName("AverageLeadTime");
174  fastFilesProcessedJ_.setName("FilesProcessed");
175  fastLockWaitJ_.setName("LockWaitUs");
176  fastLockCountJ_.setName("LockCount");
177 
179  fastPathProcessedJ_.setName("Processed");
180  }
181 
182  //to be called after fast monitor is constructed
183  void registerVariables(jsoncollector::FastMonitor* fm, unsigned int nStreams, unsigned int nThreads) {
184  //tell FM to track these global variables(for fast and slow monitoring)
191 
192  for (unsigned int i = 0; i < nStreams; i++) {
194  *p = 0;
195  processed_.push_back(p);
196  streamLumi_.push_back(0);
197  }
198 
199  microstateEncoded_.resize(nStreams);
200  ministateEncoded_.resize(nStreams);
201  threadMicrostateEncoded_.resize(nThreads);
202  inputState_.resize(nStreams);
203  for (unsigned int j = 0; j < inputState_.size(); j++)
204  inputState_[j] = 0;
205 
206  //tell FM to track these int vectors
208 
209  if (nThreads <= nStreams) //no overlapping in module execution per stream
211  else
213 
214  fm->registerStreamMonitorableUIntVecAtomic("Processed", &processed_, false, nullptr);
215 
216  //input source state tracking (not per stream: every vector item other than the first is set to the Ignore state)
218 
219  //global cumulative event counter is used for fast path
221 
222  //provide vector with updated per stream lumis and let it finish initialization
223  fm->commit(&streamLumi_);
224  }
225  };
226 
227  //constructor
229 
// Replace jsonMonitor_ with a newly constructed jsoncollector::FastMonitor built from
// microStateDefPath and the definition group "data"; a previously held monitor is
// released by the unique_ptr assignment.
// When fastMicroStateDefPath is non-empty, it is additionally passed to
// addFastPathDefinition() using the same group.
230  void resetFastMonitor(std::string const& microStateDefPath, std::string const& fastMicroStateDefPath) {
231  std::string defGroup = "data";
232  jsonMonitor_ = std::make_unique<jsoncollector::FastMonitor>(microStateDefPath, defGroup, false);
233  if (!fastMicroStateDefPath.empty())
234  jsonMonitor_->addFastPathDefinition(fastMicroStateDefPath, defGroup, false);
235  }
236 
238  assert(!m_thread);
239  m_thread = std::make_shared<std::thread>(fp, cp);
240  }
// Stop the monitoring thread if one exists: set m_stoprequest, block until the
// thread has joined, then release the thread object. Does nothing when m_thread is empty.
// NOTE(review): presumably the thread function polls m_stoprequest to exit -- that
// function is not visible here; confirm.
241  void stop() {
242  if (m_thread.get()) {
243  m_stoprequest = true;
244  m_thread->join();
245  m_thread.reset();
246  }
247  }
248 
250 
251  private:
252  std::atomic<bool> m_stoprequest;
253  std::shared_ptr<std::thread> m_thread;
256 
257  std::unique_ptr<jsoncollector::FastMonitor> jsonMonitor_;
258 
259  friend class FastMonitoringService;
260  };
261 } //end namespace evf
262 #endif
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
void registerFastGlobalMonitorable(JsonMonitorable *newMonitorable)
Definition: FastMonitor.cc:81
Definition: fillJson.h:27
std::vector< const void * > decoder_
void update(const void *add)
ContainableAtomic(ContainableAtomic< T > const &iOther)
std::vector< ContainableAtomic< const std::string * > > ministate_
void registerStreamMonitorableUIntVecAtomic(std::string const &name, std::vector< AtomicMonUInt *> *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:101
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
const void * decode(unsigned int index)
std::atomic< FastMonState::Macrostate > macrostate_
void updatePreinit(std::string const &add)
edm::ModuleDescription * dummiesForReserved_
static std::mutex mutex
Definition: Proxy.cc:8
std::unordered_map< std::string, int > quickReferencePreinit_
std::shared_ptr< std::thread > m_thread
int encodeString(const std::string *add)
assert(be >=bs)
Definition: Electron.h:6
constexpr int nSpecialModules
void registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:67
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
std::vector< unsigned int > threadMicrostateEncoded_
virtual void setName(std::string name)
void registerStreamMonitorableUIntVec(std::string const &name, std::vector< unsigned int > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:88
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
void commit(std::vector< unsigned int > *streamLumisPtr)
Definition: FastMonitor.cc:116
constexpr int nReservedPaths
unsigned int AtomicMonUInt
Definition: DataPoint.h:31
std::vector< unsigned int > microstateEncoded_
std::vector< ContainableAtomic< const void * > > threadMicrostate_
def load(fileName)
Definition: svgfig.py:547
constexpr int nReservedModules
void updateReserved(const void *add)
std::vector< FastMonEncoding > encPath_
std::vector< ContainableAtomic< const void * > > microstate_
void fillReserved(const void *add, unsigned int i)
void add(std::map< std::string, TH1 *> &h, TH1 *hist)
std::atomic< bool > m_stoprequest
std::vector< jsoncollector::AtomicMonUInt * > processed_
FastMonEncoding(unsigned int res)
int encode(const void *add) const
long double T
std::vector< unsigned int > ministateEncoded_
std::unordered_map< const void *, int > quickReference_
def cp(fromDir, toDir, listOfFiles, overwrite=False, smallList=False)
ContainableAtomic< T > & operator=(T iValue)