CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_2_9_HLT1_bphpatch4/src/EventFilter/StorageManager/src/DiskWriter.cc

Go to the documentation of this file.
00001 // $Id: DiskWriter.cc,v 1.31 2011/06/20 16:38:51 mommsen Exp $
00003 
00004 #include <algorithm>
00005 
00006 #include <boost/bind.hpp>
00007 #include <boost/pointer_cast.hpp>
00008 
00009 #include "toolbox/task/WorkLoopFactory.h"
00010 #include "xcept/tools.h"
00011 
00012 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00013 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00014 #include "EventFilter/StorageManager/interface/EventStreamHandler.h"
00015 #include "EventFilter/StorageManager/interface/Exception.h"
00016 #include "EventFilter/StorageManager/interface/FaultyEventStreamHandler.h"
00017 #include "EventFilter/StorageManager/interface/FRDStreamHandler.h"
00018 #include "EventFilter/StorageManager/interface/I2OChain.h"
00019 #include "EventFilter/StorageManager/interface/StreamHandler.h"
00020 
00021 
00022 namespace stor {
00023   
00024   DiskWriter::DiskWriter(xdaq::Application *app, SharedResourcesPtr sr) :
00025   app_(app),
00026   sharedResources_(sr),
00027   dbFileHandler_(new DbFileHandler()),
00028   runNumber_(0),
00029   lastFileTimeoutCheckTime_(utils::getCurrentTime()),
00030   endOfRunReport_(new StreamsMonitorCollection::EndOfRunReport()),
00031   actionIsActive_(true)
00032   {
00033     WorkerThreadParams workerParams =
00034       sharedResources_->configuration_->getWorkerThreadParams();
00035     timeout_ = workerParams.DWdeqWaitTime_;
00036   }
00037   
00038   
00039   DiskWriter::~DiskWriter()
00040   {
00041     // Stop the activity
00042     actionIsActive_ = false;
00043     
00044     // Cancel the workloop (will wait until the action has finished)
00045     writingWL_->cancel();
00046     
00047     // Destroy any remaining streams. Under normal conditions, there should be none
00048     destroyStreams(); 
00049   }
00050   
00051   
00052   void DiskWriter::startWorkLoop(std::string workloopName)
00053   {
00054     try
00055     {
00056       std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00057       
00058       writingWL_ = toolbox::task::getWorkLoopFactory()->
00059         getWorkLoop( identifier + workloopName, "waiting" );
00060       
00061       if ( ! writingWL_->isActive() )
00062       {
00063         toolbox::task::ActionSignature* processAction = 
00064           toolbox::task::bind(this, &DiskWriter::writeAction, 
00065             identifier + "WriteNextEvent");
00066         writingWL_->submit(processAction);
00067         
00068         writingWL_->activate();
00069       }
00070     }
00071     catch (xcept::Exception& e)
00072     {
00073       std::string msg = "Failed to start workloop 'DiskWriter' with 'writeNextEvent'.";
00074       XCEPT_RETHROW(stor::exception::DiskWriting, msg, e);
00075     }
00076   }
00077   
00078   
00079   bool DiskWriter::writeAction(toolbox::task::WorkLoop*)
00080   {
00081     std::string errorMsg = "Failed to write an event: ";
00082     
00083     try
00084     {
00085       writeNextEvent();
00086     }
00087     catch(xcept::Exception &e)
00088     {
00089       XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
00090         sentinelException, errorMsg, e );
00091       sharedResources_->moveToFailedState(sentinelException);
00092     }
00093     catch(std::exception &e)
00094     {
00095       errorMsg += e.what();
00096       XCEPT_DECLARE( stor::exception::DiskWriting,
00097         sentinelException, errorMsg );
00098       sharedResources_->moveToFailedState(sentinelException);
00099     }
00100     catch(...)
00101     {
00102       errorMsg += "Unknown exception";
00103       XCEPT_DECLARE( stor::exception::DiskWriting,
00104         sentinelException, errorMsg );
00105       sharedResources_->moveToFailedState(sentinelException);
00106     }
00107     
00108     return actionIsActive_;
00109   }
00110   
00111   
00112   void DiskWriter::writeNextEvent()
00113   {
00114     I2OChain event;
00115     StreamQueuePtr sq = sharedResources_->streamQueue_;
00116     utils::TimePoint_t startTime = utils::getCurrentTime();
00117     if (sq->deqTimedWait(event, timeout_))
00118     {
00119       sharedResources_->diskWriterResources_->setBusy(true);
00120       
00121       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00122       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
00123       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed());
00124       
00125       if( event.isEndOfLumiSectionMessage() )
00126       {
00127         processEndOfLumiSection( event );
00128       }
00129       else
00130       {
00131         writeEventToStreams( event );
00132         checkForFileTimeOuts();
00133       }
00134     }
00135     else
00136     {
00137       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00138       sharedResources_->statisticsReporter_->
00139         getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
00140       
00141       checkStreamChangeRequest();
00142       checkForFileTimeOuts(true);
00143       sharedResources_->diskWriterResources_->setBusy(false);
00144     }
00145   }
00146   
00147   
00148   void DiskWriter::writeEventToStreams(const I2OChain& event)
00149   {
00150     std::vector<StreamID> streams = event.getStreamTags();
00151     
00152     for (
00153       std::vector<StreamID>::const_iterator it = streams.begin(), itEnd = streams.end();
00154       it != itEnd;
00155       ++it
00156     )
00157     {
00158       try
00159       {
00160         streamHandlers_.at(*it)->writeEvent(event);
00161       }
00162       catch (std::out_of_range& e)
00163       {
00164         std::ostringstream msg;
00165         msg << "Unable to retrieve stream handler for " << (*it) << " : ";
00166         msg << e.what();
00167         XCEPT_RAISE(exception::UnknownStreamId, msg.str());
00168       }
00169     }
00170   }
00171   
00172   
00173   void DiskWriter::checkStreamChangeRequest()
00174   {
00175     EvtStrConfigListPtr evtCfgList;
00176     ErrStrConfigListPtr errCfgList;
00177     DiskWritingParams newdwParams;
00178     unsigned int newRunNumber;
00179     boost::posix_time::time_duration newTimeoutValue;
00180     bool doConfig;
00181     if (sharedResources_->diskWriterResources_->
00182       streamChangeRequested(doConfig, evtCfgList, errCfgList, newdwParams, newRunNumber, newTimeoutValue))
00183     {
00184       destroyStreams();
00185       if (doConfig)
00186       {
00187         dwParams_ = newdwParams;
00188         runNumber_ = newRunNumber;
00189         timeout_ = newTimeoutValue;
00190         dbFileHandler_->configure(runNumber_, dwParams_);
00191         
00192         makeFaultyEventStream();
00193         configureEventStreams(evtCfgList);
00194         configureErrorStreams(errCfgList);
00195       }
00196       sharedResources_->diskWriterResources_->streamChangeDone();
00197     }
00198   }
00199   
00200   
00201   void DiskWriter::checkForFileTimeOuts(const bool doItNow)
00202   {
00203     utils::TimePoint_t now = utils::getCurrentTime();
00204     
00205     if (doItNow || (now - lastFileTimeoutCheckTime_) > dwParams_.fileClosingTestInterval_)
00206     {
00207       closeTimedOutFiles(now);
00208       lastFileTimeoutCheckTime_ = now;
00209     }
00210   }
00211   
00212   
00213   void DiskWriter::closeTimedOutFiles(const utils::TimePoint_t now)
00214   {
00215     std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
00216       boost::bind(&StreamHandler::closeTimedOutFiles, _1, now));
00217   }
00218   
00219   
00220   void DiskWriter::configureEventStreams(EvtStrConfigListPtr cfgList)
00221   {
00222     for (
00223       EvtStrConfigList::iterator it = cfgList->begin(),
00224         itEnd = cfgList->end();
00225       it != itEnd;
00226       ++it
00227     ) 
00228     {
00229       if ( it->fractionToDisk() > 0 )
00230         makeEventStream(*it);
00231     }
00232   }
00233   
00234   
00235   void DiskWriter::configureErrorStreams(ErrStrConfigListPtr cfgList)
00236   {
00237     for (
00238       ErrStrConfigList::iterator it = cfgList->begin(),
00239         itEnd = cfgList->end();
00240       it != itEnd;
00241       ++it
00242     ) 
00243     {
00244       makeErrorStream(*it);
00245     }
00246   }
00247   
00248   
00249   void DiskWriter::makeFaultyEventStream()
00250   {
00251     if ( dwParams_.faultyEventsStream_.empty() ) return;
00252 
00253     boost::shared_ptr<FaultyEventStreamHandler> newHandler(
00254     new FaultyEventStreamHandler(sharedResources_, dbFileHandler_, dwParams_.faultyEventsStream_)
00255     );
00256     streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00257   }
00258   
00259   
00260   void DiskWriter::makeEventStream(EventStreamConfigurationInfo& streamCfg)
00261   {
00262     boost::shared_ptr<EventStreamHandler> newHandler(
00263       new EventStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
00264     );
00265     streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00266     streamCfg.setStreamId(streamHandlers_.size() - 1);
00267   }
00268   
00269   
00270   void DiskWriter::makeErrorStream(ErrorStreamConfigurationInfo& streamCfg)
00271   {
00272     boost::shared_ptr<FRDStreamHandler> newHandler(
00273       new FRDStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
00274     );
00275     streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00276     streamCfg.setStreamId(streamHandlers_.size() - 1);
00277   }
00278   
00279   
00280   void DiskWriter::destroyStreams()
00281   {
00282     if (streamHandlers_.empty()) return;
00283     
00284     std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
00285       boost::bind(&StreamHandler::closeAllFiles, _1));
00286     streamHandlers_.clear();
00287     
00288     reportRemainingLumiSections();
00289     writeEndOfRunMarker();
00290   }
00291   
00292   
00293   void DiskWriter::reportRemainingLumiSections()
00294   {
00295     StreamsMonitorCollection& smc =
00296       sharedResources_->statisticsReporter_->getStreamsMonitorCollection();
00297     
00298     smc.reportAllLumiSectionInfos(dbFileHandler_, endOfRunReport_);
00299   }
00300   
00301   
00302   void DiskWriter::writeEndOfRunMarker()
00303   {
00304     std::ostringstream str;
00305     str << "LScount:" << endOfRunReport_->lsCountWithFiles
00306       << "\tEoLScount:" << endOfRunReport_->eolsCount
00307       << "\tLastLumi:" << endOfRunReport_->latestLumiSectionWritten
00308       << "\tEoR";
00309     dbFileHandler_->write(str.str());
00310     endOfRunReport_->reset();
00311   }
00312   
00313   
00314   void DiskWriter::processEndOfLumiSection(const I2OChain& msg)
00315   {
00316     if ( msg.faulty() || msg.runNumber() != runNumber_ ) return;
00317     
00318     const uint32_t lumiSection = msg.lumiSection();
00319     
00320     std::string fileCountStr;
00321     bool filesWritten = false;
00322 
00323     for (StreamHandlers::const_iterator it = streamHandlers_.begin(),
00324            itEnd = streamHandlers_.end(); it != itEnd; ++it)
00325     {
00326       if ( (*it)->closeFilesForLumiSection(lumiSection, fileCountStr) )
00327         filesWritten = true;
00328     }
00329     fileCountStr += "\tEoLS:1";
00330     dbFileHandler_->write(fileCountStr);
00331 
00332     ++(endOfRunReport_->eolsCount);
00333     if (filesWritten) ++(endOfRunReport_->lsCountWithFiles);
00334     endOfRunReport_->updateLatestWrittenLumiSection(lumiSection);
00335   }
00336   
00337 } // namespace stor
00338 
00339