CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_3/src/EventFilter/StorageManager/src/DQMEventProcessor.cc

Go to the documentation of this file.
00001 // $Id: DQMEventProcessor.cc,v 1.19 2012/04/20 10:48:01 mommsen Exp $
00003 
00004 #include "toolbox/task/WorkLoopFactory.h"
00005 #include "xcept/tools.h"
00006 
00007 #include "EventFilter/StorageManager/interface/Exception.h"
00008 #include "EventFilter/StorageManager/interface/DQMEventProcessor.h"
00009 #include "EventFilter/StorageManager/interface/DQMEventProcessorResources.h"
00010 #include "EventFilter/StorageManager/interface/DQMEventQueueCollection.h"
00011 #include "EventFilter/StorageManager/interface/QueueID.h"
00012 #include "EventFilter/StorageManager/src/DQMEventStore.icc"
00013 
00014 
00015 namespace stor {
00016   
00018   // Specializations for DQMEventStore //
00020   
00021   template<>  
00022   DQMEventMsgView
00023   DQMEventStore<I2OChain,DataSenderMonitorCollection,AlarmHandler>::
00024   getDQMEventView(I2OChain const& dqmEvent)
00025   {
00026     tempEventArea_.clear();
00027     dqmEvent.copyFragmentsIntoBuffer(tempEventArea_);
00028     return DQMEventMsgView(&tempEventArea_[0]);
00029   }
00030 
00031 
00032 
00033   DQMEventProcessor::DQMEventProcessor(xdaq::Application* app, SharedResourcesPtr sr) :
00034   app_(app),
00035   sharedResources_(sr),
00036   actionIsActive_(true),
00037   latestLumiSection_(0),
00038   discardDQMUpdatesForOlderLS_(0),
00039   dqmEventStore_
00040   (
00041     app->getApplicationDescriptor(),
00042     sr->dqmEventQueueCollection_,
00043     sr->statisticsReporter_->getDQMEventMonitorCollection(),
00044     &sr->statisticsReporter_->getDataSenderMonitorCollection(),
00045     &stor::DataSenderMonitorCollection::getConnectedEPs,
00046     sr->alarmHandler_.get(),
00047     &stor::AlarmHandler::moveToFailedState,
00048     sr->alarmHandler_
00049   )
00050   {
00051     WorkerThreadParams workerParams =
00052       sharedResources_->configuration_->getWorkerThreadParams();
00053     timeout_ = workerParams.DQMEPdeqWaitTime_;
00054   }
00055   
00056   
00057   DQMEventProcessor::~DQMEventProcessor()
00058   {
00059     // Stop the activity
00060     actionIsActive_ = false;
00061     
00062     // Cancel the workloop (will wait until the action has finished)
00063     processWL_->cancel();
00064   }
00065   
00066   
00067   void DQMEventProcessor::startWorkLoop(std::string workloopName)
00068   {
00069     try
00070     {
00071       std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00072       
00073       processWL_ = toolbox::task::getWorkLoopFactory()->
00074       getWorkLoop( identifier + workloopName, "waiting" );
00075       
00076       if ( ! processWL_->isActive() )
00077       {
00078         toolbox::task::ActionSignature* processAction = 
00079           toolbox::task::bind(this, &DQMEventProcessor::processDQMEvents,
00080           identifier + "ProcessNextDQMEvent");
00081         processWL_->submit(processAction);
00082         
00083         processWL_->activate();
00084       }
00085     }
00086     catch (xcept::Exception& e)
00087     {
00088       std::string msg = "Failed to start workloop 'DQMEventProcessor' with 'processNextDQMEvent'.";
00089       XCEPT_RETHROW(stor::exception::DQMEventProcessing, msg, e);
00090     }
00091   }
00092   
00093   
00094   bool DQMEventProcessor::processDQMEvents(toolbox::task::WorkLoop*)
00095   {
00096     std::string errorMsg = "Failed to process a DQM event: ";
00097     
00098     try
00099     {
00100       processNextDQMEvent();
00101     }
00102     catch(xcept::Exception &e)
00103     {
00104       XCEPT_DECLARE_NESTED( stor::exception::DQMEventProcessing,
00105         sentinelException, errorMsg, e );
00106       sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00107     }
00108     catch(std::exception &e)
00109     {
00110       errorMsg += e.what();
00111       XCEPT_DECLARE( stor::exception::DQMEventProcessing,
00112         sentinelException, errorMsg );
00113       sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00114     }
00115     catch(...)
00116     {
00117       errorMsg += "Unknown exception";
00118       XCEPT_DECLARE( stor::exception::DQMEventProcessing,
00119         sentinelException, errorMsg );
00120       sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00121     }
00122     
00123     return actionIsActive_;
00124   }
00125   
00126   
00127   void DQMEventProcessor::processNextDQMEvent()
00128   {
00129     DQMEventQueue::ValueType dqmEvent;
00130     DQMEventQueuePtr eq = sharedResources_->dqmEventQueue_;
00131     utils::TimePoint_t startTime = utils::getCurrentTime();
00132     if (eq->deqTimedWait(dqmEvent, timeout_))
00133     {
00134       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00135       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00136         addDQMEventProcessorIdleSample(elapsedTime);
00137 
00138       if (
00139         (discardDQMUpdatesForOlderLS_ > 0) &&
00140         (dqmEvent.first.lumiSection() + discardDQMUpdatesForOlderLS_ < latestLumiSection_)
00141       )
00142         // subtracting unsigned quantities might not yield the right result!
00143       {
00144         // discard very old LS
00145         sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
00146           getDroppedDQMEventCountsMQ().addSample(dqmEvent.second + 1);        
00147       }
00148       else
00149       {
00150         sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00151           addPoppedDQMEventSample(dqmEvent.first.memoryUsed());
00152         sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
00153           getDroppedDQMEventCountsMQ().addSample(dqmEvent.second);
00154         
00155         latestLumiSection_ = std::max(latestLumiSection_, dqmEvent.first.lumiSection());
00156         dqmEventStore_.addDQMEvent(dqmEvent.first);
00157       }
00158     }
00159     else
00160     {
00161       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00162       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00163         addDQMEventProcessorIdleSample(elapsedTime);
00164     }
00165     
00166     DQMEventProcessorResources::Requests requests;
00167     DQMProcessingParams dqmParams;
00168     boost::posix_time::time_duration newTimeoutValue;
00169     if (sharedResources_->dqmEventProcessorResources_->
00170       getRequests(requests, dqmParams, newTimeoutValue))
00171     {
00172       if (requests.configuration)
00173       {
00174         timeout_ = newTimeoutValue;
00175         dqmEventStore_.setParameters(dqmParams);
00176         discardDQMUpdatesForOlderLS_ = dqmParams.discardDQMUpdatesForOlderLS_;
00177       }
00178       if (requests.endOfRun)
00179       {
00180         endOfRun();
00181       }
00182       if (requests.storeDestruction)
00183       {
00184         dqmEventStore_.clear();
00185       }
00186       sharedResources_->dqmEventProcessorResources_->requestsDone();
00187     }
00188   }
00189   
00190   void DQMEventProcessor::endOfRun()
00191   {
00192     dqmEventStore_.purge();
00193     latestLumiSection_ = 0;
00194   }
00195     
00196 } // namespace stor
00197