CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch12/src/EventFilter/Modules/src/FUShmDQMOutputService.cc

Go to the documentation of this file.
00001 
00024 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00025 #include "EventFilter/Utilities/interface/MicroStateService.h"
00026 #include "FWCore/ServiceRegistry/interface/Service.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00029 #include "DQMServices/Core/interface/MonitorElement.h"
00030 #include "DQMServices/Core/interface/DQMStore.h"
00031 #include "FWCore/Utilities/src/Guid.h"
00032 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00033 #include "TClass.h"
00034 #include "zlib.h"
00035 #include <unistd.h>
00036 #include <sys/types.h>
00037 
00038 using namespace std;
00039 
00045 #define DSS_DEBUG 0
00046 
00050 bool FUShmDQMOutputService::fuIdsInitialized_ = false;
00051 uint32 FUShmDQMOutputService::fuGuidValue_ = 0;
00052 
00056 FUShmDQMOutputService::FUShmDQMOutputService(const edm::ParameterSet &pset,
00057                                    edm::ActivityRegistry &actReg)
00058   : evf::ServiceWeb("FUShmDQMOutputService")
00059   , updateNumber_(0)
00060   , shmBuffer_(0)
00061   , nbUpdates_(0)
00062 {
00063 
00064   // specify the routine to be called after event processing.  This routine
00065   // will be used to periodically fetch monitor elements from the DQM
00066   // backend and write out to shared memory for sending to the storage manager.
00067   actReg.watchPostEndLumi(this, &FUShmDQMOutputService::postEndLumi);
00068 
00069   // specify the routine to be called after the input source has been
00070   // constructed.  This routine will be used to initialize our connection
00071   // to the storage manager and any other needed setup.??
00072   actReg.watchPostSourceConstruction(this,
00073          &FUShmDQMOutputService::postSourceConstructionProcessing);
00074 
00075   // specify the routine to be called when a run begins
00076   actReg.watchPreBeginRun(this, &FUShmDQMOutputService::preBeginRun);
00077 
00078   // specify the routine to be called when the job has finished.  It will
00079   // be used to disconnect from the SM, if needed, and any other shutdown
00080   // tasks that are needed.??
00081   actReg.watchPostEndJob(this, &FUShmDQMOutputService::postEndJobProcessing);
00082 
00083   // set internal values from the parameter set
00084   int initialSize =
00085     pset.getUntrackedParameter<int>("initialMessageBufferSize", 1000000);
00086   messageBuffer_.resize(initialSize);
00087   lumiSectionsPerUpdate_ = pset.getParameter<double>("lumiSectionsPerUpdate");
00088   // for the moment, only support a number of lumi sections per update >= 1
00089   if (lumiSectionsPerUpdate_ <= 1.0) {lumiSectionsPerUpdate_ = 1.0;}
00090   initializationIsNeeded_ = true;
00091   useCompression_ = pset.getParameter<bool>("useCompression");
00092   compressionLevel_ = pset.getParameter<int>("compressionLevel");
00093   // the default for lumiSectionInterval_ is 0, meaning get it from the event
00094   // otherwise we get a fake one that should match the fake lumi block
00095   // for events (if any) as long as the time between lumi blocks is larger
00096   // than the time difference between creating this service and the 
00097   // FUShmOutputModule event output module
00098   lumiSectionInterval_ =
00099     pset.getUntrackedParameter<int>("lumiSectionInterval", 0); // seconds
00100   if (lumiSectionInterval_ < 1) {lumiSectionInterval_ = 0;}
00101 
00102   // for fake test luminosity sections
00103   struct timeval now;
00104   struct timezone dummyTZ;
00105   gettimeofday(&now, &dummyTZ);
00106   // we will count lumi section numbers from this time
00107   timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00108 
00109   int got_host = gethostname(host_name_, sizeof(host_name_));
00110   if(got_host != 0) strcpy(host_name_, "noHostNameFoundOrTooLong");
00111 
00112   if (! fuIdsInitialized_) {
00113     fuIdsInitialized_ = true;
00114 
00115     edm::Guid guidObj(true);
00116     std::string guidString = guidObj.toString();
00117     //std::cout << "DQMOutput GUID string = " << guidString << std::endl;
00118 
00119     uLong crc = crc32(0L, Z_NULL, 0);
00120     Bytef* buf = (Bytef*)guidString.data();
00121     crc = crc32(crc, buf, guidString.length());
00122     fuGuidValue_ = crc;
00123 
00124     //std::cout << "DQMOutput GUID value = 0x" << std::hex << fuGuidValue_ << std::endl;
00125   }
00126 }
00127 
00131 FUShmDQMOutputService::~FUShmDQMOutputService(void)
00132 {
00133   shmdt(shmBuffer_);
00134 }
00135 
00136 void FUShmDQMOutputService::defaultWebPage(xgi::Input *in, xgi::Output *out)
00137 {
00138 }
00139 
00140 void FUShmDQMOutputService::publish(xdata::InfoSpace *is)
00141 {
00142   try{
00143     is->fireItemAvailable("nbDqmUpdates",&nbUpdates_);
00144   }
00145   catch(xdata::exception::Exception &e){
00146     edm::LogInfo("FUShmDQMOutputService")
00147       << " exception when publishing to InfoSpace "; 
00148   } 
00149 }
00150 
00151 void FUShmDQMOutputService::postEndLumi(edm::LuminosityBlock const &lb, edm::EventSetup const &es)
00152 {
00153   std::string dqm = "DQM";
00154   std::string in = "INPUT";
00155   evf::MicroStateService *mss = 0;
00156   try{
00157     mss = edm::Service<evf::MicroStateService>().operator->();
00158     if(mss) mss->setMicroState(dqm);
00159   }
00160   catch(...) { 
00161     edm::LogError("FUShmDQMOutputService")<< "exception when trying to get service MicroStateService";
00162   }
00163   
00164 
00165   // fake the luminosity section if we don't want to use the real one
00166   unsigned int thisLumiSection = 0;
00167   if(lumiSectionInterval_ == 0)
00168     thisLumiSection = lb.luminosityBlock();
00169   else {
00170     // match the code in Event output module to get the same (almost) lumi number
00171     struct timeval now;
00172     struct timezone dummyTZ;
00173     gettimeofday(&now, &dummyTZ);
00174     double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC_;
00175     // what about overflows?
00176     if(lumiSectionInterval_ > 0) thisLumiSection = static_cast<uint32>(timeInSec/lumiSectionInterval_);
00177   }
00178 
00179    // special handling for the first event
00180   if (initializationIsNeeded_) {
00181     initializationIsNeeded_ = false;
00182     lumiSectionOfPreviousUpdate_ = thisLumiSection;
00183     firstLumiSectionSeen_ = thisLumiSection;
00184 
00185     // for when a run(job) had ended and we start a new run(job)
00186     // for fake test luminosity sections
00187     struct timeval now;
00188     struct timezone dummyTZ;
00189     gettimeofday(&now, &dummyTZ);
00190     // we will count lumi section numbers from this time
00191     timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00192   }
00193 
00194   //  std::cout << getpid() << ": :" //<< gettid() 
00195   //        << ":DQMOutputService check if have to send update for lumiSection " << thisLumiSection << std::endl;
00196   if(thisLumiSection%4!=0) 
00197     {
00198 //       std::cout << getpid() << ": :" //<< gettid() 
00199 //              << ":DQMOutputService skipping update for lumiSection " << thisLumiSection << std::endl;
00200       if(mss) mss->setMicroState(in);
00201       return;
00202     }
00203 //   std::cout << getpid() << ": :" //<< gettid() 
00204 //          << ":DQMOutputService sending update for lumiSection " << thisLumiSection << std::endl;
00205   // Calculate the update ID and lumi ID for this update
00206   // fullUpdateRatio and fullLsDelta are unused. comment out the calculation.
00207   //int fullLsDelta = (int) (thisLumiSection - firstLumiSectionSeen_);
00208   //double fullUpdateRatio = ((double) fullLsDelta) / lumiSectionsPerUpdate_;
00209   // this is the update number starting from zero
00210 
00211   // this is the actual luminosity section number for the beginning lumi section of this update
00212   unsigned int lumiSectionTag = thisLumiSection;
00213 
00214   // retry the lookup of the backend interface, if needed
00215   if (bei == NULL) {
00216     bei = edm::Service<DQMStore>().operator->();
00217   }
00218 
00219   // to go any further, a backend interface pointer is crucial
00220   if (bei == NULL) {
00221     throw cms::Exception("postEventProcessing", "FUShmDQMOutputService")
00222       << "Unable to lookup the DQMStore service!\n";
00223   }
00224 
00225   // determine the top level folders (these will be used for grouping
00226   // monitor elements into DQM Events)
00227   std::vector<std::string> topLevelFolderList;
00228   //std::cout << "### SenderService, pwd = " << bei->pwd() << std::endl;
00229   bei->cd();
00230   //std::cout << "### SenderService, pwd = " << bei->pwd() << std::endl;
00231   topLevelFolderList = bei->getSubdirs();
00232 
00233   // find the monitor elements under each top level folder (including
00234   // subdirectories)
00235   std::map< std::string, DQMEvent::TObjectTable > toMap;
00236   std::vector<std::string>::const_iterator dirIter;
00237   for (dirIter = topLevelFolderList.begin();
00238        dirIter != topLevelFolderList.end();
00239        dirIter++) {
00240     std::string dirName = *dirIter;
00241     DQMEvent::TObjectTable toTable;
00242 
00243     // find the MEs
00244     findMonitorElements(toTable, dirName);
00245 
00246     // store the list in the map
00247     toMap[dirName] = toTable;
00248   }
00249 
00250   // create a DQMEvent message for each top-level folder
00251   // and write each to the shared memory
00252   for (dirIter = topLevelFolderList.begin();
00253        dirIter != topLevelFolderList.end();
00254        dirIter++) {
00255     std::string dirName = *dirIter;
00256     DQMEvent::TObjectTable toTable = toMap[dirName];
00257     if (toTable.size() == 0) {continue;}
00258 
00259     // serialize the monitor element data
00260     serializeWorker_.serializeDQMEvent(toTable, useCompression_,
00261                                        compressionLevel_);
00262 
00263     // resize the message buffer, if needed 
00264     unsigned int srcSize = serializeWorker_.currentSpaceUsed();
00265     unsigned int newSize = srcSize + 50000;  // allow for header
00266     if (messageBuffer_.size() < newSize) messageBuffer_.resize(newSize);
00267 
00268     // create the message
00269     DQMEventMsgBuilder dqmMsgBuilder(&messageBuffer_[0], messageBuffer_.size(),
00270                                      lb.run(), lb.luminosityBlock(),
00271                                      lb.endTime(),
00272                                      lumiSectionTag, updateNumber_,
00273                                      (uint32)serializeWorker_.adler32_chksum(),
00274                                      host_name_,
00275                                      edm::getReleaseVersion(), dirName,
00276                                      toTable);
00277 
00278     // copy the serialized data into the message
00279     unsigned char* src = serializeWorker_.bufferPointer();
00280     std::copy(src,src + srcSize, dqmMsgBuilder.eventAddress());
00281     dqmMsgBuilder.setEventLength(srcSize);
00282     if (useCompression_) {
00283       dqmMsgBuilder.setCompressionFlag(serializeWorker_.currentEventSize());
00284     }
00285 
00286     // write the filter unit UUID and PID into the message
00287     dqmMsgBuilder.setFUProcessId(getpid());
00288     dqmMsgBuilder.setFUGuid(fuGuidValue_);
00289 
00290     // send the message
00291     writeShmDQMData(dqmMsgBuilder);
00292 //     std::cout << getpid() << ": :" // << gettid() 
00293 //            << ":DQMOutputService DONE sending update for lumiSection " << thisLumiSection << std::endl;
00294     if(mss) mss->setMicroState(in);
00295 
00296   }
00297   
00298   // reset monitor elements that have requested it
00299   // TODO - enable this
00300   //bei->doneSending(true, true);
00301   
00302   // update the "previous" lumi section
00303   lumiSectionOfPreviousUpdate_ = thisLumiSection;
00304   nbUpdates_++;
00305   updateNumber_++;
00306 }
00307 
00312 void FUShmDQMOutputService::postSourceConstructionProcessing(const edm::ModuleDescription &moduleDesc)
00313 {
00314 
00315   bei = edm::Service<DQMStore>().operator->();
00316 }
00317 
00322 void FUShmDQMOutputService::preBeginRun(const edm::RunID &runID,
00323                                         const edm::Timestamp &timestamp)
00324 {
00325   nbUpdates_ = 0;
00326   updateNumber_ = 0;
00327   initializationIsNeeded_ = true;
00328 }
00329 
00334 void FUShmDQMOutputService::postEndJobProcessing()
00335 {
00336   // since the service is not destroyed we need to take care of endjob items here
00337   initializationIsNeeded_ = true;
00338 }
00339 
00344 void FUShmDQMOutputService::findMonitorElements(DQMEvent::TObjectTable &toTable,
00345                                            std::string folderPath)
00346 {
00347   if (bei == NULL) {return;}
00348 
00349   // fetch the monitor elements in the specified directory
00350   std::vector<MonitorElement *> localMEList = bei->getContents(folderPath);
00351   //MonitorElementRootFolder* folderPtr = bei->getDirectory(folderPath);
00352 
00353   // add the MEs that should be updated to the table
00354   std::vector<TObject *> updateTOList;
00355   for (int idx = 0; idx < (int) localMEList.size(); idx++) {
00356     MonitorElement *mePtr = localMEList[idx];
00357     //    if (mePtr->wasUpdated()) { // @@EM send updated and not (to be revised)
00358     updateTOList.push_back(mePtr->getRootObject());
00359       //    }
00360   }
00361   if (updateTOList.size() > 0) {
00362     toTable[folderPath] = updateTOList;
00363   }
00364 
00365   // find the subdirectories in this folder
00366   // (checking if the directory exists is probably overkill,
00367   // but we really don't want to create new folders using
00368   // setCurrentFolder())
00369   if (bei->dirExists(folderPath)) {
00370     bei->setCurrentFolder(folderPath);
00371     std::vector<std::string> subDirList = bei->getSubdirs();
00372 
00373     // loop over the subdirectories, find the MEs in each one
00374     std::vector<std::string>::const_iterator dirIter;
00375     for (dirIter = subDirList.begin(); dirIter != subDirList.end(); dirIter++) {
00376       std::string subDirPath = (*dirIter);
00377       findMonitorElements(toTable, subDirPath);
00378     }
00379   }
00380 }
00381 
00385 void FUShmDQMOutputService::writeShmDQMData(DQMEventMsgBuilder const& dqmMsgBuilder)
00386 {
00387   // fetch the location and size of the message buffer
00388   unsigned char* buffer = (unsigned char*) dqmMsgBuilder.startAddress();
00389   unsigned int size = dqmMsgBuilder.size();
00390 
00391   // fetch the run, event, and folder number for addition to the I2O fragments
00392   DQMEventMsgView dqmMsgView(buffer);
00393   unsigned int runid = dqmMsgView.runNumber();
00394   unsigned int eventid = dqmMsgView.eventNumberAtUpdate();
00395 
00396   // We need to generate an unique 32 bit ID from the top folder name
00397   std::string topFolder = dqmMsgView.topFolderName();
00398   uLong crc = crc32(0L, Z_NULL, 0);
00399   Bytef* buf = (Bytef*)topFolder.data();
00400   crc = crc32(crc, buf, topFolder.length());
00401 
00402   if(!shmBuffer_) {
00403     edm::LogError("FUDQMShmOutputService") 
00404       << " Error writing to shared memory as shm is not available";
00405   } else {
00406     bool ret = shmBuffer_->writeDqmEventData(runid, eventid, (unsigned int)crc,
00407                                              getpid(), fuGuidValue_, buffer, size);
00408     if(!ret) edm::LogError("FUShmDQMOutputService") << " Error with writing data to ShmBuffer";
00409   }
00410 
00411 }
00412 
00413 
00414 
00415 bool FUShmDQMOutputService::attachToShm()
00416 {
00417   if(0==shmBuffer_) {
00418     shmBuffer_ = evf::FUShmBuffer::getShmBuffer();
00419     if (0==shmBuffer_) {
00420       edm::LogError("FUDQMShmOutputService")<<"Failed to attach to shared memory";
00421       return false;
00422     }
00423     return true;    
00424   }
00425   return false;
00426 
00427 }
00428 
00429 
00430 
00431 bool FUShmDQMOutputService::detachFromShm()
00432 {
00433   if(0!=shmBuffer_) {
00434     shmdt(shmBuffer_);
00435     shmBuffer_ = 0;
00436   }
00437   return true;
00438 }