CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FUShmDQMOutputService.cc
Go to the documentation of this file.
1 
33 #include "TClass.h"
34 #include "zlib.h"
35 #include <unistd.h>
36 #include <sys/types.h>
37 
38 using namespace std;
39 
45 #define DSS_DEBUG 0
46 
52 
57  edm::ActivityRegistry &actReg)
58  : evf::ServiceWeb("FUShmDQMOutputService")
59  , updateNumber_(0)
60  , shmBuffer_(0)
61  , nbUpdates_(0)
62  , input("INPUT")
63  , dqm("DQM")
64  , attach_(false)
65 {
    // Constructor: registers callbacks with the activity registry, reads the
    // service configuration from the parameter set, records a start time used
    // for fake lumi-section numbering, looks up the host name, and computes a
    // CRC-32 identifier from a freshly generated GUID string.
    // NOTE(review): several registration statements referred to by the
    // comments below are not visible in this listing (lines were dropped);
    // only the start of the watchPostSourceConstruction call remains.
66 
67  // specify the routine to be called after event processing. This routine
68  // will be used to periodically fetch monitor elements from the DQM
69  // backend and write out to shared memory for sending to the storage manager.
71 
72  // specify the routine to be called after the input source has been
73  // constructed. This routine will be used to initialize our connection
74  // to the storage manager and perform any other needed setup.
75  actReg.watchPostSourceConstruction(this,
77 
78  // specify the routine to be called when a run begins
80 
81  // specify the routine to be called when the job has finished. It will
82  // be used to disconnect from the SM, if needed, and to perform any other
83  // shutdown tasks that are needed.
85 
86  // set internal values from the parameter set
    // "initialMessageBufferSize" is untracked and defaults to 1000000; it is
    // only the initial size of messageBuffer_.
87  int initialSize =
88  pset.getUntrackedParameter<int>("initialMessageBufferSize", 1000000);
89  messageBuffer_.resize(initialSize);
90  lumiSectionsPerUpdate_ = pset.getParameter<double>("lumiSectionsPerUpdate");
91  // for the moment, only support a number of lumi sections per update >= 1
    // NOTE(review): the statement enforcing the ">= 1" rule is not visible in
    // this listing.
94  useCompression_ = pset.getParameter<bool>("useCompression");
95  compressionLevel_ = pset.getParameter<int>("compressionLevel");
96  // the default for lumiSectionInterval_ is 0, meaning get it from the event
97  // otherwise we get a fake one that should match the fake lumi block
98  // for events (if any) as long as the time between lumi blocks is larger
99  // than the time difference between creating this service and the
100  // FUShmOutputModule event output module
    // NOTE(review): the left-hand side of the assignment below is on a line
    // missing from this listing; presumably it is lumiSectionInterval_.
102  pset.getUntrackedParameter<int>("lumiSectionInterval", 0); // seconds
    // any value below 1 is normalized to 0
103  if (lumiSectionInterval_ < 1) {lumiSectionInterval_ = 0;}
104 
105  // for fake test luminosity sections
106  struct timeval now;
107  struct timezone dummyTZ;
108  gettimeofday(&now, &dummyTZ);
109  // we will count lumi section numbers from this time
110  timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
111 
    // Look up the host name; on failure fall back to a fixed placeholder.
    // NOTE(review): strcpy writes 25 bytes (24 characters plus terminator)
    // with no bounds check; host_name_ is declared outside this listing --
    // confirm it is at least that large.
112  int got_host = gethostname(host_name_, sizeof(host_name_));
113  if(got_host != 0) strcpy(host_name_, "noHostNameFoundOrTooLong");
114 
    // Compute fuGuidValue_ only while fuIdsInitialized_ is still false: the
    // value is the CRC-32 of the string form of a newly created edm::Guid.
    // NOTE(review): fuIdsInitialized_ and fuGuidValue_ are declared elsewhere;
    // presumably they are shared across instances -- confirm.
115  if (! fuIdsInitialized_) {
116  fuIdsInitialized_ = true;
117 
118  edm::Guid guidObj(true);
119  std::string guidString = guidObj.toString();
120  //std::cout << "DQMOutput GUID string = " << guidString << std::endl;
121 
122  uLong crc = crc32(0L, Z_NULL, 0);
123  Bytef* buf = (Bytef*)guidString.data();
124  crc = crc32(crc, buf, guidString.length());
125  fuGuidValue_ = crc;
126 
127  //std::cout << "DQMOutput GUID value = 0x" << std::hex << fuGuidValue_ << std::endl;
128  }
129 }
130 
135 {
    // NOTE(review): the signature line is missing from this listing;
    // presumably this is the destructor -- confirm.
    // Calls shmdt() on shmBuffer_ without checking it for null and without
    // inspecting the return value. The constructor initializes shmBuffer_ to
    // 0, so a null pointer is passed here if no attach ever succeeded.
136  shmdt(shmBuffer_);
137 }
138 
140 {
141 }
142 
// Makes the nbUpdates_ counter available in the given InfoSpace under the
// item name "nbDqmUpdates" by calling fireItemAvailable on it.
// A failure is reported through edm::LogInfo and not propagated.
// NOTE(review): the line that opens the handler after the try block is
// missing from this listing, so the exception type caught is not visible.
143 void FUShmDQMOutputService::publish(xdata::InfoSpace *is)
144 {
145  try{
146  is->fireItemAvailable("nbDqmUpdates",&nbUpdates_);
147  }
149  edm::LogInfo("FUShmDQMOutputService")
150  << " exception when publishing to InfoSpace ";
151  }
152 }
153 
155 {
    // NOTE(review): the signature line is missing from this listing; the body
    // uses a luminosity-block object named lb.
    //
    // Outline of what the visible code does:
    //  1. attaches to shared memory if attach_ was set, then clears attach_;
    //  2. switches the micro state to "DQM" when a MicroStateService is found;
    //  3. determines the lumi section number (real, or derived from elapsed
    //     wall-clock time when lumiSectionInterval_ is non-zero);
    //  4. returns early unless the lumi section number is a multiple of 4;
    //  5. collects monitor elements per top-level DQM folder, builds one
    //     DQMEvent message per non-empty folder and writes it to shared memory;
    //  6. increments nbUpdates_ and updateNumber_.
    // Throws cms::Exception when no DQMStore backend pointer is available.
156  if (attach_) attachToShm();
157  attach_=false;
158 
159  evf::MicroStateService *mss = 0;
    // NOTE(review): the statement that assigns mss inside this try block is
    // missing from this listing.
160  try{
162  if(mss) mss->setMicroState(&dqm);
163  }
164  catch(...) {
165  edm::LogError("FUShmDQMOutputService")<< "exception when trying to get service MicroStateService";
166  }
167 
168 
169  // fake the luminosity section if we don't want to use the real one
170  unsigned int thisLumiSection = 0;
171  if(lumiSectionInterval_ == 0)
172  thisLumiSection = lb.luminosityBlock();
173  else {
174  // match the code in Event output module to get the same (almost) lumi number
175  struct timeval now;
176  struct timezone dummyTZ;
177  gettimeofday(&now, &dummyTZ);
178  double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC_;
179  // what about overflows? (the quotient is cast to uint32 without a range check)
180  if(lumiSectionInterval_ > 0) thisLumiSection = static_cast<uint32>(timeInSec/lumiSectionInterval_);
181  }
182 
183  // special handling for the first event
    // NOTE(review): the condition that opens this block is missing from this
    // listing; the body clears initializationIsNeeded_, so presumably it
    // tests that flag.
185  initializationIsNeeded_ = false;
186  lumiSectionOfPreviousUpdate_ = thisLumiSection;
187  firstLumiSectionSeen_ = thisLumiSection;
188 
189  // for when a run(job) had ended and we start a new run(job)
190  // for fake test luminosity sections
191  struct timeval now;
192  struct timezone dummyTZ;
193  gettimeofday(&now, &dummyTZ);
194  // we will count lumi section numbers from this time
195  timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
196  }
197 
198  // std::cout << getpid() << ": :" //<< gettid()
199  // << ":DQMOutputService check if have to send update for lumiSection " << thisLumiSection << std::endl;
    // Only lumi sections whose number is a multiple of 4 produce an update.
    // NOTE(review): the divisor is hard-coded; lumiSectionsPerUpdate_ is not
    // consulted anywhere in the visible lines of this function.
200  if(thisLumiSection%4!=0)
201  {
202 // std::cout << getpid() << ": :" //<< gettid()
203 // << ":DQMOutputService skipping update for lumiSection " << thisLumiSection << std::endl;
204  if(mss) mss->setMicroState(&input);
205  return;
206  }
207 // std::cout << getpid() << ": :" //<< gettid()
208 // << ":DQMOutputService sending update for lumiSection " << thisLumiSection << std::endl;
209  // Calculate the update ID and lumi ID for this update
210  // fullUpdateRatio and fullLsDelta are unused. comment out the calculation.
211  //int fullLsDelta = (int) (thisLumiSection - firstLumiSectionSeen_);
212  //double fullUpdateRatio = ((double) fullLsDelta) / lumiSectionsPerUpdate_;
213  // this is the update number starting from zero
214 
215  // this is the actual luminosity section number for the beginning lumi section of this update
216  unsigned int lumiSectionTag = thisLumiSection;
217 
218  // retry the lookup of the backend interface, if needed
    // NOTE(review): the lookup statement itself is missing from this listing.
219  if (bei == NULL) {
221  }
222 
223  // to go any further, a backend interface pointer is crucial
224  if (bei == NULL) {
225  throw cms::Exception("postEventProcessing", "FUShmDQMOutputService")
226  << "Unable to lookup the DQMStore service!\n";
227  }
228 
229  // determine the top level folders (these will be used for grouping
230  // monitor elements into DQM Events)
231  std::vector<std::string> topLevelFolderList;
232  //std::cout << "### SenderService, pwd = " << bei->pwd() << std::endl;
233  bei->cd();
234  //std::cout << "### SenderService, pwd = " << bei->pwd() << std::endl;
235  topLevelFolderList = bei->getSubdirs();
236 
237  // find the monitor elements under each top level folder (including
238  // subdirectories)
239  std::map< std::string, DQMEvent::TObjectTable > toMap;
240  std::vector<std::string>::const_iterator dirIter;
241  for (dirIter = topLevelFolderList.begin();
242  dirIter != topLevelFolderList.end();
243  dirIter++) {
244  std::string dirName = *dirIter;
245  DQMEvent::TObjectTable toTable;
246 
247  // find the MEs
248  findMonitorElements(toTable, dirName);
249 
250  // store the list in the map
251  toMap[dirName] = toTable;
252  }
253 
254  // create a DQMEvent message for each top-level folder
255  // and write each to the shared memory
256  for (dirIter = topLevelFolderList.begin();
257  dirIter != topLevelFolderList.end();
258  dirIter++) {
259  std::string dirName = *dirIter;
    // the table is copied out of the map; folders without content are skipped
260  DQMEvent::TObjectTable toTable = toMap[dirName];
261  if (toTable.size() == 0) {continue;}
262 
263  // serialize the monitor element data
    // NOTE(review): the serialization call is missing from this listing; the
    // lines below read its result from serializeWorker_.
266 
267  // resize the message buffer, if needed
268  unsigned int srcSize = serializeWorker_.currentSpaceUsed();
269  unsigned int newSize = srcSize + 50000; // allow for header
270  if (messageBuffer_.size() < newSize) messageBuffer_.resize(newSize);
271 
272  // create the message
    // NOTE(review): two constructor arguments (after updateNumber_ and after
    // host_name_) are on lines missing from this listing.
273  DQMEventMsgBuilder dqmMsgBuilder(&messageBuffer_[0], messageBuffer_.size(),
274  lb.run(), lb.luminosityBlock(),
275  lb.endTime(),
276  lumiSectionTag, updateNumber_,
278  host_name_,
280  toTable);
281 
282  // copy the serialized data into the message
283  unsigned char* src = serializeWorker_.bufferPointer();
284  std::copy(src,src + srcSize, dqmMsgBuilder.eventAddress());
285  dqmMsgBuilder.setEventLength(srcSize);
286  if (useCompression_) {
287  dqmMsgBuilder.setCompressionFlag(serializeWorker_.currentEventSize());
288  }
289 
290  // write the filter unit UUID and PID into the message
291  dqmMsgBuilder.setFUProcessId(getpid());
292  dqmMsgBuilder.setFUGuid(fuGuidValue_);
293 
294  // send the message
295  writeShmDQMData(dqmMsgBuilder);
296 // std::cout << getpid() << ": :" // << gettid()
297 // << ":DQMOutputService DONE sending update for lumiSection " << thisLumiSection << std::endl;
    // NOTE(review): this micro-state reset is inside the per-folder loop, so
    // it runs after each message written, and not at all on this path when
    // every folder is empty -- confirm that is intended.
298  if(mss) mss->setMicroState(&input);
299 
300  }
301 
302  // reset monitor elements that have requested it
303  // TODO - enable this
304  //bei->doneSending(true, true);
305 
306  // update the "previous" lumi section
307  lumiSectionOfPreviousUpdate_ = thisLumiSection;
308  nbUpdates_++;
309  updateNumber_++;
310 }
311 
317 {
318 
320 }
321 
327  const edm::Timestamp &timestamp)
328 {
    // Resets the per-run counters: the published update count (nbUpdates_)
    // and the update sequence number (updateNumber_) both restart at zero.
    // NOTE(review): one further statement of this body is missing from this
    // listing.
329  nbUpdates_ = 0;
330  updateNumber_ = 0;
332 }
333 
339 {
340  // since the service is not destroyed we need to take care of endjob items here
342 }
343 
349  std::string folderPath)
350 {
    // Recursively collects the ROOT objects of the monitor elements found in
    // folderPath and in every folder below it.
    //
    // toTable    - output table; an entry keyed by folder path is written for
    //              each visited folder that contains at least one element.
    // folderPath - folder to scan (taken by value).
    //
    // Does nothing when the backend pointer bei is NULL.
    // Side effect: calls bei->setCurrentFolder() for every existing folder
    // visited, so the backend's current folder is changed on return.
351  if (bei == NULL) {return;}
352 
353  // fetch the monitor elements in the specified directory
354  std::vector<MonitorElement *> localMEList = bei->getContents(folderPath);
355  //MonitorElementRootFolder* folderPtr = bei->getDirectory(folderPath);
356 
357  // add the MEs that should be updated to the table
    // (the wasUpdated() filter is commented out, so every element is added)
358  std::vector<TObject *> updateTOList;
359  for (int idx = 0; idx < (int) localMEList.size(); idx++) {
360  MonitorElement *mePtr = localMEList[idx];
361  // if (mePtr->wasUpdated()) { // @@EM send updated and not (to be revised)
362  updateTOList.push_back(mePtr->getRootObject());
363  // }
364  }
365  if (updateTOList.size() > 0) {
366  toTable[folderPath] = updateTOList;
367  }
368 
369  // find the subdirectories in this folder
370  // (checking if the directory exists is probably overkill,
371  // but we really don't want to create new folders using
372  // setCurrentFolder())
373  if (bei->dirExists(folderPath)) {
374  bei->setCurrentFolder(folderPath);
375  std::vector<std::string> subDirList = bei->getSubdirs();
376 
377  // loop over the subdirectories, find the MEs in each one
    // NOTE(review): each entry is passed on unchanged as the next folderPath,
    // which assumes getSubdirs() returns full paths -- confirm.
378  std::vector<std::string>::const_iterator dirIter;
379  for (dirIter = subDirList.begin(); dirIter != subDirList.end(); dirIter++) {
380  std::string subDirPath = (*dirIter);
381  findMonitorElements(toTable, subDirPath);
382  }
383  }
384 }
385 
390 {
    // Writes one already-built DQM event message to the shared-memory buffer.
    // The run number, the event number at update and a folder ID derived from
    // the top folder name are read back from the message and passed along
    // with the process ID and fuGuidValue_.
    // Failures are only logged: if shmBuffer_ is null, or the write reports
    // failure, the message is not retried here.
391  // fetch the location and size of the message buffer
392  unsigned char* buffer = (unsigned char*) dqmMsgBuilder.startAddress();
393  unsigned int size = dqmMsgBuilder.size();
394 
395  // fetch the run, event, and folder number for addition to the I2O fragments
396  DQMEventMsgView dqmMsgView(buffer);
397  unsigned int runid = dqmMsgView.runNumber();
398  unsigned int eventid = dqmMsgView.eventNumberAtUpdate();
399 
400  // We need to generate a 32-bit ID from the top folder name; the CRC-32 of
    // the name is used (distinct names can, in principle, collide)
401  std::string topFolder = dqmMsgView.topFolderName();
402  uLong crc = crc32(0L, Z_NULL, 0);
403  Bytef* buf = (Bytef*)topFolder.data();
404  crc = crc32(crc, buf, topFolder.length());
405 
    // NOTE(review): the log category below is spelled "FUDQMShmOutputService",
    // while the write-failure branch uses "FUShmDQMOutputService" -- confirm
    // which one is intended.
406  if(!shmBuffer_) {
407  edm::LogError("FUDQMShmOutputService")
408  << " Error writing to shared memory as shm is not available";
409  } else {
410  bool ret = shmBuffer_->writeDqmEventData(runid, eventid, (unsigned int)crc,
411  getpid(), fuGuidValue_, buffer, size);
412  if(!ret) edm::LogError("FUShmDQMOutputService") << " Error with writing data to ShmBuffer";
413  }
414 
415 }
416 
418  attach_=true;
419 }
420 
422 {
    // NOTE(review): the signature line is missing from this listing;
    // presumably this is attachToShm() -- confirm.
    // Acts only when shmBuffer_ is currently null.
    // Returns true only when shmBuffer_ was null on entry and is non-null
    // after the attach attempt. Returns false both when the attempt fails
    // (an error is logged) and when shmBuffer_ was already set on entry, so
    // false does not always mean failure.
    // NOTE(review): the statement that assigns shmBuffer_ between the two
    // null checks is missing from this listing.
423  if(0==shmBuffer_) {
425  if (0==shmBuffer_) {
426  edm::LogError("FUDQMShmOutputService")<<"Failed to attach to shared memory";
427  return false;
428  }
429  return true;
430  }
431  return false;
432 
433 }
434 
435 
436 
438 {
    // NOTE(review): the signature line is missing from this listing, so the
    // function name is not visible here.
    // If shmBuffer_ is set, calls shmdt() on it and resets the pointer to 0.
    // Always returns true; the result of shmdt() is not checked.
439  if(0!=shmBuffer_) {
440  shmdt(shmBuffer_);
441  shmBuffer_ = 0;
442  }
443  return true;
444 }
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
#define Input(cl)
Definition: vmac.h:189
FUShmDQMOutputService(const edm::ParameterSet &pset, edm::ActivityRegistry &actReg)
uint32_t adler32_chksum() const
std::vector< std::string > getSubdirs(void) const
Definition: DQMStore.cc:1424
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
bool writeDqmEventData(unsigned int runNumber, unsigned int evtAtUpdate, unsigned int folderId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
Definition: FUShmBuffer.cc:820
static FUShmBuffer * getShmBuffer()
Definition: FUShmBuffer.cc:986
void postEndLumi(edm::LuminosityBlock const &, edm::EventSetup const &)
void writeShmDQMData(DQMEventMsgBuilder const &dqmMsgBuilder)
void cd(void)
go to top directory (ie. root)
Definition: DQMStore.cc:411
void watchPostSourceConstruction(PostSourceConstruction::slot_type const &iSlot)
#define NULL
Definition: scimark2.h:8
void watchPostEndLumi(PostEndLumi::slot_type const &iSlot)
uint32 eventNumberAtUpdate() const
int serializeDQMEvent(DQMEvent::TObjectTable &toTable, bool use_compression, int compression_level)
void setMicroState(std::string const *)
unsigned int currentEventSize() const
std::vector< char > messageBuffer_
LuminosityBlockNumber_t luminosityBlock() const
std::string const toString() const
Automatic conversion from string representation.
Definition: Guid.cc:41
edm::StreamDQMSerializer serializeWorker_
std::string topFolderName() const
Definition: Guid.h:23
unsigned int lumiSectionOfPreviousUpdate_
Timestamp const & endTime() const
void publish(xdata::InfoSpace *)
unsigned int currentSpaceUsed() const
RunNumber_t run() const
unsigned char * bufferPointer() const
std::vector< MonitorElement * > getContents(const std::string &path) const
Definition: DQMStore.cc:1502
evf::FUShmBuffer * shmBuffer_
unsigned int uint32
Definition: MsgTools.h:13
bool dirExists(const std::string &path) const
true if directory exists
Definition: DQMStore.cc:498
tuple out
Definition: dbtoconf.py:99
std::string getReleaseVersion()
xdata::UnsignedInteger32 nbUpdates_
TObject * getRootObject(void) const
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
void preBeginRun(const edm::RunID &runID, const edm::Timestamp &timestamp)
void postSourceConstructionProcessing(const edm::ModuleDescription &modDesc)
void findMonitorElements(DQMEvent::TObjectTable &toTable, std::string folderPath)
#define Output(cl)
Definition: vmac.h:193
uint8 * startAddress() const
void defaultWebPage(xgi::Input *in, xgi::Output *out)
std::map< std::string, std::vector< TObject * > > TObjectTable
uint32 runNumber() const
tuple size
Write out results.
void setCurrentFolder(const std::string &fullpath)
Definition: DQMStore.cc:434
void watchPreBeginRun(PreBeginRun::slot_type const &iSlot)