00001
00003
00004 #include <algorithm>
00005
00006 #include <boost/bind.hpp>
00007 #include <boost/pointer_cast.hpp>
00008
00009 #include "toolbox/task/WorkLoopFactory.h"
00010 #include "xcept/tools.h"
00011
00012 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00013 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00014 #include "EventFilter/StorageManager/interface/EventStreamHandler.h"
00015 #include "EventFilter/StorageManager/interface/Exception.h"
00016 #include "EventFilter/StorageManager/interface/FaultyEventStreamHandler.h"
00017 #include "EventFilter/StorageManager/interface/FRDStreamHandler.h"
00018 #include "EventFilter/StorageManager/interface/I2OChain.h"
00019 #include "EventFilter/StorageManager/interface/StreamHandler.h"
00020
00021
00022 namespace stor {
00023
00024 DiskWriter::DiskWriter(xdaq::Application *app, SharedResourcesPtr sr) :
00025 app_(app),
00026 sharedResources_(sr),
00027 dbFileHandler_(new DbFileHandler()),
00028 runNumber_(0),
00029 lastFileTimeoutCheckTime_(utils::getCurrentTime()),
00030 endOfRunReport_(new StreamsMonitorCollection::EndOfRunReport()),
00031 actionIsActive_(true)
00032 {
00033 WorkerThreadParams workerParams =
00034 sharedResources_->configuration_->getWorkerThreadParams();
00035 timeout_ = workerParams.DWdeqWaitTime_;
00036 }
00037
00038
00039 DiskWriter::~DiskWriter()
00040 {
00041
00042 actionIsActive_ = false;
00043
00044
00045 writingWL_->cancel();
00046
00047
00048 destroyStreams();
00049 }
00050
00051
00052 void DiskWriter::startWorkLoop(std::string workloopName)
00053 {
00054 try
00055 {
00056 std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00057
00058 writingWL_ = toolbox::task::getWorkLoopFactory()->
00059 getWorkLoop( identifier + workloopName, "waiting" );
00060
00061 if ( ! writingWL_->isActive() )
00062 {
00063 toolbox::task::ActionSignature* processAction =
00064 toolbox::task::bind(this, &DiskWriter::writeAction,
00065 identifier + "WriteNextEvent");
00066 writingWL_->submit(processAction);
00067
00068 writingWL_->activate();
00069 }
00070 }
00071 catch (xcept::Exception& e)
00072 {
00073 std::string msg = "Failed to start workloop 'DiskWriter' with 'writeNextEvent'.";
00074 XCEPT_RETHROW(stor::exception::DiskWriting, msg, e);
00075 }
00076 }
00077
00078
00079 bool DiskWriter::writeAction(toolbox::task::WorkLoop*)
00080 {
00081 std::string errorMsg = "Failed to write an event: ";
00082
00083 try
00084 {
00085 writeNextEvent();
00086 }
00087 catch(xcept::Exception &e)
00088 {
00089 XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
00090 sentinelException, errorMsg, e );
00091 sharedResources_->moveToFailedState(sentinelException);
00092 }
00093 catch(std::exception &e)
00094 {
00095 errorMsg += e.what();
00096 XCEPT_DECLARE( stor::exception::DiskWriting,
00097 sentinelException, errorMsg );
00098 sharedResources_->moveToFailedState(sentinelException);
00099 }
00100 catch(...)
00101 {
00102 errorMsg += "Unknown exception";
00103 XCEPT_DECLARE( stor::exception::DiskWriting,
00104 sentinelException, errorMsg );
00105 sharedResources_->moveToFailedState(sentinelException);
00106 }
00107
00108 return actionIsActive_;
00109 }
00110
00111
00112 void DiskWriter::writeNextEvent()
00113 {
00114 I2OChain event;
00115 StreamQueuePtr sq = sharedResources_->streamQueue_;
00116 utils::TimePoint_t startTime = utils::getCurrentTime();
00117 if (sq->deqTimedWait(event, timeout_))
00118 {
00119 sharedResources_->diskWriterResources_->setBusy(true);
00120
00121 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00122 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
00123 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed());
00124
00125 if( event.isEndOfLumiSectionMessage() )
00126 {
00127 processEndOfLumiSection( event );
00128 }
00129 else
00130 {
00131 writeEventToStreams( event );
00132 checkForFileTimeOuts();
00133 }
00134 }
00135 else
00136 {
00137 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00138 sharedResources_->statisticsReporter_->
00139 getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
00140
00141 checkStreamChangeRequest();
00142 checkForFileTimeOuts(true);
00143 sharedResources_->diskWriterResources_->setBusy(false);
00144 }
00145 }
00146
00147
00148 void DiskWriter::writeEventToStreams(const I2OChain& event)
00149 {
00150 std::vector<StreamID> streams = event.getStreamTags();
00151
00152 for (
00153 std::vector<StreamID>::const_iterator it = streams.begin(), itEnd = streams.end();
00154 it != itEnd;
00155 ++it
00156 )
00157 {
00158 try
00159 {
00160 streamHandlers_.at(*it)->writeEvent(event);
00161 }
00162 catch (std::out_of_range& e)
00163 {
00164 std::ostringstream msg;
00165 msg << "Unable to retrieve stream handler for " << (*it) << " : ";
00166 msg << e.what();
00167 XCEPT_RAISE(exception::UnknownStreamId, msg.str());
00168 }
00169 }
00170 }
00171
00172
00173 void DiskWriter::checkStreamChangeRequest()
00174 {
00175 EvtStrConfigListPtr evtCfgList;
00176 ErrStrConfigListPtr errCfgList;
00177 DiskWritingParams newdwParams;
00178 unsigned int newRunNumber;
00179 boost::posix_time::time_duration newTimeoutValue;
00180 bool doConfig;
00181 if (sharedResources_->diskWriterResources_->
00182 streamChangeRequested(doConfig, evtCfgList, errCfgList, newdwParams, newRunNumber, newTimeoutValue))
00183 {
00184 destroyStreams();
00185 if (doConfig)
00186 {
00187 dwParams_ = newdwParams;
00188 runNumber_ = newRunNumber;
00189 timeout_ = newTimeoutValue;
00190 dbFileHandler_->configure(runNumber_, dwParams_);
00191
00192 makeFaultyEventStream();
00193 configureEventStreams(evtCfgList);
00194 configureErrorStreams(errCfgList);
00195 }
00196 sharedResources_->diskWriterResources_->streamChangeDone();
00197 }
00198 }
00199
00200
00201 void DiskWriter::checkForFileTimeOuts(const bool doItNow)
00202 {
00203 utils::TimePoint_t now = utils::getCurrentTime();
00204
00205 if (doItNow || (now - lastFileTimeoutCheckTime_) > dwParams_.fileClosingTestInterval_)
00206 {
00207 closeTimedOutFiles(now);
00208 lastFileTimeoutCheckTime_ = now;
00209 }
00210 }
00211
00212
00213 void DiskWriter::closeTimedOutFiles(const utils::TimePoint_t now)
00214 {
00215 std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
00216 boost::bind(&StreamHandler::closeTimedOutFiles, _1, now));
00217 }
00218
00219
00220 void DiskWriter::configureEventStreams(EvtStrConfigListPtr cfgList)
00221 {
00222 for (
00223 EvtStrConfigList::iterator it = cfgList->begin(),
00224 itEnd = cfgList->end();
00225 it != itEnd;
00226 ++it
00227 )
00228 {
00229 if ( it->fractionToDisk() > 0 )
00230 makeEventStream(*it);
00231 }
00232 }
00233
00234
00235 void DiskWriter::configureErrorStreams(ErrStrConfigListPtr cfgList)
00236 {
00237 for (
00238 ErrStrConfigList::iterator it = cfgList->begin(),
00239 itEnd = cfgList->end();
00240 it != itEnd;
00241 ++it
00242 )
00243 {
00244 makeErrorStream(*it);
00245 }
00246 }
00247
00248
00249 void DiskWriter::makeFaultyEventStream()
00250 {
00251 if ( dwParams_.faultyEventsStream_.empty() ) return;
00252
00253 boost::shared_ptr<FaultyEventStreamHandler> newHandler(
00254 new FaultyEventStreamHandler(sharedResources_, dbFileHandler_, dwParams_.faultyEventsStream_)
00255 );
00256 streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00257 }
00258
00259
00260 void DiskWriter::makeEventStream(EventStreamConfigurationInfo& streamCfg)
00261 {
00262 boost::shared_ptr<EventStreamHandler> newHandler(
00263 new EventStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
00264 );
00265 streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00266 streamCfg.setStreamId(streamHandlers_.size() - 1);
00267 }
00268
00269
00270 void DiskWriter::makeErrorStream(ErrorStreamConfigurationInfo& streamCfg)
00271 {
00272 boost::shared_ptr<FRDStreamHandler> newHandler(
00273 new FRDStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
00274 );
00275 streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00276 streamCfg.setStreamId(streamHandlers_.size() - 1);
00277 }
00278
00279
00280 void DiskWriter::destroyStreams()
00281 {
00282 if (streamHandlers_.empty()) return;
00283
00284 std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
00285 boost::bind(&StreamHandler::closeAllFiles, _1));
00286 streamHandlers_.clear();
00287
00288 reportRemainingLumiSections();
00289 writeEndOfRunMarker();
00290 }
00291
00292
00293 void DiskWriter::reportRemainingLumiSections()
00294 {
00295 StreamsMonitorCollection& smc =
00296 sharedResources_->statisticsReporter_->getStreamsMonitorCollection();
00297
00298 smc.reportAllLumiSectionInfos(dbFileHandler_, endOfRunReport_);
00299 }
00300
00301
00302 void DiskWriter::writeEndOfRunMarker()
00303 {
00304 std::ostringstream str;
00305 str << "LScount:" << endOfRunReport_->lsCountWithFiles
00306 << "\tEoLScount:" << endOfRunReport_->eolsCount
00307 << "\tLastLumi:" << endOfRunReport_->latestLumiSectionWritten
00308 << "\tEoR";
00309 dbFileHandler_->write(str.str());
00310 endOfRunReport_->reset();
00311 }
00312
00313
00314 void DiskWriter::processEndOfLumiSection(const I2OChain& msg)
00315 {
00316 if ( msg.faulty() || msg.runNumber() != runNumber_ ) return;
00317
00318 const uint32_t lumiSection = msg.lumiSection();
00319
00320 std::string fileCountStr;
00321 bool filesWritten = false;
00322
00323 for (StreamHandlers::const_iterator it = streamHandlers_.begin(),
00324 itEnd = streamHandlers_.end(); it != itEnd; ++it)
00325 {
00326 if ( (*it)->closeFilesForLumiSection(lumiSection, fileCountStr) )
00327 filesWritten = true;
00328 }
00329 fileCountStr += "\tEoLS:1";
00330 dbFileHandler_->write(fileCountStr);
00331
00332 ++(endOfRunReport_->eolsCount);
00333 if (filesWritten) ++(endOfRunReport_->lsCountWithFiles);
00334 endOfRunReport_->updateLatestWrittenLumiSection(lumiSection);
00335 }
00336
00337 }
00338
00339