CMS 3D CMS Logo

edm::EventStreamService Class Reference

#include <EventFilter/StorageManager/interface/EventStreamService.h>

Inheritance diagram for edm::EventStreamService:

edm::StreamService

List of all members.

Public Member Functions

void closeTimedOutFiles ()
void closeTimedOutFiles (int lumi, double timeoutstart)
 EventStreamService (ParameterSet const &, InitMsgView const &)
bool nextEvent (const uint8 *const)
void report (std::ostream &os, int indentation) const
void stop ()
 ~EventStreamService ()

Private Member Functions

bool acceptEvent (EventMsgView const &)
bool checkEvent (boost::shared_ptr< FileRecord >, EventMsgView const &) const
boost::shared_ptr< FileRecord > generateFileRecord ()
boost::shared_ptr< OutputService > getOutputService (EventMsgView const &)
void initializeSelection (InitMsgView const &)
boost::shared_ptr< OutputService > newOutputService ()
void saveInitMessage (InitMsgView const &)

Private Attributes

boost::shared_ptr< edm::EventSelector > eventSelector_
std::vector< unsigned char > saved_initmsg_


Detailed Description

Definition at line 29 of file EventStreamService.h.


Constructor & Destructor Documentation

EventStreamService::EventStreamService ( ParameterSet const &  pset,
InitMsgView const &  view 
)

Definition at line 23 of file EventStreamService.cc.

References edm::StreamService::highWaterMark_, initializeSelection(), edm::StreamService::lumiSection_, edm::StreamService::lumiSectionTimeOut_, edm::StreamService::maxFileSizeInMB_, edm::StreamService::maxSize_, edm::StreamService::ntotal_, edm::StreamService::numberOfFileSystems_, edm::StreamService::parameterSet_, edm::StreamService::runNumber_, saveInitMessage(), and edm::StreamService::setStreamParameter().

00024 {
00025   parameterSet_ = pset;
00026   runNumber_ = 0;
00027   lumiSection_ = 0;
00028   numberOfFileSystems_ = 0;
00029   maxFileSizeInMB_ = 0;
00030   maxSize_ = 0;
00031   highWaterMark_ = 0;
00032   lumiSectionTimeOut_ = 0;
00033   ntotal_ = 0;
00034 
00035   saveInitMessage(view);
00036   initializeSelection(view);
00037   setStreamParameter();
00038 }

edm::EventStreamService::~EventStreamService (  )  [inline]

Definition at line 33 of file EventStreamService.h.

References stop().

00033 { stop(); }


Member Function Documentation

bool EventStreamService::acceptEvent ( EventMsgView const &  view  )  [private]

Definition at line 211 of file EventStreamService.cc.

References eventSelector_, EventMsgView::hltCount(), and EventMsgView::hltTriggerBits().

Referenced by nextEvent().

00212 {
00213   std::vector<unsigned char> hlt_out;
00214   hlt_out.resize(1 + (view.hltCount()-1)/4);
00215   view.hltTriggerBits(&hlt_out[0]);
00216   int num_paths = view.hltCount();
00217   bool rc = (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hlt_out[0], num_paths));
00218   return rc;
00219 }

bool edm::EventStreamService::checkEvent ( boost::shared_ptr< FileRecord > ,
EventMsgView const &   
) const [private]

Referenced by getOutputService().

void EventStreamService::closeTimedOutFiles (  )  [virtual]

Implements edm::StreamService.

Definition at line 128 of file EventStreamService.cc.

References fd, edm::StreamService::fillOutputSummaryClosed(), edm::StreamService::getCurrentTime(), it, edm::StreamService::lumiSectionTimeOut_, and edm::StreamService::outputMap_.

00129 {
00130   double currentTime = getCurrentTime();
00131   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00132      if (currentTime - it->second->lastEntry() > lumiSectionTimeOut_) {
00133         boost::shared_ptr<FileRecord> fd(it->first);
00134         outputMap_.erase(it++);
00135         fillOutputSummaryClosed(fd);
00136      } else {
00137         ++it;
00138      }
00139   }
00140 }

void EventStreamService::closeTimedOutFiles ( int  lumi,
double  timeoutstart 
) [virtual]

Implements edm::StreamService.

Definition at line 86 of file EventStreamService.cc.

References fd, edm::StreamService::fillOutputSummaryClosed(), edm::StreamService::getCurrentTime(), it, edm::StreamService::lumiSectionTimeOut_, and edm::StreamService::outputMap_.

00087 {
00088    // just mark what the code should have done
00089   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00090 
00091     int reason = lumi*100000 + it->first->lumiSection()*10;
00092 
00093     // do not touch files from current lumi section
00094     if (it->first->lumiSection() == lumi) {
00095       it->first->setWhyClosed(reason);
00096       ++it;
00097       continue;
00098     }
00099 
00100     if (it->first->lumiSection() < lumi-1) {
00101       it->first->setWhyClosed(reason+2);  // close old (N-2) lumi sections in any case
00102     } else if (timeoutdiff > lumiSectionTimeOut_) {
00103       it->first->setWhyClosed(reason+3);  // check if timeout reached for previous (N-1) lumi sections
00104     } else {
00105       it->first->setWhyClosed(reason+9);  // default value to catch race condition
00106     }
00107 
00108     ++it;
00109   }
00110 
00111   // code from rev 1.10 
00112   double currentTime = getCurrentTime();
00113   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00114      if (currentTime - it->second->lastEntry() > lumiSectionTimeOut_) {
00115         boost::shared_ptr<FileRecord> fd(it->first);
00116         outputMap_.erase(it++);
00117         fillOutputSummaryClosed(fd);
00118      } else 
00119         ++it;
00120   }
00121 }

boost::shared_ptr< FileRecord > EventStreamService::generateFileRecord (  )  [private]

Definition at line 246 of file EventStreamService.cc.

References fd, aod_PYTHIA_cfg::fileName, edm::StreamService::fileName_, edm::StreamService::filePath_, it, edm::StreamService::list_lock_, edm::StreamService::lumiSection_, edm::StreamService::ntotal_, edm::StreamService::numberOfFileSystems_, edm::StreamService::outputSummary_, edm::StreamService::runNumber_, edm::StreamService::setupLabel_, sl, edm::StreamService::sourceId_, and edm::StreamService::streamLabel_.

Referenced by newOutputService().

00247 {
00248   std::ostringstream oss;   
00249   oss    << setupLabel_ 
00250          << "." << setfill('0') << std::setw(8) << runNumber_ 
00251          << "." << setfill('0') << std::setw(4) << lumiSection_
00252          << "." << streamLabel_ 
00253          << "." << fileName_
00254          << "." << setfill('0') << std::setw(2) << sourceId_;
00255   string fileName = oss.str();
00256 
00257   shared_ptr<FileRecord> fd = shared_ptr<FileRecord>(new FileRecord(lumiSection_, fileName, filePath_));    
00258   ++ntotal_;
00259 
00260   boost::mutex::scoped_lock sl(list_lock_);
00261   map<string, int>::iterator it = outputSummary_.find(fileName);
00262   if(it==outputSummary_.end()) {
00263      outputSummary_.insert(std::pair<string, int>(fileName,0));
00264   } else {
00265      ++it->second;
00266      fd->setFileCounter(it->second);
00267   }
00268 
00269   if (numberOfFileSystems_ > 0)
00270     fd->fileSystem((runNumber_ + atoi(sourceId_.c_str()) + ntotal_) % numberOfFileSystems_); 
00271   
00272   fd->checkDirectories();
00273   fd->setRunNumber(runNumber_);
00274   fd->setStreamLabel(streamLabel_);
00275   fd->setSetupLabel(setupLabel_);
00276 
00277   // fd->report(cout, 12);
00278   return fd;
00279 }

boost::shared_ptr< OutputService > EventStreamService::getOutputService ( EventMsgView const &  view  )  [private]

Definition at line 147 of file EventStreamService.cc.

References checkEvent(), fd, edm::StreamService::fillOutputSummaryClosed(), it, edm::StreamService::lumiSection_, and edm::StreamService::outputMap_.

Referenced by nextEvent().

00148 {
00149   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ++it) {
00150        if (it->first->lumiSection() == lumiSection_) {
00151           if (checkEvent(it->first, view))
00152             return it->second;
00153           else { // close file since file size exceeded
00154             boost::shared_ptr<FileRecord> fd(it->first);
00155             fd->setWhyClosed(4);
00156             outputMap_.erase(it);
00157             fillOutputSummaryClosed(fd);
00158             break;
00159           }
00160       }
00161   }
00162   return newOutputService();
00163 }

void EventStreamService::initializeSelection ( InitMsgView const &  initView  )  [private]

Definition at line 200 of file EventStreamService.cc.

References eventSelector_, edm::ParameterSet::getUntrackedParameter(), InitMsgView::hltTriggerNames(), and edm::StreamService::parameterSet_.

Referenced by EventStreamService().

00201 {
00202   Strings triggerNameList;
00203   initView.hltTriggerNames(triggerNameList);
00204   eventSelector_.reset(new EventSelector(parameterSet_.getUntrackedParameter("SelectEvents", ParameterSet()),triggerNameList));
00205 }

boost::shared_ptr< OutputService > EventStreamService::newOutputService (  )  [private]

Definition at line 172 of file EventStreamService.cc.

References file, generateFileRecord(), edm::StreamService::outputMap_, and saved_initmsg_.

00173 {
00174   boost::shared_ptr<FileRecord> file = generateFileRecord();
00175   InitMsgView view(&saved_initmsg_[0]);
00176 
00177   shared_ptr<OutputService> outputService(new EventOutputService(file, view));
00178   outputMap_[file] = outputService;
00179 
00180   return outputService;
00181 }

bool EventStreamService::nextEvent ( const uint8 * const  bufPtr  )  [virtual]

Implements edm::StreamService.

Definition at line 44 of file EventStreamService.cc.

References acceptEvent(), getOutputService(), EventMsgView::lumi(), edm::StreamService::lumiSection_, EventMsgView::run(), and edm::StreamService::runNumber_.

00045 {
00046   ProgressMarker::instance()->processing(true);
00047   EventMsgView view((void *) bufPtr);
00048   if (!acceptEvent(view))
00049     {
00050       ProgressMarker::instance()->processing(false);
00051       return false;
00052     }
00053   runNumber_   = view.run();
00054   lumiSection_ = view.lumi();
00055 
00056   shared_ptr<OutputService> outputService = getOutputService(view);
00057   ProgressMarker::instance()->processing(false);
00058   
00059   ProgressMarker::instance()->writing(true);
00060   outputService->writeEvent(bufPtr);
00061   ProgressMarker::instance()->writing(false);
00062   return true;
00063 }

void edm::EventStreamService::report ( std::ostream &  os,
int  indentation 
) const [virtual]

Implements edm::StreamService.

void EventStreamService::saveInitMessage ( InitMsgView const &  view  )  [private]

Definition at line 225 of file EventStreamService.cc.

References edmNew::copy(), Capri::details::from(), saved_initmsg_, InitMsgView::size(), and InitMsgView::startAddress().

Referenced by EventStreamService().

00226 {
00227   saved_initmsg_.resize(view.size() + 20);
00228   unsigned char* pos  = &saved_initmsg_[0];
00229   unsigned char* from = view.startAddress();
00230   unsigned int dsize  = view.size();
00231   copy(from,from+dsize,pos);
00232 }

void EventStreamService::stop (  )  [virtual]

Implements edm::StreamService.

Definition at line 69 of file EventStreamService.cc.

References fd, edm::StreamService::fillOutputSummaryClosed(), it, and edm::StreamService::outputMap_.

Referenced by ~EventStreamService().

00070 {
00071   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00072     boost::shared_ptr<FileRecord> fd(it->first);
00073     fd->setWhyClosed(1);
00074     outputMap_.erase(it++);
00075     fillOutputSummaryClosed(fd);
00076   }
00077 }


Member Data Documentation

boost::shared_ptr<edm::EventSelector> edm::EventStreamService::eventSelector_ [private]

Definition at line 53 of file EventStreamService.h.

Referenced by acceptEvent(), and initializeSelection().

std::vector<unsigned char> edm::EventStreamService::saved_initmsg_ [private]

Definition at line 56 of file EventStreamService.h.

Referenced by newOutputService(), and saveInitMessage().


The documentation for this class was generated from the following files:

EventFilter/StorageManager/interface/EventStreamService.h
EventStreamService.cc

Generated on Tue Jun 9 18:41:01 2009 for CMSSW by  doxygen 1.5.4