CMS 3D CMS Logo

DataPoint.cc
Go to the documentation of this file.
1 /*
2  * DataPoint.cc
3  *
4  * Created on: Sep 24, 2012
5  * Author: aspataru
6  */
7 
9 
10 #include <algorithm>
11 #include <cassert>
12 #include <cstring>
13 
14 //max collected updates per lumi
15 #define MAXUPDATES 0xffffffff
16 #define MAXBINS
17 
18 using namespace jsoncollector;
19 
20 namespace jsoncollector {
21  template class HistoJ<unsigned int>;
22  template class HistoJ<double>;
23 } // namespace jsoncollector
24 
25 const std::string DataPoint::SOURCE = "source";
26 const std::string DataPoint::DEFINITION = "definition";
27 const std::string DataPoint::DATA = "data";
28 
30  if (buf_)
31  delete[] buf_;
32 }
33 
34 /*
35  *
36  * Method implementation for simple DataPoint usage
37  *
38  */
39 
41  if (!source_.empty()) {
42  root[SOURCE] = source_;
43  }
44  if (!definition_.empty()) {
46  }
47  for (unsigned int i = 0; i < data_.size(); i++)
48  root[DATA].append(data_[i]);
49 }
50 
52  source_ = root.get(SOURCE, "").asString();
53  definition_ = root.get(DEFINITION, "").asString();
54  if (root.get(DATA, "").isArray()) {
55  unsigned int size = root.get(DATA, "").size();
56  for (unsigned int i = 0; i < size; i++) {
57  data_.push_back(root.get(DATA, "")[i].asString());
58  }
59  }
60 }
61 
62 /*
63  *
64  * Method implementation for the new multi-threaded model
65  *
66  * */
67 
68 void DataPoint::trackMonitorable(JsonMonitorable const *monitorable, bool NAifZeroUpdates) {
69  name_ = monitorable->getName();
70  tracked_ = (void const *)monitorable;
71  if (dynamic_cast<IntJ const *>(monitorable))
72  monType_ = TYPEINT;
73  else if (dynamic_cast<DoubleJ const *>(monitorable))
75  else if (dynamic_cast<StringJ const *>(monitorable))
77  else
78  assert(0);
79  NAifZeroUpdates_ = NAifZeroUpdates;
80 }
81 
83  std::vector<unsigned int> const *monvec,
84  bool NAifZeroUpdates) {
85  name_ = name;
86  tracked_ = (void const *)monvec;
87  isStream_ = true;
89  NAifZeroUpdates_ = NAifZeroUpdates;
90  makeStreamLumiMap(monvec->size());
91 }
92 
94  std::vector<AtomicMonUInt *> const *monvec,
95  bool NAifZeroUpdates) {
96  name_ = name;
97  tracked_ = (void const *)monvec;
98  isStream_ = true;
99  isAtomic_ = true;
100  monType_ = TYPEUINT;
101  NAifZeroUpdates_ = NAifZeroUpdates;
102  makeStreamLumiMap(monvec->size());
103 }
104 
105 void DataPoint::makeStreamLumiMap(unsigned int size) {
106  for (unsigned int i = 0; i < size; i++) {
107  streamDataMaps_.push_back(MonPtrMap());
108  }
109 }
110 
111 void DataPoint::serialize(Json::Value &root, bool rootInit, std::string const &input) const {
112  if (rootInit) {
113  if (!source_.empty())
114  root[SOURCE] = source_;
115  if (!definition_.empty())
117  }
118  root[DATA].append(input);
119 }
120 
121 void DataPoint::snap(unsigned int lumi) {
122  isCached_ = false;
123  if (isStream_) {
124  if (monType_ == TYPEUINT) {
125  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
126  unsigned int streamLumi_ = streamLumisPtr_->at(i); //get currently processed stream lumi
127  unsigned int monVal;
128 
129 #if ATOMIC_LEVEL > 0
130  if (isAtomic_)
131  monVal =
132  (static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(i)->load(std::memory_order_relaxed);
133 #else
134  if (isAtomic_)
135  monVal = *((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(i));
136 #endif
137  else
138  monVal = (static_cast<std::vector<unsigned int> const *>(tracked_))->at(i);
139 
140  auto itr = streamDataMaps_[i].find(streamLumi_);
141  if (itr == streamDataMaps_[i].end()) {
142  if (opType_ == OPHISTO) {
143  if (*nBinsPtr_) {
145  nh->update(monVal);
146  streamDataMaps_[i][streamLumi_] = nh;
147  }
148  } else { //default to SUM
149  IntJ *nj = new IntJ;
150  nj->update(monVal);
151  streamDataMaps_[i][streamLumi_] = nj;
152  }
153  } else {
154  if (opType_ == OPHISTO) {
155  if (*nBinsPtr_) {
156  (static_cast<HistoJ<unsigned int> *>(itr->second.get()))->update(monVal);
157  }
158  } else {
159  *(static_cast<IntJ *>(itr->second.get())) = monVal;
160  }
161  }
162  }
163  } else
164  assert(monType_ != TYPEINT); //not yet implemented, application error
165  } else
166  snapGlobal(lumi);
167 }
168 
169 void DataPoint::snapGlobal(unsigned int lumi) {
170  isCached_ = false;
171  if (isStream_)
172  return;
173  auto itr = globalDataMap_.find(lumi);
174  if (itr == globalDataMap_.end()) {
175  if (monType_ == TYPEINT) {
176  IntJ *ij = new IntJ;
177  ij->update((static_cast<IntJ const *>(tracked_))->value());
178  globalDataMap_[lumi] = ij;
179  }
180  if (monType_ == TYPEDOUBLE) {
181  DoubleJ *dj = new DoubleJ;
182  dj->update((static_cast<DoubleJ const *>(tracked_))->value());
183  globalDataMap_[lumi] = dj;
184  }
185  if (monType_ == TYPESTRING) {
186  StringJ *sj = new StringJ;
187  sj->update((static_cast<StringJ const *>(tracked_))->value());
188  globalDataMap_[lumi] = sj;
189  }
190  } else {
191  if (monType_ == TYPEINT)
192  static_cast<IntJ *>(itr->second.get())->update((static_cast<IntJ const *>(tracked_))->value());
193  else if (monType_ == TYPEDOUBLE)
194  static_cast<DoubleJ *>(itr->second.get())->update((static_cast<DoubleJ const *>(tracked_))->value());
195  else if (monType_ == TYPESTRING)
196  static_cast<StringJ *>(itr->second.get())->concatenate((static_cast<StringJ const *>(tracked_))->value());
197  }
198 }
199 
200 void DataPoint::snapStreamAtomic(unsigned int lumi, unsigned int streamID) {
201  if (!isStream_ || !isAtomic_)
202  return;
203  isCached_ = false;
204  if (monType_ == TYPEUINT) {
205  unsigned int monVal;
206 #if ATOMIC_LEVEL > 0
207  if (isAtomic_)
208  monVal =
209  (static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(streamID)->load(std::memory_order_relaxed);
210 #else
211  if (isAtomic_)
212  monVal = *((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(streamID));
213 #endif
214  else
215  monVal = (static_cast<std::vector<unsigned int> const *>(tracked_))->at(streamID);
216 
217  auto itr = streamDataMaps_[streamID].find(lumi);
218  if (itr == streamDataMaps_[streamID].end()) //insert
219  {
220  if (opType_ == OPHISTO) {
221  if (*nBinsPtr_) {
223  h->update(monVal);
224  streamDataMaps_[streamID][lumi] = h;
225  }
226  } else { //default to SUM
227 
228  IntJ *h = new IntJ;
229  h->update(monVal);
230  streamDataMaps_[streamID][lumi] = h;
231  }
232  } else {
233  if (opType_ == OPHISTO) {
234  if (*nBinsPtr_) {
235  static_cast<HistoJ<unsigned int> *>(itr->second.get())->update(monVal);
236  }
237  } else
238  *(static_cast<IntJ *>(itr->second.get())) = monVal;
239  }
240  } else
241  assert(monType_ != TYPEINT); //not yet implemented
242 }
243 
245  if (tracked_) {
246  if (isStream_) {
247  std::stringstream ss;
248  if (sid < 0) {
249  if (isAtomic_) {
250  // if ATOMIC_LEVEL>0
251  // ss << (unsigned int) (static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(fastIndex_)->load(std::memory_order_relaxed);
252 
253  ss << (unsigned int)*((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(fastIndex_));
254  fastIndex_ = (fastIndex_ + 1) % (static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->size();
255  } else {
256  ss << (static_cast<std::vector<unsigned int> const *>(tracked_))->at(fastIndex_);
257  fastIndex_ = (fastIndex_ + 1) % (static_cast<std::vector<unsigned int> const *>(tracked_))->size();
258  }
259 
260  } else {
261  if (isAtomic_)
262  ss << (unsigned int)*((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(unsigned(sid)));
263  else
264  ss << (static_cast<std::vector<unsigned int> const *>(tracked_))->at(unsigned(sid));
265  }
266  return ss.str();
267  }
268  return (static_cast<JsonMonitorable const *>(tracked_))->toString();
269  }
270  return std::string("");
271 }
272 
274  assert(monType_ == TYPEUINT && isStream_); //for now only support UINT and SUM for stream variables
275  IntJ *newJ = new IntJ;
276  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
277  auto itr = streamDataMaps_[i].find(lumi);
278  if (itr != streamDataMaps_[i].end()) {
279  newJ->add(static_cast<IntJ *>(itr->second.get())->value());
280  }
281  }
282  cacheI_ = newJ->value();
283  isCached_ = true;
284  return newJ; //assume the caller takes care of deleting the object
285 }
286 
287 void DataPoint::mergeAndSerialize(Json::Value &root, unsigned int lumi, bool initJsonValue, int sid) {
288  if (initJsonValue) {
289  root[SOURCE] = source_;
291  }
292 
293  if (isDummy_) {
294  root[DATA].append("N/A");
295  return;
296  }
297  if (!isStream_) {
298  auto itr = globalDataMap_.find(lumi);
299  if (itr != globalDataMap_.end()) {
300  root[DATA].append(itr->second.get()->toString());
301  } else {
302  if (NAifZeroUpdates_)
303  root[DATA].append("N/A");
304  else if (monType_ == TYPESTRING)
305  root[DATA].append("");
306  else
307  root[DATA].append("0");
308  }
309  return;
310  } else {
312  if (isCached_) {
313  std::stringstream ss;
314  ss << cacheI_;
315  root[DATA].append(ss.str());
316  return;
317  }
318  if (opType_ != OPHISTO) { //sum is default
319  std::stringstream ss;
320  unsigned int updates = 0;
321  unsigned int sum = 0;
322  if (sid < 1)
323  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
324  auto itr = streamDataMaps_[i].find(lumi);
325  if (itr != streamDataMaps_[i].end()) {
326  sum += static_cast<IntJ *>(itr->second.get())->value();
327  updates++;
328  }
329  }
330  else {
331  auto itr = streamDataMaps_[unsigned(sid)].find(lumi);
332  if (itr != streamDataMaps_[unsigned(sid)].end()) {
333  sum += static_cast<IntJ *>(itr->second.get())->value();
334  updates++;
335  }
336  }
337  if (!updates && NAifZeroUpdates_)
338  ss << "N/A";
339  ss << sum;
340  root[DATA].append(ss.str());
341  return;
342  }
343  if (opType_ == OPHISTO) {
344  if (nBinsPtr_ == nullptr) {
345  root[DATA].append("N/A");
346  return;
347  }
348  if (*nBinsPtr_ > bufLen_) {
349  if (buf_)
350  delete[] buf_;
351  bufLen_ = *nBinsPtr_;
352  buf_ = new uint32_t[bufLen_];
353  }
354  memset(buf_, 0, bufLen_ * sizeof(uint32_t));
355  unsigned int updates = 0;
356  if (sid < 1)
357  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
358  auto itr = streamDataMaps_[i].find(lumi);
359  if (itr != streamDataMaps_[i].end()) {
360  HistoJ<unsigned int> *monObj = static_cast<HistoJ<unsigned int> *>(itr->second.get());
361  updates += monObj->getUpdates();
362  auto &hvec = monObj->value();
363  for (unsigned int j = 0; j < hvec.size(); j++) {
364  unsigned int thisbin = (unsigned int)hvec[j];
365  if (thisbin < *nBinsPtr_) {
366  buf_[thisbin]++;
367  }
368  }
369  }
370  }
371  else {
372  auto itr = streamDataMaps_[unsigned(sid)].find(lumi);
373  if (itr != streamDataMaps_[unsigned(sid)].end()) {
374  HistoJ<unsigned int> *monObj = static_cast<HistoJ<unsigned int> *>(itr->second.get());
375  updates += monObj->getUpdates();
376  auto &hvec = monObj->value();
377  for (unsigned int j = 0; j < hvec.size(); j++) {
378  unsigned int thisbin = (unsigned int)hvec[j];
379  if (thisbin < *nBinsPtr_) {
380  buf_[thisbin]++;
381  }
382  }
383  }
384  }
385  std::stringstream ss;
386  if (!*nBinsPtr_ || (!updates && NAifZeroUpdates_))
387  ss << "N/A";
388  else {
389  ss << "[";
390  if (*nBinsPtr_) {
391  for (unsigned int i = 0; i < *nBinsPtr_ - 1; i++) {
392  ss << buf_[i] << ",";
393  }
394  ss << buf_[*nBinsPtr_ - 1];
395  }
396  ss << "]";
397  }
398  root[DATA].append(ss.str());
399  return;
400  }
401  }
402 }
403 
404 //wipe out data that will no longer be used
405 void DataPoint::discardCollected(unsigned int lumi) {
406  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
407  auto itr = streamDataMaps_[i].find(lumi);
408  if (itr != streamDataMaps_[i].end())
409  streamDataMaps_[i].erase(lumi);
410  }
411 
412  auto itr = globalDataMap_.find(lumi);
413  if (itr != globalDataMap_.end())
414  globalDataMap_.erase(lumi);
415 }
size
Write out results.
OperationType opType_
Definition: DataPoint.h:140
void snap(unsigned int lumi)
Definition: DataPoint.cc:121
std::vector< unsigned int > * streamLumisPtr_
Definition: DataPoint.h:131
void discardCollected(unsigned int forLumi)
Definition: DataPoint.cc:405
static const std::string SOURCE
Definition: DataPoint.h:116
void update(std::string const &newStr)
Represents a JSON value.
Definition: value.h:99
unsigned int fastIndex_
Definition: DataPoint.h:151
std::map< unsigned int, JsonMonPtr > MonPtrMap
Definition: DataPoint.h:34
assert(be >=bs)
void trackMonitorable(JsonMonitorable const *monitorable, bool NAifZeroUpdates)
Definition: DataPoint.cc:68
std::vector< T > & value()
std::string definition_
Definition: DataPoint.h:123
static std::string const input
Definition: EdmProvDump.cc:50
static const std::string DATA
Definition: DataPoint.h:118
#define MAXUPDATES
Definition: DataPoint.cc:15
std::vector< MonPtrMap > streamDataMaps_
Definition: DataPoint.h:126
void snapGlobal(unsigned int lumi)
Definition: DataPoint.cc:169
void serialize(Json::Value &root) const override
Definition: DataPoint.cc:40
std::vector< std::string > data_
Definition: DataPoint.h:124
JsonMonitorable * mergeAndRetrieveValue(unsigned int forLumi)
Definition: DataPoint.cc:273
void add(long sth)
std::string fastOutCSV(int sid=-1)
Definition: DataPoint.cc:244
unsigned int bufLen_
Definition: DataPoint.h:145
void const * tracked_
Definition: DataPoint.h:128
uint32_t nh
void trackVectorUInt(std::string const &name, std::vector< unsigned int > const *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:82
void trackVectorUIntAtomic(std::string const &name, std::vector< AtomicMonUInt *> const *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:93
virtual std::string const & getName() const
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:51
void mergeAndSerialize(Json::Value &jsonRoot, unsigned int lumi, bool initJsonValue, int sid)
Definition: DataPoint.cc:287
void update(long sth)
#define update(a, b)
void snapStreamAtomic(unsigned int lumi, unsigned int streamID)
Definition: DataPoint.cc:200
void makeStreamLumiMap(unsigned int size)
Definition: DataPoint.cc:105
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
static const std::string DEFINITION
Definition: DataPoint.h:117
unsigned int * nBinsPtr_
Definition: DataPoint.h:147
void update(double sth)