CMS 3D CMS Logo

Public Member Functions | Private Types | Private Member Functions | Private Attributes

stor::DiskWriter Class Reference

#include <DiskWriter.h>

List of all members.

Public Member Functions

 DiskWriter (xdaq::Application *, SharedResourcesPtr sr)
void startWorkLoop (std::string workloopName)
bool writeAction (toolbox::task::WorkLoop *)
 ~DiskWriter ()

Private Types

typedef boost::shared_ptr
< StreamHandler >
StreamHandlerPtr
typedef std::vector
< StreamHandlerPtr >
StreamHandlers

Private Member Functions

void checkForFileTimeOuts (const bool doItNow=false)
void checkStreamChangeRequest ()
void closeTimedOutFiles (const utils::TimePoint_t)
void configureErrorStreams (ErrStrConfigListPtr)
void configureEventStreams (EvtStrConfigListPtr)
void destroyStreams ()
 DiskWriter (DiskWriter const &)
void makeErrorStream (ErrorStreamConfigurationInfo &)
void makeEventStream (EventStreamConfigurationInfo &)
void makeFaultyEventStream ()
DiskWriter & operator= (DiskWriter const &)
void processEndOfLumiSection (const I2OChain &)
void reportRemainingLumiSections () const
void writeEndOfRunMarker () const
void writeEventToStreams (const I2OChain &)
void writeNextEvent ()

Private Attributes

bool actionIsActive_
xdaq::Application * app_
const DbFileHandlerPtr dbFileHandler_
DiskWritingParams dwParams_
utils::TimePoint_t lastFileTimeoutCheckTime_
unsigned int runNumber_
SharedResourcesPtr sharedResources_
StreamHandlers streamHandlers_
boost::posix_time::time_duration timeout_
toolbox::task::WorkLoop * writingWL_

Detailed Description

Writes events to disk

It gets the next event from the StreamQueue and writes it to the appropriate stream file(s) on disk.

Author:
mommsen
Revision:
1.13
Date:
2011/03/07 15:31:31

Definition at line 41 of file DiskWriter.h.


Member Typedef Documentation

typedef boost::shared_ptr<StreamHandler> stor::DiskWriter::StreamHandlerPtr [private]

Definition at line 152 of file DiskWriter.h.

typedef std::vector<StreamHandlerPtr> stor::DiskWriter::StreamHandlers [private]

Definition at line 153 of file DiskWriter.h.


Constructor & Destructor Documentation

stor::DiskWriter::DiskWriter ( xdaq::Application *  app,
SharedResourcesPtr  sr 
)

Definition at line 24 of file DiskWriter.cc.

References stor::WorkerThreadParams::DWdeqWaitTime_, sharedResources_, and timeout_.

                                                                    :
  app_(app),
  sharedResources_(sr),
  dbFileHandler_(new DbFileHandler()),
  runNumber_(0),
  lastFileTimeoutCheckTime_(utils::getCurrentTime()),
  actionIsActive_(true),
  // The workloop is only obtained in startWorkLoop(). Start from a null
  // pointer so the member never holds an indeterminate value.
  writingWL_(0)
  {
    // Initial dequeue timeout, taken from the worker thread parameters of
    // the configuration. checkStreamChangeRequest() may replace it later.
    WorkerThreadParams workerParams =
      sharedResources_->configuration_->getWorkerThreadParams();
    timeout_ = workerParams.DWdeqWaitTime_;
  }
stor::DiskWriter::~DiskWriter ( )

Definition at line 38 of file DiskWriter.cc.

References actionIsActive_, destroyStreams(), and writingWL_.

  {
    // Stop the activity: writeAction() returns this flag to the workloop
    actionIsActive_ = false;

    // Cancel the workloop (will wait until the action has finished).
    // The workloop is only obtained in startWorkLoop(), so there may be
    // nothing to cancel.
    // NOTE(review): this guard relies on writingWL_ being null until
    // startWorkLoop() assigns it - confirm the constructor initializes it.
    if (writingWL_)
    {
      writingWL_->cancel();
    }

    // Destroy any remaining streams. Under normal conditions, there should be none
    destroyStreams();
  }
stor::DiskWriter::DiskWriter ( DiskWriter const &  ) [private]

Member Function Documentation

void stor::DiskWriter::checkForFileTimeOuts ( const bool  doItNow = false) [private]

Close old files if fileClosingTestInterval has passed or do it now if argument is true

Definition at line 200 of file DiskWriter.cc.

References closeTimedOutFiles(), dwParams_, stor::DiskWritingParams::fileClosingTestInterval_, stor::utils::getCurrentTime(), lastFileTimeoutCheckTime_, and cmsPerfSuiteHarvest::now.

Referenced by writeNextEvent().

  {
    // One timestamp serves both the interval test and the bookkeeping
    const utils::TimePoint_t currentTime = utils::getCurrentTime();

    // Nothing to do unless the caller forces the check or more than
    // fileClosingTestInterval_ has passed since the previous check
    if (!doItNow &&
        !((currentTime - lastFileTimeoutCheckTime_) > dwParams_.fileClosingTestInterval_))
    {
      return;
    }

    closeTimedOutFiles(currentTime);
    lastFileTimeoutCheckTime_ = currentTime;
  }
void stor::DiskWriter::checkStreamChangeRequest ( ) [private]

Reconfigure streams if a request is pending

Definition at line 172 of file DiskWriter.cc.

References configureErrorStreams(), configureEventStreams(), dbFileHandler_, destroyStreams(), dwParams_, makeFaultyEventStream(), runNumber_, sharedResources_, and timeout_.

Referenced by writeNextEvent().

  {
    // Locals handed to streamChangeRequested(), which is expected to fill
    // them in when it reports a pending request
    EvtStrConfigListPtr eventConfigs;
    ErrStrConfigListPtr errorConfigs;
    DiskWritingParams requestedParams;
    unsigned int requestedRunNumber;
    boost::posix_time::time_duration requestedTimeout;
    bool configureRequested;

    // No pending request: leave the current streams untouched
    if (!sharedResources_->diskWriterResources_->
      streamChangeRequested(configureRequested, eventConfigs, errorConfigs,
        requestedParams, requestedRunNumber, requestedTimeout))
    {
      return;
    }

    // The existing streams are always closed first
    destroyStreams();

    if (configureRequested)
    {
      // Adopt the new settings before any stream is created from them
      dwParams_ = requestedParams;
      runNumber_ = requestedRunNumber;
      timeout_ = requestedTimeout;
      dbFileHandler_->configure(runNumber_, dwParams_);

      // Handlers are appended to streamHandlers_ in this order
      makeFaultyEventStream();
      configureEventStreams(eventConfigs);
      configureErrorStreams(errorConfigs);
    }

    // Signal that the request has been dealt with
    sharedResources_->diskWriterResources_->streamChangeDone();
  }
void stor::DiskWriter::closeTimedOutFiles ( const utils::TimePoint_t  now) [private]

Close all timed-out files

Definition at line 212 of file DiskWriter.cc.

References stor::StreamHandler::closeTimedOutFiles(), and streamHandlers_.

Referenced by checkForFileTimeOuts().

  {
    // Forward the given time point to every stream handler in turn
    for (StreamHandlers::iterator handler = streamHandlers_.begin(),
           handlerEnd = streamHandlers_.end();
         handler != handlerEnd;
         ++handler)
    {
      (*handler)->closeTimedOutFiles(now);
    }
  }
void stor::DiskWriter::configureErrorStreams ( ErrStrConfigListPtr  cfgList) [private]

Configures the error streams to be written to disk

Definition at line 234 of file DiskWriter.cc.

References makeErrorStream().

Referenced by checkStreamChangeRequest().

  {
    // Create one error stream handler per configuration entry, in list order
    ErrStrConfigList::iterator cfg = cfgList->begin();
    const ErrStrConfigList::iterator cfgEnd = cfgList->end();
    while (cfg != cfgEnd)
    {
      makeErrorStream(*cfg);
      ++cfg;
    }
  }
void stor::DiskWriter::configureEventStreams ( EvtStrConfigListPtr  cfgList) [private]

Configures the event streams to be written to disk

Definition at line 219 of file DiskWriter.cc.

References makeEventStream().

Referenced by checkStreamChangeRequest().

  {
    for (EvtStrConfigList::iterator cfg = cfgList->begin(),
           cfgEnd = cfgList->end();
         cfg != cfgEnd;
         ++cfg)
    {
      // Entries whose fraction to disk is not positive get no handler
      if ( !(cfg->fractionToDisk() > 0) ) continue;

      makeEventStream(*cfg);
    }
  }
void stor::DiskWriter::destroyStreams ( ) [private]

Gracefully close all streams

Definition at line 279 of file DiskWriter.cc.

References stor::StreamHandler::closeAllFiles(), reportRemainingLumiSections(), streamHandlers_, and writeEndOfRunMarker().

Referenced by checkStreamChangeRequest(), and ~DiskWriter().

  {
    // Without any handler there is nothing to close and nothing is reported
    if (!streamHandlers_.empty())
    {
      // Close the files of every stream before dropping the handlers
      for (StreamHandlers::iterator handler = streamHandlers_.begin(),
             handlerEnd = streamHandlers_.end();
           handler != handlerEnd;
           ++handler)
      {
        (*handler)->closeAllFiles();
      }
      streamHandlers_.clear();

      // Final bookkeeping: remaining lumi sections first, then the end-of-run marker
      reportRemainingLumiSections();
      writeEndOfRunMarker();
    }
  }
void stor::DiskWriter::makeErrorStream ( ErrorStreamConfigurationInfo &  streamCfg) [private]

Creates the handler for the given error event stream

Definition at line 269 of file DiskWriter.cc.

References dbFileHandler_, stor::ErrorStreamConfigurationInfo::setStreamId(), sharedResources_, and streamHandlers_.

Referenced by configureErrorStreams().

  {
    // Hold the new handler through the base-class pointer type stored in
    // streamHandlers_
    StreamHandlerPtr handler(
      new FRDStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
    );

    // The handler is appended, so its index equals the size before insertion.
    // That index is recorded in the configuration as the stream id.
    const StreamHandlers::size_type newStreamId = streamHandlers_.size();
    streamHandlers_.push_back(handler);
    streamCfg.setStreamId(newStreamId);
  }
void stor::DiskWriter::makeEventStream ( EventStreamConfigurationInfo &  streamCfg) [private]

Creates the handler for the given event stream

Definition at line 259 of file DiskWriter.cc.

References dbFileHandler_, stor::EventStreamConfigurationInfo::setStreamId(), sharedResources_, and streamHandlers_.

Referenced by configureEventStreams().

  {
    // Hold the new handler through the base-class pointer type stored in
    // streamHandlers_
    StreamHandlerPtr handler(
      new EventStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
    );

    // The handler is appended, so its index equals the size before insertion.
    // That index is recorded in the configuration as the stream id.
    const StreamHandlers::size_type newStreamId = streamHandlers_.size();
    streamHandlers_.push_back(handler);
    streamCfg.setStreamId(newStreamId);
  }
void stor::DiskWriter::makeFaultyEventStream ( ) [private]

Creates the handler for faulty events detected by the SM

Definition at line 248 of file DiskWriter.cc.

References dbFileHandler_, dwParams_, stor::DiskWritingParams::faultyEventsStream_, sharedResources_, and streamHandlers_.

Referenced by checkStreamChangeRequest().

  {
    // No handler is created when faultyEventsStream_ is empty
    if ( !dwParams_.faultyEventsStream_.empty() )
    {
      StreamHandlerPtr handler(
        new FaultyEventStreamHandler(sharedResources_, dbFileHandler_, dwParams_.faultyEventsStream_)
      );
      streamHandlers_.push_back(handler);
    }
  }
DiskWriter& stor::DiskWriter::operator= ( DiskWriter const &  ) [private]
void stor::DiskWriter::processEndOfLumiSection ( const I2OChain &  msg) [private]

Close files at the end of a luminosity section and release message memory:

Definition at line 322 of file DiskWriter.cc.

References dbFileHandler_, stor::I2OChain::faulty(), stor::I2OChain::lumiSection(), stor::I2OChain::runNumber(), runNumber_, and streamHandlers_.

Referenced by writeNextEvent().

  {
    // Faulty messages and messages carrying another run number are ignored
    if ( msg.faulty() ) return;
    if ( msg.runNumber() != runNumber_ ) return;

    const uint32_t lumi = msg.lumiSection();

    // The same string is passed to every handler and written out afterwards
    std::string fileCounts;

    StreamHandlers::const_iterator handler = streamHandlers_.begin();
    const StreamHandlers::const_iterator handlerEnd = streamHandlers_.end();
    while (handler != handlerEnd)
    {
      (*handler)->closeFilesForLumiSection(lumi, fileCounts);
      ++handler;
    }

    dbFileHandler_->write(fileCounts);
  }
void stor::DiskWriter::reportRemainingLumiSections ( ) const [private]

Log file statistics for so far unreported lumi sections

Definition at line 292 of file DiskWriter.cc.

References dbFileHandler_, stor::StreamsMonitorCollection::reportAllLumiSectionInfos(), and sharedResources_.

Referenced by destroyStreams().

  {
    // Let the streams monitor collection report its lumi section infos
    // through the db file handler
    sharedResources_->statisticsReporter_->getStreamsMonitorCollection()
      .reportAllLumiSectionInfos(dbFileHandler_);
  }
void stor::DiskWriter::startWorkLoop ( std::string  workloopName)

Creates and starts the disk writing workloop

Definition at line 51 of file DiskWriter.cc.

References app_, ExpressReco_HICollisions_FallBack::e, Exception, stor::utils::getIdentifier(), runTheMatrix::msg, writeAction(), and writingWL_.

  {
    try
    {
      // Both the workloop name and the action name carry the application identifier as prefix
      const std::string appIdentifier = utils::getIdentifier(app_->getApplicationDescriptor());
      const std::string loopName = appIdentifier + workloopName;

      writingWL_ = toolbox::task::getWorkLoopFactory()->
        getWorkLoop( loopName, "waiting" );

      // A workloop that is already active is left as it is
      if ( writingWL_->isActive() ) return;

      // Register writeAction() as the workloop's action, then start the loop
      const std::string actionName = appIdentifier + "WriteNextEvent";
      toolbox::task::ActionSignature* writeJob =
        toolbox::task::bind(this, &DiskWriter::writeAction, actionName);
      writingWL_->submit(writeJob);

      writingWL_->activate();
    }
    catch (xcept::Exception& caught)
    {
      // Rethrow as a DiskWriting exception carrying the original as its cause
      const std::string msg = "Failed to start workloop 'DiskWriter' with 'writeNextEvent'.";
      XCEPT_RETHROW(stor::exception::DiskWriting, msg, caught);
    }
  }
bool stor::DiskWriter::writeAction ( toolbox::task::WorkLoop *  )

The workloop action taking the next event from the StreamQueue and writing it to disk

Definition at line 78 of file DiskWriter.cc.

References actionIsActive_, ExpressReco_HICollisions_FallBack::e, exception, Exception, sharedResources_, and writeNextEvent().

Referenced by startWorkLoop().

  {
    // Common prefix of the message attached to every reported failure
    std::string failureText = "Failed to write an event: ";

    try
    {
      writeNextEvent();
    }
    catch(xcept::Exception &xdaqError)
    {
      // The original exception is nested; the message keeps the bare prefix
      XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
        reportedFailure, failureText, xdaqError );
      sharedResources_->moveToFailedState(reportedFailure);
    }
    catch(std::exception &stdError)
    {
      // Only the what() text is carried over
      failureText += stdError.what();
      XCEPT_DECLARE( stor::exception::DiskWriting,
        reportedFailure, failureText );
      sharedResources_->moveToFailedState(reportedFailure);
    }
    catch(...)
    {
      failureText += "Unknown exception";
      XCEPT_DECLARE( stor::exception::DiskWriting,
        reportedFailure, failureText );
      sharedResources_->moveToFailedState(reportedFailure);
    }

    // The destructor clears this flag to stop the activity
    return actionIsActive_;
  }
void stor::DiskWriter::writeEndOfRunMarker ( ) const [private]

Log end-of-run marker

Definition at line 301 of file DiskWriter.cc.

References stor::MonitorCollection::calculateStatistics(), dbFileHandler_, stor::utils::getCurrentTime(), stor::RunMonitorCollection::getEoLSSeenMQ(), stor::MonitoredQuantity::Stats::getLastSampleValue(), stor::RunMonitorCollection::getLumiSectionsSeenMQ(), stor::MonitoredQuantity::Stats::getSampleCount(), stor::MonitoredQuantity::getStats(), and sharedResources_.

Referenced by destroyStreams().

  {
    RunMonitorCollection& runMonitor =
      sharedResources_->statisticsReporter_->getRunMonitorCollection();

    // Refresh the statistics first so the marker reflects the latest values
    runMonitor.calculateStatistics(utils::getCurrentTime());

    MonitoredQuantity::Stats lumiStats;
    runMonitor.getLumiSectionsSeenMQ().getStats(lumiStats);
    MonitoredQuantity::Stats eolsStats;
    runMonitor.getEoLSSeenMQ().getStats(eolsStats);

    // Tab-separated record written through the db file handler
    std::ostringstream marker;
    marker << "LScount:" << lumiStats.getSampleCount();
    marker << "\tEoLScount:" << eolsStats.getSampleCount();
    marker << "\tLastLumi:" << lumiStats.getLastSampleValue();
    marker << "\tEoR";
    dbFileHandler_->write(marker.str());
  }
void stor::DiskWriter::writeEventToStreams ( const I2OChain &  event) [private]

Writes the event to the appropriate streams

Definition at line 147 of file DiskWriter.cc.

References ExpressReco_HICollisions_FallBack::e, runTheMatrix::msg, streamHandlers_, and hcal_dqm_sourceclient-file_cfg::streams.

Referenced by writeNextEvent().

  {
    // Each stream tag of the event is used as an index into streamHandlers_
    const std::vector<StreamID> tags = event.getStreamTags();

    std::vector<StreamID>::const_iterator tag = tags.begin();
    const std::vector<StreamID>::const_iterator tagEnd = tags.end();
    for ( ; tag != tagEnd; ++tag )
    {
      try
      {
        // at() is range-checked, so an id without a handler is detected
        streamHandlers_.at(*tag)->writeEvent(event);
      }
      catch (const std::out_of_range& rangeError)
      {
        // Report the offending id; this aborts the remaining tags
        std::ostringstream msg;
        msg << "Unable to retrieve stream handler for " << (*tag) << " : ";
        msg << rangeError.what();
        XCEPT_RAISE(exception::UnknownStreamId, msg.str());
      }
    }
  }
void stor::DiskWriter::writeNextEvent ( ) [private]

Takes the event from the stream queue

Definition at line 111 of file DiskWriter.cc.

References checkForFileTimeOuts(), checkStreamChangeRequest(), event(), stor::utils::getCurrentTime(), stor::I2OChain::isEndOfLumiSectionMessage(), stor::I2OChain::memoryUsed(), processEndOfLumiSection(), sharedResources_, timeout_, and writeEventToStreams().

Referenced by writeAction().

  {
    I2OChain event;
    StreamQueuePtr sq = sharedResources_->streamQueue_;
    utils::TimePoint_t startTime = utils::getCurrentTime();

    if (!sq->deqTimedWait(event, timeout_))
    {
      // Nothing was dequeued: the whole wait is recorded as idle time
      utils::Duration_t idleTime = utils::getCurrentTime() - startTime;
      sharedResources_->statisticsReporter_->
        getThroughputMonitorCollection().addDiskWriterIdleSample(idleTime);

      // Stream changes are only checked here, and the file timeout check is forced
      checkStreamChangeRequest();
      checkForFileTimeOuts(true);
      sharedResources_->diskWriterResources_->setBusy(false);
      return;
    }

    sharedResources_->diskWriterResources_->setBusy(true);

    // Record the time spent waiting for this event and its memory usage
    utils::Duration_t waitTime = utils::getCurrentTime() - startTime;
    sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addDiskWriterIdleSample(waitTime);
    sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed());

    if( !event.isEndOfLumiSectionMessage() )
    {
      // Ordinary event: write it, then run the interval-based timeout check
      writeEventToStreams( event );
      checkForFileTimeOuts();
      return;
    }

    processEndOfLumiSection( event );
  }

Member Data Documentation

bool stor::DiskWriter::actionIsActive_ [private]

Definition at line 156 of file DiskWriter.h.

Referenced by writeAction(), and ~DiskWriter().

xdaq::Application* stor::DiskWriter::app_ [private]

Definition at line 143 of file DiskWriter.h.

Referenced by startWorkLoop().

utils::TimePoint_t stor::DiskWriter::lastFileTimeoutCheckTime_ [private]

Definition at line 150 of file DiskWriter.h.

Referenced by checkForFileTimeOuts().

unsigned int stor::DiskWriter::runNumber_ [private]

Definition at line 148 of file DiskWriter.h.

Referenced by checkStreamChangeRequest(), and processEndOfLumiSection().

boost::posix_time::time_duration stor::DiskWriter::timeout_ [private]

Definition at line 149 of file DiskWriter.h.

Referenced by checkStreamChangeRequest(), DiskWriter(), and writeNextEvent().

toolbox::task::WorkLoop* stor::DiskWriter::writingWL_ [private]

Definition at line 157 of file DiskWriter.h.

Referenced by startWorkLoop(), and ~DiskWriter().