CMS 3D CMS Logo

FUShmDQMOutputService.cc

Go to the documentation of this file.
00001 
00024 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00025 #include "FWCore/ServiceRegistry/interface/Service.h"
00026 #include "FWCore/Utilities/interface/GetReleaseVersion.h"
00027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00028 #include "DQMServices/Core/interface/MonitorElement.h"
00029 #include "DQMServices/Core/interface/DQMStore.h"
00030 #include "FWCore/Utilities/src/Guid.h"
00031 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00032 #include "TClass.h"
00033 #include "zlib.h"
00034 
00035 using namespace std;
00036 
00042 #define DSS_DEBUG 0
00043 
00047 bool FUShmDQMOutputService::fuIdsInitialized_ = false;
00048 uint32 FUShmDQMOutputService::fuProcId_ = 0;
00049 uint32 FUShmDQMOutputService::fuGuidValue_ = 0;
00050 
00054 FUShmDQMOutputService::FUShmDQMOutputService(const edm::ParameterSet &pset,
00055                                    edm::ActivityRegistry &actReg):
00056   shmBuffer_(0)
00057 {
00058   if (DSS_DEBUG) {cout << "FUShmDQMOutputService Constructor" << endl;}
00059 
00060   // specify the routine to be called after event processing.  This routine
00061   // will be used to periodically fetch monitor elements from the DQM
00062   // backend and write out to shared memory for sending to the storage manager.
00063   actReg.watchPostProcessEvent(this, &FUShmDQMOutputService::postEventProcessing);
00064 
00065   // specify the routine to be called after the input source has been
00066   // constructed.  This routine will be used to initialize our connection
00067   // to the storage manager and any other needed setup.??
00068   actReg.watchPostSourceConstruction(this,
00069          &FUShmDQMOutputService::postSourceConstructionProcessing);
00070 
00071   // specify the routine to be called when a run begins
00072   actReg.watchPreBeginRun(this, &FUShmDQMOutputService::preBeginRun);
00073 
00074   // specify the routine to be called when the job has finished.  It will
00075   // be used to disconnect from the SM, if needed, and any other shutdown
00076   // tasks that are needed.??
00077   actReg.watchPostEndJob(this, &FUShmDQMOutputService::postEndJobProcessing);
00078 
00079   // helpful callbacks when trying to understand the signals that are
00080   // available to framework services
00081   if (DSS_DEBUG >= 2) {
00082     actReg.watchPostBeginJob(this, &FUShmDQMOutputService::postBeginJobProcessing);
00083     actReg.watchPreSource(this, &FUShmDQMOutputService::preSourceProcessing);
00084     actReg.watchPostSource(this, &FUShmDQMOutputService::postSourceProcessing);
00085     actReg.watchPreModule(this, &FUShmDQMOutputService::preModuleProcessing);
00086     actReg.watchPostModule(this, &FUShmDQMOutputService::postModuleProcessing);
00087     actReg.watchPreSourceConstruction(this,
00088            &FUShmDQMOutputService::preSourceConstructionProcessing);
00089     actReg.watchPreModuleConstruction(this,
00090            &FUShmDQMOutputService::preModuleConstructionProcessing);
00091     actReg.watchPostModuleConstruction(this,
00092            &FUShmDQMOutputService::postModuleConstructionProcessing);
00093   }
00094 
00095   // set internal values from the parameter set
00096   int initialSize =
00097     pset.getUntrackedParameter<int>("initialMessageBufferSize", 1000000);
00098   messageBuffer_.resize(initialSize);
00099   lumiSectionsPerUpdate_ = pset.getParameter<double>("lumiSectionsPerUpdate");
00100   // for the moment, only support a number of lumi sections per update >= 1
00101   if (lumiSectionsPerUpdate_ <= 1.0) {lumiSectionsPerUpdate_ = 1.0;}
00102   initializationIsNeeded_ = true;
00103   useCompression_ = pset.getParameter<bool>("useCompression");
00104   compressionLevel_ = pset.getParameter<int>("compressionLevel");
00105   // the default for lumiSectionInterval_ is 0, meaning get it from the event
00106   // otherwise we get a fake one that should match the fake lumi block
00107   // for events (if any) as long as the time between lumi blocks is larger
00108   // than the time difference between creating this service and the 
00109   // FUShmOutputModule event output module
00110   lumiSectionInterval_ =
00111     pset.getUntrackedParameter<int>("lumiSectionInterval", 0); // seconds
00112   if (lumiSectionInterval_ < 1) {lumiSectionInterval_ = 0;}
00113 
00114   // for fake test luminosity sections
00115   struct timeval now;
00116   struct timezone dummyTZ;
00117   gettimeofday(&now, &dummyTZ);
00118   // we will count lumi section numbers from this time
00119   timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00120 
00121   if (! fuIdsInitialized_) {
00122     fuIdsInitialized_ = true;
00123 
00124     edm::Guid guidObj(true);
00125     std::string guidString = guidObj.toString();
00126     //std::cout << "DQMOutput GUID string = " << guidString << std::endl;
00127 
00128     uLong crc = crc32(0L, Z_NULL, 0);
00129     Bytef* buf = (Bytef*)guidString.data();
00130     crc = crc32(crc, buf, guidString.length());
00131     fuGuidValue_ = crc;
00132 
00133     fuProcId_ = getpid();
00134     //std::cout << "DQMOutput GUID value = 0x" << std::hex << fuGuidValue_ << std::dec
00135     //          << " for PID = " << fuProcId_ << std::endl;
00136   }
00137 }
00138 
00142 FUShmDQMOutputService::~FUShmDQMOutputService(void)
00143 {
00144   if (DSS_DEBUG) {cout << "FUShmDQMOutputService Destructor" << endl;}
00145   shmdt(shmBuffer_);
00146 }
00147 
00155 void FUShmDQMOutputService::postEventProcessing(const edm::Event &event,
00156                                            const edm::EventSetup &eventSetup)
00157 {
00158   // fake the luminosity section if we don't want to use the real one
00159   unsigned int thisLumiSection = 0;
00160   if(lumiSectionInterval_ == 0)
00161     thisLumiSection = event.luminosityBlock();
00162   else {
00163     // match the code in Event output module to get the same (almost) lumi number
00164     struct timeval now;
00165     struct timezone dummyTZ;
00166     gettimeofday(&now, &dummyTZ);
00167     double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC_;
00168     // what about overflows?
00169     if(lumiSectionInterval_ > 0) thisLumiSection = static_cast<uint32>(timeInSec/lumiSectionInterval_);
00170   }
00171 
00172   if (DSS_DEBUG) {
00173     cout << "FUShmDQMOutputService::postEventProcessing called, event number "
00174          << event.id().event() << ", lumi section "
00175          << thisLumiSection << endl;
00176   }
00177 
00178   // special handling for the first event
00179   if (initializationIsNeeded_) {
00180     initializationIsNeeded_ = false;
00181     lumiSectionOfPreviousUpdate_ = thisLumiSection;
00182     firstLumiSectionSeen_ = thisLumiSection;
00183 
00184     // for when a run(job) had ended and we start a new run(job)
00185     // for fake test luminosity sections
00186     struct timeval now;
00187     struct timezone dummyTZ;
00188     gettimeofday(&now, &dummyTZ);
00189     // we will count lumi section numbers from this time
00190     timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00191   }
00192 
00193   // We send a DQMEvent when the correct number of luminosity sections have passed
00194   // but this will occur here for the first event of a new lumi section which
00195   // means the data for the first event of this new lumi section is always added to the
00196   // to the DQM data for the update for the previous lumi section - beware!
00197   // Can only correct in this postEventProcessing stage if we knew this is the last
00198   // event of a lumi section. (There is no preEventProcessing possibility?)
00199 
00200   // only continue if the correct number of luminosity sections have passed
00201   int lsDelta = (int) (thisLumiSection - lumiSectionOfPreviousUpdate_);
00202   double updateRatio = ((double) lsDelta) / lumiSectionsPerUpdate_;
00203   if (updateRatio < 1.0) {return;}
00204 
00205   // CAlculate the update ID and lumi ID for this update
00206   int fullLsDelta = (int) (thisLumiSection - firstLumiSectionSeen_);
00207   double fullUpdateRatio = ((double) fullLsDelta) / lumiSectionsPerUpdate_;
00208   // this is the update number starting from zero
00209   uint32 updateNumber = -1 + (uint32) fullUpdateRatio;
00210   // this is the actual luminosity section number for the beginning lumi section of this update
00211   unsigned int lumiSectionTag = firstLumiSectionSeen_ +
00212     ((int) (updateNumber * lumiSectionsPerUpdate_));
00213 
00214   // retry the lookup of the backend interface, if needed
00215   if (bei == NULL) {
00216     bei = edm::Service<DQMStore>().operator->();
00217   }
00218 
00219   // to go any further, a backend interface pointer is crucial
00220   if (bei == NULL) {
00221     throw cms::Exception("postEventProcessing", "FUShmDQMOutputService")
00222       << "Unable to lookup the DQMStore service!\n";
00223   }
00224 
00225   // determine the top level folders (these will be used for grouping
00226   // monitor elements into DQM Events)
00227   std::vector<std::string> topLevelFolderList;
00228   //std::cout << "### SenderService, pwd = " << bei->pwd() << std::endl;
00229   bei->cd();
00230   //std::cout << "### SenderService, pwd = " << bei->pwd() << std::endl;
00231   topLevelFolderList = bei->getSubdirs();
00232 
00233   // find the monitor elements under each top level folder (including
00234   // subdirectories)
00235   std::map< std::string, DQMEvent::TObjectTable > toMap;
00236   std::vector<std::string>::const_iterator dirIter;
00237   for (dirIter = topLevelFolderList.begin();
00238        dirIter != topLevelFolderList.end();
00239        dirIter++) {
00240     std::string dirName = *dirIter;
00241     DQMEvent::TObjectTable toTable;
00242 
00243     // find the MEs
00244     findMonitorElements(toTable, dirName);
00245 
00246     // store the list in the map
00247     toMap[dirName] = toTable;
00248   }
00249 
00250   // create a DQMEvent message for each top-level folder
00251   // and write each to the shared memory
00252   for (dirIter = topLevelFolderList.begin();
00253        dirIter != topLevelFolderList.end();
00254        dirIter++) {
00255     std::string dirName = *dirIter;
00256     DQMEvent::TObjectTable toTable = toMap[dirName];
00257     if (toTable.size() == 0) {continue;}
00258 
00259     // serialize the monitor element data
00260     serializeWorker_.serializeDQMEvent(toTable, useCompression_,
00261                                        compressionLevel_);
00262 
00263     // resize the message buffer, if needed 
00264     unsigned int srcSize = serializeWorker_.currentSpaceUsed();
00265     unsigned int newSize = srcSize + 50000;  // allow for header
00266     if (messageBuffer_.size() < newSize) messageBuffer_.resize(newSize);
00267 
00268     // create the message
00269     DQMEventMsgBuilder dqmMsgBuilder(&messageBuffer_[0], messageBuffer_.size(),
00270                                      event.id().run(), event.id().event(),
00271                                      event.time(),
00272                                      lumiSectionTag, updateNumber,
00273                                      edm::getReleaseVersion(), dirName,
00274                                      toTable);
00275 
00276     // copy the serialized data into the message
00277     unsigned char* src = serializeWorker_.bufferPointer();
00278     std::copy(src,src + srcSize, dqmMsgBuilder.eventAddress());
00279     dqmMsgBuilder.setEventLength(srcSize);
00280     if (useCompression_) {
00281       dqmMsgBuilder.setCompressionFlag(serializeWorker_.currentEventSize());
00282     }
00283 
00284     // write the filter unit UUID and PID into the message
00285     dqmMsgBuilder.setFUProcessId(fuProcId_);
00286     dqmMsgBuilder.setFUGuid(fuGuidValue_);
00287 
00288     // send the message
00289     writeShmDQMData(dqmMsgBuilder);
00290 
00291     // test deserialization
00292     if (DSS_DEBUG >= 3) {
00293       DQMEventMsgView dqmEventView(&messageBuffer_[0]);
00294       std::cout << "  DQM Message data:" << std::endl; 
00295       std::cout << "    protocol version = "
00296                 << dqmEventView.protocolVersion() << std::endl; 
00297       std::cout << "    header size = "
00298                 << dqmEventView.headerSize() << std::endl; 
00299       std::cout << "    run number = "
00300                 << dqmEventView.runNumber() << std::endl; 
00301       std::cout << "    event number = "
00302                 << dqmEventView.eventNumberAtUpdate() << std::endl; 
00303       std::cout << "    lumi section = "
00304                 << dqmEventView.lumiSection() << std::endl; 
00305       std::cout << "    update number = "
00306                 << dqmEventView.updateNumber() << std::endl; 
00307       std::cout << "    compression flag = "
00308                 << dqmEventView.compressionFlag() << std::endl; 
00309       std::cout << "    reserved word = "
00310                 << dqmEventView.reserved() << std::endl; 
00311       std::cout << "    release tag = "
00312                 << dqmEventView.releaseTag() << std::endl; 
00313       std::cout << "    top folder name = "
00314                 << dqmEventView.topFolderName() << std::endl; 
00315       std::cout << "    sub folder count = "
00316                 << dqmEventView.subFolderCount() << std::endl; 
00317       std::auto_ptr<DQMEvent::TObjectTable> toTablePtr =
00318         deserializeWorker_.deserializeDQMEvent(dqmEventView);
00319       DQMEvent::TObjectTable::const_iterator toIter;
00320       for (toIter = toTablePtr->begin();
00321            toIter != toTablePtr->end(); toIter++) {
00322         std::string subFolderName = toIter->first;
00323         std::cout << "  folder = " << subFolderName << std::endl;
00324         std::vector<TObject *> toList = toIter->second;
00325         for (int tdx = 0; tdx < (int) toList.size(); tdx++) {
00326           TObject *toPtr = toList[tdx];
00327           string cls = toPtr->IsA()->GetName();
00328           string nm = toPtr->GetName();
00329           std::cout << "    TObject class = " << cls
00330                     << ", name = " << nm << std::endl;
00331         }
00332       }
00333     }
00334   }
00335 
00336   // reset monitor elements that have requested it
00337   // TODO - enable this
00338   //bei->doneSending(true, true);
00339 
00340   // update the "previous" lumi section
00341   lumiSectionOfPreviousUpdate_ = thisLumiSection;
00342 }
00343 
00348 void FUShmDQMOutputService::postSourceConstructionProcessing(const edm::ModuleDescription &moduleDesc)
00349 {
00350   if (DSS_DEBUG) {
00351     cout << "FUShmDQMOutputService::postSourceConstructionProcessing called for "
00352          << moduleDesc.moduleName() << endl;
00353   }
00354 
00355   bei = edm::Service<DQMStore>().operator->();
00356 }
00357 
00362 void FUShmDQMOutputService::preBeginRun(const edm::RunID &runID,
00363                                         const edm::Timestamp &timestamp)
00364 {
00365   if (DSS_DEBUG) {
00366     cout << "FUShmDQMOutputService::preBeginRun called, run number "
00367          << runID.run() << endl;
00368   }
00369 
00370   initializationIsNeeded_ = true;
00371 }
00372 
00377 void FUShmDQMOutputService::postEndJobProcessing()
00378 {
00379   if (DSS_DEBUG) {
00380     cout << "FUShmDQMOutputService::postEndJobProcessing called" << endl;
00381   }
00382   // since the service is not destroyed we need to take care of endjob items here
00383   initializationIsNeeded_ = true;
00384 }
00385 
00390 void FUShmDQMOutputService::findMonitorElements(DQMEvent::TObjectTable &toTable,
00391                                            std::string folderPath)
00392 {
00393   if (bei == NULL) {return;}
00394 
00395   // fetch the monitor elements in the specified directory
00396   std::vector<MonitorElement *> localMEList = bei->getContents(folderPath);
00397   //MonitorElementRootFolder* folderPtr = bei->getDirectory(folderPath);
00398 
00399   // add the MEs that should be updated to the table
00400   std::vector<TObject *> updateTOList;
00401   for (int idx = 0; idx < (int) localMEList.size(); idx++) {
00402     MonitorElement *mePtr = localMEList[idx];
00403     if (mePtr->wasUpdated()) {
00404       updateTOList.push_back(mePtr->getRootObject());
00405     }
00406   }
00407   if (updateTOList.size() > 0) {
00408     toTable[folderPath] = updateTOList;
00409   }
00410 
00411   // find the subdirectories in this folder
00412   // (checking if the directory exists is probably overkill,
00413   // but we really don't want to create new folders using
00414   // setCurrentFolder())
00415   if (bei->dirExists(folderPath)) {
00416     bei->setCurrentFolder(folderPath);
00417     std::vector<std::string> subDirList = bei->getSubdirs();
00418 
00419     // loop over the subdirectories, find the MEs in each one
00420     std::vector<std::string>::const_iterator dirIter;
00421     for (dirIter = subDirList.begin(); dirIter != subDirList.end(); dirIter++) {
00422       std::string subDirPath = (*dirIter);
00423       findMonitorElements(toTable, subDirPath);
00424     }
00425   }
00426 }
00427 
00431 void FUShmDQMOutputService::writeShmDQMData(DQMEventMsgBuilder const& dqmMsgBuilder)
00432 {
00433   // fetch the location and size of the message buffer
00434   unsigned char* buffer = (unsigned char*) dqmMsgBuilder.startAddress();
00435   unsigned int size = dqmMsgBuilder.size();
00436 
00437   // fetch the run, event, and folder number for addition to the I2O fragments
00438   DQMEventMsgView dqmMsgView(buffer);
00439   unsigned int runid = dqmMsgView.runNumber();
00440   unsigned int eventid = dqmMsgView.eventNumberAtUpdate();
00441 
00442   // We need to generate an unique 32 bit ID from the top folder name
00443   std::string topFolder = dqmMsgView.topFolderName();
00444   uLong crc = crc32(0L, Z_NULL, 0);
00445   Bytef* buf = (Bytef*)topFolder.data();
00446   crc = crc32(crc, buf, topFolder.length());
00447   if (DSS_DEBUG) {
00448     std::cout << "Folder = " << topFolder << " crc = " << crc << std::endl;
00449   }
00450 
00451   if(!shmBuffer_) {
00452     edm::LogError("FUDQMShmOutputService") 
00453       << " Error writing to shared memory as shm is not available";
00454   } else {
00455     bool ret = shmBuffer_->writeDqmEventData(runid, eventid, (unsigned int)crc,
00456                                              fuProcId_, fuGuidValue_, buffer, size);
00457     if(!ret) edm::LogError("FUShmDQMOutputService") << " Error with writing data to ShmBuffer";
00458   }
00459 
00460 }
00461 
00466 void FUShmDQMOutputService::postBeginJobProcessing()
00467 {
00468   if (DSS_DEBUG >= 2) {
00469     cout << "FUShmDQMOutputService::postBeginJobProcessing called" << endl;
00470   }
00471 }
00472 
00478 void FUShmDQMOutputService::preSourceProcessing()
00479 {
00480   if (DSS_DEBUG >= 2) {
00481     cout << "FUShmDQMOutputService::preSourceProcessing called" << endl;
00482   }
00483 }
00484 
00490 void FUShmDQMOutputService::postSourceProcessing()
00491 {
00492   if (DSS_DEBUG >= 2) {
00493     cout << "FUShmDQMOutputService::postSourceProcessing called" << endl;
00494   }
00495 }
00496 
00501 void FUShmDQMOutputService::preModuleProcessing(const edm::ModuleDescription &moduleDesc)
00502 {
00503   if (DSS_DEBUG >= 2) {
00504     cout << "FUShmDQMOutputService::preModuleProcessing called for "
00505          << moduleDesc.moduleName() << endl;
00506   }
00507 }
00508 
00513 void FUShmDQMOutputService::postModuleProcessing(const edm::ModuleDescription &moduleDesc)
00514 {
00515   if (DSS_DEBUG >= 2) {
00516     cout << "FUShmDQMOutputService::postModuleProcessing called for "
00517          << moduleDesc.moduleName() << endl;
00518   }
00519 }
00520 
00525 void FUShmDQMOutputService::preSourceConstructionProcessing(const edm::ModuleDescription &moduleDesc)
00526 {
00527   if (DSS_DEBUG >= 2) {
00528     cout << "FUShmDQMOutputService::preSourceConstructionProcessing called for "
00529          << moduleDesc.moduleName() << endl;
00530   }
00531 }
00532 
00537 void FUShmDQMOutputService::preModuleConstructionProcessing(const edm::ModuleDescription &moduleDesc)
00538 {
00539   if (DSS_DEBUG >= 2) {
00540     cout << "FUShmDQMOutputService::preModuleConstructionProcessing called for "
00541          << moduleDesc.moduleName() << endl;
00542   }
00543 }
00544 
00549 void FUShmDQMOutputService::postModuleConstructionProcessing(const edm::ModuleDescription &moduleDesc)
00550 {
00551   if (DSS_DEBUG >= 2) {
00552     cout << "FUShmDQMOutputService::postModuleConstructionProcessing called for "
00553          << moduleDesc.moduleName() << endl;
00554   }
00555 }
00556 
00557 bool FUShmDQMOutputService::attachToShm()
00558 {
00559   if(0==shmBuffer_) {
00560     shmBuffer_ = evf::FUShmBuffer::getShmBuffer();
00561     if (0==shmBuffer_) {
00562       edm::LogError("FUDQMShmOutputService")<<"Failed to attach to shared memory";
00563       return false;
00564     }
00565     return true;    
00566   }
00567   return false;
00568 
00569 }
00570 
00571 
00572 
00573 bool FUShmDQMOutputService::detachFromShm()
00574 {
00575   if(0!=shmBuffer_) {
00576     shmdt(shmBuffer_);
00577     shmBuffer_ = 0;
00578   }
00579   return true;
00580 }

Generated on Tue Jun 9 17:34:42 2009 for CMSSW by  doxygen 1.5.4