CMS 3D CMS Logo

Public Member Functions | Private Types | Private Member Functions | Private Attributes

stor::DiskWriter Class Reference

#include <DiskWriter.h>

List of all members.

Public Member Functions

 DiskWriter (xdaq::Application *, SharedResourcesPtr sr)
void startWorkLoop (std::string workloopName)
bool writeAction (toolbox::task::WorkLoop *)
 ~DiskWriter ()

Private Types

typedef boost::shared_ptr
< StreamHandler
StreamHandlerPtr
typedef std::vector
< StreamHandlerPtr
StreamHandlers

Private Member Functions

void checkForFileTimeOuts (const bool doItNow=false)
void checkStreamChangeRequest ()
void closeTimedOutFiles (const utils::TimePoint_t)
void configureErrorStreams (ErrStrConfigListPtr)
void configureEventStreams (EvtStrConfigListPtr)
void destroyStreams ()
 DiskWriter (DiskWriter const &)
void makeErrorStream (ErrorStreamConfigurationInfo &)
void makeEventStream (EventStreamConfigurationInfo &)
void makeFaultyEventStream ()
DiskWriteroperator= (DiskWriter const &)
void processEndOfLumiSection (const I2OChain &)
void reportRemainingLumiSections ()
void writeEndOfRunMarker ()
void writeEventToStreams (const I2OChain &)
void writeNextEvent ()

Private Attributes

bool actionIsActive_
xdaq::Application * app_
const DbFileHandlerPtr dbFileHandler_
DiskWritingParams dwParams_
StreamsMonitorCollection::EndOfRunReportPtr endOfRunReport_
utils::TimePoint_t lastFileTimeoutCheckTime_
unsigned int runNumber_
SharedResourcesPtr sharedResources_
StreamHandlers streamHandlers_
boost::posix_time::time_duration timeout_
toolbox::task::WorkLoop * writingWL_

Detailed Description

Writes events to disk

It gets the next event from the StreamQueue and writes it to the appropriate stream file(s) on disk.

Author:
mommsen
Revision:
1.15
Date:
2011/06/20 15:55:52

Definition at line 42 of file DiskWriter.h.


Member Typedef Documentation

typedef boost::shared_ptr<StreamHandler> stor::DiskWriter::StreamHandlerPtr [private]

Definition at line 153 of file DiskWriter.h.

typedef std::vector<StreamHandlerPtr> stor::DiskWriter::StreamHandlers [private]

Definition at line 154 of file DiskWriter.h.


Constructor & Destructor Documentation

stor::DiskWriter::DiskWriter ( xdaq::Application *  app,
SharedResourcesPtr  sr 
)

Definition at line 25 of file DiskWriter.cc.

References stor::WorkerThreadParams::DWdeqWaitTime_, sharedResources_, and timeout_.

                                                                    :
  app_(app),
  sharedResources_(sr),
  dbFileHandler_(new DbFileHandler()),
  runNumber_(0),
  lastFileTimeoutCheckTime_(utils::getCurrentTime()),
  endOfRunReport_(new StreamsMonitorCollection::EndOfRunReport()),
  actionIsActive_(true)
  {
    WorkerThreadParams workerParams =
      sharedResources_->configuration_->getWorkerThreadParams();
    timeout_ = workerParams.DWdeqWaitTime_;
  }
stor::DiskWriter::~DiskWriter ( )

Definition at line 40 of file DiskWriter.cc.

References actionIsActive_, destroyStreams(), and writingWL_.

  {
    // Stop the activity
    actionIsActive_ = false;
    
    // Cancel the workloop (will wait until the action has finished)
    writingWL_->cancel();
    
    // Destroy any remaining streams. Under normal conditions, there should be none
    destroyStreams(); 
  }
stor::DiskWriter::DiskWriter ( DiskWriter const &  ) [private]

Member Function Documentation

void stor::DiskWriter::checkForFileTimeOuts ( const bool  doItNow = false) [private]

Close old files if fileClosingTestInterval has passed or do it now if argument is true

Definition at line 202 of file DiskWriter.cc.

References closeTimedOutFiles(), dwParams_, stor::DiskWritingParams::fileClosingTestInterval_, stor::utils::getCurrentTime(), lastFileTimeoutCheckTime_, and cmsPerfSuiteHarvest::now.

Referenced by writeNextEvent().

  {
    utils::TimePoint_t now = utils::getCurrentTime();
    
    if (doItNow || (now - lastFileTimeoutCheckTime_) > dwParams_.fileClosingTestInterval_)
    {
      closeTimedOutFiles(now);
      lastFileTimeoutCheckTime_ = now;
    }
  }
void stor::DiskWriter::checkStreamChangeRequest ( ) [private]

Reconfigure streams if a request is pending

Definition at line 174 of file DiskWriter.cc.

References configureErrorStreams(), configureEventStreams(), dbFileHandler_, destroyStreams(), dwParams_, makeFaultyEventStream(), runNumber_, sharedResources_, and timeout_.

Referenced by writeNextEvent().

  {
    EvtStrConfigListPtr evtCfgList;
    ErrStrConfigListPtr errCfgList;
    DiskWritingParams newdwParams;
    unsigned int newRunNumber;
    boost::posix_time::time_duration newTimeoutValue;
    bool doConfig;
    if (sharedResources_->diskWriterResources_->
      streamChangeRequested(doConfig, evtCfgList, errCfgList, newdwParams, newRunNumber, newTimeoutValue))
    {
      destroyStreams();
      if (doConfig)
      {
        dwParams_ = newdwParams;
        runNumber_ = newRunNumber;
        timeout_ = newTimeoutValue;
        dbFileHandler_->configure(runNumber_, dwParams_);
        
        makeFaultyEventStream();
        configureEventStreams(evtCfgList);
        configureErrorStreams(errCfgList);
      }
      sharedResources_->diskWriterResources_->streamChangeDone();
    }
  }
void stor::DiskWriter::closeTimedOutFiles ( const utils::TimePoint_t  now) [private]

Close all timed-out files

Definition at line 214 of file DiskWriter.cc.

References stor::StreamHandler::closeTimedOutFiles(), and streamHandlers_.

Referenced by checkForFileTimeOuts().

  {
    std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
      boost::bind(&StreamHandler::closeTimedOutFiles, _1, now));
  }
void stor::DiskWriter::configureErrorStreams ( ErrStrConfigListPtr  cfgList) [private]

Configures the error streams to be written to disk

Definition at line 236 of file DiskWriter.cc.

References makeErrorStream().

Referenced by checkStreamChangeRequest().

  {
    for (
      ErrStrConfigList::iterator it = cfgList->begin(),
        itEnd = cfgList->end();
      it != itEnd;
      ++it
    ) 
    {
      makeErrorStream(*it);
    }
  }
void stor::DiskWriter::configureEventStreams ( EvtStrConfigListPtr  cfgList) [private]

Configures the event streams to be written to disk

Definition at line 221 of file DiskWriter.cc.

References makeEventStream().

Referenced by checkStreamChangeRequest().

  {
    for (
      EvtStrConfigList::iterator it = cfgList->begin(),
        itEnd = cfgList->end();
      it != itEnd;
      ++it
    ) 
    {
      if ( it->fractionToDisk() > 0 )
        makeEventStream(*it);
    }
  }
void stor::DiskWriter::destroyStreams ( ) [private]

Gracefully close all streams

Definition at line 281 of file DiskWriter.cc.

References stor::StreamHandler::closeAllFiles(), reportRemainingLumiSections(), streamHandlers_, and writeEndOfRunMarker().

Referenced by checkStreamChangeRequest(), and ~DiskWriter().

  {
    if (streamHandlers_.empty()) return;
    
    std::for_each(streamHandlers_.begin(), streamHandlers_.end(),
      boost::bind(&StreamHandler::closeAllFiles, _1));
    streamHandlers_.clear();
    
    reportRemainingLumiSections();
    writeEndOfRunMarker();
  }
void stor::DiskWriter::makeErrorStream ( ErrorStreamConfigurationInfo streamCfg) [private]

Creates the handler for the given error event stream

Definition at line 271 of file DiskWriter.cc.

References dbFileHandler_, stor::ErrorStreamConfigurationInfo::setStreamId(), sharedResources_, and streamHandlers_.

Referenced by configureErrorStreams().

  {
    boost::shared_ptr<FRDStreamHandler> newHandler(
      new FRDStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
    );
    streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
    streamCfg.setStreamId(streamHandlers_.size() - 1);
  }
void stor::DiskWriter::makeEventStream ( EventStreamConfigurationInfo streamCfg) [private]

Creates the handler for the given event stream

Definition at line 261 of file DiskWriter.cc.

References dbFileHandler_, stor::EventStreamConfigurationInfo::setStreamId(), sharedResources_, and streamHandlers_.

Referenced by configureEventStreams().

  {
    boost::shared_ptr<EventStreamHandler> newHandler(
      new EventStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
    );
    streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
    streamCfg.setStreamId(streamHandlers_.size() - 1);
  }
void stor::DiskWriter::makeFaultyEventStream ( ) [private]

Creates the handler for faulty events detected by the SM

Definition at line 250 of file DiskWriter.cc.

References dbFileHandler_, dwParams_, stor::DiskWritingParams::faultyEventsStream_, sharedResources_, and streamHandlers_.

Referenced by checkStreamChangeRequest().

  {
    if ( dwParams_.faultyEventsStream_.empty() ) return;

    boost::shared_ptr<FaultyEventStreamHandler> newHandler(
    new FaultyEventStreamHandler(sharedResources_, dbFileHandler_, dwParams_.faultyEventsStream_)
    );
    streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler));
  }
DiskWriter& stor::DiskWriter::operator= ( DiskWriter const &  ) [private]
void stor::DiskWriter::processEndOfLumiSection ( const I2OChain msg) [private]

Close files at the end of a luminosity section and release message memory:

Definition at line 315 of file DiskWriter.cc.

References dbFileHandler_, endOfRunReport_, stor::I2OChain::faulty(), stor::I2OChain::lumiSection(), stor::I2OChain::runNumber(), runNumber_, and streamHandlers_.

Referenced by writeNextEvent().

  {
    if ( msg.faulty() || msg.runNumber() != runNumber_ ) return;
    if ( streamHandlers_.empty() ) return; //Don't care about EoLS signal if we have no streams

    const uint32_t lumiSection = msg.lumiSection();
    
    std::string fileCountStr;
    bool filesWritten = false;

    for (StreamHandlers::const_iterator it = streamHandlers_.begin(),
           itEnd = streamHandlers_.end(); it != itEnd; ++it)
    {
      if ( (*it)->closeFilesForLumiSection(lumiSection, fileCountStr) )
        filesWritten = true;
    }
    fileCountStr += "\tEoLS:1";
    dbFileHandler_->write(fileCountStr);

    ++(endOfRunReport_->eolsCount);
    if (filesWritten) ++(endOfRunReport_->lsCountWithFiles);
    endOfRunReport_->updateLatestWrittenLumiSection(lumiSection);
  }
void stor::DiskWriter::reportRemainingLumiSections ( ) [private]

Log file statistics for so far unreported lumi sections

Definition at line 294 of file DiskWriter.cc.

References dbFileHandler_, endOfRunReport_, stor::StreamsMonitorCollection::reportAllLumiSectionInfos(), and sharedResources_.

Referenced by destroyStreams().

  {
    StreamsMonitorCollection& smc =
      sharedResources_->statisticsReporter_->getStreamsMonitorCollection();
    
    smc.reportAllLumiSectionInfos(dbFileHandler_, endOfRunReport_);
  }
void stor::DiskWriter::startWorkLoop ( std::string  workloopName)

Creates and starts the disk writing workloop

Definition at line 53 of file DiskWriter.cc.

References app_, alignCSCRings::e, Exception, stor::utils::getIdentifier(), lumiQueryAPI::msg, writeAction(), and writingWL_.

  {
    try
    {
      std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
      
      writingWL_ = toolbox::task::getWorkLoopFactory()->
        getWorkLoop( identifier + workloopName, "waiting" );
      
      if ( ! writingWL_->isActive() )
      {
        toolbox::task::ActionSignature* processAction = 
          toolbox::task::bind(this, &DiskWriter::writeAction, 
            identifier + "WriteNextEvent");
        writingWL_->submit(processAction);
        
        writingWL_->activate();
      }
    }
    catch (xcept::Exception& e)
    {
      std::string msg = "Failed to start workloop 'DiskWriter' with 'writeNextEvent'.";
      XCEPT_RETHROW(stor::exception::DiskWriting, msg, e);
    }
  }
bool stor::DiskWriter::writeAction ( toolbox::task::WorkLoop *  )

The workloop action taking the next event from the StreamQueue and writing it to disk

Definition at line 80 of file DiskWriter.cc.

References actionIsActive_, alignCSCRings::e, exception, Exception, sharedResources_, and writeNextEvent().

Referenced by startWorkLoop().

  {
    std::string errorMsg = "Failed to write an event: ";
    
    try
    {
      writeNextEvent();
    }
    catch(xcept::Exception &e)
    {
      XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
        sentinelException, errorMsg, e );
      sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
    }
    catch(std::exception &e)
    {
      errorMsg += e.what();
      XCEPT_DECLARE( stor::exception::DiskWriting,
        sentinelException, errorMsg );
      sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
    }
    catch(...)
    {
      errorMsg += "Unknown exception";
      XCEPT_DECLARE( stor::exception::DiskWriting,
        sentinelException, errorMsg );
      sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
    }
    
    return actionIsActive_;
  }
void stor::DiskWriter::writeEndOfRunMarker ( ) [private]

Log end-of-run marker

Definition at line 303 of file DiskWriter.cc.

References dbFileHandler_, and endOfRunReport_.

Referenced by destroyStreams().

  {
    std::ostringstream str;
    str << "LScount:" << endOfRunReport_->lsCountWithFiles
      << "\tEoLScount:" << endOfRunReport_->eolsCount
      << "\tLastLumi:" << endOfRunReport_->latestLumiSectionWritten
      << "\tEoR";
    dbFileHandler_->write(str.str());
    endOfRunReport_->reset();
  }
void stor::DiskWriter::writeEventToStreams ( const I2OChain event) [private]

Writes the event to the appropriate streams

Definition at line 149 of file DiskWriter.cc.

References alignCSCRings::e, lumiQueryAPI::msg, streamHandlers_, and hcal_dqm_sourceclient-file_cfg::streams.

Referenced by writeNextEvent().

  {
    std::vector<StreamID> streams = event.getStreamTags();
    
    for (
      std::vector<StreamID>::const_iterator it = streams.begin(), itEnd = streams.end();
      it != itEnd;
      ++it
    )
    {
      try
      {
        streamHandlers_.at(*it)->writeEvent(event);
      }
      catch (std::out_of_range& e)
      {
        std::ostringstream msg;
        msg << "Unable to retrieve stream handler for " << (*it) << " : ";
        msg << e.what();
        XCEPT_RAISE(exception::UnknownStreamId, msg.str());
      }
    }
  }
void stor::DiskWriter::writeNextEvent ( ) [private]

Takes the event from the stream queue

Definition at line 113 of file DiskWriter.cc.

References checkForFileTimeOuts(), checkStreamChangeRequest(), event(), stor::utils::getCurrentTime(), stor::I2OChain::isEndOfLumiSectionMessage(), stor::I2OChain::memoryUsed(), processEndOfLumiSection(), sharedResources_, timeout_, and writeEventToStreams().

Referenced by writeAction().

  {
    I2OChain event;
    StreamQueuePtr sq = sharedResources_->streamQueue_;
    utils::TimePoint_t startTime = utils::getCurrentTime();
    if (sq->deqTimedWait(event, timeout_))
    {
      sharedResources_->diskWriterResources_->setBusy(true);
      
      utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
      sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
      sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed());
      
      if( event.isEndOfLumiSectionMessage() )
      {
        processEndOfLumiSection( event );
      }
      else
      {
        writeEventToStreams( event );
        checkForFileTimeOuts();
      }
    }
    else
    {
      utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
      sharedResources_->statisticsReporter_->
        getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime);
      
      checkStreamChangeRequest();
      checkForFileTimeOuts(true);
      sharedResources_->diskWriterResources_->setBusy(false);
    }
  }

Member Data Documentation

Definition at line 159 of file DiskWriter.h.

Referenced by writeAction(), and ~DiskWriter().

xdaq::Application* stor::DiskWriter::app_ [private]

Definition at line 144 of file DiskWriter.h.

Referenced by startWorkLoop().

Definition at line 151 of file DiskWriter.h.

Referenced by checkForFileTimeOuts().

unsigned int stor::DiskWriter::runNumber_ [private]

Definition at line 149 of file DiskWriter.h.

Referenced by checkStreamChangeRequest(), and processEndOfLumiSection().

boost::posix_time::time_duration stor::DiskWriter::timeout_ [private]

Definition at line 150 of file DiskWriter.h.

Referenced by checkStreamChangeRequest(), DiskWriter(), and writeNextEvent().

toolbox::task::WorkLoop* stor::DiskWriter::writingWL_ [private]

Definition at line 160 of file DiskWriter.h.

Referenced by startWorkLoop(), and ~DiskWriter().