CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
28 
29 #include <algorithm>
30 #include <cassert>
31 #include <cstdlib>
32 #include <functional>
33 #include <iomanip>
34 #include <list>
35 #include <map>
36 #include <exception>
37 
38 namespace edm {
39  namespace {
40 
// Walks the input range [begin, end) and the output sequence starting
// at 'out' in lock-step, invoking 'func(input, output)' once per
// element pair. The callable reads the input element and writes its
// result through the output reference. The caller must make sure the
// output sequence holds at least as many elements as the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void
transform_into(InputIterator begin, InputIterator end,
               ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
51 
// Fills the sequence 'to' from the sequence 'from' by calling
// 'func(fromElement, toElement)' for each pair of elements via
// transform_into.
//
// If 'to' already has the same number of elements as 'from', the
// existing elements of 'to' are handed to 'func' directly, so the
// callable sees (and may update) their current contents.
// Otherwise a fresh TO with from.size() elements is filled first and
// swapped into 'to' only after every call has returned, so 'to' is
// left untouched if one of the calls throws.
template <typename FROM, typename TO, typename FUNC>
void
fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    // Sizes agree: operate on the existing entries in place.
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }
  // Sizes differ: build the result on the side, then publish it.
  TO scratch(from.size());
  transform_into(from.begin(), from.end(), scratch.begin(), func);
  to.swap(scratch);
}
68 
69  // -----------------------------
70 
71  // Here we make the trigger results inserter directly. This should
72  // probably be a utility in the WorkerRegistry or elsewhere.
73 
75  makeInserter(ExceptionToActionTable const& actions,
76  std::shared_ptr<ActivityRegistry> areg,
77  std::shared_ptr<TriggerResultInserter> inserter) {
78  StreamSchedule::WorkerPtr ptr(new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
79  ptr->setActivityRegistry(areg);
80  return ptr;
81  }
82 
83  void
84  initializeBranchToReadingWorker(ParameterSet const& opts,
85  ProductRegistry const& preg,
86  std::multimap<std::string,Worker*>& branchToReadingWorker)
87  {
88  // See if any data has been marked to be deleted early (removing any duplicates)
89  auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly",std::vector<std::string>());
90  if(not vBranchesToDeleteEarly.empty()) {
91  std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
92  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
93  vBranchesToDeleteEarly.end());
94 
95  // Are the requested items in the product registry?
96  auto allBranchNames = preg.allBranchNames();
97  //the branch names all end with a period, which we do not want to compare with
98  for(auto & b:allBranchNames) {
99  b.resize(b.size()-1);
100  }
101  std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
102  std::vector<std::string> temp;
103  temp.reserve(vBranchesToDeleteEarly.size());
104 
105  std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
106  allBranchNames.begin(),allBranchNames.end(),
107  std::back_inserter(temp));
108  vBranchesToDeleteEarly.swap(temp);
109  if(temp.size() != vBranchesToDeleteEarly.size()) {
110  std::vector<std::string> missingProducts;
111  std::set_difference(temp.begin(),temp.end(),
112  vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
113  std::back_inserter(missingProducts));
114  LogInfo l("MissingProductsForCanDeleteEarly");
115  l<<"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
116  for(auto const& n:missingProducts){
117  l<<"\n "<<n;
118  }
119  }
120  //set placeholder for the branch, we will remove the nullptr if a
121  // module actually wants the branch.
122  for(auto const& branch:vBranchesToDeleteEarly) {
123  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
124  }
125  }
126  }
127  }
128 
129  // -----------------------------
130 
// Shorthand for a list of strings (module labels, path names, ...).
using vstring = std::vector<std::string>;
132 
133  // -----------------------------
134 
135  StreamSchedule::StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
136  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
137  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
138  std::shared_ptr<ModuleRegistry> modReg,
139  ParameterSet& proc_pset,
140  service::TriggerNamesService const& tns,
141  PreallocationConfiguration const& prealloc,
142  ProductRegistry& preg,
143  BranchIDListHelper& branchIDListHelper,
144  ExceptionToActionTable const& actions,
145  std::shared_ptr<ActivityRegistry> areg,
146  std::shared_ptr<ProcessConfiguration> processConfiguration,
147  bool allowEarlyDelete,
148  StreamID streamID,
149  ProcessContext const* processContext) :
150  workerManager_(modReg,areg, actions),
151  actReg_(areg),
152  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
153  results_inserter_(),
154  trig_paths_(),
155  end_paths_(),
156  total_events_(),
157  total_passed_(),
158  number_of_unscheduled_modules_(0),
159  streamID_(streamID),
160  streamContext_(streamID_, processContext),
161  endpathsAreActive_(true),
162  skippingEvent_(false){
163 
164  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
165  bool hasPath = false;
166  std::vector<std::string> const& pathNames = tns.getTrigPaths();
167  std::vector<std::string> const& endPathNames = tns.getEndPaths();
168 
169  int trig_bitpos = 0;
170  trig_paths_.reserve(pathNames.size());
171  for (auto const& trig_name : pathNames) {
172  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
173  ++trig_bitpos;
174  hasPath = true;
175  }
176 
177  if (hasPath) {
178  // the results inserter stands alone
179  inserter->setTrigResultForStream(streamID.value(), results());
180 
181  results_inserter_ = makeInserter(actions, actReg_, inserter);
183  }
184 
185  // fill normal endpaths
186  int bitpos = 0;
187  end_paths_.reserve(endPathNames.size());
188  for (auto const& end_path_name : endPathNames) {
189  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
190  ++bitpos;
191  }
192 
193  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
194 
195  //See if all modules were used
196  std::set<std::string> usedWorkerLabels;
197  for (auto const& worker : allWorkers()) {
198  usedWorkerLabels.insert(worker->description().moduleLabel());
199  }
200  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
201  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
202  std::vector<std::string> unusedLabels;
203  set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
204  usedWorkerLabels.begin(), usedWorkerLabels.end(),
205  back_inserter(unusedLabels));
206  std::set<std::string> unscheduledLabels;
207  std::vector<std::string> shouldBeUsedLabels;
208  if (!unusedLabels.empty()) {
209  //Need to
210  // 1) create worker
211  // 2) if it is a WorkerT<EDProducer>, add it to our list
212  // 3) hand list to our delayed reader
213  for (auto const& label : unusedLabels) {
214  bool isTracked;
215  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
216  assert(isTracked);
217  assert(modulePSet != nullptr);
218  workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
219  }
220  if (!shouldBeUsedLabels.empty()) {
221  std::ostringstream unusedStream;
222  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
223  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
224  itLabelEnd = shouldBeUsedLabels.end();
225  itLabel != itLabelEnd;
226  ++itLabel) {
227  unusedStream << ",'" << *itLabel << "'";
228  }
229  LogInfo("path")
230  << "The following module labels are not assigned to any path:\n"
231  << unusedStream.str()
232  << "\n";
233  }
234  }
235  if (!unscheduledLabels.empty()) {
236  number_of_unscheduled_modules_=unscheduledLabels.size();
237  workerManager_.setOnDemandProducts(preg, unscheduledLabels);
238  }
239 
240 
241  initializeEarlyDelete(*modReg, opts,preg,allowEarlyDelete);
242 
243  } // StreamSchedule::StreamSchedule
244 
245 
247  edm::ParameterSet const& opts, edm::ProductRegistry const& preg,
248  bool allowEarlyDelete) {
249  //for now, if have a subProcess, don't allow early delete
250  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
251  if(not allowEarlyDelete) return;
252 
253  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
254  // registered for this job
255  std::multimap<std::string,Worker*> branchToReadingWorker;
256  initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
257 
258  //If no delete early items have been specified we don't have to do anything
259  if(branchToReadingWorker.empty()) {
260  return;
261  }
262  const std::vector<std::string> kEmpty;
263  std::map<Worker*,unsigned int> reserveSizeForWorker;
264  unsigned int upperLimitOnReadingWorker =0;
265  unsigned int upperLimitOnIndicies = 0;
266  unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
267 
268  //talk with output modules first
269  modReg.forAllModuleHolders([&branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
270  auto comm = iHolder->createOutputModuleCommunicator();
271  if (comm) {
272  if(!branchToReadingWorker.empty()) {
273  //If an OutputModule needs a product, we can't delete it early
274  // so we should remove it from our list
275  SelectedProductsForBranchType const& kept = comm->keptProducts();
276  for(auto const& item: kept[InEvent]) {
277  BranchDescription const& desc = *item.first;
278  auto found = branchToReadingWorker.equal_range(desc.branchName());
279  if(found.first !=found.second) {
280  --nUniqueBranchesToDelete;
281  branchToReadingWorker.erase(found.first,found.second);
282  }
283  }
284  }
285  }
286  });
287 
288  if(branchToReadingWorker.empty()) {
289  return;
290  }
291 
292  for (auto w :allWorkers()) {
293  //determine if this module could read a branch we want to delete early
294  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
295  if(nullptr!=pset) {
296  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet",kEmpty);
297  if(not branches.empty()) {
298  ++upperLimitOnReadingWorker;
299  }
300  for(auto const& branch:branches){
301  auto found = branchToReadingWorker.equal_range(branch);
302  if(found.first != found.second) {
303  ++upperLimitOnIndicies;
304  ++reserveSizeForWorker[w];
305  if(nullptr == found.first->second) {
306  found.first->second = w;
307  } else {
308  branchToReadingWorker.insert(make_pair(found.first->first,w));
309  }
310  }
311  }
312  }
313  }
314  {
315  auto it = branchToReadingWorker.begin();
316  std::vector<std::string> unusedBranches;
317  while(it !=branchToReadingWorker.end()) {
318  if(it->second == nullptr) {
319  unusedBranches.push_back(it->first);
320  //erasing the object invalidates the iterator so must advance it first
321  auto temp = it;
322  ++it;
323  branchToReadingWorker.erase(temp);
324  } else {
325  ++it;
326  }
327  }
328  if(not unusedBranches.empty()) {
329  LogWarning l("UnusedProductsForCanDeleteEarly");
330  l<<"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
331  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
332  for(auto const& n:unusedBranches){
333  l<<"\n "<<n;
334  }
335  }
336  }
337  if(!branchToReadingWorker.empty()) {
338  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
339  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies,0);
340  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
341  std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
342  std::string lastBranchName;
343  size_t nextOpenIndex = 0;
344  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
345  for(auto& branchAndWorker:branchToReadingWorker) {
346  if(lastBranchName != branchAndWorker.first) {
347  //have to put back the period we removed earlier in order to get the proper name
348  BranchID bid(branchAndWorker.first+".");
349  earlyDeleteBranchToCount_.emplace_back(bid,0U);
350  lastBranchName = branchAndWorker.first;
351  }
352  auto found = alreadySeenWorkers.find(branchAndWorker.second);
353  if(alreadySeenWorkers.end() == found) {
354  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
355  // all the branches that might be read by this worker. However, initially we will only tell the
356  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
357  // EarlyDeleteHelper will automatically advance its internal end pointer.
358  size_t index = nextOpenIndex;
359  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
361  earlyDeleteHelpers_.emplace_back(beginAddress+index,
362  beginAddress+index+1,
364  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
365  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(earlyDeleteHelpers_.back())));
366  nextOpenIndex +=nIndices;
367  } else {
368  found->second->appendIndex(earlyDeleteBranchToCount_.size()-1);
369  }
370  }
371 
372  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
373  // space needed for each module
374  auto itLast = earlyDeleteHelpers_.begin();
375  for(auto it = earlyDeleteHelpers_.begin()+1;it != earlyDeleteHelpers_.end();++it) {
376  if(itLast->end() != it->begin()) {
377  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
378  unsigned int delta = it->begin()- itLast->end();
379  it->shiftIndexPointers(delta);
380 
382  (itLast->end()-beginAddress),
384  (it->begin()-beginAddress));
385  }
386  itLast = it;
387  }
388  earlyDeleteHelperToBranchIndicies_.erase(earlyDeleteHelperToBranchIndicies_.begin()+(itLast->end()-beginAddress),
390 
391  //now tell the paths about the deleters
392  for(auto& p : trig_paths_) {
393  p.setEarlyDeleteHelpers(alreadySeenWorkers);
394  }
395  for(auto& p : end_paths_) {
396  p.setEarlyDeleteHelpers(alreadySeenWorkers);
397  }
399  }
400  }
401 
403  ProductRegistry& preg,
404  PreallocationConfiguration const* prealloc,
405  std::shared_ptr<ProcessConfiguration const> processConfiguration,
406  std::string const& pathName,
407  bool ignoreFilters,
408  PathWorkers& out,
409  std::vector<std::string> const& endPathNames) {
410  vstring modnames = proc_pset.getParameter<vstring>(pathName);
411  PathWorkers tmpworkers;
412 
413  unsigned int placeInPath = 0;
414  for (auto const& name : modnames) {
415 
417  if (name[0] == '!') filterAction = WorkerInPath::Veto;
418  else if (name[0] == '-') filterAction = WorkerInPath::Ignore;
419 
420  std::string moduleLabel = name;
421  if (filterAction != WorkerInPath::Normal) moduleLabel.erase(0, 1);
422 
423  bool isTracked;
424  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
425  if (modpset == nullptr) {
426  std::string pathType("endpath");
427  if (!search_all(endPathNames, pathName)) {
428  pathType = std::string("path");
429  }
431  "The unknown module label \"" << moduleLabel <<
432  "\" appears in " << pathType << " \"" << pathName <<
433  "\"\n please check spelling or remove that label from the path.";
434  }
435  assert(isTracked);
436 
437  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
438  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) {
439  // We have a filter on an end path, and the filter is not explicitly ignored.
440  // See if the filter is allowed.
441  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
442  if (!search_all(allowed_filters, worker->description().moduleName())) {
443  // Filter is not allowed. Ignore the result, and issue a warning.
444  filterAction = WorkerInPath::Ignore;
445  LogWarning("FilterOnEndPath")
446  << "The EDFilter '" << worker->description().moduleName() << "' with module label '" << moduleLabel << "' appears on EndPath '" << pathName << "'.\n"
447  << "The return value of the filter will be ignored.\n"
448  << "To suppress this warning, either remove the filter from the endpath,\n"
449  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
450  }
451  }
452  tmpworkers.emplace_back(worker, filterAction, placeInPath);
453  ++placeInPath;
454  }
455 
456  out.swap(tmpworkers);
457  }
458 
460  ProductRegistry& preg,
461  PreallocationConfiguration const* prealloc,
462  std::shared_ptr<ProcessConfiguration const> processConfiguration,
463  int bitpos, std::string const& name, TrigResPtr trptr,
464  std::vector<std::string> const& endPathNames) {
465  PathWorkers tmpworkers;
466  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
467 
468  // an empty path will cause an extra bit that is not used
469  if (!tmpworkers.empty()) {
470  trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, &skippingEvent_, PathContext::PathType::kPath);
471  } else {
472  empty_trig_paths_.push_back(bitpos);
473  }
474  for (WorkerInPath const& workerInPath : tmpworkers) {
475  addToAllWorkers(workerInPath.getWorker());
476  }
477  }
478 
480  ProductRegistry& preg,
481  PreallocationConfiguration const* prealloc,
482  std::shared_ptr<ProcessConfiguration const> processConfiguration,
483  int bitpos, std::string const& name,
484  std::vector<std::string> const& endPathNames) {
485  PathWorkers tmpworkers;
486  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
487 
488  if (!tmpworkers.empty()) {
489  //EndPaths are not supposed to stop if SkipEvent type exception happens
490  end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, nullptr, PathContext::PathType::kEndPath);
491  } else {
492  empty_end_paths_.push_back(bitpos);
493  }
494  for (WorkerInPath const& workerInPath : tmpworkers) {
495  addToAllWorkers(workerInPath.getWorker());
496  }
497  }
498 
501  }
502 
505  }
506 
508  std::string const& iLabel) {
509  Worker* found = nullptr;
510  for (auto const& worker : allWorkers()) {
511  if (worker->description().moduleLabel() == iLabel) {
512  found = worker;
513  break;
514  }
515  }
516  if (nullptr == found) {
517  return;
518  }
519 
520  iMod->replaceModuleFor(found);
522  }
523 
524  std::vector<ModuleDescription const*>
526  std::vector<ModuleDescription const*> result;
527  result.reserve(allWorkers().size());
528 
529  for (auto const& worker : allWorkers()) {
530  ModuleDescription const* p = worker->descPtr();
531  result.push_back(p);
532  }
533  return result;
534  }
535 
537  EventPrincipal& ep,
538  EventSetup const& es,
539  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
540  this->resetAll();
541 
543 
544  Traits::setStreamContext(streamContext_, ep);
545  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
546 
547  HLTPathStatus hltPathStatus(hlt::Pass, 0);
548  for (int empty_trig_path : empty_trig_paths_) {
549  results_->at(empty_trig_path) = hltPathStatus;
550  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
551  std::exception_ptr iException = pathStatusInserterWorkers_[empty_trig_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
553  );
554  if (iException) {
555  iTask.doneWaiting(iException);
556  return;
557  }
558  }
559  for (int empty_end_path : empty_end_paths_) {
560  std::exception_ptr iException = endPathStatusInserterWorkers_[empty_end_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
562  );
563  if (iException) {
564  iTask.doneWaiting(iException);
565  return;
566  }
567  }
568 
569  // This call takes care of the unscheduled processing.
571 
572  ++total_events_;
573  auto serviceToken = ServiceRegistry::instance().presentToken();
574 
575  auto allPathsDone = make_waiting_task(tbb::task::allocate_root(),
576  [iTask,this,serviceToken](std::exception_ptr const* iPtr) mutable
577  {
578  ServiceRegistry::Operate operate(serviceToken);
579 
580  std::exception_ptr ptr;
581  if(iPtr) {
582  ptr = *iPtr;
583  }
585  });
586  //The holder guarantees that if the paths finish before the loop ends
587  // that we do not start too soon. It also guarantees that the task will
588  // run under that condition.
589  WaitingTaskHolder allPathsHolder(allPathsDone);
590 
591  auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
592  [allPathsHolder,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
593  {
594  ServiceRegistry::Operate operate(serviceToken);
595 
596  std::exception_ptr ptr;
597  if(iPtr) {
598  ptr = *iPtr;
599  }
600  finishedPaths(ptr, std::move(allPathsHolder), ep, es);
601  });
602 
603  //The holder guarantees that if the paths finish before the loop ends
604  // that we do not start too soon. It also guarantees that the task will
605  // run under that condition.
606  WaitingTaskHolder taskHolder(pathsDone);
607 
608  //start end paths first so on single threaded the paths will run first
609  for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
610  it != itEnd; ++it) {
611  it->processOneOccurrenceAsync(allPathsDone,ep, es, streamID_, &streamContext_);
612  }
613 
614  for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
615  it != itEnd; ++ it) {
616  it->processOneOccurrenceAsync(pathsDone,ep, es, streamID_, &streamContext_);
617  }
618  }
619 
620  void
621  StreamSchedule::finishedPaths(std::exception_ptr iExcept, WaitingTaskHolder iWait, EventPrincipal& ep,
622  EventSetup const& es) {
623 
624  if(iExcept) {
625  try {
626  std::rethrow_exception(iExcept);
627  }
628  catch(cms::Exception& e) {
630  assert (action != exception_actions::IgnoreCompletely);
631  assert (action != exception_actions::FailPath);
632  if (action == exception_actions::SkipEvent) {
633  edm::printCmsExceptionWarning("SkipEvent", e);
634  iExcept = std::exception_ptr();
635  } else {
636  iExcept = std::current_exception();
637  }
638  }
639  catch(...) {
640  iExcept = std::current_exception();
641  }
642  }
643 
644 
645  if((not iExcept) and results_->accept()) {
646  ++total_passed_;
647  }
648 
649  if(nullptr != results_inserter_.get()) {
650  try {
651  //Even if there was an exception, we need to allow results inserter
652  // to run since some module may be waiting on its results.
653  ParentContext parentContext(&streamContext_);
655 
656  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
657  }
658  catch (cms::Exception & ex) {
659  if (not iExcept) {
660  if(ex.context().empty()) {
661  std::ostringstream ost;
662  ost << "Processing Event " << ep.id();
663  ex.addContext(ost.str());
664  }
665  iExcept = std::current_exception();
666  }
667  }
668  catch(...) {
669  if (not iExcept) {
670  iExcept = std::current_exception();
671  }
672  }
673  }
674  iWait.doneWaiting(iExcept);
675  }
676 
677 
678  std::exception_ptr
679  StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
681 
682  if(iExcept) {
683  //add context information to the exception and print message
684  try {
685  convertException::wrap([&]() {
686  std::rethrow_exception(iExcept);
687  });
688  } catch(cms::Exception& ex) {
689  bool const cleaningUpAfterException = false;
690  if (ex.context().empty()) {
691  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
692  } else {
693  addContextAndPrintException("", ex, cleaningUpAfterException);
694  }
695  iExcept = std::current_exception();
696  }
697 
698  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
699  }
700 
701  try {
702  Traits::postScheduleSignal(actReg_.get(), &streamContext_);
703  } catch(...) {
704  if(not iExcept) {
705  iExcept = std::current_exception();
706  }
707  }
708  if(not iExcept ) {
710  }
711 
712  return iExcept;
713  }
714 
715 
716  void
717  StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
718  oLabelsToFill.reserve(trig_paths_.size());
719  std::transform(trig_paths_.begin(),
720  trig_paths_.end(),
721  std::back_inserter(oLabelsToFill),
722  std::bind(&Path::name, std::placeholders::_1));
723  }
724 
725  void
727  std::vector<std::string>& oLabelsToFill) const {
728  TrigPaths::const_iterator itFound =
729  std::find_if (trig_paths_.begin(),
730  trig_paths_.end(),
731  std::bind(std::equal_to<std::string>(),
732  iPathLabel,
733  std::bind(&Path::name, std::placeholders::_1)));
734  if (itFound!=trig_paths_.end()) {
735  oLabelsToFill.reserve(itFound->size());
736  for (size_t i = 0; i < itFound->size(); ++i) {
737  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
738  }
739  }
740  }
741 
742  void
744  std::vector<ModuleDescription const*>& descriptions,
745  unsigned int hint) const {
746  descriptions.clear();
747  bool found = false;
748  TrigPaths::const_iterator itFound;
749 
750  if(hint < trig_paths_.size()) {
751  itFound = trig_paths_.begin() + hint;
752  if(itFound->name() == iPathLabel) found = true;
753  }
754  if(!found) {
755  // if the hint did not work, do it the slow way
756  itFound = std::find_if (trig_paths_.begin(),
757  trig_paths_.end(),
758  std::bind(std::equal_to<std::string>(),
759  iPathLabel,
760  std::bind(&Path::name, std::placeholders::_1)));
761  if (itFound != trig_paths_.end()) found = true;
762  }
763  if (found) {
764  descriptions.reserve(itFound->size());
765  for (size_t i = 0; i < itFound->size(); ++i) {
766  descriptions.push_back(itFound->getWorker(i)->descPtr());
767  }
768  }
769  }
770 
771  void
773  std::vector<ModuleDescription const*>& descriptions,
774  unsigned int hint) const {
775  descriptions.clear();
776  bool found = false;
777  TrigPaths::const_iterator itFound;
778 
779  if(hint < end_paths_.size()) {
780  itFound = end_paths_.begin() + hint;
781  if(itFound->name() == iEndPathLabel) found = true;
782  }
783  if(!found) {
784  // if the hint did not work, do it the slow way
785  itFound = std::find_if (end_paths_.begin(),
786  end_paths_.end(),
787  std::bind(std::equal_to<std::string>(),
788  iEndPathLabel,
789  std::bind(&Path::name, std::placeholders::_1)));
790  if (itFound != end_paths_.end()) found = true;
791  }
792  if (found) {
793  descriptions.reserve(itFound->size());
794  for (size_t i = 0; i < itFound->size(); ++i) {
795  descriptions.push_back(itFound->getWorker(i)->descPtr());
796  }
797  }
798  }
799 
800  void
802  endpathsAreActive_ = active;
803  }
804 
805  bool
807  return endpathsAreActive_;
808  }
809 
810  static void
812  size_t which,
813  ModuleInPathSummary& sum) {
814  sum.timesVisited += path.timesVisited(which);
815  sum.timesPassed += path.timesPassed(which);
816  sum.timesFailed += path.timesFailed(which);
817  sum.timesExcept += path.timesExcept(which);
818  sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
819  }
820 
821  static void
823  sum.name = path.name();
824  sum.bitPosition = path.bitPosition();
825  sum.timesRun += path.timesRun();
826  sum.timesPassed += path.timesPassed();
827  sum.timesFailed += path.timesFailed();
828  sum.timesExcept += path.timesExcept();
829 
830  Path::size_type sz = path.size();
831  if(sum.moduleInPathSummaries.empty()) {
832  std::vector<ModuleInPathSummary> temp(sz);
833  for (size_t i = 0; i != sz; ++i) {
834  fillModuleInPathSummary(path, i, temp[i]);
835  }
836  sum.moduleInPathSummaries.swap(temp);
837  } else {
838  assert(sz == sum.moduleInPathSummaries.size());
839  for (size_t i = 0; i != sz; ++i) {
841  }
842  }
843  }
844 
845  static void
847  sum.timesVisited += w.timesVisited();
848  sum.timesRun += w.timesRun();
849  sum.timesPassed += w.timesPassed();
850  sum.timesFailed += w.timesFailed();
851  sum.timesExcept += w.timesExcept();
852  sum.moduleLabel = w.description().moduleLabel();
853  }
854 
855  static void
857  fillWorkerSummaryAux(*pw, sum);
858  }
859 
860  void
865 
866  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
867  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
868  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
869  }
870 
871  void
873  using std::placeholders::_1;
875  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
876  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
877  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
878  }
879 
880  void
882  skippingEvent_ = false;
883  results_->reset();
884  }
885 
886  void
889  }
890 
891  void
893  //must be sure we have cleared the count first
894  for(auto& count:earlyDeleteBranchToCount_) {
895  count.count = 0;
896  }
897  //now reset based on how many helpers use that branch
899  ++(earlyDeleteBranchToCount_[index].count);
900  }
901  for(auto& helper: earlyDeleteHelpers_) {
902  helper.reset();
903  }
904  }
905 
906  void
908  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
909  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
910  ExceptionToActionTable const& actions) {
911 
912  int bitpos = 0;
913  unsigned int indexEmpty = 0;
914  unsigned int indexOfPath = 0;
915  for(auto & pathStatusInserter : pathStatusInserters) {
916  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
918  inserterPtr->moduleDescription(),
919  &actions));
920  pathStatusInserterWorkers_.emplace_back(workerPtr);
921  workerPtr->setActivityRegistry(actReg_);
922  addToAllWorkers(workerPtr.get());
923 
924  // A little complexity here because a C++ Path object is not
925  // instantiated and put into trig_paths_ if there are no modules
926  // on the configured path.
927  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
928  ++indexEmpty;
929  } else {
930  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(),
931  workerPtr.get());
932  ++indexOfPath;
933  }
934  ++bitpos;
935  }
936 
937  bitpos = 0;
938  indexEmpty = 0;
939  indexOfPath = 0;
940  for(auto & endPathStatusInserter : endPathStatusInserters) {
941  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
943  inserterPtr->moduleDescription(),
944  &actions));
945  endPathStatusInserterWorkers_.emplace_back(workerPtr);
946  workerPtr->setActivityRegistry(actReg_);
947  addToAllWorkers(workerPtr.get());
948 
949  // A little complexity here because a C++ Path object is not
950  // instantiated and put into end_paths if there are no modules
951  // on the configured path.
952  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
953  ++indexEmpty;
954  } else {
955  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr,
956  workerPtr.get());
957  ++indexOfPath;
958  }
959  ++bitpos;
960  }
961  }
962 }
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
size
Write out results.
dbl * delta
Definition: mlp_gen.cc:36
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:68
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
int timesRun() const
Definition: Path.h:77
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int bitPosition() const
Definition: Path.h:72
ModuleDescription const & description() const
Definition: Worker.h:182
std::vector< int > empty_trig_paths_
Definition: helper.py:1
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
virtual void replaceModuleFor(Worker *) const =0
Strings const & getEndPaths() const
int totalEventsFailed() const
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
Definition: Path.h:49
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
int timesPassed() const
Definition: Worker.h:220
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
void clearCounters()
Definition: Worker.h:206
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
void addToAllWorkers(Worker *w)
int timesExcept() const
Definition: Path.h:80
exception_actions::ActionCodes find(const std::string &category) const
size_type size() const
Definition: Path.h:84
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:323
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:69
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
ServiceToken presentToken() const
std::string const & moduleLabel() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::string moduleLabel
Definition: TriggerReport.h:56
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int timesExcept() const
Definition: Worker.h:222
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:67
EventSummary eventSummary
Definition: TriggerReport.h:66
accept
Definition: HLTenums.h:19
T & get_underlying(propagate_const< T > &)
int timesVisited() const
Definition: Worker.h:219
std::string name
Definition: TriggerReport.h:44
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:44
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
static void fillPathSummary(Path const &path, PathSummary &sum)
static ServiceRegistry & instance()
int timesPassed() const
Definition: Path.h:78
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:19
int timesRun() const
Definition: Worker.h:218
#define end
Definition: vmac.h:39
rep
Definition: cuy.py:1188
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
element_type const * get() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
unsigned int value() const
Definition: StreamID.h:46
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
int timesVisited(size_type i) const
Definition: Path.h:85
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
int timesFailed() const
Definition: Worker.h:221
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
std::atomic< bool > skippingEvent_
std::string const & name() const
Definition: Path.h:73
#define begin
Definition: vmac.h:32
HLT enums.
Strings const & getTrigPaths() const
Worker const * getWorker(size_type i) const
Definition: Path.h:89
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void enableEndPaths(bool active)
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:335
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:45
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
TrigResConstPtr results() const
std::vector< std::string > vstring
Definition: Schedule.cc:430
def move(src, dest)
Definition: eostools.py:510
static Registry * instance()
Definition: Registry.cc:13
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void clearCounters()
Definition: Path.cc:171
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
int timesFailed() const
Definition: Path.h:79
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)