00001
00024 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00025 #include "FWCore/ServiceRegistry/interface/Service.h"
00026 #include "FWCore/Utilities/interface/GetReleaseVersion.h"
00027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00028 #include "DQMServices/Core/interface/MonitorElement.h"
00029 #include "DQMServices/Core/interface/DQMStore.h"
00030 #include "FWCore/Utilities/src/Guid.h"
00031 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00032 #include "TClass.h"
00033 #include "zlib.h"
00034
00035 using namespace std;
00036
00042 #define DSS_DEBUG 0
00043
00047 bool FUShmDQMOutputService::fuIdsInitialized_ = false;
00048 uint32 FUShmDQMOutputService::fuProcId_ = 0;
00049 uint32 FUShmDQMOutputService::fuGuidValue_ = 0;
00050
00054 FUShmDQMOutputService::FUShmDQMOutputService(const edm::ParameterSet &pset,
00055 edm::ActivityRegistry &actReg):
00056 shmBuffer_(0)
00057 {
00058 if (DSS_DEBUG) {cout << "FUShmDQMOutputService Constructor" << endl;}
00059
00060
00061
00062
00063 actReg.watchPostProcessEvent(this, &FUShmDQMOutputService::postEventProcessing);
00064
00065
00066
00067
00068 actReg.watchPostSourceConstruction(this,
00069 &FUShmDQMOutputService::postSourceConstructionProcessing);
00070
00071
00072 actReg.watchPreBeginRun(this, &FUShmDQMOutputService::preBeginRun);
00073
00074
00075
00076
00077 actReg.watchPostEndJob(this, &FUShmDQMOutputService::postEndJobProcessing);
00078
00079
00080
00081 if (DSS_DEBUG >= 2) {
00082 actReg.watchPostBeginJob(this, &FUShmDQMOutputService::postBeginJobProcessing);
00083 actReg.watchPreSource(this, &FUShmDQMOutputService::preSourceProcessing);
00084 actReg.watchPostSource(this, &FUShmDQMOutputService::postSourceProcessing);
00085 actReg.watchPreModule(this, &FUShmDQMOutputService::preModuleProcessing);
00086 actReg.watchPostModule(this, &FUShmDQMOutputService::postModuleProcessing);
00087 actReg.watchPreSourceConstruction(this,
00088 &FUShmDQMOutputService::preSourceConstructionProcessing);
00089 actReg.watchPreModuleConstruction(this,
00090 &FUShmDQMOutputService::preModuleConstructionProcessing);
00091 actReg.watchPostModuleConstruction(this,
00092 &FUShmDQMOutputService::postModuleConstructionProcessing);
00093 }
00094
00095
00096 int initialSize =
00097 pset.getUntrackedParameter<int>("initialMessageBufferSize", 1000000);
00098 messageBuffer_.resize(initialSize);
00099 lumiSectionsPerUpdate_ = pset.getParameter<double>("lumiSectionsPerUpdate");
00100
00101 if (lumiSectionsPerUpdate_ <= 1.0) {lumiSectionsPerUpdate_ = 1.0;}
00102 initializationIsNeeded_ = true;
00103 useCompression_ = pset.getParameter<bool>("useCompression");
00104 compressionLevel_ = pset.getParameter<int>("compressionLevel");
00105
00106
00107
00108
00109
00110 lumiSectionInterval_ =
00111 pset.getUntrackedParameter<int>("lumiSectionInterval", 0);
00112 if (lumiSectionInterval_ < 1) {lumiSectionInterval_ = 0;}
00113
00114
00115 struct timeval now;
00116 struct timezone dummyTZ;
00117 gettimeofday(&now, &dummyTZ);
00118
00119 timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00120
00121 if (! fuIdsInitialized_) {
00122 fuIdsInitialized_ = true;
00123
00124 edm::Guid guidObj(true);
00125 std::string guidString = guidObj.toString();
00126
00127
00128 uLong crc = crc32(0L, Z_NULL, 0);
00129 Bytef* buf = (Bytef*)guidString.data();
00130 crc = crc32(crc, buf, guidString.length());
00131 fuGuidValue_ = crc;
00132
00133 fuProcId_ = getpid();
00134
00135
00136 }
00137 }
00138
00142 FUShmDQMOutputService::~FUShmDQMOutputService(void)
00143 {
00144 if (DSS_DEBUG) {cout << "FUShmDQMOutputService Destructor" << endl;}
00145 shmdt(shmBuffer_);
00146 }
00147
00155 void FUShmDQMOutputService::postEventProcessing(const edm::Event &event,
00156 const edm::EventSetup &eventSetup)
00157 {
00158
00159 unsigned int thisLumiSection = 0;
00160 if(lumiSectionInterval_ == 0)
00161 thisLumiSection = event.luminosityBlock();
00162 else {
00163
00164 struct timeval now;
00165 struct timezone dummyTZ;
00166 gettimeofday(&now, &dummyTZ);
00167 double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC_;
00168
00169 if(lumiSectionInterval_ > 0) thisLumiSection = static_cast<uint32>(timeInSec/lumiSectionInterval_);
00170 }
00171
00172 if (DSS_DEBUG) {
00173 cout << "FUShmDQMOutputService::postEventProcessing called, event number "
00174 << event.id().event() << ", lumi section "
00175 << thisLumiSection << endl;
00176 }
00177
00178
00179 if (initializationIsNeeded_) {
00180 initializationIsNeeded_ = false;
00181 lumiSectionOfPreviousUpdate_ = thisLumiSection;
00182 firstLumiSectionSeen_ = thisLumiSection;
00183
00184
00185
00186 struct timeval now;
00187 struct timezone dummyTZ;
00188 gettimeofday(&now, &dummyTZ);
00189
00190 timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00191 }
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201 int lsDelta = (int) (thisLumiSection - lumiSectionOfPreviousUpdate_);
00202 double updateRatio = ((double) lsDelta) / lumiSectionsPerUpdate_;
00203 if (updateRatio < 1.0) {return;}
00204
00205
00206 int fullLsDelta = (int) (thisLumiSection - firstLumiSectionSeen_);
00207 double fullUpdateRatio = ((double) fullLsDelta) / lumiSectionsPerUpdate_;
00208
00209 uint32 updateNumber = -1 + (uint32) fullUpdateRatio;
00210
00211 unsigned int lumiSectionTag = firstLumiSectionSeen_ +
00212 ((int) (updateNumber * lumiSectionsPerUpdate_));
00213
00214
00215 if (bei == NULL) {
00216 bei = edm::Service<DQMStore>().operator->();
00217 }
00218
00219
00220 if (bei == NULL) {
00221 throw cms::Exception("postEventProcessing", "FUShmDQMOutputService")
00222 << "Unable to lookup the DQMStore service!\n";
00223 }
00224
00225
00226
00227 std::vector<std::string> topLevelFolderList;
00228
00229 bei->cd();
00230
00231 topLevelFolderList = bei->getSubdirs();
00232
00233
00234
00235 std::map< std::string, DQMEvent::TObjectTable > toMap;
00236 std::vector<std::string>::const_iterator dirIter;
00237 for (dirIter = topLevelFolderList.begin();
00238 dirIter != topLevelFolderList.end();
00239 dirIter++) {
00240 std::string dirName = *dirIter;
00241 DQMEvent::TObjectTable toTable;
00242
00243
00244 findMonitorElements(toTable, dirName);
00245
00246
00247 toMap[dirName] = toTable;
00248 }
00249
00250
00251
00252 for (dirIter = topLevelFolderList.begin();
00253 dirIter != topLevelFolderList.end();
00254 dirIter++) {
00255 std::string dirName = *dirIter;
00256 DQMEvent::TObjectTable toTable = toMap[dirName];
00257 if (toTable.size() == 0) {continue;}
00258
00259
00260 serializeWorker_.serializeDQMEvent(toTable, useCompression_,
00261 compressionLevel_);
00262
00263
00264 unsigned int srcSize = serializeWorker_.currentSpaceUsed();
00265 unsigned int newSize = srcSize + 50000;
00266 if (messageBuffer_.size() < newSize) messageBuffer_.resize(newSize);
00267
00268
00269 DQMEventMsgBuilder dqmMsgBuilder(&messageBuffer_[0], messageBuffer_.size(),
00270 event.id().run(), event.id().event(),
00271 event.time(),
00272 lumiSectionTag, updateNumber,
00273 edm::getReleaseVersion(), dirName,
00274 toTable);
00275
00276
00277 unsigned char* src = serializeWorker_.bufferPointer();
00278 std::copy(src,src + srcSize, dqmMsgBuilder.eventAddress());
00279 dqmMsgBuilder.setEventLength(srcSize);
00280 if (useCompression_) {
00281 dqmMsgBuilder.setCompressionFlag(serializeWorker_.currentEventSize());
00282 }
00283
00284
00285 dqmMsgBuilder.setFUProcessId(fuProcId_);
00286 dqmMsgBuilder.setFUGuid(fuGuidValue_);
00287
00288
00289 writeShmDQMData(dqmMsgBuilder);
00290
00291
00292 if (DSS_DEBUG >= 3) {
00293 DQMEventMsgView dqmEventView(&messageBuffer_[0]);
00294 std::cout << " DQM Message data:" << std::endl;
00295 std::cout << " protocol version = "
00296 << dqmEventView.protocolVersion() << std::endl;
00297 std::cout << " header size = "
00298 << dqmEventView.headerSize() << std::endl;
00299 std::cout << " run number = "
00300 << dqmEventView.runNumber() << std::endl;
00301 std::cout << " event number = "
00302 << dqmEventView.eventNumberAtUpdate() << std::endl;
00303 std::cout << " lumi section = "
00304 << dqmEventView.lumiSection() << std::endl;
00305 std::cout << " update number = "
00306 << dqmEventView.updateNumber() << std::endl;
00307 std::cout << " compression flag = "
00308 << dqmEventView.compressionFlag() << std::endl;
00309 std::cout << " reserved word = "
00310 << dqmEventView.reserved() << std::endl;
00311 std::cout << " release tag = "
00312 << dqmEventView.releaseTag() << std::endl;
00313 std::cout << " top folder name = "
00314 << dqmEventView.topFolderName() << std::endl;
00315 std::cout << " sub folder count = "
00316 << dqmEventView.subFolderCount() << std::endl;
00317 std::auto_ptr<DQMEvent::TObjectTable> toTablePtr =
00318 deserializeWorker_.deserializeDQMEvent(dqmEventView);
00319 DQMEvent::TObjectTable::const_iterator toIter;
00320 for (toIter = toTablePtr->begin();
00321 toIter != toTablePtr->end(); toIter++) {
00322 std::string subFolderName = toIter->first;
00323 std::cout << " folder = " << subFolderName << std::endl;
00324 std::vector<TObject *> toList = toIter->second;
00325 for (int tdx = 0; tdx < (int) toList.size(); tdx++) {
00326 TObject *toPtr = toList[tdx];
00327 string cls = toPtr->IsA()->GetName();
00328 string nm = toPtr->GetName();
00329 std::cout << " TObject class = " << cls
00330 << ", name = " << nm << std::endl;
00331 }
00332 }
00333 }
00334 }
00335
00336
00337
00338
00339
00340
00341 lumiSectionOfPreviousUpdate_ = thisLumiSection;
00342 }
00343
00348 void FUShmDQMOutputService::postSourceConstructionProcessing(const edm::ModuleDescription &moduleDesc)
00349 {
00350 if (DSS_DEBUG) {
00351 cout << "FUShmDQMOutputService::postSourceConstructionProcessing called for "
00352 << moduleDesc.moduleName() << endl;
00353 }
00354
00355 bei = edm::Service<DQMStore>().operator->();
00356 }
00357
00362 void FUShmDQMOutputService::preBeginRun(const edm::RunID &runID,
00363 const edm::Timestamp ×tamp)
00364 {
00365 if (DSS_DEBUG) {
00366 cout << "FUShmDQMOutputService::preBeginRun called, run number "
00367 << runID.run() << endl;
00368 }
00369
00370 initializationIsNeeded_ = true;
00371 }
00372
00377 void FUShmDQMOutputService::postEndJobProcessing()
00378 {
00379 if (DSS_DEBUG) {
00380 cout << "FUShmDQMOutputService::postEndJobProcessing called" << endl;
00381 }
00382
00383 initializationIsNeeded_ = true;
00384 }
00385
00390 void FUShmDQMOutputService::findMonitorElements(DQMEvent::TObjectTable &toTable,
00391 std::string folderPath)
00392 {
00393 if (bei == NULL) {return;}
00394
00395
00396 std::vector<MonitorElement *> localMEList = bei->getContents(folderPath);
00397
00398
00399
00400 std::vector<TObject *> updateTOList;
00401 for (int idx = 0; idx < (int) localMEList.size(); idx++) {
00402 MonitorElement *mePtr = localMEList[idx];
00403 if (mePtr->wasUpdated()) {
00404 updateTOList.push_back(mePtr->getRootObject());
00405 }
00406 }
00407 if (updateTOList.size() > 0) {
00408 toTable[folderPath] = updateTOList;
00409 }
00410
00411
00412
00413
00414
00415 if (bei->dirExists(folderPath)) {
00416 bei->setCurrentFolder(folderPath);
00417 std::vector<std::string> subDirList = bei->getSubdirs();
00418
00419
00420 std::vector<std::string>::const_iterator dirIter;
00421 for (dirIter = subDirList.begin(); dirIter != subDirList.end(); dirIter++) {
00422 std::string subDirPath = (*dirIter);
00423 findMonitorElements(toTable, subDirPath);
00424 }
00425 }
00426 }
00427
00431 void FUShmDQMOutputService::writeShmDQMData(DQMEventMsgBuilder const& dqmMsgBuilder)
00432 {
00433
00434 unsigned char* buffer = (unsigned char*) dqmMsgBuilder.startAddress();
00435 unsigned int size = dqmMsgBuilder.size();
00436
00437
00438 DQMEventMsgView dqmMsgView(buffer);
00439 unsigned int runid = dqmMsgView.runNumber();
00440 unsigned int eventid = dqmMsgView.eventNumberAtUpdate();
00441
00442
00443 std::string topFolder = dqmMsgView.topFolderName();
00444 uLong crc = crc32(0L, Z_NULL, 0);
00445 Bytef* buf = (Bytef*)topFolder.data();
00446 crc = crc32(crc, buf, topFolder.length());
00447 if (DSS_DEBUG) {
00448 std::cout << "Folder = " << topFolder << " crc = " << crc << std::endl;
00449 }
00450
00451 if(!shmBuffer_) {
00452 edm::LogError("FUDQMShmOutputService")
00453 << " Error writing to shared memory as shm is not available";
00454 } else {
00455 bool ret = shmBuffer_->writeDqmEventData(runid, eventid, (unsigned int)crc,
00456 fuProcId_, fuGuidValue_, buffer, size);
00457 if(!ret) edm::LogError("FUShmDQMOutputService") << " Error with writing data to ShmBuffer";
00458 }
00459
00460 }
00461
00466 void FUShmDQMOutputService::postBeginJobProcessing()
00467 {
00468 if (DSS_DEBUG >= 2) {
00469 cout << "FUShmDQMOutputService::postBeginJobProcessing called" << endl;
00470 }
00471 }
00472
00478 void FUShmDQMOutputService::preSourceProcessing()
00479 {
00480 if (DSS_DEBUG >= 2) {
00481 cout << "FUShmDQMOutputService::preSourceProcessing called" << endl;
00482 }
00483 }
00484
00490 void FUShmDQMOutputService::postSourceProcessing()
00491 {
00492 if (DSS_DEBUG >= 2) {
00493 cout << "FUShmDQMOutputService::postSourceProcessing called" << endl;
00494 }
00495 }
00496
00501 void FUShmDQMOutputService::preModuleProcessing(const edm::ModuleDescription &moduleDesc)
00502 {
00503 if (DSS_DEBUG >= 2) {
00504 cout << "FUShmDQMOutputService::preModuleProcessing called for "
00505 << moduleDesc.moduleName() << endl;
00506 }
00507 }
00508
00513 void FUShmDQMOutputService::postModuleProcessing(const edm::ModuleDescription &moduleDesc)
00514 {
00515 if (DSS_DEBUG >= 2) {
00516 cout << "FUShmDQMOutputService::postModuleProcessing called for "
00517 << moduleDesc.moduleName() << endl;
00518 }
00519 }
00520
00525 void FUShmDQMOutputService::preSourceConstructionProcessing(const edm::ModuleDescription &moduleDesc)
00526 {
00527 if (DSS_DEBUG >= 2) {
00528 cout << "FUShmDQMOutputService::preSourceConstructionProcessing called for "
00529 << moduleDesc.moduleName() << endl;
00530 }
00531 }
00532
00537 void FUShmDQMOutputService::preModuleConstructionProcessing(const edm::ModuleDescription &moduleDesc)
00538 {
00539 if (DSS_DEBUG >= 2) {
00540 cout << "FUShmDQMOutputService::preModuleConstructionProcessing called for "
00541 << moduleDesc.moduleName() << endl;
00542 }
00543 }
00544
00549 void FUShmDQMOutputService::postModuleConstructionProcessing(const edm::ModuleDescription &moduleDesc)
00550 {
00551 if (DSS_DEBUG >= 2) {
00552 cout << "FUShmDQMOutputService::postModuleConstructionProcessing called for "
00553 << moduleDesc.moduleName() << endl;
00554 }
00555 }
00556
00557 bool FUShmDQMOutputService::attachToShm()
00558 {
00559 if(0==shmBuffer_) {
00560 shmBuffer_ = evf::FUShmBuffer::getShmBuffer();
00561 if (0==shmBuffer_) {
00562 edm::LogError("FUDQMShmOutputService")<<"Failed to attach to shared memory";
00563 return false;
00564 }
00565 return true;
00566 }
00567 return false;
00568
00569 }
00570
00571
00572
00573 bool FUShmDQMOutputService::detachFromShm()
00574 {
00575 if(0!=shmBuffer_) {
00576 shmdt(shmBuffer_);
00577 shmBuffer_ = 0;
00578 }
00579 return true;
00580 }