CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
28 
30 
31 #include <algorithm>
32 #include <cassert>
33 #include <cstdlib>
34 #include <functional>
35 #include <iomanip>
36 #include <list>
37 #include <map>
38 #include <exception>
39 
40 namespace edm {
41  namespace {
42 
// Walks the input range [begin, end) and the output range starting at
// 'out' in lock step, calling func(input, output) for every pair.
// 'func' receives the input element (intended to be taken by const
// reference) and a reference to the matching output element, which it
// is expected to write. No bounds check is made on the output range, so
// it must hold at least as many elements as the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
52 
// Fills the sequence 'to' from the sequence 'from' by calling
// func(from[i], to[i]) for every position i.
//
// When 'to' already has exactly one element per element of 'from', the
// callable is applied directly to the existing elements (so a callable
// that accumulates adds to what is already there). Otherwise a temporary
// TO constructed with from.size() elements is filled first and swapped
// into 'to' only after every call has returned, so in that case 'to' is
// left untouched if one of the calls throws.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  // Pairwise application of 'func' over 'from' and a destination sequence.
  auto const applyPairwise = [&from, &func](TO& destination) {
    auto outIt = destination.begin();
    for (auto inIt = from.begin(); inIt != from.end(); ++inIt, ++outIt) {
      func(*inIt, *outIt);
    }
  };

  if (to.size() == from.size()) {
    applyPairwise(to);
    return;
  }

  TO fresh(from.size());
  applyPairwise(fresh);
  to.swap(fresh);
}
68 
69  // -----------------------------
70 
71  // Here we make the trigger results inserter directly. This should
72  // probably be a utility in the WorkerRegistry or elsewhere.
73 
// Builds the worker that runs the TriggerResultInserter module.
// The worker is an edm::WorkerT<TriggerResultInserter::ModuleType> created
// from 'inserter', the inserter's own module description and a pointer to
// 'actions'; 'areg' is attached through setActivityRegistry() before the
// worker is returned.
// NOTE(review): listing line 77 is missing from this extract; it presumably
// opens the declaration of 'ptr' that the 'new' expression below completes.
74  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
75  std::shared_ptr<ActivityRegistry> areg,
76  std::shared_ptr<TriggerResultInserter> inserter) {
78  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
79  ptr->setActivityRegistry(areg);
80  return ptr;
81  }
82 
83  void initializeBranchToReadingWorker(ParameterSet const& opts,
84  ProductRegistry const& preg,
85  std::multimap<std::string, Worker*>& branchToReadingWorker) {
86  // See if any data has been marked to be deleted early (removing any duplicates)
87  auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
88  if (not vBranchesToDeleteEarly.empty()) {
89  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
90  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
91  vBranchesToDeleteEarly.end());
92 
93  // Are the requested items in the product registry?
94  auto allBranchNames = preg.allBranchNames();
95  //the branch names all end with a period, which we do not want to compare with
96  for (auto& b : allBranchNames) {
97  b.resize(b.size() - 1);
98  }
99  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
100  std::vector<std::string> temp;
101  temp.reserve(vBranchesToDeleteEarly.size());
102 
103  std::set_intersection(vBranchesToDeleteEarly.begin(),
104  vBranchesToDeleteEarly.end(),
105  allBranchNames.begin(),
106  allBranchNames.end(),
107  std::back_inserter(temp));
108  vBranchesToDeleteEarly.swap(temp);
109  if (temp.size() != vBranchesToDeleteEarly.size()) {
110  std::vector<std::string> missingProducts;
111  std::set_difference(temp.begin(),
112  temp.end(),
113  vBranchesToDeleteEarly.begin(),
114  vBranchesToDeleteEarly.end(),
115  std::back_inserter(missingProducts));
116  LogInfo l("MissingProductsForCanDeleteEarly");
117  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
118  for (auto const& n : missingProducts) {
119  l << "\n " << n;
120  }
121  }
122  //set placeholder for the branch, we will remove the nullptr if a
123  // module actually wants the branch.
124  for (auto const& branch : vBranchesToDeleteEarly) {
125  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
126  }
127  }
128  }
129  } // namespace
130 
131  // -----------------------------
132 
133  typedef std::vector<std::string> vstring;
134 
135  // -----------------------------
136 
138  std::shared_ptr<TriggerResultInserter> inserter,
139  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
140  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
141  std::shared_ptr<ModuleRegistry> modReg,
142  ParameterSet& proc_pset,
143  service::TriggerNamesService const& tns,
144  PreallocationConfiguration const& prealloc,
145  ProductRegistry& preg,
146  BranchIDListHelper& branchIDListHelper,
147  ExceptionToActionTable const& actions,
148  std::shared_ptr<ActivityRegistry> areg,
149  std::shared_ptr<ProcessConfiguration> processConfiguration,
150  bool allowEarlyDelete,
151  StreamID streamID,
152  ProcessContext const* processContext)
153  : workerManager_(modReg, areg, actions),
154  actReg_(areg),
155  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
156  results_inserter_(),
157  trig_paths_(),
158  end_paths_(),
159  total_events_(),
160  total_passed_(),
161  number_of_unscheduled_modules_(0),
162  streamID_(streamID),
163  streamContext_(streamID_, processContext),
164  endpathsAreActive_(true),
165  skippingEvent_(false) {
166  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
167  bool hasPath = false;
168  std::vector<std::string> const& pathNames = tns.getTrigPaths();
169  std::vector<std::string> const& endPathNames = tns.getEndPaths();
170 
171  int trig_bitpos = 0;
172  trig_paths_.reserve(pathNames.size());
173  for (auto const& trig_name : pathNames) {
174  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
175  ++trig_bitpos;
176  hasPath = true;
177  }
178 
179  if (hasPath) {
180  // the results inserter stands alone
181  inserter->setTrigResultForStream(streamID.value(), results());
182 
183  results_inserter_ = makeInserter(actions, actReg_, inserter);
185  }
186 
187  // fill normal endpaths
188  int bitpos = 0;
189  end_paths_.reserve(endPathNames.size());
190  for (auto const& end_path_name : endPathNames) {
191  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
192  ++bitpos;
193  }
194 
195  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
196 
197  //See if all modules were used
198  std::set<std::string> usedWorkerLabels;
199  for (auto const& worker : allWorkers()) {
200  usedWorkerLabels.insert(worker->description().moduleLabel());
201  }
202  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
203  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
204  std::vector<std::string> unusedLabels;
205  set_difference(modulesInConfigSet.begin(),
206  modulesInConfigSet.end(),
207  usedWorkerLabels.begin(),
208  usedWorkerLabels.end(),
209  back_inserter(unusedLabels));
210  std::set<std::string> unscheduledLabels;
211  std::vector<std::string> shouldBeUsedLabels;
212  if (!unusedLabels.empty()) {
213  //Need to
214  // 1) create worker
215  // 2) if it is a WorkerT<EDProducer>, add it to our list
216  // 3) hand list to our delayed reader
217  for (auto const& label : unusedLabels) {
218  bool isTracked;
219  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
220  assert(isTracked);
221  assert(modulePSet != nullptr);
223  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
224  }
225  if (!shouldBeUsedLabels.empty()) {
226  std::ostringstream unusedStream;
227  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
228  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
229  itLabelEnd = shouldBeUsedLabels.end();
230  itLabel != itLabelEnd;
231  ++itLabel) {
232  unusedStream << ",'" << *itLabel << "'";
233  }
234  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
235  }
236  }
237  number_of_unscheduled_modules_ = unscheduledLabels.size();
238 
239  initializeEarlyDelete(*modReg, opts, preg, allowEarlyDelete);
240 
241  } // StreamSchedule::StreamSchedule
242 
244  edm::ParameterSet const& opts,
245  edm::ProductRegistry const& preg,
246  bool allowEarlyDelete) {
247  //for now, if have a subProcess, don't allow early delete
248  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
249  if (not allowEarlyDelete)
250  return;
251 
252  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
253  // registered for this job
254  std::multimap<std::string, Worker*> branchToReadingWorker;
255  initializeBranchToReadingWorker(opts, preg, branchToReadingWorker);
256 
257  //If no delete early items have been specified we don't have to do anything
258  if (branchToReadingWorker.empty()) {
259  return;
260  }
261  const std::vector<std::string> kEmpty;
262  std::map<Worker*, unsigned int> reserveSizeForWorker;
263  unsigned int upperLimitOnReadingWorker = 0;
264  unsigned int upperLimitOnIndicies = 0;
265  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
266 
267  //talk with output modules first
268  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
269  auto comm = iHolder->createOutputModuleCommunicator();
270  if (comm) {
271  if (!branchToReadingWorker.empty()) {
272  //If an OutputModule needs a product, we can't delete it early
273  // so we should remove it from our list
274  SelectedProductsForBranchType const& kept = comm->keptProducts();
275  for (auto const& item : kept[InEvent]) {
276  BranchDescription const& desc = *item.first;
277  auto found = branchToReadingWorker.equal_range(desc.branchName());
278  if (found.first != found.second) {
279  --nUniqueBranchesToDelete;
280  branchToReadingWorker.erase(found.first, found.second);
281  }
282  }
283  }
284  }
285  });
286 
287  if (branchToReadingWorker.empty()) {
288  return;
289  }
290 
291  for (auto w : allWorkers()) {
292  //determine if this module could read a branch we want to delete early
293  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
294  if (nullptr != pset) {
295  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
296  if (not branches.empty()) {
297  ++upperLimitOnReadingWorker;
298  }
299  for (auto const& branch : branches) {
300  auto found = branchToReadingWorker.equal_range(branch);
301  if (found.first != found.second) {
302  ++upperLimitOnIndicies;
303  ++reserveSizeForWorker[w];
304  if (nullptr == found.first->second) {
305  found.first->second = w;
306  } else {
307  branchToReadingWorker.insert(make_pair(found.first->first, w));
308  }
309  }
310  }
311  }
312  }
313  {
314  auto it = branchToReadingWorker.begin();
315  std::vector<std::string> unusedBranches;
316  while (it != branchToReadingWorker.end()) {
317  if (it->second == nullptr) {
318  unusedBranches.push_back(it->first);
319  //erasing the object invalidates the iterator so must advance it first
320  auto temp = it;
321  ++it;
322  branchToReadingWorker.erase(temp);
323  } else {
324  ++it;
325  }
326  }
327  if (not unusedBranches.empty()) {
328  LogWarning l("UnusedProductsForCanDeleteEarly");
329  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
330  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
331  for (auto const& n : unusedBranches) {
332  l << "\n " << n;
333  }
334  }
335  }
336  if (!branchToReadingWorker.empty()) {
337  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
338  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
339  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
340  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
341  std::string lastBranchName;
342  size_t nextOpenIndex = 0;
343  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
344  for (auto& branchAndWorker : branchToReadingWorker) {
345  if (lastBranchName != branchAndWorker.first) {
346  //have to put back the period we removed earlier in order to get the proper name
347  BranchID bid(branchAndWorker.first + ".");
348  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
349  lastBranchName = branchAndWorker.first;
350  }
351  auto found = alreadySeenWorkers.find(branchAndWorker.second);
352  if (alreadySeenWorkers.end() == found) {
353  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
354  // all the branches that might be read by this worker. However, initially we will only tell the
355  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
356  // EarlyDeleteHelper will automatically advance its internal end pointer.
357  size_t index = nextOpenIndex;
358  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
360  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
361  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
362  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
363  nextOpenIndex += nIndices;
364  } else {
365  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
366  }
367  }
368 
369  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
370  // space needed for each module
371  auto itLast = earlyDeleteHelpers_.begin();
372  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
373  if (itLast->end() != it->begin()) {
374  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
375  unsigned int delta = it->begin() - itLast->end();
376  it->shiftIndexPointers(delta);
377 
379  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
380  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
381  }
382  itLast = it;
383  }
385  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
387 
388  //now tell the paths about the deleters
389  for (auto& p : trig_paths_) {
390  p.setEarlyDeleteHelpers(alreadySeenWorkers);
391  }
392  for (auto& p : end_paths_) {
393  p.setEarlyDeleteHelpers(alreadySeenWorkers);
394  }
396  }
397  }
398 
400  ProductRegistry& preg,
401  PreallocationConfiguration const* prealloc,
402  std::shared_ptr<ProcessConfiguration const> processConfiguration,
403  std::string const& pathName,
404  bool ignoreFilters,
405  PathWorkers& out,
406  std::vector<std::string> const& endPathNames) {
407  vstring modnames = proc_pset.getParameter<vstring>(pathName);
408  PathWorkers tmpworkers;
409 
410  unsigned int placeInPath = 0;
411  for (auto const& name : modnames) {
413  if (name[0] == '!')
414  filterAction = WorkerInPath::Veto;
415  else if (name[0] == '-')
416  filterAction = WorkerInPath::Ignore;
417 
418  std::string moduleLabel = name;
419  if (filterAction != WorkerInPath::Normal)
420  moduleLabel.erase(0, 1);
421 
422  bool isTracked;
423  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
424  if (modpset == nullptr) {
425  std::string pathType("endpath");
426  if (!search_all(endPathNames, pathName)) {
427  pathType = std::string("path");
428  }
430  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
431  << "\"\n please check spelling or remove that label from the path.";
432  }
433  assert(isTracked);
434 
435  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
436  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
437  // We have a filter on an end path, and the filter is not explicitly ignored.
438  // See if the filter is allowed.
439  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
440  if (!search_all(allowed_filters, worker->description().moduleName())) {
441  // Filter is not allowed. Ignore the result, and issue a warning.
442  filterAction = WorkerInPath::Ignore;
443  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description().moduleName()
444  << "' with module label '" << moduleLabel << "' appears on EndPath '"
445  << pathName << "'.\n"
446  << "The return value of the filter will be ignored.\n"
447  << "To suppress this warning, either remove the filter from the endpath,\n"
448  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
449  }
450  }
451  tmpworkers.emplace_back(worker, filterAction, placeInPath);
452  ++placeInPath;
453  }
454 
455  out.swap(tmpworkers);
456  }
457 
459  ProductRegistry& preg,
460  PreallocationConfiguration const* prealloc,
461  std::shared_ptr<ProcessConfiguration const> processConfiguration,
462  int bitpos,
463  std::string const& name,
464  TrigResPtr trptr,
465  std::vector<std::string> const& endPathNames) {
466  PathWorkers tmpworkers;
467  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
468 
469  // an empty path will cause an extra bit that is not used
470  if (!tmpworkers.empty()) {
471  trig_paths_.emplace_back(bitpos,
472  name,
473  tmpworkers,
474  trptr,
475  actionTable(),
476  actReg_,
480  } else {
481  empty_trig_paths_.push_back(bitpos);
482  }
483  for (WorkerInPath const& workerInPath : tmpworkers) {
484  addToAllWorkers(workerInPath.getWorker());
485  }
486  }
487 
489  ProductRegistry& preg,
490  PreallocationConfiguration const* prealloc,
491  std::shared_ptr<ProcessConfiguration const> processConfiguration,
492  int bitpos,
493  std::string const& name,
494  std::vector<std::string> const& endPathNames) {
495  PathWorkers tmpworkers;
496  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
497 
498  if (!tmpworkers.empty()) {
499  //EndPaths are not supposed to stop if SkipEvent type exception happens
500  end_paths_.emplace_back(bitpos,
501  name,
502  tmpworkers,
503  TrigResPtr(),
504  actionTable(),
505  actReg_,
507  nullptr,
509  } else {
510  empty_end_paths_.push_back(bitpos);
511  }
512  for (WorkerInPath const& workerInPath : tmpworkers) {
513  addToAllWorkers(workerInPath.getWorker());
514  }
515  }
516 
518 
520 
522  Worker* found = nullptr;
523  for (auto const& worker : allWorkers()) {
524  if (worker->description().moduleLabel() == iLabel) {
525  found = worker;
526  break;
527  }
528  }
529  if (nullptr == found) {
530  return;
531  }
532 
533  iMod->replaceModuleFor(found);
535  }
536 
537  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
538  std::vector<ModuleDescription const*> result;
539  result.reserve(allWorkers().size());
540 
541  for (auto const& worker : allWorkers()) {
542  ModuleDescription const* p = worker->descPtr();
543  result.push_back(p);
544  }
545  return result;
546  }
547 
549  WaitingTaskHolder iTask,
550  EventPrincipal& ep,
551  EventSetupImpl const& es,
552  ServiceToken const& serviceToken,
553  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
554  try {
555  this->resetAll();
556 
558 
559  Traits::setStreamContext(streamContext_, ep);
560  //a service may want to communicate with another service
561  ServiceRegistry::Operate guard(serviceToken);
562  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
563 
564  HLTPathStatus hltPathStatus(hlt::Pass, 0);
565  for (int empty_trig_path : empty_trig_paths_) {
566  results_->at(empty_trig_path) = hltPathStatus;
567  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
568  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
571  if (except) {
572  iTask.doneWaiting(except);
573  return;
574  }
575  }
576  for (int empty_end_path : empty_end_paths_) {
577  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
580  if (except) {
581  iTask.doneWaiting(except);
582  return;
583  }
584  }
585 
586  // This call takes care of the unscheduled processing.
588 
589  ++total_events_;
590 
591  //use to give priorities on an error to ones from Paths
592  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
593  auto pathErrorPtr = pathErrorHolder.get();
594  auto allPathsDone = make_waiting_task(
595  tbb::task::allocate_root(),
596  [iTask, this, serviceToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
597  ServiceRegistry::Operate operate(serviceToken);
598 
599  std::exception_ptr ptr;
600  if (pathError->load()) {
601  ptr = *pathError->load();
602  delete pathError->load();
603  }
604  if ((not ptr) and iPtr) {
605  ptr = *iPtr;
606  }
608  });
609  //The holder guarantees that if the paths finish before the loop ends
610  // that we do not start too soon. It also guarantees that the task will
611  // run under that condition.
612  WaitingTaskHolder allPathsHolder(allPathsDone);
613 
614  auto pathsDone = make_waiting_task(
615  tbb::task::allocate_root(),
616  [allPathsHolder, pathErrorPtr, &ep, &es, this, serviceToken](std::exception_ptr const* iPtr) mutable {
617  ServiceRegistry::Operate operate(serviceToken);
618 
619  if (iPtr) {
620  //this is used to prioritize this error over one
621  // that happens in EndPath or Accumulate
622  pathErrorPtr->store(new std::exception_ptr(*iPtr));
623  }
624  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), ep, es);
625  });
626 
627  //The holder guarantees that if the paths finish before the loop ends
628  // that we do not start too soon. It also guarantees that the task will
629  // run under that condition.
630  WaitingTaskHolder taskHolder(pathsDone);
631 
632  //start end paths first so on single threaded the paths will run first
633  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
634  it->processOneOccurrenceAsync(allPathsDone, ep, es, serviceToken, streamID_, &streamContext_);
635  }
636 
637  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
638  it->processOneOccurrenceAsync(pathsDone, ep, es, serviceToken, streamID_, &streamContext_);
639  }
640 
641  ParentContext parentContext(&streamContext_);
643  allPathsDone, ep, es, serviceToken, streamID_, parentContext, &streamContext_);
644  } catch (...) {
645  iTask.doneWaiting(std::current_exception());
646  }
647  }
648 
// Post-processes the outcome of the trigger paths for one event, runs the
// trigger results inserter (if there is one) and finally releases 'iWait'.
//
//  - iExcept: atomic slot holding either null or a pointer to a
//             heap-allocated std::exception_ptr describing an error.
//  - iWait:   holder whose doneWaiting() is called last, with the final
//             exception_ptr (empty when no error is recorded).
//  - ep, es:  forwarded to the results inserter's doWork().
649  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
650  WaitingTaskHolder iWait,
651  EventPrincipal& ep,
652  EventSetupImpl const& es) {
// 'iExcept' converts to the stored pointer, so this tests whether an
// exception_ptr has been recorded, not whether that exception_ptr is empty.
653  if (iExcept) {
654  try {
// Rethrow so the catch clauses can classify the recorded exception.
655  std::rethrow_exception(*(iExcept.load()));
656  } catch (cms::Exception& e) {
// NOTE(review): listing line 657 is missing from this extract; 'action'
// used below is presumably declared there.
658  assert(action != exception_actions::IgnoreCompletely);
659  assert(action != exception_actions::FailPath);
660  if (action == exception_actions::SkipEvent) {
661  edm::printCmsExceptionWarning("SkipEvent", e);
// Only the pointed-to exception_ptr is cleared; the slot itself stays
// non-null, so the 'not iExcept' tests further down are still false.
// NOTE(review): confirm that skipped events are meant to be left out of
// total_passed_.
662  *(iExcept.load()) = std::exception_ptr();
663  } else {
664  *(iExcept.load()) = std::current_exception();
665  }
666  } catch (...) {
667  *(iExcept.load()) = std::current_exception();
668  }
669  }
670 
// Count the event as passed only when no error slot is set and the
// trigger results accept it.
671  if ((not iExcept) and results_->accept()) {
672  ++total_passed_;
673  }
674 
675  if (nullptr != results_inserter_.get()) {
676  try {
677  //Even if there was an exception, we need to allow results inserter
678  // to run since some module may be waiting on its results.
679  ParentContext parentContext(&streamContext_);
// NOTE(review): listing line 680 is missing from this extract.
681 
682  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
683  } catch (cms::Exception& ex) {
// An error from the inserter is recorded only if none was recorded before.
684  if (not iExcept) {
685  if (ex.context().empty()) {
686  std::ostringstream ost;
687  ost << "Processing Event " << ep.id();
688  ex.addContext(ost.str());
689  }
690  iExcept.store(new std::exception_ptr(std::current_exception()));
691  }
692  } catch (...) {
693  if (not iExcept) {
694  iExcept.store(new std::exception_ptr(std::current_exception()));
695  }
696  }
697  }
// Hand the final state to the waiting task: an empty exception_ptr when
// nothing is recorded, otherwise a copy of the recorded one.
698  std::exception_ptr ptr;
699  if (iExcept) {
700  ptr = *iExcept.load();
701  }
702  iWait.doneWaiting(ptr);
703  }
704 
// Final bookkeeping for one event.
// If 'iExcept' holds an exception, context is added to it and it is printed,
// and the early-termination signal is emitted. The post-schedule signal is
// then emitted in every case. Returns the exception to report: the
// (possibly re-captured) incoming one, or one thrown by the post-schedule
// signal when there was none before, or an empty exception_ptr.
705  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
// NOTE(review): listing line 706 is missing from this extract.
707 
708  if (iExcept) {
709  //add context information to the exception and print message
710  try {
// Rethrown through convertException::wrap so that the handler below can
// catch it as a cms::Exception.
// NOTE(review): presumably wrap() converts other exception types to
// cms::Exception; confirm.
711  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
712  } catch (cms::Exception& ex) {
713  bool const cleaningUpAfterException = false;
// A context string is supplied only when the exception has none yet.
714  if (ex.context().empty()) {
715  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
716  } else {
717  addContextAndPrintException("", ex, cleaningUpAfterException);
718  }
// Re-capture the exception currently being handled ('ex').
719  iExcept = std::current_exception();
720  }
721 
722  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
723  }
724 
725  try {
726  Traits::postScheduleSignal(actReg_.get(), &streamContext_);
727  } catch (...) {
// An exception from the signal never replaces an earlier one.
728  if (not iExcept) {
729  iExcept = std::current_exception();
730  }
731  }
732  if (not iExcept) {
// NOTE(review): listing line 733 (the body of this 'if') is missing from
// this extract.
734  }
735 
736  return iExcept;
737  }
738 
739  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
740  oLabelsToFill.reserve(trig_paths_.size());
741  std::transform(trig_paths_.begin(),
742  trig_paths_.end(),
743  std::back_inserter(oLabelsToFill),
744  std::bind(&Path::name, std::placeholders::_1));
745  }
746 
747  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
748  TrigPaths::const_iterator itFound = std::find_if(
749  trig_paths_.begin(),
750  trig_paths_.end(),
751  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
752  if (itFound != trig_paths_.end()) {
753  oLabelsToFill.reserve(itFound->size());
754  for (size_t i = 0; i < itFound->size(); ++i) {
755  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
756  }
757  }
758  }
759 
761  std::vector<ModuleDescription const*>& descriptions,
762  unsigned int hint) const {
763  descriptions.clear();
764  bool found = false;
765  TrigPaths::const_iterator itFound;
766 
767  if (hint < trig_paths_.size()) {
768  itFound = trig_paths_.begin() + hint;
769  if (itFound->name() == iPathLabel)
770  found = true;
771  }
772  if (!found) {
773  // if the hint did not work, do it the slow way
774  itFound = std::find_if(
775  trig_paths_.begin(),
776  trig_paths_.end(),
777  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
778  if (itFound != trig_paths_.end())
779  found = true;
780  }
781  if (found) {
782  descriptions.reserve(itFound->size());
783  for (size_t i = 0; i < itFound->size(); ++i) {
784  descriptions.push_back(itFound->getWorker(i)->descPtr());
785  }
786  }
787  }
788 
790  std::vector<ModuleDescription const*>& descriptions,
791  unsigned int hint) const {
792  descriptions.clear();
793  bool found = false;
794  TrigPaths::const_iterator itFound;
795 
796  if (hint < end_paths_.size()) {
797  itFound = end_paths_.begin() + hint;
798  if (itFound->name() == iEndPathLabel)
799  found = true;
800  }
801  if (!found) {
802  // if the hint did not work, do it the slow way
803  itFound = std::find_if(
804  end_paths_.begin(),
805  end_paths_.end(),
806  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
807  if (itFound != end_paths_.end())
808  found = true;
809  }
810  if (found) {
811  descriptions.reserve(itFound->size());
812  for (size_t i = 0; i < itFound->size(); ++i) {
813  descriptions.push_back(itFound->getWorker(i)->descPtr());
814  }
815  }
816  }
817 
818  void StreamSchedule::enableEndPaths(bool active) { endpathsAreActive_ = active; }
819 
821 
822  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
823  sum.timesVisited += path.timesVisited(which);
824  sum.timesPassed += path.timesPassed(which);
825  sum.timesFailed += path.timesFailed(which);
826  sum.timesExcept += path.timesExcept(which);
827  sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
828  }
829 
// Folds the statistics of 'path' into 'sum'.
// The name and bit position are overwritten; the run/passed/failed/except
// counters are added to the values 'sum' already holds. The per-module
// summaries are created on the first call (when the vector is empty) and
// must match the path's size on later calls.
830  static void fillPathSummary(Path const& path, PathSummary& sum) {
831  sum.name = path.name();
832  sum.bitPosition = path.bitPosition();
833  sum.timesRun += path.timesRun();
834  sum.timesPassed += path.timesPassed();
835  sum.timesFailed += path.timesFailed();
836  sum.timesExcept += path.timesExcept();
837 
838  Path::size_type sz = path.size();
839  if (sum.moduleInPathSummaries.empty()) {
// First fill: build the per-module entries in a temporary and swap them in
// only once every entry has been filled.
840  std::vector<ModuleInPathSummary> temp(sz);
841  for (size_t i = 0; i != sz; ++i) {
842  fillModuleInPathSummary(path, i, temp[i]);
843  }
844  sum.moduleInPathSummaries.swap(temp);
845  } else {
// Later fills update the existing entries, so the sizes have to agree.
846  assert(sz == sum.moduleInPathSummaries.size());
847  for (size_t i = 0; i != sz; ++i) {
// NOTE(review): listing line 848 (the loop body) is missing from this
// extract; presumably it updates sum.moduleInPathSummaries[i].
849  }
850  }
851  }
852 
853  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
854  sum.timesVisited += w.timesVisited();
855  sum.timesRun += w.timesRun();
856  sum.timesPassed += w.timesPassed();
857  sum.timesFailed += w.timesFailed();
858  sum.timesExcept += w.timesExcept();
859  sum.moduleLabel = w.description().moduleLabel();
860  }
861 
862  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
863 
868 
869  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
870  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
871  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
872  }
873 
875  using std::placeholders::_1;
877  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
878  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
879  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
880  }
881 
883  skippingEvent_ = false;
884  results_->reset();
885  }
886 
888 
890  //must be sure we have cleared the count first
891  for (auto& count : earlyDeleteBranchToCount_) {
892  count.count = 0;
893  }
894  //now reset based on how many helpers use that branch
896  ++(earlyDeleteBranchToCount_[index].count);
897  }
898  for (auto& helper : earlyDeleteHelpers_) {
899  helper.reset();
900  }
901  }
902 
904  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
905  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
906  ExceptionToActionTable const& actions) {
907  int bitpos = 0;
908  unsigned int indexEmpty = 0;
909  unsigned int indexOfPath = 0;
910  for (auto& pathStatusInserter : pathStatusInserters) {
911  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
912  WorkerPtr workerPtr(
913  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
914  pathStatusInserterWorkers_.emplace_back(workerPtr);
915  workerPtr->setActivityRegistry(actReg_);
916  addToAllWorkers(workerPtr.get());
917 
918  // A little complexity here because a C++ Path object is not
919  // instantiated and put into trig_paths_ if there are no modules
920  // on the configured path.
921  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
922  ++indexEmpty;
923  } else {
924  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
925  ++indexOfPath;
926  }
927  ++bitpos;
928  }
929 
930  bitpos = 0;
931  indexEmpty = 0;
932  indexOfPath = 0;
933  for (auto& endPathStatusInserter : endPathStatusInserters) {
934  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
935  WorkerPtr workerPtr(
936  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
937  endPathStatusInserterWorkers_.emplace_back(workerPtr);
938  workerPtr->setActivityRegistry(actReg_);
939  addToAllWorkers(workerPtr.get());
940 
941  // A little complexity here because a C++ Path object is not
942  // instantiated and put into end_paths if there are no modules
943  // on the configured path.
944  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
945  ++indexEmpty;
946  } else {
947  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
948  ++indexOfPath;
949  }
950  ++bitpos;
951  }
952  }
953 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
size
Write out results.
dbl * delta
Definition: mlp_gen.cc:36
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:59
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
int timesRun() const
Definition: Path.h:84
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int bitPosition() const
Definition: Path.h:79
ModuleDescription const & description() const
Definition: Worker.h:190
std::vector< int > empty_trig_paths_
Definition: helper.py:1
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
virtual void replaceModuleFor(Worker *) const =0
Strings const & getEndPaths() const
int totalEventsFailed() const
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:49
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
int timesPassed() const
Definition: Worker.h:229
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
void clearCounters()
Definition: Worker.h:217
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:143
void addToAllWorkers(Worker *w)
int timesExcept() const
Definition: Path.h:87
exception_actions::ActionCodes find(const std::string &category) const
size_type size() const
Definition: Path.h:91
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:314
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:60
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::string const & moduleLabel() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
std::string moduleLabel
Definition: TriggerReport.h:51
char const * label
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int timesExcept() const
Definition: Worker.h:231
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
void setupOnDemandSystem(Principal &principal, EventSetupImpl const &es)
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:58
EventSummary eventSummary
Definition: TriggerReport.h:57
accept
Definition: HLTenums.h:19
T & get_underlying(propagate_const< T > &)
int timesVisited() const
Definition: Worker.h:228
std::string name
Definition: TriggerReport.h:40
def unique(seq, keepstr=True)
Definition: tier0.py:25
Definition: Path.h:44
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:147
static void fillPathSummary(Path const &path, PathSummary &sum)
int timesPassed() const
Definition: Path.h:85
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
int timesRun() const
Definition: Worker.h:227
#define end
Definition: vmac.h:39
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetupImpl const &es, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
rep
Definition: cuy.py:1190
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
element_type const * get() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
void processAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
unsigned int value() const
Definition: StreamID.h:42
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
int timesVisited(size_type i) const
Definition: Path.h:92
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
int timesFailed() const
Definition: Worker.h:230
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
std::atomic< bool > skippingEvent_
std::string const & name() const
Definition: Path.h:80
#define begin
Definition: vmac.h:32
HLT enums.
Strings const & getTrigPaths() const
Worker const * getWorker(size_type i) const
Definition: Path.h:96
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void enableEndPaths(bool active)
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:336
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:41
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventPrincipal &ep, EventSetupImpl const &es)
TrigResConstPtr results() const
std::vector< std::string > vstring
Definition: Schedule.cc:565
def move(src, dest)
Definition: eostools.py:511
static Registry * instance()
Definition: Registry.cc:12
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void clearCounters()
Definition: Path.cc:164
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
int timesFailed() const
Definition: Path.h:86
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)