Go to the documentation of this file.00001
00003
00004 #include <algorithm>
00005
00006 #include <boost/bind.hpp>
00007 #include <boost/pointer_cast.hpp>
00008
00009 #include "toolbox/task/WorkLoopFactory.h"
00010 #include "xcept/tools.h"
00011
00012 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00013 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00014 #include "EventFilter/StorageManager/interface/EventStreamHandler.h"
00015 #include "EventFilter/StorageManager/interface/Exception.h"
00016 #include "EventFilter/StorageManager/interface/FaultyEventStreamHandler.h"
00017 #include "EventFilter/StorageManager/interface/FRDStreamHandler.h"
00018 #include "EventFilter/StorageManager/interface/I2OChain.h"
00019 #include "EventFilter/StorageManager/interface/StreamHandler.h"
00020
00021
00022 namespace stor {
00023
00024 DiskWriter::DiskWriter(xdaq::Application *app, SharedResourcesPtr sr) :
00025 app_(app),
00026 sharedResources_(sr),
00027 dbFileHandler_(new DbFileHandler()),
00028 runNumber_(0),
00029 lastFileTimeoutCheckTime_(utils::getCurrentTime()),
00030 actionIsActive_(true)
00031 {
00032 WorkerThreadParams workerParams =
00033 sharedResources_->configuration_->getWorkerThreadParams();
00034 timeout_ = workerParams.DWdeqWaitTime_;
00035 }
00036
00037
00038 DiskWriter::~DiskWriter()
00039 {
00040
00041 actionIsActive_ = false;
00042
00043
00044 writingWL_->cancel();
00045
00046
00047 destroyStreams();
00048 }
00049
00050
00051 void DiskWriter::startWorkLoop(std::string workloopName)
00052 {
00053 try
00054 {
00055 std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00056
00057 writingWL_ = toolbox::task::getWorkLoopFactory()->
00058 getWorkLoop( identifier + workloopName, "waiting" );
00059
00060 if ( ! writingWL_->isActive() )
00061 {
00062 toolbox::task::ActionSignature* processAction =
00063 toolbox::task::bind(this, &DiskWriter::writeAction,
00064 identifier + "WriteNextEvent");
00065 writingWL_->submit(processAction);
00066
00067 writingWL_->activate();
00068 }
00069 }
00070 catch (xcept::Exception& e)
00071 {
00072 std::string msg = "Failed to start workloop 'DiskWriter' with 'writeNextEvent'.";
00073 XCEPT_RETHROW(stor::exception::DiskWriting, msg, e);
00074 }
00075 }
00076
00077
00078 bool DiskWriter::writeAction(toolbox::task::WorkLoop*)
00079 {
00080 std::string errorMsg = "Failed to write an event: ";
00081
00082 try
00083 {
00084 writeNextEvent();
00085 }
00086 catch(xcept::Exception &e)
00087 {
00088 XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
00089 sentinelException, errorMsg, e );
00090 sharedResources_->moveToFailedState(sentinelException);
00091 }
00092 catch(std::exception &e)
00093 {
00094 errorMsg += e.what();
00095 XCEPT_DECLARE( stor::exception::DiskWriting,
00096 sentinelException, errorMsg );
00097 sharedResources_->moveToFailedState(sentinelException);
00098 }
00099 catch(...)
00100 {
00101 errorMsg += "Unknown exception";
00102 XCEPT_DECLARE( stor::exception::DiskWriting,
00103 sentinelException, errorMsg );
00104 sharedResources_->moveToFailedState(sentinelException);
00105 }
00106
00107 return actionIsActive_;
00108 }
00109
00110
00111 void DiskWriter::writeNextEvent()
00112 {
00113 I2OChain event;
00114 StreamQueuePtr sq = sharedResources_->streamQueue_;
00115 utils::TimePoint_t startTime = utils::getCurrentTime();
00116 if (sq->deqTimedWait(event, timeout_))
00117 {
00118 sharedResources_->diskWriterResources_->setBusy(true);
00119
00120 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00121 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
00122 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed());
00123
00124 if( event.isEndOfLumiSectionMessage() )
00125 {
00126 processEndOfLumiSection( event );
00127 }
00128 else
00129 {
00130 writeEventToStreams( event );
00131 checkForFileTimeOuts();
00132 }
00133 }
00134 else
00135 {
00136 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00137 sharedResources_->statisticsReporter_->
00138 getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
00139
00140 checkStreamChangeRequest();
00141 checkForFileTimeOuts(true);
00142 sharedResources_->diskWriterResources_->setBusy(false);
00143 }
00144 }
00145
00146
00147 void DiskWriter::writeEventToStreams(const I2OChain& event)
00148 {
00149 std::vector<StreamID> streams = event.getStreamTags();
00150
00151 for (
00152 std::vector<StreamID>::const_iterator it = streams.begin(), itEnd = streams.end();
00153 it != itEnd;
00154 ++it
00155 )
00156 {
00157 try
00158 {
00159 streamHandlers_.at(*it)->writeEvent(event);
00160 }
00161 catch (std::out_of_range& e)
00162 {
00163 std::ostringstream msg;
00164 msg << "Unable to retrieve stream handler for " << (*it) << " : ";
00165 msg << e.what();
00166 XCEPT_RAISE(exception::UnknownStreamId, msg.str());
00167 }
00168 }
00169 }
00170
00171
00172 void DiskWriter::checkStreamChangeRequest()
00173 {
00174 EvtStrConfigListPtr evtCfgList;
00175 ErrStrConfigListPtr errCfgList;
00176 DiskWritingParams newdwParams;
00177 unsigned int newRunNumber;
00178 boost::posix_time::time_duration newTimeoutValue;
00179 bool doConfig;
00180 if (sharedResources_->diskWriterResources_->
00181 streamChangeRequested(doConfig, evtCfgList, errCfgList, newdwParams, newRunNumber, newTimeoutValue))
00182 {
00183 destroyStreams();
00184 if (doConfig)
00185 {
00186 dwParams_ = newdwParams;
00187 runNumber_ = newRunNumber;
00188 timeout_ = newTimeoutValue;
00189 dbFileHandler_->configure(runNumber_, dwParams_);
00190
00191 makeFaultyEventStream();
00192 configureEventStreams(evtCfgList);
00193 configureErrorStreams(errCfgList);
00194 }
00195 sharedResources_->diskWriterResources_->streamChangeDone();
00196 }
00197 }
00198
00199
00200 void DiskWriter::checkForFileTimeOuts(const bool doItNow)
00201 {
00202 utils::TimePoint_t now = utils::getCurrentTime();
00203
00204 if (doItNow || (now - lastFileTimeoutCheckTime_) > dwParams_.fileClosingTestInterval_)
00205 {
00206 closeTimedOutFiles(now);
00207 lastFileTimeoutCheckTime_ = now;
00208 }
00209 }
00210
00211
00212 void DiskWriter::closeTimedOutFiles(const utils::TimePoint_t now)
00213 {
00214 std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
00215 boost::bind(&StreamHandler::closeTimedOutFiles, _1, now));
00216 }
00217
00218
00219 void DiskWriter::configureEventStreams(EvtStrConfigListPtr cfgList)
00220 {
00221 for (
00222 EvtStrConfigList::iterator it = cfgList->begin(),
00223 itEnd = cfgList->end();
00224 it != itEnd;
00225 ++it
00226 )
00227 {
00228 if ( it->fractionToDisk() > 0 )
00229 makeEventStream(*it);
00230 }
00231 }
00232
00233
00234 void DiskWriter::configureErrorStreams(ErrStrConfigListPtr cfgList)
00235 {
00236 for (
00237 ErrStrConfigList::iterator it = cfgList->begin(),
00238 itEnd = cfgList->end();
00239 it != itEnd;
00240 ++it
00241 )
00242 {
00243 makeErrorStream(*it);
00244 }
00245 }
00246
00247
00248 void DiskWriter::makeFaultyEventStream()
00249 {
00250 if ( dwParams_.faultyEventsStream_.empty() ) return;
00251
00252 boost::shared_ptr<FaultyEventStreamHandler> newHandler(
00253 new FaultyEventStreamHandler(sharedResources_, dbFileHandler_, dwParams_.faultyEventsStream_)
00254 );
00255 streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00256 }
00257
00258
00259 void DiskWriter::makeEventStream(EventStreamConfigurationInfo& streamCfg)
00260 {
00261 boost::shared_ptr<EventStreamHandler> newHandler(
00262 new EventStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
00263 );
00264 streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00265 streamCfg.setStreamId(streamHandlers_.size() - 1);
00266 }
00267
00268
00269 void DiskWriter::makeErrorStream(ErrorStreamConfigurationInfo& streamCfg)
00270 {
00271 boost::shared_ptr<FRDStreamHandler> newHandler(
00272 new FRDStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
00273 );
00274 streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00275 streamCfg.setStreamId(streamHandlers_.size() - 1);
00276 }
00277
00278
00279 void DiskWriter::destroyStreams()
00280 {
00281 if (streamHandlers_.empty()) return;
00282
00283 std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
00284 boost::bind(&StreamHandler::closeAllFiles, _1));
00285 streamHandlers_.clear();
00286
00287 reportRemainingLumiSections();
00288 writeEndOfRunMarker();
00289 }
00290
00291
00292 void DiskWriter::reportRemainingLumiSections() const
00293 {
00294 StreamsMonitorCollection& smc =
00295 sharedResources_->statisticsReporter_->getStreamsMonitorCollection();
00296
00297 smc.reportAllLumiSectionInfos(dbFileHandler_);
00298 }
00299
00300
00301 void DiskWriter::writeEndOfRunMarker() const
00302 {
00303 RunMonitorCollection& rmc =
00304 sharedResources_->statisticsReporter_->getRunMonitorCollection();
00305
00306 rmc.calculateStatistics(utils::getCurrentTime());
00307
00308 MonitoredQuantity::Stats lumiSectionsSeenStats;
00309 rmc.getLumiSectionsSeenMQ().getStats(lumiSectionsSeenStats);
00310 MonitoredQuantity::Stats eolsSeenStats;
00311 rmc.getEoLSSeenMQ().getStats(eolsSeenStats);
00312
00313 std::ostringstream str;
00314 str << "LScount:" << lumiSectionsSeenStats.getSampleCount()
00315 << "\tEoLScount:" << eolsSeenStats.getSampleCount()
00316 << "\tLastLumi:" << lumiSectionsSeenStats.getLastSampleValue()
00317 << "\tEoR";
00318 dbFileHandler_->write(str.str());
00319 }
00320
00321
00322 void DiskWriter::processEndOfLumiSection(const I2OChain& msg)
00323 {
00324 if ( msg.faulty() || msg.runNumber() != runNumber_ ) return;
00325
00326 const uint32_t lumiSection = msg.lumiSection();
00327
00328 std::string fileCountStr;
00329
00330 for (StreamHandlers::const_iterator it = streamHandlers_.begin(),
00331 itEnd = streamHandlers_.end(); it != itEnd; ++it)
00332 {
00333 (*it)->closeFilesForLumiSection(lumiSection, fileCountStr);
00334 }
00335 dbFileHandler_->write(fileCountStr);
00336 }
00337
00338 }
00339
00340