00001
00024 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00025 #include "EventFilter/Utilities/interface/MicroStateService.h"
00026 #include "FWCore/ServiceRegistry/interface/Service.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00029 #include "DQMServices/Core/interface/MonitorElement.h"
00030 #include "DQMServices/Core/interface/DQMStore.h"
00031 #include "FWCore/Utilities/src/Guid.h"
00032 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00033 #include "TClass.h"
00034 #include "zlib.h"
00035 #include <unistd.h>
00036 #include <sys/types.h>
00037
// NOTE(review): file-scope using-directive. The code visible in this file
// qualifies its std:: names explicitly, so this may be removable -- confirm
// before touching it.
00038 using namespace std;
00039
// Debug switch; it is not referenced anywhere in the visible part of this file.
00045 #define DSS_DEBUG 0
00046
// Set to true by the first constructor call, which also computes fuGuidValue_.
00050 bool FUShmDQMOutputService::fuIdsInitialized_ = false;
// CRC-32 of a generated GUID string. Filled in by the constructor and attached
// to every DQM message (setFUGuid / writeDqmEventData).
00051 uint32 FUShmDQMOutputService::fuGuidValue_ = 0;
00052
00056 FUShmDQMOutputService::FUShmDQMOutputService(const edm::ParameterSet &pset,
00057 edm::ActivityRegistry &actReg)
00058 : evf::ServiceWeb("FUShmDQMOutputService")
00059 , updateNumber_(0)
00060 , shmBuffer_(0)
00061 , nbUpdates_(0)
00062 , input("INPUT")
00063 , dqm("DQM")
00064 {
00065
00066
00067
00068
00069 actReg.watchPostEndLumi(this, &FUShmDQMOutputService::postEndLumi);
00070
00071
00072
00073
00074 actReg.watchPostSourceConstruction(this,
00075 &FUShmDQMOutputService::postSourceConstructionProcessing);
00076
00077
00078 actReg.watchPreBeginRun(this, &FUShmDQMOutputService::preBeginRun);
00079
00080
00081
00082
00083 actReg.watchPostEndJob(this, &FUShmDQMOutputService::postEndJobProcessing);
00084
00085
00086 int initialSize =
00087 pset.getUntrackedParameter<int>("initialMessageBufferSize", 1000000);
00088 messageBuffer_.resize(initialSize);
00089 lumiSectionsPerUpdate_ = pset.getParameter<double>("lumiSectionsPerUpdate");
00090
00091 if (lumiSectionsPerUpdate_ <= 1.0) {lumiSectionsPerUpdate_ = 1.0;}
00092 initializationIsNeeded_ = true;
00093 useCompression_ = pset.getParameter<bool>("useCompression");
00094 compressionLevel_ = pset.getParameter<int>("compressionLevel");
00095
00096
00097
00098
00099
00100 lumiSectionInterval_ =
00101 pset.getUntrackedParameter<int>("lumiSectionInterval", 0);
00102 if (lumiSectionInterval_ < 1) {lumiSectionInterval_ = 0;}
00103
00104
00105 struct timeval now;
00106 struct timezone dummyTZ;
00107 gettimeofday(&now, &dummyTZ);
00108
00109 timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00110
00111 int got_host = gethostname(host_name_, sizeof(host_name_));
00112 if(got_host != 0) strcpy(host_name_, "noHostNameFoundOrTooLong");
00113
00114 if (! fuIdsInitialized_) {
00115 fuIdsInitialized_ = true;
00116
00117 edm::Guid guidObj(true);
00118 std::string guidString = guidObj.toString();
00119
00120
00121 uLong crc = crc32(0L, Z_NULL, 0);
00122 Bytef* buf = (Bytef*)guidString.data();
00123 crc = crc32(crc, buf, guidString.length());
00124 fuGuidValue_ = crc;
00125
00126
00127 }
00128 }
00129
00133 FUShmDQMOutputService::~FUShmDQMOutputService(void)
00134 {
00135 shmdt(shmBuffer_);
00136 }
00137
00138 void FUShmDQMOutputService::defaultWebPage(xgi::Input *in, xgi::Output *out)
00139 {
00140 }
00141
00142 void FUShmDQMOutputService::publish(xdata::InfoSpace *is)
00143 {
00144 try{
00145 is->fireItemAvailable("nbDqmUpdates",&nbUpdates_);
00146 }
00147 catch(xdata::exception::Exception &e){
00148 edm::LogInfo("FUShmDQMOutputService")
00149 << " exception when publishing to InfoSpace ";
00150 }
00151 }
00152
00153 void FUShmDQMOutputService::postEndLumi(edm::LuminosityBlock const &lb, edm::EventSetup const &es)
00154 {
00155 evf::MicroStateService *mss = 0;
00156 try{
00157 mss = edm::Service<evf::MicroStateService>().operator->();
00158 if(mss) mss->setMicroState(&dqm);
00159 }
00160 catch(...) {
00161 edm::LogError("FUShmDQMOutputService")<< "exception when trying to get service MicroStateService";
00162 }
00163
00164
00165
00166 unsigned int thisLumiSection = 0;
00167 if(lumiSectionInterval_ == 0)
00168 thisLumiSection = lb.luminosityBlock();
00169 else {
00170
00171 struct timeval now;
00172 struct timezone dummyTZ;
00173 gettimeofday(&now, &dummyTZ);
00174 double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC_;
00175
00176 if(lumiSectionInterval_ > 0) thisLumiSection = static_cast<uint32>(timeInSec/lumiSectionInterval_);
00177 }
00178
00179
00180 if (initializationIsNeeded_) {
00181 initializationIsNeeded_ = false;
00182 lumiSectionOfPreviousUpdate_ = thisLumiSection;
00183 firstLumiSectionSeen_ = thisLumiSection;
00184
00185
00186
00187 struct timeval now;
00188 struct timezone dummyTZ;
00189 gettimeofday(&now, &dummyTZ);
00190
00191 timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00192 }
00193
00194
00195
00196 if(thisLumiSection%4!=0)
00197 {
00198
00199
00200 if(mss) mss->setMicroState(&input);
00201 return;
00202 }
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212 unsigned int lumiSectionTag = thisLumiSection;
00213
00214
00215 if (bei == NULL) {
00216 bei = edm::Service<DQMStore>().operator->();
00217 }
00218
00219
00220 if (bei == NULL) {
00221 throw cms::Exception("postEventProcessing", "FUShmDQMOutputService")
00222 << "Unable to lookup the DQMStore service!\n";
00223 }
00224
00225
00226
00227 std::vector<std::string> topLevelFolderList;
00228
00229 bei->cd();
00230
00231 topLevelFolderList = bei->getSubdirs();
00232
00233
00234
00235 std::map< std::string, DQMEvent::TObjectTable > toMap;
00236 std::vector<std::string>::const_iterator dirIter;
00237 for (dirIter = topLevelFolderList.begin();
00238 dirIter != topLevelFolderList.end();
00239 dirIter++) {
00240 std::string dirName = *dirIter;
00241 DQMEvent::TObjectTable toTable;
00242
00243
00244 findMonitorElements(toTable, dirName);
00245
00246
00247 toMap[dirName] = toTable;
00248 }
00249
00250
00251
00252 for (dirIter = topLevelFolderList.begin();
00253 dirIter != topLevelFolderList.end();
00254 dirIter++) {
00255 std::string dirName = *dirIter;
00256 DQMEvent::TObjectTable toTable = toMap[dirName];
00257 if (toTable.size() == 0) {continue;}
00258
00259
00260 serializeWorker_.serializeDQMEvent(toTable, useCompression_,
00261 compressionLevel_);
00262
00263
00264 unsigned int srcSize = serializeWorker_.currentSpaceUsed();
00265 unsigned int newSize = srcSize + 50000;
00266 if (messageBuffer_.size() < newSize) messageBuffer_.resize(newSize);
00267
00268
00269 DQMEventMsgBuilder dqmMsgBuilder(&messageBuffer_[0], messageBuffer_.size(),
00270 lb.run(), lb.luminosityBlock(),
00271 lb.endTime(),
00272 lumiSectionTag, updateNumber_,
00273 (uint32)serializeWorker_.adler32_chksum(),
00274 host_name_,
00275 edm::getReleaseVersion(), dirName,
00276 toTable);
00277
00278
00279 unsigned char* src = serializeWorker_.bufferPointer();
00280 std::copy(src,src + srcSize, dqmMsgBuilder.eventAddress());
00281 dqmMsgBuilder.setEventLength(srcSize);
00282 if (useCompression_) {
00283 dqmMsgBuilder.setCompressionFlag(serializeWorker_.currentEventSize());
00284 }
00285
00286
00287 dqmMsgBuilder.setFUProcessId(getpid());
00288 dqmMsgBuilder.setFUGuid(fuGuidValue_);
00289
00290
00291 writeShmDQMData(dqmMsgBuilder);
00292
00293
00294 if(mss) mss->setMicroState(&input);
00295
00296 }
00297
00298
00299
00300
00301
00302
00303 lumiSectionOfPreviousUpdate_ = thisLumiSection;
00304 nbUpdates_++;
00305 updateNumber_++;
00306 }
00307
00312 void FUShmDQMOutputService::postSourceConstructionProcessing(const edm::ModuleDescription &moduleDesc)
00313 {
00314
00315 bei = edm::Service<DQMStore>().operator->();
00316 }
00317
00322 void FUShmDQMOutputService::preBeginRun(const edm::RunID &runID,
00323 const edm::Timestamp ×tamp)
00324 {
00325 nbUpdates_ = 0;
00326 updateNumber_ = 0;
00327 initializationIsNeeded_ = true;
00328 }
00329
00334 void FUShmDQMOutputService::postEndJobProcessing()
00335 {
00336
00337 initializationIsNeeded_ = true;
00338 }
00339
00344 void FUShmDQMOutputService::findMonitorElements(DQMEvent::TObjectTable &toTable,
00345 std::string folderPath)
00346 {
00347 if (bei == NULL) {return;}
00348
00349
00350 std::vector<MonitorElement *> localMEList = bei->getContents(folderPath);
00351
00352
00353
00354 std::vector<TObject *> updateTOList;
00355 for (int idx = 0; idx < (int) localMEList.size(); idx++) {
00356 MonitorElement *mePtr = localMEList[idx];
00357
00358 updateTOList.push_back(mePtr->getRootObject());
00359
00360 }
00361 if (updateTOList.size() > 0) {
00362 toTable[folderPath] = updateTOList;
00363 }
00364
00365
00366
00367
00368
00369 if (bei->dirExists(folderPath)) {
00370 bei->setCurrentFolder(folderPath);
00371 std::vector<std::string> subDirList = bei->getSubdirs();
00372
00373
00374 std::vector<std::string>::const_iterator dirIter;
00375 for (dirIter = subDirList.begin(); dirIter != subDirList.end(); dirIter++) {
00376 std::string subDirPath = (*dirIter);
00377 findMonitorElements(toTable, subDirPath);
00378 }
00379 }
00380 }
00381
00385 void FUShmDQMOutputService::writeShmDQMData(DQMEventMsgBuilder const& dqmMsgBuilder)
00386 {
00387
00388 unsigned char* buffer = (unsigned char*) dqmMsgBuilder.startAddress();
00389 unsigned int size = dqmMsgBuilder.size();
00390
00391
00392 DQMEventMsgView dqmMsgView(buffer);
00393 unsigned int runid = dqmMsgView.runNumber();
00394 unsigned int eventid = dqmMsgView.eventNumberAtUpdate();
00395
00396
00397 std::string topFolder = dqmMsgView.topFolderName();
00398 uLong crc = crc32(0L, Z_NULL, 0);
00399 Bytef* buf = (Bytef*)topFolder.data();
00400 crc = crc32(crc, buf, topFolder.length());
00401
00402 if(!shmBuffer_) {
00403 edm::LogError("FUDQMShmOutputService")
00404 << " Error writing to shared memory as shm is not available";
00405 } else {
00406 bool ret = shmBuffer_->writeDqmEventData(runid, eventid, (unsigned int)crc,
00407 getpid(), fuGuidValue_, buffer, size);
00408 if(!ret) edm::LogError("FUShmDQMOutputService") << " Error with writing data to ShmBuffer";
00409 }
00410
00411 }
00412
00413
00414
00415 bool FUShmDQMOutputService::attachToShm()
00416 {
00417 if(0==shmBuffer_) {
00418 shmBuffer_ = evf::FUShmBuffer::getShmBuffer();
00419 if (0==shmBuffer_) {
00420 edm::LogError("FUDQMShmOutputService")<<"Failed to attach to shared memory";
00421 return false;
00422 }
00423 return true;
00424 }
00425 return false;
00426
00427 }
00428
00429
00430
00431 bool FUShmDQMOutputService::detachFromShm()
00432 {
00433 if(0!=shmBuffer_) {
00434 shmdt(shmBuffer_);
00435 shmBuffer_ = 0;
00436 }
00437 return true;
00438 }