CMS 3D CMS Logo

DataPoint.cc
Go to the documentation of this file.
1 /*
2  * DataPoint.cc
3  *
4  * Created on: Sep 24, 2012
5  * Author: aspataru
6  */
7 
9 
10 #include <tbb/concurrent_vector.h>
11 
12 #include <algorithm>
13 #include <cassert>
14 
15 //max collected updates per lumi
16 #define MAXUPDATES 0xffffffff
17 #define MAXBINS
18 
19 using namespace jsoncollector;
20 
21 namespace jsoncollector {
22  template class HistoJ<unsigned int>;
23  template class HistoJ<double>;
24 }
25 
26 const std::string DataPoint::SOURCE = "source";
27 const std::string DataPoint::DEFINITION = "definition";
28 const std::string DataPoint::DATA = "data";
29 
30 
32 {
33  if (buf_) delete[] buf_;
34 }
35 
36 /*
37  *
38  * Method implementation for simple DataPoint usage
39  *
40  */
41 
42 
44 {
45  if (!source_.empty()) {
46  root[SOURCE] = source_;
47  }
48  if (!definition_.empty()) {
49  root[DEFINITION] = definition_;
50  }
51  for (unsigned int i=0;i<data_.size();i++)
52  root[DATA].append(data_[i]);
53 }
54 
55 
57 {
58  source_ = root.get(SOURCE, "").asString();
59  definition_ = root.get(DEFINITION, "").asString();
60  if (root.get(DATA, "").isArray()) {
61  unsigned int size = root.get(DATA, "").size();
62  for (unsigned int i = 0; i < size; i++) {
63  data_.push_back(root.get(DATA, "")[i].asString());
64  }
65  }
66 }
67 
68 /*
69  *
70  * Method implementation for the new multi-threaded model
71  *
72  * */
73 
74 void DataPoint::trackMonitorable(JsonMonitorable *monitorable,bool NAifZeroUpdates)
75 {
76  name_=monitorable->getName();
77  tracked_ = (void*)monitorable;
78  if (dynamic_cast<IntJ*>(monitorable)) monType_=TYPEINT;
79  else if (dynamic_cast<DoubleJ*>(monitorable)) monType_=TYPEDOUBLE;
80  else if (dynamic_cast<StringJ*>(monitorable)) monType_=TYPESTRING;
81  else assert(0);
82  NAifZeroUpdates_=NAifZeroUpdates;
83 
84 }
85 
86 void DataPoint::trackVectorUInt(std::string const& name, std::vector<unsigned int> *monvec, bool NAifZeroUpdates)
87 {
88  name_=name;
89  tracked_ = (void*)monvec;
90  isStream_=true;
92  NAifZeroUpdates_=NAifZeroUpdates;
93  makeStreamLumiMap(monvec->size());
94 }
95 
96 void DataPoint::trackVectorUIntAtomic(std::string const& name, std::vector<AtomicMonUInt*> *monvec, bool NAifZeroUpdates)
97 {
98  name_=name;
99  tracked_ = (void*)monvec;
100  isStream_=true;
101  isAtomic_=true;
103  NAifZeroUpdates_=NAifZeroUpdates;
104  makeStreamLumiMap(monvec->size());
105 }
106 
108 {
109  for (unsigned int i=0;i<size;i++) {
110  streamDataMaps_.push_back(MonPtrMap());
111  }
112 }
113 
114 void DataPoint::serialize(Json::Value& root, bool rootInit, std::string const&input) const
115 {
116  if (rootInit) {
117  if (!source_.empty())
118  root[SOURCE] = source_;
119  if (!definition_.empty())
120  root[DEFINITION] = definition_;
121  }
122  root[DATA].append(input);
123 }
124 
125 void DataPoint::snap(unsigned int lumi)
126 {
127  isCached_=false;
128  if (isStream_) {
129  if (monType_==TYPEUINT)
130  {
131  for (unsigned int i=0; i<streamDataMaps_.size();i++) {
132  unsigned int streamLumi_=streamLumisPtr_->at(i);//get currently processed stream lumi
133  unsigned int monVal;
134 
135 #if ATOMIC_LEVEL>0
136  if (isAtomic_) monVal = (static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(i)->load(std::memory_order_relaxed);
137 #else
138  if (isAtomic_) monVal = *((static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(i));
139 #endif
140  else monVal = (static_cast<std::vector<unsigned int>*>(tracked_))->at(i);
141 
142  auto itr = streamDataMaps_[i].find(streamLumi_);
143  if (itr==streamDataMaps_[i].end())
144  {
145  if (opType_==OPHISTO) {
146  if (*nBinsPtr_) {
148  nh->update(monVal);
149  streamDataMaps_[i][streamLumi_] = nh;
150  }
151  }
152  else {//default to SUM
153  IntJ *nj = new IntJ;
154  nj->update(monVal);
155  streamDataMaps_[i][streamLumi_]= nj;
156  }
157  }
158  else {
159  if (opType_==OPHISTO) {
160  if (*nBinsPtr_) {
161  (static_cast<HistoJ<unsigned int> *>(itr->second.get()))->update(monVal);
162  }
163  }
164  else {
165  *(static_cast<IntJ*>(itr->second.get()))=monVal;
166  }
167  }
168  }
169  }
170  else assert(monType_!=TYPEINT);//not yet implemented, application error
171  }
172  else snapGlobal(lumi);
173 }
174 
175 void DataPoint::snapGlobal(unsigned int lumi)
176 {
177  isCached_=false;
178  if (isStream_) return;
179  auto itr = globalDataMap_.find(lumi);
180  if (itr==globalDataMap_.end()) {
181  if (monType_==TYPEINT) {
182  IntJ *ij = new IntJ;
183  ij->update((static_cast<IntJ*>(tracked_))->value());
184  globalDataMap_[lumi]=ij;
185  }
186  if (monType_==TYPEDOUBLE) {
187  DoubleJ *dj = new DoubleJ;
188  dj->update((static_cast<DoubleJ*>(tracked_))->value());
189  globalDataMap_[lumi]=dj;
190  }
191  if (monType_==TYPESTRING) {
192  StringJ *sj = new StringJ;
193  sj->update((static_cast<StringJ*>(tracked_))->value());
194  globalDataMap_[lumi]=sj;
195  }
196  } else {
197  if (monType_==TYPEINT)
198  static_cast<IntJ*>(itr->second.get())->update((static_cast<IntJ*>(tracked_))->value());
199  else if (monType_==TYPEDOUBLE)
200  static_cast<DoubleJ*>(itr->second.get())->update((static_cast<DoubleJ*>(tracked_))->value());
201  else if (monType_==TYPESTRING)
202  static_cast<StringJ*>(itr->second.get())->concatenate((static_cast<StringJ*>(tracked_))->value());
203  }
204 }
205 
206 void DataPoint::snapStreamAtomic(unsigned int lumi, unsigned int streamID)
207 {
208  if (!isStream_ || !isAtomic_) return;
209  isCached_=false;
210  if (monType_==TYPEUINT)
211  {
212  unsigned int monVal;
213 #if ATOMIC_LEVEL>0
214  if (isAtomic_) monVal = (static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(streamID)->load(std::memory_order_relaxed);
215 #else
216  if (isAtomic_) monVal = *((static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(streamID));
217 #endif
218  else monVal = (static_cast<std::vector<unsigned int>*>(tracked_))->at(streamID);
219 
220  auto itr = streamDataMaps_[streamID].find(lumi);
221  if (itr==streamDataMaps_[streamID].end()) //insert
222  {
223  if (opType_==OPHISTO) {
224  if (*nBinsPtr_) {
226  h->update(monVal);
227  streamDataMaps_[streamID][lumi] = h;
228  }
229  }
230  else {//default to SUM
231 
232  IntJ *h = new IntJ;
233  h->update(monVal);
234  streamDataMaps_[streamID][lumi] = h;
235  }
236  }
237  else
238  {
239  if (opType_==OPHISTO) {
240  if (*nBinsPtr_) {
241  static_cast<HistoJ<unsigned int>*>(itr->second.get())->update(monVal);
242  }
243  }
244  else
245  *(static_cast<IntJ*>(itr->second.get()))=monVal;
246  }
247  }
248  else assert(monType_!=TYPEINT);//not yet implemented
249 }
250 
252 {
253  if (tracked_) {
254  if (isStream_) {
255  std::stringstream ss;
256  if (sid<0) {
257  if (isAtomic_) {
258 // if ATOMIC_LEVEL>0
259 // ss << (unsigned int) (static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(fastIndex_)->load(std::memory_order_relaxed);
260 
261  ss << (unsigned int) *((static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(fastIndex_));
262  fastIndex_ = (fastIndex_+1) % (static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->size();
263  }
264  else {
265  ss << (static_cast<std::vector<unsigned int>*>(tracked_))->at(fastIndex_);
266  fastIndex_ = (fastIndex_+1) % (static_cast<std::vector<unsigned int>*>(tracked_))->size();
267  }
268 
269  }
270  else {
271  if (isAtomic_)
272  ss << (unsigned int) *((static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(unsigned(sid)));
273  else
274  ss << (static_cast<std::vector<unsigned int>*>(tracked_))->at(unsigned(sid));
275  }
276  return ss.str();
277  }
278  return (static_cast<JsonMonitorable*>(tracked_))->toString();
279  }
280  return std::string("");
281 }
282 
284 {
285  assert(monType_==TYPEUINT && isStream_);//for now only support UINT and SUM for stream variables
286  IntJ *newJ = new IntJ;
287  for (unsigned int i=0;i<streamDataMaps_.size();i++) {
288  auto itr = streamDataMaps_[i].find(lumi);
289  if (itr!=streamDataMaps_[i].end()) {
290  newJ->add(static_cast<IntJ*>(itr->second.get())->value());
291  }
292  }
293  cacheI_=newJ->value();
294  isCached_=true;
295  return newJ;//assume the caller takes care of deleting the object
296 }
297 
298 void DataPoint::mergeAndSerialize(Json::Value & root,unsigned int lumi,bool initJsonValue, int sid)
299 {
300  if (initJsonValue) {
301  root[SOURCE] = source_;
302  root[DEFINITION] = definition_;
303  }
304 
305  if (isDummy_) {
306  root[DATA].append("N/A");
307  return;
308  }
309  if (!isStream_) {
310  auto itr = globalDataMap_.find(lumi);
311  if (itr != globalDataMap_.end()) {
312  root[DATA].append(itr->second.get()->toString());
313  }
314  else {
315  if (NAifZeroUpdates_) root[DATA].append("N/A");
316  else if (monType_==TYPESTRING) root[DATA].append("");
317  else root[DATA].append("0");
318  }
319  return;
320  }
321  else {
322  assert(monType_==TYPEUINT);
323  if (isCached_) {
324  std::stringstream ss;
325  ss << cacheI_;
326  root[DATA].append(ss.str());
327  return;
328  }
329  if (opType_!=OPHISTO) {//sum is default
330  std::stringstream ss;
331  unsigned int updates=0;
332  unsigned int sum=0;
333  if (sid<1)
334  for (unsigned int i=0;i<streamDataMaps_.size();i++) {
335  auto itr = streamDataMaps_[i].find(lumi);
336  if (itr!=streamDataMaps_[i].end()) {
337  sum+=static_cast<IntJ*>(itr->second.get())->value();
338  updates++;
339  }
340  }
341  else {
342  auto itr = streamDataMaps_[unsigned(sid)].find(lumi);
343  if (itr!=streamDataMaps_[unsigned(sid)].end()) {
344  sum+=static_cast<IntJ*>(itr->second.get())->value();
345  updates++;
346  }
347  }
348  if (!updates && NAifZeroUpdates_) ss << "N/A";
349  ss << sum;
350  root[DATA].append(ss.str());
351  return;
352  }
353  if (opType_==OPHISTO) {
354  if (nBinsPtr_==nullptr) {
355  root[DATA].append("N/A");
356  return;
357  }
358  if (*nBinsPtr_>bufLen_) {
359  if (buf_) delete[] buf_;
361  buf_= new uint32_t[bufLen_];
362  }
363  memset(buf_,0,bufLen_*sizeof(uint32_t));
364  unsigned int updates=0;
365  if (sid<1)
366  for (unsigned int i=0;i<streamDataMaps_.size();i++) {
367  auto itr = streamDataMaps_[i].find(lumi);
368  if (itr!=streamDataMaps_[i].end()) {
369  HistoJ <unsigned int>* monObj = static_cast<HistoJ<unsigned int>*>(itr->second.get());
370  updates+=monObj->getUpdates();
371  auto &hvec = monObj->value();
372  for (unsigned int j=0;j<hvec.size();j++) {
373  unsigned int thisbin=(unsigned int) hvec[j];
374  if (thisbin<*nBinsPtr_) {
375  buf_[thisbin]++;
376  }
377  }
378  }
379  }
380  else {
381  auto itr = streamDataMaps_[unsigned(sid)].find(lumi);
382  if (itr!=streamDataMaps_[unsigned(sid)].end()) {
383  HistoJ <unsigned int>* monObj = static_cast<HistoJ<unsigned int>*>(itr->second.get());
384  updates+=monObj->getUpdates();
385  auto &hvec = monObj->value();
386  for (unsigned int j=0;j<hvec.size();j++) {
387  unsigned int thisbin=(unsigned int) hvec[j];
388  if (thisbin<*nBinsPtr_) {
389  buf_[thisbin]++;
390  }
391  }
392  }
393  }
394  std::stringstream ss;
395  if (!*nBinsPtr_ || (!updates && NAifZeroUpdates_)) ss << "N/A";
396  else {
397  ss << "[";
398  if (*nBinsPtr_) {
399  for (unsigned int i=0;i<*nBinsPtr_-1;i++) {
400  ss << buf_[i] << ",";
401  }
402  ss << buf_[*nBinsPtr_-1];
403  }
404  ss << "]";
405  }
406  root[DATA].append(ss.str());
407  return;
408  }
409  }
410 }
411 
412 //wipe out data that will no longer be used
414 {
415  for (unsigned int i=0;i<streamDataMaps_.size();i++)
416  {
417  auto itr = streamDataMaps_[i].find(lumi);
418  if (itr!=streamDataMaps_[i].end()) streamDataMaps_[i].erase(lumi);
419  }
420 
421  auto itr = globalDataMap_.find(lumi);
422  if (itr!=globalDataMap_.end())
423  globalDataMap_.erase(lumi);
424 }
425 
size
Write out results.
OperationType opType_
Definition: DataPoint.h:145
Value get(UInt index, const Value &defaultValue) const
void snap(unsigned int lumi)
Definition: DataPoint.cc:125
std::vector< unsigned int > * streamLumisPtr_
Definition: DataPoint.h:136
void discardCollected(unsigned int forLumi)
Definition: DataPoint.cc:413
void trackVectorUInt(std::string const &name, std::vector< unsigned int > *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:86
static const std::string SOURCE
Definition: DataPoint.h:121
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
virtual std::string & getName()
Value & append(const Value &value)
Append value to array at the end.
void trackMonitorable(JsonMonitorable *monitorable, bool NAifZeroUpdates)
Definition: DataPoint.cc:74
void update(std::string const &newStr)
std::map< unsigned int, JsonMonPtr > MonPtrMap
Definition: DataPoint.h:34
Represents a JSON value.
Definition: value.h:111
unsigned int fastIndex_
Definition: DataPoint.h:156
std::vector< T > & value()
std::string definition_
Definition: DataPoint.h:128
static std::string const input
Definition: EdmProvDump.cc:45
static const std::string DATA
Definition: DataPoint.h:123
UInt size() const
Number of values in array or object.
#define MAXUPDATES
Definition: DataPoint.cc:16
std::vector< MonPtrMap > streamDataMaps_
Definition: DataPoint.h:131
bool isArray() const
void snapGlobal(unsigned int lumi)
Definition: DataPoint.cc:175
std::vector< std::string > data_
Definition: DataPoint.h:129
JsonMonitorable * mergeAndRetrieveValue(unsigned int forLumi)
Definition: DataPoint.cc:283
void add(long sth)
std::string fastOutCSV(int sid=-1)
Definition: DataPoint.cc:251
void serialize(Json::Value &root) const override
Definition: DataPoint.cc:43
#define end
Definition: vmac.h:39
unsigned int bufLen_
Definition: DataPoint.h:150
void trackVectorUIntAtomic(std::string const &name, std::vector< AtomicMonUInt * > *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:96
std::string asString() const
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:56
void mergeAndSerialize(Json::Value &jsonRoot, unsigned int lumi, bool initJsonValue, int sid)
Definition: DataPoint.cc:298
void update(long sth)
#define update(a, b)
void snapStreamAtomic(unsigned int lumi, unsigned int streamID)
Definition: DataPoint.cc:206
void makeStreamLumiMap(unsigned int size)
Definition: DataPoint.cc:107
static const std::string DEFINITION
Definition: DataPoint.h:122
unsigned int * nBinsPtr_
Definition: DataPoint.h:152
void update(double sth)