CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamHandler.cc
Go to the documentation of this file.
1 // $Id: StreamHandler.cc,v 1.25 2012/10/17 10:13:25 mommsen Exp $
3 
4 #include <sstream>
5 #include <iomanip>
6 
10 
11 #include "boost/bind.hpp"
12 
13 
14 namespace stor {
15 
// StreamHandler constructor.
// NOTE(review): the opening line of this definition (listing line 16) is
// absent from this listing; the member index gives the signature as
// StreamHandler(const SharedResourcesPtr, const DbFileHandlerPtr).
// Stores both shared pointers and initializes the remaining members from them.
17  const SharedResourcesPtr sharedResources,
18  const DbFileHandlerPtr dbFileHandler
19  ) :
20  sharedResources_(sharedResources),
// Read through the constructor argument, not through the member sharedResources_.
21  statReporter_(sharedResources->statisticsReporter_),
// Goes through the member statReporter_, so this relies on statReporter_
// being initialized before streamRecord_ - TODO confirm the declaration
// order in the header.
22  streamRecord_(statReporter_->getStreamsMonitorCollection().getNewStreamRecord()),
// diskWritingParams_ is declared as a const reference; the object it refers to
// is obtained through dbFileHandler, which is also kept in dbFileHandler_.
23  diskWritingParams_(dbFileHandler->getDiskWritingParams()),
24  dbFileHandler_(dbFileHandler)
25  {}
26 
27 
// Walks over every entry of fileHandlers_, reports any exception raised while
// handling an entry to the sentinel as an ERROR alarm, and finally empties
// fileHandlers_.
// NOTE(review): the signature line (listing line 28) is absent from this
// listing; judging by the error message this is presumably the "close all
// files" method of StreamHandler - confirm against the header.
29  {
30  std::string errorMsg = "Failed to close all files for stream " + streamLabel() + ": ";
31 
32  for (FileHandlers::const_iterator it = fileHandlers_.begin(),
33  itEnd = fileHandlers_.end(); it != itEnd; ++it)
34  {
35  try
36  {
// NOTE(review): the statement guarded by this try (listing line 37) is absent
// from this listing.
38  }
39  catch(xcept::Exception& e)
40  {
// The xcept exception is nested into a DiskWriting exception carrying errorMsg.
41  XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
42  sentinelException, errorMsg, e );
43  sharedResources_->alarmHandler_->
44  notifySentinel(AlarmHandler::ERROR, sentinelException);
45  }
46  catch(std::exception &e)
47  {
// NOTE(review): errorMsg is declared outside the loop and only ever appended
// to, so a message reported for a later entry also contains the text appended
// for earlier failures.
48  errorMsg += e.what();
49  XCEPT_DECLARE( stor::exception::DiskWriting,
50  sentinelException, errorMsg );
51  sharedResources_->alarmHandler_->
52  notifySentinel(AlarmHandler::ERROR, sentinelException);
53  }
54  catch(...)
55  {
56  errorMsg += "Unknown exception";
57  XCEPT_DECLARE( stor::exception::DiskWriting,
58  sentinelException, errorMsg );
59  sharedResources_->alarmHandler_->
60  notifySentinel(AlarmHandler::ERROR, sentinelException);
61  }
62  }
// The container is emptied whether or not any entry raised an exception.
63  fileHandlers_.clear();
64  }
65 
66 
// Drops from fileHandlers_ every handler for which
// FileHandler::tooOld(currentTime) returns true.
// NOTE(review): the signature line (listing line 67) is absent from this
// listing; the member index gives it as
// void closeTimedOutFiles(utils::TimePoint_t currentTime = utils::getCurrentTime()).
// The handlers are held by boost::shared_ptr; presumably releasing the last
// reference is what closes the file - confirm in FileHandler.
68  {
// remove_if moves the handlers to keep to the front; erase then drops the tail.
69  fileHandlers_.erase(
70  std::remove_if(fileHandlers_.begin(),
71  fileHandlers_.end(),
72  boost::bind(&FileHandler::tooOld,
73  _1, currentTime)),
74  fileHandlers_.end());
75  }
76 
77 
// Drops from fileHandlers_ every handler for which
// FileHandler::isFromLumiSection(lumiSection) returns true, then returns the
// result of streamRecord_->reportLumiSectionInfo(lumiSection, str).
// NOTE(review): the line carrying the return type and name (listing line 78)
// is absent from this listing; the member index gives it as
// bool closeFilesForLumiSection(const uint32_t &lumiSection, std::string &).
79  (
80  const uint32_t& lumiSection,
// str is only forwarded to reportLumiSectionInfo; what is written into it is
// decided there.
81  std::string& str
82  )
83  {
84  fileHandlers_.erase(
85  std::remove_if(fileHandlers_.begin(),
86  fileHandlers_.end(),
87  boost::bind(&FileHandler::isFromLumiSection,
88  _1, lumiSection)),
89  fileHandlers_.end());
90 
91  return streamRecord_->reportLumiSectionInfo(lumiSection, str);
92  }
93 
94 
// Writes one event through the handler chosen by getFileHandler(event) and
// records the event's total data size in the stream record and in the
// throughput monitor's disk-write samples.
// NOTE(review): the signature line (listing line 95) is absent from this
// listing; the member index gives it as void writeEvent(const I2OChain &event).
96  {
97  FileHandlerPtr handler = getFileHandler(event);
98  handler->writeEvent(event);
// The bookkeeping below runs only after writeEvent has returned.
99  streamRecord_->addSizeInBytes(event.totalDataSize());
100  statReporter_->getThroughputMonitorCollection().
101  addDiskWriteSample(event.totalDataSize());
102  }
103 
104 
// Returns the file handler to use for the given event: the first handler in
// fileHandlers_ whose lumi section equals the event's, unless
// tooLarge(event.totalDataSize()) is true for it; in that case, or when no
// handler matches, the result of newFileHandler(event) is returned.
// NOTE(review): the signature line (listing line 105) is absent from this
// listing; the member index gives it as
// virtual FileHandlerPtr getFileHandler(const I2OChain &event).
106  {
107  for (
108  FileHandlers::iterator it = fileHandlers_.begin(), itEnd = fileHandlers_.end();
109  it != itEnd;
110  ++it
111  )
112  {
113  if ( (*it)->lumiSection() == event.lumiSection() )
114  {
115  if ( (*it)->tooLarge(event.totalDataSize()) )
116  {
// The matching handler is dropped from the container. The loop must stop right
// here because 'it' is no longer usable after erase; any further handlers for
// the same lumi section are not examined.
117  fileHandlers_.erase(it);
118  break;
119  }
120  else
121  {
122  return (*it);
123  }
124  }
125  }
126  return newFileHandler(event);
127  }
128 
129 
// Obtains a new file record from the files monitor collection, fills it in for
// the given event and this stream, marks it as open, and returns it.
// NOTE(review): the signature lines (listing lines 130-131) and the line that
// declares 'fileRecord' (listing line 133) are absent from this listing; the
// member index gives the signature as
// FilesMonitorCollection::FileRecordPtr getNewFileRecord(const I2OChain &event).
132  {
134  statReporter_->getFilesMonitorCollection().getNewFileRecord();
135 
136  try
137  {
138  fileRecord->runNumber = event.runNumber();
139  fileRecord->lumiSection = event.lumiSection();
140  }
141  catch(stor::exception::IncompleteEventMessage &e)
142  {
// Fallback when the event raises IncompleteEventMessage: use the run number
// from the configuration and lumi section 0.
143  fileRecord->runNumber = sharedResources_->configuration_->getRunNumber();
144  fileRecord->lumiSection = 0;
145  }
146  fileRecord->streamLabel = streamLabel();
// The record's entryCounter is what getBaseFilePath receives as its file count.
147  fileRecord->baseFilePath =
148  getBaseFilePath(fileRecord->runNumber, fileRecord->entryCounter);
149  fileRecord->coreFileName =
150  getCoreFileName(fileRecord->runNumber, fileRecord->lumiSection);
// getFileCounter must be called after coreFileName has been set, since it is
// keyed on that name.
151  fileRecord->fileCounter = getFileCounter(fileRecord->coreFileName);
152  fileRecord->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
153  fileRecord->isOpen = true;
154 
155  return fileRecord;
156  }
157 
158 
// Returns diskWritingParams_.filePath_ followed by the file-system
// sub-directory computed by getFileSystem(runNumber, fileCount).
// NOTE(review): the line carrying the return type and name (listing line 159)
// is absent from this listing; the member index gives it as
// std::string getBaseFilePath(const uint32_t &runNumber, uint32_t fileCount) const.
160  (
161  const uint32_t& runNumber,
162  uint32_t fileCount
163  ) const
164  {
165  return diskWritingParams_.filePath_ + getFileSystem(runNumber, fileCount);
166  }
167 
168 
// Returns the file-system sub-directory for a file as "/NN" (zero-padded to at
// least two digits), or an empty string when no logical disks are configured.
// NN is (runNumber + numeric value of smInstanceString_ + fileCount) modulo
// nLogicalDisk_.
// NOTE(review): the line carrying the return type and name (listing line 169)
// is absent from this listing; the member index gives it as
// std::string getFileSystem(const uint32_t &runNumber, uint32_t fileCount) const.
170  (
171  const uint32_t& runNumber,
172  uint32_t fileCount
173  ) const
174  {
175  // if the number of logical disks is not specified, don't
176  // add a file system subdir to the path
177  if (diskWritingParams_.nLogicalDisk_ <= 0) {return "";}
178 
// atoi yields 0 when smInstanceString_ does not start with a number, so a
// non-numeric instance string simply contributes nothing to the sum.
179  unsigned int fileSystemNumber =
180  (runNumber
181  + atoi(diskWritingParams_.smInstanceString_.c_str())
182  + fileCount)
183  % diskWritingParams_.nLogicalDisk_;
184 
185  std::ostringstream fileSystem;
186  fileSystem << "/" << std::setfill('0') << std::setw(2) << fileSystemNumber;
187 
188  return fileSystem.str();
189  }
190 
191 
// Builds the core file name as dot-separated fields:
//   setupLabel_ . run number (zero-padded to 8) . lumi section (zero-padded
//   to 4) . streamLabel() . fileName_ . smInstanceString_ (zero-padded to 2)
// NOTE(review): the line carrying the return type and name (listing line 192)
// is absent from this listing; the member index gives it as
// std::string getCoreFileName(const uint32_t &runNumber, const uint32_t &lumiSection) const.
193  (
194  const uint32_t& runNumber,
195  const uint32_t& lumiSection
196  ) const
197  {
198  std::ostringstream coreFileName;
// setw applies only to the next inserted item, so it is repeated before each
// padded field; values wider than the given width are written in full.
199  coreFileName << diskWritingParams_.setupLabel_
200  << "." << std::setfill('0') << std::setw(8) << runNumber
201  << "." << std::setfill('0') << std::setw(4) << lumiSection
202  << "." << streamLabel()
203  << "." << diskWritingParams_.fileName_
204  << "." << std::setfill('0') << std::setw(2) << diskWritingParams_.smInstanceString_;
205 
206  return coreFileName.str();
207  }
208 
209 
210  unsigned int StreamHandler::getFileCounter(const std::string& coreFileName)
211  {
212  CoreFileNamesMap::iterator pos = usedCoreFileNames_.find(coreFileName);
213  if (pos == usedCoreFileNames_.end())
214  {
215  usedCoreFileNames_.insert(pos, std::make_pair(coreFileName, 0));
216  return 0;
217  }
218  else
219  {
220  ++(pos->second);
221  return pos->second;
222  }
223  }
224 
225 
// Returns the maximum file size in bytes, i.e. a size given in MB multiplied
// by 1024 * 1024.
// NOTE(review): the expression that initializes maxFileSizeMB (listing lines
// 229-230) is absent from this listing, so where the MB value comes from
// cannot be stated here.
226  unsigned long long StreamHandler::getMaxFileSize() const
227  {
228  const unsigned long long maxFileSizeMB =
231 
// The multiplication is carried out in unsigned long long.
232  return ( maxFileSizeMB * 1024 * 1024 );
233  }
234 
235 } // namespace stor
236 
CoreFileNamesMap usedCoreFileNames_
std::string getCoreFileName(const uint32_t &runNumber, const uint32_t &lumiSection) const
virtual int getStreamMaxFileSize() const =0
const StatisticsReporterPtr statReporter_
boost::shared_ptr< SharedResources > SharedResourcesPtr
StreamHandler(const SharedResourcesPtr, const DbFileHandlerPtr)
boost::shared_ptr< FileRecord > FileRecordPtr
unsigned long long getMaxFileSize() const
virtual std::string streamLabel() const =0
unsigned int getFileCounter(const std::string &coreFileName)
virtual FileHandlerPtr newFileHandler(const I2OChain &event)=0
boost::shared_ptr< DbFileHandler > DbFileHandlerPtr
Definition: DbFileHandler.h:71
std::string getBaseFilePath(const uint32_t &runNumber, uint32_t fileCount) const
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
void closeTimedOutFiles(utils::TimePoint_t currentTime=utils::getCurrentTime())
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
void writeEvent(const I2OChain &event)
virtual FileHandlerPtr getFileHandler(const I2OChain &event)
FilesMonitorCollection::FileRecordPtr getNewFileRecord(const I2OChain &event)
bool tooOld(const utils::TimePoint_t currentTime=utils::getCurrentTime())
Definition: FileHandler.cc:129
const StreamsMonitorCollection::StreamRecordPtr streamRecord_
unsigned long totalDataSize() const
Definition: I2OChain.cc:432
FileHandlers fileHandlers_
const DiskWritingParams & diskWritingParams_
boost::shared_ptr< FileHandler > FileHandlerPtr
Definition: StreamHandler.h:67
const SharedResourcesPtr sharedResources_
bool closeFilesForLumiSection(const uint32_t &lumiSection, std::string &)
bool isFromLumiSection(const uint32_t lumiSection)
Definition: FileHandler.cc:144
std::string getFileSystem(const uint32_t &runNumber, uint32_t fileCount) const