CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMEventProcessor.cc
Go to the documentation of this file.
1 // $Id: DQMEventProcessor.cc,v 1.19 2012/04/20 10:48:01 mommsen Exp $
3 
4 #include "toolbox/task/WorkLoopFactory.h"
5 #include "xcept/tools.h"
6 
12 #include "EventFilter/StorageManager/src/DQMEventStore.icc"
13 
14 
15 namespace stor {
16 
18  // Specializations for DQMEventStore //
20 
// Explicit specialization of getDQMEventView for I2OChain input.
// (The return type and qualified name on listing lines 22-23 are not shown in
// this extract.)
// Empties the scratch buffer tempEventArea_, asks the chain to copy its
// fragments into that buffer, and returns a DQMEventMsgView constructed from
// the address of the buffer's first element.
// NOTE(review): the view is built from a pointer into tempEventArea_, which is
// cleared on every call — presumably the view is only valid until the next
// call; confirm against DQMEventMsgView.
// NOTE(review): &tempEventArea_[0] assumes the buffer is non-empty after the
// copy — confirm copyFragmentsIntoBuffer always writes at least one byte.
21  template<>
24  getDQMEventView(I2OChain const& dqmEvent)
25  {
26  tempEventArea_.clear();
27  dqmEvent.copyFragmentsIntoBuffer(tempEventArea_);
28  return DQMEventMsgView(&tempEventArea_[0]);
29  }
30 
31 
32 
// Constructor initializer list and body (the signature on listing line 33 is
// not shown in this extract).
// Stores the application pointer and the shared resources, marks the action as
// active, resets the lumi-section bookkeeping to 0, and constructs the
// dqmEventStore_ from the application descriptor, the DQM event queue
// collection, the monitor collections and the alarm handler.
// NOTE(review): processWL_ does not appear in the visible initializer list; in
// the visible code it is only assigned in startWorkLoop().
34  app_(app),
35  sharedResources_(sr),
36  actionIsActive_(true),
37  latestLumiSection_(0),
38  discardDQMUpdatesForOlderLS_(0),
39  dqmEventStore_
40  (
41  app->getApplicationDescriptor(),
42  sr->dqmEventQueueCollection_,
43  sr->statisticsReporter_->getDQMEventMonitorCollection(),
44  &sr->statisticsReporter_->getDataSenderMonitorCollection(),
45  &stor::DataSenderMonitorCollection::getConnectedEPs,
46  sr->alarmHandler_.get(),
47  &stor::AlarmHandler::moveToFailedState,
48  sr->alarmHandler_
49  )
50  {
    // Take the initial dequeue timeout from the configured worker thread
    // parameters.
51  WorkerThreadParams workerParams =
52  sharedResources_->configuration_->getWorkerThreadParams();
53  timeout_ = workerParams.DQMEPdeqWaitTime_;
54  }
55 
56 
// Destructor body (the signature on listing line 57 is not shown in this
// extract). Clears the flag returned by processDQMEvents() and cancels the
// work loop.
// NOTE(review): processWL_ is dereferenced unconditionally. In the visible
// code it is only assigned inside startWorkLoop(), so this presumably relies
// on startWorkLoop() having succeeded before destruction — confirm.
58  {
59  // Stop the activity
60  actionIsActive_ = false;
61 
62  // Cancel the workloop (will wait until the action has finished)
63  processWL_->cancel();
64  }
65 
66 
// Obtains the "waiting" work loop named <application identifier> +
// workloopName from the work loop factory and stores it in processWL_.
// If that work loop is not active yet, binds processDQMEvents() as an action
// named <identifier> + "ProcessNextDQMEvent", submits it and activates the
// work loop.
//
// workloopName: suffix appended to the application identifier to form the
//               work loop name.
// Throws stor::exception::DQMEventProcessing (wrapping the original
// xcept::Exception) if any of the above steps raises an xcept::Exception.
67  void DQMEventProcessor::startWorkLoop(std::string workloopName)
68  {
69  try
70  {
71  std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
72 
73  processWL_ = toolbox::task::getWorkLoopFactory()->
74  getWorkLoop( identifier + workloopName, "waiting" );
75 
    // Only submit and activate when the work loop is not already running, so
    // the action is not submitted again to an active loop.
76  if ( ! processWL_->isActive() )
77  {
78  toolbox::task::ActionSignature* processAction =
79  toolbox::task::bind(this, &DQMEventProcessor::processDQMEvents,
80  identifier + "ProcessNextDQMEvent");
81  processWL_->submit(processAction);
82 
83  processWL_->activate();
84  }
85  }
86  catch (xcept::Exception& e)
87  {
    // NOTE(review): the message names 'processNextDQMEvent' although the bound
    // member function is processDQMEvents — confirm the wording is intended.
88  std::string msg = "Failed to start workloop 'DQMEventProcessor' with 'processNextDQMEvent'.";
89  XCEPT_RETHROW(stor::exception::DQMEventProcessing, msg, e);
90  }
91  }
92 
93 
// Work loop action bound in startWorkLoop(). Runs the guarded statement in the
// try block (listing line 100 is not shown in this extract) and converts any
// exception escaping it into a stor::exception::DQMEventProcessing that is
// handed to the alarm handler's moveToFailedState(). No exception leaves this
// function through the handlers below.
//
// The WorkLoop* argument is unused.
// Returns actionIsActive_, which the destructor sets to false.
// NOTE(review): presumably the work loop re-runs the action while the return
// value is true — confirm against the toolbox::task documentation.
94  bool DQMEventProcessor::processDQMEvents(toolbox::task::WorkLoop*)
95  {
96  std::string errorMsg = "Failed to process a DQM event: ";
97 
98  try
99  {
101  }
102  catch(xcept::Exception &e)
103  {
    // The original exception is passed to the macro alongside errorMsg, so
    // errorMsg itself is left unchanged here.
104  XCEPT_DECLARE_NESTED( stor::exception::DQMEventProcessing,
105  sentinelException, errorMsg, e );
106  sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
107  }
108  catch(std::exception &e)
109  {
    // Only e.what() is carried over, appended to the message text.
110  errorMsg += e.what();
111  XCEPT_DECLARE( stor::exception::DQMEventProcessing,
112  sentinelException, errorMsg );
113  sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
114  }
115  catch(...)
116  {
117  errorMsg += "Unknown exception";
118  XCEPT_DECLARE( stor::exception::DQMEventProcessing,
119  sentinelException, errorMsg );
120  sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
121  }
122 
123  return actionIsActive_;
124  }
125 
126 
// Body of the per-iteration processing step (the signature on listing line 127
// is not shown in this extract; listing lines 131, 139, 166 and 176 are also
// missing, so the declarations of startTime and requests and part of the
// discard condition are not visible here).
//
// Step 1: wait up to timeout_ for a DQM event on the shared DQM event queue
//         and record the waiting time as an idle sample.
// Step 2: either discard the event as belonging to a too-old lumi section or
//         hand it to dqmEventStore_.
// Step 3: serve pending requests (new configuration, end of run, store
//         destruction) and acknowledge them with requestsDone().
128  {
129  DQMEventQueue::ValueType dqmEvent;
130  DQMEventQueuePtr eq = sharedResources_->dqmEventQueue_;
132  if (eq->deqTimedWait(dqmEvent, timeout_))
133  {
134  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
135  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
136  addDQMEventProcessorIdleSample(elapsedTime);
137 
138  if (
140  (dqmEvent.first.lumiSection() + discardDQMUpdatesForOlderLS_ < latestLumiSection_)
141  )
142  // subtracting unsigned quantities might not yield the right result!
143  {
144  // discard very old LS
    // The discarded event itself is counted in addition to dqmEvent.second.
    // NOTE(review): dqmEvent.second is presumably the number of events the
    // queue dropped before this one — confirm against DQMEventQueue.
145  sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
146  getDroppedDQMEventCountsMQ().addSample(dqmEvent.second + 1);
147  }
148  else
149  {
150  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
151  addPoppedDQMEventSample(dqmEvent.first.memoryUsed());
152  sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
153  getDroppedDQMEventCountsMQ().addSample(dqmEvent.second);
154 
    // Remember the highest lumi section seen so far; it is the reference for
    // the discard test above.
155  latestLumiSection_ = std::max(latestLumiSection_, dqmEvent.first.lumiSection());
156  dqmEventStore_.addDQMEvent(dqmEvent.first);
157  }
158  }
159  else
160  {
    // Nothing was dequeued within timeout_: the whole wait counts as idle time.
161  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
162  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
163  addDQMEventProcessorIdleSample(elapsedTime);
164  }
165 
167  DQMProcessingParams dqmParams;
168  boost::posix_time::time_duration newTimeoutValue;
169  if (sharedResources_->dqmEventProcessorResources_->
170  getRequests(requests, dqmParams, newTimeoutValue))
171  {
172  if (requests.configuration)
173  {
    // Apply the new dequeue timeout and store parameters (listing line 176,
    // which follows, is not shown in this extract).
174  timeout_ = newTimeoutValue;
175  dqmEventStore_.setParameters(dqmParams);
177  }
178  if (requests.endOfRun)
179  {
180  endOfRun();
181  }
182  if (requests.storeDestruction)
183  {
184  dqmEventStore_.clear();
185  }
    // Signal that all requests fetched above have been handled.
186  sharedResources_->dqmEventProcessorResources_->requestsDone();
187  }
188  }
189 
// Function body whose signature (listing line 190) is not shown in this
// extract; presumably DQMEventProcessor::endOfRun(), which is called when an
// end-of-run request is served — confirm against the header.
// Purges dqmEventStore_ and resets latestLumiSection_ to 0.
191  {
192  dqmEventStore_.purge();
193  latestLumiSection_ = 0;
194  }
195 
196 } // namespace stor
197 
TimePoint_t getCurrentTime()
Definition: Utils.h:158
DQMEventMsgView getDQMEventView(EventType const &)
unsigned int discardDQMUpdatesForOlderLS_
Definition: Configuration.h:66
toolbox::task::WorkLoop * processWL_
boost::shared_ptr< SharedResources > SharedResourcesPtr
boost::shared_ptr< DQMEventQueue > DQMEventQueuePtr
Definition: DQMEventQueue.h:23
SharedResourcesPtr sharedResources_
EnqPolicy::ValueType ValueType
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
const T & max(const T &a, const T &b)
boost::posix_time::time_duration DQMEPdeqWaitTime_
bool processDQMEvents(toolbox::task::WorkLoop *)
void startWorkLoop(std::string workloopName)
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
DQMEventStore< I2OChain, DataSenderMonitorCollection, AlarmHandler > dqmEventStore_
unsigned int copyFragmentsIntoBuffer(std::vector< unsigned char > &buff) const
Definition: I2OChain.cc:469
std::string getIdentifier(xdaq::ApplicationDescriptor *)
Definition: Utils.cc:72
xdaq::Application * app_
unsigned int discardDQMUpdatesForOlderLS_
T get(const Candidate &c)
Definition: component.h:56
boost::posix_time::time_duration timeout_
DQMEventProcessor(xdaq::Application *, SharedResourcesPtr sr)