CMS 3D CMS Logo

DataPoint.cc
Go to the documentation of this file.
1 /*
2  * DataPoint.cc
3  *
4  * Created on: Sep 24, 2012
5  * Author: aspataru
6  */
7 
9 
10 #include <tbb/concurrent_vector.h>
11 
12 #include <algorithm>
13 #include <cassert>
14 
15 //max collected updates per lumi
16 #define MAXUPDATES 0xffffffff
17 #define MAXBINS
18 
19 using namespace jsoncollector;
20 
21 namespace jsoncollector {
22  template class HistoJ<unsigned int>;
23  template class HistoJ<double>;
24 } // namespace jsoncollector
25 
26 const std::string DataPoint::SOURCE = "source";
27 const std::string DataPoint::DEFINITION = "definition";
28 const std::string DataPoint::DATA = "data";
29 
31  if (buf_)
32  delete[] buf_;
33 }
34 
35 /*
36  *
37  * Method implementation for simple DataPoint usage
38  *
39  */
40 
42  if (!source_.empty()) {
43  root[SOURCE] = source_;
44  }
45  if (!definition_.empty()) {
47  }
48  for (unsigned int i = 0; i < data_.size(); i++)
49  root[DATA].append(data_[i]);
50 }
51 
53  source_ = root.get(SOURCE, "").asString();
54  definition_ = root.get(DEFINITION, "").asString();
55  if (root.get(DATA, "").isArray()) {
56  unsigned int size = root.get(DATA, "").size();
57  for (unsigned int i = 0; i < size; i++) {
58  data_.push_back(root.get(DATA, "")[i].asString());
59  }
60  }
61 }
62 
63 /*
64  *
65  * Method implementation for the new multi-threaded model
66  *
67  * */
68 
69 void DataPoint::trackMonitorable(JsonMonitorable *monitorable, bool NAifZeroUpdates) {
70  name_ = monitorable->getName();
71  tracked_ = (void *)monitorable;
72  if (dynamic_cast<IntJ *>(monitorable))
73  monType_ = TYPEINT;
74  else if (dynamic_cast<DoubleJ *>(monitorable))
76  else if (dynamic_cast<StringJ *>(monitorable))
78  else
79  assert(0);
80  NAifZeroUpdates_ = NAifZeroUpdates;
81 }
82 
83 void DataPoint::trackVectorUInt(std::string const &name, std::vector<unsigned int> *monvec, bool NAifZeroUpdates) {
84  name_ = name;
85  tracked_ = (void *)monvec;
86  isStream_ = true;
88  NAifZeroUpdates_ = NAifZeroUpdates;
89  makeStreamLumiMap(monvec->size());
90 }
91 
93  std::vector<AtomicMonUInt *> *monvec,
94  bool NAifZeroUpdates) {
95  name_ = name;
96  tracked_ = (void *)monvec;
97  isStream_ = true;
98  isAtomic_ = true;
100  NAifZeroUpdates_ = NAifZeroUpdates;
101  makeStreamLumiMap(monvec->size());
102 }
103 
104 void DataPoint::makeStreamLumiMap(unsigned int size) {
105  for (unsigned int i = 0; i < size; i++) {
106  streamDataMaps_.push_back(MonPtrMap());
107  }
108 }
109 
110 void DataPoint::serialize(Json::Value &root, bool rootInit, std::string const &input) const {
111  if (rootInit) {
112  if (!source_.empty())
113  root[SOURCE] = source_;
114  if (!definition_.empty())
116  }
117  root[DATA].append(input);
118 }
119 
120 void DataPoint::snap(unsigned int lumi) {
121  isCached_ = false;
122  if (isStream_) {
123  if (monType_ == TYPEUINT) {
124  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
125  unsigned int streamLumi_ = streamLumisPtr_->at(i); //get currently processed stream lumi
126  unsigned int monVal;
127 
128 #if ATOMIC_LEVEL > 0
129  if (isAtomic_)
130  monVal = (static_cast<std::vector<AtomicMonUInt *> *>(tracked_))->at(i)->load(std::memory_order_relaxed);
131 #else
132  if (isAtomic_)
133  monVal = *((static_cast<std::vector<AtomicMonUInt *> *>(tracked_))->at(i));
134 #endif
135  else
136  monVal = (static_cast<std::vector<unsigned int> *>(tracked_))->at(i);
137 
138  auto itr = streamDataMaps_[i].find(streamLumi_);
139  if (itr == streamDataMaps_[i].end()) {
140  if (opType_ == OPHISTO) {
141  if (*nBinsPtr_) {
143  nh->update(monVal);
144  streamDataMaps_[i][streamLumi_] = nh;
145  }
146  } else { //default to SUM
147  IntJ *nj = new IntJ;
148  nj->update(monVal);
149  streamDataMaps_[i][streamLumi_] = nj;
150  }
151  } else {
152  if (opType_ == OPHISTO) {
153  if (*nBinsPtr_) {
154  (static_cast<HistoJ<unsigned int> *>(itr->second.get()))->update(monVal);
155  }
156  } else {
157  *(static_cast<IntJ *>(itr->second.get())) = monVal;
158  }
159  }
160  }
161  } else
162  assert(monType_ != TYPEINT); //not yet implemented, application error
163  } else
164  snapGlobal(lumi);
165 }
166 
167 void DataPoint::snapGlobal(unsigned int lumi) {
168  isCached_ = false;
169  if (isStream_)
170  return;
171  auto itr = globalDataMap_.find(lumi);
172  if (itr == globalDataMap_.end()) {
173  if (monType_ == TYPEINT) {
174  IntJ *ij = new IntJ;
175  ij->update((static_cast<IntJ *>(tracked_))->value());
176  globalDataMap_[lumi] = ij;
177  }
178  if (monType_ == TYPEDOUBLE) {
179  DoubleJ *dj = new DoubleJ;
180  dj->update((static_cast<DoubleJ *>(tracked_))->value());
181  globalDataMap_[lumi] = dj;
182  }
183  if (monType_ == TYPESTRING) {
184  StringJ *sj = new StringJ;
185  sj->update((static_cast<StringJ *>(tracked_))->value());
186  globalDataMap_[lumi] = sj;
187  }
188  } else {
189  if (monType_ == TYPEINT)
190  static_cast<IntJ *>(itr->second.get())->update((static_cast<IntJ *>(tracked_))->value());
191  else if (monType_ == TYPEDOUBLE)
192  static_cast<DoubleJ *>(itr->second.get())->update((static_cast<DoubleJ *>(tracked_))->value());
193  else if (monType_ == TYPESTRING)
194  static_cast<StringJ *>(itr->second.get())->concatenate((static_cast<StringJ *>(tracked_))->value());
195  }
196 }
197 
198 void DataPoint::snapStreamAtomic(unsigned int lumi, unsigned int streamID) {
199  if (!isStream_ || !isAtomic_)
200  return;
201  isCached_ = false;
202  if (monType_ == TYPEUINT) {
203  unsigned int monVal;
204 #if ATOMIC_LEVEL > 0
205  if (isAtomic_)
206  monVal = (static_cast<std::vector<AtomicMonUInt *> *>(tracked_))->at(streamID)->load(std::memory_order_relaxed);
207 #else
208  if (isAtomic_)
209  monVal = *((static_cast<std::vector<AtomicMonUInt *> *>(tracked_))->at(streamID));
210 #endif
211  else
212  monVal = (static_cast<std::vector<unsigned int> *>(tracked_))->at(streamID);
213 
214  auto itr = streamDataMaps_[streamID].find(lumi);
215  if (itr == streamDataMaps_[streamID].end()) //insert
216  {
217  if (opType_ == OPHISTO) {
218  if (*nBinsPtr_) {
220  h->update(monVal);
221  streamDataMaps_[streamID][lumi] = h;
222  }
223  } else { //default to SUM
224 
225  IntJ *h = new IntJ;
226  h->update(monVal);
227  streamDataMaps_[streamID][lumi] = h;
228  }
229  } else {
230  if (opType_ == OPHISTO) {
231  if (*nBinsPtr_) {
232  static_cast<HistoJ<unsigned int> *>(itr->second.get())->update(monVal);
233  }
234  } else
235  *(static_cast<IntJ *>(itr->second.get())) = monVal;
236  }
237  } else
238  assert(monType_ != TYPEINT); //not yet implemented
239 }
240 
242  if (tracked_) {
243  if (isStream_) {
244  std::stringstream ss;
245  if (sid < 0) {
246  if (isAtomic_) {
247  // if ATOMIC_LEVEL>0
248  // ss << (unsigned int) (static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(fastIndex_)->load(std::memory_order_relaxed);
249 
250  ss << (unsigned int)*((static_cast<std::vector<AtomicMonUInt *> *>(tracked_))->at(fastIndex_));
251  fastIndex_ = (fastIndex_ + 1) % (static_cast<std::vector<AtomicMonUInt *> *>(tracked_))->size();
252  } else {
253  ss << (static_cast<std::vector<unsigned int> *>(tracked_))->at(fastIndex_);
254  fastIndex_ = (fastIndex_ + 1) % (static_cast<std::vector<unsigned int> *>(tracked_))->size();
255  }
256 
257  } else {
258  if (isAtomic_)
259  ss << (unsigned int)*((static_cast<std::vector<AtomicMonUInt *> *>(tracked_))->at(unsigned(sid)));
260  else
261  ss << (static_cast<std::vector<unsigned int> *>(tracked_))->at(unsigned(sid));
262  }
263  return ss.str();
264  }
265  return (static_cast<JsonMonitorable *>(tracked_))->toString();
266  }
267  return std::string("");
268 }
269 
271  assert(monType_ == TYPEUINT && isStream_); //for now only support UINT and SUM for stream variables
272  IntJ *newJ = new IntJ;
273  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
274  auto itr = streamDataMaps_[i].find(lumi);
275  if (itr != streamDataMaps_[i].end()) {
276  newJ->add(static_cast<IntJ *>(itr->second.get())->value());
277  }
278  }
279  cacheI_ = newJ->value();
280  isCached_ = true;
281  return newJ; //assume the caller takes care of deleting the object
282 }
283 
284 void DataPoint::mergeAndSerialize(Json::Value &root, unsigned int lumi, bool initJsonValue, int sid) {
285  if (initJsonValue) {
286  root[SOURCE] = source_;
288  }
289 
290  if (isDummy_) {
291  root[DATA].append("N/A");
292  return;
293  }
294  if (!isStream_) {
295  auto itr = globalDataMap_.find(lumi);
296  if (itr != globalDataMap_.end()) {
297  root[DATA].append(itr->second.get()->toString());
298  } else {
299  if (NAifZeroUpdates_)
300  root[DATA].append("N/A");
301  else if (monType_ == TYPESTRING)
302  root[DATA].append("");
303  else
304  root[DATA].append("0");
305  }
306  return;
307  } else {
309  if (isCached_) {
310  std::stringstream ss;
311  ss << cacheI_;
312  root[DATA].append(ss.str());
313  return;
314  }
315  if (opType_ != OPHISTO) { //sum is default
316  std::stringstream ss;
317  unsigned int updates = 0;
318  unsigned int sum = 0;
319  if (sid < 1)
320  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
321  auto itr = streamDataMaps_[i].find(lumi);
322  if (itr != streamDataMaps_[i].end()) {
323  sum += static_cast<IntJ *>(itr->second.get())->value();
324  updates++;
325  }
326  }
327  else {
328  auto itr = streamDataMaps_[unsigned(sid)].find(lumi);
329  if (itr != streamDataMaps_[unsigned(sid)].end()) {
330  sum += static_cast<IntJ *>(itr->second.get())->value();
331  updates++;
332  }
333  }
334  if (!updates && NAifZeroUpdates_)
335  ss << "N/A";
336  ss << sum;
337  root[DATA].append(ss.str());
338  return;
339  }
340  if (opType_ == OPHISTO) {
341  if (nBinsPtr_ == nullptr) {
342  root[DATA].append("N/A");
343  return;
344  }
345  if (*nBinsPtr_ > bufLen_) {
346  if (buf_)
347  delete[] buf_;
348  bufLen_ = *nBinsPtr_;
349  buf_ = new uint32_t[bufLen_];
350  }
351  memset(buf_, 0, bufLen_ * sizeof(uint32_t));
352  unsigned int updates = 0;
353  if (sid < 1)
354  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
355  auto itr = streamDataMaps_[i].find(lumi);
356  if (itr != streamDataMaps_[i].end()) {
357  HistoJ<unsigned int> *monObj = static_cast<HistoJ<unsigned int> *>(itr->second.get());
358  updates += monObj->getUpdates();
359  auto &hvec = monObj->value();
360  for (unsigned int j = 0; j < hvec.size(); j++) {
361  unsigned int thisbin = (unsigned int)hvec[j];
362  if (thisbin < *nBinsPtr_) {
363  buf_[thisbin]++;
364  }
365  }
366  }
367  }
368  else {
369  auto itr = streamDataMaps_[unsigned(sid)].find(lumi);
370  if (itr != streamDataMaps_[unsigned(sid)].end()) {
371  HistoJ<unsigned int> *monObj = static_cast<HistoJ<unsigned int> *>(itr->second.get());
372  updates += monObj->getUpdates();
373  auto &hvec = monObj->value();
374  for (unsigned int j = 0; j < hvec.size(); j++) {
375  unsigned int thisbin = (unsigned int)hvec[j];
376  if (thisbin < *nBinsPtr_) {
377  buf_[thisbin]++;
378  }
379  }
380  }
381  }
382  std::stringstream ss;
383  if (!*nBinsPtr_ || (!updates && NAifZeroUpdates_))
384  ss << "N/A";
385  else {
386  ss << "[";
387  if (*nBinsPtr_) {
388  for (unsigned int i = 0; i < *nBinsPtr_ - 1; i++) {
389  ss << buf_[i] << ",";
390  }
391  ss << buf_[*nBinsPtr_ - 1];
392  }
393  ss << "]";
394  }
395  root[DATA].append(ss.str());
396  return;
397  }
398  }
399 }
400 
401 //wipe out data that will no longer be used
402 void DataPoint::discardCollected(unsigned int lumi) {
403  for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
404  auto itr = streamDataMaps_[i].find(lumi);
405  if (itr != streamDataMaps_[i].end())
406  streamDataMaps_[i].erase(lumi);
407  }
408 
409  auto itr = globalDataMap_.find(lumi);
410  if (itr != globalDataMap_.end())
411  globalDataMap_.erase(lumi);
412 }
jsoncollector::TYPEDOUBLE
Definition: JsonMonitorable.h:21
jsoncollector::DataPoint::globalDataMap_
MonPtrMap globalDataMap_
Definition: DataPoint.h:125
mps_fire.i
i
Definition: mps_fire.py:428
input
static const std::string input
Definition: EdmProvDump.cc:48
DataPoint.h
jsoncollector::DataPoint::tracked_
void * tracked_
Definition: DataPoint.h:126
jsoncollector::DataPoint::cacheI_
int cacheI_
Definition: DataPoint.h:146
jsoncollector::DataPoint::mergeAndSerialize
void mergeAndSerialize(Json::Value &jsonRoot, unsigned int lumi, bool initJsonValue, int sid)
Definition: DataPoint.cc:284
jsoncollector::HistoJ::value
std::vector< T > & value()
Definition: JsonMonitorable.h:224
jsoncollector::DataPoint::monType_
MonType monType_
Definition: DataPoint.h:137
jsoncollector::DataPoint::fastIndex_
unsigned int fastIndex_
Definition: DataPoint.h:149
h
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
Definition: L1TUtmAlgorithmRcd.h:4
jsoncollector::HistoJ< unsigned int >
jsoncollector::DataPoint::trackVectorUInt
void trackVectorUInt(std::string const &name, std::vector< unsigned int > *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:83
cms::cuda::assert
assert(be >=bs)
jsoncollector::IntJ::update
void update(long sth)
Definition: JsonMonitorable.h:90
jsoncollector::DataPoint::trackVectorUIntAtomic
void trackVectorUIntAtomic(std::string const &name, std::vector< AtomicMonUInt * > *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:92
jsoncollector::DataPoint::isAtomic_
bool isAtomic_
Definition: DataPoint.h:132
jsoncollector::JsonMonitorable::getName
virtual std::string & getName()
Definition: JsonMonitorable.h:40
jsoncollector::DataPoint::nBinsPtr_
unsigned int * nBinsPtr_
Definition: DataPoint.h:145
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
jsoncollector::JsonMonitorable::getUpdates
unsigned int getUpdates()
Definition: JsonMonitorable.h:34
jsoncollector::DataPoint::isStream_
bool isStream_
Definition: DataPoint.h:131
BXlumiParameters_cfi.lumi
lumi
Definition: BXlumiParameters_cfi.py:6
jsoncollector::MonPtrMap
std::map< unsigned int, JsonMonPtr > MonPtrMap
Definition: DataPoint.h:34
jsoncollector::TYPESTRING
Definition: JsonMonitorable.h:21
jsoncollector::DataPoint::isDummy_
bool isDummy_
Definition: DataPoint.h:133
jsoncollector::DataPoint::data_
std::vector< std::string > data_
Definition: DataPoint.h:122
h
jsoncollector::DoubleJ
Definition: JsonMonitorable.h:106
jsoncollector::DataPoint::DATA
static const std::string DATA
Definition: DataPoint.h:116
jsoncollector::DataPoint::fastOutCSV
std::string fastOutCSV(int sid=-1)
Definition: DataPoint.cc:241
cms::cuda::nh
uint32_t nh
Definition: HistoContainer.h:23
mps_fire.end
end
Definition: mps_fire.py:242
jsoncollector::TYPEUINT
Definition: JsonMonitorable.h:21
jsoncollector::DataPoint::makeStreamLumiMap
void makeStreamLumiMap(unsigned int size)
Definition: DataPoint.cc:104
jsoncollector::DataPoint::streamLumisPtr_
std::vector< unsigned int > * streamLumisPtr_
Definition: DataPoint.h:129
jsoncollector::DataPoint::NAifZeroUpdates_
bool NAifZeroUpdates_
Definition: DataPoint.h:134
jsoncollector::DataPoint::definition_
std::string definition_
Definition: DataPoint.h:121
jsoncollector::DataPoint::snapGlobal
void snapGlobal(unsigned int lumi)
Definition: DataPoint.cc:167
jsoncollector::TYPEINT
Definition: JsonMonitorable.h:21
jsoncollector::DataPoint::DEFINITION
static const std::string DEFINITION
Definition: DataPoint.h:115
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
jsoncollector::DataPoint::streamDataMaps_
std::vector< MonPtrMap > streamDataMaps_
Definition: DataPoint.h:124
jsoncollector::DataPoint::source_
std::string source_
Definition: DataPoint.h:120
mps_setup.append
append
Definition: mps_setup.py:85
createfilelist.int
int
Definition: createfilelist.py:10
jsoncollector::DataPoint::deserialize
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:52
root
Definition: RooFitFunction.h:10
jsoncollector::JsonMonitorable
Definition: JsonMonitorable.h:24
jsoncollector::IntJ::value
long & value()
Definition: JsonMonitorable.h:88
jsoncollector
Definition: DataPoint.h:26
jsoncollector::DataPoint::SOURCE
static const std::string SOURCE
Definition: DataPoint.h:114
jsoncollector::StringJ::update
void update(std::string const &newStr)
Definition: JsonMonitorable.h:165
jsoncollector::DataPoint::discardCollected
void discardCollected(unsigned int forLumi)
Definition: DataPoint.cc:402
jsoncollector::DataPoint::mergeAndRetrieveValue
JsonMonitorable * mergeAndRetrieveValue(unsigned int forLumi)
Definition: DataPoint.cc:270
relativeConstraints.value
value
Definition: relativeConstraints.py:53
jsoncollector::DataPoint::opType_
OperationType opType_
Definition: DataPoint.h:138
jsoncollector::DoubleJ::update
void update(double sth)
Definition: JsonMonitorable.h:129
jsoncollector::IntJ::add
void add(long sth)
Definition: JsonMonitorable.h:97
jsoncollector::DataPoint::serialize
void serialize(Json::Value &root) const override
Definition: DataPoint.cc:41
Skims_PA_cff.name
name
Definition: Skims_PA_cff.py:17
jsoncollector::DataPoint::trackMonitorable
void trackMonitorable(JsonMonitorable *monitorable, bool NAifZeroUpdates)
Definition: DataPoint.cc:69
jsoncollector::IntJ
Definition: JsonMonitorable.h:66
jsoncollector::DataPoint::name_
std::string name_
Definition: DataPoint.h:139
jsoncollector::DataPoint::isCached_
bool isCached_
Definition: DataPoint.h:147
jsoncollector::DataPoint::~DataPoint
~DataPoint() override
Definition: DataPoint.cc:30
jsoncollector::OPHISTO
Definition: JsonMonitorable.h:22
dqmiolumiharvest.j
j
Definition: dqmiolumiharvest.py:66
lumi
Definition: LumiSectionData.h:20
jsoncollector::StringJ
Definition: JsonMonitorable.h:140
jsoncollector::DataPoint::snap
void snap(unsigned int lumi)
Definition: DataPoint.cc:120
jsoncollector::DataPoint::bufLen_
unsigned int bufLen_
Definition: DataPoint.h:143
jsoncollector::DataPoint::buf_
uint32_t * buf_
Definition: DataPoint.h:142
Json::Value
Represents a JSON value.
Definition: value.h:99
update
#define update(a, b)
Definition: TrackClassifier.cc:10
jsoncollector::DataPoint::snapStreamAtomic
void snapStreamAtomic(unsigned int lumi, unsigned int streamID)
Definition: DataPoint.cc:198
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MAXUPDATES
#define MAXUPDATES
Definition: DataPoint.cc:16