CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DataPoint.cc
Go to the documentation of this file.
1 /*
2  * DataPoint.cc
3  *
4  * Created on: Sep 24, 2012
5  * Author: aspataru
6  */
7 
9 
10 #include <tbb/concurrent_vector.h>
11 
12 #include <algorithm>
13 #include <assert.h>
14 
15 //max collected updates per lumi
16 #define MAXUPDATES 200
17 #define MAXBINS
18 
19 using namespace jsoncollector;
20 
21 namespace jsoncollector {
22  template class HistoJ<unsigned int>;
23  template class HistoJ<double>;
24 }
25 
26 const std::string DataPoint::SOURCE = "source";
27 const std::string DataPoint::DEFINITION = "definition";
28 const std::string DataPoint::DATA = "data";
29 
30 
32 {
33  if (buf_) delete[] buf_;
34 }
35 
36 /*
37  *
38  * Method implementation for simple DataPoint usage
39  *
40  */
41 
42 
44 {
45  if (source_.size()) {
46  root[SOURCE] = source_;
47  }
48  if (definition_.size()) {
49  root[DEFINITION] = definition_;
50  }
51  for (unsigned int i=0;i<data_.size();i++)
52  root[DATA].append(data_[i]);
53 }
54 
55 
57 {
58  source_ = root.get(SOURCE, "").asString();
59  definition_ = root.get(DEFINITION, "").asString();
60  if (root.get(DATA, "").isArray()) {
61  unsigned int size = root.get(DATA, "").size();
62  for (unsigned int i = 0; i < size; i++) {
63  data_.push_back(root.get(DATA, "")[i].asString());
64  }
65  }
66 }
67 
68 /*
69  *
70  * Method implementation for the new multi-threaded model
71  *
72  * */
73 
74 void DataPoint::trackMonitorable(JsonMonitorable *monitorable,bool NAifZeroUpdates)
75 {
76  name_=monitorable->getName();
77  tracked_ = (void*)monitorable;
78  if (dynamic_cast<IntJ*>(monitorable)) monType_=TYPEINT;
79  else if (dynamic_cast<DoubleJ*>(monitorable)) monType_=TYPEDOUBLE;
80  else if (dynamic_cast<StringJ*>(monitorable)) monType_=TYPESTRING;
81  else assert(0);
82  NAifZeroUpdates_=NAifZeroUpdates;
83 
84 }
85 
86 void DataPoint::trackVectorUInt(std::string const& name, std::vector<unsigned int> *monvec, bool NAifZeroUpdates)
87 {
88  name_=name;
89  tracked_ = (void*)monvec;
90  isStream_=true;
92  NAifZeroUpdates_=NAifZeroUpdates;
93  makeStreamLumiMap(monvec->size());
94 }
95 
96 void DataPoint::trackVectorUIntAtomic(std::string const& name, std::vector<AtomicMonUInt*> *monvec, bool NAifZeroUpdates)
97 {
98  name_=name;
99  tracked_ = (void*)monvec;
100  isStream_=true;
101  isAtomic_=true;
103  NAifZeroUpdates_=NAifZeroUpdates;
104  makeStreamLumiMap(monvec->size());
105 }
106 
108 {
109  for (unsigned int i=0;i<size;i++) {
110  streamDataMaps_.push_back(MonPtrMap());
111  }
112 }
113 
114 void DataPoint::serialize(Json::Value& root, bool rootInit, std::string const&input) const
115 {
116  if (rootInit) {
117  if (source_.size())
118  root[SOURCE] = source_;
119  if (definition_.size())
120  root[DEFINITION] = definition_;
121  }
122  root[DATA].append(input);
123 }
124 
125 void DataPoint::snap(unsigned int lumi)
126 {
127  isCached_=false;
128  if (isStream_) {
129  if (monType_==TYPEUINT)
130  {
131  for (unsigned int i=0; i<streamDataMaps_.size();i++) {
132  unsigned int streamLumi_=streamLumisPtr_->at(i);//get currently processed stream lumi
133  unsigned int monVal;
134 
135 #if ATOMIC_LEVEL>0
136  if (isAtomic_) monVal = (static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(i)->load(std::memory_order_relaxed);
137 #else
138  if (isAtomic_) monVal = *((static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(i));
139 #endif
140  else monVal = (static_cast<std::vector<unsigned int>*>(tracked_))->at(i);
141 
142  auto itr = streamDataMaps_[i].find(streamLumi_);
143  if (itr==streamDataMaps_[i].end())
144  {
145  if (opType_==OPHISTO) {
146  if (*nBinsPtr_) {
148  nh->update(monVal);
149  streamDataMaps_[i][streamLumi_] = nh;
150  }
151  }
152  else {//default to SUM
153  IntJ *nj = new IntJ;
154  nj->update(monVal);
155  streamDataMaps_[i][streamLumi_]= nj;
156  }
157  }
158  else {
159  if (opType_==OPHISTO) {
160  if (*nBinsPtr_) {
161  (static_cast<HistoJ<unsigned int> *>(itr->second.get()))->update(monVal);
162  }
163  }
164  else {
165  *(static_cast<IntJ*>(itr->second.get()))=monVal;
166  }
167  }
168  }
169  }
170  else assert(monType_!=TYPEINT);//not yet implemented, application error
171  }
172  else snapGlobal(lumi);
173 }
174 
175 void DataPoint::snapGlobal(unsigned int lumi)
176 {
177  isCached_=false;
178  if (isStream_) return;
179  auto itr = globalDataMap_.find(lumi);
180  if (itr==globalDataMap_.end()) {
181  if (monType_==TYPEINT) {
182  IntJ *ij = new IntJ;
183  ij->update((static_cast<IntJ*>(tracked_))->value());
184  globalDataMap_[lumi]=ij;
185  }
186  if (monType_==TYPEDOUBLE) {
187  DoubleJ *dj = new DoubleJ;
188  dj->update((static_cast<DoubleJ*>(tracked_))->value());
189  globalDataMap_[lumi]=dj;
190  }
191  if (monType_==TYPESTRING) {
192  StringJ *sj = new StringJ;
193  sj->update((static_cast<StringJ*>(tracked_))->value());
194  globalDataMap_[lumi]=sj;
195  }
196  } else {
197  if (monType_==TYPEINT)
198  static_cast<IntJ*>(itr->second.get())->update((static_cast<IntJ*>(tracked_))->value());
199  else if (monType_==TYPEDOUBLE)
200  static_cast<DoubleJ*>(itr->second.get())->update((static_cast<DoubleJ*>(tracked_))->value());
201  else if (monType_==TYPESTRING)
202  static_cast<StringJ*>(itr->second.get())->concatenate((static_cast<StringJ*>(tracked_))->value());
203  }
204 }
205 
206 void DataPoint::snapStreamAtomic(unsigned int lumi, unsigned int streamID)
207 {
208  if (!isStream_ || !isAtomic_) return;
209  isCached_=false;
210  if (monType_==TYPEUINT)
211  {
212  unsigned int monVal;
213 #if ATOMIC_LEVEL>0
214  if (isAtomic_) monVal = (static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(streamID)->load(std::memory_order_relaxed);
215 #else
216  if (isAtomic_) monVal = *((static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(streamID));
217 #endif
218  else monVal = (static_cast<std::vector<unsigned int>*>(tracked_))->at(streamID);
219 
220  auto itr = streamDataMaps_[streamID].find(lumi);
221  if (itr==streamDataMaps_[streamID].end()) //insert
222  {
223  if (opType_==OPHISTO) {
224  if (*nBinsPtr_) {
226  h->update(monVal);
227  streamDataMaps_[streamID][lumi] = h;
228  }
229  }
230  else {//default to SUM
231 
232  IntJ *h = new IntJ;
233  h->update(monVal);
234  streamDataMaps_[streamID][lumi] = h;
235  }
236  }
237  else
238  {
239  if (opType_==OPHISTO) {
240  if (*nBinsPtr_) {
241  static_cast<HistoJ<unsigned int>*>(itr->second.get())->update(monVal);
242  }
243  }
244  else
245  *(static_cast<IntJ*>(itr->second.get()))=monVal;
246  }
247  }
248  else assert(monType_!=TYPEINT);//not yet implemented
249 }
250 
252 {
253  if (tracked_) {
254  if (isStream_) {
255  std::stringstream ss;
256  if (isAtomic_) {
257 #if ATOMIC_LEVEL>0
258  ss << (unsigned int) (static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(fastIndex_)->load(std::memory_order_relaxed);
259 #else
260  ss << (unsigned int) *((static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(fastIndex_));
261 #endif
262  fastIndex_ = (fastIndex_+1) % (static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->size();
263  }
264  else {
265  ss << (static_cast<std::vector<unsigned int>*>(tracked_))->at(fastIndex_);
266  fastIndex_ = (fastIndex_+1) % (static_cast<std::vector<unsigned int>*>(tracked_))->size();
267  }
268 
269  return ss.str();
270  }
271  return (static_cast<JsonMonitorable*>(tracked_))->toString();
272  }
273  return std::string("");
274 }
275 
277 {
278  assert(monType_==TYPEUINT && isStream_);//for now only support UINT and SUM for stream variables
279  IntJ *newJ = new IntJ;
280  for (unsigned int i=0;i<streamDataMaps_.size();i++) {
281  auto itr = streamDataMaps_[i].find(lumi);
282  if (itr!=streamDataMaps_[i].end()) {
283  newJ->add(static_cast<IntJ*>(itr->second.get())->value());
284  }
285  }
286  cacheI_=newJ->value();
287  isCached_=true;
288  return newJ;//assume the caller takes care of deleting the object
289 }
290 
291 void DataPoint::mergeAndSerialize(Json::Value & root,unsigned int lumi,bool initJsonValue)
292 {
293  if (initJsonValue) {
294  root[SOURCE] = source_;
295  root[DEFINITION] = definition_;
296  }
297 
298  if (isDummy_) {
299  root[DATA].append("N/A");
300  return;
301  }
302  if (!isStream_) {
303  auto itr = globalDataMap_.find(lumi);
304  if (itr != globalDataMap_.end()) {
305  root[DATA].append(itr->second.get()->toString());
306  }
307  else {
308  if (NAifZeroUpdates_) root[DATA].append("N/A");
309  else if (monType_==TYPESTRING) root[DATA].append("");
310  else root[DATA].append("0");
311  }
312  return;
313  }
314  else {
315  assert(monType_==TYPEUINT);
316  if (isCached_) {
317  std::stringstream ss;
318  ss << cacheI_;
319  root[DATA].append(ss.str());
320  return;
321  }
322  if (opType_!=OPHISTO) {//sum is default
323  std::stringstream ss;
324  unsigned int updates=0;
325  unsigned int sum=0;
326  for (unsigned int i=0;i<streamDataMaps_.size();i++) {
327  auto itr = streamDataMaps_[i].find(lumi);
328  if (itr!=streamDataMaps_[i].end()) {
329  sum+=static_cast<IntJ*>(itr->second.get())->value();
330  updates++;
331  }
332  }
333  if (!updates && NAifZeroUpdates_) ss << "N/A";
334  ss << sum;
335  root[DATA].append(ss.str());
336  return;
337  }
338  if (opType_==OPHISTO) {
339  if (nBinsPtr_==nullptr) {
340  root[DATA].append("N/A");
341  return;
342  }
343  if (*nBinsPtr_>bufLen_) {
344  if (buf_) delete[] buf_;
346  buf_= new uint32_t[bufLen_];
347  }
348  memset(buf_,0,bufLen_*sizeof(uint32_t));
349  unsigned int updates=0;
350  for (unsigned int i=0;i<streamDataMaps_.size();i++) {
351  auto itr = streamDataMaps_[i].find(lumi);
352  if (itr!=streamDataMaps_[i].end()) {
353  HistoJ <unsigned int>* monObj = static_cast<HistoJ<unsigned int>*>(itr->second.get());
354  updates+=monObj->getUpdates();
355  auto &hvec = monObj->value();
356  for (unsigned int i=0;i<hvec.size();i++) {
357  unsigned int thisbin=(unsigned int) hvec[i];
358  if (thisbin<*nBinsPtr_) {
359  buf_[thisbin]++;
360  }
361  }
362  }
363  }
364  std::stringstream ss;
365  if (!*nBinsPtr_ || (!updates && NAifZeroUpdates_)) ss << "N/A";
366  else {
367  ss << "[";
368  if (*nBinsPtr_) {
369  for (unsigned int i=0;i<*nBinsPtr_-1;i++) {
370  ss << buf_[i] << ",";
371  }
372  ss << buf_[*nBinsPtr_-1];
373  }
374  ss << "]";
375  }
376  root[DATA].append(ss.str());
377  return;
378  }
379  }
380 }
381 
382 //wipe out data that will no longer be used
384 {
385  for (unsigned int i=0;i<streamDataMaps_.size();i++)
386  {
387  auto itr = streamDataMaps_[i].find(lumi);
388  if (itr!=streamDataMaps_[i].end()) streamDataMaps_[i].erase(lumi);
389  }
390 
391  auto itr = globalDataMap_.find(lumi);
392  if (itr!=globalDataMap_.end())
393  globalDataMap_.erase(lumi);
394 }
395 
OperationType opType_
Definition: DataPoint.h:144
Value get(UInt index, const Value &defaultValue) const
int i
Definition: DBlmapReader.cc:9
void snap(unsigned int lumi)
Definition: DataPoint.cc:125
std::vector< unsigned int > * streamLumisPtr_
Definition: DataPoint.h:135
void discardCollected(unsigned int forLumi)
Definition: DataPoint.cc:383
void trackVectorUInt(std::string const &name, std::vector< unsigned int > *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:86
static const std::string SOURCE
Definition: DataPoint.h:120
virtual std::string & getName()
void mergeAndSerialize(Json::Value &jsonRoot, unsigned int lumi, bool initJsonValue)
Definition: DataPoint.cc:291
tuple lumi
Definition: fjr2json.py:35
virtual void serialize(Json::Value &root) const
Definition: DataPoint.cc:43
Value & append(const Value &value)
Append value to array at the end.
void trackMonitorable(JsonMonitorable *monitorable, bool NAifZeroUpdates)
Definition: DataPoint.cc:74
void update(std::string const &newStr)
std::map< unsigned int, JsonMonPtr > MonPtrMap
Definition: DataPoint.h:34
Represents a JSON value.
Definition: value.h:111
unsigned int fastIndex_
Definition: DataPoint.h:155
std::vector< T > & value()
std::string definition_
Definition: DataPoint.h:127
static std::string const input
Definition: EdmProvDump.cc:44
static const std::string DATA
Definition: DataPoint.h:122
UInt size() const
Number of values in array or object.
#define MAXUPDATES
Definition: DataPoint.cc:16
std::vector< MonPtrMap > streamDataMaps_
Definition: DataPoint.h:130
bool isArray() const
void snapGlobal(unsigned int lumi)
Definition: DataPoint.cc:175
std::vector< std::string > data_
Definition: DataPoint.h:128
JsonMonitorable * mergeAndRetrieveValue(unsigned int forLumi)
Definition: DataPoint.cc:276
void add(long sth)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
#define end
Definition: vmac.h:37
unsigned int bufLen_
Definition: DataPoint.h:149
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
void trackVectorUIntAtomic(std::string const &name, std::vector< AtomicMonUInt * > *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:96
std::string asString() const
void update(long sth)
#define update(a, b)
std::string fastOutCSV()
Definition: DataPoint.cc:251
void snapStreamAtomic(unsigned int lumi, unsigned int streamID)
Definition: DataPoint.cc:206
void makeStreamLumiMap(unsigned int size)
Definition: DataPoint.cc:107
static const std::string DEFINITION
Definition: DataPoint.h:121
unsigned int * nBinsPtr_
Definition: DataPoint.h:151
tuple size
Write out results.
void update(double sth)
string root
initialization
Definition: dbtoconf.py:70