#include <StreamHandler.h>
Public Member Functions | |
void | closeAllFiles () |
void | closeFilesForLumiSection (const uint32_t lumiSection) |
void | closeFilesForLumiSection (const uint32_t &lumiSection, std::string &) |
void | closeTimedOutFiles (utils::TimePoint_t currentTime=utils::getCurrentTime()) |
StreamHandler (const SharedResourcesPtr, const DbFileHandlerPtr) | |
void | writeEvent (const I2OChain &event) |
virtual | ~StreamHandler () |
Protected Types | |
typedef std::map< std::string, unsigned int > | CoreFileNamesMap |
typedef boost::shared_ptr < FileHandler > | FileHandlerPtr |
typedef std::vector < FileHandlerPtr > | FileHandlers |
Protected Member Functions | |
virtual double | fractionToDisk () const =0 |
virtual FileHandlerPtr | getFileHandler (const I2OChain &event) |
unsigned long long | getMaxFileSize () const |
FilesMonitorCollection::FileRecordPtr | getNewFileRecord (const I2OChain &event) |
virtual int | getStreamMaxFileSize () const =0 |
virtual FileHandlerPtr | newFileHandler (const I2OChain &event)=0 |
virtual std::string | streamLabel () const =0 |
Protected Attributes | |
const DbFileHandlerPtr | dbFileHandler_ |
const DiskWritingParams | diskWritingParams_ |
FileHandlers | fileHandlers_ |
const SharedResourcesPtr | sharedResources_ |
const StatisticsReporterPtr | statReporter_ |
const StreamsMonitorCollection::StreamRecordPtr | streamRecord_ |
CoreFileNamesMap | usedCoreFileNames_ |
Private Member Functions | |
bool | fileTooLarge (const FileHandlerPtr, const unsigned long &dataSize) const |
std::string | getBaseFilePath (const uint32_t &runNumber, uint32_t fileCount) const |
std::string | getCoreFileName (const uint32_t &runNumber, const uint32_t &lumiSection) const |
unsigned int | getFileCounter (const std::string &coreFileName) |
std::string | getFileSystem (const uint32_t &runNumber, uint32_t fileCount) const |
StreamHandler & | operator= (StreamHandler const &) |
StreamHandler (StreamHandler const &) |
Abstract class to handle one stream written to disk.
Definition at line 31 of file StreamHandler.h.
typedef std::map<std::string, unsigned int> stor::StreamHandler::CoreFileNamesMap [protected] |
Definition at line 149 of file StreamHandler.h.
typedef boost::shared_ptr<FileHandler> stor::StreamHandler::FileHandlerPtr [protected] |
Definition at line 71 of file StreamHandler.h.
typedef std::vector<FileHandlerPtr> stor::StreamHandler::FileHandlers [protected] |
Definition at line 146 of file StreamHandler.h.
stor::StreamHandler::StreamHandler | ( | const SharedResourcesPtr | sharedResources, |
const DbFileHandlerPtr | dbFileHandler | ||
) |
Definition at line 16 of file StreamHandler.cc.
: sharedResources_(sharedResources), statReporter_(sharedResources->statisticsReporter_), streamRecord_(statReporter_->getStreamsMonitorCollection().getNewStreamRecord()), diskWritingParams_(sharedResources->configuration_->getDiskWritingParams()), dbFileHandler_(dbFileHandler) {}
virtual stor::StreamHandler::~StreamHandler | ( | ) | [inline, virtual] |
Definition at line 37 of file StreamHandler.h.
{};
stor::StreamHandler::StreamHandler | ( | StreamHandler const & | ) | [private] |
void stor::StreamHandler::closeAllFiles | ( | ) |
Gracefully close all open files
Definition at line 28 of file StreamHandler.cc.
References ExpressReco_HICollisions_FallBack::e, stor::AlarmHandler::ERROR, exception, Exception, fileHandlers_, stor::FilesMonitorCollection::FileRecord::runEnded, statReporter_, and streamLabel().
Referenced by stor::DiskWriter::destroyStreams(), and stor::FaultyEventStreamHandler::getFileHandler().
{ std::string errorMsg = "Failed to close all files for stream " + streamLabel() + ": "; for (FileHandlers::const_iterator it = fileHandlers_.begin(), itEnd = fileHandlers_.end(); it != itEnd; ++it) { try { (*it)->closeFile(FilesMonitorCollection::FileRecord::runEnded); } catch(xcept::Exception& e) { XCEPT_DECLARE_NESTED( stor::exception::DiskWriting, sentinelException, errorMsg, e ); statReporter_->alarmHandler()-> notifySentinel(AlarmHandler::ERROR, sentinelException); } catch(std::exception &e) { errorMsg += e.what(); XCEPT_DECLARE( stor::exception::DiskWriting, sentinelException, errorMsg ); statReporter_->alarmHandler()-> notifySentinel(AlarmHandler::ERROR, sentinelException); } catch(...) { errorMsg += "Unknown exception"; XCEPT_DECLARE( stor::exception::DiskWriting, sentinelException, errorMsg ); statReporter_->alarmHandler()-> notifySentinel(AlarmHandler::ERROR, sentinelException); } } fileHandlers_.clear(); }
void stor::StreamHandler::closeFilesForLumiSection | ( | const uint32_t & | lumiSection, |
std::string & | str | ||
) |
Close all files which belong to the given lumi section and print number of files for this lumi section into the passed string.
Definition at line 79 of file StreamHandler.cc.
{ streamRecord_->reportLumiSectionInfo(lumiSection, str); closeFilesForLumiSection(lumiSection); }
void stor::StreamHandler::closeFilesForLumiSection | ( | const uint32_t | lumiSection | ) |
Close all files which belong to the given lumi section
Definition at line 89 of file StreamHandler.cc.
References fileHandlers_, and stor::FileHandler::isFromLumiSection().
{ fileHandlers_.erase( std::remove_if(fileHandlers_.begin(), fileHandlers_.end(), boost::bind(&FileHandler::isFromLumiSection, _1, lumiSection)), fileHandlers_.end()); }
void stor::StreamHandler::closeTimedOutFiles | ( | utils::TimePoint_t | currentTime = utils::getCurrentTime() | ) |
Close all files which are have not seen any recent events
Definition at line 67 of file StreamHandler.cc.
References fileHandlers_, and stor::FileHandler::tooOld().
Referenced by stor::DiskWriter::closeTimedOutFiles().
{ fileHandlers_.erase( std::remove_if(fileHandlers_.begin(), fileHandlers_.end(), boost::bind(&FileHandler::tooOld, _1, currentTime)), fileHandlers_.end()); }
bool stor::StreamHandler::fileTooLarge | ( | const FileHandlerPtr | , |
const unsigned long & | dataSize | ||
) | const [private] |
Return true if the file would become too large when adding dataSize in Bytes
virtual double stor::StreamHandler::fractionToDisk | ( | ) | const [protected, pure virtual] |
Return the fraction-to-disk parameter
Implemented in stor::EventStreamHandler, stor::FaultyEventStreamHandler, and stor::FRDStreamHandler.
Referenced by stor::FaultyEventStreamHandler::FaultyEventStreamHandler().
std::string stor::StreamHandler::getBaseFilePath | ( | const uint32_t & | runNumber, |
uint32_t | fileCount | ||
) | const [private] |
Get path w/o working directory
Definition at line 167 of file StreamHandler.cc.
Referenced by getNewFileRecord().
{ return diskWritingParams_.filePath_ + getFileSystem(runNumber, fileCount); }
std::string stor::StreamHandler::getCoreFileName | ( | const uint32_t & | runNumber, |
const uint32_t & | lumiSection | ||
) | const [private] |
Get the core file name
Definition at line 200 of file StreamHandler.cc.
References python::StorageManager_cfg::streamLabel.
Referenced by getNewFileRecord().
{ std::ostringstream coreFileName; coreFileName << diskWritingParams_.setupLabel_ << "." << std::setfill('0') << std::setw(8) << runNumber << "." << std::setfill('0') << std::setw(4) << lumiSection << "." << streamLabel() << "." << diskWritingParams_.fileName_ << "." << std::setfill('0') << std::setw(2) << diskWritingParams_.smInstanceString_; return coreFileName.str(); }
unsigned int stor::StreamHandler::getFileCounter | ( | const std::string & | coreFileName | ) | [private] |
Get the instance count of this core file name
Definition at line 217 of file StreamHandler.cc.
References pos, and usedCoreFileNames_.
Referenced by getNewFileRecord().
{ CoreFileNamesMap::iterator pos = usedCoreFileNames_.find(coreFileName); if (pos == usedCoreFileNames_.end()) { usedCoreFileNames_.insert(pos, std::make_pair(coreFileName, 0)); return 0; } else { ++(pos->second); return pos->second; } }
StreamHandler::FileHandlerPtr stor::StreamHandler::getFileHandler | ( | const I2OChain & | event | ) | [protected, virtual] |
Get the file handler responsible for the event
Reimplemented in stor::FaultyEventStreamHandler.
Definition at line 110 of file StreamHandler.cc.
References fileHandlers_, newFileHandler(), and stor::I2OChain::totalDataSize().
Referenced by writeEvent().
{ for ( FileHandlers::iterator it = fileHandlers_.begin(), itEnd = fileHandlers_.end(); it != itEnd; ++it ) { if ( (*it)->lumiSection() == event.lumiSection() ) { if ( (*it)->tooLarge(event.totalDataSize()) ) { fileHandlers_.erase(it); break; } else { return (*it); } } } return newFileHandler(event); }
std::string stor::StreamHandler::getFileSystem | ( | const uint32_t & | runNumber, |
uint32_t | fileCount | ||
) | const [private] |
Get file system string
Definition at line 177 of file StreamHandler.cc.
{ // if the number of logical disks is not specified, don't // add a file system subdir to the path if (diskWritingParams_.nLogicalDisk_ <= 0) {return "";} unsigned int fileSystemNumber = (runNumber + atoi(diskWritingParams_.smInstanceString_.c_str()) + fileCount) % diskWritingParams_.nLogicalDisk_; std::ostringstream fileSystem; fileSystem << "/" << std::setfill('0') << std::setw(2) << fileSystemNumber; return fileSystem.str(); }
unsigned long long stor::StreamHandler::getMaxFileSize | ( | ) | const [protected] |
Return the maximum file size in bytes
Definition at line 233 of file StreamHandler.cc.
References diskWritingParams_, getStreamMaxFileSize(), and stor::DiskWritingParams::maxFileSizeMB_.
Referenced by stor::EventStreamHandler::newFileHandler(), and stor::FRDStreamHandler::newFileHandler().
{ const unsigned long long maxFileSizeMB = diskWritingParams_.maxFileSizeMB_ > 0 ? diskWritingParams_.maxFileSizeMB_ : getStreamMaxFileSize(); return ( maxFileSizeMB * 1024 * 1024 ); }
FilesMonitorCollection::FileRecordPtr stor::StreamHandler::getNewFileRecord | ( | const I2OChain & | event | ) | [protected] |
Return a new file record for the event
Definition at line 136 of file StreamHandler.cc.
References ExpressReco_HICollisions_FallBack::e, getBaseFilePath(), getCoreFileName(), getFileCounter(), stor::FilesMonitorCollection::FileRecord::notClosed, sharedResources_, statReporter_, streamLabel(), and streamRecord_.
Referenced by stor::EventStreamHandler::newFileHandler(), stor::FRDStreamHandler::newFileHandler(), and stor::FaultyEventStreamHandler::newFileHandler().
{ FilesMonitorCollection::FileRecordPtr fileRecord = statReporter_->getFilesMonitorCollection().getNewFileRecord(); try { fileRecord->runNumber = event.runNumber(); fileRecord->lumiSection = event.lumiSection(); } catch(stor::exception::IncompleteEventMessage &e) { fileRecord->runNumber = sharedResources_->configuration_->getRunNumber(); fileRecord->lumiSection = 0; } fileRecord->streamLabel = streamLabel(); fileRecord->baseFilePath = getBaseFilePath(fileRecord->runNumber, fileRecord->entryCounter); fileRecord->coreFileName = getCoreFileName(fileRecord->runNumber, fileRecord->lumiSection); fileRecord->fileCounter = getFileCounter(fileRecord->coreFileName); fileRecord->whyClosed = FilesMonitorCollection::FileRecord::notClosed; fileRecord->isOpen = true; streamRecord_->incrementFileCount(fileRecord->lumiSection); return fileRecord; }
virtual int stor::StreamHandler::getStreamMaxFileSize | ( | ) | const [protected, pure virtual] |
Return the maximum file size for the stream in MB
Implemented in stor::EventStreamHandler, stor::FaultyEventStreamHandler, and stor::FRDStreamHandler.
Referenced by getMaxFileSize().
virtual FileHandlerPtr stor::StreamHandler::newFileHandler | ( | const I2OChain & | event | ) | [protected, pure virtual] |
Return a new file handler for the provided event
Implemented in stor::EventStreamHandler, stor::FaultyEventStreamHandler, and stor::FRDStreamHandler.
Referenced by getFileHandler().
StreamHandler& stor::StreamHandler::operator= | ( | StreamHandler const & | ) | [private] |
virtual std::string stor::StreamHandler::streamLabel | ( | ) | const [protected, pure virtual] |
Return the stream label
Implemented in stor::EventStreamHandler, stor::FaultyEventStreamHandler, and stor::FRDStreamHandler.
Referenced by closeAllFiles(), and getNewFileRecord().
void stor::StreamHandler::writeEvent | ( | const I2OChain & | event | ) |
Write the event to the stream file
Definition at line 100 of file StreamHandler.cc.
References getFileHandler(), statReporter_, streamRecord_, and stor::I2OChain::totalDataSize().
{ FileHandlerPtr handler = getFileHandler(event); handler->writeEvent(event); streamRecord_->addSizeInBytes(event.totalDataSize()); statReporter_->getThroughputMonitorCollection(). addDiskWriteSample(event.totalDataSize()); }
const DbFileHandlerPtr stor::StreamHandler::dbFileHandler_ [protected] |
Definition at line 144 of file StreamHandler.h.
Referenced by stor::EventStreamHandler::newFileHandler(), stor::FRDStreamHandler::newFileHandler(), and stor::FaultyEventStreamHandler::newFileHandler().
const DiskWritingParams stor::StreamHandler::diskWritingParams_ [protected] |
Definition at line 143 of file StreamHandler.h.
Referenced by getMaxFileSize(), stor::EventStreamHandler::newFileHandler(), stor::FRDStreamHandler::newFileHandler(), and stor::FaultyEventStreamHandler::newFileHandler().
FileHandlers stor::StreamHandler::fileHandlers_ [protected] |
Definition at line 147 of file StreamHandler.h.
Referenced by closeAllFiles(), closeFilesForLumiSection(), closeTimedOutFiles(), getFileHandler(), stor::EventStreamHandler::newFileHandler(), stor::FRDStreamHandler::newFileHandler(), and stor::FaultyEventStreamHandler::newFileHandler().
const SharedResourcesPtr stor::StreamHandler::sharedResources_ [protected] |
Definition at line 140 of file StreamHandler.h.
Referenced by getNewFileRecord().
const StatisticsReporterPtr stor::StreamHandler::statReporter_ [protected] |
Definition at line 141 of file StreamHandler.h.
Referenced by closeAllFiles(), getNewFileRecord(), and writeEvent().
const StreamsMonitorCollection::StreamRecordPtr stor::StreamHandler::streamRecord_ [protected] |
Definition at line 142 of file StreamHandler.h.
Referenced by stor::FaultyEventStreamHandler::fractionToDisk(), getNewFileRecord(), stor::FaultyEventStreamHandler::streamLabel(), and writeEvent().
Definition at line 150 of file StreamHandler.h.
Referenced by getFileCounter().