CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_6/src/EventFilter/Modules/src/FUShmDQMOutputService.cc

Go to the documentation of this file.
00001 
00024 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00025 #include "EventFilter/Utilities/interface/MicroStateService.h"
00026 #include "FWCore/ServiceRegistry/interface/Service.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00029 #include "DQMServices/Core/interface/MonitorElement.h"
00030 #include "DQMServices/Core/interface/DQMStore.h"
00031 #include "FWCore/Utilities/src/Guid.h"
00032 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00033 #include "TClass.h"
00034 #include "zlib.h"
00035 #include <unistd.h>
00036 #include <sys/types.h>
00037 
00038 using namespace std;
00039 
00045 #define DSS_DEBUG 0
00046 
00050 bool FUShmDQMOutputService::fuIdsInitialized_ = false;
00051 uint32 FUShmDQMOutputService::fuGuidValue_ = 0;
00052 
00056 FUShmDQMOutputService::FUShmDQMOutputService(const edm::ParameterSet &pset,
00057                                    edm::ActivityRegistry &actReg)
00058   : evf::ServiceWeb("FUShmDQMOutputService")
00059   , updateNumber_(0)
00060   , shmBuffer_(0)
00061   , nbUpdates_(0)
00062   , input("INPUT")
00063   , dqm("DQM")
00064   , attach_(false)
00065 {
00066 
00067   // specify the routine to be called after event processing.  This routine
00068   // will be used to periodically fetch monitor elements from the DQM
00069   // backend and write out to shared memory for sending to the storage manager.
00070   actReg.watchPostEndLumi(this, &FUShmDQMOutputService::postEndLumi);
00071 
00072   // specify the routine to be called after the input source has been
00073   // constructed.  This routine will be used to initialize our connection
00074   // to the storage manager and any other needed setup.??
00075   actReg.watchPostSourceConstruction(this,
00076          &FUShmDQMOutputService::postSourceConstructionProcessing);
00077 
00078   // specify the routine to be called when a run begins
00079   actReg.watchPreBeginRun(this, &FUShmDQMOutputService::preBeginRun);
00080 
00081   // specify the routine to be called when the job has finished.  It will
00082   // be used to disconnect from the SM, if needed, and any other shutdown
00083   // tasks that are needed.??
00084   actReg.watchPostEndJob(this, &FUShmDQMOutputService::postEndJobProcessing);
00085 
00086   // set internal values from the parameter set
00087   int initialSize =
00088     pset.getUntrackedParameter<int>("initialMessageBufferSize", 1000000);
00089   messageBuffer_.resize(initialSize);
00090   lumiSectionsPerUpdate_ = pset.getParameter<double>("lumiSectionsPerUpdate");
00091   // for the moment, only support a number of lumi sections per update >= 1
00092   if (lumiSectionsPerUpdate_ <= 1.0) {lumiSectionsPerUpdate_ = 1.0;}
00093   initializationIsNeeded_ = true;
00094   useCompression_ = pset.getParameter<bool>("useCompression");
00095   compressionLevel_ = pset.getParameter<int>("compressionLevel");
00096   // the default for lumiSectionInterval_ is 0, meaning get it from the event
00097   // otherwise we get a fake one that should match the fake lumi block
00098   // for events (if any) as long as the time between lumi blocks is larger
00099   // than the time difference between creating this service and the 
00100   // FUShmOutputModule event output module
00101   lumiSectionInterval_ =
00102     pset.getUntrackedParameter<int>("lumiSectionInterval", 0); // seconds
00103   if (lumiSectionInterval_ < 1) {lumiSectionInterval_ = 0;}
00104 
00105   // for fake test luminosity sections
00106   struct timeval now;
00107   struct timezone dummyTZ;
00108   gettimeofday(&now, &dummyTZ);
00109   // we will count lumi section numbers from this time
00110   timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00111 
00112   int got_host = gethostname(host_name_, sizeof(host_name_));
00113   if(got_host != 0) strcpy(host_name_, "noHostNameFoundOrTooLong");
00114 
00115   if (! fuIdsInitialized_) {
00116     fuIdsInitialized_ = true;
00117 
00118     edm::Guid guidObj(true);
00119     std::string guidString = guidObj.toString();
00120     //std::cout << "DQMOutput GUID string = " << guidString << std::endl;
00121 
00122     uLong crc = crc32(0L, Z_NULL, 0);
00123     Bytef* buf = (Bytef*)guidString.data();
00124     crc = crc32(crc, buf, guidString.length());
00125     fuGuidValue_ = crc;
00126 
00127     //std::cout << "DQMOutput GUID value = 0x" << std::hex << fuGuidValue_ << std::endl;
00128   }
00129 }
00130 
00134 FUShmDQMOutputService::~FUShmDQMOutputService(void)
00135 {
00136   shmdt(shmBuffer_);
00137 }
00138 
00139 void FUShmDQMOutputService::defaultWebPage(xgi::Input *in, xgi::Output *out)
00140 {
00141 }
00142 
00143 void FUShmDQMOutputService::publish(xdata::InfoSpace *is)
00144 {
00145   try{
00146     is->fireItemAvailable("nbDqmUpdates",&nbUpdates_);
00147   }
00148   catch(xdata::exception::Exception &e){
00149     edm::LogInfo("FUShmDQMOutputService")
00150       << " exception when publishing to InfoSpace "; 
00151   } 
00152 }
00153 
00154 void FUShmDQMOutputService::postEndLumi(edm::LuminosityBlock const &lb, edm::EventSetup const &es)
00155 {
00156   if (attach_) attachToShm();
00157   attach_=false;
00158   
00159   evf::MicroStateService *mss = 0;
00160   try{
00161     mss = edm::Service<evf::MicroStateService>().operator->();
00162     if(mss) mss->setMicroState(&dqm);
00163   }
00164   catch(...) { 
00165     edm::LogError("FUShmDQMOutputService")<< "exception when trying to get service MicroStateService";
00166   }
00167   
00168 
00169   // fake the luminosity section if we don't want to use the real one
00170   unsigned int thisLumiSection = 0;
00171   if(lumiSectionInterval_ == 0)
00172     thisLumiSection = lb.luminosityBlock();
00173   else {
00174     // match the code in Event output module to get the same (almost) lumi number
00175     struct timeval now;
00176     struct timezone dummyTZ;
00177     gettimeofday(&now, &dummyTZ);
00178     double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC_;
00179     // what about overflows?
00180     if(lumiSectionInterval_ > 0) thisLumiSection = static_cast<uint32>(timeInSec/lumiSectionInterval_);
00181   }
00182 
00183    // special handling for the first event
00184   if (initializationIsNeeded_) {
00185     initializationIsNeeded_ = false;
00186     lumiSectionOfPreviousUpdate_ = thisLumiSection;
00187     firstLumiSectionSeen_ = thisLumiSection;
00188 
00189     // for when a run(job) had ended and we start a new run(job)
00190     // for fake test luminosity sections
00191     struct timeval now;
00192     struct timezone dummyTZ;
00193     gettimeofday(&now, &dummyTZ);
00194     // we will count lumi section numbers from this time
00195     timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00196   }
00197 
00198   //  std::cout << getpid() << ": :" //<< gettid() 
00199   //        << ":DQMOutputService check if have to send update for lumiSection " << thisLumiSection << std::endl;
00200   if(thisLumiSection%4!=0) 
00201     {
00202 //       std::cout << getpid() << ": :" //<< gettid() 
00203 //              << ":DQMOutputService skipping update for lumiSection " << thisLumiSection << std::endl;
00204       if(mss) mss->setMicroState(&input);
00205       return;
00206     }
00207 //   std::cout << getpid() << ": :" //<< gettid() 
00208 //          << ":DQMOutputService sending update for lumiSection " << thisLumiSection << std::endl;
00209   // Calculate the update ID and lumi ID for this update
00210   // fullUpdateRatio and fullLsDelta are unused. comment out the calculation.
00211   //int fullLsDelta = (int) (thisLumiSection - firstLumiSectionSeen_);
00212   //double fullUpdateRatio = ((double) fullLsDelta) / lumiSectionsPerUpdate_;
00213   // this is the update number starting from zero
00214 
00215   // this is the actual luminosity section number for the beginning lumi section of this update
00216   unsigned int lumiSectionTag = thisLumiSection;
00217 
00218   // retry the lookup of the backend interface, if needed
00219   if (bei == NULL) {
00220     bei = edm::Service<DQMStore>().operator->();
00221   }
00222 
00223   // to go any further, a backend interface pointer is crucial
00224   if (bei == NULL) {
00225     throw cms::Exception("postEventProcessing", "FUShmDQMOutputService")
00226       << "Unable to lookup the DQMStore service!\n";
00227   }
00228 
00229   // determine the top level folders (these will be used for grouping
00230   // monitor elements into DQM Events)
00231   std::vector<std::string> topLevelFolderList;
00232   //std::cout << "### SenderService, pwd = " << bei->pwd() << std::endl;
00233   bei->cd();
00234   //std::cout << "### SenderService, pwd = " << bei->pwd() << std::endl;
00235   topLevelFolderList = bei->getSubdirs();
00236 
00237   // find the monitor elements under each top level folder (including
00238   // subdirectories)
00239   std::map< std::string, DQMEvent::TObjectTable > toMap;
00240   std::vector<std::string>::const_iterator dirIter;
00241   for (dirIter = topLevelFolderList.begin();
00242        dirIter != topLevelFolderList.end();
00243        dirIter++) {
00244     std::string dirName = *dirIter;
00245     DQMEvent::TObjectTable toTable;
00246 
00247     // find the MEs
00248     findMonitorElements(toTable, dirName);
00249 
00250     // store the list in the map
00251     toMap[dirName] = toTable;
00252   }
00253 
00254   // create a DQMEvent message for each top-level folder
00255   // and write each to the shared memory
00256   for (dirIter = topLevelFolderList.begin();
00257        dirIter != topLevelFolderList.end();
00258        dirIter++) {
00259     std::string dirName = *dirIter;
00260     DQMEvent::TObjectTable toTable = toMap[dirName];
00261     if (toTable.size() == 0) {continue;}
00262 
00263     // serialize the monitor element data
00264     serializeWorker_.serializeDQMEvent(toTable, useCompression_,
00265                                        compressionLevel_);
00266 
00267     // resize the message buffer, if needed 
00268     unsigned int srcSize = serializeWorker_.currentSpaceUsed();
00269     unsigned int newSize = srcSize + 50000;  // allow for header
00270     if (messageBuffer_.size() < newSize) messageBuffer_.resize(newSize);
00271 
00272     // create the message
00273     DQMEventMsgBuilder dqmMsgBuilder(&messageBuffer_[0], messageBuffer_.size(),
00274                                      lb.run(), lb.luminosityBlock(),
00275                                      lb.endTime(),
00276                                      lumiSectionTag, updateNumber_,
00277                                      (uint32)serializeWorker_.adler32_chksum(),
00278                                      host_name_,
00279                                      edm::getReleaseVersion(), dirName,
00280                                      toTable);
00281 
00282     // copy the serialized data into the message
00283     unsigned char* src = serializeWorker_.bufferPointer();
00284     std::copy(src,src + srcSize, dqmMsgBuilder.eventAddress());
00285     dqmMsgBuilder.setEventLength(srcSize);
00286     if (useCompression_) {
00287       dqmMsgBuilder.setCompressionFlag(serializeWorker_.currentEventSize());
00288     }
00289 
00290     // write the filter unit UUID and PID into the message
00291     dqmMsgBuilder.setFUProcessId(getpid());
00292     dqmMsgBuilder.setFUGuid(fuGuidValue_);
00293 
00294     // send the message
00295     writeShmDQMData(dqmMsgBuilder);
00296 //     std::cout << getpid() << ": :" // << gettid() 
00297 //            << ":DQMOutputService DONE sending update for lumiSection " << thisLumiSection << std::endl;
00298     if(mss) mss->setMicroState(&input);
00299 
00300   }
00301   
00302   // reset monitor elements that have requested it
00303   // TODO - enable this
00304   //bei->doneSending(true, true);
00305   
00306   // update the "previous" lumi section
00307   lumiSectionOfPreviousUpdate_ = thisLumiSection;
00308   nbUpdates_++;
00309   updateNumber_++;
00310 }
00311 
00316 void FUShmDQMOutputService::postSourceConstructionProcessing(const edm::ModuleDescription &moduleDesc)
00317 {
00318 
00319   bei = edm::Service<DQMStore>().operator->();
00320 }
00321 
00326 void FUShmDQMOutputService::preBeginRun(const edm::RunID &runID,
00327                                         const edm::Timestamp &timestamp)
00328 {
00329   nbUpdates_ = 0;
00330   updateNumber_ = 0;
00331   initializationIsNeeded_ = true;
00332 }
00333 
00338 void FUShmDQMOutputService::postEndJobProcessing()
00339 {
00340   // since the service is not destroyed we need to take care of endjob items here
00341   initializationIsNeeded_ = true;
00342 }
00343 
00348 void FUShmDQMOutputService::findMonitorElements(DQMEvent::TObjectTable &toTable,
00349                                            std::string folderPath)
00350 {
00351   if (bei == NULL) {return;}
00352 
00353   // fetch the monitor elements in the specified directory
00354   std::vector<MonitorElement *> localMEList = bei->getContents(folderPath);
00355   //MonitorElementRootFolder* folderPtr = bei->getDirectory(folderPath);
00356 
00357   // add the MEs that should be updated to the table
00358   std::vector<TObject *> updateTOList;
00359   for (int idx = 0; idx < (int) localMEList.size(); idx++) {
00360     MonitorElement *mePtr = localMEList[idx];
00361     //    if (mePtr->wasUpdated()) { // @@EM send updated and not (to be revised)
00362     updateTOList.push_back(mePtr->getRootObject());
00363       //    }
00364   }
00365   if (updateTOList.size() > 0) {
00366     toTable[folderPath] = updateTOList;
00367   }
00368 
00369   // find the subdirectories in this folder
00370   // (checking if the directory exists is probably overkill,
00371   // but we really don't want to create new folders using
00372   // setCurrentFolder())
00373   if (bei->dirExists(folderPath)) {
00374     bei->setCurrentFolder(folderPath);
00375     std::vector<std::string> subDirList = bei->getSubdirs();
00376 
00377     // loop over the subdirectories, find the MEs in each one
00378     std::vector<std::string>::const_iterator dirIter;
00379     for (dirIter = subDirList.begin(); dirIter != subDirList.end(); dirIter++) {
00380       std::string subDirPath = (*dirIter);
00381       findMonitorElements(toTable, subDirPath);
00382     }
00383   }
00384 }
00385 
00389 void FUShmDQMOutputService::writeShmDQMData(DQMEventMsgBuilder const& dqmMsgBuilder)
00390 {
00391   // fetch the location and size of the message buffer
00392   unsigned char* buffer = (unsigned char*) dqmMsgBuilder.startAddress();
00393   unsigned int size = dqmMsgBuilder.size();
00394 
00395   // fetch the run, event, and folder number for addition to the I2O fragments
00396   DQMEventMsgView dqmMsgView(buffer);
00397   unsigned int runid = dqmMsgView.runNumber();
00398   unsigned int eventid = dqmMsgView.eventNumberAtUpdate();
00399 
00400   // We need to generate an unique 32 bit ID from the top folder name
00401   std::string topFolder = dqmMsgView.topFolderName();
00402   uLong crc = crc32(0L, Z_NULL, 0);
00403   Bytef* buf = (Bytef*)topFolder.data();
00404   crc = crc32(crc, buf, topFolder.length());
00405 
00406   if(!shmBuffer_) {
00407     edm::LogError("FUDQMShmOutputService") 
00408       << " Error writing to shared memory as shm is not available";
00409   } else {
00410     bool ret = shmBuffer_->writeDqmEventData(runid, eventid, (unsigned int)crc,
00411                                              getpid(), fuGuidValue_, buffer, size);
00412     if(!ret) edm::LogError("FUShmDQMOutputService") << " Error with writing data to ShmBuffer";
00413   }
00414 
00415 }
00416 
00417 void FUShmDQMOutputService::setAttachToShm() {
00418   attach_=true;
00419 }
00420 
00421 bool FUShmDQMOutputService::attachToShm()
00422 {
00423   if(0==shmBuffer_) {
00424     shmBuffer_ = evf::FUShmBuffer::getShmBuffer();
00425     if (0==shmBuffer_) {
00426       edm::LogError("FUDQMShmOutputService")<<"Failed to attach to shared memory";
00427       return false;
00428     }
00429     return true;    
00430   }
00431   return false;
00432 
00433 }
00434 
00435 
00436 
00437 bool FUShmDQMOutputService::detachFromShm()
00438 {
00439   if(0!=shmBuffer_) {
00440     shmdt(shmBuffer_);
00441     shmBuffer_ = 0;
00442   }
00443   return true;
00444 }