Go to the documentation of this file.00001
00003
00004 #include <sstream>
00005 #include <iomanip>
00006
00007 #include "EventFilter/StorageManager/interface/FileHandler.h"
00008 #include "EventFilter/StorageManager/interface/I2OChain.h"
00009 #include "EventFilter/StorageManager/interface/StreamHandler.h"
00010
00011 #include "boost/bind.hpp"
00012
00013
00014 namespace stor {
00015
00016 StreamHandler::StreamHandler(
00017 const SharedResourcesPtr sharedResources,
00018 const DbFileHandlerPtr dbFileHandler
00019 ) :
00020 sharedResources_(sharedResources),
00021 statReporter_(sharedResources->statisticsReporter_),
00022 streamRecord_(statReporter_->getStreamsMonitorCollection().getNewStreamRecord()),
00023 diskWritingParams_(dbFileHandler->getDiskWritingParams()),
00024 dbFileHandler_(dbFileHandler)
00025 {}
00026
00027
00028 void StreamHandler::closeAllFiles()
00029 {
00030 std::string errorMsg = "Failed to close all files for stream " + streamLabel() + ": ";
00031
00032 for (FileHandlers::const_iterator it = fileHandlers_.begin(),
00033 itEnd = fileHandlers_.end(); it != itEnd; ++it)
00034 {
00035 try
00036 {
00037 (*it)->closeFile(FilesMonitorCollection::FileRecord::runEnded);
00038 }
00039 catch(xcept::Exception& e)
00040 {
00041 XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
00042 sentinelException, errorMsg, e );
00043 sharedResources_->alarmHandler_->
00044 notifySentinel(AlarmHandler::ERROR, sentinelException);
00045 }
00046 catch(std::exception &e)
00047 {
00048 errorMsg += e.what();
00049 XCEPT_DECLARE( stor::exception::DiskWriting,
00050 sentinelException, errorMsg );
00051 sharedResources_->alarmHandler_->
00052 notifySentinel(AlarmHandler::ERROR, sentinelException);
00053 }
00054 catch(...)
00055 {
00056 errorMsg += "Unknown exception";
00057 XCEPT_DECLARE( stor::exception::DiskWriting,
00058 sentinelException, errorMsg );
00059 sharedResources_->alarmHandler_->
00060 notifySentinel(AlarmHandler::ERROR, sentinelException);
00061 }
00062 }
00063 fileHandlers_.clear();
00064 }
00065
00066
00067 void StreamHandler::closeTimedOutFiles(utils::TimePoint_t currentTime)
00068 {
00069 fileHandlers_.erase(
00070 std::remove_if(fileHandlers_.begin(),
00071 fileHandlers_.end(),
00072 boost::bind(&FileHandler::tooOld,
00073 _1, currentTime)),
00074 fileHandlers_.end());
00075 }
00076
00077
00078 bool StreamHandler::closeFilesForLumiSection
00079 (
00080 const uint32_t& lumiSection,
00081 std::string& str
00082 )
00083 {
00084 fileHandlers_.erase(
00085 std::remove_if(fileHandlers_.begin(),
00086 fileHandlers_.end(),
00087 boost::bind(&FileHandler::isFromLumiSection,
00088 _1, lumiSection)),
00089 fileHandlers_.end());
00090
00091 return streamRecord_->reportLumiSectionInfo(lumiSection, str);
00092 }
00093
00094
00095 void StreamHandler::writeEvent(const I2OChain& event)
00096 {
00097 FileHandlerPtr handler = getFileHandler(event);
00098 handler->writeEvent(event);
00099 streamRecord_->addSizeInBytes(event.totalDataSize());
00100 statReporter_->getThroughputMonitorCollection().
00101 addDiskWriteSample(event.totalDataSize());
00102 }
00103
00104
00105 StreamHandler::FileHandlerPtr StreamHandler::getFileHandler(const I2OChain& event)
00106 {
00107 for (
00108 FileHandlers::iterator it = fileHandlers_.begin(), itEnd = fileHandlers_.end();
00109 it != itEnd;
00110 ++it
00111 )
00112 {
00113 if ( (*it)->lumiSection() == event.lumiSection() )
00114 {
00115 if ( (*it)->tooLarge(event.totalDataSize()) )
00116 {
00117 fileHandlers_.erase(it);
00118 break;
00119 }
00120 else
00121 {
00122 return (*it);
00123 }
00124 }
00125 }
00126 return newFileHandler(event);
00127 }
00128
00129
00130 FilesMonitorCollection::FileRecordPtr
00131 StreamHandler::getNewFileRecord(const I2OChain& event)
00132 {
00133 FilesMonitorCollection::FileRecordPtr fileRecord =
00134 statReporter_->getFilesMonitorCollection().getNewFileRecord();
00135
00136 try
00137 {
00138 fileRecord->runNumber = event.runNumber();
00139 fileRecord->lumiSection = event.lumiSection();
00140 }
00141 catch(stor::exception::IncompleteEventMessage &e)
00142 {
00143 fileRecord->runNumber = sharedResources_->configuration_->getRunNumber();
00144 fileRecord->lumiSection = 0;
00145 }
00146 fileRecord->streamLabel = streamLabel();
00147 fileRecord->baseFilePath =
00148 getBaseFilePath(fileRecord->runNumber, fileRecord->entryCounter);
00149 fileRecord->coreFileName =
00150 getCoreFileName(fileRecord->runNumber, fileRecord->lumiSection);
00151 fileRecord->fileCounter = getFileCounter(fileRecord->coreFileName);
00152 fileRecord->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
00153 fileRecord->isOpen = true;
00154
00155 return fileRecord;
00156 }
00157
00158
00159 std::string StreamHandler::getBaseFilePath
00160 (
00161 const uint32_t& runNumber,
00162 uint32_t fileCount
00163 ) const
00164 {
00165 return diskWritingParams_.filePath_ + getFileSystem(runNumber, fileCount);
00166 }
00167
00168
00169 std::string StreamHandler::getFileSystem
00170 (
00171 const uint32_t& runNumber,
00172 uint32_t fileCount
00173 ) const
00174 {
00175
00176
00177 if (diskWritingParams_.nLogicalDisk_ <= 0) {return "";}
00178
00179 unsigned int fileSystemNumber =
00180 (runNumber
00181 + atoi(diskWritingParams_.smInstanceString_.c_str())
00182 + fileCount)
00183 % diskWritingParams_.nLogicalDisk_;
00184
00185 std::ostringstream fileSystem;
00186 fileSystem << "/" << std::setfill('0') << std::setw(2) << fileSystemNumber;
00187
00188 return fileSystem.str();
00189 }
00190
00191
00192 std::string StreamHandler::getCoreFileName
00193 (
00194 const uint32_t& runNumber,
00195 const uint32_t& lumiSection
00196 ) const
00197 {
00198 std::ostringstream coreFileName;
00199 coreFileName << diskWritingParams_.setupLabel_
00200 << "." << std::setfill('0') << std::setw(8) << runNumber
00201 << "." << std::setfill('0') << std::setw(4) << lumiSection
00202 << "." << streamLabel()
00203 << "." << diskWritingParams_.fileName_
00204 << "." << std::setfill('0') << std::setw(2) << diskWritingParams_.smInstanceString_;
00205
00206 return coreFileName.str();
00207 }
00208
00209
00210 unsigned int StreamHandler::getFileCounter(const std::string& coreFileName)
00211 {
00212 CoreFileNamesMap::iterator pos = usedCoreFileNames_.find(coreFileName);
00213 if (pos == usedCoreFileNames_.end())
00214 {
00215 usedCoreFileNames_.insert(pos, std::make_pair(coreFileName, 0));
00216 return 0;
00217 }
00218 else
00219 {
00220 ++(pos->second);
00221 return pos->second;
00222 }
00223 }
00224
00225
00226 unsigned long long StreamHandler::getMaxFileSize() const
00227 {
00228 const unsigned long long maxFileSizeMB =
00229 diskWritingParams_.maxFileSizeMB_ > 0 ?
00230 diskWritingParams_.maxFileSizeMB_ : getStreamMaxFileSize();
00231
00232 return ( maxFileSizeMB * 1024 * 1024 );
00233 }
00234
00235 }
00236