CMS 3D CMS Logo

EventStreamService.cc

Go to the documentation of this file.
00001 // $Id: EventStreamService.cc,v 1.9 2008/09/05 00:57:13 loizides Exp $
00002 
00003 #include <EventFilter/StorageManager/interface/EventStreamService.h>
00004 #include <EventFilter/StorageManager/interface/ProgressMarker.h>
00005 #include <EventFilter/StorageManager/interface/Parameter.h>
00006 #include "EventFilter/StorageManager/interface/Configurator.h"
00007 #include "EventFilter/StorageManager/interface/EventOutputService.h"  
00008 
00009 #include <iostream>
00010 #include <iomanip>
00011 #include <sys/stat.h>
00012 #include <sys/statfs.h>
00013 
00014 using namespace edm;
00015 using namespace std;
00016 using boost::shared_ptr;
00017 using stor::ProgressMarker;
00018 
00019 //
00020 // *** construct stream service from 
00021 // *** parameter set and init message
00022 //
00023 EventStreamService::EventStreamService(ParameterSet const& pset, InitMsgView const& view)
00024 {
00025   parameterSet_ = pset;
00026   runNumber_ = 0;
00027   lumiSection_ = 0;
00028   numberOfFileSystems_ = 0;
00029   maxFileSizeInMB_ = 0;
00030   maxSize_ = 0;
00031   highWaterMark_ = 0;
00032   lumiSectionTimeOut_ = 0;
00033   ntotal_ = 0;
00034 
00035   saveInitMessage(view);
00036   initializeSelection(view);
00037   setStreamParameter();
00038 }
00039 
00040 
00041 // 
00042 // *** event loop for stream service
00043 //
00044 bool EventStreamService::nextEvent(const uint8 * const bufPtr)
00045 {
00046   ProgressMarker::instance()->processing(true);
00047   EventMsgView view((void *) bufPtr);
00048   if (!acceptEvent(view))
00049     {
00050       ProgressMarker::instance()->processing(false);
00051       return false;
00052     }
00053   runNumber_   = view.run();
00054   lumiSection_ = view.lumi();
00055 
00056   shared_ptr<OutputService> outputService = getOutputService(view);
00057   ProgressMarker::instance()->processing(false);
00058   
00059   ProgressMarker::instance()->writing(true);
00060   outputService->writeEvent(bufPtr);
00061   ProgressMarker::instance()->writing(false);
00062   return true;
00063 }
00064 
00065 
00066 //
00067 // *** close all files on stop signal
00068 //
00069 void EventStreamService::stop()
00070 {
00071   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00072     boost::shared_ptr<FileRecord> fd(it->first);
00073     fd->setWhyClosed(1);
00074     outputMap_.erase(it++);
00075     fillOutputSummaryClosed(fd);
00076   }
00077 }
00078 
00079 
00080 // 
00081 // *** close all output service of the previous lumi-section 
00082 // *** when lumiSectionTimeOut seconds have passed since the
00083 // *** appearance of the new lumi section and make a record of the file
00084 // !!! Deprecated - use closeTimedOutFiles() instead !!!
00085 // 
00086 void EventStreamService::closeTimedOutFiles(int lumi, double timeoutdiff)
00087 {
00088    // just mark what the code should have done
00089   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00090 
00091     int reason = lumi*100000 + it->first->lumiSection()*10;
00092 
00093     // do not touch files from current lumi section
00094     if (it->first->lumiSection() == lumi) {
00095       it->first->setWhyClosed(reason);
00096       ++it;
00097       continue;
00098     }
00099 
00100     if (it->first->lumiSection() < lumi-1) {
00101       it->first->setWhyClosed(reason+2);  // close old (N-2) lumi sections in any case
00102     } else if (timeoutdiff > lumiSectionTimeOut_) {
00103       it->first->setWhyClosed(reason+3);  // check if timeout reached for previous (N-1) lumi sections
00104     } else {
00105       it->first->setWhyClosed(reason+9);  // default value to catch race condition
00106     }
00107 
00108     ++it;
00109   }
00110 
00111   // code from rev 1.10 
00112   double currentTime = getCurrentTime();
00113   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00114      if (currentTime - it->second->lastEntry() > lumiSectionTimeOut_) {
00115         boost::shared_ptr<FileRecord> fd(it->first);
00116         outputMap_.erase(it++);
00117         fillOutputSummaryClosed(fd);
00118      } else 
00119         ++it;
00120   }
00121 }
00122 
00123 
00124 // 
00125 // *** close all output service when lumiSectionTimeOut seconds have passed
00126 // *** since the most recent event was added
00127 // 
00128 void EventStreamService::closeTimedOutFiles()
00129 {
00130   double currentTime = getCurrentTime();
00131   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00132      if (currentTime - it->second->lastEntry() > lumiSectionTimeOut_) {
00133         boost::shared_ptr<FileRecord> fd(it->first);
00134         outputMap_.erase(it++);
00135         fillOutputSummaryClosed(fd);
00136      } else {
00137         ++it;
00138      }
00139   }
00140 }
00141 
00142 
00143 //
00144 // *** find output service in map or return a new one
00145 // *** rule: only one file for each lumi section is output map
00146 //
00147 boost::shared_ptr<OutputService> EventStreamService::getOutputService(EventMsgView const& view)
00148 {
00149   for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ++it) {
00150        if (it->first->lumiSection() == lumiSection_) {
00151           if (checkEvent(it->first, view))
00152             return it->second;
00153           else { // close file since file size exceeded
00154             boost::shared_ptr<FileRecord> fd(it->first);
00155             fd->setWhyClosed(4);
00156             outputMap_.erase(it);
00157             fillOutputSummaryClosed(fd);
00158             break;
00159           }
00160       }
00161   }
00162   return newOutputService();
00163 }
00164 
00165 
00166 // 
00167 // *** generate file descriptor
00168 // *** generate output service
00169 // *** add ouput service to output map
00170 // *** add ouput service to output summary
00171 //
00172 boost::shared_ptr<OutputService> EventStreamService::newOutputService()
00173 {
00174   boost::shared_ptr<FileRecord> file = generateFileRecord();
00175   InitMsgView view(&saved_initmsg_[0]);
00176 
00177   shared_ptr<OutputService> outputService(new EventOutputService(file, view));
00178   outputMap_[file] = outputService;
00179 
00180   return outputService;
00181 }
00182 
00183 
00184 //
00185 // *** perform checks before writing the event
00186 // *** so far ... check the event will fit into the file 
00187 //
00188 bool EventStreamService::checkEvent(shared_ptr<FileRecord> file, EventMsgView const& view) const
00189 {
00190   if (file->fileSize() + static_cast<long long>(view.size()) > maxSize_ && file->events() > 0)
00191     return false;
00192 
00193   return true;
00194 }
00195 
00196 
00197 //
00198 // *** initialize stream selection
00199 // 
00200 void EventStreamService::initializeSelection(InitMsgView const& initView)
00201 {
00202   Strings triggerNameList;
00203   initView.hltTriggerNames(triggerNameList);
00204   eventSelector_.reset(new EventSelector(parameterSet_.getUntrackedParameter("SelectEvents", ParameterSet()),triggerNameList));
00205 }
00206 
00207 
00208 //
00209 // *** accept event according to their high level trigger bits
00210 //
00211 bool EventStreamService::acceptEvent(EventMsgView const& view) 
00212 {
00213   std::vector<unsigned char> hlt_out;
00214   hlt_out.resize(1 + (view.hltCount()-1)/4);
00215   view.hltTriggerBits(&hlt_out[0]);
00216   int num_paths = view.hltCount();
00217   bool rc = (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hlt_out[0], num_paths));
00218   return rc;
00219 }
00220 
00221 
00222 //
00223 // *** save init message need to open new output service
00224 //
00225 void EventStreamService::saveInitMessage(InitMsgView const& view)
00226 {
00227   saved_initmsg_.resize(view.size() + 20);
00228   unsigned char* pos  = &saved_initmsg_[0];
00229   unsigned char* from = view.startAddress();
00230   unsigned int dsize  = view.size();
00231   copy(from,from+dsize,pos);
00232 }
00233 
00234 
00235 //
00236 // *** generate a unique file descriptor
00237 //
00238 //     The run number, stream name and storage manager instance have 
00239 //     to be part of the file name. I have added the lumi section, 
00240 //     but in any case we have to make sure that file names are
00241 //     unique. 
00242 //
00243 //     Keep a list of file names and check if file name 
00244 //     was not already used in this run. 
00245 //
00246 boost::shared_ptr<FileRecord> EventStreamService::generateFileRecord()
00247 {
00248   std::ostringstream oss;   
00249   oss    << setupLabel_ 
00250          << "." << setfill('0') << std::setw(8) << runNumber_ 
00251          << "." << setfill('0') << std::setw(4) << lumiSection_
00252          << "." << streamLabel_ 
00253          << "." << fileName_
00254          << "." << setfill('0') << std::setw(2) << sourceId_;
00255   string fileName = oss.str();
00256 
00257   shared_ptr<FileRecord> fd = shared_ptr<FileRecord>(new FileRecord(lumiSection_, fileName, filePath_));    
00258   ++ntotal_;
00259 
00260   boost::mutex::scoped_lock sl(list_lock_);
00261   map<string, int>::iterator it = outputSummary_.find(fileName);
00262   if(it==outputSummary_.end()) {
00263      outputSummary_.insert(std::pair<string, int>(fileName,0));
00264   } else {
00265      ++it->second;
00266      fd->setFileCounter(it->second);
00267   }
00268 
00269   if (numberOfFileSystems_ > 0)
00270     fd->fileSystem((runNumber_ + atoi(sourceId_.c_str()) + ntotal_) % numberOfFileSystems_); 
00271   
00272   fd->checkDirectories();
00273   fd->setRunNumber(runNumber_);
00274   fd->setStreamLabel(streamLabel_);
00275   fd->setSetupLabel(setupLabel_);
00276 
00277   // fd->report(cout, 12);
00278   return fd;
00279 }
00280 
00281 
00282 //
00283 // *** report the status of stream service
00284 //
00285 void EventStreamService::report(ostream &os, int indentation) const
00286 {
00287   string prefix(indentation, ' ');
00288   os << "\n";
00289   os << prefix << "------------- EventStreamService -------------\n";
00290   os << prefix << "fileName            " << fileName_              << "\n";
00291   os << prefix << "filePath            " << filePath_              << "\n";
00292   os << prefix << "sourceId            " << sourceId_              << "\n";
00293   os << prefix << "setupLabel          " << setupLabel_            << "\n";
00294   os << prefix << "streamLabel         " << streamLabel_           << "\n";
00295   os << prefix << "maxSize             " << maxSize_               << "\n";
00296   os << prefix << "highWaterMark       " << highWaterMark_         << "\n";
00297   os << prefix << "lumiSectionTimeOut  " << lumiSectionTimeOut_    << "\n";
00298   os << prefix << "no. active files    " << outputMap_.size()      << "\n";
00299   os << prefix << "no. files           " << outputSummary_.size()  << "\n";
00300   os << prefix << "-----------------------------------------\n";
00301 }

Generated on Tue Jun 9 17:34:56 2009 for CMSSW by  doxygen 1.5.4