CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_0/src/EventFilter/StorageManager/src/StreamHandler.cc

Go to the documentation of this file.
00001 // $Id: StreamHandler.cc,v 1.24 2012/04/04 12:17:04 mommsen Exp $
00003 
00004 #include <sstream>
00005 #include <iomanip>
00006 
00007 #include "EventFilter/StorageManager/interface/FileHandler.h"
00008 #include "EventFilter/StorageManager/interface/I2OChain.h"
00009 #include "EventFilter/StorageManager/interface/StreamHandler.h"
00010 
00011 #include "boost/bind.hpp"
00012 
00013 
00014 namespace stor {
00015   
00016   StreamHandler::StreamHandler(
00017     const SharedResourcesPtr sharedResources,
00018     const DbFileHandlerPtr dbFileHandler
00019   ) :
00020   sharedResources_(sharedResources),
00021   statReporter_(sharedResources->statisticsReporter_),
00022   streamRecord_(statReporter_->getStreamsMonitorCollection().getNewStreamRecord()),
00023   diskWritingParams_(dbFileHandler->getDiskWritingParams()),
00024   dbFileHandler_(dbFileHandler)
00025   {}
00026   
00027   
00028   void StreamHandler::closeAllFiles()
00029   {
00030     std::string errorMsg = "Failed to close all files for stream " + streamLabel() + ": ";
00031     
00032     for (FileHandlers::const_iterator it = fileHandlers_.begin(),
00033            itEnd = fileHandlers_.end(); it != itEnd; ++it)
00034     {
00035       try
00036       {
00037         (*it)->closeFile(FilesMonitorCollection::FileRecord::runEnded);
00038       }
00039       catch(xcept::Exception& e)
00040       {
00041         XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
00042           sentinelException, errorMsg, e );
00043         sharedResources_->alarmHandler_->
00044           notifySentinel(AlarmHandler::ERROR, sentinelException);
00045       }
00046       catch(std::exception &e)
00047       {
00048         errorMsg += e.what();
00049         XCEPT_DECLARE( stor::exception::DiskWriting,
00050           sentinelException, errorMsg );
00051         sharedResources_->alarmHandler_->
00052           notifySentinel(AlarmHandler::ERROR, sentinelException);
00053       }
00054       catch(...)
00055       {
00056         errorMsg += "Unknown exception";
00057         XCEPT_DECLARE( stor::exception::DiskWriting,
00058           sentinelException, errorMsg );
00059         sharedResources_->alarmHandler_->
00060           notifySentinel(AlarmHandler::ERROR, sentinelException);
00061       }
00062     }
00063     fileHandlers_.clear();
00064   }
00065   
00066   
00067   void StreamHandler::closeTimedOutFiles(utils::TimePoint_t currentTime)
00068   {
00069     fileHandlers_.erase(
00070       std::remove_if(fileHandlers_.begin(),
00071         fileHandlers_.end(),
00072         boost::bind(&FileHandler::tooOld,
00073           _1, currentTime)),
00074       fileHandlers_.end());
00075   }
00076   
00077   
00078   bool StreamHandler::closeFilesForLumiSection
00079   (
00080     const uint32_t& lumiSection,
00081     std::string& str
00082   )
00083   {
00084     fileHandlers_.erase(
00085       std::remove_if(fileHandlers_.begin(),
00086         fileHandlers_.end(),
00087         boost::bind(&FileHandler::isFromLumiSection,
00088           _1, lumiSection)),
00089       fileHandlers_.end());
00090     
00091     return streamRecord_->reportLumiSectionInfo(lumiSection, str);
00092   }
00093   
00094   
00095   void StreamHandler::writeEvent(const I2OChain& event)
00096   {
00097     FileHandlerPtr handler = getFileHandler(event);
00098     handler->writeEvent(event);
00099     streamRecord_->addSizeInBytes(event.totalDataSize());
00100     statReporter_->getThroughputMonitorCollection().
00101       addDiskWriteSample(event.totalDataSize());
00102   }
00103   
00104   
00105   StreamHandler::FileHandlerPtr StreamHandler::getFileHandler(const I2OChain& event)
00106   {
00107     for (
00108       FileHandlers::iterator it = fileHandlers_.begin(), itEnd = fileHandlers_.end();
00109       it != itEnd;
00110       ++it
00111     ) 
00112     {
00113       if ( (*it)->lumiSection() == event.lumiSection() )
00114       {
00115         if ( (*it)->tooLarge(event.totalDataSize()) )
00116         { 
00117           fileHandlers_.erase(it);
00118           break;
00119         }
00120         else
00121         {
00122           return (*it);
00123         }
00124       }
00125     }    
00126     return newFileHandler(event);
00127   }
00128   
00129   
00130   FilesMonitorCollection::FileRecordPtr
00131   StreamHandler::getNewFileRecord(const I2OChain& event)
00132   {
00133     FilesMonitorCollection::FileRecordPtr fileRecord =
00134       statReporter_->getFilesMonitorCollection().getNewFileRecord();
00135     
00136     try
00137     {
00138       fileRecord->runNumber = event.runNumber();
00139       fileRecord->lumiSection = event.lumiSection();
00140     }
00141     catch(stor::exception::IncompleteEventMessage &e)
00142     {
00143       fileRecord->runNumber = sharedResources_->configuration_->getRunNumber();
00144       fileRecord->lumiSection = 0;
00145     }
00146     fileRecord->streamLabel = streamLabel();
00147     fileRecord->baseFilePath =
00148       getBaseFilePath(fileRecord->runNumber, fileRecord->entryCounter);
00149     fileRecord->coreFileName =
00150       getCoreFileName(fileRecord->runNumber, fileRecord->lumiSection);
00151     fileRecord->fileCounter = getFileCounter(fileRecord->coreFileName);
00152     fileRecord->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
00153     fileRecord->isOpen = true;
00154     
00155     streamRecord_->incrementFileCount(fileRecord->lumiSection);
00156     
00157     return fileRecord;
00158   }
00159   
00160   
00161   std::string StreamHandler::getBaseFilePath
00162   (
00163     const uint32_t& runNumber,
00164     uint32_t fileCount
00165   ) const
00166   {
00167     return diskWritingParams_.filePath_ + getFileSystem(runNumber, fileCount);
00168   }
00169   
00170   
00171   std::string StreamHandler::getFileSystem
00172   (
00173     const uint32_t& runNumber,
00174     uint32_t fileCount
00175   ) const
00176   {
00177     // if the number of logical disks is not specified, don't
00178     // add a file system subdir to the path
00179     if (diskWritingParams_.nLogicalDisk_ <= 0) {return "";}
00180     
00181     unsigned int fileSystemNumber =
00182       (runNumber
00183         + atoi(diskWritingParams_.smInstanceString_.c_str())
00184         + fileCount)
00185       % diskWritingParams_.nLogicalDisk_;
00186     
00187     std::ostringstream fileSystem;
00188     fileSystem << "/" << std::setfill('0') << std::setw(2) << fileSystemNumber; 
00189     
00190     return fileSystem.str();
00191   }
00192   
00193   
00194   std::string StreamHandler::getCoreFileName
00195   (
00196     const uint32_t& runNumber,
00197     const uint32_t& lumiSection
00198   ) const
00199   {
00200     std::ostringstream coreFileName;
00201     coreFileName << diskWritingParams_.setupLabel_
00202       << "." << std::setfill('0') << std::setw(8) << runNumber
00203       << "." << std::setfill('0') << std::setw(4) << lumiSection
00204       << "." << streamLabel()
00205       << "." << diskWritingParams_.fileName_
00206       << "." << std::setfill('0') << std::setw(2) << diskWritingParams_.smInstanceString_;
00207     
00208     return coreFileName.str();
00209   }
00210   
00211   
00212   unsigned int StreamHandler::getFileCounter(const std::string& coreFileName)
00213   {
00214     CoreFileNamesMap::iterator pos = usedCoreFileNames_.find(coreFileName);
00215     if (pos == usedCoreFileNames_.end())
00216     {
00217       usedCoreFileNames_.insert(pos, std::make_pair(coreFileName, 0));
00218       return 0;
00219     }
00220     else
00221     {
00222       ++(pos->second);
00223       return pos->second;
00224     }
00225   }
00226   
00227   
00228   unsigned long long StreamHandler::getMaxFileSize() const
00229   {
00230     const unsigned long long maxFileSizeMB =
00231       diskWritingParams_.maxFileSizeMB_ > 0 ? 
00232       diskWritingParams_.maxFileSizeMB_ : getStreamMaxFileSize();
00233     
00234     return ( maxFileSizeMB * 1024 * 1024 );
00235   }
00236   
00237 } // namespace stor
00238