00001
00003
00004 #include <algorithm>
00005
00006 #include <boost/bind.hpp>
00007 #include <boost/pointer_cast.hpp>
00008
00009 #include "toolbox/task/WorkLoopFactory.h"
00010 #include "xcept/tools.h"
00011
00012 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00013 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00014 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00015 #include "EventFilter/StorageManager/interface/EventStreamHandler.h"
00016 #include "EventFilter/StorageManager/interface/Exception.h"
00017 #include "EventFilter/StorageManager/interface/FaultyEventStreamHandler.h"
00018 #include "EventFilter/StorageManager/interface/FRDStreamHandler.h"
00019 #include "EventFilter/StorageManager/interface/I2OChain.h"
00020 #include "EventFilter/StorageManager/interface/StreamHandler.h"
00021
00022
00023 namespace stor {
00024
00025 DiskWriter::DiskWriter(xdaq::Application *app, SharedResourcesPtr sr) :
00026 app_(app),
00027 sharedResources_(sr),
00028 dbFileHandler_(new DbFileHandler()),
00029 runNumber_(0),
00030 lastFileTimeoutCheckTime_(utils::getCurrentTime()),
00031 endOfRunReport_(new StreamsMonitorCollection::EndOfRunReport()),
00032 actionIsActive_(true)
00033 {
00034 WorkerThreadParams workerParams =
00035 sharedResources_->configuration_->getWorkerThreadParams();
00036 timeout_ = workerParams.DWdeqWaitTime_;
00037 }
00038
00039
00040 DiskWriter::~DiskWriter()
00041 {
00042
00043 actionIsActive_ = false;
00044
00045
00046 writingWL_->cancel();
00047
00048
00049 destroyStreams();
00050 }
00051
00052
00053 void DiskWriter::startWorkLoop(std::string workloopName)
00054 {
00055 try
00056 {
00057 std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00058
00059 writingWL_ = toolbox::task::getWorkLoopFactory()->
00060 getWorkLoop( identifier + workloopName, "waiting" );
00061
00062 if ( ! writingWL_->isActive() )
00063 {
00064 toolbox::task::ActionSignature* processAction =
00065 toolbox::task::bind(this, &DiskWriter::writeAction,
00066 identifier + "WriteNextEvent");
00067 writingWL_->submit(processAction);
00068
00069 writingWL_->activate();
00070 }
00071 }
00072 catch (xcept::Exception& e)
00073 {
00074 std::string msg = "Failed to start workloop 'DiskWriter' with 'writeNextEvent'.";
00075 XCEPT_RETHROW(stor::exception::DiskWriting, msg, e);
00076 }
00077 }
00078
00079
00080 bool DiskWriter::writeAction(toolbox::task::WorkLoop*)
00081 {
00082 std::string errorMsg = "Failed to write an event: ";
00083
00084 try
00085 {
00086 writeNextEvent();
00087 }
00088 catch(xcept::Exception &e)
00089 {
00090 XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
00091 sentinelException, errorMsg, e );
00092 sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00093 }
00094 catch(std::exception &e)
00095 {
00096 errorMsg += e.what();
00097 XCEPT_DECLARE( stor::exception::DiskWriting,
00098 sentinelException, errorMsg );
00099 sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00100 }
00101 catch(...)
00102 {
00103 errorMsg += "Unknown exception";
00104 XCEPT_DECLARE( stor::exception::DiskWriting,
00105 sentinelException, errorMsg );
00106 sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00107 }
00108
00109 return actionIsActive_;
00110 }
00111
00112
00113 void DiskWriter::writeNextEvent()
00114 {
00115 I2OChain event;
00116 StreamQueuePtr sq = sharedResources_->streamQueue_;
00117 utils::TimePoint_t startTime = utils::getCurrentTime();
00118 if (sq->deqTimedWait(event, timeout_))
00119 {
00120 sharedResources_->diskWriterResources_->setBusy(true);
00121
00122 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00123 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
00124 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed());
00125
00126 if( event.isEndOfLumiSectionMessage() )
00127 {
00128 processEndOfLumiSection( event );
00129 }
00130 else
00131 {
00132 writeEventToStreams( event );
00133 checkForFileTimeOuts();
00134 }
00135 }
00136 else
00137 {
00138 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00139 sharedResources_->statisticsReporter_->
00140 getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
00141
00142 checkStreamChangeRequest();
00143 checkForFileTimeOuts(true);
00144 sharedResources_->diskWriterResources_->setBusy(false);
00145 }
00146 }
00147
00148
00149 void DiskWriter::writeEventToStreams(const I2OChain& event)
00150 {
00151 std::vector<StreamID> streams = event.getStreamTags();
00152
00153 for (
00154 std::vector<StreamID>::const_iterator it = streams.begin(), itEnd = streams.end();
00155 it != itEnd;
00156 ++it
00157 )
00158 {
00159 try
00160 {
00161 streamHandlers_.at(*it)->writeEvent(event);
00162 }
00163 catch (std::out_of_range& e)
00164 {
00165 std::ostringstream msg;
00166 msg << "Unable to retrieve stream handler for " << (*it) << " : ";
00167 msg << e.what();
00168 XCEPT_RAISE(exception::UnknownStreamId, msg.str());
00169 }
00170 }
00171 }
00172
00173
00174 void DiskWriter::checkStreamChangeRequest()
00175 {
00176 EvtStrConfigListPtr evtCfgList;
00177 ErrStrConfigListPtr errCfgList;
00178 DiskWritingParams newdwParams;
00179 unsigned int newRunNumber;
00180 boost::posix_time::time_duration newTimeoutValue;
00181 bool doConfig;
00182 if (sharedResources_->diskWriterResources_->
00183 streamChangeRequested(doConfig, evtCfgList, errCfgList, newdwParams, newRunNumber, newTimeoutValue))
00184 {
00185 destroyStreams();
00186 if (doConfig)
00187 {
00188 dwParams_ = newdwParams;
00189 runNumber_ = newRunNumber;
00190 timeout_ = newTimeoutValue;
00191 dbFileHandler_->configure(runNumber_, dwParams_);
00192
00193 makeFaultyEventStream();
00194 configureEventStreams(evtCfgList);
00195 configureErrorStreams(errCfgList);
00196 }
00197 sharedResources_->diskWriterResources_->streamChangeDone();
00198 }
00199 }
00200
00201
00202 void DiskWriter::checkForFileTimeOuts(const bool doItNow)
00203 {
00204 utils::TimePoint_t now = utils::getCurrentTime();
00205
00206 if (doItNow || (now - lastFileTimeoutCheckTime_) > dwParams_.fileClosingTestInterval_)
00207 {
00208 closeTimedOutFiles(now);
00209 lastFileTimeoutCheckTime_ = now;
00210 }
00211 }
00212
00213
00214 void DiskWriter::closeTimedOutFiles(const utils::TimePoint_t now)
00215 {
00216 std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
00217 boost::bind(&StreamHandler::closeTimedOutFiles, _1, now));
00218 }
00219
00220
00221 void DiskWriter::configureEventStreams(EvtStrConfigListPtr cfgList)
00222 {
00223 for (
00224 EvtStrConfigList::iterator it = cfgList->begin(),
00225 itEnd = cfgList->end();
00226 it != itEnd;
00227 ++it
00228 )
00229 {
00230 if ( it->fractionToDisk() > 0 )
00231 makeEventStream(*it);
00232 }
00233 }
00234
00235
00236 void DiskWriter::configureErrorStreams(ErrStrConfigListPtr cfgList)
00237 {
00238 for (
00239 ErrStrConfigList::iterator it = cfgList->begin(),
00240 itEnd = cfgList->end();
00241 it != itEnd;
00242 ++it
00243 )
00244 {
00245 makeErrorStream(*it);
00246 }
00247 }
00248
00249
00250 void DiskWriter::makeFaultyEventStream()
00251 {
00252 if ( dwParams_.faultyEventsStream_.empty() ) return;
00253
00254 boost::shared_ptr<FaultyEventStreamHandler> newHandler(
00255 new FaultyEventStreamHandler(sharedResources_, dbFileHandler_, dwParams_.faultyEventsStream_)
00256 );
00257 streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00258 }
00259
00260
00261 void DiskWriter::makeEventStream(EventStreamConfigurationInfo& streamCfg)
00262 {
00263 boost::shared_ptr<EventStreamHandler> newHandler(
00264 new EventStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
00265 );
00266 streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00267 streamCfg.setStreamId(streamHandlers_.size() - 1);
00268 }
00269
00270
00271 void DiskWriter::makeErrorStream(ErrorStreamConfigurationInfo& streamCfg)
00272 {
00273 boost::shared_ptr<FRDStreamHandler> newHandler(
00274 new FRDStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
00275 );
00276 streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00277 streamCfg.setStreamId(streamHandlers_.size() - 1);
00278 }
00279
00280
00281 void DiskWriter::destroyStreams()
00282 {
00283 if (streamHandlers_.empty()) return;
00284
00285 std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
00286 boost::bind(&StreamHandler::closeAllFiles, _1));
00287 streamHandlers_.clear();
00288
00289 reportRemainingLumiSections();
00290 writeEndOfRunMarker();
00291 }
00292
00293
00294 void DiskWriter::reportRemainingLumiSections()
00295 {
00296 StreamsMonitorCollection& smc =
00297 sharedResources_->statisticsReporter_->getStreamsMonitorCollection();
00298
00299 smc.reportAllLumiSectionInfos(dbFileHandler_, endOfRunReport_);
00300 }
00301
00302
00303 void DiskWriter::writeEndOfRunMarker()
00304 {
00305 std::ostringstream str;
00306 str << "LScount:" << endOfRunReport_->lsCountWithFiles
00307 << "\tEoLScount:" << endOfRunReport_->eolsCount
00308 << "\tLastLumi:" << endOfRunReport_->latestLumiSectionWritten
00309 << "\tEoR";
00310 dbFileHandler_->write(str.str());
00311 endOfRunReport_->reset();
00312 }
00313
00314
00315 void DiskWriter::processEndOfLumiSection(const I2OChain& msg)
00316 {
00317 if ( msg.faulty() || msg.runNumber() != runNumber_ ) return;
00318
00319 const uint32_t lumiSection = msg.lumiSection();
00320
00321 std::string fileCountStr;
00322 bool filesWritten = false;
00323
00324 for (StreamHandlers::const_iterator it = streamHandlers_.begin(),
00325 itEnd = streamHandlers_.end(); it != itEnd; ++it)
00326 {
00327 if ( (*it)->closeFilesForLumiSection(lumiSection, fileCountStr) )
00328 filesWritten = true;
00329 }
00330 fileCountStr += "\tEoLS:1";
00331 dbFileHandler_->write(fileCountStr);
00332
00333 ++(endOfRunReport_->eolsCount);
00334 if (filesWritten) ++(endOfRunReport_->lsCountWithFiles);
00335 endOfRunReport_->updateLatestWrittenLumiSection(lumiSection);
00336 }
00337
00338 }
00339
00340