CMS 3D CMS Logo

ServiceManager.cc

Go to the documentation of this file.
00001 // $Id: ServiceManager.cc,v 1.19 2008/12/19 23:34:20 biery Exp $
00002 
00003 #include <EventFilter/StorageManager/interface/ServiceManager.h>
00004 #include "EventFilter/StorageManager/interface/Configurator.h"
00005 #include <EventFilter/StorageManager/interface/EventStreamService.h>
00006 #include <EventFilter/StorageManager/interface/FRDStreamService.h>
00007 #include "FWCore/Framework/interface/EventSelector.h"
00008 #include "FWCore/ParameterSet/interface/PythonProcessDesc.h"
00009 #include <FWCore/Utilities/interface/Exception.h>
00010 #include <typeinfo>
00011 
00012 using namespace std;
00013 using namespace edm;
00014 using boost::shared_ptr;
00015 
00016 
00017 ServiceManager::ServiceManager(const std::string& config):
00018   outModPSets_(0),
00019   managedOutputs_(0),
00020   psetHLTOutputLabels_(0),
00021   outputModuleIds_(0),
00022   storedEvents_(0),
00023   currentlumi_(0),
00024   timeouttime_(0),
00025   lasttimechecked_(0),
00026   errorStreamPSetIndex_(-1),
00027   errorStreamCreated_(false),
00028   samples_(1000),
00029   period4samples_(10)
00030 {
00031   storedNames_.clear();
00032   collectStreamerPSets(config);
00033   pmeter_ = new stor::SMPerformanceMeter();
00034   pmeter_->init(samples_, period4samples_);
00035 } 
00036 
00037 
00038 ServiceManager::~ServiceManager()
00039 { 
00040   managedOutputs_.clear();
00041   outputModuleIds_.clear();
00042   storedEvents_.clear();
00043   storedNames_.clear();
00044   delete pmeter_;
00045 }
00046 
00047 
00048 void ServiceManager::start()
00049 {
00050   psetHLTOutputLabels_.clear();
00051   for (unsigned int idx = 0; idx < outModPSets_.size(); idx++) {
00052     psetHLTOutputLabels_.push_back(std::string());  // empty string
00053   }
00054 
00055   managedOutputs_.clear();
00056   outputModuleIds_.clear();
00057   storedEvents_.clear();
00058   storedNames_.clear();
00059 
00060   currentlumi_ = 0;
00061   timeouttime_ = 0;
00062   lasttimechecked_ = 0;
00063   errorStreamCreated_ = false;
00064   pmeter_->init(samples_, period4samples_);
00065 }
00066 
00067 
00068 void ServiceManager::stop()
00069 {
00070   for(StreamsIterator  it = managedOutputs_.begin(), itEnd = managedOutputs_.end();
00071       it != itEnd; ++it) {
00072       (*it)->stop();
00073   }
00074 }
00075 
00076 
00077 void ServiceManager::manageInitMsg(std::string catalog, uint32 disks, std::string sourceId, InitMsgView& view, stor::InitMsgCollection& initMsgCollection)
00078 {
00079   boost::shared_ptr<stor::Parameter> smParameter_ = stor::Configurator::instance()->getParameter();
00080   std::string inputOMLabel = view.outputModuleLabel();
00081   int psetIdx = -1;
00082   for(std::vector<ParameterSet>::iterator it = outModPSets_.begin(), itEnd = outModPSets_.end();
00083       it != itEnd; ++it) {
00084     ++psetIdx;
00085     bool createStreamNow = false;
00086 
00087     // if this ParameterSet corresponds to the error stream, skip over it
00088     // since the error stream doesn't use the INIT messages.
00089     if (psetIdx == errorStreamPSetIndex_) continue;
00090 
00091     // test if this INIT message is the right one for this output module
00092     // (that is, whether the HLT output module specified in the 
00093     // SM output module SelectHLTOutput parameter matches the HLT output
00094     // module in the INIT message)
00095 
00096     // fetch the SelectHLTOutput parameter from the SM output PSet
00097     std::string requestedOMLabel =
00098       it->getUntrackedParameter<std::string>("SelectHLTOutput", std::string());
00099 
00100     // if the SM output PSet didn't specify an HLT output module...
00101     //
00102     // (Note that the SelectHLTOutput parameter is optional.  If it is not
00103     // specified, we create the stream using the first INIT message that
00104     // we receive.  However, if we get multiple, different, INIT messages,
00105     // we complain loudly.
00106     // By allowing it to be optional, though, we provide some level
00107     // of backward compatibility - setups that only have one HLT output
00108     // module aren't forced to add this parameter.)
00109     if (requestedOMLabel.empty()) {
00110 
00111       // if we haven't yet created the stream object, go ahead and do
00112       // that using this INIT message
00113       if (psetHLTOutputLabels_[psetIdx].empty()) {
00114         createStreamNow = true;
00115       }
00116 
00117       // if we already created the stream object and this is a different
00118       // INIT message than what we used to create it, we need to complain
00119       // because SM output PSets are required to have a SelectHLTOutput
00120       // parameter in the presence of multiple INIT messages (multiple
00121       // HLT output modules)
00122       else if (inputOMLabel != psetHLTOutputLabels_[psetIdx]) {
00123         std::string errorString;
00124         errorString.append("ERROR: The configuration for Stream ");
00125         errorString.append((*it).getParameter<string> ("streamLabel"));
00126         errorString.append(" does not specify an HLT output module.\n");
00127         errorString.append("Please specify one of the HLT output modules ");
00128         errorString.append("listed below as the SelectHLTOutput parameter ");
00129         errorString.append("in the EventStreamFileWriter configuration ");
00130         errorString.append("for Stream ");
00131         errorString.append((*it).getParameter<string> ("streamLabel"));
00132         errorString.append(".\n");
00133         errorString.append(initMsgCollection.getSelectionHelpString());
00134         errorString.append("\n");
00135         throw cms::Exception("ServiceManager","manageInitMsg")
00136           << errorString << std::endl;
00137       }
00138     }
00139 
00140     // if the SM output PSet did specify an HLT output module...
00141     else {
00142 
00143       // if the HLT output module labels match...
00144       if (inputOMLabel == requestedOMLabel) {
00145 
00146         // if we haven't yet created the stream object, go ahead and do
00147         // that using this INIT message
00148         if (psetHLTOutputLabels_[psetIdx].empty()) {
00149           createStreamNow = true;
00150         }
00151 
00152         // if we already created the stream object, we could complain,
00153         // but won't (for now) so that this method can support multiple
00154         // calls with the same INIT message
00155         else {}
00156       }
00157 
00158       // if the HLT output module labels do not match, do nothing
00159       else {}
00160     }
00161 
00162     if (createStreamNow) {
00163       shared_ptr<StreamService> stream = shared_ptr<StreamService>(new EventStreamService((*it),view));
00164       stream->setCatalog(catalog);
00165       stream->setNumberOfFileSystems(disks);
00166       stream->setSourceId(sourceId);
00167       stream->setFileName(smParameter_ -> fileName());
00168       stream->setFilePath(smParameter_ -> filePath());
00169       stream->setMaxFileSize(smParameter_ -> maxFileSize());
00170       stream->setSetupLabel(smParameter_ -> setupLabel());
00171       stream->setHighWaterMark(smParameter_ -> highWaterMark());
00172       stream->setLumiSectionTimeOut(smParameter_ -> lumiSectionTimeOut());
00173       managedOutputs_.push_back(stream);
00174       outputModuleIds_.push_back(view.outputModuleId());
00175       storedEvents_.push_back(0);
00176       storedNames_.push_back(stream->getStreamLabel());
00177       stream->report(cout,3);
00178 
00179       psetHLTOutputLabels_[psetIdx] = inputOMLabel;
00180     }
00181   }
00182 }
00183 
00184 void ServiceManager::manageEventMsg(EventMsgView& msg)
00185 {
00186   int outputIdx = -1;
00187   for(StreamsIterator it = managedOutputs_.begin(), itEnd = managedOutputs_.end(); it != itEnd; ++it) {
00188     ++outputIdx;
00189     if (msg.outModId() != outputModuleIds_[outputIdx])
00190       continue;
00191 
00192     bool thisEventAccepted = (*it)->nextEvent(msg.startAddress());
00193     if (!thisEventAccepted)
00194       continue;
00195 
00196     ++storedEvents_[outputIdx];
00197     pmeter_->addSample(msg.size());
00198     if ((*it)->lumiSection() > currentlumi_) {
00199       currentlumi_ = (*it)->lumiSection();
00200       timeouttime_ = (*it)->getCurrentTime();
00201     }
00202   }
00203 }
00204 
00205 void ServiceManager::manageErrorEventMsg(std::string catalog, uint32 disks, std::string sourceId, FRDEventMsgView& msg)
00206 {
00207   // if no error stream was configured, we can exit early
00208   if (errorStreamPSetIndex_ < 0) return;
00209 
00210   // create the error stream, if needed
00211   if (! errorStreamCreated_) {
00212     ParameterSet errorStreamPSet = outModPSets_.at(errorStreamPSetIndex_);
00213     boost::shared_ptr<stor::Parameter> smParameter_ = stor::Configurator::instance()->getParameter();
00214 
00215     shared_ptr<StreamService> stream =
00216       shared_ptr<StreamService>(new FRDStreamService(errorStreamPSet));
00217     stream->setCatalog(catalog);
00218     stream->setNumberOfFileSystems(disks);
00219     stream->setSourceId(sourceId);
00220     stream->setFileName(smParameter_ -> fileName());
00221     stream->setFilePath(smParameter_ -> filePath());
00222     stream->setMaxFileSize(smParameter_ -> maxFileSize());
00223     stream->setSetupLabel(smParameter_ -> setupLabel());
00224     stream->setHighWaterMark(smParameter_ -> highWaterMark());
00225     stream->setLumiSectionTimeOut(smParameter_ -> lumiSectionTimeOut());
00226     managedOutputs_.push_back(stream);
00227     outputModuleIds_.push_back(0xffffffff);
00228     storedEvents_.push_back(0);
00229     storedNames_.push_back(stream->getStreamLabel());
00230     stream->report(cout,3);
00231 
00232     psetHLTOutputLabels_[errorStreamPSetIndex_] = "ResourceBroker Error Output";
00233 
00234     errorStreamCreated_ = true;
00235   }
00236 
00237   // process the event
00238   int outputIdx = -1;
00239   for(StreamsIterator strIter = managedOutputs_.begin(), strIterEnd = managedOutputs_.end(); strIter != strIterEnd; ++strIter) {
00240     ++outputIdx;
00241     std::string streamClassName = typeid(*(*strIter)).name();
00242     if (streamClassName.find("FRDStreamService", 0) == string::npos)
00243       continue;
00244 
00245     bool thisEventAccepted = (*strIter)->nextEvent(msg.startAddress());
00246     if (!thisEventAccepted)
00247       continue;
00248 
00249     ++storedEvents_[outputIdx];
00250 
00251     // for now, we don't have any lumi section information in the
00252     // FRDEvent messages, so we don't try to do any lumi-boundary processing
00253   }
00254 }
00255 
00256 
00257 void ServiceManager::closeFilesIfNeeded()
00258 {
00259   StreamsIterator itBeg = managedOutputs_.begin();
00260   StreamsIterator itEnd = managedOutputs_.end();
00261   for(StreamsIterator it = itBeg; it != itEnd; ++it) {
00262     (*it)->closeTimedOutFiles();
00263   }
00264 }
00265 
00266 //
00267 // *** get all files from all streams
00268 //
00269 std::list<std::string>& ServiceManager::get_filelist() 
00270 { 
00271   filelist_.clear();
00272   for(StreamsIterator it = managedOutputs_.begin(), itEnd = managedOutputs_.end();
00273       it != itEnd; ++it) {
00274       std::list<std::string> sub_list = (*it)->getFileList();
00275       if(sub_list.size() > 0)
00276         filelist_.insert(filelist_.end(), sub_list.begin(), sub_list.end());
00277   } 
00278   return filelist_; 
00279 }
00280 
00281 
00282 //
00283 // *** get all current files from all streams
00284 //
00285 std::list<std::string>& ServiceManager::get_currfiles()
00286 { 
00287   currfiles_.clear();
00288   for(StreamsIterator it = managedOutputs_.begin(), itEnd = managedOutputs_.end();
00289       it != itEnd; ++it) {
00290       std::list<std::string> sub_list = (*it)->getCurrentFileList();
00291       if(sub_list.size() > 0)
00292         currfiles_.insert(currfiles_.end(), sub_list.begin(), sub_list.end());
00293   }
00294   return currfiles_;  
00295 }
00296 
00297 //
00298 std::vector<uint32>& ServiceManager::get_storedEvents()
00299 { 
00300   return storedEvents_;  
00301 }
00302 std::vector<std::string>& ServiceManager::get_storedNames()
00303 { 
00304   return storedNames_;  
00305 }
00306 
00310 std::map<std::string, Strings> ServiceManager::getStreamSelectionTable()
00311 {
00312   std::map<std::string, Strings> selTable;
00313   int psetIdx = -1;
00314   for(std::vector<ParameterSet>::iterator it = outModPSets_.begin();
00315       it != outModPSets_.end(); ++it) {
00316     ++psetIdx;
00317     if (psetIdx == errorStreamPSetIndex_) continue;
00318 
00319     std::string streamLabel = it->getParameter<string> ("streamLabel");
00320     if (streamLabel.size() > 0) {
00321       selTable[streamLabel] = EventSelector::getEventSelectionVString(*it);
00322     }
00323   }
00324   return selTable;
00325 }
00326 
00327 //
00328 // *** wrote similar example code in IOPool/Streamer/test/ParamSetWalker_t.cpp 
00329 // *** this method is diluted version of same code.
00330 // *** if more items needs to be extracted for config, refer to example code
00331 //
00332 void ServiceManager::collectStreamerPSets(const std::string& config)
00333 {
00334 
00335      try{
00336        
00337        PythonProcessDesc py_pdesc(config.c_str());
00338        boost::shared_ptr<ProcessDesc> pdesc = py_pdesc.processDesc();
00339 
00340        boost::shared_ptr<ParameterSet> procPset = pdesc->getProcessPSet();
00341        
00342         ParameterSet allTrigPaths = procPset->
00343          getUntrackedParameter<ParameterSet>("@trigger_paths");
00344        
00345        if (allTrigPaths.empty())
00346          throw cms::Exception("collectStreamerPSets","ServiceManager")
00347            << "No Trigger or End Path Found in the Config File" <<endl;
00348        
00349        std::vector<std::string> allEndPaths = 
00350          procPset->getParameter<std::vector<std::string> >("@end_paths");
00351        
00352        if (allEndPaths.empty())
00353          throw cms::Exception("collectStreamerPSets","ServiceManager")
00354            << "No End Path Found in the Config File" <<endl;
00355        
00356        for(std::vector<std::string>::iterator it = allEndPaths.begin(), itEnd = allEndPaths.end();
00357            it != itEnd;
00358            ++it) {
00359            std::vector<std::string> anEndPath = procPset->getParameter<std::vector<std::string> >((*it));
00360            for(std::vector<std::string>::iterator i = anEndPath.begin(), iEnd = anEndPath.end();
00361                i != iEnd; ++i) {
00362                ParameterSet aModInEndPathPset = 
00363                  procPset->getParameter<ParameterSet>((*i));
00364                if (aModInEndPathPset.empty())
00365                  throw cms::Exception("collectStreamerPSets","ServiceManager")
00366                    << "Empty End Path Found in the Config File" <<endl;
00367               
00368                std::string mod_type = aModInEndPathPset.getParameter<std::string> ("@module_type");
00369                if (mod_type == "EventStreamFileWriter") {
00370                  outModPSets_.push_back(aModInEndPathPset);
00371                  psetHLTOutputLabels_.push_back(std::string());  // empty string
00372                }
00373                else if (mod_type == "ErrorStreamFileWriter" ||
00374                         mod_type == "FRDStreamFileWriter") {
00375                  errorStreamPSetIndex_ = outModPSets_.size();
00376                  outModPSets_.push_back(aModInEndPathPset);
00377                  psetHLTOutputLabels_.push_back(std::string());  // empty string
00378                }
00379            }
00380        }
00381      } catch (cms::Exception & e) {
00382        std::cerr << "cms::Exception: " << e.explainSelf() << std::endl;
00383        std::cerr << "std::Exception: " << e.what() << std::endl;
00384        throw cms::Exception("collectStreamerPSets") << e.explainSelf() << std::endl;
00385      }
00386 }
00387 
00388 boost::shared_ptr<stor::SMOnlyStats> ServiceManager::get_stats()
00389 { 
00390 // Copy measurements for a different thread potentially
00391 // TODO create each time or use a data member?
00392     boost::shared_ptr<stor::SMOnlyStats> outstats(new stor::SMOnlyStats() );
00393 
00394     if ( pmeter_->getStats().shortTermCounter_->hasValidResult() )
00395     {
00396       stor::SMPerfStats stats = pmeter_->getStats();
00397 
00398       outstats->instantBandwidth_= stats.shortTermCounter_->getValueRate();
00399       outstats->instantRate_     = stats.shortTermCounter_->getSampleRate();
00400       outstats->instantLatency_  = 1000000.0 / outstats->instantRate_;
00401 
00402       double now = stor::ForeverCounter::getCurrentTime();
00403       outstats->totalSamples_    = stats.longTermCounter_->getSampleCount();
00404       outstats->duration_        = stats.longTermCounter_->getDuration(now);
00405       outstats->meanBandwidth_   = stats.longTermCounter_->getValueRate(now);
00406       outstats->meanRate_        = stats.longTermCounter_->getSampleRate(now);
00407       outstats->meanLatency_     = 1000000.0 / outstats->meanRate_;
00408 
00409       outstats->maxBandwidth_    = stats.maxBandwidth_;
00410       outstats->minBandwidth_    = stats.minBandwidth_;
00411     }
00412 
00413     // for time period bandwidth performance measurements
00414     if ( pmeter_->getStats().shortPeriodCounter_->hasValidResult() )
00415     {
00416       stor::SMPerfStats stats = pmeter_->getStats();
00417 
00418       outstats->instantBandwidth2_= stats.shortPeriodCounter_->getValueRate();
00419       outstats->instantRate2_     = stats.shortPeriodCounter_->getSampleRate();
00420       outstats->instantLatency2_  = 1000000.0 / outstats->instantRate2_;
00421 
00422       double now = stor::ForeverCounter::getCurrentTime();
00423       outstats->totalSamples2_    = stats.longTermCounter_->getSampleCount();
00424       outstats->duration2_        = stats.longTermCounter_->getDuration(now);
00425       outstats->meanBandwidth2_   = stats.longTermCounter_->getValueRate(now);
00426       outstats->meanRate2_        = stats.longTermCounter_->getSampleRate(now);
00427       outstats->meanLatency2_     = 1000000.0 / outstats->meanRate2_;
00428 
00429       outstats->maxBandwidth2_    = stats.maxBandwidth2_;
00430       outstats->minBandwidth2_    = stats.minBandwidth2_;
00431     }
00432     outstats->receivedVolume_ = pmeter_->totalvolumemb();
00433     outstats->samples_ = samples_;
00434     outstats->period4samples_ = period4samples_;
00435     return outstats;
00436 }

Generated on Tue Jun 9 17:34:57 2009 for CMSSW by  doxygen 1.5.4