CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DiskWriter.cc
Go to the documentation of this file.
1 // $Id: DiskWriter.cc,v 1.33 2012/10/17 10:13:25 mommsen Exp $
3 
4 #include <algorithm>
5 
6 #include <boost/bind.hpp>
7 #include <boost/pointer_cast.hpp>
8 
9 #include "toolbox/task/WorkLoopFactory.h"
10 #include "xcept/tools.h"
11 
21 
22 
23 namespace stor {
24 
// Constructor. Stores the application pointer and the shared resources,
// creates a new DbFileHandler and a new EndOfRunReport, starts with run
// number 0, records the current time as the last file-timeout check and
// sets the action flag to true.
// NOTE(review): writingWL_ does not appear in this initializer list; it is
// only assigned in the workloop start-up code further down in this listing.
25  DiskWriter::DiskWriter(xdaq::Application *app, SharedResourcesPtr sr) :
26  app_(app),
27  sharedResources_(sr),
28  dbFileHandler_(new DbFileHandler()),
29  runNumber_(0),
30  lastFileTimeoutCheckTime_(utils::getCurrentTime()),
31  endOfRunReport_(new StreamsMonitorCollection::EndOfRunReport()),
32  actionIsActive_(true)
33  {
// The initial dequeue timeout comes from the configured worker-thread
// parameters (DWdeqWaitTime_).
34  WorkerThreadParams workerParams =
35  sharedResources_->configuration_->getWorkerThreadParams();
36  timeout_ = workerParams.DWdeqWaitTime_;
37  }
38 
39 
// NOTE(review): the signature (listing line 40) is missing from this
// extract; presumably this is the body of DiskWriter::~DiskWriter() - confirm.
// Shutdown sequence: clear the active flag, cancel the writing workloop,
// then destroy whatever streams remain.
// NOTE(review): writingWL_ is dereferenced unconditionally and is not set in
// the constructor's initializer list; this looks unsafe if the workloop was
// never started - confirm.
41  {
42  // Stop the activity
43  actionIsActive_ = false;
44 
45  // Cancel the workloop (will wait until the action has finished)
46  writingWL_->cancel();
47 
48  // Destroy any remaining streams. Under normal conditions, there should be none
49  destroyStreams();
50  }
51 
52 
// Body of DiskWriter::startWorkLoop(std::string workloopName); the signature
// (listing line 53) is missing from this extract.
// Obtains a "waiting" workloop named <identifier><workloopName> from the
// workloop factory and, if it is not yet active, submits writeAction as its
// action and activates it.
// Any xcept::Exception is rethrown as stor::exception::DiskWriting.
54  {
55  try
56  {
57  std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
58 
59  writingWL_ = toolbox::task::getWorkLoopFactory()->
60  getWorkLoop( identifier + workloopName, "waiting" );
61 
// Only submit and activate when the workloop is not already active.
62  if ( ! writingWL_->isActive() )
63  {
64  toolbox::task::ActionSignature* processAction =
65  toolbox::task::bind(this, &DiskWriter::writeAction,
66  identifier + "WriteNextEvent");
67  writingWL_->submit(processAction);
68 
69  writingWL_->activate();
70  }
71  }
72  catch (xcept::Exception& e)
73  {
74  std::string msg = "Failed to start workloop 'DiskWriter' with 'writeNextEvent'.";
75  XCEPT_RETHROW(stor::exception::DiskWriting, msg, e);
76  }
77  }
78 
79 
// Workloop action. Runs the guarded work inside the try block and turns any
// failure into a stor::exception::DiskWriting that is handed to
// alarmHandler_->moveToFailedState(); no exception leaves this function.
// Returns the current value of actionIsActive_ (presumably the workloop
// reschedules the action while this is true - confirm).
// NOTE(review): listing line 86 (the try body) is missing from this extract;
// presumably it calls writeNextEvent() - confirm against the original file.
80  bool DiskWriter::writeAction(toolbox::task::WorkLoop*)
81  {
82  std::string errorMsg = "Failed to write an event: ";
83 
84  try
85  {
87  }
// xcept exceptions are nested into the new exception to keep their history.
88  catch(xcept::Exception &e)
89  {
90  XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
91  sentinelException, errorMsg, e );
92  sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
93  }
// Standard exceptions contribute their what() text to the message.
94  catch(std::exception &e)
95  {
96  errorMsg += e.what();
97  XCEPT_DECLARE( stor::exception::DiskWriting,
98  sentinelException, errorMsg );
99  sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
100  }
// Anything else is reported as "Unknown exception".
101  catch(...)
102  {
103  errorMsg += "Unknown exception";
104  XCEPT_DECLARE( stor::exception::DiskWriting,
105  sentinelException, errorMsg );
106  sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
107  }
108 
109  return actionIsActive_;
110  }
111 
112 
// Body of DiskWriter::writeNextEvent(); the signature (listing line 113) is
// missing from this extract.
// Waits up to timeout_ for an entry on the stream queue. On success it marks
// the disk writer busy, records idle-time and popped-event statistics and
// dispatches the entry: end-of-lumi-section messages go to
// processEndOfLumiSection(), everything else to writeEventToStreams().
// On timeout it records the idle time, forces a file-timeout check and marks
// the disk writer as not busy.
// NOTE(review): listing lines 117, 133 and 142 are missing from this
// extract; `startTime` is used below but its declaration is not visible
// (presumably on line 117 - confirm).
114  {
115  I2OChain event;
116  StreamQueuePtr sq = sharedResources_->streamQueue_;
118  if (sq->deqTimedWait(event, timeout_))
119  {
120  sharedResources_->diskWriterResources_->setBusy(true);
121 
// Time spent waiting for the dequeue counts as idle time.
122  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
123  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
124  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed());
125 
126  if( event.isEndOfLumiSectionMessage() )
127  {
128  processEndOfLumiSection( event );
129  }
130  else
131  {
132  writeEventToStreams( event );
134  }
135  }
136  else
137  {
138  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
139  sharedResources_->statisticsReporter_->
140  getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
141 
143  checkForFileTimeOuts(true);
144  sharedResources_->diskWriterResources_->setBusy(false);
145  }
146  }
147 
148 
// Body of DiskWriter::writeEventToStreams(const I2OChain&); the signature
// (listing line 149) is missing from this extract.
// Passes the event to the handler of every stream it is tagged for. Each
// stream tag is used as a position in streamHandlers_ via at(); a tag with
// no handler raises exception::UnknownStreamId.
150  {
151  std::vector<StreamID> streams = event.getStreamTags();
152 
153  for (
154  std::vector<StreamID>::const_iterator it = streams.begin(), itEnd = streams.end();
155  it != itEnd;
156  ++it
157  )
158  {
159  try
160  {
161  streamHandlers_.at(*it)->writeEvent(event);
162  }
// at() throws std::out_of_range for an unknown id; convert it into the
// project exception with the offending id in the message.
163  catch (std::out_of_range& e)
164  {
165  std::ostringstream msg;
166  msg << "Unable to retrieve stream handler for " << (*it) << " : ";
167  msg << e.what();
168  XCEPT_RAISE(exception::UnknownStreamId, msg.str());
169  }
170  }
171  }
172 
173 
// Body of DiskWriter::checkStreamChangeRequest(); the signature (listing
// line 174) is missing from this extract.
// Asks diskWriterResources_ whether a stream change was requested. If so,
// all current streams are destroyed; when doConfig is set the new
// disk-writing parameters, run number and timeout are adopted, the DB file
// handler is reconfigured and the event and error streams are rebuilt.
// The request is then acknowledged via streamChangeDone().
// NOTE(review): listing line 193 is missing from this extract (presumably a
// call to makeFaultyEventStream() - confirm).
175  {
// These locals are passed to streamChangeRequested(), presumably as
// out-parameters; newRunNumber and doConfig are uninitialized until then.
176  EvtStrConfigListPtr evtCfgList;
177  ErrStrConfigListPtr errCfgList;
178  DiskWritingParams newdwParams;
179  unsigned int newRunNumber;
180  boost::posix_time::time_duration newTimeoutValue;
181  bool doConfig;
182  if (sharedResources_->diskWriterResources_->
183  streamChangeRequested(doConfig, evtCfgList, errCfgList, newdwParams, newRunNumber, newTimeoutValue))
184  {
185  destroyStreams();
186  if (doConfig)
187  {
188  dwParams_ = newdwParams;
189  runNumber_ = newRunNumber;
190  timeout_ = newTimeoutValue;
191  dbFileHandler_->configure(runNumber_, dwParams_);
192 
194  configureEventStreams(evtCfgList);
195  configureErrorStreams(errCfgList);
196  }
197  sharedResources_->diskWriterResources_->streamChangeDone();
198  }
199  }
200 
201 
// Closes timed-out files by calling closeTimedOutFiles(now).
// NOTE(review): listing lines 204, 206 and 209 are missing from this
// extract. The declaration of `now`, the condition guarding the block below
// and any use of `doItNow` are therefore not visible here; presumably the
// condition involves doItNow and lastFileTimeoutCheckTime_ - confirm against
// the original file.
202  void DiskWriter::checkForFileTimeOuts(const bool doItNow)
203  {
205 
207  {
208  closeTimedOutFiles(now);
210  }
211  }
212 
213 
// Body of DiskWriter::closeTimedOutFiles(const utils::TimePoint_t); the
// signature (listing line 214) is missing from this extract.
// Calls StreamHandler::closeTimedOutFiles(now) on every stream handler.
215  {
216  std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
217  boost::bind(&StreamHandler::closeTimedOutFiles, _1, now));
218  }
219 
220 
// Body of DiskWriter::configureEventStreams(EvtStrConfigListPtr); the
// signature (listing line 221) is missing from this extract.
// Creates an event stream for every configuration entry whose
// fractionToDisk() is greater than zero; other entries are skipped.
222  {
223  for (
224  EvtStrConfigList::iterator it = cfgList->begin(),
225  itEnd = cfgList->end();
226  it != itEnd;
227  ++it
228  )
229  {
230  if ( it->fractionToDisk() > 0 )
231  makeEventStream(*it);
232  }
233  }
234 
235 
// Body of DiskWriter::configureErrorStreams(ErrStrConfigListPtr); the
// signature (listing line 236) is missing from this extract.
// Creates an error stream for every entry of the configuration list.
237  {
238  for (
239  ErrStrConfigList::iterator it = cfgList->begin(),
240  itEnd = cfgList->end();
241  it != itEnd;
242  ++it
243  )
244  {
245  makeErrorStream(*it);
246  }
247  }
248 
249 
// Body of DiskWriter::makeFaultyEventStream(); the signature (listing line
// 250) is missing from this extract.
// Does nothing when dwParams_.faultyEventsStream_ is empty; otherwise it
// creates a FaultyEventStreamHandler and appends it to streamHandlers_.
// NOTE(review): listing line 255 (the constructor expression for the new
// handler) is missing from this extract.
251  {
252  if ( dwParams_.faultyEventsStream_.empty() ) return;
253 
254  boost::shared_ptr<FaultyEventStreamHandler> newHandler(
256  );
257  streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
258  }
259 
260 
// Body of DiskWriter::makeEventStream(EventStreamConfigurationInfo&); the
// signature (listing line 261) is missing from this extract.
// Creates an EventStreamHandler, appends it to streamHandlers_ and stores
// the handler's position in that container as the stream id of streamCfg.
// NOTE(review): listing line 264 (the constructor expression for the new
// handler) is missing from this extract.
262  {
263  boost::shared_ptr<EventStreamHandler> newHandler(
265  );
266  streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
// The stream id is the index of the handler just appended.
267  streamCfg.setStreamId(streamHandlers_.size() - 1);
268  }
269 
270 
// Body of DiskWriter::makeErrorStream(ErrorStreamConfigurationInfo&); the
// signature (listing line 271) is missing from this extract.
// Creates an FRDStreamHandler, appends it to streamHandlers_ and stores the
// handler's position in that container as the stream id of streamCfg.
// NOTE(review): listing line 274 (the constructor expression for the new
// handler) is missing from this extract.
272  {
273  boost::shared_ptr<FRDStreamHandler> newHandler(
275  );
276  streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
// The stream id is the index of the handler just appended.
277  streamCfg.setStreamId(streamHandlers_.size() - 1);
278  }
279 
280 
// Body of DiskWriter::destroyStreams(); the signature (listing line 281) is
// missing from this extract.
// Returns immediately when there are no stream handlers. Otherwise it calls
// closeAllFiles() on every handler and empties streamHandlers_.
// NOTE(review): listing lines 289 and 290 are missing from this extract, so
// whatever follows the clear() is not visible here.
282  {
283  if (streamHandlers_.empty()) return;
284 
285  std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
286  boost::bind(&StreamHandler::closeAllFiles, _1));
287  streamHandlers_.clear();
288 
291  }
292 
293 
// Body of DiskWriter::reportRemainingLumiSections(); the signature (listing
// line 294) is missing from this extract.
// NOTE(review): listing lines 296 and 299 are missing from this extract.
// Only the right-hand side of a statement that fetches the streams monitor
// collection from the statistics reporter is visible; what is done with it
// cannot be determined from here.
295  {
297  sharedResources_->statisticsReporter_->getStreamsMonitorCollection();
298 
300  }
301 
302 
// Body of DiskWriter::writeEndOfRunMarker(); the signature (listing line
// 303) is missing from this extract.
// Writes a tab-separated end-of-run summary to the DB file handler:
// lumi sections with files, end-of-lumi-section count, latest lumi section
// written, and a trailing "EoR" marker. The report is reset afterwards.
304  {
305  std::ostringstream str;
306  str << "LScount:" << endOfRunReport_->lsCountWithFiles
307  << "\tEoLScount:" << endOfRunReport_->eolsCount
308  << "\tLastLumi:" << endOfRunReport_->latestLumiSectionWritten
309  << "\tEoR";
310  dbFileHandler_->write(str.str());
311  endOfRunReport_->reset();
312  }
313 
314 
// Body of DiskWriter::processEndOfLumiSection(const I2OChain&); the
// signature (listing line 315) is missing from this extract.
// Ignores messages that are faulty or belong to a different run, and does
// nothing when there are no streams. Otherwise every stream handler closes
// its files for the lumi section, a per-lumi-section record ending in
// "\tEoLS:1" is written to the DB file handler, and the end-of-run report
// counters are updated.
316  {
317  if ( msg.faulty() || msg.runNumber() != runNumber_ ) return;
318  if ( streamHandlers_.empty() ) return; //Don't care about EoLS signal if we have no streams
319 
320  const uint32_t lumiSection = msg.lumiSection();
321 
322  std::string fileCountStr;
323  bool filesWritten = false;
324 
// Every handler is called (no early exit); fileCountStr is handed to each
// handler in turn, and filesWritten becomes true if any handler returns true.
325  for (StreamHandlers::const_iterator it = streamHandlers_.begin(),
326  itEnd = streamHandlers_.end(); it != itEnd; ++it)
327  {
328  if ( (*it)->closeFilesForLumiSection(lumiSection, fileCountStr) )
329  filesWritten = true;
330  }
331  fileCountStr += "\tEoLS:1";
332  dbFileHandler_->write(fileCountStr);
333 
// eolsCount counts every accepted EoLS message; lsCountWithFiles only those
// for which at least one handler reported true.
334  ++(endOfRunReport_->eolsCount);
335  if (filesWritten) ++(endOfRunReport_->lsCountWithFiles);
336  endOfRunReport_->updateLatestWrittenLumiSection(lumiSection);
337  }
338 
339 } // namespace stor
340 
341 
TimePoint_t getCurrentTime()
Definition: Utils.h:158
static const char runNumber_[]
void checkForFileTimeOuts(const bool doItNow=false)
Definition: DiskWriter.cc:202
uint32_t runNumber() const
Definition: I2OChain.cc:585
boost::shared_ptr< ErrStrConfigList > ErrStrConfigListPtr
void writeEndOfRunMarker()
Definition: DiskWriter.cc:303
utils::TimePoint_t lastFileTimeoutCheckTime_
Definition: DiskWriter.h:151
bool isEndOfLumiSectionMessage() const
Definition: I2OChain.cc:625
SharedResourcesPtr sharedResources_
Definition: DiskWriter.h:145
boost::shared_ptr< SharedResources > SharedResourcesPtr
void checkStreamChangeRequest()
Definition: DiskWriter.cc:174
void writeEventToStreams(const I2OChain &)
Definition: DiskWriter.cc:149
DiskWritingParams dwParams_
Definition: DiskWriter.h:146
boost::shared_ptr< EvtStrConfigList > EvtStrConfigListPtr
uint32_t lumiSection() const
Definition: I2OChain.cc:595
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
void makeEventStream(EventStreamConfigurationInfo &)
Definition: DiskWriter.cc:261
void configureEventStreams(EvtStrConfigListPtr)
Definition: DiskWriter.cc:221
void configureErrorStreams(ErrStrConfigListPtr)
Definition: DiskWriter.cc:236
void reportRemainingLumiSections()
Definition: DiskWriter.cc:294
boost::shared_ptr< StreamQueue > StreamQueuePtr
Definition: StreamQueue.h:22
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
void closeTimedOutFiles(const utils::TimePoint_t)
Definition: DiskWriter.cc:214
unsigned int runNumber_
Definition: DiskWriter.h:149
void reportAllLumiSectionInfos(DbFileHandlerPtr, EndOfRunReportPtr)
void closeTimedOutFiles(utils::TimePoint_t currentTime=utils::getCurrentTime())
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as "!*" was in the past
boost::posix_time::time_duration timeout_
Definition: DiskWriter.h:150
void processEndOfLumiSection(const I2OChain &)
Definition: DiskWriter.cc:315
StreamHandlers streamHandlers_
Definition: DiskWriter.h:155
DiskWriter(xdaq::Application *, SharedResourcesPtr sr)
Definition: DiskWriter.cc:25
void makeFaultyEventStream()
Definition: DiskWriter.cc:250
std::string faultyEventsStream_
Definition: Configuration.h:44
const DbFileHandlerPtr dbFileHandler_
Definition: DiskWriter.h:147
std::string getIdentifier(xdaq::ApplicationDescriptor *)
Definition: Utils.cc:72
void makeErrorStream(ErrorStreamConfigurationInfo &)
Definition: DiskWriter.cc:271
bool writeAction(toolbox::task::WorkLoop *)
Definition: DiskWriter.cc:80
size_t memoryUsed() const
Definition: I2OChain.cc:426
utils::Duration_t fileClosingTestInterval_
Definition: Configuration.h:42
xdaq::Application * app_
Definition: DiskWriter.h:144
void startWorkLoop(std::string workloopName)
Definition: DiskWriter.cc:53
boost::posix_time::time_duration DWdeqWaitTime_
StreamsMonitorCollection::EndOfRunReportPtr endOfRunReport_
Definition: DiskWriter.h:157
void writeNextEvent()
Definition: DiskWriter.cc:113
bool faulty() const
Definition: I2OChain.cc:125
void destroyStreams()
Definition: DiskWriter.cc:281
toolbox::task::WorkLoop * writingWL_
Definition: DiskWriter.h:160