CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
DataPoint.cc
Go to the documentation of this file.
1 /*
2  * DataPoint.cc
3  *
4  * Created on: Sep 24, 2012
5  * Author: aspataru
6  */
7 
9 
10 #include <tbb/concurrent_vector.h>
11 
12 #include <algorithm>
13 #include <cassert>
14 
15 //max collected updates per lumi
16 #define MAXUPDATES 0xffffffff
17 #define MAXBINS
18 
19 using namespace jsoncollector;
20 
21 namespace jsoncollector {
22  template class HistoJ<unsigned int>;
23  template class HistoJ<double>;
24 } // namespace jsoncollector
25 
26 const std::string DataPoint::SOURCE = "source";
27 const std::string DataPoint::DEFINITION = "definition";
28 const std::string DataPoint::DATA = "data";
29 
31  if (buf_)
32  delete[] buf_;
33 }
34 
35 /*
36  *
37  * Method implementation for simple DataPoint usage
38  *
39  */
40 
41 void DataPoint::serialize(Json::Value &root) const {
42  if (!source_.empty()) {
43  root[SOURCE] = source_;
44  }
45  if (!definition_.empty()) {
46  root[DEFINITION] = definition_;
47  }
48  for (unsigned int i = 0; i < data_.size(); i++)
49  root[DATA].append(data_[i]);
50 }
51 
53  source_ = root.get(SOURCE, "").asString();
54  definition_ = root.get(DEFINITION, "").asString();
55  if (root.get(DATA, "").isArray()) {
56  unsigned int size = root.get(DATA, "").size();
57  for (unsigned int i = 0; i < size; i++) {
58  data_.push_back(root.get(DATA, "")[i].asString());
59  }
60  }
61 }
62 
63 /*
64  *
65  * Method implementation for the new multi-threaded model
66  *
67  * */
68 
69 void DataPoint::trackMonitorable(JsonMonitorable const *monitorable, bool NAifZeroUpdates) {
70  name_ = monitorable->getName();
71  tracked_ = (void const *)monitorable;
72  if (dynamic_cast<IntJ const *>(monitorable))
73  monType_ = TYPEINT;
74  else if (dynamic_cast<DoubleJ const *>(monitorable))
76  else if (dynamic_cast<StringJ const *>(monitorable))
78  else
79  assert(0);
80  NAifZeroUpdates_ = NAifZeroUpdates;
81 }
82 
84  std::vector<unsigned int> const *monvec,
85  bool NAifZeroUpdates) {
86  name_ = name;
87  tracked_ = (void const *)monvec;
88  isStream_ = true;
90  NAifZeroUpdates_ = NAifZeroUpdates;
91  makeStreamLumiMap(monvec->size());
92 }
93 
95  std::vector<AtomicMonUInt *> const *monvec,
96  bool NAifZeroUpdates) {
97  name_ = name;
98  tracked_ = (void const *)monvec;
99  isStream_ = true;
100  isAtomic_ = true;
101  monType_ = TYPEUINT;
102  NAifZeroUpdates_ = NAifZeroUpdates;
103  makeStreamLumiMap(monvec->size());
104 }
105 
106 void DataPoint::makeStreamLumiMap(unsigned int size) {
107  for (unsigned int i = 0; i < size; i++) {
108  streamDataMaps_.push_back(MonPtrMap());
109  }
110 }
111 
112 void DataPoint::serialize(Json::Value &root, bool rootInit, std::string const &input) const {
113  if (rootInit) {
114  if (!source_.empty())
115  root[SOURCE] = source_;
116  if (!definition_.empty())
117  root[DEFINITION] = definition_;
118  }
119  root[DATA].append(input);
120 }
121 
122 void DataPoint::snap(unsigned int lumi) {
123  isCached_ = false;
124  if (isStream_) {
125  if (monType_ == TYPEUINT) {
126  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
127  unsigned int streamLumi_ = streamLumisPtr_->at(i); //get currently processed stream lumi
128  unsigned int monVal;
129 
130 #if ATOMIC_LEVEL > 0
131  if (isAtomic_)
132  monVal =
133  (static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(i)->load(std::memory_order_relaxed);
134 #else
135  if (isAtomic_)
136  monVal = *((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(i));
137 #endif
138  else
139  monVal = (static_cast<std::vector<unsigned int> const *>(tracked_))->at(i);
140 
141  auto itr = streamDataMaps_[i].find(streamLumi_);
142  if (itr == streamDataMaps_[i].end()) {
143  if (opType_ == OPHISTO) {
144  if (*nBinsPtr_) {
146  nh->update(monVal);
147  streamDataMaps_[i][streamLumi_] = nh;
148  }
149  } else { //default to SUM
150  IntJ *nj = new IntJ;
151  nj->update(monVal);
152  streamDataMaps_[i][streamLumi_] = nj;
153  }
154  } else {
155  if (opType_ == OPHISTO) {
156  if (*nBinsPtr_) {
157  (static_cast<HistoJ<unsigned int> *>(itr->second.get()))->update(monVal);
158  }
159  } else {
160  *(static_cast<IntJ *>(itr->second.get())) = monVal;
161  }
162  }
163  }
164  } else
165  assert(monType_ != TYPEINT); //not yet implemented, application error
166  } else
167  snapGlobal(lumi);
168 }
169 
170 void DataPoint::snapGlobal(unsigned int lumi) {
171  isCached_ = false;
172  if (isStream_)
173  return;
174  auto itr = globalDataMap_.find(lumi);
175  if (itr == globalDataMap_.end()) {
176  if (monType_ == TYPEINT) {
177  IntJ *ij = new IntJ;
178  ij->update((static_cast<IntJ const *>(tracked_))->value());
179  globalDataMap_[lumi] = ij;
180  }
181  if (monType_ == TYPEDOUBLE) {
182  DoubleJ *dj = new DoubleJ;
183  dj->update((static_cast<DoubleJ const *>(tracked_))->value());
184  globalDataMap_[lumi] = dj;
185  }
186  if (monType_ == TYPESTRING) {
187  StringJ *sj = new StringJ;
188  sj->update((static_cast<StringJ const *>(tracked_))->value());
189  globalDataMap_[lumi] = sj;
190  }
191  } else {
192  if (monType_ == TYPEINT)
193  static_cast<IntJ *>(itr->second.get())->update((static_cast<IntJ const *>(tracked_))->value());
194  else if (monType_ == TYPEDOUBLE)
195  static_cast<DoubleJ *>(itr->second.get())->update((static_cast<DoubleJ const *>(tracked_))->value());
196  else if (monType_ == TYPESTRING)
197  static_cast<StringJ *>(itr->second.get())->concatenate((static_cast<StringJ const *>(tracked_))->value());
198  }
199 }
200 
201 void DataPoint::snapStreamAtomic(unsigned int lumi, unsigned int streamID) {
202  if (!isStream_ || !isAtomic_)
203  return;
204  isCached_ = false;
205  if (monType_ == TYPEUINT) {
206  unsigned int monVal;
207 #if ATOMIC_LEVEL > 0
208  if (isAtomic_)
209  monVal =
210  (static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(streamID)->load(std::memory_order_relaxed);
211 #else
212  if (isAtomic_)
213  monVal = *((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(streamID));
214 #endif
215  else
216  monVal = (static_cast<std::vector<unsigned int> const *>(tracked_))->at(streamID);
217 
218  auto itr = streamDataMaps_[streamID].find(lumi);
219  if (itr == streamDataMaps_[streamID].end()) //insert
220  {
221  if (opType_ == OPHISTO) {
222  if (*nBinsPtr_) {
224  h->update(monVal);
225  streamDataMaps_[streamID][lumi] = h;
226  }
227  } else { //default to SUM
228 
229  IntJ *h = new IntJ;
230  h->update(monVal);
231  streamDataMaps_[streamID][lumi] = h;
232  }
233  } else {
234  if (opType_ == OPHISTO) {
235  if (*nBinsPtr_) {
236  static_cast<HistoJ<unsigned int> *>(itr->second.get())->update(monVal);
237  }
238  } else
239  *(static_cast<IntJ *>(itr->second.get())) = monVal;
240  }
241  } else
242  assert(monType_ != TYPEINT); //not yet implemented
243 }
244 
246  if (tracked_) {
247  if (isStream_) {
248  std::stringstream ss;
249  if (sid < 0) {
250  if (isAtomic_) {
251  // if ATOMIC_LEVEL>0
252  // ss << (unsigned int) (static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(fastIndex_)->load(std::memory_order_relaxed);
253 
254  ss << (unsigned int)*((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(fastIndex_));
255  fastIndex_ = (fastIndex_ + 1) % (static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->size();
256  } else {
257  ss << (static_cast<std::vector<unsigned int> const *>(tracked_))->at(fastIndex_);
258  fastIndex_ = (fastIndex_ + 1) % (static_cast<std::vector<unsigned int> const *>(tracked_))->size();
259  }
260 
261  } else {
262  if (isAtomic_)
263  ss << (unsigned int)*((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(unsigned(sid)));
264  else
265  ss << (static_cast<std::vector<unsigned int> const *>(tracked_))->at(unsigned(sid));
266  }
267  return ss.str();
268  }
269  return (static_cast<JsonMonitorable const *>(tracked_))->toString();
270  }
271  return std::string("");
272 }
273 
275  assert(monType_ == TYPEUINT && isStream_); //for now only support UINT and SUM for stream variables
276  IntJ *newJ = new IntJ;
277  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
278  auto itr = streamDataMaps_[i].find(lumi);
279  if (itr != streamDataMaps_[i].end()) {
280  newJ->add(static_cast<IntJ *>(itr->second.get())->value());
281  }
282  }
283  cacheI_ = newJ->value();
284  isCached_ = true;
285  return newJ; //assume the caller takes care of deleting the object
286 }
287 
288 void DataPoint::mergeAndSerialize(Json::Value &root, unsigned int lumi, bool initJsonValue, int sid) {
289  if (initJsonValue) {
290  root[SOURCE] = source_;
291  root[DEFINITION] = definition_;
292  }
293 
294  if (isDummy_) {
295  root[DATA].append("N/A");
296  return;
297  }
298  if (!isStream_) {
299  auto itr = globalDataMap_.find(lumi);
300  if (itr != globalDataMap_.end()) {
301  root[DATA].append(itr->second.get()->toString());
302  } else {
303  if (NAifZeroUpdates_)
304  root[DATA].append("N/A");
305  else if (monType_ == TYPESTRING)
306  root[DATA].append("");
307  else
308  root[DATA].append("0");
309  }
310  return;
311  } else {
313  if (isCached_) {
314  std::stringstream ss;
315  ss << cacheI_;
316  root[DATA].append(ss.str());
317  return;
318  }
319  if (opType_ != OPHISTO) { //sum is default
320  std::stringstream ss;
321  unsigned int updates = 0;
322  unsigned int sum = 0;
323  if (sid < 1)
324  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
325  auto itr = streamDataMaps_[i].find(lumi);
326  if (itr != streamDataMaps_[i].end()) {
327  sum += static_cast<IntJ *>(itr->second.get())->value();
328  updates++;
329  }
330  }
331  else {
332  auto itr = streamDataMaps_[unsigned(sid)].find(lumi);
333  if (itr != streamDataMaps_[unsigned(sid)].end()) {
334  sum += static_cast<IntJ *>(itr->second.get())->value();
335  updates++;
336  }
337  }
338  if (!updates && NAifZeroUpdates_)
339  ss << "N/A";
340  ss << sum;
341  root[DATA].append(ss.str());
342  return;
343  }
344  if (opType_ == OPHISTO) {
345  if (nBinsPtr_ == nullptr) {
346  root[DATA].append("N/A");
347  return;
348  }
349  if (*nBinsPtr_ > bufLen_) {
350  if (buf_)
351  delete[] buf_;
352  bufLen_ = *nBinsPtr_;
353  buf_ = new uint32_t[bufLen_];
354  }
355  memset(buf_, 0, bufLen_ * sizeof(uint32_t));
356  unsigned int updates = 0;
357  if (sid < 1)
358  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
359  auto itr = streamDataMaps_[i].find(lumi);
360  if (itr != streamDataMaps_[i].end()) {
361  HistoJ<unsigned int> *monObj = static_cast<HistoJ<unsigned int> *>(itr->second.get());
362  updates += monObj->getUpdates();
363  auto &hvec = monObj->value();
364  for (unsigned int j = 0; j < hvec.size(); j++) {
365  unsigned int thisbin = (unsigned int)hvec[j];
366  if (thisbin < *nBinsPtr_) {
367  buf_[thisbin]++;
368  }
369  }
370  }
371  }
372  else {
373  auto itr = streamDataMaps_[unsigned(sid)].find(lumi);
374  if (itr != streamDataMaps_[unsigned(sid)].end()) {
375  HistoJ<unsigned int> *monObj = static_cast<HistoJ<unsigned int> *>(itr->second.get());
376  updates += monObj->getUpdates();
377  auto &hvec = monObj->value();
378  for (unsigned int j = 0; j < hvec.size(); j++) {
379  unsigned int thisbin = (unsigned int)hvec[j];
380  if (thisbin < *nBinsPtr_) {
381  buf_[thisbin]++;
382  }
383  }
384  }
385  }
386  std::stringstream ss;
387  if (!*nBinsPtr_ || (!updates && NAifZeroUpdates_))
388  ss << "N/A";
389  else {
390  ss << "[";
391  if (*nBinsPtr_) {
392  for (unsigned int i = 0; i < *nBinsPtr_ - 1; i++) {
393  ss << buf_[i] << ",";
394  }
395  ss << buf_[*nBinsPtr_ - 1];
396  }
397  ss << "]";
398  }
399  root[DATA].append(ss.str());
400  return;
401  }
402  }
403 }
404 
405 //wipe out data that will no longer be used
406 void DataPoint::discardCollected(unsigned int lumi) {
407  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
408  auto itr = streamDataMaps_[i].find(lumi);
409  if (itr != streamDataMaps_[i].end())
410  streamDataMaps_[i].erase(lumi);
411  }
412 
413  auto itr = globalDataMap_.find(lumi);
414  if (itr != globalDataMap_.end())
415  globalDataMap_.erase(lumi);
416 }
OperationType opType_
Definition: DataPoint.h:140
Value get(UInt index, const Value &defaultValue) const
void snap(unsigned int lumi)
Definition: DataPoint.cc:122
std::vector< unsigned int > * streamLumisPtr_
Definition: DataPoint.h:131
void discardCollected(unsigned int forLumi)
Definition: DataPoint.cc:406
static const std::string SOURCE
Definition: DataPoint.h:116
boost::dynamic_bitset append(const boost::dynamic_bitset<> &bs1, const boost::dynamic_bitset<> &bs2)
this method takes two bitsets bs1 and bs2 and returns result of bs2 appended to the end of bs1 ...
Value & append(const Value &value)
Append value to array at the end.
void update(std::string const &newStr)
Represents a JSON value.
Definition: value.h:99
unsigned int fastIndex_
Definition: DataPoint.h:151
std::map< unsigned int, JsonMonPtr > MonPtrMap
Definition: DataPoint.h:34
assert(be >=bs)
void trackMonitorable(JsonMonitorable const *monitorable, bool NAifZeroUpdates)
Definition: DataPoint.cc:69
std::vector< T > & value()
std::string definition_
Definition: DataPoint.h:123
static std::string const input
Definition: EdmProvDump.cc:47
static const std::string DATA
Definition: DataPoint.h:118
UInt size() const
Number of values in array or object.
#define MAXUPDATES
Definition: DataPoint.cc:16
std::vector< MonPtrMap > streamDataMaps_
Definition: DataPoint.h:126
bool isArray() const
void snapGlobal(unsigned int lumi)
Definition: DataPoint.cc:170
void serialize(Json::Value &root) const override
Definition: DataPoint.cc:41
std::vector< std::string > data_
Definition: DataPoint.h:124
JsonMonitorable * mergeAndRetrieveValue(unsigned int forLumi)
Definition: DataPoint.cc:274
void add(long sth)
virtual std::string const & getName() const
std::string fastOutCSV(int sid=-1)
Definition: DataPoint.cc:245
unsigned int bufLen_
Definition: DataPoint.h:145
void const * tracked_
Definition: DataPoint.h:128
uint32_t nh
list lumi
Definition: dqmdumpme.py:53
void trackVectorUIntAtomic(std::string const &name, std::vector< AtomicMonUInt * > const *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:94
void trackVectorUInt(std::string const &name, std::vector< unsigned int > const *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:83
std::string asString() const
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:52
void mergeAndSerialize(Json::Value &jsonRoot, unsigned int lumi, bool initJsonValue, int sid)
Definition: DataPoint.cc:288
void update(long sth)
#define update(a, b)
string end
Definition: dataset.py:937
void snapStreamAtomic(unsigned int lumi, unsigned int streamID)
Definition: DataPoint.cc:201
void makeStreamLumiMap(unsigned int size)
Definition: DataPoint.cc:106
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
static const std::string DEFINITION
Definition: DataPoint.h:117
unsigned int * nBinsPtr_
Definition: DataPoint.h:147
tuple size
Write out results.
void update(double sth)