CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch13/src/EventFilter/StorageManager/src/StreamHandler.cc

Go to the documentation of this file.
00001 // $Id: StreamHandler.cc,v 1.21 2011/03/07 15:31:32 mommsen Exp $
00003 
00004 #include <sstream>
00005 #include <iomanip>
00006 
00007 #include "EventFilter/StorageManager/interface/FileHandler.h"
00008 #include "EventFilter/StorageManager/interface/I2OChain.h"
00009 #include "EventFilter/StorageManager/interface/StreamHandler.h"
00010 
00011 #include "boost/bind.hpp"
00012 
00013 
00014 namespace stor {
00015   
00016   StreamHandler::StreamHandler(
00017     const SharedResourcesPtr sharedResources,
00018     const DbFileHandlerPtr dbFileHandler
00019   ) :
00020   sharedResources_(sharedResources),
00021   statReporter_(sharedResources->statisticsReporter_),
00022   streamRecord_(statReporter_->getStreamsMonitorCollection().getNewStreamRecord()),
00023   diskWritingParams_(sharedResources->configuration_->getDiskWritingParams()),
00024   dbFileHandler_(dbFileHandler)
00025   {}
00026   
00027   
00028   void StreamHandler::closeAllFiles()
00029   {
00030     std::string errorMsg = "Failed to close all files for stream " + streamLabel() + ": ";
00031     
00032     for (FileHandlers::const_iterator it = fileHandlers_.begin(),
00033            itEnd = fileHandlers_.end(); it != itEnd; ++it)
00034     {
00035       try
00036       {
00037         (*it)->closeFile(FilesMonitorCollection::FileRecord::runEnded);
00038       }
00039       catch(xcept::Exception& e)
00040       {
00041         XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
00042           sentinelException, errorMsg, e );
00043         statReporter_->alarmHandler()->
00044           notifySentinel(AlarmHandler::ERROR, sentinelException);
00045       }
00046       catch(std::exception &e)
00047       {
00048         errorMsg += e.what();
00049         XCEPT_DECLARE( stor::exception::DiskWriting,
00050           sentinelException, errorMsg );
00051         statReporter_->alarmHandler()->
00052           notifySentinel(AlarmHandler::ERROR, sentinelException);
00053       }
00054       catch(...)
00055       {
00056         errorMsg += "Unknown exception";
00057         XCEPT_DECLARE( stor::exception::DiskWriting,
00058           sentinelException, errorMsg );
00059         statReporter_->alarmHandler()->
00060           notifySentinel(AlarmHandler::ERROR, sentinelException);
00061       }
00062     }
00063     fileHandlers_.clear();
00064   }
00065   
00066   
00067   void StreamHandler::closeTimedOutFiles(utils::TimePoint_t currentTime)
00068   {
00069     fileHandlers_.erase(
00070       std::remove_if(fileHandlers_.begin(),
00071         fileHandlers_.end(),
00072         boost::bind(&FileHandler::tooOld,
00073           _1, currentTime)),
00074       fileHandlers_.end());
00075   }
00076   
00077   
00078   void StreamHandler::closeFilesForLumiSection
00079   (
00080     const uint32_t& lumiSection,
00081     std::string& str
00082   )
00083   {
00084     streamRecord_->reportLumiSectionInfo(lumiSection, str);
00085     closeFilesForLumiSection(lumiSection);
00086   }
00087   
00088   
00089   void StreamHandler::closeFilesForLumiSection(const uint32_t lumiSection)
00090   {
00091     fileHandlers_.erase(
00092       std::remove_if(fileHandlers_.begin(),
00093         fileHandlers_.end(),
00094         boost::bind(&FileHandler::isFromLumiSection,
00095           _1, lumiSection)),
00096       fileHandlers_.end());
00097   }
00098   
00099   
00100   void StreamHandler::writeEvent(const I2OChain& event)
00101   {
00102     FileHandlerPtr handler = getFileHandler(event);
00103     handler->writeEvent(event);
00104     streamRecord_->addSizeInBytes(event.totalDataSize());
00105     statReporter_->getThroughputMonitorCollection().
00106       addDiskWriteSample(event.totalDataSize());
00107   }
00108   
00109   
00110   StreamHandler::FileHandlerPtr StreamHandler::getFileHandler(const I2OChain& event)
00111   {
00112     for (
00113       FileHandlers::iterator it = fileHandlers_.begin(), itEnd = fileHandlers_.end();
00114       it != itEnd;
00115       ++it
00116     ) 
00117     {
00118       if ( (*it)->lumiSection() == event.lumiSection() )
00119       {
00120         if ( (*it)->tooLarge(event.totalDataSize()) )
00121         { 
00122           fileHandlers_.erase(it);
00123           break;
00124         }
00125         else
00126         {
00127           return (*it);
00128         }
00129       }
00130     }    
00131     return newFileHandler(event);
00132   }
00133   
00134   
00135   FilesMonitorCollection::FileRecordPtr
00136   StreamHandler::getNewFileRecord(const I2OChain& event)
00137   {
00138     FilesMonitorCollection::FileRecordPtr fileRecord =
00139       statReporter_->getFilesMonitorCollection().getNewFileRecord();
00140     
00141     try
00142     {
00143       fileRecord->runNumber = event.runNumber();
00144       fileRecord->lumiSection = event.lumiSection();
00145     }
00146     catch(stor::exception::IncompleteEventMessage &e)
00147     {
00148       fileRecord->runNumber = sharedResources_->configuration_->getRunNumber();
00149       fileRecord->lumiSection = 0;
00150     }
00151     fileRecord->streamLabel = streamLabel();
00152     fileRecord->baseFilePath =
00153       getBaseFilePath(fileRecord->runNumber, fileRecord->entryCounter);
00154     fileRecord->coreFileName =
00155       getCoreFileName(fileRecord->runNumber, fileRecord->lumiSection);
00156     fileRecord->fileCounter = getFileCounter(fileRecord->coreFileName);
00157     fileRecord->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
00158     fileRecord->isOpen = true;
00159     
00160     streamRecord_->incrementFileCount(fileRecord->lumiSection);
00161     
00162     return fileRecord;
00163   }
00164   
00165   
00166   std::string StreamHandler::getBaseFilePath
00167   (
00168     const uint32_t& runNumber,
00169     uint32_t fileCount
00170   ) const
00171   {
00172     return diskWritingParams_.filePath_ + getFileSystem(runNumber, fileCount);
00173   }
00174   
00175   
00176   std::string StreamHandler::getFileSystem
00177   (
00178     const uint32_t& runNumber,
00179     uint32_t fileCount
00180   ) const
00181   {
00182     // if the number of logical disks is not specified, don't
00183     // add a file system subdir to the path
00184     if (diskWritingParams_.nLogicalDisk_ <= 0) {return "";}
00185     
00186     unsigned int fileSystemNumber =
00187       (runNumber
00188         + atoi(diskWritingParams_.smInstanceString_.c_str())
00189         + fileCount)
00190       % diskWritingParams_.nLogicalDisk_;
00191     
00192     std::ostringstream fileSystem;
00193     fileSystem << "/" << std::setfill('0') << std::setw(2) << fileSystemNumber; 
00194     
00195     return fileSystem.str();
00196   }
00197   
00198   
00199   std::string StreamHandler::getCoreFileName
00200   (
00201     const uint32_t& runNumber,
00202     const uint32_t& lumiSection
00203   ) const
00204   {
00205     std::ostringstream coreFileName;
00206     coreFileName << diskWritingParams_.setupLabel_
00207       << "." << std::setfill('0') << std::setw(8) << runNumber
00208       << "." << std::setfill('0') << std::setw(4) << lumiSection
00209       << "." << streamLabel()
00210       << "." << diskWritingParams_.fileName_
00211       << "." << std::setfill('0') << std::setw(2) << diskWritingParams_.smInstanceString_;
00212     
00213     return coreFileName.str();
00214   }
00215   
00216   
00217   unsigned int StreamHandler::getFileCounter(const std::string& coreFileName)
00218   {
00219     CoreFileNamesMap::iterator pos = usedCoreFileNames_.find(coreFileName);
00220     if (pos == usedCoreFileNames_.end())
00221     {
00222       usedCoreFileNames_.insert(pos, std::make_pair(coreFileName, 0));
00223       return 0;
00224     }
00225     else
00226     {
00227       ++(pos->second);
00228       return pos->second;
00229     }
00230   }
00231   
00232   
00233   unsigned long long StreamHandler::getMaxFileSize() const
00234   {
00235     const unsigned long long maxFileSizeMB =
00236       diskWritingParams_.maxFileSizeMB_ > 0 ? 
00237       diskWritingParams_.maxFileSizeMB_ : getStreamMaxFileSize();
00238     
00239     return ( maxFileSizeMB * 1024 * 1024 );
00240   }
00241   
00242 } // namespace stor
00243