CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch13/src/EventFilter/StorageManager/src/DiskWriter.cc

Go to the documentation of this file.
00001 // $Id: DiskWriter.cc,v 1.26 2011/03/07 15:31:32 mommsen Exp $
00003 
00004 #include <algorithm>
00005 
00006 #include <boost/bind.hpp>
00007 #include <boost/pointer_cast.hpp>
00008 
00009 #include "toolbox/task/WorkLoopFactory.h"
00010 #include "xcept/tools.h"
00011 
00012 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00013 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00014 #include "EventFilter/StorageManager/interface/EventStreamHandler.h"
00015 #include "EventFilter/StorageManager/interface/Exception.h"
00016 #include "EventFilter/StorageManager/interface/FaultyEventStreamHandler.h"
00017 #include "EventFilter/StorageManager/interface/FRDStreamHandler.h"
00018 #include "EventFilter/StorageManager/interface/I2OChain.h"
00019 #include "EventFilter/StorageManager/interface/StreamHandler.h"
00020 
00021 
00022 namespace stor {
00023   
00024   DiskWriter::DiskWriter(xdaq::Application *app, SharedResourcesPtr sr) :
00025   app_(app),
00026   sharedResources_(sr),
00027   dbFileHandler_(new DbFileHandler()),
00028   runNumber_(0),
00029   lastFileTimeoutCheckTime_(utils::getCurrentTime()),
00030   actionIsActive_(true)
00031   {
00032     WorkerThreadParams workerParams =
00033       sharedResources_->configuration_->getWorkerThreadParams();
00034     timeout_ = workerParams.DWdeqWaitTime_;
00035   }
00036   
00037   
00038   DiskWriter::~DiskWriter()
00039   {
00040     // Stop the activity
00041     actionIsActive_ = false;
00042     
00043     // Cancel the workloop (will wait until the action has finished)
00044     writingWL_->cancel();
00045     
00046     // Destroy any remaining streams. Under normal conditions, there should be none
00047     destroyStreams(); 
00048   }
00049   
00050   
00051   void DiskWriter::startWorkLoop(std::string workloopName)
00052   {
00053     try
00054     {
00055       std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00056       
00057       writingWL_ = toolbox::task::getWorkLoopFactory()->
00058         getWorkLoop( identifier + workloopName, "waiting" );
00059       
00060       if ( ! writingWL_->isActive() )
00061       {
00062         toolbox::task::ActionSignature* processAction = 
00063           toolbox::task::bind(this, &DiskWriter::writeAction, 
00064             identifier + "WriteNextEvent");
00065         writingWL_->submit(processAction);
00066         
00067         writingWL_->activate();
00068       }
00069     }
00070     catch (xcept::Exception& e)
00071     {
00072       std::string msg = "Failed to start workloop 'DiskWriter' with 'writeNextEvent'.";
00073       XCEPT_RETHROW(stor::exception::DiskWriting, msg, e);
00074     }
00075   }
00076   
00077   
00078   bool DiskWriter::writeAction(toolbox::task::WorkLoop*)
00079   {
00080     std::string errorMsg = "Failed to write an event: ";
00081     
00082     try
00083     {
00084       writeNextEvent();
00085     }
00086     catch(xcept::Exception &e)
00087     {
00088       XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
00089         sentinelException, errorMsg, e );
00090       sharedResources_->moveToFailedState(sentinelException);
00091     }
00092     catch(std::exception &e)
00093     {
00094       errorMsg += e.what();
00095       XCEPT_DECLARE( stor::exception::DiskWriting,
00096         sentinelException, errorMsg );
00097       sharedResources_->moveToFailedState(sentinelException);
00098     }
00099     catch(...)
00100     {
00101       errorMsg += "Unknown exception";
00102       XCEPT_DECLARE( stor::exception::DiskWriting,
00103         sentinelException, errorMsg );
00104       sharedResources_->moveToFailedState(sentinelException);
00105     }
00106     
00107     return actionIsActive_;
00108   }
00109   
00110   
00111   void DiskWriter::writeNextEvent()
00112   {
00113     I2OChain event;
00114     StreamQueuePtr sq = sharedResources_->streamQueue_;
00115     utils::TimePoint_t startTime = utils::getCurrentTime();
00116     if (sq->deqTimedWait(event, timeout_))
00117     {
00118       sharedResources_->diskWriterResources_->setBusy(true);
00119       
00120       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00121       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
00122       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed());
00123       
00124       if( event.isEndOfLumiSectionMessage() )
00125       {
00126         processEndOfLumiSection( event );
00127       }
00128       else
00129       {
00130         writeEventToStreams( event );
00131         checkForFileTimeOuts();
00132       }
00133     }
00134     else
00135     {
00136       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00137       sharedResources_->statisticsReporter_->
00138         getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
00139       
00140       checkStreamChangeRequest();
00141       checkForFileTimeOuts(true);
00142       sharedResources_->diskWriterResources_->setBusy(false);
00143     }
00144   }
00145   
00146   
00147   void DiskWriter::writeEventToStreams(const I2OChain& event)
00148   {
00149     std::vector<StreamID> streams = event.getStreamTags();
00150     
00151     for (
00152       std::vector<StreamID>::const_iterator it = streams.begin(), itEnd = streams.end();
00153       it != itEnd;
00154       ++it
00155     )
00156     {
00157       try
00158       {
00159         streamHandlers_.at(*it)->writeEvent(event);
00160       }
00161       catch (std::out_of_range& e)
00162       {
00163         std::ostringstream msg;
00164         msg << "Unable to retrieve stream handler for " << (*it) << " : ";
00165         msg << e.what();
00166         XCEPT_RAISE(exception::UnknownStreamId, msg.str());
00167       }
00168     }
00169   }
00170   
00171   
00172   void DiskWriter::checkStreamChangeRequest()
00173   {
00174     EvtStrConfigListPtr evtCfgList;
00175     ErrStrConfigListPtr errCfgList;
00176     DiskWritingParams newdwParams;
00177     unsigned int newRunNumber;
00178     boost::posix_time::time_duration newTimeoutValue;
00179     bool doConfig;
00180     if (sharedResources_->diskWriterResources_->
00181       streamChangeRequested(doConfig, evtCfgList, errCfgList, newdwParams, newRunNumber, newTimeoutValue))
00182     {
00183       destroyStreams();
00184       if (doConfig)
00185       {
00186         dwParams_ = newdwParams;
00187         runNumber_ = newRunNumber;
00188         timeout_ = newTimeoutValue;
00189         dbFileHandler_->configure(runNumber_, dwParams_);
00190         
00191         makeFaultyEventStream();
00192         configureEventStreams(evtCfgList);
00193         configureErrorStreams(errCfgList);
00194       }
00195       sharedResources_->diskWriterResources_->streamChangeDone();
00196     }
00197   }
00198   
00199   
00200   void DiskWriter::checkForFileTimeOuts(const bool doItNow)
00201   {
00202     utils::TimePoint_t now = utils::getCurrentTime();
00203     
00204     if (doItNow || (now - lastFileTimeoutCheckTime_) > dwParams_.fileClosingTestInterval_)
00205     {
00206       closeTimedOutFiles(now);
00207       lastFileTimeoutCheckTime_ = now;
00208     }
00209   }
00210   
00211   
00212   void DiskWriter::closeTimedOutFiles(const utils::TimePoint_t now)
00213   {
00214     std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
00215       boost::bind(&StreamHandler::closeTimedOutFiles, _1, now));
00216   }
00217   
00218   
00219   void DiskWriter::configureEventStreams(EvtStrConfigListPtr cfgList)
00220   {
00221     for (
00222       EvtStrConfigList::iterator it = cfgList->begin(),
00223         itEnd = cfgList->end();
00224       it != itEnd;
00225       ++it
00226     ) 
00227     {
00228       if ( it->fractionToDisk() > 0 )
00229         makeEventStream(*it);
00230     }
00231   }
00232   
00233   
00234   void DiskWriter::configureErrorStreams(ErrStrConfigListPtr cfgList)
00235   {
00236     for (
00237       ErrStrConfigList::iterator it = cfgList->begin(),
00238         itEnd = cfgList->end();
00239       it != itEnd;
00240       ++it
00241     ) 
00242     {
00243       makeErrorStream(*it);
00244     }
00245   }
00246   
00247   
00248   void DiskWriter::makeFaultyEventStream()
00249   {
00250     if ( dwParams_.faultyEventsStream_.empty() ) return;
00251 
00252     boost::shared_ptr<FaultyEventStreamHandler> newHandler(
00253     new FaultyEventStreamHandler(sharedResources_, dbFileHandler_, dwParams_.faultyEventsStream_)
00254     );
00255     streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00256   }
00257   
00258   
00259   void DiskWriter::makeEventStream(EventStreamConfigurationInfo& streamCfg)
00260   {
00261     boost::shared_ptr<EventStreamHandler> newHandler(
00262       new EventStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
00263     );
00264     streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00265     streamCfg.setStreamId(streamHandlers_.size() - 1);
00266   }
00267   
00268   
00269   void DiskWriter::makeErrorStream(ErrorStreamConfigurationInfo& streamCfg)
00270   {
00271     boost::shared_ptr<FRDStreamHandler> newHandler(
00272       new FRDStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
00273     );
00274     streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
00275     streamCfg.setStreamId(streamHandlers_.size() - 1);
00276   }
00277   
00278   
00279   void DiskWriter::destroyStreams()
00280   {
00281     if (streamHandlers_.empty()) return;
00282     
00283     std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
00284       boost::bind(&StreamHandler::closeAllFiles, _1));
00285     streamHandlers_.clear();
00286     
00287     reportRemainingLumiSections();
00288     writeEndOfRunMarker();
00289   }
00290   
00291   
00292   void DiskWriter::reportRemainingLumiSections() const
00293   {
00294     StreamsMonitorCollection& smc =
00295       sharedResources_->statisticsReporter_->getStreamsMonitorCollection();
00296     
00297     smc.reportAllLumiSectionInfos(dbFileHandler_);
00298   }
00299   
00300   
00301   void DiskWriter::writeEndOfRunMarker() const
00302   {
00303     RunMonitorCollection& rmc =
00304       sharedResources_->statisticsReporter_->getRunMonitorCollection();
00305     // Make sure we report the latest values
00306     rmc.calculateStatistics(utils::getCurrentTime());
00307     
00308     MonitoredQuantity::Stats lumiSectionsSeenStats;
00309     rmc.getLumiSectionsSeenMQ().getStats(lumiSectionsSeenStats);
00310     MonitoredQuantity::Stats eolsSeenStats;
00311     rmc.getEoLSSeenMQ().getStats(eolsSeenStats);
00312     
00313     std::ostringstream str;
00314     str << "LScount:" << lumiSectionsSeenStats.getSampleCount()
00315       << "\tEoLScount:" << eolsSeenStats.getSampleCount()
00316       << "\tLastLumi:" << lumiSectionsSeenStats.getLastSampleValue()
00317       << "\tEoR";
00318     dbFileHandler_->write(str.str());
00319   }
00320   
00321   
00322   void DiskWriter::processEndOfLumiSection(const I2OChain& msg)
00323   {
00324     if ( msg.faulty() || msg.runNumber() != runNumber_ ) return;
00325     
00326     const uint32_t lumiSection = msg.lumiSection();
00327     
00328     std::string fileCountStr;
00329     
00330     for (StreamHandlers::const_iterator it = streamHandlers_.begin(),
00331            itEnd = streamHandlers_.end(); it != itEnd; ++it)
00332     {
00333       (*it)->closeFilesForLumiSection(lumiSection, fileCountStr);
00334     }
00335     dbFileHandler_->write(fileCountStr);
00336   }
00337   
00338 } // namespace stor
00339 
00340