CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_1/src/EventFilter/StorageManager/src/StreamHandler.cc

Go to the documentation of this file.
00001 // $Id: StreamHandler.cc,v 1.25 2012/10/17 10:13:25 mommsen Exp $
00003 
00004 #include <sstream>
00005 #include <iomanip>
00006 
00007 #include "EventFilter/StorageManager/interface/FileHandler.h"
00008 #include "EventFilter/StorageManager/interface/I2OChain.h"
00009 #include "EventFilter/StorageManager/interface/StreamHandler.h"
00010 
00011 #include "boost/bind.hpp"
00012 
00013 
00014 namespace stor {
00015   
00016   StreamHandler::StreamHandler(
00017     const SharedResourcesPtr sharedResources,
00018     const DbFileHandlerPtr dbFileHandler
00019   ) :
00020   sharedResources_(sharedResources),
00021   statReporter_(sharedResources->statisticsReporter_),
00022   streamRecord_(statReporter_->getStreamsMonitorCollection().getNewStreamRecord()),
00023   diskWritingParams_(dbFileHandler->getDiskWritingParams()),
00024   dbFileHandler_(dbFileHandler)
00025   {}
00026   
00027   
00028   void StreamHandler::closeAllFiles()
00029   {
00030     std::string errorMsg = "Failed to close all files for stream " + streamLabel() + ": ";
00031     
00032     for (FileHandlers::const_iterator it = fileHandlers_.begin(),
00033            itEnd = fileHandlers_.end(); it != itEnd; ++it)
00034     {
00035       try
00036       {
00037         (*it)->closeFile(FilesMonitorCollection::FileRecord::runEnded);
00038       }
00039       catch(xcept::Exception& e)
00040       {
00041         XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
00042           sentinelException, errorMsg, e );
00043         sharedResources_->alarmHandler_->
00044           notifySentinel(AlarmHandler::ERROR, sentinelException);
00045       }
00046       catch(std::exception &e)
00047       {
00048         errorMsg += e.what();
00049         XCEPT_DECLARE( stor::exception::DiskWriting,
00050           sentinelException, errorMsg );
00051         sharedResources_->alarmHandler_->
00052           notifySentinel(AlarmHandler::ERROR, sentinelException);
00053       }
00054       catch(...)
00055       {
00056         errorMsg += "Unknown exception";
00057         XCEPT_DECLARE( stor::exception::DiskWriting,
00058           sentinelException, errorMsg );
00059         sharedResources_->alarmHandler_->
00060           notifySentinel(AlarmHandler::ERROR, sentinelException);
00061       }
00062     }
00063     fileHandlers_.clear();
00064   }
00065   
00066   
00067   void StreamHandler::closeTimedOutFiles(utils::TimePoint_t currentTime)
00068   {
00069     fileHandlers_.erase(
00070       std::remove_if(fileHandlers_.begin(),
00071         fileHandlers_.end(),
00072         boost::bind(&FileHandler::tooOld,
00073           _1, currentTime)),
00074       fileHandlers_.end());
00075   }
00076   
00077   
00078   bool StreamHandler::closeFilesForLumiSection
00079   (
00080     const uint32_t& lumiSection,
00081     std::string& str
00082   )
00083   {
00084     fileHandlers_.erase(
00085       std::remove_if(fileHandlers_.begin(),
00086         fileHandlers_.end(),
00087         boost::bind(&FileHandler::isFromLumiSection,
00088           _1, lumiSection)),
00089       fileHandlers_.end());
00090     
00091     return streamRecord_->reportLumiSectionInfo(lumiSection, str);
00092   }
00093   
00094   
00095   void StreamHandler::writeEvent(const I2OChain& event)
00096   {
00097     FileHandlerPtr handler = getFileHandler(event);
00098     handler->writeEvent(event);
00099     streamRecord_->addSizeInBytes(event.totalDataSize());
00100     statReporter_->getThroughputMonitorCollection().
00101       addDiskWriteSample(event.totalDataSize());
00102   }
00103   
00104   
00105   StreamHandler::FileHandlerPtr StreamHandler::getFileHandler(const I2OChain& event)
00106   {
00107     for (
00108       FileHandlers::iterator it = fileHandlers_.begin(), itEnd = fileHandlers_.end();
00109       it != itEnd;
00110       ++it
00111     ) 
00112     {
00113       if ( (*it)->lumiSection() == event.lumiSection() )
00114       {
00115         if ( (*it)->tooLarge(event.totalDataSize()) )
00116         { 
00117           fileHandlers_.erase(it);
00118           break;
00119         }
00120         else
00121         {
00122           return (*it);
00123         }
00124       }
00125     }    
00126     return newFileHandler(event);
00127   }
00128   
00129   
00130   FilesMonitorCollection::FileRecordPtr
00131   StreamHandler::getNewFileRecord(const I2OChain& event)
00132   {
00133     FilesMonitorCollection::FileRecordPtr fileRecord =
00134       statReporter_->getFilesMonitorCollection().getNewFileRecord();
00135     
00136     try
00137     {
00138       fileRecord->runNumber = event.runNumber();
00139       fileRecord->lumiSection = event.lumiSection();
00140     }
00141     catch(stor::exception::IncompleteEventMessage &e)
00142     {
00143       fileRecord->runNumber = sharedResources_->configuration_->getRunNumber();
00144       fileRecord->lumiSection = 0;
00145     }
00146     fileRecord->streamLabel = streamLabel();
00147     fileRecord->baseFilePath =
00148       getBaseFilePath(fileRecord->runNumber, fileRecord->entryCounter);
00149     fileRecord->coreFileName =
00150       getCoreFileName(fileRecord->runNumber, fileRecord->lumiSection);
00151     fileRecord->fileCounter = getFileCounter(fileRecord->coreFileName);
00152     fileRecord->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
00153     fileRecord->isOpen = true;
00154     
00155     return fileRecord;
00156   }
00157   
00158   
00159   std::string StreamHandler::getBaseFilePath
00160   (
00161     const uint32_t& runNumber,
00162     uint32_t fileCount
00163   ) const
00164   {
00165     return diskWritingParams_.filePath_ + getFileSystem(runNumber, fileCount);
00166   }
00167   
00168   
00169   std::string StreamHandler::getFileSystem
00170   (
00171     const uint32_t& runNumber,
00172     uint32_t fileCount
00173   ) const
00174   {
00175     // if the number of logical disks is not specified, don't
00176     // add a file system subdir to the path
00177     if (diskWritingParams_.nLogicalDisk_ <= 0) {return "";}
00178     
00179     unsigned int fileSystemNumber =
00180       (runNumber
00181         + atoi(diskWritingParams_.smInstanceString_.c_str())
00182         + fileCount)
00183       % diskWritingParams_.nLogicalDisk_;
00184     
00185     std::ostringstream fileSystem;
00186     fileSystem << "/" << std::setfill('0') << std::setw(2) << fileSystemNumber; 
00187     
00188     return fileSystem.str();
00189   }
00190   
00191   
00192   std::string StreamHandler::getCoreFileName
00193   (
00194     const uint32_t& runNumber,
00195     const uint32_t& lumiSection
00196   ) const
00197   {
00198     std::ostringstream coreFileName;
00199     coreFileName << diskWritingParams_.setupLabel_
00200       << "." << std::setfill('0') << std::setw(8) << runNumber
00201       << "." << std::setfill('0') << std::setw(4) << lumiSection
00202       << "." << streamLabel()
00203       << "." << diskWritingParams_.fileName_
00204       << "." << std::setfill('0') << std::setw(2) << diskWritingParams_.smInstanceString_;
00205     
00206     return coreFileName.str();
00207   }
00208   
00209   
00210   unsigned int StreamHandler::getFileCounter(const std::string& coreFileName)
00211   {
00212     CoreFileNamesMap::iterator pos = usedCoreFileNames_.find(coreFileName);
00213     if (pos == usedCoreFileNames_.end())
00214     {
00215       usedCoreFileNames_.insert(pos, std::make_pair(coreFileName, 0));
00216       return 0;
00217     }
00218     else
00219     {
00220       ++(pos->second);
00221       return pos->second;
00222     }
00223   }
00224   
00225   
00226   unsigned long long StreamHandler::getMaxFileSize() const
00227   {
00228     const unsigned long long maxFileSizeMB =
00229       diskWritingParams_.maxFileSizeMB_ > 0 ? 
00230       diskWritingParams_.maxFileSizeMB_ : getStreamMaxFileSize();
00231     
00232     return ( maxFileSizeMB * 1024 * 1024 );
00233   }
00234   
00235 } // namespace stor
00236