CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamHandler.cc
Go to the documentation of this file.
1 // $Id: StreamHandler.cc,v 1.24 2012/04/04 12:17:04 mommsen Exp $
3 
4 #include <sstream>
5 #include <iomanip>
6 
10 
11 #include "boost/bind.hpp"
12 
13 
14 namespace stor {
15 
17  const SharedResourcesPtr sharedResources,
18  const DbFileHandlerPtr dbFileHandler
19  ) :
20  sharedResources_(sharedResources),
21  statReporter_(sharedResources->statisticsReporter_),
22  streamRecord_(statReporter_->getStreamsMonitorCollection().getNewStreamRecord()),
23  diskWritingParams_(dbFileHandler->getDiskWritingParams()),
24  dbFileHandler_(dbFileHandler)
25  {}
26 
27 
29  {
30  std::string errorMsg = "Failed to close all files for stream " + streamLabel() + ": ";
31 
32  for (FileHandlers::const_iterator it = fileHandlers_.begin(),
33  itEnd = fileHandlers_.end(); it != itEnd; ++it)
34  {
35  try
36  {
38  }
39  catch(xcept::Exception& e)
40  {
41  XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
42  sentinelException, errorMsg, e );
43  sharedResources_->alarmHandler_->
44  notifySentinel(AlarmHandler::ERROR, sentinelException);
45  }
46  catch(std::exception &e)
47  {
48  errorMsg += e.what();
49  XCEPT_DECLARE( stor::exception::DiskWriting,
50  sentinelException, errorMsg );
51  sharedResources_->alarmHandler_->
52  notifySentinel(AlarmHandler::ERROR, sentinelException);
53  }
54  catch(...)
55  {
56  errorMsg += "Unknown exception";
57  XCEPT_DECLARE( stor::exception::DiskWriting,
58  sentinelException, errorMsg );
59  sharedResources_->alarmHandler_->
60  notifySentinel(AlarmHandler::ERROR, sentinelException);
61  }
62  }
63  fileHandlers_.clear();
64  }
65 
66 
68  {
69  fileHandlers_.erase(
70  std::remove_if(fileHandlers_.begin(),
71  fileHandlers_.end(),
72  boost::bind(&FileHandler::tooOld,
73  _1, currentTime)),
74  fileHandlers_.end());
75  }
76 
77 
79  (
80  const uint32_t& lumiSection,
81  std::string& str
82  )
83  {
84  fileHandlers_.erase(
85  std::remove_if(fileHandlers_.begin(),
86  fileHandlers_.end(),
87  boost::bind(&FileHandler::isFromLumiSection,
88  _1, lumiSection)),
89  fileHandlers_.end());
90 
91  return streamRecord_->reportLumiSectionInfo(lumiSection, str);
92  }
93 
94 
96  {
97  FileHandlerPtr handler = getFileHandler(event);
98  handler->writeEvent(event);
99  streamRecord_->addSizeInBytes(event.totalDataSize());
100  statReporter_->getThroughputMonitorCollection().
101  addDiskWriteSample(event.totalDataSize());
102  }
103 
104 
106  {
107  for (
108  FileHandlers::iterator it = fileHandlers_.begin(), itEnd = fileHandlers_.end();
109  it != itEnd;
110  ++it
111  )
112  {
113  if ( (*it)->lumiSection() == event.lumiSection() )
114  {
115  if ( (*it)->tooLarge(event.totalDataSize()) )
116  {
117  fileHandlers_.erase(it);
118  break;
119  }
120  else
121  {
122  return (*it);
123  }
124  }
125  }
126  return newFileHandler(event);
127  }
128 
129 
132  {
134  statReporter_->getFilesMonitorCollection().getNewFileRecord();
135 
136  try
137  {
138  fileRecord->runNumber = event.runNumber();
139  fileRecord->lumiSection = event.lumiSection();
140  }
141  catch(stor::exception::IncompleteEventMessage &e)
142  {
143  fileRecord->runNumber = sharedResources_->configuration_->getRunNumber();
144  fileRecord->lumiSection = 0;
145  }
146  fileRecord->streamLabel = streamLabel();
147  fileRecord->baseFilePath =
148  getBaseFilePath(fileRecord->runNumber, fileRecord->entryCounter);
149  fileRecord->coreFileName =
150  getCoreFileName(fileRecord->runNumber, fileRecord->lumiSection);
151  fileRecord->fileCounter = getFileCounter(fileRecord->coreFileName);
152  fileRecord->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
153  fileRecord->isOpen = true;
154 
155  streamRecord_->incrementFileCount(fileRecord->lumiSection);
156 
157  return fileRecord;
158  }
159 
160 
162  (
163  const uint32_t& runNumber,
164  uint32_t fileCount
165  ) const
166  {
167  return diskWritingParams_.filePath_ + getFileSystem(runNumber, fileCount);
168  }
169 
170 
171  std::string StreamHandler::getFileSystem
172  (
173  const uint32_t& runNumber,
174  uint32_t fileCount
175  ) const
176  {
177  // if the number of logical disks is not specified, don't
178  // add a file system subdir to the path
179  if (diskWritingParams_.nLogicalDisk_ <= 0) {return "";}
180 
181  unsigned int fileSystemNumber =
182  (runNumber
183  + atoi(diskWritingParams_.smInstanceString_.c_str())
184  + fileCount)
185  % diskWritingParams_.nLogicalDisk_;
186 
187  std::ostringstream fileSystem;
188  fileSystem << "/" << std::setfill('0') << std::setw(2) << fileSystemNumber;
189 
190  return fileSystem.str();
191  }
192 
193 
195  (
196  const uint32_t& runNumber,
197  const uint32_t& lumiSection
198  ) const
199  {
200  std::ostringstream coreFileName;
201  coreFileName << diskWritingParams_.setupLabel_
202  << "." << std::setfill('0') << std::setw(8) << runNumber
203  << "." << std::setfill('0') << std::setw(4) << lumiSection
204  << "." << streamLabel()
205  << "." << diskWritingParams_.fileName_
206  << "." << std::setfill('0') << std::setw(2) << diskWritingParams_.smInstanceString_;
207 
208  return coreFileName.str();
209  }
210 
211 
212  unsigned int StreamHandler::getFileCounter(const std::string& coreFileName)
213  {
214  CoreFileNamesMap::iterator pos = usedCoreFileNames_.find(coreFileName);
215  if (pos == usedCoreFileNames_.end())
216  {
217  usedCoreFileNames_.insert(pos, std::make_pair(coreFileName, 0));
218  return 0;
219  }
220  else
221  {
222  ++(pos->second);
223  return pos->second;
224  }
225  }
226 
227 
228  unsigned long long StreamHandler::getMaxFileSize() const
229  {
230  const unsigned long long maxFileSizeMB =
233 
234  return ( maxFileSizeMB * 1024 * 1024 );
235  }
236 
237 } // namespace stor
238 
CoreFileNamesMap usedCoreFileNames_
std::string getCoreFileName(const uint32_t &runNumber, const uint32_t &lumiSection) const
virtual int getStreamMaxFileSize() const =0
const StatisticsReporterPtr statReporter_
boost::shared_ptr< SharedResources > SharedResourcesPtr
StreamHandler(const SharedResourcesPtr, const DbFileHandlerPtr)
boost::shared_ptr< FileRecord > FileRecordPtr
unsigned long long getMaxFileSize() const
virtual std::string streamLabel() const =0
unsigned int getFileCounter(const std::string &coreFileName)
virtual FileHandlerPtr newFileHandler(const I2OChain &event)=0
boost::shared_ptr< DbFileHandler > DbFileHandlerPtr
Definition: DbFileHandler.h:71
std::string getBaseFilePath(const uint32_t &runNumber, uint32_t fileCount) const
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
void closeTimedOutFiles(utils::TimePoint_t currentTime=utils::getCurrentTime())
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
void writeEvent(const I2OChain &event)
virtual FileHandlerPtr getFileHandler(const I2OChain &event)
FilesMonitorCollection::FileRecordPtr getNewFileRecord(const I2OChain &event)
bool tooOld(const utils::TimePoint_t currentTime=utils::getCurrentTime())
Definition: FileHandler.cc:129
const StreamsMonitorCollection::StreamRecordPtr streamRecord_
unsigned long totalDataSize() const
Definition: I2OChain.cc:432
FileHandlers fileHandlers_
const DiskWritingParams & diskWritingParams_
boost::shared_ptr< FileHandler > FileHandlerPtr
Definition: StreamHandler.h:67
const SharedResourcesPtr sharedResources_
bool closeFilesForLumiSection(const uint32_t &lumiSection, std::string &)
bool isFromLumiSection(const uint32_t lumiSection)
Definition: FileHandler.cc:144
std::string getFileSystem(const uint32_t &runNumber, uint32_t fileCount) const