CMS 3D CMS Logo

CMSSW_4_4_3_patch1/src/EventFilter/SMProxyServer/src/DQMArchiver.cc

Go to the documentation of this file.
00001 // $Id: DQMArchiver.cc,v 1.3 2011/03/30 15:33:43 mommsen Exp $
00003 
00004 #include "DQMServices/Core/interface/DQMStore.h"
00005 #include "EventFilter/SMProxyServer/interface/DQMArchiver.h"
00006 #include "EventFilter/SMProxyServer/interface/Exception.h"
00007 #include "EventFilter/StorageManager/interface/ConsumerID.h"
00008 #include "EventFilter/StorageManager/interface/DQMEventMonitorCollection.h"
00009 #include "EventFilter/StorageManager/interface/QueueID.h"
00010 #include "EventFilter/StorageManager/src/DQMHttpSource.h"
00011 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00012 
00013 #include "TObject.h"
00014 
00015 #include <boost/foreach.hpp>
00016 
00017 #include <memory>
00018 #include <string>
00019 #include <vector>
00020 
00021 namespace smproxy
00022 {
00023   DQMArchiver::DQMArchiver(StateMachine* stateMachine) :
00024   stateMachine_(stateMachine),
00025   dqmArchivingParams_(stateMachine->getConfiguration()->getDQMArchivingParams()),
00026   dqmEventQueueCollection_(stateMachine->getDQMEventQueueCollection())
00027   {
00028     if ( dqmArchivingParams_.archiveDQM_ )
00029     {
00030       createRegistration();
00031       thread_.reset(
00032         new boost::thread( boost::bind(&DQMArchiver::activity, this) )
00033       );
00034     }
00035   }
00036 
00037   DQMArchiver::~DQMArchiver()
00038   {
00039     if (thread_) thread_->join();
00040   }
00041   
00042   void DQMArchiver::activity()
00043   {
00044     try
00045     {
00046       doIt();
00047     }
00048     catch(xcept::Exception &e)
00049     {
00050       stateMachine_->moveToFailedState(e);
00051     }
00052     catch(std::exception &e)
00053     {
00054       XCEPT_DECLARE(exception::DQMArchival,
00055         sentinelException, e.what());
00056       stateMachine_->moveToFailedState(sentinelException);
00057     }
00058     catch(...)
00059     {
00060       std::string errorMsg = "Unknown exception in DQM archiving thread";
00061       XCEPT_DECLARE(exception::DQMArchival,
00062         sentinelException, errorMsg);
00063       stateMachine_->moveToFailedState(sentinelException);
00064     }
00065   }
00066   
00067   void DQMArchiver::doIt()
00068   {
00069     stor::RegistrationCollectionPtr registrationCollection =
00070       stateMachine_->getRegistrationCollection();
00071     const stor::ConsumerID cid = regPtr_->consumerId();
00072     
00073     while ( registrationCollection->registrationIsAllowed(cid) )
00074     {
00075       const stor::DQMEventQueueCollection::ValueType dqmEvent =
00076         dqmEventQueueCollection_->popEvent(cid);
00077       
00078       if ( dqmEvent.first.empty() )
00079         ::sleep(1);
00080       else
00081         handleDQMEvent(dqmEvent.first);
00082     }
00083 
00084     // run ended, write the last updates to file
00085     BOOST_FOREACH(
00086       const Records::value_type& pair,
00087       lastUpdateForFolders_
00088     )
00089     {
00090       writeDQMEventToFile(pair.second.getDQMEventMsgView(), true);
00091     }
00092   }
00093 
00094   void DQMArchiver::handleDQMEvent(const stor::DQMTopLevelFolder::Record& record)
00095   {
00096     const DQMEventMsgView view = record.getDQMEventMsgView();
00097 
00098     updateLastRecord(record);
00099 
00100     if (
00101       dqmArchivingParams_.archiveIntervalDQM_ > 0 &&
00102       ((view.updateNumber()+1) % dqmArchivingParams_.archiveIntervalDQM_) == 0
00103     )
00104     {
00105       writeDQMEventToFile(view, false);
00106     }
00107   }
00108 
00109   void DQMArchiver::updateLastRecord(const stor::DQMTopLevelFolder::Record& record)
00110   {
00111     const DQMEventMsgView view = record.getDQMEventMsgView();
00112     const std::string topFolderName = view.topFolderName();
00113     Records::iterator pos = lastUpdateForFolders_.lower_bound(topFolderName);
00114 
00115     if (pos != lastUpdateForFolders_.end() &&
00116       !(lastUpdateForFolders_.key_comp()(topFolderName, pos->first)))
00117     {
00118       // key already exists
00119       pos->second = record;
00120     }
00121     else
00122     {
00123       lastUpdateForFolders_.insert(pos, Records::value_type(topFolderName, record));
00124     }
00125   }
00126 
00127   void DQMArchiver::writeDQMEventToFile
00128   (
00129     const DQMEventMsgView& view,
00130     const bool endRun
00131   ) const
00132   {
00133     edm::ParameterSet dqmStorePSet;
00134     dqmStorePSet.addUntrackedParameter<bool>("collateHistograms", true);
00135     DQMStore dqmStore(dqmStorePSet);
00136 
00137     std::ostringstream fileName;
00138     fileName << dqmArchivingParams_.filePrefixDQM_
00139       << "/DQM_R"
00140       << std::setfill('0') << std::setw(9) << view.runNumber();
00141     if ( ! endRun ) fileName << "_L" << std::setw(6) << view.lumiSection();
00142     fileName << ".root";
00143     
00144     // don't require that the file exists
00145     dqmStore.load(fileName.str(), DQMStore::StripRunDirs, false);
00146 
00147     edm::DQMHttpSource::addEventToDQMBackend(&dqmStore, view, false);
00148 
00149     dqmStore.save(fileName.str());
00150 
00151     stor::DQMEventMonitorCollection& demc =
00152       stateMachine_->getStatisticsReporter()->getDQMEventMonitorCollection();
00153     demc.getWrittenDQMEventSizeMQ().addSample(
00154       static_cast<double>(view.size()) / 0x100000
00155     );
00156     demc.getNumberOfWrittenTopLevelFoldersMQ().addSample(1);
00157   }
00158 
00159   void DQMArchiver::createRegistration()
00160   {
00161     edm::ParameterSet pset;
00162     pset.addUntrackedParameter<std::string>("DQMconsumerName", "DQMArchiver");
00163     pset.addUntrackedParameter<std::string>("topLevelFolderName", 
00164       dqmArchivingParams_.archiveTopLevelFolder_);
00165 
00166     regPtr_.reset( new stor::DQMEventConsumerRegistrationInfo(pset,
00167         stateMachine_->getConfiguration()->getEventServingParams(),
00168         "internal")
00169     );
00170     
00171     stor::RegistrationCollectionPtr registrationCollection =
00172       stateMachine_->getRegistrationCollection();
00173     
00174     const stor::ConsumerID cid = registrationCollection->getConsumerId();
00175     regPtr_->setConsumerId(cid);
00176 
00177     const stor::QueueID qid = dqmEventQueueCollection_->createQueue(regPtr_);
00178     regPtr_->setQueueId(qid);
00179 
00180     registrationCollection->addRegistrationInfo(regPtr_);
00181   }
00182 
00183 } // namespace smproxy
00184