CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Protected Types | Protected Member Functions | Protected Attributes | Private Member Functions
stor::StreamHandler Class Reference (abstract)

#include <StreamHandler.h>

Inheritance diagram for stor::StreamHandler:
stor::EventStreamHandler stor::FaultyEventStreamHandler stor::FRDStreamHandler

Public Member Functions

void closeAllFiles ()
 
void closeFilesForLumiSection (const uint32_t lumiSection)
 
void closeFilesForLumiSection (const uint32_t &lumiSection, std::string &)
 
void closeTimedOutFiles (utils::TimePoint_t currentTime=utils::getCurrentTime())
 
 StreamHandler (const SharedResourcesPtr, const DbFileHandlerPtr)
 
void writeEvent (const I2OChain &event)
 
virtual ~StreamHandler ()
 

Protected Types

typedef std::map< std::string,
unsigned int > 
CoreFileNamesMap
 
typedef boost::shared_ptr
< FileHandler > 
FileHandlerPtr
 
typedef std::vector
< FileHandlerPtr > 
FileHandlers
 

Protected Member Functions

virtual double fractionToDisk () const =0
 
virtual FileHandlerPtr getFileHandler (const I2OChain &event)
 
unsigned long long getMaxFileSize () const
 
FilesMonitorCollection::FileRecordPtr getNewFileRecord (const I2OChain &event)
 
virtual int getStreamMaxFileSize () const =0
 
virtual FileHandlerPtr newFileHandler (const I2OChain &event)=0
 
virtual std::string streamLabel () const =0
 

Protected Attributes

const DbFileHandlerPtr dbFileHandler_
 
const DiskWritingParams diskWritingParams_
 
FileHandlers fileHandlers_
 
const SharedResourcesPtr sharedResources_
 
const StatisticsReporterPtr statReporter_
 
const
StreamsMonitorCollection::StreamRecordPtr 
streamRecord_
 
CoreFileNamesMap usedCoreFileNames_
 

Private Member Functions

bool fileTooLarge (const FileHandlerPtr, const unsigned long &dataSize) const
 
std::string getBaseFilePath (const uint32_t &runNumber, uint32_t fileCount) const
 
std::string getCoreFileName (const uint32_t &runNumber, const uint32_t &lumiSection) const
 
unsigned int getFileCounter (const std::string &coreFileName)
 
std::string getFileSystem (const uint32_t &runNumber, uint32_t fileCount) const
 
StreamHandler & operator= (StreamHandler const &)
 
 StreamHandler (StreamHandler const &)
 

Detailed Description

Abstract class to handle one stream written to disk.

Author:
mommsen
Revision:
1.13.6.1
Date:
2011/03/07 11:33:04

Definition at line 31 of file StreamHandler.h.

Member Typedef Documentation

typedef std::map<std::string, unsigned int> stor::StreamHandler::CoreFileNamesMap
protected

Definition at line 149 of file StreamHandler.h.

typedef boost::shared_ptr<FileHandler> stor::StreamHandler::FileHandlerPtr
protected

Definition at line 71 of file StreamHandler.h.

typedef std::vector<FileHandlerPtr> stor::StreamHandler::FileHandlers
protected

Definition at line 146 of file StreamHandler.h.

Constructor & Destructor Documentation

stor::StreamHandler::StreamHandler ( const SharedResourcesPtr  sharedResources,
const DbFileHandlerPtr  dbFileHandler 
)

Definition at line 16 of file StreamHandler.cc.

19  :
20  sharedResources_(sharedResources),
21  statReporter_(sharedResources->statisticsReporter_),
22  streamRecord_(statReporter_->getStreamsMonitorCollection().getNewStreamRecord()),
23  diskWritingParams_(sharedResources->configuration_->getDiskWritingParams()),
24  dbFileHandler_(dbFileHandler)
25  {}
const StatisticsReporterPtr statReporter_
const DiskWritingParams diskWritingParams_
const DbFileHandlerPtr dbFileHandler_
const StreamsMonitorCollection::StreamRecordPtr streamRecord_
const SharedResourcesPtr sharedResources_
virtual stor::StreamHandler::~StreamHandler ( )
inline virtual

Definition at line 37 of file StreamHandler.h.

37 {};
stor::StreamHandler::StreamHandler ( StreamHandler const &  )
private

Member Function Documentation

void stor::StreamHandler::closeAllFiles ( )

Gracefully close all open files

Definition at line 28 of file StreamHandler.cc.

References ExpressReco_HICollisions_FallBack::e, stor::AlarmHandler::ERROR, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, fileHandlers_, stor::FilesMonitorCollection::FileRecord::runEnded, statReporter_, and streamLabel().

Referenced by stor::DiskWriter::destroyStreams(), and stor::FaultyEventStreamHandler::getFileHandler().

29  {
30  std::string errorMsg = "Failed to close all files for stream " + streamLabel() + ": ";
31 
32  for (FileHandlers::const_iterator it = fileHandlers_.begin(),
33  itEnd = fileHandlers_.end(); it != itEnd; ++it)
34  {
35  try
36  {
38  }
39  catch(xcept::Exception& e)
40  {
41  XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
42  sentinelException, errorMsg, e );
43  statReporter_->alarmHandler()->
44  notifySentinel(AlarmHandler::ERROR, sentinelException);
45  }
46  catch(std::exception &e)
47  {
48  errorMsg += e.what();
49  XCEPT_DECLARE( stor::exception::DiskWriting,
50  sentinelException, errorMsg );
51  statReporter_->alarmHandler()->
52  notifySentinel(AlarmHandler::ERROR, sentinelException);
53  }
54  catch(...)
55  {
56  errorMsg += "Unknown exception";
57  XCEPT_DECLARE( stor::exception::DiskWriting,
58  sentinelException, errorMsg );
59  statReporter_->alarmHandler()->
60  notifySentinel(AlarmHandler::ERROR, sentinelException);
61  }
62  }
63  fileHandlers_.clear();
64  }
const StatisticsReporterPtr statReporter_
virtual std::string streamLabel() const =0
FileHandlers fileHandlers_
void stor::StreamHandler::closeFilesForLumiSection ( const uint32_t  lumiSection)

Close all files which belong to the given lumi section

Definition at line 89 of file StreamHandler.cc.

References fileHandlers_, and stor::FileHandler::isFromLumiSection().

90  {
91  fileHandlers_.erase(
92  std::remove_if(fileHandlers_.begin(),
93  fileHandlers_.end(),
94  boost::bind(&FileHandler::isFromLumiSection,
95  _1, lumiSection)),
96  fileHandlers_.end());
97  }
FileHandlers fileHandlers_
bool isFromLumiSection(const uint32_t lumiSection)
Definition: FileHandler.cc:164
void stor::StreamHandler::closeFilesForLumiSection ( const uint32_t &  lumiSection,
std::string &  str 
)

Close all files which belong to the given lumi section and print number of files for this lumi section into the passed string.

Definition at line 79 of file StreamHandler.cc.

83  {
84  streamRecord_->reportLumiSectionInfo(lumiSection, str);
85  closeFilesForLumiSection(lumiSection);
86  }
const StreamsMonitorCollection::StreamRecordPtr streamRecord_
void closeFilesForLumiSection(const uint32_t lumiSection)
void stor::StreamHandler::closeTimedOutFiles ( utils::TimePoint_t  currentTime = utils::getCurrentTime())

Close all files which have not seen any recent events

Definition at line 67 of file StreamHandler.cc.

References fileHandlers_, and stor::FileHandler::tooOld().

Referenced by stor::DiskWriter::closeTimedOutFiles().

68  {
69  fileHandlers_.erase(
70  std::remove_if(fileHandlers_.begin(),
71  fileHandlers_.end(),
72  boost::bind(&FileHandler::tooOld,
73  _1, currentTime)),
74  fileHandlers_.end());
75  }
bool tooOld(const utils::TimePoint_t currentTime=utils::getCurrentTime())
Definition: FileHandler.cc:149
FileHandlers fileHandlers_
bool stor::StreamHandler::fileTooLarge ( const FileHandlerPtr  ,
const unsigned long &  dataSize 
) const
private

Return true if the file would become too large when adding dataSize in Bytes

virtual double stor::StreamHandler::fractionToDisk ( ) const
protected pure virtual
std::string stor::StreamHandler::getBaseFilePath ( const uint32_t &  runNumber,
uint32_t  fileCount 
) const
private

Get path w/o working directory

Definition at line 167 of file StreamHandler.cc.

Referenced by getNewFileRecord().

171  {
173  }
const DiskWritingParams diskWritingParams_
std::string getFileSystem(const uint32_t &runNumber, uint32_t fileCount) const
std::string stor::StreamHandler::getCoreFileName ( const uint32_t &  runNumber,
const uint32_t &  lumiSection 
) const
private

Get the core file name

Definition at line 200 of file StreamHandler.cc.

References python.StorageManager_cfg::streamLabel.

Referenced by getNewFileRecord().

204  {
205  std::ostringstream coreFileName;
206  coreFileName << diskWritingParams_.setupLabel_
207  << "." << std::setfill('0') << std::setw(8) << runNumber
208  << "." << std::setfill('0') << std::setw(4) << lumiSection
209  << "." << streamLabel()
210  << "." << diskWritingParams_.fileName_
211  << "." << std::setfill('0') << std::setw(2) << diskWritingParams_.smInstanceString_;
212 
213  return coreFileName.str();
214  }
const DiskWritingParams diskWritingParams_
std::string smInstanceString_
Definition: Configuration.h:51
virtual std::string streamLabel() const =0
unsigned int stor::StreamHandler::getFileCounter ( const std::string &  coreFileName)
private

Get the instance count of this core file name

Definition at line 217 of file StreamHandler.cc.

References pos, and usedCoreFileNames_.

Referenced by getNewFileRecord().

218  {
219  CoreFileNamesMap::iterator pos = usedCoreFileNames_.find(coreFileName);
220  if (pos == usedCoreFileNames_.end())
221  {
222  usedCoreFileNames_.insert(pos, std::make_pair(coreFileName, 0));
223  return 0;
224  }
225  else
226  {
227  ++(pos->second);
228  return pos->second;
229  }
230  }
CoreFileNamesMap usedCoreFileNames_
StreamHandler::FileHandlerPtr stor::StreamHandler::getFileHandler ( const I2OChain & event)
protected virtual

Get the file handler responsible for the event

Reimplemented in stor::FaultyEventStreamHandler.

Definition at line 110 of file StreamHandler.cc.

References fileHandlers_, newFileHandler(), and stor::I2OChain::totalDataSize().

Referenced by writeEvent().

111  {
112  for (
113  FileHandlers::iterator it = fileHandlers_.begin(), itEnd = fileHandlers_.end();
114  it != itEnd;
115  ++it
116  )
117  {
118  if ( (*it)->lumiSection() == event.lumiSection() )
119  {
120  if ( (*it)->tooLarge(event.totalDataSize()) )
121  {
122  fileHandlers_.erase(it);
123  break;
124  }
125  else
126  {
127  return (*it);
128  }
129  }
130  }
131  return newFileHandler(event);
132  }
virtual FileHandlerPtr newFileHandler(const I2OChain &event)=0
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
FileHandlers fileHandlers_
std::string stor::StreamHandler::getFileSystem ( const uint32_t &  runNumber,
uint32_t  fileCount 
) const
private

Get file system string

Definition at line 177 of file StreamHandler.cc.

181  {
182  // if the number of logical disks is not specified, don't
183  // add a file system subdir to the path
184  if (diskWritingParams_.nLogicalDisk_ <= 0) {return "";}
185 
186  unsigned int fileSystemNumber =
187  (runNumber
188  + atoi(diskWritingParams_.smInstanceString_.c_str())
189  + fileCount)
191 
192  std::ostringstream fileSystem;
193  fileSystem << "/" << std::setfill('0') << std::setw(2) << fileSystemNumber;
194 
195  return fileSystem.str();
196  }
const DiskWritingParams diskWritingParams_
std::string smInstanceString_
Definition: Configuration.h:51
unsigned long long stor::StreamHandler::getMaxFileSize ( ) const
protected

Return the maximum file size in bytes

Definition at line 233 of file StreamHandler.cc.

References diskWritingParams_, getStreamMaxFileSize(), and stor::DiskWritingParams::maxFileSizeMB_.

Referenced by stor::FRDStreamHandler::newFileHandler(), and stor::EventStreamHandler::newFileHandler().

234  {
235  const unsigned long long maxFileSizeMB =
238 
239  return ( maxFileSizeMB * 1024 * 1024 );
240  }
virtual int getStreamMaxFileSize() const =0
const DiskWritingParams diskWritingParams_
FilesMonitorCollection::FileRecordPtr stor::StreamHandler::getNewFileRecord ( const I2OChain & event)
protected

Return a new file record for the event

Definition at line 136 of file StreamHandler.cc.

References ExpressReco_HICollisions_FallBack::e, getBaseFilePath(), getCoreFileName(), getFileCounter(), stor::FilesMonitorCollection::FileRecord::notClosed, sharedResources_, statReporter_, streamLabel(), and streamRecord_.

Referenced by stor::FRDStreamHandler::newFileHandler(), stor::EventStreamHandler::newFileHandler(), and stor::FaultyEventStreamHandler::newFileHandler().

137  {
139  statReporter_->getFilesMonitorCollection().getNewFileRecord();
140 
141  try
142  {
143  fileRecord->runNumber = event.runNumber();
144  fileRecord->lumiSection = event.lumiSection();
145  }
146  catch(stor::exception::IncompleteEventMessage &e)
147  {
148  fileRecord->runNumber = sharedResources_->configuration_->getRunNumber();
149  fileRecord->lumiSection = 0;
150  }
151  fileRecord->streamLabel = streamLabel();
152  fileRecord->baseFilePath =
153  getBaseFilePath(fileRecord->runNumber, fileRecord->entryCounter);
154  fileRecord->coreFileName =
155  getCoreFileName(fileRecord->runNumber, fileRecord->lumiSection);
156  fileRecord->fileCounter = getFileCounter(fileRecord->coreFileName);
157  fileRecord->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
158  fileRecord->isOpen = true;
159 
160  streamRecord_->incrementFileCount(fileRecord->lumiSection);
161 
162  return fileRecord;
163  }
std::string getCoreFileName(const uint32_t &runNumber, const uint32_t &lumiSection) const
const StatisticsReporterPtr statReporter_
boost::shared_ptr< FileRecord > FileRecordPtr
virtual std::string streamLabel() const =0
unsigned int getFileCounter(const std::string &coreFileName)
std::string getBaseFilePath(const uint32_t &runNumber, uint32_t fileCount) const
const StreamsMonitorCollection::StreamRecordPtr streamRecord_
const SharedResourcesPtr sharedResources_
virtual int stor::StreamHandler::getStreamMaxFileSize ( ) const
protected pure virtual

Return the maximum file size for the stream in MB

Implemented in stor::FaultyEventStreamHandler, stor::EventStreamHandler, and stor::FRDStreamHandler.

Referenced by getMaxFileSize().

virtual FileHandlerPtr stor::StreamHandler::newFileHandler ( const I2OChain & event)
protected pure virtual

Return a new file handler for the provided event

Implemented in stor::FaultyEventStreamHandler, stor::EventStreamHandler, and stor::FRDStreamHandler.

Referenced by getFileHandler().

StreamHandler& stor::StreamHandler::operator= ( StreamHandler const &  )
private
virtual std::string stor::StreamHandler::streamLabel ( ) const
protected pure virtual
void stor::StreamHandler::writeEvent ( const I2OChain & event)

Write the event to the stream file

Definition at line 100 of file StreamHandler.cc.

References getFileHandler(), statReporter_, streamRecord_, and stor::I2OChain::totalDataSize().

101  {
103  handler->writeEvent(event);
104  streamRecord_->addSizeInBytes(event.totalDataSize());
105  statReporter_->getThroughputMonitorCollection().
106  addDiskWriteSample(event.totalDataSize());
107  }
const StatisticsReporterPtr statReporter_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
virtual FileHandlerPtr getFileHandler(const I2OChain &event)
const StreamsMonitorCollection::StreamRecordPtr streamRecord_
boost::shared_ptr< FileHandler > FileHandlerPtr
Definition: StreamHandler.h:71

Member Data Documentation

const DbFileHandlerPtr stor::StreamHandler::dbFileHandler_
protected
const DiskWritingParams stor::StreamHandler::diskWritingParams_
protected
FileHandlers stor::StreamHandler::fileHandlers_
protected
const SharedResourcesPtr stor::StreamHandler::sharedResources_
protected

Definition at line 140 of file StreamHandler.h.

Referenced by getNewFileRecord().

const StatisticsReporterPtr stor::StreamHandler::statReporter_
protected

Definition at line 141 of file StreamHandler.h.

Referenced by closeAllFiles(), getNewFileRecord(), and writeEvent().

const StreamsMonitorCollection::StreamRecordPtr stor::StreamHandler::streamRecord_
protected
CoreFileNamesMap stor::StreamHandler::usedCoreFileNames_
protected

Definition at line 150 of file StreamHandler.h.

Referenced by getFileCounter().