test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FastMonitor.cc
Go to the documentation of this file.
1 /*
2  * FastMonitor.cc
3  *
4  * Created on: Nov 27, 2012
5  * Author: aspataru
6  */
7 
12 
13 #include <fstream>
14 #include <iostream>
15 #include <sstream>
16 #include <assert.h>
17 #include <sys/types.h>
18 #include <unistd.h>
19 
20 using namespace jsoncollector;
21 
22 FastMonitor::FastMonitor(std::string const& defPath, std::string const defGroup, bool strictChecking, bool useSource, bool useDefinition) :
23  defPath_(defPath),strictChecking_(strictChecking),useSource_(useSource),useDefinition_(useDefinition),nStreams_(1),deleteDef_(true)
24 {
25  //get host and PID info
26  if (useSource)
28 
29  //load definition file
30  dpd_ = new DataPointDefinition();
32 
33 }
34 
35 
36 FastMonitor::FastMonitor(DataPointDefinition * dpd, bool strictChecking, bool useSource, bool useDefinition) :
37  strictChecking_(strictChecking),useSource_(useSource),useDefinition_(useDefinition),nStreams_(1),dpd_(dpd)
38 {
39  //get host and PID info
40  if (useSource)
42 }
43 
45 {
46  for (auto dp: dataPoints_) delete dp;
47  if (deleteDef_) delete dpd_;
48  if (deleteDefFast_) delete dpdFast_;
49 }
50 
51 void FastMonitor::addFastPathDefinition(std::string const& defPathFast, std::string const defGroupFast, bool strict)
52 {
53  haveFastPath_=true;
54  defPathFast_=defPathFast;
58  deleteDefFast_=true;
59 }
60 
61 //per-process variables
62 void FastMonitor::registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins)
63 {
65  dp->trackMonitorable(newMonitorable,NAifZeroUpdates);
66  dp->setNBins(nBins);
67  dataPoints_.push_back(dp);
68  dpNameMap_[newMonitorable->getName()]=dataPoints_.size()-1;
69 
70  //checks if the same name is registered twice
71  assert(uids_.insert(newMonitorable->getName()).second);
72 }
73 
74 //fast path: no merge operation is performed
76 {
78  dp->trackMonitorable(newMonitorable,false);
79  dataPointsFastOnly_.push_back(dp);
80 }
81 
82 //per-stream variables
83 void FastMonitor::registerStreamMonitorableUIntVec(std::string const& name, std::vector<unsigned int> *inputs, bool NAifZeroUpdates, unsigned int *nBins)
84 {
86  dp->trackVectorUInt(name,inputs,NAifZeroUpdates);
87  dp->setNBins(nBins);
88  dataPoints_.push_back(dp);
89  dpNameMap_[name]=dataPoints_.size()-1;
90  assert (uids_.insert(name).second);
91 }
92 
93 
94 //atomic variables with guaranteed updates at the time of reading
95 void FastMonitor::registerStreamMonitorableUIntVecAtomic(std::string const& name, std::vector<AtomicMonUInt*> *inputs, bool NAifZeroUpdates, unsigned int *nBins)
96 {
97  std::string definitionToPass;
98  if (useDefinition_) definitionToPass=defPath_;
99  DataPoint *dp = new DataPoint(definitionToPass,sourceInfo_);
100  dp->trackVectorUIntAtomic(name,inputs,NAifZeroUpdates);
101  dp->setNBins(nBins);
102  dataPoints_.push_back(dp);
103  dpNameMap_[name]=dataPoints_.size()-1;
104  assert (uids_.insert(name).second);
105 }
106 
107 
108 
109 void FastMonitor::commit(std::vector<unsigned int> *streamLumisPtr)
110 {
111  std::vector<std::string> const& jsonNames= dpd_->getNames();
112  regDpCount_ = dataPoints_.size();
113  if (strictChecking_)
114  assert(jsonNames.size()==regDpCount_);
115 
116  std::map<unsigned int,bool> hasJson;
117  for (unsigned int i=0;i<jsonNames.size();i++)
118  {
119  bool notFoundVar=true;
120  for (unsigned int j=0;j<regDpCount_;j++) {
121  if (dataPoints_[j]->getName()==jsonNames[i])
122  {
123  dataPoints_[j]->setOperation(dpd_->getOperationFor(i));
124  jsonDpIndex_.push_back(j);
125  hasJson[j]=true;
126  notFoundVar=false;
127  break;
128  }
129  }
130  if (notFoundVar) {
132  //push dummy DP if not registered by the service so that we output required JSON/CSV
133  DataPoint *dummyDp = new DataPoint(sourceInfo_,defPath_);
134  dummyDp->trackDummy(jsonNames[i],true);
135  dataPoints_.push_back(dummyDp);
136  jsonDpIndex_.push_back(dataPoints_.size()-1);
137  }
138  }
139  for (unsigned int i=0;i<regDpCount_;i++) {
140  dataPoints_[i]->setStreamLumiPtr(streamLumisPtr);
141  }
142 
143  //fast path:
144  if (haveFastPath_) {
145  std::vector<std::string> const& fjsonNames = dpdFast_->getNames();
147  assert(!(fastPathStrictChecking_ && fjsonNames.size()==fregDpCount_));
148  std::map<unsigned int,bool> fhasJson;
149  for (unsigned int i=0;i<fjsonNames.size();i++)
150  {
151  bool notFoundVar=true;
152  for (unsigned int j=0;j<fregDpCount_;j++) {
153  if (dataPointsFastOnly_[j]->getName()==fjsonNames[i])
154  {
156  fhasJson[j]=true;
157  notFoundVar=false;
158  break;
159  }
160  }
161  if (notFoundVar)
162  {
163  //try to find variable among slow variables
164 
165  bool notFoundVarSlow=true;
166  for (unsigned int j=0;j<regDpCount_;j++) {
167  if (dataPoints_[j]->getName()==fjsonNames[i])
168  {
169  jsonDpIndexFast_.push_back(dataPoints_[j]);
170  //fhasJson[j]=true;
171  notFoundVarSlow=false;
172  break;
173  }
174  }
175 
176  assert(!(fastPathStrictChecking_ && !notFoundVarSlow));
177  //push dummy DP if not registered by the service so that we output required JSON/CSV
178  if (notFoundVarSlow) {
180  dummyDp->trackDummy(fjsonNames[i],true);
181  dataPointsFastOnly_.push_back(dummyDp);
182  jsonDpIndexFast_.push_back(dummyDp);
183  }
184  }
185  }
186  }
187 }
188 
189 //update everything
190 void FastMonitor::snap(unsigned int ls)
191 {
192  recentSnaps_++;
194  for (unsigned int i=0;i<regDpCount_;i++) {
195  dataPoints_[i]->snap(ls);
196  }
197 }
198 
199 //update for global variables as most of them are correct only at global EOL
200 void FastMonitor::snapGlobal(unsigned int ls)
201 {
202  recentSnaps_++;
203  for (unsigned int i=0;i<regDpCount_;i++) {
204  dataPoints_[i]->snapGlobal(ls);
205  }
206 }
207 
208 //update atomic per-stream vars(e.g. event counters) not updating time-based measurements (mini/microstate)
209 void FastMonitor::snapStreamAtomic(unsigned int ls, unsigned int streamID)
210 {
211  recentSnaps_++;
212  for (unsigned int i=0;i<regDpCount_;i++) {
213  dataPoints_[i]->snapStreamAtomic(ls, streamID);
214  }
215 }
216 
218 {
219  //output what was specified in JSON in the same order (including dummies)
220  unsigned int monSize = jsonDpIndexFast_.size();
221  std::stringstream ss;
222  if (monSize) {
223  for (unsigned int j=0; j< monSize;j++) {
224  ss << jsonDpIndexFast_[j]->fastOutCSV();
225  if (j<monSize-1) ss << ",";
226  }
227  }
228  return ss.str();
229 }
230 
231 void FastMonitor::outputCSV(std::string const& path, std::string const& csvString)
232 {
233  std::ofstream outputFile;
234  outputFile.open(path.c_str(), std::fstream::out | std::fstream::trunc);
235  outputFile << defPathFast_ << std::endl;
236  outputFile << csvString << std::endl;
237  outputFile.close();
238 }
239 
240 
241 //get one variable (caller must delete it later)
243 {
244  auto it = dpNameMap_.find(name);
245  assert(it!=dpNameMap_.end());
246  return dataPoints_[it->second]->mergeAndRetrieveValue(forLumi);
247 }
248 
249 bool FastMonitor::outputFullJSON(std::string const& path, unsigned int lumi, bool log)
250 {
251  if (log)
252  LogDebug("FastMonitor") << "SNAP updates -: " << recentSnaps_ << " (by timer: " << recentSnapsTimer_
253  << ") in lumisection ";
254 
255  recentSnaps_ = recentSnapsTimer_ = 0;
256 
257  Json::Value serializeRoot;
258  for (unsigned int j=0; j< jsonDpIndex_.size();j++) {
259  dataPoints_[jsonDpIndex_[j]]->mergeAndSerialize(serializeRoot,lumi,j==0);
260  }
261 
262  Json::StyledWriter writer;
263  std::string && result = writer.write(serializeRoot);
264  FileIO::writeStringToFile(path, result);
265  return true;
266 }
267 
268 void FastMonitor::discardCollected(unsigned int forLumi)
269 {
270  for (auto dp: dataPoints_) dp->discardCollected(forLumi);
271 }
272 
274 {
275  std::stringstream hpid;
276  int pid = (int) getpid();
277  char hostname[128];
278  gethostname(hostname, sizeof hostname);
279  hpid << hostname << "_" << pid;
280  sHPid = hpid.str();
281 }
282 
#define LogDebug(id)
void registerFastGlobalMonitorable(JsonMonitorable *newMonitorable)
Definition: FastMonitor.cc:75
void addFastPathDefinition(std::string const &defPathFast, std::string const defGroupFast, bool strict)
Definition: FastMonitor.cc:51
int i
Definition: DBlmapReader.cc:9
void setNBins(unsigned int *nBins)
Definition: DataPoint.h:114
DataPointDefinition * dpdFast_
Definition: FastMonitor.h:96
void trackVectorUInt(std::string const &name, std::vector< unsigned int > *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:86
OperationType getOperationFor(unsigned int index)
bool outputFullJSON(std::string const &path, unsigned int lumi, bool log=true)
Definition: FastMonitor.cc:249
void trackDummy(std::string const &name, bool setNAifZeroUpdates)
Definition: DataPoint.h:82
virtual std::string & getName()
void snapStreamAtomic(unsigned int ls, unsigned int streamID)
Definition: FastMonitor.cc:209
tuple lumi
Definition: fjr2json.py:35
void registerStreamMonitorableUIntVecAtomic(std::string const &name, std::vector< AtomicMonUInt * > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:95
assert(m_qm.get())
def ls
Definition: eostools.py:348
std::vector< DataPoint * > dataPointsFastOnly_
Definition: FastMonitor.h:101
std::map< std::string, unsigned int > dpNameMap_
Definition: FastMonitor.h:105
void trackMonitorable(JsonMonitorable *monitorable, bool NAifZeroUpdates)
Definition: DataPoint.cc:74
Represents a JSON value.
Definition: value.h:111
FastMonitor(std::string const &defPath, std::string const defGroup, bool strictChecking, bool useSource=true, bool useDefinition=true)
Definition: FastMonitor.cc:22
void registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:62
std::vector< unsigned int > jsonDpIndex_
Definition: FastMonitor.h:102
U second(std::pair< T, U > const &p)
static bool getDataPointDefinitionFor(std::string &defFilePath, DataPointDefinition *dpd, const std::string *defaultGroup=nullptr)
std::vector< DataPoint * > dataPoints_
Definition: FastMonitor.h:100
void registerStreamMonitorableUIntVec(std::string const &name, std::vector< unsigned int > *inputs, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:83
std::unordered_set< std::string > uids_
Definition: FastMonitor.h:112
tuple result
Definition: query.py:137
int j
Definition: DBlmapReader.cc:9
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
virtual std::string write(const Value &root)
Serialize a Value in JSON format.
void commit(std::vector< unsigned int > *streamLumisPtr)
Definition: FastMonitor.cc:109
void trackVectorUIntAtomic(std::string const &name, std::vector< AtomicMonUInt * > *monvec, bool NAifZeroUpdates)
Definition: DataPoint.cc:96
tuple out
Definition: dbtoconf.py:99
tuple pid
Definition: sysUtil.py:22
std::vector< DataPoint * > jsonDpIndexFast_
Definition: FastMonitor.h:103
void outputCSV(std::string const &path, std::string const &csvString)
Definition: FastMonitor.cc:231
void snap(unsigned int ls)
Definition: FastMonitor.cc:190
DataPointDefinition * dpd_
Definition: FastMonitor.h:95
JsonMonitorable * getMergedIntJForLumi(std::string const &name, unsigned int forLumi)
Definition: FastMonitor.cc:242
unsigned int recentSnapsTimer_
Definition: FastMonitor.h:108
std::vector< std::string > const & getNames()
void getHostAndPID(std::string &sHPid)
Definition: FastMonitor.cc:273
Writes a Value in JSON format in a human friendly way.
Definition: writer.h:65
void discardCollected(unsigned int forLumi)
Definition: FastMonitor.cc:268
void snapGlobal(unsigned int ls)
Definition: FastMonitor.cc:200
tuple log
Definition: cmsBatch.py:341