CMS 3D CMS Logo

Public Member Functions | Protected Types | Protected Member Functions | Protected Attributes | Private Member Functions

stor::StreamHandler Class Reference

#include <StreamHandler.h>

Inheritance diagram for stor::StreamHandler:
stor::EventStreamHandler stor::FaultyEventStreamHandler stor::FRDStreamHandler

List of all members.

Public Member Functions

void closeAllFiles ()
void closeFilesForLumiSection (const uint32_t lumiSection)
void closeFilesForLumiSection (const uint32_t &lumiSection, std::string &)
void closeTimedOutFiles (utils::TimePoint_t currentTime=utils::getCurrentTime())
 StreamHandler (const SharedResourcesPtr, const DbFileHandlerPtr)
void writeEvent (const I2OChain &event)
virtual ~StreamHandler ()

Protected Types

typedef std::map< std::string,
unsigned int > 
CoreFileNamesMap
typedef boost::shared_ptr
< FileHandler > 
FileHandlerPtr
typedef std::vector
< FileHandlerPtr > 
FileHandlers

Protected Member Functions

virtual double fractionToDisk () const =0
virtual FileHandlerPtr getFileHandler (const I2OChain &event)
unsigned long long getMaxFileSize () const
FilesMonitorCollection::FileRecordPtr getNewFileRecord (const I2OChain &event)
virtual int getStreamMaxFileSize () const =0
virtual FileHandlerPtr newFileHandler (const I2OChain &event)=0
virtual std::string streamLabel () const =0

Protected Attributes

const DbFileHandlerPtr dbFileHandler_
const DiskWritingParams diskWritingParams_
FileHandlers fileHandlers_
const SharedResourcesPtr sharedResources_
const StatisticsReporterPtr statReporter_
const
StreamsMonitorCollection::StreamRecordPtr 
streamRecord_
CoreFileNamesMap usedCoreFileNames_

Private Member Functions

bool fileTooLarge (const FileHandlerPtr, const unsigned long &dataSize) const
std::string getBaseFilePath (const uint32_t &runNumber, uint32_t fileCount) const
std::string getCoreFileName (const uint32_t &runNumber, const uint32_t &lumiSection) const
unsigned int getFileCounter (const std::string &coreFileName)
std::string getFileSystem (const uint32_t &runNumber, uint32_t fileCount) const
StreamHandler & operator= (StreamHandler const &)
 StreamHandler (StreamHandler const &)

Detailed Description

Abstract class to handle one stream written to disk.

Author:
mommsen
Revision:
1.14
Date:
2011/03/07 15:31:32

Definition at line 31 of file StreamHandler.h.


Member Typedef Documentation

typedef std::map<std::string, unsigned int> stor::StreamHandler::CoreFileNamesMap [protected]

Definition at line 149 of file StreamHandler.h.

typedef boost::shared_ptr<FileHandler> stor::StreamHandler::FileHandlerPtr [protected]

Definition at line 71 of file StreamHandler.h.

typedef std::vector<FileHandlerPtr> stor::StreamHandler::FileHandlers [protected]

Definition at line 146 of file StreamHandler.h.


Constructor & Destructor Documentation

stor::StreamHandler::StreamHandler ( const SharedResourcesPtr  sharedResources,
const DbFileHandlerPtr  dbFileHandler 
)

Definition at line 16 of file StreamHandler.cc.

    :
  // Stores the shared resources and the database file handler, and derives
  // the statistics reporter, the stream record and the disk-writing
  // parameters from the shared resources.
  sharedResources_(sharedResources),
  statReporter_(sharedResources->statisticsReporter_),
  // streamRecord_ is obtained through statReporter_, so it relies on
  // statReporter_ already being initialized. C++ initializes members in
  // class declaration order, not in the order written here.
  // NOTE(review): confirm statReporter_ is declared before streamRecord_
  // in StreamHandler.h.
  streamRecord_(statReporter_->getStreamsMonitorCollection().getNewStreamRecord()),
  diskWritingParams_(sharedResources->configuration_->getDiskWritingParams()),
  dbFileHandler_(dbFileHandler)
  {}
virtual stor::StreamHandler::~StreamHandler ( ) [inline, virtual]

Definition at line 37 of file StreamHandler.h.

{}; // Empty body: no explicit cleanup is performed in this destructor.
stor::StreamHandler::StreamHandler ( StreamHandler const &  ) [private]

Member Function Documentation

void stor::StreamHandler::closeAllFiles ( )

Gracefully close all open files

Definition at line 28 of file StreamHandler.cc.

References ExpressReco_HICollisions_FallBack::e, stor::AlarmHandler::ERROR, exception, Exception, fileHandlers_, stor::FilesMonitorCollection::FileRecord::runEnded, statReporter_, and streamLabel().

Referenced by stor::DiskWriter::destroyStreams(), and stor::FaultyEventStreamHandler::getFileHandler().

  {
    // Close every file handler of this stream with the reason 'runEnded'.
    // A failure to close one file is reported to the sentinel as an ERROR
    // alarm and does not stop the remaining files from being closed.
    // All handlers are dropped from fileHandlers_ at the end, whether or not
    // closing them succeeded.

    // Common prefix of every alarm message. It is never modified, so the
    // text of one failure cannot leak into the alarm raised for a later one.
    const std::string errorPrefix =
      "Failed to close all files for stream " + streamLabel() + ": ";

    for (FileHandlers::const_iterator it = fileHandlers_.begin(),
           itEnd = fileHandlers_.end(); it != itEnd; ++it)
    {
      try
      {
        (*it)->closeFile(FilesMonitorCollection::FileRecord::runEnded);
      }
      catch(xcept::Exception& e)
      {
        // The original exception is nested, so its details travel with the alarm.
        XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
          sentinelException, errorPrefix, e );
        statReporter_->alarmHandler()->
          notifySentinel(AlarmHandler::ERROR, sentinelException);
      }
      catch(const std::exception& e)
      {
        // Message built per failure: prefix plus this exception's text only.
        const std::string errorMsg = errorPrefix + e.what();
        XCEPT_DECLARE( stor::exception::DiskWriting,
          sentinelException, errorMsg );
        statReporter_->alarmHandler()->
          notifySentinel(AlarmHandler::ERROR, sentinelException);
      }
      catch(...)
      {
        const std::string errorMsg = errorPrefix + "Unknown exception";
        XCEPT_DECLARE( stor::exception::DiskWriting,
          sentinelException, errorMsg );
        statReporter_->alarmHandler()->
          notifySentinel(AlarmHandler::ERROR, sentinelException);
      }
    }
    fileHandlers_.clear();
  }
void stor::StreamHandler::closeFilesForLumiSection ( const uint32_t &  lumiSection,
std::string &  str 
)

Close all files which belong to the given lumi section and print number of files for this lumi section into the passed string.

Definition at line 79 of file StreamHandler.cc.

  {
    // Let the stream record report its information for this lumi section
    // into 'str' first ...
    streamRecord_->reportLumiSectionInfo(lumiSection, str);
    // ... then delegate the actual closing to the single-argument overload.
    closeFilesForLumiSection(lumiSection);
  }
void stor::StreamHandler::closeFilesForLumiSection ( const uint32_t  lumiSection)

Close all files which belong to the given lumi section

Definition at line 89 of file StreamHandler.cc.

References fileHandlers_, and stor::FileHandler::isFromLumiSection().

  {
    // Drop every handler for which FileHandler::isFromLumiSection(lumiSection)
    // returns true. remove_if moves the handlers to keep to the front and
    // returns the start of the tail to discard.
    // NOTE(review): presumably the file is closed by the predicate or when
    // the last reference to its handler goes away -- confirm in FileHandler.
    const FileHandlers::iterator firstToDrop =
      std::remove_if(
        fileHandlers_.begin(),
        fileHandlers_.end(),
        boost::bind(&FileHandler::isFromLumiSection, _1, lumiSection));

    fileHandlers_.erase(firstToDrop, fileHandlers_.end());
  }
void stor::StreamHandler::closeTimedOutFiles ( utils::TimePoint_t  currentTime = utils::getCurrentTime())

Close all files which have not seen any recent events

Definition at line 67 of file StreamHandler.cc.

References fileHandlers_, and stor::FileHandler::tooOld().

Referenced by stor::DiskWriter::closeTimedOutFiles().

  {
    // Drop every handler for which FileHandler::tooOld(currentTime) returns
    // true. remove_if moves the handlers to keep to the front and returns
    // the start of the tail to discard.
    const FileHandlers::iterator firstTimedOut =
      std::remove_if(
        fileHandlers_.begin(),
        fileHandlers_.end(),
        boost::bind(&FileHandler::tooOld, _1, currentTime));

    fileHandlers_.erase(firstTimedOut, fileHandlers_.end());
  }
bool stor::StreamHandler::fileTooLarge ( const FileHandlerPtr  ,
const unsigned long &  dataSize 
) const [private]

Return true if the file would become too large when adding dataSize in Bytes

virtual double stor::StreamHandler::fractionToDisk ( ) const [protected, pure virtual]
std::string stor::StreamHandler::getBaseFilePath ( const uint32_t &  runNumber,
uint32_t  fileCount 
) const [private]

Get path w/o working directory

Definition at line 167 of file StreamHandler.cc.

Referenced by getNewFileRecord().

  {
    // Configured file path followed by the file system sub-directory; the
    // latter is empty when getFileSystem() decides not to add one.
    std::string baseFilePath = diskWritingParams_.filePath_;
    baseFilePath += getFileSystem(runNumber, fileCount);
    return baseFilePath;
  }
std::string stor::StreamHandler::getCoreFileName ( const uint32_t &  runNumber,
const uint32_t &  lumiSection 
) const [private]

Get the core file name

Definition at line 200 of file StreamHandler.cc.

References python::StorageManager_cfg::streamLabel.

Referenced by getNewFileRecord().

  {
    // Assemble the dot-separated core file name:
    //   <setupLabel>.<run, width 8>.<lumi, width 4>.<streamLabel>.<fileName>.<smInstance, width 2>
    // Fields with a width are left-padded with '0'.
    std::ostringstream nameStream;

    nameStream << diskWritingParams_.setupLabel_;
    nameStream << "." << std::setfill('0') << std::setw(8) << runNumber;
    nameStream << "." << std::setfill('0') << std::setw(4) << lumiSection;
    nameStream << "." << streamLabel();
    nameStream << "." << diskWritingParams_.fileName_;
    nameStream << "." << std::setfill('0') << std::setw(2)
               << diskWritingParams_.smInstanceString_;

    return nameStream.str();
  }
unsigned int stor::StreamHandler::getFileCounter ( const std::string &  coreFileName) [private]

Get the instance count of this core file name

Definition at line 217 of file StreamHandler.cc.

References pos, and usedCoreFileNames_.

Referenced by getNewFileRecord().

  {
    // Returns 0 the first time a core file name is seen and registers it;
    // every later call for the same name bumps the stored count and
    // returns the new value.
    const CoreFileNamesMap::iterator entry = usedCoreFileNames_.find(coreFileName);

    if (entry != usedCoreFileNames_.end())
    {
      // Name already known: pre-increment and hand back the updated count.
      return ++(entry->second);
    }

    // First use of this name; 'entry' (== end()) serves as insertion hint.
    usedCoreFileNames_.insert(entry, std::make_pair(coreFileName, 0));
    return 0;
  }
StreamHandler::FileHandlerPtr stor::StreamHandler::getFileHandler ( const I2OChain &  event) [protected, virtual]

Get the file handler responsible for the event

Reimplemented in stor::FaultyEventStreamHandler.

Definition at line 110 of file StreamHandler.cc.

References fileHandlers_, newFileHandler(), and stor::I2OChain::totalDataSize().

Referenced by writeEvent().

  {
    // Look for the first handler whose lumi section matches the event's.
    const FileHandlers::iterator last = fileHandlers_.end();
    FileHandlers::iterator candidate = fileHandlers_.begin();
    while ( candidate != last &&
            !( (*candidate)->lumiSection() == event.lumiSection() ) )
    {
      ++candidate;
    }

    if ( candidate != last )
    {
      // Reuse the matching handler unless adding this event would make its
      // file too large.
      if ( ! (*candidate)->tooLarge(event.totalDataSize()) )
      {
        return (*candidate);
      }

      // The file would become too large: drop the handler and fall through
      // to create a fresh one. Only this first match is ever considered.
      fileHandlers_.erase(candidate);
    }

    return newFileHandler(event);
  }
std::string stor::StreamHandler::getFileSystem ( const uint32_t &  runNumber,
uint32_t  fileCount 
) const [private]

Get file system string

Definition at line 177 of file StreamHandler.cc.

  {
    // Without a positive number of logical disks no file system
    // sub-directory is added to the path.
    if (diskWritingParams_.nLogicalDisk_ <= 0)
    {
      return "";
    }

    // The disk index is (run number + SM instance + file count) modulo the
    // number of logical disks. atoi() yields 0 if the instance string does
    // not start with a number.
    const int smInstance = atoi(diskWritingParams_.smInstanceString_.c_str());
    const unsigned int diskIndex =
      (runNumber + smInstance + fileCount) % diskWritingParams_.nLogicalDisk_;

    // Render as "/NN": a leading slash and the index zero-padded to width 2.
    std::ostringstream subDir;
    subDir << "/" << std::setfill('0') << std::setw(2) << diskIndex;
    return subDir.str();
  }
unsigned long long stor::StreamHandler::getMaxFileSize ( ) const [protected]

Return the maximum file size in bytes

Definition at line 233 of file StreamHandler.cc.

References diskWritingParams_, getStreamMaxFileSize(), and stor::DiskWritingParams::maxFileSizeMB_.

Referenced by stor::EventStreamHandler::newFileHandler(), and stor::FRDStreamHandler::newFileHandler().

  {
    // Use the configured maxFileSizeMB_ when it is positive; otherwise fall
    // back to the stream-specific limit from getStreamMaxFileSize().
    // NOTE(review): getStreamMaxFileSize() returns int; a negative value
    // would be converted to a very large unsigned size here -- confirm that
    // implementations never return a negative number.
    const unsigned long long maxFileSizeMB =
      diskWritingParams_.maxFileSizeMB_ > 0 ? 
      diskWritingParams_.maxFileSizeMB_ : getStreamMaxFileSize();
    
    // Convert from MB to bytes (1 MB = 1024 * 1024 bytes).
    return ( maxFileSizeMB * 1024 * 1024 );
  }
FilesMonitorCollection::FileRecordPtr stor::StreamHandler::getNewFileRecord ( const I2OChain &  event) [protected]

Return a new file record for the event

Definition at line 136 of file StreamHandler.cc.

References ExpressReco_HICollisions_FallBack::e, getBaseFilePath(), getCoreFileName(), getFileCounter(), stor::FilesMonitorCollection::FileRecord::notClosed, sharedResources_, statReporter_, streamLabel(), and streamRecord_.

Referenced by stor::EventStreamHandler::newFileHandler(), stor::FRDStreamHandler::newFileHandler(), and stor::FaultyEventStreamHandler::newFileHandler().

  {
    // Obtain a fresh record from the files monitor collection and fill it
    // in for the given event.
    FilesMonitorCollection::FileRecordPtr fileRecord =
      statReporter_->getFilesMonitorCollection().getNewFileRecord();
    
    try
    {
      // Normal case: run number and lumi section come from the event.
      fileRecord->runNumber = event.runNumber();
      fileRecord->lumiSection = event.lumiSection();
    }
    catch(stor::exception::IncompleteEventMessage &e)
    {
      // The event cannot provide the information: fall back to the
      // configured run number and lumi section 0. This also overwrites a
      // run number that may already have been set in the try block.
      fileRecord->runNumber = sharedResources_->configuration_->getRunNumber();
      fileRecord->lumiSection = 0;
    }
    fileRecord->streamLabel = streamLabel();
    // The record's entryCounter is passed as the file count when deriving
    // the base file path.
    fileRecord->baseFilePath =
      getBaseFilePath(fileRecord->runNumber, fileRecord->entryCounter);
    fileRecord->coreFileName =
      getCoreFileName(fileRecord->runNumber, fileRecord->lumiSection);
    // getFileCounter() updates usedCoreFileNames_: it returns 0 on the first
    // use of a core file name and an incremented count on every later use.
    fileRecord->fileCounter = getFileCounter(fileRecord->coreFileName);
    fileRecord->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
    fileRecord->isOpen = true;
    
    // Account for the new file in the stream record for this lumi section.
    streamRecord_->incrementFileCount(fileRecord->lumiSection);
    
    return fileRecord;
  }
virtual int stor::StreamHandler::getStreamMaxFileSize ( ) const [protected, pure virtual]

Return the maximum file size for the stream in MB

Implemented in stor::EventStreamHandler, stor::FaultyEventStreamHandler, and stor::FRDStreamHandler.

Referenced by getMaxFileSize().

virtual FileHandlerPtr stor::StreamHandler::newFileHandler ( const I2OChain &  event) [protected, pure virtual]

Return a new file handler for the provided event

Implemented in stor::EventStreamHandler, stor::FaultyEventStreamHandler, and stor::FRDStreamHandler.

Referenced by getFileHandler().

StreamHandler& stor::StreamHandler::operator= ( StreamHandler const &  ) [private]
virtual std::string stor::StreamHandler::streamLabel ( ) const [protected, pure virtual]
void stor::StreamHandler::writeEvent ( const I2OChain &  event)

Write the event to the stream file

Definition at line 100 of file StreamHandler.cc.

References getFileHandler(), statReporter_, streamRecord_, and stor::I2OChain::totalDataSize().

  {
    // Pick the file handler for this event (getFileHandler() reuses an
    // existing one or creates a new one) and let it write the event.
    FileHandlerPtr handler = getFileHandler(event);
    handler->writeEvent(event);
    // Bookkeeping after the write: add the event's total data size to the
    // stream record and to the disk-write throughput samples.
    streamRecord_->addSizeInBytes(event.totalDataSize());
    statReporter_->getThroughputMonitorCollection().
      addDiskWriteSample(event.totalDataSize());
  }

Member Data Documentation

const SharedResourcesPtr stor::StreamHandler::sharedResources_ [protected]

Definition at line 140 of file StreamHandler.h.

Referenced by getNewFileRecord().

const StatisticsReporterPtr stor::StreamHandler::statReporter_ [protected]

Definition at line 141 of file StreamHandler.h.

Referenced by closeAllFiles(), getNewFileRecord(), and writeEvent().

CoreFileNamesMap stor::StreamHandler::usedCoreFileNames_ [protected]

Definition at line 150 of file StreamHandler.h.

Referenced by getFileCounter().