CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch12/src/EventFilter/StorageManager/src/DQMEventProcessor.cc

Go to the documentation of this file.
00001 // $Id: DQMEventProcessor.cc,v 1.16 2011/04/19 16:01:53 mommsen Exp $
00003 
00004 #include "toolbox/task/WorkLoopFactory.h"
00005 #include "xcept/tools.h"
00006 
00007 #include "EventFilter/StorageManager/interface/Exception.h"
00008 #include "EventFilter/StorageManager/interface/DQMEventProcessor.h"
00009 #include "EventFilter/StorageManager/interface/DQMEventProcessorResources.h"
00010 #include "EventFilter/StorageManager/interface/DQMEventQueueCollection.h"
00011 #include "EventFilter/StorageManager/interface/QueueID.h"
00012 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00013 #include "EventFilter/StorageManager/src/DQMEventStore.icc"
00014 
00015 
00016 namespace stor {
00017   
00019   // Specializations for DQMEventStore //
00021   
00022   template<>  
00023   DQMEventMsgView
00024   DQMEventStore<I2OChain,InitMsgCollection,SharedResources>::
00025   getDQMEventView(I2OChain const& dqmEvent)
00026   {
00027     tempEventArea_.clear();
00028     dqmEvent.copyFragmentsIntoBuffer(tempEventArea_);
00029     return DQMEventMsgView(&tempEventArea_[0]);
00030   }
00031 
00032 
00033 
00034   DQMEventProcessor::DQMEventProcessor(xdaq::Application* app, SharedResourcesPtr sr) :
00035   app_(app),
00036   sharedResources_(sr),
00037   actionIsActive_(true),
00038   latestLumiSection_(0),
00039   discardDQMUpdatesForOlderLS_(0),
00040   dqmEventStore_
00041   (
00042     app->getApplicationDescriptor(),
00043     sr->dqmEventQueueCollection_,
00044     sr->statisticsReporter_->getDQMEventMonitorCollection(),
00045     sr->initMsgCollection_.get(),
00046     &stor::InitMsgCollection::maxMsgCount,
00047     sr.get(),
00048     &stor::SharedResources::moveToFailedState,
00049     sr->statisticsReporter_->alarmHandler()
00050   )
00051   {
00052     WorkerThreadParams workerParams =
00053       sharedResources_->configuration_->getWorkerThreadParams();
00054     timeout_ = workerParams.DQMEPdeqWaitTime_;
00055   }
00056   
00057   
00058   DQMEventProcessor::~DQMEventProcessor()
00059   {
00060     // Stop the activity
00061     actionIsActive_ = false;
00062     
00063     // Cancel the workloop (will wait until the action has finished)
00064     processWL_->cancel();
00065   }
00066   
00067   
00068   void DQMEventProcessor::startWorkLoop(std::string workloopName)
00069   {
00070     try
00071     {
00072       std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00073       
00074       processWL_ = toolbox::task::getWorkLoopFactory()->
00075       getWorkLoop( identifier + workloopName, "waiting" );
00076       
00077       if ( ! processWL_->isActive() )
00078       {
00079         toolbox::task::ActionSignature* processAction = 
00080           toolbox::task::bind(this, &DQMEventProcessor::processDQMEvents,
00081           identifier + "ProcessNextDQMEvent");
00082         processWL_->submit(processAction);
00083         
00084         processWL_->activate();
00085       }
00086     }
00087     catch (xcept::Exception& e)
00088     {
00089       std::string msg = "Failed to start workloop 'DQMEventProcessor' with 'processNextDQMEvent'.";
00090       XCEPT_RETHROW(stor::exception::DQMEventProcessing, msg, e);
00091     }
00092   }
00093   
00094   
00095   bool DQMEventProcessor::processDQMEvents(toolbox::task::WorkLoop*)
00096   {
00097     std::string errorMsg = "Failed to process a DQM event: ";
00098     
00099     try
00100     {
00101       processNextDQMEvent();
00102     }
00103     catch(xcept::Exception &e)
00104     {
00105       XCEPT_DECLARE_NESTED( stor::exception::DQMEventProcessing,
00106         sentinelException, errorMsg, e );
00107       sharedResources_->moveToFailedState(sentinelException);
00108     }
00109     catch(std::exception &e)
00110     {
00111       errorMsg += e.what();
00112       XCEPT_DECLARE( stor::exception::DQMEventProcessing,
00113         sentinelException, errorMsg );
00114       sharedResources_->moveToFailedState(sentinelException);
00115     }
00116     catch(...)
00117     {
00118       errorMsg += "Unknown exception";
00119       XCEPT_DECLARE( stor::exception::DQMEventProcessing,
00120         sentinelException, errorMsg );
00121       sharedResources_->moveToFailedState(sentinelException);
00122     }
00123     
00124     return actionIsActive_;
00125   }
00126   
00127   
00128   void DQMEventProcessor::processNextDQMEvent()
00129   {
00130     DQMEventQueue::ValueType dqmEvent;
00131     DQMEventQueuePtr eq = sharedResources_->dqmEventQueue_;
00132     utils::TimePoint_t startTime = utils::getCurrentTime();
00133     if (eq->deqTimedWait(dqmEvent, timeout_))
00134     {
00135       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00136       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00137         addDQMEventProcessorIdleSample(elapsedTime);
00138 
00139       if (
00140         (discardDQMUpdatesForOlderLS_ > 0) &&
00141         (dqmEvent.first.lumiSection() + discardDQMUpdatesForOlderLS_ < latestLumiSection_)
00142       )
00143         // subtracting unsigned quantities might not yield the right result!
00144       {
00145         // discard very old LS
00146         sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
00147           getDroppedDQMEventCountsMQ().addSample(dqmEvent.second + 1);        
00148       }
00149       else
00150       {
00151         sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00152           addPoppedDQMEventSample(dqmEvent.first.memoryUsed());
00153         sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
00154           getDroppedDQMEventCountsMQ().addSample(dqmEvent.second);
00155         
00156         latestLumiSection_ = std::max(latestLumiSection_, dqmEvent.first.lumiSection());
00157         dqmEventStore_.addDQMEvent(dqmEvent.first);
00158       }
00159     }
00160     else
00161     {
00162       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00163       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00164         addDQMEventProcessorIdleSample(elapsedTime);
00165     }
00166     
00167     DQMEventProcessorResources::Requests requests;
00168     DQMProcessingParams dqmParams;
00169     boost::posix_time::time_duration newTimeoutValue;
00170     if (sharedResources_->dqmEventProcessorResources_->
00171       getRequests(requests, dqmParams, newTimeoutValue))
00172     {
00173       if (requests.configuration)
00174       {
00175         timeout_ = newTimeoutValue;
00176         dqmEventStore_.setParameters(dqmParams);
00177         discardDQMUpdatesForOlderLS_ = dqmParams.discardDQMUpdatesForOlderLS_;
00178       }
00179       if (requests.endOfRun)
00180       {
00181         endOfRun();
00182       }
00183       if (requests.storeDestruction)
00184       {
00185         dqmEventStore_.clear();
00186       }
00187       sharedResources_->dqmEventProcessorResources_->requestsDone();
00188     }
00189   }
00190   
00191   void DQMEventProcessor::endOfRun()
00192   {
00193     dqmEventStore_.purge();
00194   }
00195     
00196 } // namespace stor
00197