#include <EventFilter/StorageManager/interface/EventStreamService.h>
Public Member Functions | |
void | closeTimedOutFiles () |
void | closeTimedOutFiles (int lumi, double timeoutstart) |
EventStreamService (ParameterSet const &, InitMsgView const &) | |
bool | nextEvent (const uint8 *const) |
void | report (std::ostream &os, int indentation) const |
void | stop () |
~EventStreamService () | |
Private Member Functions | |
bool | acceptEvent (EventMsgView const &) |
bool | checkEvent (boost::shared_ptr< FileRecord >, EventMsgView const &) const |
boost::shared_ptr< FileRecord > | generateFileRecord () |
boost::shared_ptr< OutputService > | getOutputService (EventMsgView const &) |
void | initializeSelection (InitMsgView const &) |
boost::shared_ptr< OutputService > | newOutputService () |
void | saveInitMessage (InitMsgView const &) |
Private Attributes | |
boost::shared_ptr < edm::EventSelector > | eventSelector_ |
std::vector< unsigned char > | saved_initmsg_ |
Definition at line 29 of file EventStreamService.h.
EventStreamService::EventStreamService | ( | ParameterSet const & | pset, | |
InitMsgView const & | view | |||
) |
Definition at line 23 of file EventStreamService.cc.
References edm::StreamService::highWaterMark_, initializeSelection(), edm::StreamService::lumiSection_, edm::StreamService::lumiSectionTimeOut_, edm::StreamService::maxFileSizeInMB_, edm::StreamService::maxSize_, edm::StreamService::ntotal_, edm::StreamService::numberOfFileSystems_, edm::StreamService::parameterSet_, edm::StreamService::runNumber_, saveInitMessage(), and edm::StreamService::setStreamParameter().
00024 {
00025   parameterSet_ = pset;
00026   runNumber_ = 0;
00027   lumiSection_ = 0;
00028   numberOfFileSystems_ = 0;
00029   maxFileSizeInMB_ = 0;
00030   maxSize_ = 0;
00031   highWaterMark_ = 0;
00032   lumiSectionTimeOut_ = 0;
00033   ntotal_ = 0;
00034
00035   saveInitMessage(view);
00036   initializeSelection(view);
00037   setStreamParameter();
00038 }
edm::EventStreamService::~EventStreamService | ( | ) | [inline] |
bool EventStreamService::acceptEvent | ( | EventMsgView const & | view | ) | [private] |
Definition at line 211 of file EventStreamService.cc.
References eventSelector_, EventMsgView::hltCount(), and EventMsgView::hltTriggerBits().
Referenced by nextEvent().
00212 {
00213   std::vector<unsigned char> hlt_out;
00214   hlt_out.resize(1 + (view.hltCount()-1)/4);
00215   view.hltTriggerBits(&hlt_out[0]);
00216   int num_paths = view.hltCount();
00217   bool rc = (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hlt_out[0], num_paths));
00218   return rc;
00219 }
bool edm::EventStreamService::checkEvent | ( | boost::shared_ptr< FileRecord > | , | |
EventMsgView const & | ||||
) | const [private] |
Referenced by getOutputService().
void EventStreamService::closeTimedOutFiles | ( | ) | [virtual] |
Implements edm::StreamService.
Definition at line 128 of file EventStreamService.cc.
References fd, edm::StreamService::fillOutputSummaryClosed(), edm::StreamService::getCurrentTime(), it, edm::StreamService::lumiSectionTimeOut_, and edm::StreamService::outputMap_.
00129 {
00130   double currentTime = getCurrentTime();
00131   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00132     if (currentTime - it->second->lastEntry() > lumiSectionTimeOut_) {
00133       boost::shared_ptr<FileRecord> fd(it->first);
00134       outputMap_.erase(it++);
00135       fillOutputSummaryClosed(fd);
00136     } else {
00137       ++it;
00138     }
00139   }
00140 }
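void EventStreamService::closeTimedOutFiles | ( | int | lumi, | |
double | timeoutstart | |||
) | [virtual] |
Implements edm::StreamService.
Note: the member summary declares the second parameter as timeoutstart, while the source listing below refers to it as timeoutdiff.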
Definition at line 86 of file EventStreamService.cc.
References fd, edm::StreamService::fillOutputSummaryClosed(), edm::StreamService::getCurrentTime(), it, edm::StreamService::lumiSectionTimeOut_, and edm::StreamService::outputMap_.
00087 {
00088   // just mark what the code should have done
00089   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00090
00091     int reason = lumi*100000 + it->first->lumiSection()*10;
00092
00093     // do not touch files from current lumi section
00094     if (it->first->lumiSection() == lumi) {
00095       it->first->setWhyClosed(reason);
00096       ++it;
00097       continue;
00098     }
00099
00100     if (it->first->lumiSection() < lumi-1) {
00101       it->first->setWhyClosed(reason+2); // close old (N-2) lumi sections in any case
00102     } else if (timeoutdiff > lumiSectionTimeOut_) {
00103       it->first->setWhyClosed(reason+3); // check if timeout reached for previous (N-1) lumi sections
00104     } else {
00105       it->first->setWhyClosed(reason+9); // default value to catch race condition
00106     }
00107
00108     ++it;
00109   }
00110
00111   // code from rev 1.10
00112   double currentTime = getCurrentTime();
00113   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00114     if (currentTime - it->second->lastEntry() > lumiSectionTimeOut_) {
00115       boost::shared_ptr<FileRecord> fd(it->first);
00116       outputMap_.erase(it++);
00117       fillOutputSummaryClosed(fd);
00118     } else
00119       ++it;
00120   }
00121 }
boost::shared_ptr< FileRecord > EventStreamService::generateFileRecord | ( | ) | [private] |
Definition at line 246 of file EventStreamService.cc.
References fd, aod_PYTHIA_cfg::fileName, edm::StreamService::fileName_, edm::StreamService::filePath_, it, edm::StreamService::list_lock_, edm::StreamService::lumiSection_, edm::StreamService::ntotal_, edm::StreamService::numberOfFileSystems_, edm::StreamService::outputSummary_, edm::StreamService::runNumber_, edm::StreamService::setupLabel_, sl, edm::StreamService::sourceId_, and edm::StreamService::streamLabel_.
Referenced by newOutputService().
00247 {
00248   std::ostringstream oss;
00249   oss << setupLabel_
00250       << "." << setfill('0') << std::setw(8) << runNumber_
00251       << "." << setfill('0') << std::setw(4) << lumiSection_
00252       << "." << streamLabel_
00253       << "." << fileName_
00254       << "." << setfill('0') << std::setw(2) << sourceId_;
00255   string fileName = oss.str();
00256
00257   shared_ptr<FileRecord> fd = shared_ptr<FileRecord>(new FileRecord(lumiSection_, fileName, filePath_));
00258   ++ntotal_;
00259
00260   boost::mutex::scoped_lock sl(list_lock_);
00261   map<string, int>::iterator it = outputSummary_.find(fileName);
00262   if(it==outputSummary_.end()) {
00263     outputSummary_.insert(std::pair<string, int>(fileName,0));
00264   } else {
00265     ++it->second;
00266     fd->setFileCounter(it->second);
00267   }
00268
00269   if (numberOfFileSystems_ > 0)
00270     fd->fileSystem((runNumber_ + atoi(sourceId_.c_str()) + ntotal_) % numberOfFileSystems_);
00271
00272   fd->checkDirectories();
00273   fd->setRunNumber(runNumber_);
00274   fd->setStreamLabel(streamLabel_);
00275   fd->setSetupLabel(setupLabel_);
00276
00277   // fd->report(cout, 12);
00278   return fd;
00279 }
boost::shared_ptr< OutputService > EventStreamService::getOutputService | ( | EventMsgView const & | view | ) | [private] |
Definition at line 147 of file EventStreamService.cc.
References checkEvent(), fd, edm::StreamService::fillOutputSummaryClosed(), it, edm::StreamService::lumiSection_, and edm::StreamService::outputMap_.
Referenced by nextEvent().
00148 {
00149   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ++it) {
00150     if (it->first->lumiSection() == lumiSection_) {
00151       if (checkEvent(it->first, view))
00152         return it->second;
00153       else { // close file since file size exceeded
00154         boost::shared_ptr<FileRecord> fd(it->first);
00155         fd->setWhyClosed(4);
00156         outputMap_.erase(it);
00157         fillOutputSummaryClosed(fd);
00158         break;
00159       }
00160     }
00161   }
00162   return newOutputService();
00163 }
void EventStreamService::initializeSelection | ( | InitMsgView const & | initView | ) | [private] |
Definition at line 200 of file EventStreamService.cc.
References eventSelector_, edm::ParameterSet::getUntrackedParameter(), InitMsgView::hltTriggerNames(), and edm::StreamService::parameterSet_.
Referenced by EventStreamService().
00201 {
00202   Strings triggerNameList;
00203   initView.hltTriggerNames(triggerNameList);
00204   eventSelector_.reset(new EventSelector(parameterSet_.getUntrackedParameter("SelectEvents", ParameterSet()),triggerNameList));
00205 }
boost::shared_ptr< OutputService > EventStreamService::newOutputService | ( | ) | [private] |
Definition at line 172 of file EventStreamService.cc.
References file, generateFileRecord(), edm::StreamService::outputMap_, and saved_initmsg_.
00173 {
00174   boost::shared_ptr<FileRecord> file = generateFileRecord();
00175   InitMsgView view(&saved_initmsg_[0]);
00176
00177   shared_ptr<OutputService> outputService(new EventOutputService(file, view));
00178   outputMap_[file] = outputService;
00179
00180   return outputService;
00181 }
bool EventStreamService::nextEvent | ( | const uint8 *const | bufPtr | ) | [virtual] |
Implements edm::StreamService.
Definition at line 44 of file EventStreamService.cc.
References acceptEvent(), getOutputService(), EventMsgView::lumi(), edm::StreamService::lumiSection_, EventMsgView::run(), and edm::StreamService::runNumber_.
00045 {
00046   ProgressMarker::instance()->processing(true);
00047   EventMsgView view((void *) bufPtr);
00048   if (!acceptEvent(view))
00049   {
00050     ProgressMarker::instance()->processing(false);
00051     return false;
00052   }
00053   runNumber_ = view.run();
00054   lumiSection_ = view.lumi();
00055
00056   shared_ptr<OutputService> outputService = getOutputService(view);
00057   ProgressMarker::instance()->processing(false);
00058
00059   ProgressMarker::instance()->writing(true);
00060   outputService->writeEvent(bufPtr);
00061   ProgressMarker::instance()->writing(false);
00062   return true;
00063 }
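void edm::EventStreamService::report | ( | std::ostream & | os, | |
int | indentation | |||
) | const [virtual] |
Implements edm::StreamService.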
void EventStreamService::saveInitMessage | ( | InitMsgView const & | view | ) | [private] |
Definition at line 225 of file EventStreamService.cc.
References edmNew::copy(), Capri::details::from(), saved_initmsg_, InitMsgView::size(), and InitMsgView::startAddress().
Referenced by EventStreamService().
00226 {
00227   saved_initmsg_.resize(view.size() + 20);
00228   unsigned char* pos = &saved_initmsg_[0];
00229   unsigned char* from = view.startAddress();
00230   unsigned int dsize = view.size();
00231   copy(from,from+dsize,pos);
00232 }
void EventStreamService::stop | ( | ) | [virtual] |
Implements edm::StreamService.
Definition at line 69 of file EventStreamService.cc.
References fd, edm::StreamService::fillOutputSummaryClosed(), it, and edm::StreamService::outputMap_.
Referenced by ~EventStreamService().
00070 {
00071   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00072     boost::shared_ptr<FileRecord> fd(it->first);
00073     fd->setWhyClosed(1);
00074     outputMap_.erase(it++);
00075     fillOutputSummaryClosed(fd);
00076   }
00077 }
boost::shared_ptr<edm::EventSelector> edm::EventStreamService::eventSelector_ [private] |
Definition at line 53 of file EventStreamService.h.
Referenced by acceptEvent(), and initializeSelection().
std::vector<unsigned char> edm::EventStreamService::saved_initmsg_ [private] |
Definition at line 56 of file EventStreamService.h.
Referenced by newOutputService(), and saveInitMessage().