Go to the documentation of this file.00001
00003
00004 #include "DQMServices/Core/interface/DQMStore.h"
00005 #include "EventFilter/SMProxyServer/interface/DQMArchiver.h"
00006 #include "EventFilter/SMProxyServer/interface/Exception.h"
00007 #include "EventFilter/StorageManager/interface/ConsumerID.h"
00008 #include "EventFilter/StorageManager/interface/DQMEventMonitorCollection.h"
00009 #include "EventFilter/StorageManager/interface/QueueID.h"
00010 #include "EventFilter/StorageManager/src/DQMHttpSource.h"
00011 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00012
00013 #include "TObject.h"
00014
00015 #include <boost/foreach.hpp>
00016
00017 #include <memory>
00018 #include <string>
00019 #include <vector>
00020
00021 namespace smproxy
00022 {
00023 DQMArchiver::DQMArchiver(StateMachine* stateMachine) :
00024 stateMachine_(stateMachine),
00025 dqmArchivingParams_(stateMachine->getConfiguration()->getDQMArchivingParams()),
00026 dqmEventQueueCollection_(stateMachine->getDQMEventQueueCollection())
00027 {
00028 if ( dqmArchivingParams_.archiveDQM_ )
00029 {
00030 createRegistration();
00031 thread_.reset(
00032 new boost::thread( boost::bind(&DQMArchiver::activity, this) )
00033 );
00034 }
00035 }
00036
00037 DQMArchiver::~DQMArchiver()
00038 {
00039 if (thread_) thread_->join();
00040 }
00041
00042 void DQMArchiver::activity()
00043 {
00044 try
00045 {
00046 doIt();
00047 }
00048 catch(xcept::Exception &e)
00049 {
00050 stateMachine_->moveToFailedState(e);
00051 }
00052 catch(std::exception &e)
00053 {
00054 XCEPT_DECLARE(exception::DQMArchival,
00055 sentinelException, e.what());
00056 stateMachine_->moveToFailedState(sentinelException);
00057 }
00058 catch(...)
00059 {
00060 std::string errorMsg = "Unknown exception in DQM archiving thread";
00061 XCEPT_DECLARE(exception::DQMArchival,
00062 sentinelException, errorMsg);
00063 stateMachine_->moveToFailedState(sentinelException);
00064 }
00065 }
00066
00067 void DQMArchiver::doIt()
00068 {
00069 stor::RegistrationCollectionPtr registrationCollection =
00070 stateMachine_->getRegistrationCollection();
00071 const stor::ConsumerID cid = regPtr_->consumerId();
00072
00073 while ( registrationCollection->registrationIsAllowed(cid) )
00074 {
00075 const stor::DQMEventQueueCollection::ValueType dqmEvent =
00076 dqmEventQueueCollection_->popEvent(cid);
00077
00078 if ( dqmEvent.first.empty() )
00079 ::sleep(1);
00080 else
00081 handleDQMEvent(dqmEvent.first);
00082 }
00083
00084
00085 BOOST_FOREACH(
00086 const Records::value_type& pair,
00087 lastUpdateForFolders_
00088 )
00089 {
00090 writeDQMEventToFile(pair.second.getDQMEventMsgView(), true);
00091 }
00092 }
00093
00094 void DQMArchiver::handleDQMEvent(const stor::DQMTopLevelFolder::Record& record)
00095 {
00096 const DQMEventMsgView view = record.getDQMEventMsgView();
00097
00098 updateLastRecord(record);
00099
00100 if (
00101 dqmArchivingParams_.archiveIntervalDQM_ > 0 &&
00102 ((view.updateNumber()+1) % dqmArchivingParams_.archiveIntervalDQM_) == 0
00103 )
00104 {
00105 writeDQMEventToFile(view, false);
00106 }
00107 }
00108
00109 void DQMArchiver::updateLastRecord(const stor::DQMTopLevelFolder::Record& record)
00110 {
00111 const DQMEventMsgView view = record.getDQMEventMsgView();
00112 const std::string topFolderName = view.topFolderName();
00113 Records::iterator pos = lastUpdateForFolders_.lower_bound(topFolderName);
00114
00115 if (pos != lastUpdateForFolders_.end() &&
00116 !(lastUpdateForFolders_.key_comp()(topFolderName, pos->first)))
00117 {
00118
00119 pos->second = record;
00120 }
00121 else
00122 {
00123 lastUpdateForFolders_.insert(pos, Records::value_type(topFolderName, record));
00124 }
00125 }
00126
00127 void DQMArchiver::writeDQMEventToFile
00128 (
00129 const DQMEventMsgView& view,
00130 const bool endRun
00131 ) const
00132 {
00133 edm::ParameterSet dqmStorePSet;
00134 dqmStorePSet.addUntrackedParameter<bool>("collateHistograms", true);
00135 DQMStore dqmStore(dqmStorePSet);
00136
00137 std::ostringstream fileName;
00138 fileName << dqmArchivingParams_.filePrefixDQM_
00139 << "/DQM_R"
00140 << std::setfill('0') << std::setw(9) << view.runNumber();
00141 if ( ! endRun ) fileName << "_L" << std::setw(6) << view.lumiSection();
00142 fileName << ".root";
00143
00144
00145 dqmStore.load(fileName.str(), DQMStore::StripRunDirs, false);
00146
00147 edm::DQMHttpSource::addEventToDQMBackend(&dqmStore, view, false);
00148
00149 dqmStore.save(fileName.str());
00150
00151 stor::DQMEventMonitorCollection& demc =
00152 stateMachine_->getStatisticsReporter()->getDQMEventMonitorCollection();
00153 demc.getWrittenDQMEventSizeMQ().addSample(
00154 static_cast<double>(view.size()) / 0x100000
00155 );
00156 demc.getNumberOfWrittenTopLevelFoldersMQ().addSample(1);
00157 }
00158
00159 void DQMArchiver::createRegistration()
00160 {
00161 edm::ParameterSet pset;
00162 pset.addUntrackedParameter<std::string>("DQMconsumerName", "DQMArchiver");
00163 pset.addUntrackedParameter<std::string>("topLevelFolderName",
00164 dqmArchivingParams_.archiveTopLevelFolder_);
00165
00166 regPtr_.reset( new stor::DQMEventConsumerRegistrationInfo(pset,
00167 stateMachine_->getConfiguration()->getEventServingParams(),
00168 "internal")
00169 );
00170
00171 stor::RegistrationCollectionPtr registrationCollection =
00172 stateMachine_->getRegistrationCollection();
00173
00174 const stor::ConsumerID cid = registrationCollection->getConsumerId();
00175 regPtr_->setConsumerId(cid);
00176
00177 const stor::QueueID qid = dqmEventQueueCollection_->createQueue(regPtr_);
00178 regPtr_->setQueueId(qid);
00179
00180 registrationCollection->addRegistrationInfo(regPtr_);
00181 }
00182
00183 }
00184