Go to the documentation of this file.00001
00003
00004 #include <sstream>
00005 #include <iomanip>
00006
00007 #include "EventFilter/StorageManager/interface/FileHandler.h"
00008 #include "EventFilter/StorageManager/interface/I2OChain.h"
00009 #include "EventFilter/StorageManager/interface/StreamHandler.h"
00010
00011 #include "boost/bind.hpp"
00012
00013
00014 namespace stor {
00015
00016 StreamHandler::StreamHandler(
00017 const SharedResourcesPtr sharedResources,
00018 const DbFileHandlerPtr dbFileHandler
00019 ) :
00020 sharedResources_(sharedResources),
00021 statReporter_(sharedResources->statisticsReporter_),
00022 streamRecord_(statReporter_->getStreamsMonitorCollection().getNewStreamRecord()),
00023 diskWritingParams_(sharedResources->configuration_->getDiskWritingParams()),
00024 dbFileHandler_(dbFileHandler)
00025 {}
00026
00027
00028 void StreamHandler::closeAllFiles()
00029 {
00030 std::string errorMsg = "Failed to close all files for stream " + streamLabel() + ": ";
00031
00032 for (FileHandlers::const_iterator it = fileHandlers_.begin(),
00033 itEnd = fileHandlers_.end(); it != itEnd; ++it)
00034 {
00035 try
00036 {
00037 (*it)->closeFile(FilesMonitorCollection::FileRecord::runEnded);
00038 }
00039 catch(xcept::Exception& e)
00040 {
00041 XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
00042 sentinelException, errorMsg, e );
00043 statReporter_->alarmHandler()->
00044 notifySentinel(AlarmHandler::ERROR, sentinelException);
00045 }
00046 catch(std::exception &e)
00047 {
00048 errorMsg += e.what();
00049 XCEPT_DECLARE( stor::exception::DiskWriting,
00050 sentinelException, errorMsg );
00051 statReporter_->alarmHandler()->
00052 notifySentinel(AlarmHandler::ERROR, sentinelException);
00053 }
00054 catch(...)
00055 {
00056 errorMsg += "Unknown exception";
00057 XCEPT_DECLARE( stor::exception::DiskWriting,
00058 sentinelException, errorMsg );
00059 statReporter_->alarmHandler()->
00060 notifySentinel(AlarmHandler::ERROR, sentinelException);
00061 }
00062 }
00063 fileHandlers_.clear();
00064 }
00065
00066
00067 void StreamHandler::closeTimedOutFiles(utils::TimePoint_t currentTime)
00068 {
00069 fileHandlers_.erase(
00070 std::remove_if(fileHandlers_.begin(),
00071 fileHandlers_.end(),
00072 boost::bind(&FileHandler::tooOld,
00073 _1, currentTime)),
00074 fileHandlers_.end());
00075 }
00076
00077
00078 void StreamHandler::closeFilesForLumiSection
00079 (
00080 const uint32_t& lumiSection,
00081 std::string& str
00082 )
00083 {
00084 streamRecord_->reportLumiSectionInfo(lumiSection, str);
00085 closeFilesForLumiSection(lumiSection);
00086 }
00087
00088
00089 void StreamHandler::closeFilesForLumiSection(const uint32_t lumiSection)
00090 {
00091 fileHandlers_.erase(
00092 std::remove_if(fileHandlers_.begin(),
00093 fileHandlers_.end(),
00094 boost::bind(&FileHandler::isFromLumiSection,
00095 _1, lumiSection)),
00096 fileHandlers_.end());
00097 }
00098
00099
00100 void StreamHandler::writeEvent(const I2OChain& event)
00101 {
00102 FileHandlerPtr handler = getFileHandler(event);
00103 handler->writeEvent(event);
00104 streamRecord_->addSizeInBytes(event.totalDataSize());
00105 statReporter_->getThroughputMonitorCollection().
00106 addDiskWriteSample(event.totalDataSize());
00107 }
00108
00109
00110 StreamHandler::FileHandlerPtr StreamHandler::getFileHandler(const I2OChain& event)
00111 {
00112 for (
00113 FileHandlers::iterator it = fileHandlers_.begin(), itEnd = fileHandlers_.end();
00114 it != itEnd;
00115 ++it
00116 )
00117 {
00118 if ( (*it)->lumiSection() == event.lumiSection() )
00119 {
00120 if ( (*it)->tooLarge(event.totalDataSize()) )
00121 {
00122 fileHandlers_.erase(it);
00123 break;
00124 }
00125 else
00126 {
00127 return (*it);
00128 }
00129 }
00130 }
00131 return newFileHandler(event);
00132 }
00133
00134
00135 FilesMonitorCollection::FileRecordPtr
00136 StreamHandler::getNewFileRecord(const I2OChain& event)
00137 {
00138 FilesMonitorCollection::FileRecordPtr fileRecord =
00139 statReporter_->getFilesMonitorCollection().getNewFileRecord();
00140
00141 try
00142 {
00143 fileRecord->runNumber = event.runNumber();
00144 fileRecord->lumiSection = event.lumiSection();
00145 }
00146 catch(stor::exception::IncompleteEventMessage &e)
00147 {
00148 fileRecord->runNumber = sharedResources_->configuration_->getRunNumber();
00149 fileRecord->lumiSection = 0;
00150 }
00151 fileRecord->streamLabel = streamLabel();
00152 fileRecord->baseFilePath =
00153 getBaseFilePath(fileRecord->runNumber, fileRecord->entryCounter);
00154 fileRecord->coreFileName =
00155 getCoreFileName(fileRecord->runNumber, fileRecord->lumiSection);
00156 fileRecord->fileCounter = getFileCounter(fileRecord->coreFileName);
00157 fileRecord->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
00158 fileRecord->isOpen = true;
00159
00160 streamRecord_->incrementFileCount(fileRecord->lumiSection);
00161
00162 return fileRecord;
00163 }
00164
00165
00166 std::string StreamHandler::getBaseFilePath
00167 (
00168 const uint32_t& runNumber,
00169 uint32_t fileCount
00170 ) const
00171 {
00172 return diskWritingParams_.filePath_ + getFileSystem(runNumber, fileCount);
00173 }
00174
00175
00176 std::string StreamHandler::getFileSystem
00177 (
00178 const uint32_t& runNumber,
00179 uint32_t fileCount
00180 ) const
00181 {
00182
00183
00184 if (diskWritingParams_.nLogicalDisk_ <= 0) {return "";}
00185
00186 unsigned int fileSystemNumber =
00187 (runNumber
00188 + atoi(diskWritingParams_.smInstanceString_.c_str())
00189 + fileCount)
00190 % diskWritingParams_.nLogicalDisk_;
00191
00192 std::ostringstream fileSystem;
00193 fileSystem << "/" << std::setfill('0') << std::setw(2) << fileSystemNumber;
00194
00195 return fileSystem.str();
00196 }
00197
00198
00199 std::string StreamHandler::getCoreFileName
00200 (
00201 const uint32_t& runNumber,
00202 const uint32_t& lumiSection
00203 ) const
00204 {
00205 std::ostringstream coreFileName;
00206 coreFileName << diskWritingParams_.setupLabel_
00207 << "." << std::setfill('0') << std::setw(8) << runNumber
00208 << "." << std::setfill('0') << std::setw(4) << lumiSection
00209 << "." << streamLabel()
00210 << "." << diskWritingParams_.fileName_
00211 << "." << std::setfill('0') << std::setw(2) << diskWritingParams_.smInstanceString_;
00212
00213 return coreFileName.str();
00214 }
00215
00216
00217 unsigned int StreamHandler::getFileCounter(const std::string& coreFileName)
00218 {
00219 CoreFileNamesMap::iterator pos = usedCoreFileNames_.find(coreFileName);
00220 if (pos == usedCoreFileNames_.end())
00221 {
00222 usedCoreFileNames_.insert(pos, std::make_pair(coreFileName, 0));
00223 return 0;
00224 }
00225 else
00226 {
00227 ++(pos->second);
00228 return pos->second;
00229 }
00230 }
00231
00232
00233 unsigned long long StreamHandler::getMaxFileSize() const
00234 {
00235 const unsigned long long maxFileSizeMB =
00236 diskWritingParams_.maxFileSizeMB_ > 0 ?
00237 diskWritingParams_.maxFileSizeMB_ : getStreamMaxFileSize();
00238
00239 return ( maxFileSizeMB * 1024 * 1024 );
00240 }
00241
00242 }
00243