CMS 3D CMS Logo

FastMonitor.cc
Go to the documentation of this file.
1 /*
2  * FastMonitor.cc
3  *
4  * Created on: Nov 27, 2012
5  * Author: aspataru
6  */
7 
12 
13 #include <fstream>
14 #include <iostream>
15 #include <sstream>
16 #include <cassert>
17 #include <sys/types.h>
18 #include <unistd.h>
19 #include <boost/filesystem/fstream.hpp>
20 
21 using namespace jsoncollector;
22 
23 FastMonitor::FastMonitor(std::string const& defPath, std::string const defGroup, bool strictChecking, bool useSource, bool useDefinition) :
24  defPath_(defPath),strictChecking_(strictChecking),useSource_(useSource),useDefinition_(useDefinition),nStreams_(1),deleteDef_(true)
25 {
26  //get host and PID info
27  if (useSource)
29 
30  //load definition file
31  dpd_ = new DataPointDefinition();
33 
34 }
35 
36 
37 FastMonitor::FastMonitor(DataPointDefinition * dpd, bool strictChecking, bool useSource, bool useDefinition) :
38  strictChecking_(strictChecking),useSource_(useSource),useDefinition_(useDefinition),nStreams_(1),dpd_(dpd)
39 {
40  //get host and PID info
41  if (useSource)
43 }
44 
46 {
47  for (auto dp: dataPoints_) delete dp;
48  if (deleteDef_) delete dpd_;
49  if (deleteDefFast_) delete dpdFast_;
50 }
51 
52 void FastMonitor::addFastPathDefinition(std::string const& defPathFast, std::string const defGroupFast, bool strict)
53 {
54  haveFastPath_=true;
55  defPathFast_=defPathFast;
59  deleteDefFast_=true;
60 }
61 
62 //per-process variables
63 void FastMonitor::registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins)
64 {
66  dp->trackMonitorable(newMonitorable,NAifZeroUpdates);
67  dp->setNBins(nBins);
68  dataPoints_.push_back(dp);
69  dpNameMap_[newMonitorable->getName()]=dataPoints_.size()-1;
70 
71  //checks if the same name is registered twice
72  assert(uids_.insert(newMonitorable->getName()).second);
73 }
74 
75 //fast path: no merge operation is performed
77 {
79  dp->trackMonitorable(newMonitorable,false);
80  dataPointsFastOnly_.push_back(dp);
81 }
82 
83 //per-stream variables
84 void FastMonitor::registerStreamMonitorableUIntVec(std::string const& name, std::vector<unsigned int> *inputs, bool NAifZeroUpdates, unsigned int *nBins)
85 {
87  dp->trackVectorUInt(name,inputs,NAifZeroUpdates);
88  dp->setNBins(nBins);
89  dataPoints_.push_back(dp);
90  dpNameMap_[name]=dataPoints_.size()-1;
91  assert (uids_.insert(name).second);
92 }
93 
94 
95 //atomic variables with guaranteed updates at the time of reading
96 void FastMonitor::registerStreamMonitorableUIntVecAtomic(std::string const& name, std::vector<AtomicMonUInt*> *inputs, bool NAifZeroUpdates, unsigned int *nBins)
97 {
98  std::string definitionToPass;
99  if (useDefinition_) definitionToPass=defPath_;
100  DataPoint *dp = new DataPoint(definitionToPass,sourceInfo_);
101  dp->trackVectorUIntAtomic(name,inputs,NAifZeroUpdates);
102  dp->setNBins(nBins);
103  dataPoints_.push_back(dp);
104  dpNameMap_[name]=dataPoints_.size()-1;
105  assert (uids_.insert(name).second);
106 }
107 
108 
109 
110 void FastMonitor::commit(std::vector<unsigned int> *streamLumisPtr)
111 {
112  std::vector<std::string> const& jsonNames= dpd_->getNames();
113  regDpCount_ = dataPoints_.size();
114  if (strictChecking_)
115  assert(jsonNames.size()==regDpCount_);
116 
117  std::map<unsigned int,bool> hasJson;
118  for (unsigned int i=0;i<jsonNames.size();i++)
119  {
120  bool notFoundVar=true;
121  for (unsigned int j=0;j<regDpCount_;j++) {
122  if (dataPoints_[j]->getName()==jsonNames[i])
123  {
124  dataPoints_[j]->setOperation(dpd_->getOperationFor(i));
125  jsonDpIndex_.push_back(j);
126  hasJson[j]=true;
127  notFoundVar=false;
128  break;
129  }
130  }
131  if (notFoundVar) {
132  assert(!strictChecking_);
133  //push dummy DP if not registered by the service so that we output required JSON/CSV
134  DataPoint *dummyDp = new DataPoint(sourceInfo_,defPath_);
135  dummyDp->trackDummy(jsonNames[i],true);
136  dataPoints_.push_back(dummyDp);
137  jsonDpIndex_.push_back(dataPoints_.size()-1);
138  }
139  }
140  for (unsigned int i=0;i<regDpCount_;i++) {
141  dataPoints_[i]->setStreamLumiPtr(streamLumisPtr);
142  }
143 
144  //fast path:
145  if (haveFastPath_) {
146  std::vector<std::string> const& fjsonNames = dpdFast_->getNames();
148  assert(!(fastPathStrictChecking_ && fjsonNames.size()==fregDpCount_));
149  std::map<unsigned int,bool> fhasJson;
150  for (unsigned int i=0;i<fjsonNames.size();i++)
151  {
152  bool notFoundVar=true;
153  for (unsigned int j=0;j<fregDpCount_;j++) {
154  if (dataPointsFastOnly_[j]->getName()==fjsonNames[i])
155  {
157  fhasJson[j]=true;
158  notFoundVar=false;
159  break;
160  }
161  }
162  if (notFoundVar)
163  {
164  //try to find variable among slow variables
165 
166  bool notFoundVarSlow=true;
167  for (unsigned int j=0;j<regDpCount_;j++) {
168  if (dataPoints_[j]->getName()==fjsonNames[i])
169  {
170  jsonDpIndexFast_.push_back(dataPoints_[j]);
171  //fhasJson[j]=true;
172  notFoundVarSlow=false;
173  break;
174  }
175  }
176 
177  assert(!(fastPathStrictChecking_ && !notFoundVarSlow));
178  //push dummy DP if not registered by the service so that we output required JSON/CSV
179  if (notFoundVarSlow) {
181  dummyDp->trackDummy(fjsonNames[i],true);
182  dataPointsFastOnly_.push_back(dummyDp);
183  jsonDpIndexFast_.push_back(dummyDp);
184  }
185  }
186  }
187  }
188 }
189 
190 //update everything
191 void FastMonitor::snap(unsigned int ls)
192 {
193  recentSnaps_++;
195  for (unsigned int i=0;i<regDpCount_;i++) {
196  dataPoints_[i]->snap(ls);
197  }
198 }
199 
200 //update for global variables as most of them are correct only at global EOL
201 void FastMonitor::snapGlobal(unsigned int ls)
202 {
203  recentSnaps_++;
204  for (unsigned int i=0;i<regDpCount_;i++) {
205  dataPoints_[i]->snapGlobal(ls);
206  }
207 }
208 
209 //update atomic per-stream vars(e.g. event counters) not updating time-based measurements (mini/microstate)
210 void FastMonitor::snapStreamAtomic(unsigned int ls, unsigned int streamID)
211 {
212  recentSnaps_++;
213  for (unsigned int i=0;i<regDpCount_;i++) {
214  dataPoints_[i]->snapStreamAtomic(ls, streamID);
215  }
216 }
217 
219 {
220  //output what was specified in JSON in the same order (including dummies)
221  unsigned int monSize = jsonDpIndexFast_.size();
222  std::stringstream ss;
223  if (monSize) {
224  for (unsigned int j=0; j< monSize;j++) {
225  ss << jsonDpIndexFast_[j]->fastOutCSV(sid);
226  if (j<monSize-1) ss << ",";
227  }
228  }
229  return ss.str();
230 }
231 
232 void FastMonitor::outputCSV(std::string const& path, std::string const& csvString)
233 {
234  std::ofstream outputFile;
235  outputFile.open(path.c_str(), std::fstream::out | std::fstream::trunc);
236  outputFile << defPathFast_ << std::endl;
237  outputFile << csvString << std::endl;
238  outputFile.close();
239 }
240 
241 
242 //get one variable (caller must delete it later)
244 {
245  auto it = dpNameMap_.find(name);
246  assert(it!=dpNameMap_.end());
247  return dataPoints_[it->second]->mergeAndRetrieveValue(forLumi);
248 }
249 
250 bool FastMonitor::outputFullJSONs(std::string const& pathstem, std::string const& ext, unsigned int lumi)
251 {
252  LogDebug("FastMonitor") << "SNAP updates -: " << recentSnaps_ << " (by timer: " << recentSnapsTimer_
253  << ") in lumisection ";
254 
255  recentSnaps_ = recentSnapsTimer_ = 0;
256  for (unsigned int i=0;i<nStreams_;i++) {
257  Json::Value serializeRoot;
258  for (unsigned int j=0; j< jsonDpIndex_.size();j++) {
259  dataPoints_[jsonDpIndex_[j]]->mergeAndSerialize(serializeRoot,lumi,true,i);
260  }
261  //get extension
262  std::stringstream tidext;
263  tidext << "_tid" << i;
264  std::string path = pathstem + tidext.str() + ext;
265 
267  std::string && result = writer.write(serializeRoot);
268  FileIO::writeStringToFile(path, result);
269  }
270  return true;
271 }
272 
274 {
275  LogDebug("FastMonitor") << "SNAP updates -: " << recentSnaps_ << " (by timer: " << recentSnapsTimer_
276  << ") in lumisection ";
277 
278  recentSnaps_ = recentSnapsTimer_ = 0;
279  Json::Value serializeRoot;
280  for (unsigned int j=0; j< jsonDpIndex_.size();j++) {
281  dataPoints_[jsonDpIndex_[j]]->mergeAndSerialize(serializeRoot,lumi,j==0,-1);
282  }
283 
285  std::string && result = writer.write(serializeRoot);
286  FileIO::writeStringToFile(path, result);
287  return true;
288 }
289 
290 void FastMonitor::discardCollected(unsigned int forLumi)
291 {
292  for (auto dp: dataPoints_) dp->discardCollected(forLumi);
293 }
294 
296 {
297  std::stringstream hpid;
298  int pid = (int) getpid();
299  char hostname[128];
300  gethostname(hostname, sizeof hostname);
301  hpid << hostname << "_" << pid;
302  sHPid = hpid.str();
303 }
304 
#define LogDebug(id)
void registerFastGlobalMonitorable(JsonMonitorable *newMonitorable)
Definition: FastMonitor.cc:76
void addFastPathDefinition(std::string const &defPathFast, std::string const defGroupFast, bool strict)
Definition: FastMonitor.cc:52
void setNBins(unsigned int *nBins)
Definition: DataPoint.h:114
DataPointDefinition * dpdFast_
Definition: FastMonitor.h:97
void trackVectorUInt(std::string const &name, std::vector< unsigned int > *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:86
OperationType getOperationFor(unsigned int index)
void trackDummy(std::string const &name, bool setNAifZeroUpdates)
Definition: DataPoint.h:82
virtual std::string & getName()
void snapStreamAtomic(unsigned int ls, unsigned int streamID)
Definition: FastMonitor.cc:210
void registerStreamMonitorableUIntVecAtomic(std::string const &name, std::vector< AtomicMonUInt * > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:96
std::vector< DataPoint * > dataPointsFastOnly_
Definition: FastMonitor.h:102
std::map< std::string, unsigned int > dpNameMap_
Definition: FastMonitor.h:106
void trackMonitorable(JsonMonitorable *monitorable, bool NAifZeroUpdates)
Definition: DataPoint.cc:74
bool outputFullJSONs(std::string const &pathstem, std::string const &ext, unsigned int lumi)
Definition: FastMonitor.cc:250
Represents a JSON value.
Definition: value.h:111
FastMonitor(std::string const &defPath, std::string const defGroup, bool strictChecking, bool useSource=true, bool useDefinition=true)
Definition: FastMonitor.cc:23
void registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:63
std::vector< unsigned int > jsonDpIndex_
Definition: FastMonitor.h:103
U second(std::pair< T, U > const &p)
static bool getDataPointDefinitionFor(std::string &defFilePath, DataPointDefinition *dpd, const std::string *defaultGroup=nullptr)
std::vector< DataPoint * > dataPoints_
Definition: FastMonitor.h:101
void registerStreamMonitorableUIntVec(std::string const &name, std::vector< unsigned int > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:84
std::unordered_set< std::string > uids_
Definition: FastMonitor.h:113
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
void commit(std::vector< unsigned int > *streamLumisPtr)
Definition: FastMonitor.cc:110
void trackVectorUIntAtomic(std::string const &name, std::vector< AtomicMonUInt * > *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:96
def ls(path, rec=False)
Definition: eostools.py:349
std::string getCSVString(int sid=-1)
Definition: FastMonitor.cc:218
std::vector< DataPoint * > jsonDpIndexFast_
Definition: FastMonitor.h:104
void outputCSV(std::string const &path, std::string const &csvString)
Definition: FastMonitor.cc:232
std::string write(const Value &root) override
Serialize a Value in JSON format.
void snap(unsigned int ls)
Definition: FastMonitor.cc:191
DataPointDefinition * dpd_
Definition: FastMonitor.h:96
JsonMonitorable * getMergedIntJForLumi(std::string const &name, unsigned int forLumi)
Definition: FastMonitor.cc:243
unsigned int recentSnapsTimer_
Definition: FastMonitor.h:109
std::vector< std::string > const & getNames()
void getHostAndPID(std::string &sHPid)
Definition: FastMonitor.cc:295
Definition: memstream.h:15
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:65
void discardCollected(unsigned int forLumi)
Definition: FastMonitor.cc:290
bool outputFullJSON(std::string const &path, unsigned int lumi)
Definition: FastMonitor.cc:273
void snapGlobal(unsigned int ls)
Definition: FastMonitor.cc:201