Go to the documentation of this file.00001
00003
00004 #include <sstream>
00005 #include <iomanip>
00006
00007 #include "EventFilter/StorageManager/interface/FileHandler.h"
00008 #include "EventFilter/StorageManager/interface/I2OChain.h"
00009 #include "EventFilter/StorageManager/interface/StreamHandler.h"
00010
00011 #include "boost/bind.hpp"
00012
00013
00014 namespace stor {
00015
00016 StreamHandler::StreamHandler(
00017 const SharedResourcesPtr sharedResources,
00018 const DbFileHandlerPtr dbFileHandler
00019 ) :
00020 sharedResources_(sharedResources),
00021 statReporter_(sharedResources->statisticsReporter_),
00022 streamRecord_(statReporter_->getStreamsMonitorCollection().getNewStreamRecord()),
00023 diskWritingParams_(dbFileHandler->getDiskWritingParams()),
00024 dbFileHandler_(dbFileHandler)
00025 {}
00026
00027
00028 void StreamHandler::closeAllFiles()
00029 {
00030 std::string errorMsg = "Failed to close all files for stream " + streamLabel() + ": ";
00031
00032 for (FileHandlers::const_iterator it = fileHandlers_.begin(),
00033 itEnd = fileHandlers_.end(); it != itEnd; ++it)
00034 {
00035 try
00036 {
00037 (*it)->closeFile(FilesMonitorCollection::FileRecord::runEnded);
00038 }
00039 catch(xcept::Exception& e)
00040 {
00041 XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
00042 sentinelException, errorMsg, e );
00043 sharedResources_->alarmHandler_->
00044 notifySentinel(AlarmHandler::ERROR, sentinelException);
00045 }
00046 catch(std::exception &e)
00047 {
00048 errorMsg += e.what();
00049 XCEPT_DECLARE( stor::exception::DiskWriting,
00050 sentinelException, errorMsg );
00051 sharedResources_->alarmHandler_->
00052 notifySentinel(AlarmHandler::ERROR, sentinelException);
00053 }
00054 catch(...)
00055 {
00056 errorMsg += "Unknown exception";
00057 XCEPT_DECLARE( stor::exception::DiskWriting,
00058 sentinelException, errorMsg );
00059 sharedResources_->alarmHandler_->
00060 notifySentinel(AlarmHandler::ERROR, sentinelException);
00061 }
00062 }
00063 fileHandlers_.clear();
00064 }
00065
00066
00067 void StreamHandler::closeTimedOutFiles(utils::TimePoint_t currentTime)
00068 {
00069 fileHandlers_.erase(
00070 std::remove_if(fileHandlers_.begin(),
00071 fileHandlers_.end(),
00072 boost::bind(&FileHandler::tooOld,
00073 _1, currentTime)),
00074 fileHandlers_.end());
00075 }
00076
00077
00078 bool StreamHandler::closeFilesForLumiSection
00079 (
00080 const uint32_t& lumiSection,
00081 std::string& str
00082 )
00083 {
00084 fileHandlers_.erase(
00085 std::remove_if(fileHandlers_.begin(),
00086 fileHandlers_.end(),
00087 boost::bind(&FileHandler::isFromLumiSection,
00088 _1, lumiSection)),
00089 fileHandlers_.end());
00090
00091 return streamRecord_->reportLumiSectionInfo(lumiSection, str);
00092 }
00093
00094
00095 void StreamHandler::writeEvent(const I2OChain& event)
00096 {
00097 FileHandlerPtr handler = getFileHandler(event);
00098 handler->writeEvent(event);
00099 streamRecord_->addSizeInBytes(event.totalDataSize());
00100 statReporter_->getThroughputMonitorCollection().
00101 addDiskWriteSample(event.totalDataSize());
00102 }
00103
00104
00105 StreamHandler::FileHandlerPtr StreamHandler::getFileHandler(const I2OChain& event)
00106 {
00107 for (
00108 FileHandlers::iterator it = fileHandlers_.begin(), itEnd = fileHandlers_.end();
00109 it != itEnd;
00110 ++it
00111 )
00112 {
00113 if ( (*it)->lumiSection() == event.lumiSection() )
00114 {
00115 if ( (*it)->tooLarge(event.totalDataSize()) )
00116 {
00117 fileHandlers_.erase(it);
00118 break;
00119 }
00120 else
00121 {
00122 return (*it);
00123 }
00124 }
00125 }
00126 return newFileHandler(event);
00127 }
00128
00129
00130 FilesMonitorCollection::FileRecordPtr
00131 StreamHandler::getNewFileRecord(const I2OChain& event)
00132 {
00133 FilesMonitorCollection::FileRecordPtr fileRecord =
00134 statReporter_->getFilesMonitorCollection().getNewFileRecord();
00135
00136 try
00137 {
00138 fileRecord->runNumber = event.runNumber();
00139 fileRecord->lumiSection = event.lumiSection();
00140 }
00141 catch(stor::exception::IncompleteEventMessage &e)
00142 {
00143 fileRecord->runNumber = sharedResources_->configuration_->getRunNumber();
00144 fileRecord->lumiSection = 0;
00145 }
00146 fileRecord->streamLabel = streamLabel();
00147 fileRecord->baseFilePath =
00148 getBaseFilePath(fileRecord->runNumber, fileRecord->entryCounter);
00149 fileRecord->coreFileName =
00150 getCoreFileName(fileRecord->runNumber, fileRecord->lumiSection);
00151 fileRecord->fileCounter = getFileCounter(fileRecord->coreFileName);
00152 fileRecord->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
00153 fileRecord->isOpen = true;
00154
00155 streamRecord_->incrementFileCount(fileRecord->lumiSection);
00156
00157 return fileRecord;
00158 }
00159
00160
00161 std::string StreamHandler::getBaseFilePath
00162 (
00163 const uint32_t& runNumber,
00164 uint32_t fileCount
00165 ) const
00166 {
00167 return diskWritingParams_.filePath_ + getFileSystem(runNumber, fileCount);
00168 }
00169
00170
00171 std::string StreamHandler::getFileSystem
00172 (
00173 const uint32_t& runNumber,
00174 uint32_t fileCount
00175 ) const
00176 {
00177
00178
00179 if (diskWritingParams_.nLogicalDisk_ <= 0) {return "";}
00180
00181 unsigned int fileSystemNumber =
00182 (runNumber
00183 + atoi(diskWritingParams_.smInstanceString_.c_str())
00184 + fileCount)
00185 % diskWritingParams_.nLogicalDisk_;
00186
00187 std::ostringstream fileSystem;
00188 fileSystem << "/" << std::setfill('0') << std::setw(2) << fileSystemNumber;
00189
00190 return fileSystem.str();
00191 }
00192
00193
00194 std::string StreamHandler::getCoreFileName
00195 (
00196 const uint32_t& runNumber,
00197 const uint32_t& lumiSection
00198 ) const
00199 {
00200 std::ostringstream coreFileName;
00201 coreFileName << diskWritingParams_.setupLabel_
00202 << "." << std::setfill('0') << std::setw(8) << runNumber
00203 << "." << std::setfill('0') << std::setw(4) << lumiSection
00204 << "." << streamLabel()
00205 << "." << diskWritingParams_.fileName_
00206 << "." << std::setfill('0') << std::setw(2) << diskWritingParams_.smInstanceString_;
00207
00208 return coreFileName.str();
00209 }
00210
00211
00212 unsigned int StreamHandler::getFileCounter(const std::string& coreFileName)
00213 {
00214 CoreFileNamesMap::iterator pos = usedCoreFileNames_.find(coreFileName);
00215 if (pos == usedCoreFileNames_.end())
00216 {
00217 usedCoreFileNames_.insert(pos, std::make_pair(coreFileName, 0));
00218 return 0;
00219 }
00220 else
00221 {
00222 ++(pos->second);
00223 return pos->second;
00224 }
00225 }
00226
00227
00228 unsigned long long StreamHandler::getMaxFileSize() const
00229 {
00230 const unsigned long long maxFileSizeMB =
00231 diskWritingParams_.maxFileSizeMB_ > 0 ?
00232 diskWritingParams_.maxFileSizeMB_ : getStreamMaxFileSize();
00233
00234 return ( maxFileSizeMB * 1024 * 1024 );
00235 }
00236
00237 }
00238