CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FUShmDQMOutputService.cc
Go to the documentation of this file.
1 
33 #include "TClass.h"
34 #include "zlib.h"
35 #include <unistd.h>
36 #include <sys/types.h>
37 
38 using namespace std;
39 
45 #define DSS_DEBUG 0
46 
52 
57  edm::ActivityRegistry &actReg)
58  : evf::ServiceWeb("FUShmDQMOutputService")
59  , updateNumber_(0)
60  , shmBuffer_(0)
61  , nbUpdates_(0)
62  , input("INPUT")
63  , dqm("DQM")
64 {
 // Constructor body: reads the service settings from pset, records the
 // wall-clock time used as the origin for fake lumi section numbers, stores
 // the host name, and derives a CRC32 id from a GUID while
 // fuIdsInitialized_ is still false.
 // NOTE(review): the first line of the signature and several statements
 // (listing lines 56, 69, 75, 78, 83, 91-92 and 100) are absent from this
 // listing, so the callback registrations described by the comments below
 // and the assignment target of the "lumiSectionInterval" read cannot be
 // confirmed from here.
65 
66  // specify the routine to be called after event processing. This routine
67  // will be used to periodically fetch monitor elements from the DQM
68  // backend and write out to shared memory for sending to the storage manager.
70 
71  // specify the routine to be called after the input source has been
72  // constructed. This routine will be used to initialize our connection
73  // to the storage manager and any other needed setup.??
74  actReg.watchPostSourceConstruction(this,
76 
77  // specify the routine to be called when a run begins
79 
80  // specify the routine to be called when the job has finished. It will
81  // be used to disconnect from the SM, if needed, and any other shutdown
82  // tasks that are needed.??
84 
85  // set internal values from the parameter set
86  int initialSize =
87  pset.getUntrackedParameter<int>("initialMessageBufferSize", 1000000);
88  messageBuffer_.resize(initialSize);
89  lumiSectionsPerUpdate_ = pset.getParameter<double>("lumiSectionsPerUpdate");
90  // for the moment, only support a number of lumi sections per update >= 1
93  useCompression_ = pset.getParameter<bool>("useCompression");
94  compressionLevel_ = pset.getParameter<int>("compressionLevel");
95  // the default for lumiSectionInterval_ is 0, meaning get it from the event
96  // otherwise we get a fake one that should match the fake lumi block
97  // for events (if any) as long as the time between lumi blocks is larger
98  // than the time difference between creating this service and the
99  // FUShmOutputModule event output module
101  pset.getUntrackedParameter<int>("lumiSectionInterval", 0); // seconds
 // any value below 1 (including negative ones) is normalized to 0
102  if (lumiSectionInterval_ < 1) {lumiSectionInterval_ = 0;}
103 
104  // for fake test luminosity sections
105  struct timeval now;
 // dummyTZ is filled in by gettimeofday but never read in this block
106  struct timezone dummyTZ;
107  gettimeofday(&now, &dummyTZ);
108  // we will count lumi section numbers from this time
109  timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
110 
 // store the host name; on failure fall back to a fixed placeholder string
 // NOTE(review): assumes host_name_ can hold the 24-character placeholder
 // plus its terminator, and that a truncated name is still terminated -
 // confirm against the declaration of host_name_
111  int got_host = gethostname(host_name_, sizeof(host_name_));
112  if(got_host != 0) strcpy(host_name_, "noHostNameFoundOrTooLong");
113 
 // the id below is computed only while fuIdsInitialized_ is false
114  if (! fuIdsInitialized_) {
115  fuIdsInitialized_ = true;
116 
117  edm::Guid guidObj(true);
118  std::string guidString = guidObj.toString();
119  //std::cout << "DQMOutput GUID string = " << guidString << std::endl;
120 
 // hash the GUID text with zlib crc32 and keep the result in fuGuidValue_
121  uLong crc = crc32(0L, Z_NULL, 0);
122  Bytef* buf = (Bytef*)guidString.data();
123  crc = crc32(crc, buf, guidString.length());
124  fuGuidValue_ = crc;
125 
126  //std::cout << "DQMOutput GUID value = 0x" << std::hex << fuGuidValue_ << std::endl;
127  }
128 }
129 
134 {
 // Detaches the shared memory segment referenced by shmBuffer_.
 // NOTE(review): the signature line is missing from this listing; this is
 // presumably the destructor - confirm.
 // NOTE(review): unlike the guarded detach routine at the end of this file,
 // shmdt is called here without checking that shmBuffer_ is non-zero and
 // shmBuffer_ is not reset afterwards.
135  shmdt(shmBuffer_);
136 }
137 
139 {
 // Empty body: no statements are executed.
 // NOTE(review): the signature line is missing from this listing;
 // presumably this is defaultWebPage(xgi::Input*, xgi::Output*) - confirm.
140 }
141 
142 void FUShmDQMOutputService::publish(xdata::InfoSpace *is)
143 {
144  try{
145  is->fireItemAvailable("nbDqmUpdates",&nbUpdates_);
146  }
147  catch(xdata::exception::Exception &e){
148  edm::LogInfo("FUShmDQMOutputService")
149  << " exception when publishing to InfoSpace ";
150  }
151 }
152 
154 {
 // Per-luminosity-block DQM update. Determines the lumi section number
 // (taken from lb, or derived from elapsed wall-clock time when
 // lumiSectionInterval_ is non-zero), returns early unless that number is a
 // multiple of 4, and otherwise gathers the monitor elements under every
 // top-level folder and passes one DQM event message per non-empty folder
 // to writeShmDQMData().
 // Throws cms::Exception when the backend interface pointer bei is NULL.
 // NOTE(review): the signature line is missing from this listing
 // (presumably postEndLumi(edm::LuminosityBlock const&, edm::EventSetup
 // const&) - confirm), as are listing lines 157, 180, 216, 260-261, 273
 // and 275.
155  evf::MicroStateService *mss = 0;
156  try{
 // NOTE(review): the statement that assigns mss (listing line 157) is
 // missing here; as shown, mss is still 0 at this point
158  if(mss) mss->setMicroState(&dqm);
159  }
160  catch(...) {
161  edm::LogError("FUShmDQMOutputService")<< "exception when trying to get service MicroStateService";
162  }
163 
164 
165  // fake the luminosity section if we don't want to use the real one
166  unsigned int thisLumiSection = 0;
167  if(lumiSectionInterval_ == 0)
168  thisLumiSection = lb.luminosityBlock();
169  else {
170  // match the code in Event output module to get the same (almost) lumi number
171  struct timeval now;
172  struct timezone dummyTZ;
173  gettimeofday(&now, &dummyTZ);
 // seconds elapsed since the reference time stored in timeInSecSinceUTC_
174  double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC_;
175  // what about overflows?
 // the > 0 test keeps a negative interval from reaching the division;
 // in that case thisLumiSection stays 0
176  if(lumiSectionInterval_ > 0) thisLumiSection = static_cast<uint32>(timeInSec/lumiSectionInterval_);
177  }
178 
179  // special handling for the first event
 // NOTE(review): the line that opens this block (listing line 180) is
 // missing; the closing brace at listing line 192 belongs to it
181  initializationIsNeeded_ = false;
182  lumiSectionOfPreviousUpdate_ = thisLumiSection;
183  firstLumiSectionSeen_ = thisLumiSection;
184 
185  // for when a run(job) had ended and we start a new run(job)
186  // for fake test luminosity sections
187  struct timeval now;
188  struct timezone dummyTZ;
189  gettimeofday(&now, &dummyTZ);
190  // we will count lumi section numbers from this time
191  timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
192  }
193 
194  // std::cout << getpid() << ": :" //<< gettid()
195  // << ":DQMOutputService check if have to send update for lumiSection " << thisLumiSection << std::endl;
 // NOTE(review): the update period is hard-coded to every 4th lumi section;
 // lumiSectionsPerUpdate_ is not consulted anywhere in this function
196  if(thisLumiSection%4!=0)
197  {
198 // std::cout << getpid() << ": :" //<< gettid()
199 // << ":DQMOutputService skipping update for lumiSection " << thisLumiSection << std::endl;
200  if(mss) mss->setMicroState(&input);
201  return;
202  }
203 // std::cout << getpid() << ": :" //<< gettid()
204 // << ":DQMOutputService sending update for lumiSection " << thisLumiSection << std::endl;
205  // Calculate the update ID and lumi ID for this update
206  // fullUpdateRatio and fullLsDelta are unused. comment out the calculation.
207  //int fullLsDelta = (int) (thisLumiSection - firstLumiSectionSeen_);
208  //double fullUpdateRatio = ((double) fullLsDelta) / lumiSectionsPerUpdate_;
209  // this is the update number starting from zero
210 
211  // this is the actual luminosity section number for the beginning lumi section of this update
212  unsigned int lumiSectionTag = thisLumiSection;
213 
214  // retry the lookup of the backend interface, if needed
 // NOTE(review): the lookup statement itself (listing line 216) is missing
 // from this listing, so this block is empty as shown
215  if (bei == NULL) {
217  }
218 
219  // to go any further, a backend interface pointer is crucial
220  if (bei == NULL) {
221  throw cms::Exception("postEventProcessing", "FUShmDQMOutputService")
222  << "Unable to lookup the DQMStore service!\n";
223  }
224 
225  // determine the top level folders (these will be used for grouping
226  // monitor elements into DQM Events)
227  std::vector<std::string> topLevelFolderList;
228  //std::cout << "### SenderService, pwd = " << bei->pwd() << std::endl;
229  bei->cd();
230  //std::cout << "### SenderService, pwd = " << bei->pwd() << std::endl;
231  topLevelFolderList = bei->getSubdirs();
232 
233  // find the monitor elements under each top level folder (including
234  // subdirectories)
235  std::map< std::string, DQMEvent::TObjectTable > toMap;
236  std::vector<std::string>::const_iterator dirIter;
237  for (dirIter = topLevelFolderList.begin();
238  dirIter != topLevelFolderList.end();
239  dirIter++) {
240  std::string dirName = *dirIter;
241  DQMEvent::TObjectTable toTable;
242 
243  // find the MEs
244  findMonitorElements(toTable, dirName);
245 
246  // store the list in the map
247  toMap[dirName] = toTable;
248  }
249 
250  // create a DQMEvent message for each top-level folder
251  // and write each to the shared memory
252  for (dirIter = topLevelFolderList.begin();
253  dirIter != topLevelFolderList.end();
254  dirIter++) {
255  std::string dirName = *dirIter;
256  DQMEvent::TObjectTable toTable = toMap[dirName];
 // folders without any collected objects produce no message
257  if (toTable.size() == 0) {continue;}
258 
259  // serialize the monitor element data
 // NOTE(review): the serialization call (listing lines 260-261) is missing
 // from this listing
262 
263  // resize the message buffer, if needed
264  unsigned int srcSize = serializeWorker_.currentSpaceUsed();
265  unsigned int newSize = srcSize + 50000; // allow for header
266  if (messageBuffer_.size() < newSize) messageBuffer_.resize(newSize);
267 
268  // create the message
 // NOTE(review): the builder receives lb.luminosityBlock() as well as
 // lumiSectionTag; with a non-zero lumiSectionInterval_ the two values can
 // differ. Two constructor arguments (listing lines 273 and 275) are
 // missing from this listing.
269  DQMEventMsgBuilder dqmMsgBuilder(&messageBuffer_[0], messageBuffer_.size(),
270  lb.run(), lb.luminosityBlock(),
271  lb.endTime(),
272  lumiSectionTag, updateNumber_,
274  host_name_,
276  toTable);
277 
278  // copy the serialized data into the message
279  unsigned char* src = serializeWorker_.bufferPointer();
280  std::copy(src,src + srcSize, dqmMsgBuilder.eventAddress());
281  dqmMsgBuilder.setEventLength(srcSize);
282  if (useCompression_) {
283  dqmMsgBuilder.setCompressionFlag(serializeWorker_.currentEventSize());
284  }
285 
286  // write the filter unit UUID and PID into the message
287  dqmMsgBuilder.setFUProcessId(getpid());
288  dqmMsgBuilder.setFUGuid(fuGuidValue_);
289 
290  // send the message
291  writeShmDQMData(dqmMsgBuilder);
292 // std::cout << getpid() << ": :" // << gettid()
293 // << ":DQMOutputService DONE sending update for lumiSection " << thisLumiSection << std::endl;
 // NOTE(review): the micro state is switched back to `input` inside the
 // loop, i.e. after the first non-empty folder has been written; when every
 // folder is empty it is never switched back in this function
294  if(mss) mss->setMicroState(&input);
295 
296  }
297 
298  // reset monitor elements that have requested it
299  // TODO - enable this
300  //bei->doneSending(true, true);
301 
302  // update the "previous" lumi section
303  lumiSectionOfPreviousUpdate_ = thisLumiSection;
 // both counters advance once per update, independent of how many folder
 // messages were written above
304  nbUpdates_++;
305  updateNumber_++;
306 }
307 
313 {
 // NOTE(review): both the signature line and the only statement of this
 // body (listing line 315) are missing from this listing; presumably this
 // is postSourceConstructionProcessing(const edm::ModuleDescription&) -
 // confirm against the original file.
314 
316 }
317 
323  const edm::Timestamp &timestamp)
324 {
 // Resets both update counters to zero.
 // NOTE(review): the first signature line and one statement (listing line
 // 327) are missing from this listing; presumably this is
 // preBeginRun(const edm::RunID&, const edm::Timestamp&) - confirm.
325  nbUpdates_ = 0;
326  updateNumber_ = 0;
328 }
329 
335 {
 // NOTE(review): the signature line and the only statement of this body
 // (listing line 337) are missing from this listing; the comment below
 // suggests an end-of-job handler - confirm against the original file.
336  // since the service is not destroyed we need to take care of endjob items here
338 }
339 
345  std::string folderPath)
346 {
 // Collects the root objects of the monitor elements stored in folderPath
 // and, recursively, in each of its subdirectories. Every folder that
 // yields at least one object gets an entry in toTable keyed by its path.
 // NOTE(review): the first line of the signature is missing from this
 // listing; toTable is used here as a map from folder path to a vector of
 // TObject pointers.
 // Without a backend interface pointer nothing is collected.
347  if (bei == NULL) {return;}
348 
349  // fetch the monitor elements in the specified directory
350  std::vector<MonitorElement *> localMEList = bei->getContents(folderPath);
351  //MonitorElementRootFolder* folderPtr = bei->getDirectory(folderPath);
352 
353  // add the MEs that should be updated to the table
 // (the wasUpdated() filter is commented out, so every element is added)
354  std::vector<TObject *> updateTOList;
355  for (int idx = 0; idx < (int) localMEList.size(); idx++) {
356  MonitorElement *mePtr = localMEList[idx];
357  // if (mePtr->wasUpdated()) { // @@EM send updated and not (to be revised)
358  updateTOList.push_back(mePtr->getRootObject());
359  // }
360  }
361  if (updateTOList.size() > 0) {
362  toTable[folderPath] = updateTOList;
363  }
364 
365  // find the subdirectories in this folder
366  // (checking if the directory exists is probably overkill,
367  // but we really don't want to create new folders using
368  // setCurrentFolder())
369  if (bei->dirExists(folderPath)) {
 // NOTE(review): this changes the store's current folder and it is not
 // restored before returning; getSubdirs() below presumably reports the
 // children of that current folder - confirm
370  bei->setCurrentFolder(folderPath);
371  std::vector<std::string> subDirList = bei->getSubdirs();
372 
373  // loop over the subdirectories, find the MEs in each one
374  std::vector<std::string>::const_iterator dirIter;
375  for (dirIter = subDirList.begin(); dirIter != subDirList.end(); dirIter++) {
 // NOTE(review): each entry is passed on unchanged, so this presumably
 // expects getSubdirs() to return full paths rather than names relative to
 // folderPath - confirm against DQMStore::getSubdirs
376  std::string subDirPath = (*dirIter);
377  findMonitorElements(toTable, subDirPath);
378  }
379  }
380 }
381 
386 {
 // Hands one finished DQM event message to the shared memory buffer. The
 // run number, the event number at update and a CRC32 of the top folder
 // name are read back from the message and passed along with the process
 // id and fuGuidValue_. Failures are only logged; nothing is returned or
 // thrown by the code shown here.
 // NOTE(review): the signature line is missing from this listing;
 // presumably writeShmDQMData(DQMEventMsgBuilder const& dqmMsgBuilder) -
 // confirm.
387  // fetch the location and size of the message buffer
388  unsigned char* buffer = (unsigned char*) dqmMsgBuilder.startAddress();
389  unsigned int size = dqmMsgBuilder.size();
390 
391  // fetch the run, event, and folder number for addition to the I2O fragments
392  DQMEventMsgView dqmMsgView(buffer);
393  unsigned int runid = dqmMsgView.runNumber();
394  unsigned int eventid = dqmMsgView.eventNumberAtUpdate();
395 
396  // We need to generate an unique 32 bit ID from the top folder name
397  std::string topFolder = dqmMsgView.topFolderName();
398  uLong crc = crc32(0L, Z_NULL, 0);
399  Bytef* buf = (Bytef*)topFolder.data();
400  crc = crc32(crc, buf, topFolder.length());
401 
 // without an attached buffer the message is dropped after logging
 // NOTE(review): the log category here is spelled "FUDQMShmOutputService"
 // while the other branch uses "FUShmDQMOutputService"
402  if(!shmBuffer_) {
403  edm::LogError("FUDQMShmOutputService")
404  << " Error writing to shared memory as shm is not available";
405  } else {
 // the uLong checksum is narrowed to unsigned int for the folder id
406  bool ret = shmBuffer_->writeDqmEventData(runid, eventid, (unsigned int)crc,
407  getpid(), fuGuidValue_, buffer, size);
408  if(!ret) edm::LogError("FUShmDQMOutputService") << " Error with writing data to ShmBuffer";
409  }
410 
411 }
412 
413 
414 
416 {
 // Attempts to obtain the shared memory buffer when none is held yet.
 // Returns true only when shmBuffer_ was 0 on entry and is non-zero after
 // the attach attempt. Returns false both when the attempt fails (logged)
 // and when shmBuffer_ was already set on entry.
 // NOTE(review): the signature line and the statement that assigns
 // shmBuffer_ (listing line 418) are missing from this listing; presumably
 // this is the attach counterpart of the detach routine below - confirm.
417  if(0==shmBuffer_) {
419  if (0==shmBuffer_) {
420  edm::LogError("FUDQMShmOutputService")<<"Failed to attach to shared memory";
421  return false;
422  }
423  return true;
424  }
425  return false;
426 
427 }
428 
429 
430 
432 {
 // Detaches the shared memory buffer if one is held and clears the pointer
 // so that a later call does nothing. Always returns true; the return value
 // of shmdt is not checked.
 // NOTE(review): the signature line is missing from this listing;
 // presumably this is the detach counterpart of the attach routine above -
 // confirm.
433  if(0!=shmBuffer_) {
434  shmdt(shmBuffer_);
435  shmBuffer_ = 0;
436  }
437  return true;
438 }
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
#define Input(cl)
Definition: vmac.h:189
FUShmDQMOutputService(const edm::ParameterSet &pset, edm::ActivityRegistry &actReg)
uint32_t adler32_chksum() const
std::vector< std::string > getSubdirs(void) const
Definition: DQMStore.cc:1216
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
bool writeDqmEventData(unsigned int runNumber, unsigned int evtAtUpdate, unsigned int folderId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
Definition: FUShmBuffer.cc:772
static FUShmBuffer * getShmBuffer()
Definition: FUShmBuffer.cc:915
void postEndLumi(edm::LuminosityBlock const &, edm::EventSetup const &)
void writeShmDQMData(DQMEventMsgBuilder const &dqmMsgBuilder)
void cd(void)
go to top directory (ie. root)
Definition: DQMStore.cc:209
void watchPostSourceConstruction(PostSourceConstruction::slot_type const &iSlot)
#define NULL
Definition: scimark2.h:8
void watchPostEndLumi(PostEndLumi::slot_type const &iSlot)
uint32 eventNumberAtUpdate() const
int serializeDQMEvent(DQMEvent::TObjectTable &toTable, bool use_compression, int compression_level)
void setMicroState(std::string const *)
unsigned int currentEventSize() const
std::vector< char > messageBuffer_
LuminosityBlockNumber_t luminosityBlock() const
std::string const toString() const
Automatic conversion from string representation.
Definition: Guid.cc:44
edm::StreamDQMSerializer serializeWorker_
std::string topFolderName() const
Definition: Guid.h:23
unsigned int lumiSectionOfPreviousUpdate_
Timestamp const & endTime() const
void publish(xdata::InfoSpace *)
unsigned int currentSpaceUsed() const
RunNumber_t run() const
unsigned char * bufferPointer() const
std::vector< MonitorElement * > getContents(const std::string &path) const
Definition: DQMStore.cc:1294
evf::FUShmBuffer * shmBuffer_
unsigned int uint32
Definition: MsgTools.h:13
tuple input
Definition: collect_tpl.py:10
bool dirExists(const std::string &path) const
true if directory exists
Definition: DQMStore.cc:296
tuple out
Definition: dbtoconf.py:99
std::string getReleaseVersion()
xdata::UnsignedInteger32 nbUpdates_
TObject * getRootObject(void) const
void preBeginRun(const edm::RunID &runID, const edm::Timestamp &timestamp)
void postSourceConstructionProcessing(const edm::ModuleDescription &modDesc)
void findMonitorElements(DQMEvent::TObjectTable &toTable, std::string folderPath)
#define Output(cl)
Definition: vmac.h:193
uint8 * startAddress() const
void defaultWebPage(xgi::Input *in, xgi::Output *out)
std::map< std::string, std::vector< TObject * > > TObjectTable
uint32 runNumber() const
tuple size
Write out results.
void setCurrentFolder(const std::string &fullpath)
Definition: DQMStore.cc:232
void watchPreBeginRun(PreBeginRun::slot_type const &iSlot)
tuple src
Definition: align_tpl.py:87