CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_10/src/EventFilter/StorageManager/src/DiskWriter.cc

Go to the documentation of this file.
00001 // $Id: DiskWriter.cc,v 1.32 2011/11/08 10:48:40 mommsen Exp $
00003 
00004 #include <algorithm>
00005 
00006 #include <boost/bind.hpp>
00007 #include <boost/pointer_cast.hpp>
00008 
00009 #include "toolbox/task/WorkLoopFactory.h"
00010 #include "xcept/tools.h"
00011 
00012 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00013 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00014 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00015 #include "EventFilter/StorageManager/interface/EventStreamHandler.h"
00016 #include "EventFilter/StorageManager/interface/Exception.h"
00017 #include "EventFilter/StorageManager/interface/FaultyEventStreamHandler.h"
00018 #include "EventFilter/StorageManager/interface/FRDStreamHandler.h"
00019 #include "EventFilter/StorageManager/interface/I2OChain.h"
00020 #include "EventFilter/StorageManager/interface/StreamHandler.h"
00021 
00022 
00023 namespace stor {
00024   
00025   DiskWriter::DiskWriter(xdaq::Application *app, SharedResourcesPtr sr) :
00026   app_(app),
00027   sharedResources_(sr),
00028   dbFileHandler_(new DbFileHandler()),
00029   runNumber_(0),
00030   lastFileTimeoutCheckTime_(utils::getCurrentTime()),
00031   endOfRunReport_(new StreamsMonitorCollection::EndOfRunReport()),
00032   actionIsActive_(true)
00033   {
00034     WorkerThreadParams workerParams =
00035       sharedResources_->configuration_->getWorkerThreadParams();
00036     timeout_ = workerParams.DWdeqWaitTime_;
00037   }
00038   
00039   
00040   DiskWriter::~DiskWriter()
00041   {
00042     // Stop the activity
00043     actionIsActive_ = false;
00044     
00045     // Cancel the workloop (will wait until the action has finished)
00046     writingWL_->cancel();
00047     
00048     // Destroy any remaining streams. Under normal conditions, there should be none
00049     destroyStreams(); 
00050   }
00051   
00052   
00053   void DiskWriter::startWorkLoop(std::string workloopName)
00054   {
00055     try
00056     {
00057       std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00058       
00059       writingWL_ = toolbox::task::getWorkLoopFactory()->
00060         getWorkLoop( identifier + workloopName, "waiting" );
00061       
00062       if ( ! writingWL_->isActive() )
00063       {
00064         toolbox::task::ActionSignature* processAction = 
00065           toolbox::task::bind(this, &DiskWriter::writeAction, 
00066             identifier + "WriteNextEvent");
00067         writingWL_->submit(processAction);
00068         
00069         writingWL_->activate();
00070       }
00071     }
00072     catch (xcept::Exception& e)
00073     {
00074       std::string msg = "Failed to start workloop 'DiskWriter' with 'writeNextEvent'.";
00075       XCEPT_RETHROW(stor::exception::DiskWriting, msg, e);
00076     }
00077   }
00078   
00079   
00080   bool DiskWriter::writeAction(toolbox::task::WorkLoop*)
00081   {
00082     std::string errorMsg = "Failed to write an event: ";
00083     
00084     try
00085     {
00086       writeNextEvent();
00087     }
00088     catch(xcept::Exception &e)
00089     {
00090       XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
00091         sentinelException, errorMsg, e );
00092       sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00093     }
00094     catch(std::exception &e)
00095     {
00096       errorMsg += e.what();
00097       XCEPT_DECLARE( stor::exception::DiskWriting,
00098         sentinelException, errorMsg );
00099       sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00100     }
00101     catch(...)
00102     {
00103       errorMsg += "Unknown exception";
00104       XCEPT_DECLARE( stor::exception::DiskWriting,
00105         sentinelException, errorMsg );
00106       sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00107     }
00108     
00109     return actionIsActive_;
00110   }
00111   
00112   
00113   void DiskWriter::writeNextEvent()
00114   {
00115     I2OChain event;
00116     StreamQueuePtr sq = sharedResources_->streamQueue_;
00117     utils::TimePoint_t startTime = utils::getCurrentTime();
00118     if (sq->deqTimedWait(event, timeout_))
00119     {
00120       sharedResources_->diskWriterResources_->setBusy(true);
00121       
00122       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00123       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
00124       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed());
00125       
00126       if( event.isEndOfLumiSectionMessage() )
00127       {
00128         processEndOfLumiSection( event );
00129       }
00130       else
00131       {
00132         writeEventToStreams( event );
00133         checkForFileTimeOuts();
00134       }
00135     }
00136     else
00137     {
00138       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00139       sharedResources_->statisticsReporter_->
00140         getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
00141       
00142       checkStreamChangeRequest();
00143       checkForFileTimeOuts(true);
00144       sharedResources_->diskWriterResources_->setBusy(false);
00145     }
00146   }
00147   
00148   
00149   void DiskWriter::writeEventToStreams(const I2OChain& event)
00150   {
00151     std::vector<StreamID> streams = event.getStreamTags();
00152     
00153     for (
00154       std::vector<StreamID>::const_iterator it = streams.begin(), itEnd = streams.end();
00155       it != itEnd;
00156       ++it
00157     )
00158     {
00159       try
00160       {
00161         streamHandlers_.at(*it)->writeEvent(event);
00162       }
00163       catch (std::out_of_range& e)
00164       {
00165         std::ostringstream msg;
00166         msg << "Unable to retrieve stream handler for " << (*it) << " : ";
00167         msg << e.what();
00168         XCEPT_RAISE(exception::UnknownStreamId, msg.str());
00169       }
00170     }
00171   }
00172   
00173   
00174   void DiskWriter::checkStreamChangeRequest()
00175   {
00176     EvtStrConfigListPtr evtCfgList;
00177     ErrStrConfigListPtr errCfgList;
00178     DiskWritingParams newdwParams;
00179     unsigned int newRunNumber;
00180     boost::posix_time::time_duration newTimeoutValue;
00181     bool doConfig;
00182     if (sharedResources_->diskWriterResources_->
00183       streamChangeRequested(doConfig, evtCfgList, errCfgList, newdwParams, newRunNumber, newTimeoutValue))
00184     {
00185       destroyStreams();
00186       if (doConfig)
00187       {
00188         dwParams_ = newdwParams;
00189         runNumber_ = newRunNumber;
00190         timeout_ = newTimeoutValue;
00191         dbFileHandler_->configure(runNumber_, dwParams_);
00192         
00193         makeFaultyEventStream();
00194         configureEventStreams(evtCfgList);
00195         configureErrorStreams(errCfgList);
00196       }
00197       sharedResources_->diskWriterResources_->streamChangeDone();
00198     }
00199   }
00200   
00201   
00202   void DiskWriter::checkForFileTimeOuts(const bool doItNow)
00203   {
00204     utils::TimePoint_t now = utils::getCurrentTime();
00205     
00206     if (doItNow || (now - lastFileTimeoutCheckTime_) > dwParams_.fileClosingTestInterval_)
00207     {
00208       closeTimedOutFiles(now);
00209       lastFileTimeoutCheckTime_ = now;
00210     }
00211   }
00212   
00213   
00214   void DiskWriter::closeTimedOutFiles(const utils::TimePoint_t now)
00215   {
00216     std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
00217       boost::bind(&StreamHandler::closeTimedOutFiles, _1, now));
00218   }
00219   
00220   
00221   void DiskWriter::configureEventStreams(EvtStrConfigListPtr cfgList)
00222   {
00223     for (
00224       EvtStrConfigList::iterator it = cfgList->begin(),
00225         itEnd = cfgList->end();
00226       it != itEnd;
00227       ++it
00228     ) 
00229     {
00230       if ( it->fractionToDisk() > 0 )
00231         makeEventStream(*it);
00232     }
00233   }
00234   
00235   
00236   void DiskWriter::configureErrorStreams(ErrStrConfigListPtr cfgList)
00237   {
00238     for (
00239       ErrStrConfigList::iterator it = cfgList->begin(),
00240         itEnd = cfgList->end();
00241       it != itEnd;
00242       ++it
00243     ) 
00244     {
00245       makeErrorStream(*it);
00246     }
00247   }
00248   
00249   
00250   void DiskWriter::makeFaultyEventStream()
00251   {
00252     if ( dwParams_.faultyEventsStream_.empty() ) return;
00253 
00254     boost::shared_ptr<FaultyEventStreamHandler> newHandler(
00255     new FaultyEventStreamHandler(sharedResources_, dbFileHandler_, dwParams_.faultyEventsStream_)
00256     );
00257     streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00258   }
00259   
00260   
00261   void DiskWriter::makeEventStream(EventStreamConfigurationInfo& streamCfg)
00262   {
00263     boost::shared_ptr<EventStreamHandler> newHandler(
00264       new EventStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
00265     );
00266     streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00267     streamCfg.setStreamId(streamHandlers_.size() - 1);
00268   }
00269   
00270   
00271   void DiskWriter::makeErrorStream(ErrorStreamConfigurationInfo& streamCfg)
00272   {
00273     boost::shared_ptr<FRDStreamHandler> newHandler(
00274       new FRDStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
00275     );
00276     streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00277     streamCfg.setStreamId(streamHandlers_.size() - 1);
00278   }
00279   
00280   
00281   void DiskWriter::destroyStreams()
00282   {
00283     if (streamHandlers_.empty()) return;
00284     
00285     std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
00286       boost::bind(&StreamHandler::closeAllFiles, _1));
00287     streamHandlers_.clear();
00288     
00289     reportRemainingLumiSections();
00290     writeEndOfRunMarker();
00291   }
00292   
00293   
00294   void DiskWriter::reportRemainingLumiSections()
00295   {
00296     StreamsMonitorCollection& smc =
00297       sharedResources_->statisticsReporter_->getStreamsMonitorCollection();
00298     
00299     smc.reportAllLumiSectionInfos(dbFileHandler_, endOfRunReport_);
00300   }
00301   
00302   
00303   void DiskWriter::writeEndOfRunMarker()
00304   {
00305     std::ostringstream str;
00306     str << "LScount:" << endOfRunReport_->lsCountWithFiles
00307       << "\tEoLScount:" << endOfRunReport_->eolsCount
00308       << "\tLastLumi:" << endOfRunReport_->latestLumiSectionWritten
00309       << "\tEoR";
00310     dbFileHandler_->write(str.str());
00311     endOfRunReport_->reset();
00312   }
00313   
00314   
00315   void DiskWriter::processEndOfLumiSection(const I2OChain& msg)
00316   {
00317     if ( msg.faulty() || msg.runNumber() != runNumber_ ) return;
00318     
00319     const uint32_t lumiSection = msg.lumiSection();
00320     
00321     std::string fileCountStr;
00322     bool filesWritten = false;
00323 
00324     for (StreamHandlers::const_iterator it = streamHandlers_.begin(),
00325            itEnd = streamHandlers_.end(); it != itEnd; ++it)
00326     {
00327       if ( (*it)->closeFilesForLumiSection(lumiSection, fileCountStr) )
00328         filesWritten = true;
00329     }
00330     fileCountStr += "\tEoLS:1";
00331     dbFileHandler_->write(fileCountStr);
00332 
00333     ++(endOfRunReport_->eolsCount);
00334     if (filesWritten) ++(endOfRunReport_->lsCountWithFiles);
00335     endOfRunReport_->updateLatestWrittenLumiSection(lumiSection);
00336   }
00337   
00338 } // namespace stor
00339 
00340