CMS 3D CMS Logo

FastMonitoringThread.h
Go to the documentation of this file.
1 #ifndef EVF_FASTMONITORINGTHREAD
2 #define EVF_FASTMONITORINGTHREAD
3 
6 
7 #include <iostream>
8 #include <memory>
9 
10 #include <vector>
11 #include <thread>
12 #include <mutex>
13 
14 namespace evf {
15 
16  constexpr int nReservedModules = 64;
17  constexpr int nSpecialModules = 10;
18  constexpr int nReservedPaths = 1;
19 
20  namespace FastMonState {
21  enum Macrostate;
22  }
23 
25 
26  template <typename T>
29  ContainableAtomic(T iValue) : m_value(iValue) {}
32  m_value.store(iValue, std::memory_order_relaxed);
33  return *this;
34  }
35  operator T() { return m_value.load(std::memory_order_relaxed); }  // implicit conversion to T: loads m_value with relaxed memory ordering
36 
37  std::atomic<T> m_value;
38  };
39 
40  struct FastMonEncoding {
42  if (reserved_)
44  // completeReservedWithDummies();
45  }
47  if (reserved_)
48  delete[] dummiesForReserved_;
49  }
50  //trick: only encode state when sending it over (i.e. every sec). Looks up the address `add` in quickReference_ and returns its integer code; an unknown address yields 0
51  int encode(const void* add) const {
52  std::unordered_map<const void*, int>::const_iterator it = quickReference_.find(add);  // lookup keyed by the pointer value itself
53  return (it != quickReference_.end()) ? (*it).second : 0;  // 0 is the result for an address that was never registered
54  }
55 
56  //this allows initializing the path list in beginJob, although the strings used later are not at the
57  //same memory position. Therefore the path address lookup is updated when snapshot (encode) is called.
58  //With this we can remove the ugly path legend update in preEventPath, but we will still need a check
59  //that any event has been processed (any path will do)
61  std::unordered_map<const void*, int>::const_iterator it = quickReference_.find((void*)add);
62  if (it == quickReference_.end()) {
63  //try to match by string content (encode only used
64  auto it = quickReferencePreinit_.find(*add);
65  if (it == quickReferencePreinit_.end())
66  return 0;
67  else {
68  //overwrite pointer in decoder and add to reference
69  decoder_[(*it).second] = (void*)add;
70  quickReference_[(void*)add] = (*it).second;
71  quickReferencePreinit_.erase(it);
72  return encode((void*)add);
73  }
74  }
75  return (*it).second;
76  }
77 
78  const void* decode(unsigned int index) { return decoder_[index]; }  // returns the address stored at decoder_[index]; no bounds check is done here, so index must be < vecsize()
79  void fillReserved(const void* add, unsigned int i) {
80  // translation_[*name]=current_;
82  if (decoder_.size() <= i)
83  decoder_.push_back(add);
84  else
86  }
87  void updateReserved(const void* add) {
90  }
92  for (unsigned int i = currentReserved_; i < reserved_; i++)
94  }
95  void update(const void* add) {
96  // translation_[*name]=current_;
98  decoder_.push_back(add);
99  current_++;
100  }
101 
103  // translation_[*name]=current_;
105  decoder_.push_back((void*)&add);
106  current_++;
107  }
108 
109  unsigned int vecsize() { return decoder_.size(); }  // number of entries currently held in decoder_ (size converted to unsigned int)
110  std::unordered_map<const void*, int> quickReference_;
111  std::unordered_map<std::string, int> quickReferencePreinit_;
112  std::vector<const void*> decoder_;
113  unsigned int reserved_;
114  int current_;
117  };
118 
120  public:
121  struct MonitorData {
122  //fastpath global monitorables
130 
131  unsigned int varIndexThrougput_;
132 
133  //per stream
134  std::vector<unsigned int> microstateEncoded_;
135  std::vector<unsigned int> ministateEncoded_;
136  std::vector<jsoncollector::AtomicMonUInt*> processed_;
138  std::vector<unsigned int> threadMicrostateEncoded_;
139  std::vector<unsigned int> inputState_;
140 
141  //tracking luminosity of a stream
142  std::vector<unsigned int> streamLumi_;
143 
144  //N bins for histograms
145  unsigned int macrostateBins_;
146  unsigned int ministateBins_;
147  unsigned int microstateBins_;
148  unsigned int inputstateBins_;
149 
150  //global state
151  std::atomic<FastMonState::Macrostate> macrostate_;
152 
153  //per stream
154  std::vector<ContainableAtomic<const std::string*>> ministate_;
155  std::vector<ContainableAtomic<const void*>> microstate_;
156  std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
157  std::vector<ContainableAtomic<const void*>> threadMicrostate_;
158 
160  std::vector<FastMonEncoding> encPath_;
161 
162  //unsigned int prescaleindex_; // ditto
163 
166  fastThroughputJ_ = 0;
167  fastAvgLeadTimeJ_ = 0;
169  fastLockWaitJ_ = 0;
170  fastLockCountJ_ = 0;
171  fastMacrostateJ_.setName("Macrostate");
172  fastThroughputJ_.setName("Throughput");
173  fastAvgLeadTimeJ_.setName("AverageLeadTime");
174  fastFilesProcessedJ_.setName("FilesProcessed");
175  fastLockWaitJ_.setName("LockWaitUs");
176  fastLockCountJ_.setName("LockCount");
177 
179  fastPathProcessedJ_.setName("Processed");
180  }
181 
182  //to be called after fast monitor is constructed
183  void registerVariables(jsoncollector::FastMonitor* fm, unsigned int nStreams, unsigned int nThreads) {
184  //tell FM to track these global variables(for fast and slow monitoring)
191 
192  for (unsigned int i = 0; i < nStreams; i++) {
194  *p = 0;
195  processed_.push_back(p);
196  streamLumi_.push_back(0);
197  }
198 
200  ministateEncoded_.resize(nStreams);
202  inputState_.resize(nStreams);
203  for (unsigned int j = 0; j < inputState_.size(); j++)
204  inputState_[j] = 0;
205 
206  //tell FM to track these int vectors
208 
209  if (nThreads <= nStreams) //no overlapping in module execution per stream
211  else
213 
214  fm->registerStreamMonitorableUIntVecAtomic("Processed", &processed_, false, nullptr);
215 
216  //input source state tracking (not stream, but other than first item in vector is set to Ignore state)
218 
219  //global cumulative event counter is used for fast path
221 
222  //provide vector with updated per stream lumis and let it finish initialization
223  fm->commit(&streamLumi_);
224  }
225  };
226 
227  //constructor
229 
230  void resetFastMonitor(std::string const& microStateDefPath, std::string const& fastMicroStateDefPath) {  // replaces jsonMonitor_ with a newly constructed jsoncollector::FastMonitor
231  std::string defGroup = "data";  // group name passed to both the constructor and addFastPathDefinition
232  jsonMonitor_ = std::make_unique<jsoncollector::FastMonitor>(microStateDefPath, defGroup, false);  // any previously held FastMonitor is destroyed by the unique_ptr assignment
233  if (!fastMicroStateDefPath.empty())  // the fast-path definition is optional: an empty path skips it
234  jsonMonitor_->addFastPathDefinition(fastMicroStateDefPath, defGroup, false);
235  }
236 
238  assert(!m_thread);
239  m_thread = std::make_shared<std::thread>(fp, cp);
240  }
241  void stop() {  // requests stop and joins the thread if one is held; does nothing when m_thread is empty
242  if (m_thread.get()) {
243  m_stoprequest = true;  // NOTE(review): presumably polled by the function running in m_thread - confirm in FastMonitoringService
244  m_thread->join();  // blocks the caller until the thread function returns
245  m_thread.reset();  // drop the thread object so that m_thread is empty again
246  }
247  }
248 
250 
251  private:
252  std::atomic<bool> m_stoprequest;
253  std::shared_ptr<std::thread> m_thread;
256 
257  std::unique_ptr<jsoncollector::FastMonitor> jsonMonitor_;
258 
259  friend class FastMonitoringService;
260  };
261 } //end namespace evf
262 #endif
evf::FastMonitoringThread::MonitorData::microstateEncoded_
std::vector< unsigned int > microstateEncoded_
Definition: FastMonitoringThread.h:134
evf::FastMonitoringThread::MonitorData::encModule_
FastMonEncoding encModule_
Definition: FastMonitoringThread.h:159
evf::FastMonitoringThread::MonitorData::processed_
std::vector< jsoncollector::AtomicMonUInt * > processed_
Definition: FastMonitoringThread.h:136
evf::FastMonEncoding::update
void update(const void *add)
Definition: FastMonitoringThread.h:95
evf::FastMonitoringThread::monlock_
std::mutex monlock_
Definition: FastMonitoringThread.h:255
evf::FastMonEncoding::fillReserved
void fillReserved(const void *add, unsigned int i)
Definition: FastMonitoringThread.h:79
mps_fire.i
i
Definition: mps_fire.py:428
evf::FastMonitoringThread::MonitorData::MonitorData
MonitorData()
Definition: FastMonitoringThread.h:164
evf::FastMonitoringThread::MonitorData::threadMicrostateEncoded_
std::vector< unsigned int > threadMicrostateEncoded_
Definition: FastMonitoringThread.h:138
funct::false
false
Definition: Factorize.h:29
evf::FastMonitoringThread::MonitorData::fastAvgLeadTimeJ_
jsoncollector::DoubleJ fastAvgLeadTimeJ_
Definition: FastMonitoringThread.h:125
evf::FastMonitoringThread::MonitorData::fastLockWaitJ_
jsoncollector::DoubleJ fastLockWaitJ_
Definition: FastMonitoringThread.h:127
evf::nSpecialModules
constexpr int nSpecialModules
Definition: FastMonitoringThread.h:17
evf::ContainableAtomic::operator=
ContainableAtomic< T > & operator=(T iValue)
Definition: FastMonitoringThread.h:31
evf::FastMonitoringThread::MonitorData::macrostate_
std::atomic< FastMonState::Macrostate > macrostate_
Definition: FastMonitoringThread.h:151
evf::FastMonEncoding
Definition: FastMonitoringThread.h:40
jsoncollector::FastMonitor::commit
void commit(std::vector< unsigned int > *streamLumisPtr)
Definition: FastMonitor.cc:114
evf::FastMonitoringThread::MonitorData::inputstateBins_
unsigned int inputstateBins_
Definition: FastMonitoringThread.h:148
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
evf::FastMonEncoding::currentReserved_
int currentReserved_
Definition: FastMonitoringThread.h:115
evf::ContainableAtomic::ContainableAtomic
ContainableAtomic(T iValue)
Definition: FastMonitoringThread.h:29
evf::ContainableAtomic::ContainableAtomic
ContainableAtomic(ContainableAtomic< T > const &iOther)
Definition: FastMonitoringThread.h:30
evf::FastMonEncoding::updatePreinit
void updatePreinit(std::string const &add)
Definition: FastMonitoringThread.h:102
cms::cuda::assert
assert(be >=bs)
evf::FastMonEncoding::reserved_
unsigned int reserved_
Definition: FastMonitoringThread.h:113
personalPlayback.fp
fp
Definition: personalPlayback.py:523
evf::FastMonitoringThread
Definition: FastMonitoringThread.h:119
evf::nReservedPaths
constexpr int nReservedPaths
Definition: FastMonitoringThread.h:18
evf::FastMonitoringThread::MonitorData::microstate_
std::vector< ContainableAtomic< const void * > > microstate_
Definition: FastMonitoringThread.h:155
hgcal_conditions::parameters
Definition: HGCConditions.h:86
evf::ContainableAtomic::m_value
std::atomic< T > m_value
Definition: FastMonitoringThread.h:37
evf::FastMonitoringThread::resetFastMonitor
void resetFastMonitor(std::string const &microStateDefPath, std::string const &fastMicroStateDefPath)
Definition: FastMonitoringThread.h:230
evf::FastMonEncoding::~FastMonEncoding
~FastMonEncoding()
Definition: FastMonitoringThread.h:46
evf::FastMonEncoding::completeReservedWithDummies
void completeReservedWithDummies()
Definition: FastMonitoringThread.h:91
evf::FastMonitoringThread::FastMonitoringThread
FastMonitoringThread()
Definition: FastMonitoringThread.h:228
runTheMatrix.nStreams
nStreams
Definition: runTheMatrix.py:371
edm::ModuleDescription
Definition: ModuleDescription.h:21
jsoncollector::FastMonitor::registerGlobalMonitorable
void registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:65
evf::FastMonEncoding::decode
const void * decode(unsigned int index)
Definition: FastMonitoringThread.h:78
evf::FastMonitoringThread::MonitorData::fastFilesProcessedJ_
jsoncollector::IntJ fastFilesProcessedJ_
Definition: FastMonitoringThread.h:126
evf::FastMonitoringThread::m_data
MonitorData m_data
Definition: FastMonitoringThread.h:254
evf::FastMonitoringService
Definition: FastMonitoringService.h:153
jsoncollector::FastMonitor::registerStreamMonitorableUIntVec
void registerStreamMonitorableUIntVec(std::string const &name, std::vector< unsigned int > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:86
jsoncollector::JsonMonitorable::setName
virtual void setName(std::string name)
Definition: JsonMonitorable.h:38
evf::FastMonEncoding::current_
int current_
Definition: FastMonitoringThread.h:114
jsoncollector::DoubleJ
Definition: JsonMonitorable.h:106
evf::FastMonitoringThread::MonitorData::inputState_
std::vector< unsigned int > inputState_
Definition: FastMonitoringThread.h:139
evf::FastMonEncoding::updateReserved
void updateReserved(const void *add)
Definition: FastMonitoringThread.h:87
evf::FastMonEncoding::quickReferencePreinit_
std::unordered_map< std::string, int > quickReferencePreinit_
Definition: FastMonitoringThread.h:111
evf::FastMonEncoding::encodeString
int encodeString(const std::string *add)
Definition: FastMonitoringThread.h:60
evf::nReservedModules
constexpr int nReservedModules
Definition: FastMonitoringThread.h:16
evf::FastMonitoringThread::stop
void stop()
Definition: FastMonitoringThread.h:241
evf::FastMonitoringThread::MonitorData::varIndexThrougput_
unsigned int varIndexThrougput_
Definition: FastMonitoringThread.h:131
svgfig.load
def load(fileName)
Definition: svgfig.py:547
evf::FastMonitoringThread::MonitorData::fastThroughputJ_
jsoncollector::DoubleJ fastThroughputJ_
Definition: FastMonitoringThread.h:124
runTheMatrix.nThreads
nThreads
Definition: runTheMatrix.py:370
evf::FastMonitoringThread::MonitorData
Definition: FastMonitoringThread.h:121
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
evf::FastMonitoringThread::MonitorData::microstateBins_
unsigned int microstateBins_
Definition: FastMonitoringThread.h:147
evf::FastMonitoringThread::m_stoprequest
std::atomic< bool > m_stoprequest
Definition: FastMonitoringThread.h:252
PVValHelper::add
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
Definition: PVValidationHelpers.cc:12
evf::FastMonitoringThread::MonitorData::streamLumi_
std::vector< unsigned int > streamLumi_
Definition: FastMonitoringThread.h:142
evf::ContainableAtomic::ContainableAtomic
ContainableAtomic()
Definition: FastMonitoringThread.h:28
evf::FastMonEncoding::decoder_
std::vector< const void * > decoder_
Definition: FastMonitoringThread.h:112
jsoncollector::AtomicMonUInt
unsigned int AtomicMonUInt
Definition: DataPoint.h:31
evf::FastMonitoringThread::MonitorData::registerVariables
void registerVariables(jsoncollector::FastMonitor *fm, unsigned int nStreams, unsigned int nThreads)
Definition: FastMonitoringThread.h:183
FastMonitoringService.h
mutex
static std::mutex mutex
Definition: Proxy.cc:8
jsoncollector::FastMonitor::registerStreamMonitorableUIntVecAtomic
void registerStreamMonitorableUIntVecAtomic(std::string const &name, std::vector< AtomicMonUInt * > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:99
evf::FastMonitoringThread::MonitorData::macrostateBins_
unsigned int macrostateBins_
Definition: FastMonitoringThread.h:145
jsoncollector::FastMonitor::registerFastGlobalMonitorable
void registerFastGlobalMonitorable(JsonMonitorable *newMonitorable)
Definition: FastMonitor.cc:79
evf::FastMonitoringThread::MonitorData::fastEventsProcessedJ_
jsoncollector::IntJ fastEventsProcessedJ_
Definition: FastMonitoringThread.h:129
res
Definition: Electron.h:6
jsoncollector::FastMonitor
Definition: FastMonitor.h:19
evf::FastMonEncoding::encode
int encode(const void *add) const
Definition: FastMonitoringThread.h:51
evf::FastMonitoringThread::m_thread
std::shared_ptr< std::thread > m_thread
Definition: FastMonitoringThread.h:253
evf::FastMonState::Macrostate
Macrostate
Definition: FastMonitoringService.h:67
evf::FastMonEncoding::vecsize
unsigned int vecsize()
Definition: FastMonitoringThread.h:109
evf::FastMonEncoding::dummiesForReserved_
edm::ModuleDescription * dummiesForReserved_
Definition: FastMonitoringThread.h:116
evf::FastMonitoringThread::~FastMonitoringThread
~FastMonitoringThread()
Definition: FastMonitoringThread.h:249
evf::ContainableAtomic
Definition: FastMonitoringThread.h:27
evf::FastMonitoringThread::MonitorData::encPath_
std::vector< FastMonEncoding > encPath_
Definition: FastMonitoringThread.h:160
evf::FastMonitoringThread::MonitorData::fastPathProcessedJ_
jsoncollector::IntJ fastPathProcessedJ_
Definition: FastMonitoringThread.h:137
evf::FastMonEncoding::quickReference_
std::unordered_map< const void *, int > quickReference_
Definition: FastMonitoringThread.h:110
T
long double T
Definition: Basic3DVectorLD.h:48
evf::FastMonitoringThread::jsonMonitor_
std::unique_ptr< jsoncollector::FastMonitor > jsonMonitor_
Definition: FastMonitoringThread.h:257
evf::FastMonitoringThread::MonitorData::ministate_
std::vector< ContainableAtomic< const std::string * > > ministate_
Definition: FastMonitoringThread.h:154
evf
Definition: fillJson.h:27
evf::FastMonitoringThread::MonitorData::fastLockCountJ_
jsoncollector::IntJ fastLockCountJ_
Definition: FastMonitoringThread.h:128
evf::FastMonitoringThread::MonitorData::fastMacrostateJ_
jsoncollector::IntJ fastMacrostateJ_
Definition: FastMonitoringThread.h:123
evf::FastMonState::sInit
Definition: FastMonitoringService.h:68
jsoncollector::IntJ
Definition: JsonMonitorable.h:66
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
SiStripCommissioningSource_FromEDM_cfg.FastMonitoringService
FastMonitoringService
Definition: SiStripCommissioningSource_FromEDM_cfg.py:49
evf::FastMonitoringThread::MonitorData::threadMicrostate_
std::vector< ContainableAtomic< const void * > > threadMicrostate_
Definition: FastMonitoringThread.h:157
dqmiolumiharvest.j
j
Definition: dqmiolumiharvest.py:66
evf::FastMonEncoding::FastMonEncoding
FastMonEncoding(unsigned int res)
Definition: FastMonitoringThread.h:41
evf::FastMonitoringThread::MonitorData::ministateBins_
unsigned int ministateBins_
Definition: FastMonitoringThread.h:146
evf::FastMonitoringThread::MonitorData::ministateEncoded_
std::vector< unsigned int > ministateEncoded_
Definition: FastMonitoringThread.h:135
FastMonitor.h
evf::FastMonitoringThread::MonitorData::microstateAcqFlag_
std::vector< ContainableAtomic< unsigned char > > microstateAcqFlag_
Definition: FastMonitoringThread.h:156
evf::FastMonitoringThread::start
void start(void(FastMonitoringService::*fp)(), FastMonitoringService *cp)
Definition: FastMonitoringThread.h:237