00001
00002
00003 #include <EventFilter/StorageManager/interface/ServiceManager.h>
00004 #include "EventFilter/StorageManager/interface/Configurator.h"
00005 #include <EventFilter/StorageManager/interface/EventStreamService.h>
00006 #include <EventFilter/StorageManager/interface/FRDStreamService.h>
00007 #include "FWCore/Framework/interface/EventSelector.h"
00008 #include "FWCore/ParameterSet/interface/PythonProcessDesc.h"
00009 #include <FWCore/Utilities/interface/Exception.h>
00010 #include <typeinfo>
00011
00012 using namespace std;
00013 using namespace edm;
00014 using boost::shared_ptr;
00015
00016
00017 ServiceManager::ServiceManager(const std::string& config):
00018 outModPSets_(0),
00019 managedOutputs_(0),
00020 psetHLTOutputLabels_(0),
00021 outputModuleIds_(0),
00022 storedEvents_(0),
00023 currentlumi_(0),
00024 timeouttime_(0),
00025 lasttimechecked_(0),
00026 errorStreamPSetIndex_(-1),
00027 errorStreamCreated_(false),
00028 samples_(1000),
00029 period4samples_(10)
00030 {
00031 storedNames_.clear();
00032 collectStreamerPSets(config);
00033 pmeter_ = new stor::SMPerformanceMeter();
00034 pmeter_->init(samples_, period4samples_);
00035 }
00036
00037
00038 ServiceManager::~ServiceManager()
00039 {
00040 managedOutputs_.clear();
00041 outputModuleIds_.clear();
00042 storedEvents_.clear();
00043 storedNames_.clear();
00044 delete pmeter_;
00045 }
00046
00047
00048 void ServiceManager::start()
00049 {
00050 psetHLTOutputLabels_.clear();
00051 for (unsigned int idx = 0; idx < outModPSets_.size(); idx++) {
00052 psetHLTOutputLabels_.push_back(std::string());
00053 }
00054
00055 managedOutputs_.clear();
00056 outputModuleIds_.clear();
00057 storedEvents_.clear();
00058 storedNames_.clear();
00059
00060 currentlumi_ = 0;
00061 timeouttime_ = 0;
00062 lasttimechecked_ = 0;
00063 errorStreamCreated_ = false;
00064 pmeter_->init(samples_, period4samples_);
00065 }
00066
00067
00068 void ServiceManager::stop()
00069 {
00070 for(StreamsIterator it = managedOutputs_.begin(), itEnd = managedOutputs_.end();
00071 it != itEnd; ++it) {
00072 (*it)->stop();
00073 }
00074 }
00075
00076
00077 void ServiceManager::manageInitMsg(std::string catalog, uint32 disks, std::string sourceId, InitMsgView& view, stor::InitMsgCollection& initMsgCollection)
00078 {
00079 boost::shared_ptr<stor::Parameter> smParameter_ = stor::Configurator::instance()->getParameter();
00080 std::string inputOMLabel = view.outputModuleLabel();
00081 int psetIdx = -1;
00082 for(std::vector<ParameterSet>::iterator it = outModPSets_.begin(), itEnd = outModPSets_.end();
00083 it != itEnd; ++it) {
00084 ++psetIdx;
00085 bool createStreamNow = false;
00086
00087
00088
00089 if (psetIdx == errorStreamPSetIndex_) continue;
00090
00091
00092
00093
00094
00095
00096
00097 std::string requestedOMLabel =
00098 it->getUntrackedParameter<std::string>("SelectHLTOutput", std::string());
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109 if (requestedOMLabel.empty()) {
00110
00111
00112
00113 if (psetHLTOutputLabels_[psetIdx].empty()) {
00114 createStreamNow = true;
00115 }
00116
00117
00118
00119
00120
00121
00122 else if (inputOMLabel != psetHLTOutputLabels_[psetIdx]) {
00123 std::string errorString;
00124 errorString.append("ERROR: The configuration for Stream ");
00125 errorString.append((*it).getParameter<string> ("streamLabel"));
00126 errorString.append(" does not specify an HLT output module.\n");
00127 errorString.append("Please specify one of the HLT output modules ");
00128 errorString.append("listed below as the SelectHLTOutput parameter ");
00129 errorString.append("in the EventStreamFileWriter configuration ");
00130 errorString.append("for Stream ");
00131 errorString.append((*it).getParameter<string> ("streamLabel"));
00132 errorString.append(".\n");
00133 errorString.append(initMsgCollection.getSelectionHelpString());
00134 errorString.append("\n");
00135 throw cms::Exception("ServiceManager","manageInitMsg")
00136 << errorString << std::endl;
00137 }
00138 }
00139
00140
00141 else {
00142
00143
00144 if (inputOMLabel == requestedOMLabel) {
00145
00146
00147
00148 if (psetHLTOutputLabels_[psetIdx].empty()) {
00149 createStreamNow = true;
00150 }
00151
00152
00153
00154
00155 else {}
00156 }
00157
00158
00159 else {}
00160 }
00161
00162 if (createStreamNow) {
00163 shared_ptr<StreamService> stream = shared_ptr<StreamService>(new EventStreamService((*it),view));
00164 stream->setCatalog(catalog);
00165 stream->setNumberOfFileSystems(disks);
00166 stream->setSourceId(sourceId);
00167 stream->setFileName(smParameter_ -> fileName());
00168 stream->setFilePath(smParameter_ -> filePath());
00169 stream->setMaxFileSize(smParameter_ -> maxFileSize());
00170 stream->setSetupLabel(smParameter_ -> setupLabel());
00171 stream->setHighWaterMark(smParameter_ -> highWaterMark());
00172 stream->setLumiSectionTimeOut(smParameter_ -> lumiSectionTimeOut());
00173 managedOutputs_.push_back(stream);
00174 outputModuleIds_.push_back(view.outputModuleId());
00175 storedEvents_.push_back(0);
00176 storedNames_.push_back(stream->getStreamLabel());
00177 stream->report(cout,3);
00178
00179 psetHLTOutputLabels_[psetIdx] = inputOMLabel;
00180 }
00181 }
00182 }
00183
00184 void ServiceManager::manageEventMsg(EventMsgView& msg)
00185 {
00186 int outputIdx = -1;
00187 for(StreamsIterator it = managedOutputs_.begin(), itEnd = managedOutputs_.end(); it != itEnd; ++it) {
00188 ++outputIdx;
00189 if (msg.outModId() != outputModuleIds_[outputIdx])
00190 continue;
00191
00192 bool thisEventAccepted = (*it)->nextEvent(msg.startAddress());
00193 if (!thisEventAccepted)
00194 continue;
00195
00196 ++storedEvents_[outputIdx];
00197 pmeter_->addSample(msg.size());
00198 if ((*it)->lumiSection() > currentlumi_) {
00199 currentlumi_ = (*it)->lumiSection();
00200 timeouttime_ = (*it)->getCurrentTime();
00201 }
00202 }
00203 }
00204
00205 void ServiceManager::manageErrorEventMsg(std::string catalog, uint32 disks, std::string sourceId, FRDEventMsgView& msg)
00206 {
00207
00208 if (errorStreamPSetIndex_ < 0) return;
00209
00210
00211 if (! errorStreamCreated_) {
00212 ParameterSet errorStreamPSet = outModPSets_.at(errorStreamPSetIndex_);
00213 boost::shared_ptr<stor::Parameter> smParameter_ = stor::Configurator::instance()->getParameter();
00214
00215 shared_ptr<StreamService> stream =
00216 shared_ptr<StreamService>(new FRDStreamService(errorStreamPSet));
00217 stream->setCatalog(catalog);
00218 stream->setNumberOfFileSystems(disks);
00219 stream->setSourceId(sourceId);
00220 stream->setFileName(smParameter_ -> fileName());
00221 stream->setFilePath(smParameter_ -> filePath());
00222 stream->setMaxFileSize(smParameter_ -> maxFileSize());
00223 stream->setSetupLabel(smParameter_ -> setupLabel());
00224 stream->setHighWaterMark(smParameter_ -> highWaterMark());
00225 stream->setLumiSectionTimeOut(smParameter_ -> lumiSectionTimeOut());
00226 managedOutputs_.push_back(stream);
00227 outputModuleIds_.push_back(0xffffffff);
00228 storedEvents_.push_back(0);
00229 storedNames_.push_back(stream->getStreamLabel());
00230 stream->report(cout,3);
00231
00232 psetHLTOutputLabels_[errorStreamPSetIndex_] = "ResourceBroker Error Output";
00233
00234 errorStreamCreated_ = true;
00235 }
00236
00237
00238 int outputIdx = -1;
00239 for(StreamsIterator strIter = managedOutputs_.begin(), strIterEnd = managedOutputs_.end(); strIter != strIterEnd; ++strIter) {
00240 ++outputIdx;
00241 std::string streamClassName = typeid(*(*strIter)).name();
00242 if (streamClassName.find("FRDStreamService", 0) == string::npos)
00243 continue;
00244
00245 bool thisEventAccepted = (*strIter)->nextEvent(msg.startAddress());
00246 if (!thisEventAccepted)
00247 continue;
00248
00249 ++storedEvents_[outputIdx];
00250
00251
00252
00253 }
00254 }
00255
00256
00257 void ServiceManager::closeFilesIfNeeded()
00258 {
00259 StreamsIterator itBeg = managedOutputs_.begin();
00260 StreamsIterator itEnd = managedOutputs_.end();
00261 for(StreamsIterator it = itBeg; it != itEnd; ++it) {
00262 (*it)->closeTimedOutFiles();
00263 }
00264 }
00265
00266
00267
00268
00269 std::list<std::string>& ServiceManager::get_filelist()
00270 {
00271 filelist_.clear();
00272 for(StreamsIterator it = managedOutputs_.begin(), itEnd = managedOutputs_.end();
00273 it != itEnd; ++it) {
00274 std::list<std::string> sub_list = (*it)->getFileList();
00275 if(sub_list.size() > 0)
00276 filelist_.insert(filelist_.end(), sub_list.begin(), sub_list.end());
00277 }
00278 return filelist_;
00279 }
00280
00281
00282
00283
00284
00285 std::list<std::string>& ServiceManager::get_currfiles()
00286 {
00287 currfiles_.clear();
00288 for(StreamsIterator it = managedOutputs_.begin(), itEnd = managedOutputs_.end();
00289 it != itEnd; ++it) {
00290 std::list<std::string> sub_list = (*it)->getCurrentFileList();
00291 if(sub_list.size() > 0)
00292 currfiles_.insert(currfiles_.end(), sub_list.begin(), sub_list.end());
00293 }
00294 return currfiles_;
00295 }
00296
00297
00298 std::vector<uint32>& ServiceManager::get_storedEvents()
00299 {
00300 return storedEvents_;
00301 }
00302 std::vector<std::string>& ServiceManager::get_storedNames()
00303 {
00304 return storedNames_;
00305 }
00306
00310 std::map<std::string, Strings> ServiceManager::getStreamSelectionTable()
00311 {
00312 std::map<std::string, Strings> selTable;
00313 int psetIdx = -1;
00314 for(std::vector<ParameterSet>::iterator it = outModPSets_.begin();
00315 it != outModPSets_.end(); ++it) {
00316 ++psetIdx;
00317 if (psetIdx == errorStreamPSetIndex_) continue;
00318
00319 std::string streamLabel = it->getParameter<string> ("streamLabel");
00320 if (streamLabel.size() > 0) {
00321 selTable[streamLabel] = EventSelector::getEventSelectionVString(*it);
00322 }
00323 }
00324 return selTable;
00325 }
00326
00327
00328
00329
00330
00331
00332 void ServiceManager::collectStreamerPSets(const std::string& config)
00333 {
00334
00335 try{
00336
00337 PythonProcessDesc py_pdesc(config.c_str());
00338 boost::shared_ptr<ProcessDesc> pdesc = py_pdesc.processDesc();
00339
00340 boost::shared_ptr<ParameterSet> procPset = pdesc->getProcessPSet();
00341
00342 ParameterSet allTrigPaths = procPset->
00343 getUntrackedParameter<ParameterSet>("@trigger_paths");
00344
00345 if (allTrigPaths.empty())
00346 throw cms::Exception("collectStreamerPSets","ServiceManager")
00347 << "No Trigger or End Path Found in the Config File" <<endl;
00348
00349 std::vector<std::string> allEndPaths =
00350 procPset->getParameter<std::vector<std::string> >("@end_paths");
00351
00352 if (allEndPaths.empty())
00353 throw cms::Exception("collectStreamerPSets","ServiceManager")
00354 << "No End Path Found in the Config File" <<endl;
00355
00356 for(std::vector<std::string>::iterator it = allEndPaths.begin(), itEnd = allEndPaths.end();
00357 it != itEnd;
00358 ++it) {
00359 std::vector<std::string> anEndPath = procPset->getParameter<std::vector<std::string> >((*it));
00360 for(std::vector<std::string>::iterator i = anEndPath.begin(), iEnd = anEndPath.end();
00361 i != iEnd; ++i) {
00362 ParameterSet aModInEndPathPset =
00363 procPset->getParameter<ParameterSet>((*i));
00364 if (aModInEndPathPset.empty())
00365 throw cms::Exception("collectStreamerPSets","ServiceManager")
00366 << "Empty End Path Found in the Config File" <<endl;
00367
00368 std::string mod_type = aModInEndPathPset.getParameter<std::string> ("@module_type");
00369 if (mod_type == "EventStreamFileWriter") {
00370 outModPSets_.push_back(aModInEndPathPset);
00371 psetHLTOutputLabels_.push_back(std::string());
00372 }
00373 else if (mod_type == "ErrorStreamFileWriter" ||
00374 mod_type == "FRDStreamFileWriter") {
00375 errorStreamPSetIndex_ = outModPSets_.size();
00376 outModPSets_.push_back(aModInEndPathPset);
00377 psetHLTOutputLabels_.push_back(std::string());
00378 }
00379 }
00380 }
00381 } catch (cms::Exception & e) {
00382 std::cerr << "cms::Exception: " << e.explainSelf() << std::endl;
00383 std::cerr << "std::Exception: " << e.what() << std::endl;
00384 throw cms::Exception("collectStreamerPSets") << e.explainSelf() << std::endl;
00385 }
00386 }
00387
00388 boost::shared_ptr<stor::SMOnlyStats> ServiceManager::get_stats()
00389 {
00390
00391
00392 boost::shared_ptr<stor::SMOnlyStats> outstats(new stor::SMOnlyStats() );
00393
00394 if ( pmeter_->getStats().shortTermCounter_->hasValidResult() )
00395 {
00396 stor::SMPerfStats stats = pmeter_->getStats();
00397
00398 outstats->instantBandwidth_= stats.shortTermCounter_->getValueRate();
00399 outstats->instantRate_ = stats.shortTermCounter_->getSampleRate();
00400 outstats->instantLatency_ = 1000000.0 / outstats->instantRate_;
00401
00402 double now = stor::ForeverCounter::getCurrentTime();
00403 outstats->totalSamples_ = stats.longTermCounter_->getSampleCount();
00404 outstats->duration_ = stats.longTermCounter_->getDuration(now);
00405 outstats->meanBandwidth_ = stats.longTermCounter_->getValueRate(now);
00406 outstats->meanRate_ = stats.longTermCounter_->getSampleRate(now);
00407 outstats->meanLatency_ = 1000000.0 / outstats->meanRate_;
00408
00409 outstats->maxBandwidth_ = stats.maxBandwidth_;
00410 outstats->minBandwidth_ = stats.minBandwidth_;
00411 }
00412
00413
00414 if ( pmeter_->getStats().shortPeriodCounter_->hasValidResult() )
00415 {
00416 stor::SMPerfStats stats = pmeter_->getStats();
00417
00418 outstats->instantBandwidth2_= stats.shortPeriodCounter_->getValueRate();
00419 outstats->instantRate2_ = stats.shortPeriodCounter_->getSampleRate();
00420 outstats->instantLatency2_ = 1000000.0 / outstats->instantRate2_;
00421
00422 double now = stor::ForeverCounter::getCurrentTime();
00423 outstats->totalSamples2_ = stats.longTermCounter_->getSampleCount();
00424 outstats->duration2_ = stats.longTermCounter_->getDuration(now);
00425 outstats->meanBandwidth2_ = stats.longTermCounter_->getValueRate(now);
00426 outstats->meanRate2_ = stats.longTermCounter_->getSampleRate(now);
00427 outstats->meanLatency2_ = 1000000.0 / outstats->meanRate2_;
00428
00429 outstats->maxBandwidth2_ = stats.maxBandwidth2_;
00430 outstats->minBandwidth2_ = stats.minBandwidth2_;
00431 }
00432 outstats->receivedVolume_ = pmeter_->totalvolumemb();
00433 outstats->samples_ = samples_;
00434 outstats->period4samples_ = period4samples_;
00435 return outstats;
00436 }