CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DiskWriter.cc
Go to the documentation of this file.
1 // $Id: DiskWriter.cc,v 1.30 2011/06/20 15:55:53 mommsen Exp $
3 
4 #include <algorithm>
5 
6 #include <boost/bind.hpp>
7 #include <boost/pointer_cast.hpp>
8 
9 #include "toolbox/task/WorkLoopFactory.h"
10 #include "xcept/tools.h"
11 
20 
21 
22 namespace stor {
23 
// Builds the disk writer: keeps the application pointer and the shared
// resources, creates a fresh DbFileHandler and EndOfRunReport, starts at run
// number 0 with the write action flagged as active, and records the current
// time as the last file-timeout check.
// NOTE(review): writingWL_ is not initialized here; in the visible code it is
// only assigned in startWorkLoop() -- confirm nothing dereferences it earlier.
24  DiskWriter::DiskWriter(xdaq::Application *app, SharedResourcesPtr sr) :
25  app_(app),
26  sharedResources_(sr),
27  dbFileHandler_(new DbFileHandler()),
28  runNumber_(0),
29  lastFileTimeoutCheckTime_(utils::getCurrentTime()),
30  endOfRunReport_(new StreamsMonitorCollection::EndOfRunReport()),
31  actionIsActive_(true)
32  {
    // The initial dequeue wait time is taken from the worker-thread parameters
33  WorkerThreadParams workerParams =
34  sharedResources_->configuration_->getWorkerThreadParams();
35  timeout_ = workerParams.DWdeqWaitTime_;
36  }
37 
38 
// NOTE(review): the signature (listing line 39) is absent from this listing;
// the body reads like the destructor's teardown -- confirm against the file.
// Teardown order: clear the flag that writeAction() returns, cancel the
// workloop, then close and drop any streams that are still registered.
40  {
41  // Stop the activity
42  actionIsActive_ = false;
43 
44  // Cancel the workloop (will wait until the action has finished)
    // NOTE(review): writingWL_ is dereferenced unconditionally; in the visible
    // code it is only assigned in startWorkLoop() -- confirm it is always set.
45  writingWL_->cancel();
46 
47  // Destroy any remaining streams. Under normal conditions, there should be none
48  destroyStreams();
49  }
50 
51 
// Obtains the "waiting" workloop named <application identifier> + workloopName
// from the workloop factory and, if it is not yet active, submits
// writeAction() to it and activates it.
// @param workloopName suffix appended to the application identifier to form
//        the workloop name
// @throws stor::exception::DiskWriting wrapping any xcept::Exception raised
//         while creating or starting the workloop
52  void DiskWriter::startWorkLoop(std::string workloopName)
53  {
54  try
55  {
56  std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
57 
58  writingWL_ = toolbox::task::getWorkLoopFactory()->
59  getWorkLoop( identifier + workloopName, "waiting" );
60 
     // Only submit the action if the workloop is not already active
61  if ( ! writingWL_->isActive() )
62  {
63  toolbox::task::ActionSignature* processAction =
64  toolbox::task::bind(this, &DiskWriter::writeAction,
65  identifier + "WriteNextEvent");
66  writingWL_->submit(processAction);
67 
68  writingWL_->activate();
69  }
70  }
71  catch (xcept::Exception& e)
72  {
     // The message text is fixed; it does not include workloopName
73  std::string msg = "Failed to start workloop 'DiskWriter' with 'writeNextEvent'.";
74  XCEPT_RETHROW(stor::exception::DiskWriting, msg, e);
75  }
76  }
77 
78 
// Workloop action. Any exception escaping the try block is converted into a
// stor::exception::DiskWriting and handed to
// sharedResources_->moveToFailedState(); nothing is rethrown.
// @return actionIsActive_ (set to true in the constructor)
// NOTE(review): the statement inside the try block (listing line 85) is absent
// from this listing; presumably it calls writeNextEvent() -- confirm.
79  bool DiskWriter::writeAction(toolbox::task::WorkLoop*)
80  {
81  std::string errorMsg = "Failed to write an event: ";
82 
83  try
84  {
86  }
87  catch(xcept::Exception &e)
88  {
     // Keep the original xcept exception nested inside the new one
89  XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
90  sentinelException, errorMsg, e );
91  sharedResources_->moveToFailedState(sentinelException);
92  }
93  catch(std::exception &e)
94  {
     // Standard exceptions contribute only their what() text
95  errorMsg += e.what();
96  XCEPT_DECLARE( stor::exception::DiskWriting,
97  sentinelException, errorMsg );
98  sharedResources_->moveToFailedState(sentinelException);
99  }
100  catch(...)
101  {
102  errorMsg += "Unknown exception";
103  XCEPT_DECLARE( stor::exception::DiskWriting,
104  sentinelException, errorMsg );
105  sharedResources_->moveToFailedState(sentinelException);
106  }
107 
108  return actionIsActive_;
109  }
110 
111 
// Body of writeNextEvent() (signature at listing line 112 is absent here).
// Waits up to timeout_ for an event on the stream queue. On success the writer
// is marked busy, idle-time and popped-event samples are recorded, and the
// event goes to processEndOfLumiSection() or writeEventToStreams(). On timeout
// the idle time is recorded, file timeouts are checked with doItNow = true,
// and the writer is marked not busy.
// NOTE(review): startTime is used below but its declaration (listing line 116)
// is not visible; listing lines 132 and 141 are absent as well.
113  {
114  I2OChain event;
115  StreamQueuePtr sq = sharedResources_->streamQueue_;
117  if (sq->deqTimedWait(event, timeout_))
118  {
119  sharedResources_->diskWriterResources_->setBusy(true);
120 
      // Time spent waiting for the event is reported as disk-writer idle time
121  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
122  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
123  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed());
124 
125  if( event.isEndOfLumiSectionMessage() )
126  {
127  processEndOfLumiSection( event );
128  }
129  else
130  {
131  writeEventToStreams( event );
133  }
134  }
135  else
136  {
      // Nothing was dequeued within timeout_: the whole wait counts as idle
137  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
138  sharedResources_->statisticsReporter_->
139  getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
140 
142  checkForFileTimeOuts(true);
143  sharedResources_->diskWriterResources_->setBusy(false);
144  }
145  }
146 
147 
// Body of writeEventToStreams(const I2OChain&) (signature at listing line 148
// is absent here). Writes the event through the handler of every stream the
// event is tagged for; each stream id is used as an index into streamHandlers_.
// @throws exception::UnknownStreamId if a stream id has no handler
149  {
150  std::vector<StreamID> streams = event.getStreamTags();
151 
152  for (
153  std::vector<StreamID>::const_iterator it = streams.begin(), itEnd = streams.end();
154  it != itEnd;
155  ++it
156  )
157  {
158  try
159  {
      // at() is bounds-checked, so an unknown stream id throws out_of_range
160  streamHandlers_.at(*it)->writeEvent(event);
161  }
162  catch (std::out_of_range& e)
163  {
164  std::ostringstream msg;
165  msg << "Unable to retrieve stream handler for " << (*it) << " : ";
166  msg << e.what();
167  XCEPT_RAISE(exception::UnknownStreamId, msg.str());
168  }
169  }
170  }
171 
172 
// Body of checkStreamChangeRequest() (signature at listing line 173 is absent
// here). If the disk-writer resources report a pending stream change, the
// current streams are destroyed. When doConfig is set, the new parameters, run
// number and timeout are adopted, the DB file handler is reconfigured, and the
// event and error streams are recreated. The request is then acknowledged with
// streamChangeDone().
// NOTE(review): the locals handed to streamChangeRequested() are read
// afterwards, so they are presumably out-parameters -- confirm. Listing line
// 192 is absent from this listing.
174  {
175  EvtStrConfigListPtr evtCfgList;
176  ErrStrConfigListPtr errCfgList;
177  DiskWritingParams newdwParams;
178  unsigned int newRunNumber;
179  boost::posix_time::time_duration newTimeoutValue;
180  bool doConfig;
181  if (sharedResources_->diskWriterResources_->
182  streamChangeRequested(doConfig, evtCfgList, errCfgList, newdwParams, newRunNumber, newTimeoutValue))
183  {
      // Existing streams are always torn down, even when no new config follows
184  destroyStreams();
185  if (doConfig)
186  {
187  dwParams_ = newdwParams;
188  runNumber_ = newRunNumber;
189  timeout_ = newTimeoutValue;
190  dbFileHandler_->configure(runNumber_, dwParams_);
191 
193  configureEventStreams(evtCfgList);
194  configureErrorStreams(errCfgList);
195  }
196  sharedResources_->diskWriterResources_->streamChangeDone();
197  }
198  }
199 
200 
// Closes timed-out files via closeTimedOutFiles(now).
// @param doItNow not referenced in the visible lines; the default shown in the
//        page's tooltip index is false
// NOTE(review): listing lines 203, 205 and 208 are absent from this listing, so
// the declaration of `now`, the condition guarding the inner block, and the
// statement after closeTimedOutFiles() are not visible here.
201  void DiskWriter::checkForFileTimeOuts(const bool doItNow)
202  {
204 
206  {
207  closeTimedOutFiles(now);
209  }
210  }
211 
212 
// Body of closeTimedOutFiles(const utils::TimePoint_t) (signature at listing
// line 213 is absent here). Calls StreamHandler::closeTimedOutFiles(now) on
// every registered stream handler.
214  {
215  std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
216  boost::bind(&StreamHandler::closeTimedOutFiles, _1, now));
217  }
218 
219 
// Body of configureEventStreams(EvtStrConfigListPtr) (signature at listing
// line 220 is absent here). Creates an event stream for every configuration
// entry whose fractionToDisk() is greater than zero; other entries are skipped.
221  {
222  for (
223  EvtStrConfigList::iterator it = cfgList->begin(),
224  itEnd = cfgList->end();
225  it != itEnd;
226  ++it
227  )
228  {
229  if ( it->fractionToDisk() > 0 )
230  makeEventStream(*it);
231  }
232  }
233 
234 
// Body of configureErrorStreams(ErrStrConfigListPtr) (signature at listing
// line 235 is absent here). Creates an error stream for every entry of the
// configuration list, without any filtering.
236  {
237  for (
238  ErrStrConfigList::iterator it = cfgList->begin(),
239  itEnd = cfgList->end();
240  it != itEnd;
241  ++it
242  )
243  {
244  makeErrorStream(*it);
245  }
246  }
247 
248 
// Body of makeFaultyEventStream() (signature at listing line 249 is absent
// here). Does nothing when dwParams_.faultyEventsStream_ is empty; otherwise
// appends a new FaultyEventStreamHandler to streamHandlers_. No stream id is
// assigned in the visible lines.
// NOTE(review): the handler's constructor arguments (listing line 254) are
// absent from this listing.
250  {
251  if ( dwParams_.faultyEventsStream_.empty() ) return;
252 
253  boost::shared_ptr<FaultyEventStreamHandler> newHandler(
255  );
256  streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
257  }
258 
259 
// Body of makeEventStream(EventStreamConfigurationInfo&) (signature at listing
// line 260 is absent here). Appends a new EventStreamHandler to
// streamHandlers_ and stores the handler's index in the stream configuration
// as its stream id.
// NOTE(review): the handler's constructor arguments (listing line 263) are
// absent from this listing.
261  {
262  boost::shared_ptr<EventStreamHandler> newHandler(
264  );
265  streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
     // The stream id is the position of the handler that was just appended
266  streamCfg.setStreamId(streamHandlers_.size() - 1);
267  }
268 
269 
// Body of makeErrorStream(ErrorStreamConfigurationInfo&) (signature at listing
// line 270 is absent here). Appends a new FRDStreamHandler to streamHandlers_
// and stores the handler's index in the stream configuration as its stream id.
// NOTE(review): the handler's constructor arguments (listing line 273) are
// absent from this listing.
271  {
272  boost::shared_ptr<FRDStreamHandler> newHandler(
274  );
275  streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
     // The stream id is the position of the handler that was just appended
276  streamCfg.setStreamId(streamHandlers_.size() - 1);
277  }
278 
279 
// Body of destroyStreams() (signature at listing line 280 is absent here).
// Returns immediately when no handlers exist; otherwise calls closeAllFiles()
// on every stream handler and then empties streamHandlers_.
// NOTE(review): listing lines 288 and 289, which follow the clear(), are
// absent from this listing.
281  {
282  if (streamHandlers_.empty()) return;
283 
284  std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
285  boost::bind(&StreamHandler::closeAllFiles, _1));
286  streamHandlers_.clear();
287 
290  }
291 
292 
// Body of reportRemainingLumiSections() (signature at listing line 293 is
// absent here). The only visible statement fragment fetches the streams
// monitor collection from the statistics reporter.
// NOTE(review): listing lines 295 and 298 are absent, so what the collection
// is bound to and how it is used cannot be seen here.
294  {
296  sharedResources_->statisticsReporter_->getStreamsMonitorCollection();
297 
299  }
300 
301 
// Body of writeEndOfRunMarker() (signature at listing line 302 is absent
// here). Formats the end-of-run counters as one tab-separated record, writes
// it through the DB file handler, and then resets the end-of-run report.
303  {
304  std::ostringstream str;
305  str << "LScount:" << endOfRunReport_->lsCountWithFiles
306  << "\tEoLScount:" << endOfRunReport_->eolsCount
307  << "\tLastLumi:" << endOfRunReport_->latestLumiSectionWritten
308  << "\tEoR";
309  dbFileHandler_->write(str.str());
310  endOfRunReport_->reset();
311  }
312 
313 
// Body of processEndOfLumiSection(const I2OChain&) (signature at listing line
// 314 is absent here). Messages that are faulty or belong to a different run
// are ignored. Otherwise every stream handler is asked to close its files for
// the lumi section, the resulting record is written through the DB file
// handler, and the end-of-run counters are updated.
315  {
316  if ( msg.faulty() || msg.runNumber() != runNumber_ ) return;
317 
318  const uint32_t lumiSection = msg.lumiSection();
319 
320  std::string fileCountStr;
321  bool filesWritten = false;
322 
      // fileCountStr is passed to each handler -- presumably each appends its
      // own file count to it; confirm in StreamHandler
323  for (StreamHandlers::const_iterator it = streamHandlers_.begin(),
324  itEnd = streamHandlers_.end(); it != itEnd; ++it)
325  {
326  if ( (*it)->closeFilesForLumiSection(lumiSection, fileCountStr) )
327  filesWritten = true;
328  }
329  fileCountStr += "\tEoLS:1";
330  dbFileHandler_->write(fileCountStr);
331 
      // Every processed EoLS is counted; lsCountWithFiles only grows when at
      // least one handler returned true for this lumi section
332  ++(endOfRunReport_->eolsCount);
333  if (filesWritten) ++(endOfRunReport_->lsCountWithFiles);
334  endOfRunReport_->updateLatestWrittenLumiSection(lumiSection);
335  }
336 
337 } // namespace stor
338 
339 
TimePoint_t getCurrentTime()
Definition: Utils.h:158
static const char runNumber_[]
void checkForFileTimeOuts(const bool doItNow=false)
Definition: DiskWriter.cc:201
uint32_t runNumber() const
Definition: I2OChain.cc:575
boost::shared_ptr< ErrStrConfigList > ErrStrConfigListPtr
void writeEndOfRunMarker()
Definition: DiskWriter.cc:302
utils::TimePoint_t lastFileTimeoutCheckTime_
Definition: DiskWriter.h:151
bool isEndOfLumiSectionMessage() const
Definition: I2OChain.cc:615
SharedResourcesPtr sharedResources_
Definition: DiskWriter.h:145
boost::shared_ptr< SharedResources > SharedResourcesPtr
void checkStreamChangeRequest()
Definition: DiskWriter.cc:173
void writeEventToStreams(const I2OChain &)
Definition: DiskWriter.cc:148
DiskWritingParams dwParams_
Definition: DiskWriter.h:146
boost::shared_ptr< EvtStrConfigList > EvtStrConfigListPtr
uint32_t lumiSection() const
Definition: I2OChain.cc:585
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
void makeEventStream(EventStreamConfigurationInfo &)
Definition: DiskWriter.cc:260
void configureEventStreams(EvtStrConfigListPtr)
Definition: DiskWriter.cc:220
void configureErrorStreams(ErrStrConfigListPtr)
Definition: DiskWriter.cc:235
void reportRemainingLumiSections()
Definition: DiskWriter.cc:293
boost::shared_ptr< StreamQueue > StreamQueuePtr
Definition: StreamQueue.h:22
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
void closeTimedOutFiles(const utils::TimePoint_t)
Definition: DiskWriter.cc:213
unsigned int runNumber_
Definition: DiskWriter.h:149
void reportAllLumiSectionInfos(DbFileHandlerPtr, EndOfRunReportPtr)
void closeTimedOutFiles(utils::TimePoint_t currentTime=utils::getCurrentTime())
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
boost::posix_time::time_duration timeout_
Definition: DiskWriter.h:150
void processEndOfLumiSection(const I2OChain &)
Definition: DiskWriter.cc:314
StreamHandlers streamHandlers_
Definition: DiskWriter.h:155
DiskWriter(xdaq::Application *, SharedResourcesPtr sr)
Definition: DiskWriter.cc:24
void makeFaultyEventStream()
Definition: DiskWriter.cc:249
std::string faultyEventsStream_
Definition: Configuration.h:44
const DbFileHandlerPtr dbFileHandler_
Definition: DiskWriter.h:147
std::string getIdentifier(xdaq::ApplicationDescriptor *)
Definition: Utils.cc:72
void makeErrorStream(ErrorStreamConfigurationInfo &)
Definition: DiskWriter.cc:270
bool writeAction(toolbox::task::WorkLoop *)
Definition: DiskWriter.cc:79
size_t memoryUsed() const
Definition: I2OChain.cc:426
utils::Duration_t fileClosingTestInterval_
Definition: Configuration.h:42
xdaq::Application * app_
Definition: DiskWriter.h:144
void startWorkLoop(std::string workloopName)
Definition: DiskWriter.cc:52
boost::posix_time::time_duration DWdeqWaitTime_
StreamsMonitorCollection::EndOfRunReportPtr endOfRunReport_
Definition: DiskWriter.h:157
void writeNextEvent()
Definition: DiskWriter.cc:112
bool faulty() const
Definition: I2OChain.cc:125
void destroyStreams()
Definition: DiskWriter.cc:280
toolbox::task::WorkLoop * writingWL_
Definition: DiskWriter.h:160