00001
#include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
#include "EventFilter/Utilities/interface/MicroStateService.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/Version/interface/GetReleaseVersion.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "DQMServices/Core/interface/MonitorElement.h"
#include "DQMServices/Core/interface/DQMStore.h"
#include "FWCore/Utilities/src/Guid.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "TClass.h"
#include "zlib.h"
#include <unistd.h>
#include <sys/types.h>
#include <cstring>
00037
00038 using namespace std;
00039
00045 #define DSS_DEBUG 0
00046
00050 bool FUShmDQMOutputService::fuIdsInitialized_ = false;
00051 uint32 FUShmDQMOutputService::fuGuidValue_ = 0;
00052
00056 FUShmDQMOutputService::FUShmDQMOutputService(const edm::ParameterSet &pset,
00057 edm::ActivityRegistry &actReg)
00058 : evf::ServiceWeb("FUShmDQMOutputService")
00059 , updateNumber_(0)
00060 , shmBuffer_(0)
00061 , nbUpdates_(0)
00062 , input("INPUT")
00063 , dqm("DQM")
00064 , attach_(false)
00065 {
00066
00067
00068
00069
00070 actReg.watchPostEndLumi(this, &FUShmDQMOutputService::postEndLumi);
00071
00072
00073
00074
00075 actReg.watchPostSourceConstruction(this,
00076 &FUShmDQMOutputService::postSourceConstructionProcessing);
00077
00078
00079 actReg.watchPreBeginRun(this, &FUShmDQMOutputService::preBeginRun);
00080
00081
00082
00083
00084 actReg.watchPostEndJob(this, &FUShmDQMOutputService::postEndJobProcessing);
00085
00086
00087 int initialSize =
00088 pset.getUntrackedParameter<int>("initialMessageBufferSize", 1000000);
00089 messageBuffer_.resize(initialSize);
00090 lumiSectionsPerUpdate_ = pset.getParameter<double>("lumiSectionsPerUpdate");
00091
00092 if (lumiSectionsPerUpdate_ <= 1.0) {lumiSectionsPerUpdate_ = 1.0;}
00093 initializationIsNeeded_ = true;
00094 useCompression_ = pset.getParameter<bool>("useCompression");
00095 compressionLevel_ = pset.getParameter<int>("compressionLevel");
00096
00097
00098
00099
00100
00101 lumiSectionInterval_ =
00102 pset.getUntrackedParameter<int>("lumiSectionInterval", 0);
00103 if (lumiSectionInterval_ < 1) {lumiSectionInterval_ = 0;}
00104
00105
00106 struct timeval now;
00107 struct timezone dummyTZ;
00108 gettimeofday(&now, &dummyTZ);
00109
00110 timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00111
00112 int got_host = gethostname(host_name_, sizeof(host_name_));
00113 if(got_host != 0) strcpy(host_name_, "noHostNameFoundOrTooLong");
00114
00115 if (! fuIdsInitialized_) {
00116 fuIdsInitialized_ = true;
00117
00118 edm::Guid guidObj(true);
00119 std::string guidString = guidObj.toString();
00120
00121
00122 uLong crc = crc32(0L, Z_NULL, 0);
00123 Bytef* buf = (Bytef*)guidString.data();
00124 crc = crc32(crc, buf, guidString.length());
00125 fuGuidValue_ = crc;
00126
00127
00128 }
00129 }
00130
00134 FUShmDQMOutputService::~FUShmDQMOutputService(void)
00135 {
00136 shmdt(shmBuffer_);
00137 }
00138
00139 void FUShmDQMOutputService::defaultWebPage(xgi::Input *in, xgi::Output *out)
00140 {
00141 }
00142
00143 void FUShmDQMOutputService::publish(xdata::InfoSpace *is)
00144 {
00145 try{
00146 is->fireItemAvailable("nbDqmUpdates",&nbUpdates_);
00147 }
00148 catch(xdata::exception::Exception &e){
00149 edm::LogInfo("FUShmDQMOutputService")
00150 << " exception when publishing to InfoSpace ";
00151 }
00152 }
00153
00154 void FUShmDQMOutputService::postEndLumi(edm::LuminosityBlock const &lb, edm::EventSetup const &es)
00155 {
00156 if (attach_) attachToShm();
00157 attach_=false;
00158
00159 evf::MicroStateService *mss = 0;
00160 try{
00161 mss = edm::Service<evf::MicroStateService>().operator->();
00162 if(mss) mss->setMicroState(&dqm);
00163 }
00164 catch(...) {
00165 edm::LogError("FUShmDQMOutputService")<< "exception when trying to get service MicroStateService";
00166 }
00167
00168
00169
00170 unsigned int thisLumiSection = 0;
00171 if(lumiSectionInterval_ == 0)
00172 thisLumiSection = lb.luminosityBlock();
00173 else {
00174
00175 struct timeval now;
00176 struct timezone dummyTZ;
00177 gettimeofday(&now, &dummyTZ);
00178 double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC_;
00179
00180 if(lumiSectionInterval_ > 0) thisLumiSection = static_cast<uint32>(timeInSec/lumiSectionInterval_);
00181 }
00182
00183
00184 if (initializationIsNeeded_) {
00185 initializationIsNeeded_ = false;
00186 lumiSectionOfPreviousUpdate_ = thisLumiSection;
00187 firstLumiSectionSeen_ = thisLumiSection;
00188
00189
00190
00191 struct timeval now;
00192 struct timezone dummyTZ;
00193 gettimeofday(&now, &dummyTZ);
00194
00195 timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00196 }
00197
00198
00199
00200 if(thisLumiSection%4!=0)
00201 {
00202
00203
00204 if(mss) mss->setMicroState(&input);
00205 return;
00206 }
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216 unsigned int lumiSectionTag = thisLumiSection;
00217
00218
00219 if (bei == NULL) {
00220 bei = edm::Service<DQMStore>().operator->();
00221 }
00222
00223
00224 if (bei == NULL) {
00225 throw cms::Exception("postEventProcessing", "FUShmDQMOutputService")
00226 << "Unable to lookup the DQMStore service!\n";
00227 }
00228
00229
00230
00231 std::vector<std::string> topLevelFolderList;
00232
00233 bei->cd();
00234
00235 topLevelFolderList = bei->getSubdirs();
00236
00237
00238
00239 std::map< std::string, DQMEvent::TObjectTable > toMap;
00240 std::vector<std::string>::const_iterator dirIter;
00241 for (dirIter = topLevelFolderList.begin();
00242 dirIter != topLevelFolderList.end();
00243 dirIter++) {
00244 std::string dirName = *dirIter;
00245 DQMEvent::TObjectTable toTable;
00246
00247
00248 findMonitorElements(toTable, dirName);
00249
00250
00251 toMap[dirName] = toTable;
00252 }
00253
00254
00255
00256 for (dirIter = topLevelFolderList.begin();
00257 dirIter != topLevelFolderList.end();
00258 dirIter++) {
00259 std::string dirName = *dirIter;
00260 DQMEvent::TObjectTable toTable = toMap[dirName];
00261 if (toTable.size() == 0) {continue;}
00262
00263
00264 serializeWorker_.serializeDQMEvent(toTable, useCompression_,
00265 compressionLevel_);
00266
00267
00268 unsigned int srcSize = serializeWorker_.currentSpaceUsed();
00269 unsigned int newSize = srcSize + 50000;
00270 if (messageBuffer_.size() < newSize) messageBuffer_.resize(newSize);
00271
00272
00273 DQMEventMsgBuilder dqmMsgBuilder(&messageBuffer_[0], messageBuffer_.size(),
00274 lb.run(), lb.luminosityBlock(),
00275 lb.endTime(),
00276 lumiSectionTag, updateNumber_,
00277 (uint32)serializeWorker_.adler32_chksum(),
00278 host_name_,
00279 edm::getReleaseVersion(), dirName,
00280 toTable);
00281
00282
00283 unsigned char* src = serializeWorker_.bufferPointer();
00284 std::copy(src,src + srcSize, dqmMsgBuilder.eventAddress());
00285 dqmMsgBuilder.setEventLength(srcSize);
00286 if (useCompression_) {
00287 dqmMsgBuilder.setCompressionFlag(serializeWorker_.currentEventSize());
00288 }
00289
00290
00291 dqmMsgBuilder.setFUProcessId(getpid());
00292 dqmMsgBuilder.setFUGuid(fuGuidValue_);
00293
00294
00295 writeShmDQMData(dqmMsgBuilder);
00296
00297
00298 if(mss) mss->setMicroState(&input);
00299
00300 }
00301
00302
00303
00304
00305
00306
00307 lumiSectionOfPreviousUpdate_ = thisLumiSection;
00308 nbUpdates_++;
00309 updateNumber_++;
00310 }
00311
00316 void FUShmDQMOutputService::postSourceConstructionProcessing(const edm::ModuleDescription &moduleDesc)
00317 {
00318
00319 bei = edm::Service<DQMStore>().operator->();
00320 }
00321
00326 void FUShmDQMOutputService::preBeginRun(const edm::RunID &runID,
00327 const edm::Timestamp ×tamp)
00328 {
00329 nbUpdates_ = 0;
00330 updateNumber_ = 0;
00331 initializationIsNeeded_ = true;
00332 }
00333
00338 void FUShmDQMOutputService::postEndJobProcessing()
00339 {
00340
00341 initializationIsNeeded_ = true;
00342 }
00343
00348 void FUShmDQMOutputService::findMonitorElements(DQMEvent::TObjectTable &toTable,
00349 std::string folderPath)
00350 {
00351 if (bei == NULL) {return;}
00352
00353
00354 std::vector<MonitorElement *> localMEList = bei->getContents(folderPath);
00355
00356
00357
00358 std::vector<TObject *> updateTOList;
00359 for (int idx = 0; idx < (int) localMEList.size(); idx++) {
00360 MonitorElement *mePtr = localMEList[idx];
00361
00362 updateTOList.push_back(mePtr->getRootObject());
00363
00364 }
00365 if (updateTOList.size() > 0) {
00366 toTable[folderPath] = updateTOList;
00367 }
00368
00369
00370
00371
00372
00373 if (bei->dirExists(folderPath)) {
00374 bei->setCurrentFolder(folderPath);
00375 std::vector<std::string> subDirList = bei->getSubdirs();
00376
00377
00378 std::vector<std::string>::const_iterator dirIter;
00379 for (dirIter = subDirList.begin(); dirIter != subDirList.end(); dirIter++) {
00380 std::string subDirPath = (*dirIter);
00381 findMonitorElements(toTable, subDirPath);
00382 }
00383 }
00384 }
00385
00389 void FUShmDQMOutputService::writeShmDQMData(DQMEventMsgBuilder const& dqmMsgBuilder)
00390 {
00391
00392 unsigned char* buffer = (unsigned char*) dqmMsgBuilder.startAddress();
00393 unsigned int size = dqmMsgBuilder.size();
00394
00395
00396 DQMEventMsgView dqmMsgView(buffer);
00397 unsigned int runid = dqmMsgView.runNumber();
00398 unsigned int eventid = dqmMsgView.eventNumberAtUpdate();
00399
00400
00401 std::string topFolder = dqmMsgView.topFolderName();
00402 uLong crc = crc32(0L, Z_NULL, 0);
00403 Bytef* buf = (Bytef*)topFolder.data();
00404 crc = crc32(crc, buf, topFolder.length());
00405
00406 if(!shmBuffer_) {
00407 edm::LogError("FUDQMShmOutputService")
00408 << " Error writing to shared memory as shm is not available";
00409 } else {
00410 bool ret = shmBuffer_->writeDqmEventData(runid, eventid, (unsigned int)crc,
00411 getpid(), fuGuidValue_, buffer, size);
00412 if(!ret) edm::LogError("FUShmDQMOutputService") << " Error with writing data to ShmBuffer";
00413 }
00414
00415 }
00416
00417 void FUShmDQMOutputService::setAttachToShm() {
00418 attach_=true;
00419 }
00420
00421 bool FUShmDQMOutputService::attachToShm()
00422 {
00423 if(0==shmBuffer_) {
00424 shmBuffer_ = evf::FUShmBuffer::getShmBuffer();
00425 if (0==shmBuffer_) {
00426 edm::LogError("FUDQMShmOutputService")<<"Failed to attach to shared memory";
00427 return false;
00428 }
00429 return true;
00430 }
00431 return false;
00432
00433 }
00434
00435
00436
00437 bool FUShmDQMOutputService::detachFromShm()
00438 {
00439 if(0!=shmBuffer_) {
00440 shmdt(shmBuffer_);
00441 shmBuffer_ = 0;
00442 }
00443 return true;
00444 }