CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Member Functions | Private Attributes
stor::DQMEventProcessor Class Reference

#include <DQMEventProcessor.h>

Inheritance diagram for stor::DQMEventProcessor:

Public Member Functions

 DQMEventProcessor (xdaq::Application *, SharedResourcesPtr sr)
 
bool processDQMEvents (toolbox::task::WorkLoop *)
 
void startWorkLoop (std::string workloopName)
 
 ~DQMEventProcessor ()
 

Private Member Functions

 DQMEventProcessor (DQMEventProcessor const &)
 
void endOfRun ()
 
DQMEventProcessoroperator= (DQMEventProcessor const &)
 
void processNextDQMEvent ()
 

Private Attributes

bool actionIsActive_
 
xdaq::Application * app_
 
unsigned int discardDQMUpdatesForOlderLS_
 
DQMEventStore< I2OChain,
InitMsgCollection,
SharedResources
dqmEventStore_
 
uint32_t latestLumiSection_
 
toolbox::task::WorkLoop * processWL_
 
SharedResourcesPtr sharedResources_
 
boost::posix_time::time_duration timeout_
 

Detailed Description

Processes the DQM event (histograms)

It retrieves the next DQM event from the DQMEventQueue, adds up the histograms belonging to one lumi-section, and puts it into the appropriate DQMConsumerQueues. Depending on the configuration, it also writes the histograms to disk every N lumi-sections.

Author:
mommsen
Revision:
1.6
Date:
2011/03/07 15:31:31

Definition at line 38 of file DQMEventProcessor.h.

Constructor & Destructor Documentation

stor::DQMEventProcessor::DQMEventProcessor ( xdaq::Application *  app,
SharedResourcesPtr  sr 
)

Definition at line 34 of file DQMEventProcessor.cc.

References stor::WorkerThreadParams::DQMEPdeqWaitTime_, sharedResources_, and timeout_.

34  :
35  app_(app),
36  sharedResources_(sr),
37  actionIsActive_(true),
41  (
42  app->getApplicationDescriptor(),
43  sr->dqmEventQueueCollection_,
44  sr->statisticsReporter_->getDQMEventMonitorCollection(),
45  sr->initMsgCollection_.get(),
47  sr.get(),
49  sr->statisticsReporter_->alarmHandler()
50  )
51  {
52  WorkerThreadParams workerParams =
53  sharedResources_->configuration_->getWorkerThreadParams();
54  timeout_ = workerParams.DQMEPdeqWaitTime_;
55  }
SharedResourcesPtr sharedResources_
xdaq::Application * app_
DQMEventStore< I2OChain, InitMsgCollection, SharedResources > dqmEventStore_
unsigned int discardDQMUpdatesForOlderLS_
void moveToFailedState(xcept::Exception &)
boost::posix_time::time_duration timeout_
stor::DQMEventProcessor::~DQMEventProcessor ( )

Definition at line 58 of file DQMEventProcessor.cc.

References actionIsActive_, and processWL_.

59  {
60  // Stop the activity
61  actionIsActive_ = false;
62 
63  // Cancel the workloop (will wait until the action has finished)
64  processWL_->cancel();
65  }
toolbox::task::WorkLoop * processWL_
stor::DQMEventProcessor::DQMEventProcessor ( DQMEventProcessor const &  )
private

Member Function Documentation

void stor::DQMEventProcessor::endOfRun ( )
private

Purge instances and process all completed DQM records

Definition at line 191 of file DQMEventProcessor.cc.

References dqmEventStore_.

Referenced by processNextDQMEvent().

192  {
193  dqmEventStore_.purge();
194  }
DQMEventStore< I2OChain, InitMsgCollection, SharedResources > dqmEventStore_
DQMEventProcessor& stor::DQMEventProcessor::operator= ( DQMEventProcessor const &  )
private
bool stor::DQMEventProcessor::processDQMEvents ( toolbox::task::WorkLoop *  )

The workloop action taking the next DQM event from the DQMEventQueue, processes it, and puts it into the appropriate DQMConsumerQueues when the lumi-section has finished.

Definition at line 95 of file DQMEventProcessor.cc.

References actionIsActive_, ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, processNextDQMEvent(), and sharedResources_.

Referenced by startWorkLoop().

96  {
97  std::string errorMsg = "Failed to process a DQM event: ";
98 
99  try
100  {
102  }
103  catch(xcept::Exception &e)
104  {
105  XCEPT_DECLARE_NESTED( stor::exception::DQMEventProcessing,
106  sentinelException, errorMsg, e );
107  sharedResources_->moveToFailedState(sentinelException);
108  }
109  catch(std::exception &e)
110  {
111  errorMsg += e.what();
112  XCEPT_DECLARE( stor::exception::DQMEventProcessing,
113  sentinelException, errorMsg );
114  sharedResources_->moveToFailedState(sentinelException);
115  }
116  catch(...)
117  {
118  errorMsg += "Unknown exception";
119  XCEPT_DECLARE( stor::exception::DQMEventProcessing,
120  sentinelException, errorMsg );
121  sharedResources_->moveToFailedState(sentinelException);
122  }
123 
124  return actionIsActive_;
125  }
SharedResourcesPtr sharedResources_
void stor::DQMEventProcessor::processNextDQMEvent ( )
private

Pops the next DQM event from the DQMEventQueue and adds it to the DQMStore

Definition at line 128 of file DQMEventProcessor.cc.

References stor::DQMEventProcessorResources::Requests::configuration, stor::DQMProcessingParams::discardDQMUpdatesForOlderLS_, discardDQMUpdatesForOlderLS_, dqmEventStore_, stor::DQMEventProcessorResources::Requests::endOfRun, endOfRun(), python.Vispa.Plugins.EdmBrowser.EdmDataAccessor::eq(), stor::utils::getCurrentTime(), latestLumiSection_, max(), sharedResources_, stor::DQMEventProcessorResources::Requests::storeDestruction, and timeout_.

Referenced by processDQMEvents().

129  {
130  DQMEventQueue::ValueType dqmEvent;
131  DQMEventQueuePtr eq = sharedResources_->dqmEventQueue_;
133  if (eq->deqTimedWait(dqmEvent, timeout_))
134  {
135  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
136  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
137  addDQMEventProcessorIdleSample(elapsedTime);
138 
139  if (
141  (dqmEvent.first.lumiSection() + discardDQMUpdatesForOlderLS_ < latestLumiSection_)
142  )
143  // subtracting unsigned quantities might not yield the right result!
144  {
145  // discard very old LS
146  sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
147  getDroppedDQMEventCountsMQ().addSample(dqmEvent.second + 1);
148  }
149  else
150  {
151  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
152  addPoppedDQMEventSample(dqmEvent.first.memoryUsed());
153  sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
154  getDroppedDQMEventCountsMQ().addSample(dqmEvent.second);
155 
156  latestLumiSection_ = std::max(latestLumiSection_, dqmEvent.first.lumiSection());
157  dqmEventStore_.addDQMEvent(dqmEvent.first);
158  }
159  }
160  else
161  {
162  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
163  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
164  addDQMEventProcessorIdleSample(elapsedTime);
165  }
166 
167  DQMEventProcessorResources::Requests requests;
168  DQMProcessingParams dqmParams;
169  boost::posix_time::time_duration newTimeoutValue;
170  if (sharedResources_->dqmEventProcessorResources_->
171  getRequests(requests, dqmParams, newTimeoutValue))
172  {
173  if (requests.configuration)
174  {
175  timeout_ = newTimeoutValue;
176  dqmEventStore_.setParameters(dqmParams);
177  discardDQMUpdatesForOlderLS_ = dqmParams.discardDQMUpdatesForOlderLS_;
178  }
179  if (requests.endOfRun)
180  {
181  endOfRun();
182  }
183  if (requests.storeDestruction)
184  {
185  dqmEventStore_.clear();
186  }
187  sharedResources_->dqmEventProcessorResources_->requestsDone();
188  }
189  }
TimePoint_t getCurrentTime()
Definition: Utils.h:158
boost::shared_ptr< DQMEventQueue > DQMEventQueuePtr
Definition: DQMEventQueue.h:23
SharedResourcesPtr sharedResources_
EnqPolicy::ValueType ValueType
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
const T & max(const T &a, const T &b)
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
DQMEventStore< I2OChain, InitMsgCollection, SharedResources > dqmEventStore_
unsigned int discardDQMUpdatesForOlderLS_
boost::posix_time::time_duration timeout_
void stor::DQMEventProcessor::startWorkLoop ( std::string  workloopName)

Creates and starts the DQM event processing workloop

Definition at line 68 of file DQMEventProcessor.cc.

References app_, ExpressReco_HICollisions_FallBack::e, edm::hlt::Exception, stor::utils::getIdentifier(), runTheMatrix::msg, processDQMEvents(), and processWL_.

69  {
70  try
71  {
72  std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
73 
74  processWL_ = toolbox::task::getWorkLoopFactory()->
75  getWorkLoop( identifier + workloopName, "waiting" );
76 
77  if ( ! processWL_->isActive() )
78  {
79  toolbox::task::ActionSignature* processAction =
80  toolbox::task::bind(this, &DQMEventProcessor::processDQMEvents,
81  identifier + "ProcessNextDQMEvent");
82  processWL_->submit(processAction);
83 
84  processWL_->activate();
85  }
86  }
87  catch (xcept::Exception& e)
88  {
89  std::string msg = "Failed to start workloop 'DQMEventProcessor' with 'processNextDQMEvent'.";
90  XCEPT_RETHROW(stor::exception::DQMEventProcessing, msg, e);
91  }
92  }
toolbox::task::WorkLoop * processWL_
bool processDQMEvents(toolbox::task::WorkLoop *)
std::string getIdentifier(xdaq::ApplicationDescriptor *)
Definition: Utils.cc:72
xdaq::Application * app_

Member Data Documentation

bool stor::DQMEventProcessor::actionIsActive_
private

Definition at line 82 of file DQMEventProcessor.h.

Referenced by processDQMEvents(), and ~DQMEventProcessor().

xdaq::Application* stor::DQMEventProcessor::app_
private

Definition at line 78 of file DQMEventProcessor.h.

Referenced by startWorkLoop().

unsigned int stor::DQMEventProcessor::discardDQMUpdatesForOlderLS_
private

Definition at line 84 of file DQMEventProcessor.h.

Referenced by processNextDQMEvent().

DQMEventStore<I2OChain,InitMsgCollection,SharedResources> stor::DQMEventProcessor::dqmEventStore_
private

Definition at line 88 of file DQMEventProcessor.h.

Referenced by endOfRun(), and processNextDQMEvent().

uint32_t stor::DQMEventProcessor::latestLumiSection_
private

Definition at line 83 of file DQMEventProcessor.h.

Referenced by processNextDQMEvent().

toolbox::task::WorkLoop* stor::DQMEventProcessor::processWL_
private

Definition at line 86 of file DQMEventProcessor.h.

Referenced by startWorkLoop(), and ~DQMEventProcessor().

SharedResourcesPtr stor::DQMEventProcessor::sharedResources_
private

Definition at line 79 of file DQMEventProcessor.h.

Referenced by DQMEventProcessor(), processDQMEvents(), and processNextDQMEvent().

boost::posix_time::time_duration stor::DQMEventProcessor::timeout_
private

Definition at line 81 of file DQMEventProcessor.h.

Referenced by DQMEventProcessor(), and processNextDQMEvent().