#include <StreamHandler.h>
Public Member Functions | |
void | closeAllFiles () |
bool | closeFilesForLumiSection (const uint32_t &lumiSection, std::string &) |
void | closeTimedOutFiles (utils::TimePoint_t currentTime=utils::getCurrentTime()) |
StreamHandler (const SharedResourcesPtr, const DbFileHandlerPtr) | |
void | writeEvent (const I2OChain &event) |
virtual | ~StreamHandler () |
Protected Types | |
typedef std::map< std::string, unsigned int > | CoreFileNamesMap |
typedef boost::shared_ptr < FileHandler > | FileHandlerPtr |
typedef std::vector < FileHandlerPtr > | FileHandlers |
Protected Member Functions | |
virtual double | fractionToDisk () const =0 |
virtual FileHandlerPtr | getFileHandler (const I2OChain &event) |
unsigned long long | getMaxFileSize () const |
FilesMonitorCollection::FileRecordPtr | getNewFileRecord (const I2OChain &event) |
virtual int | getStreamMaxFileSize () const =0 |
virtual FileHandlerPtr | newFileHandler (const I2OChain &event)=0 |
virtual std::string | streamLabel () const =0 |
Protected Attributes | |
const DbFileHandlerPtr | dbFileHandler_ |
const DiskWritingParams | diskWritingParams_ |
FileHandlers | fileHandlers_ |
const SharedResourcesPtr | sharedResources_ |
const StatisticsReporterPtr | statReporter_ |
const StreamsMonitorCollection::StreamRecordPtr | streamRecord_ |
CoreFileNamesMap | usedCoreFileNames_ |
Private Member Functions | |
bool | fileTooLarge (const FileHandlerPtr, const unsigned long &dataSize) const |
std::string | getBaseFilePath (const uint32_t &runNumber, uint32_t fileCount) const |
std::string | getCoreFileName (const uint32_t &runNumber, const uint32_t &lumiSection) const |
unsigned int | getFileCounter (const std::string &coreFileName) |
std::string | getFileSystem (const uint32_t &runNumber, uint32_t fileCount) const |
StreamHandler & | operator= (StreamHandler const &) |
StreamHandler (StreamHandler const &) |
Abstract class to handle one stream written to disk.
Definition at line 31 of file StreamHandler.h.
typedef std::map<std::string, unsigned int> stor::StreamHandler::CoreFileNamesMap [protected] |
Definition at line 145 of file StreamHandler.h.
typedef boost::shared_ptr<FileHandler> stor::StreamHandler::FileHandlerPtr [protected] |
Definition at line 67 of file StreamHandler.h.
typedef std::vector<FileHandlerPtr> stor::StreamHandler::FileHandlers [protected] |
Definition at line 142 of file StreamHandler.h.
stor::StreamHandler::StreamHandler | ( | const SharedResourcesPtr | sharedResources, |
const DbFileHandlerPtr | dbFileHandler | ||
) |
Definition at line 16 of file StreamHandler.cc.
// Member initialisation for the StreamHandler constructor.
// - sharedResources_ and dbFileHandler_ store the two constructor arguments.
// - statReporter_ is taken from sharedResources->statisticsReporter_.
// - streamRecord_ is a new stream record requested from the streams monitor
//   collection reached through statReporter_, so statReporter_ has to be
//   initialised before streamRecord_.
// - diskWritingParams_ holds the value returned by
//   sharedResources->configuration_->getDiskWritingParams().
: sharedResources_(sharedResources), statReporter_(sharedResources->statisticsReporter_), streamRecord_(statReporter_->getStreamsMonitorCollection().getNewStreamRecord()), diskWritingParams_(sharedResources->configuration_->getDiskWritingParams()), dbFileHandler_(dbFileHandler) {}
virtual stor::StreamHandler::~StreamHandler | ( | ) | [inline, virtual] |
Definition at line 37 of file StreamHandler.h.
// Empty virtual destructor body: no explicit clean-up is performed here.
{};
stor::StreamHandler::StreamHandler | ( | StreamHandler const & | ) | [private] |
void stor::StreamHandler::closeAllFiles | ( | ) |
Gracefully close all open files
Definition at line 28 of file StreamHandler.cc.
References stor::AlarmHandler::ERROR, exception, Exception, fileHandlers_, stor::FilesMonitorCollection::FileRecord::runEnded, sharedResources_, and streamLabel().
Referenced by stor::DiskWriter::destroyStreams(), and stor::FaultyEventStreamHandler::getFileHandler().
{
  // Close every file currently held by this stream with the reason
  // "runEnded" and then drop all file handlers.
  //
  // A failure to close one file does not stop the loop: each failure is
  // reported to the sentinel as a DiskWriting error and the remaining files
  // are still closed.

  // Common prefix of every alarm message. It is never modified, so the
  // description of one failure cannot leak into the message reported for a
  // file that fails later in the loop.
  const std::string errorPrefix =
    "Failed to close all files for stream " + streamLabel() + ": ";

  for (FileHandlers::const_iterator it = fileHandlers_.begin(),
         itEnd = fileHandlers_.end(); it != itEnd; ++it)
  {
    try
    {
      (*it)->closeFile(FilesMonitorCollection::FileRecord::runEnded);
    }
    catch(xcept::Exception& e)
    {
      // The original exception is attached as nested exception, hence only
      // the prefix is needed as message.
      XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
        sentinelException, errorPrefix, e );
      sharedResources_->alarmHandler_->
        notifySentinel(AlarmHandler::ERROR, sentinelException);
    }
    catch(const std::exception& e)
    {
      // Build the message for this failure only.
      const std::string errorMsg = errorPrefix + e.what();
      XCEPT_DECLARE( stor::exception::DiskWriting,
        sentinelException, errorMsg );
      sharedResources_->alarmHandler_->
        notifySentinel(AlarmHandler::ERROR, sentinelException);
    }
    catch(...)
    {
      const std::string errorMsg = errorPrefix + "Unknown exception";
      XCEPT_DECLARE( stor::exception::DiskWriting,
        sentinelException, errorMsg );
      sharedResources_->alarmHandler_->
        notifySentinel(AlarmHandler::ERROR, sentinelException);
    }
  }

  fileHandlers_.clear();
}
bool stor::StreamHandler::closeFilesForLumiSection | ( | const uint32_t & | lumiSection, |
std::string & | str | ||
) |
Close all files which belong to the given lumi section and print number of files for this lumi section into the passed string. Returns true if at least one file was closed.
Definition at line 79 of file StreamHandler.cc.
References stor::FileHandler::isFromLumiSection().
{
  // Move every handler for which FileHandler::isFromLumiSection(lumiSection)
  // returns true to the tail of the vector ...
  // NOTE(review): presumably the predicate also closes the matching file;
  // confirm in FileHandler.
  FileHandlers::iterator firstRemoved =
    std::remove_if(
      fileHandlers_.begin(),
      fileHandlers_.end(),
      boost::bind(&FileHandler::isFromLumiSection, _1, lumiSection)
    );

  // ... and drop that tail from the list of handlers.
  fileHandlers_.erase(firstRemoved, fileHandlers_.end());

  // The return value and the content of 'str' are provided by the
  // stream record's lumi section report.
  return streamRecord_->reportLumiSectionInfo(lumiSection, str);
}
void stor::StreamHandler::closeTimedOutFiles | ( | utils::TimePoint_t | currentTime = utils::getCurrentTime() | ) |
Close all files which have not seen any recent events
Definition at line 67 of file StreamHandler.cc.
References fileHandlers_, and stor::FileHandler::tooOld().
Referenced by stor::DiskWriter::closeTimedOutFiles().
{
  // Collect all handlers for which FileHandler::tooOld(currentTime) returns
  // true at the end of the vector ...
  // NOTE(review): presumably the predicate also closes the timed-out file;
  // confirm in FileHandler.
  FileHandlers::iterator firstTimedOut =
    std::remove_if(
      fileHandlers_.begin(),
      fileHandlers_.end(),
      boost::bind(&FileHandler::tooOld, _1, currentTime)
    );

  // ... and remove them from the list of handlers.
  fileHandlers_.erase(firstTimedOut, fileHandlers_.end());
}
bool stor::StreamHandler::fileTooLarge | ( | const FileHandlerPtr | , |
const unsigned long & | dataSize | ||
) | const [private] |
Return true if the file would become too large when adding dataSize in Bytes
virtual double stor::StreamHandler::fractionToDisk | ( | ) | const [protected, pure virtual] |
Return the fraction-to-disk parameter
Implemented in stor::EventStreamHandler, stor::FaultyEventStreamHandler, and stor::FRDStreamHandler.
Referenced by stor::FaultyEventStreamHandler::FaultyEventStreamHandler().
std::string stor::StreamHandler::getBaseFilePath | ( | const uint32_t & | runNumber, |
uint32_t | fileCount | ||
) | const [private] |
Get path w/o working directory
Definition at line 162 of file StreamHandler.cc.
Referenced by getNewFileRecord().
{
  // Configured file path followed by the file-system part returned by
  // getFileSystem() (which may be empty).
  const std::string fileSystemPart = getFileSystem(runNumber, fileCount);
  return diskWritingParams_.filePath_ + fileSystemPart;
}
std::string stor::StreamHandler::getCoreFileName | ( | const uint32_t & | runNumber, |
const uint32_t & | lumiSection | ||
) | const [private] |
Get the core file name
Definition at line 195 of file StreamHandler.cc.
References python::StorageManager_cfg::streamLabel.
Referenced by getNewFileRecord().
{
  // Assemble the dot-separated name piece by piece:
  //   <setupLabel>.<run, 8 wide>.<lumi, 4 wide>.<streamLabel>.<fileName>.<smInstance, 2 wide>
  // Numeric/width-limited fields are left-padded with '0'.
  std::ostringstream name;

  name << diskWritingParams_.setupLabel_;
  name << "." << std::setfill('0') << std::setw(8) << runNumber;
  name << "." << std::setfill('0') << std::setw(4) << lumiSection;
  name << "." << streamLabel();
  name << "." << diskWritingParams_.fileName_;
  name << "." << std::setfill('0') << std::setw(2)
       << diskWritingParams_.smInstanceString_;

  return name.str();
}
unsigned int stor::StreamHandler::getFileCounter | ( | const std::string & | coreFileName | ) | [private] |
Get the instance count of this core file name
Definition at line 212 of file StreamHandler.cc.
References pos, and usedCoreFileNames_.
Referenced by getNewFileRecord().
{
  CoreFileNamesMap::iterator entry = usedCoreFileNames_.find(coreFileName);

  // Name seen before: bump its counter and hand back the new value.
  if (entry != usedCoreFileNames_.end())
  {
    return ++(entry->second);
  }

  // First use of this name: remember it with counter 0. 'entry' is end()
  // here and only serves as insertion hint.
  usedCoreFileNames_.insert(entry, std::make_pair(coreFileName, 0));
  return 0;
}
StreamHandler::FileHandlerPtr stor::StreamHandler::getFileHandler | ( | const I2OChain & | event | ) | [protected, virtual] |
Get the file handler responsible for the event
Reimplemented in stor::FaultyEventStreamHandler.
Definition at line 105 of file StreamHandler.cc.
References fileHandlers_, newFileHandler(), and stor::I2OChain::totalDataSize().
Referenced by writeEvent().
{
  for ( FileHandlers::iterator handlerIt = fileHandlers_.begin(),
          handlersEnd = fileHandlers_.end();
        handlerIt != handlersEnd; ++handlerIt )
  {
    // Only the first handler of the event's lumi section is of interest.
    if ( (*handlerIt)->lumiSection() != event.lumiSection() )
    {
      continue;
    }

    // The file can still take the event: reuse its handler.
    if ( ! (*handlerIt)->tooLarge(event.totalDataSize()) )
    {
      return (*handlerIt);
    }

    // Otherwise drop the handler from the list and fall through to the
    // creation of a new one. The iterators are invalid after erase(),
    // so the loop must be left immediately.
    fileHandlers_.erase(handlerIt);
    break;
  }

  return newFileHandler(event);
}
std::string stor::StreamHandler::getFileSystem | ( | const uint32_t & | runNumber, |
uint32_t | fileCount | ||
) | const [private] |
Get file system string
Definition at line 172 of file StreamHandler.cc.
{
  // Without a configured number of logical disks no file-system
  // subdirectory is added to the path.
  if (diskWritingParams_.nLogicalDisk_ <= 0)
  {
    return "";
  }

  // The subdirectory number is derived from run number, storage manager
  // instance and file count, modulo the number of logical disks.
  const int smInstance = atoi(diskWritingParams_.smInstanceString_.c_str());
  const unsigned int diskNumber =
    (runNumber + smInstance + fileCount) % diskWritingParams_.nLogicalDisk_;

  // Format as "/NN" with a zero-padded field of width 2.
  std::ostringstream subDir;
  subDir << "/" << std::setfill('0') << std::setw(2) << diskNumber;
  return subDir.str();
}
unsigned long long stor::StreamHandler::getMaxFileSize | ( | ) | const [protected] |
Return the maximum file size in bytes
Definition at line 228 of file StreamHandler.cc.
References diskWritingParams_, getStreamMaxFileSize(), and stor::DiskWritingParams::maxFileSizeMB_.
Referenced by stor::EventStreamHandler::newFileHandler(), and stor::FRDStreamHandler::newFileHandler().
{
  // A positive maxFileSizeMB_ in the disk writing parameters takes
  // precedence; otherwise the stream-specific limit is used.
  const bool useConfiguredLimit = diskWritingParams_.maxFileSizeMB_ > 0;
  const unsigned long long limitInMB =
    useConfiguredLimit
    ? diskWritingParams_.maxFileSizeMB_
    : getStreamMaxFileSize();

  // Convert MB into bytes.
  const unsigned long long bytesPerMB = 1024ULL * 1024ULL;
  return ( limitInMB * bytesPerMB );
}
FilesMonitorCollection::FileRecordPtr stor::StreamHandler::getNewFileRecord | ( | const I2OChain & | event | ) | [protected] |
Return a new file record for the event
Definition at line 131 of file StreamHandler.cc.
References getBaseFilePath(), getCoreFileName(), getFileCounter(), stor::FilesMonitorCollection::FileRecord::notClosed, sharedResources_, statReporter_, streamLabel(), and streamRecord_.
Referenced by stor::EventStreamHandler::newFileHandler(), stor::FRDStreamHandler::newFileHandler(), and stor::FaultyEventStreamHandler::newFileHandler().
{
  FilesMonitorCollection::FileRecordPtr record =
    statReporter_->getFilesMonitorCollection().getNewFileRecord();

  // Run number and lumi section are taken from the event. If the event
  // message is incomplete, fall back to the configured run number and
  // lumi section 0.
  try
  {
    record->runNumber = event.runNumber();
    record->lumiSection = event.lumiSection();
  }
  catch(stor::exception::IncompleteEventMessage&)
  {
    record->runNumber = sharedResources_->configuration_->getRunNumber();
    record->lumiSection = 0;
  }

  // File naming: the file counter depends on the core file name, which in
  // turn depends on run number and lumi section set above.
  record->streamLabel = streamLabel();
  record->baseFilePath =
    getBaseFilePath(record->runNumber, record->entryCounter);
  record->coreFileName =
    getCoreFileName(record->runNumber, record->lumiSection);
  record->fileCounter = getFileCounter(record->coreFileName);

  // The record describes a file which is open and not yet closed.
  record->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
  record->isOpen = true;

  // Account for the new file in the stream's per-lumi-section statistics.
  streamRecord_->incrementFileCount(record->lumiSection);

  return record;
}
virtual int stor::StreamHandler::getStreamMaxFileSize | ( | ) | const [protected, pure virtual] |
Return the maximum file size for the stream in MB
Implemented in stor::EventStreamHandler, stor::FaultyEventStreamHandler, and stor::FRDStreamHandler.
Referenced by getMaxFileSize().
virtual FileHandlerPtr stor::StreamHandler::newFileHandler | ( | const I2OChain & | event | ) | [protected, pure virtual] |
Return a new file handler for the provided event
Implemented in stor::EventStreamHandler, stor::FaultyEventStreamHandler, and stor::FRDStreamHandler.
Referenced by getFileHandler().
StreamHandler& stor::StreamHandler::operator= | ( | StreamHandler const & | ) | [private] |
virtual std::string stor::StreamHandler::streamLabel | ( | ) | const [protected, pure virtual] |
Return the stream label
Implemented in stor::EventStreamHandler, stor::FaultyEventStreamHandler, and stor::FRDStreamHandler.
Referenced by closeAllFiles(), and getNewFileRecord().
void stor::StreamHandler::writeEvent | ( | const I2OChain & | event | ) |
Write the event to the stream file
Definition at line 95 of file StreamHandler.cc.
References getFileHandler(), statReporter_, streamRecord_, and stor::I2OChain::totalDataSize().
{
  // Find (or create) the file handler for this event and let it write
  // the event.
  FileHandlerPtr fileHandler = getFileHandler(event);
  fileHandler->writeEvent(event);

  // Book the event size in the stream record ...
  streamRecord_->addSizeInBytes(event.totalDataSize());

  // ... and as disk-write sample in the throughput monitoring.
  statReporter_->getThroughputMonitorCollection().
    addDiskWriteSample(event.totalDataSize());
}
const DbFileHandlerPtr stor::StreamHandler::dbFileHandler_ [protected] |
Definition at line 140 of file StreamHandler.h.
Referenced by stor::EventStreamHandler::newFileHandler(), stor::FRDStreamHandler::newFileHandler(), and stor::FaultyEventStreamHandler::newFileHandler().
const DiskWritingParams stor::StreamHandler::diskWritingParams_ [protected] |
Definition at line 139 of file StreamHandler.h.
Referenced by getMaxFileSize(), stor::EventStreamHandler::newFileHandler(), stor::FRDStreamHandler::newFileHandler(), and stor::FaultyEventStreamHandler::newFileHandler().
FileHandlers stor::StreamHandler::fileHandlers_ [protected] |
Definition at line 143 of file StreamHandler.h.
Referenced by closeAllFiles(), closeTimedOutFiles(), getFileHandler(), stor::EventStreamHandler::newFileHandler(), stor::FRDStreamHandler::newFileHandler(), and stor::FaultyEventStreamHandler::newFileHandler().
const SharedResourcesPtr stor::StreamHandler::sharedResources_ [protected] |
Definition at line 136 of file StreamHandler.h.
Referenced by closeAllFiles(), and getNewFileRecord().
const StatisticsReporterPtr stor::StreamHandler::statReporter_ [protected] |
Definition at line 137 of file StreamHandler.h.
Referenced by getNewFileRecord(), and writeEvent().
const StreamsMonitorCollection::StreamRecordPtr stor::StreamHandler::streamRecord_ [protected] |
Definition at line 138 of file StreamHandler.h.
Referenced by stor::FaultyEventStreamHandler::fractionToDisk(), getNewFileRecord(), stor::FaultyEventStreamHandler::streamLabel(), and writeEvent().
CoreFileNamesMap stor::StreamHandler::usedCoreFileNames_ [protected] |
Definition at line 146 of file StreamHandler.h.
Referenced by getFileCounter().