CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
StreamSchedule.cc
Go to the documentation of this file.
2 
28 
30 
31 #include <algorithm>
32 #include <cassert>
33 #include <cstdlib>
34 #include <functional>
35 #include <iomanip>
36 #include <list>
37 #include <map>
38 #include <exception>
39 
40 namespace edm {
41 
42  namespace {
43 
// Walks the input range [begin, end) and the output range that starts at
// 'out' in lock step, calling 'func' once per pair of elements. 'func' is
// invoked as func(input_element, output_element): it reads the input
// element and writes its result through the reference to the output
// element. No bounds check is made on the output range, so it must hold
// at least as many elements as the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
53 
// Fills the sequence 'to' from the sequence 'from' by handing each
// (input element, output element) pair to 'func' through transform_into.
//  - If the two sequences already have the same size, 'func' is applied
//    directly to the existing elements of 'to'.
//  - Otherwise the values are calculated into a freshly constructed
//    sequence of the right size, which is swapped into 'to' only after
//    every call has returned; if a call throws, 'to' is left untouched.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }
  TO fresh(from.size());
  transform_into(from.begin(), from.end(), fresh.begin(), func);
  to.swap(fresh);
}
69 
70  // -----------------------------
71 
72  // Here we make the trigger results inserter directly. This should
73  // probably be a utility in the WorkerRegistry or elsewhere.
74 
75  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
76  std::shared_ptr<ActivityRegistry> areg,
77  std::shared_ptr<TriggerResultInserter> inserter) {
79  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
80  ptr->setActivityRegistry(areg);
81  return ptr;
82  }
83 
84  void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
85  ProductRegistry const& preg,
86  std::multimap<std::string, Worker*>& branchToReadingWorker) {
87  auto vBranchesToDeleteEarly = branchesToDeleteEarly;
88  // Remove any duplicates
89  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
90  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
91  vBranchesToDeleteEarly.end());
92 
93  // Are the requested items in the product registry?
94  auto allBranchNames = preg.allBranchNames();
95  //the branch names all end with a period, which we do not want to compare with
96  for (auto& b : allBranchNames) {
97  b.resize(b.size() - 1);
98  }
99  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
100  std::vector<std::string> temp;
101  temp.reserve(vBranchesToDeleteEarly.size());
102 
103  std::set_intersection(vBranchesToDeleteEarly.begin(),
104  vBranchesToDeleteEarly.end(),
105  allBranchNames.begin(),
106  allBranchNames.end(),
107  std::back_inserter(temp));
108  vBranchesToDeleteEarly.swap(temp);
109  if (temp.size() != vBranchesToDeleteEarly.size()) {
110  std::vector<std::string> missingProducts;
111  std::set_difference(temp.begin(),
112  temp.end(),
113  vBranchesToDeleteEarly.begin(),
114  vBranchesToDeleteEarly.end(),
115  std::back_inserter(missingProducts));
116  LogInfo l("MissingProductsForCanDeleteEarly");
117  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
118  for (auto const& n : missingProducts) {
119  l << "\n " << n;
120  }
121  }
122  //set placeholder for the branch, we will remove the nullptr if a
123  // module actually wants the branch.
124  for (auto const& branch : vBranchesToDeleteEarly) {
125  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
126  }
127  }
128  } // namespace
129 
130  // -----------------------------
131 
// Shorthand for a list of strings (e.g. the module names of a path)
using vstring = std::vector<std::string>;
133 
134  // -----------------------------
135 
137  std::shared_ptr<TriggerResultInserter> inserter,
138  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
139  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
140  std::shared_ptr<ModuleRegistry> modReg,
141  ParameterSet& proc_pset,
144  ProductRegistry& preg,
145  BranchIDListHelper& branchIDListHelper,
146  ExceptionToActionTable const& actions,
147  std::shared_ptr<ActivityRegistry> areg,
148  std::shared_ptr<ProcessConfiguration> processConfiguration,
149  StreamID streamID,
150  ProcessContext const* processContext)
151  : workerManager_(modReg, areg, actions),
152  actReg_(areg),
153  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
154  results_inserter_(),
155  trig_paths_(),
156  end_paths_(),
157  total_events_(),
158  total_passed_(),
159  number_of_unscheduled_modules_(0),
160  streamID_(streamID),
161  streamContext_(streamID_, processContext),
162  skippingEvent_(false) {
163  bool hasPath = false;
164  std::vector<std::string> const& pathNames = tns.getTrigPaths();
165  std::vector<std::string> const& endPathNames = tns.getEndPaths();
166 
167  int trig_bitpos = 0;
168  trig_paths_.reserve(pathNames.size());
169  for (auto const& trig_name : pathNames) {
170  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
171  ++trig_bitpos;
172  hasPath = true;
173  }
174 
175  if (hasPath) {
176  // the results inserter stands alone
177  inserter->setTrigResultForStream(streamID.value(), results());
178 
179  results_inserter_ = makeInserter(actions, actReg_, inserter);
181  }
182 
183  // fill normal endpaths
184  int bitpos = 0;
185  end_paths_.reserve(endPathNames.size());
186  for (auto const& end_path_name : endPathNames) {
187  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
188  ++bitpos;
189  }
190 
191  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
192 
193  //See if all modules were used
194  std::set<std::string> usedWorkerLabels;
195  for (auto const& worker : allWorkers()) {
196  usedWorkerLabels.insert(worker->description()->moduleLabel());
197  }
198  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
199  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
200  std::vector<std::string> unusedLabels;
201  set_difference(modulesInConfigSet.begin(),
202  modulesInConfigSet.end(),
203  usedWorkerLabels.begin(),
204  usedWorkerLabels.end(),
205  back_inserter(unusedLabels));
206  std::set<std::string> unscheduledLabels;
207  std::vector<std::string> shouldBeUsedLabels;
208  if (!unusedLabels.empty()) {
209  //Need to
210  // 1) create worker
211  // 2) if it is a WorkerT<EDProducer>, add it to our list
212  // 3) hand list to our delayed reader
213  for (auto const& label : unusedLabels) {
214  bool isTracked;
215  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
216  assert(isTracked);
217  assert(modulePSet != nullptr);
219  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
220  }
221  if (!shouldBeUsedLabels.empty()) {
222  std::ostringstream unusedStream;
223  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
224  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
225  itLabelEnd = shouldBeUsedLabels.end();
226  itLabel != itLabelEnd;
227  ++itLabel) {
228  unusedStream << ",'" << *itLabel << "'";
229  }
230  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
231  }
232  }
233  number_of_unscheduled_modules_ = unscheduledLabels.size();
234  } // StreamSchedule::StreamSchedule
235 
237  std::vector<std::string> const& branchesToDeleteEarly,
238  edm::ProductRegistry const& preg) {
239  // setup the list with those products actually registered for this job
240  std::multimap<std::string, Worker*> branchToReadingWorker;
241  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
242 
243  const std::vector<std::string> kEmpty;
244  std::map<Worker*, unsigned int> reserveSizeForWorker;
245  unsigned int upperLimitOnReadingWorker = 0;
246  unsigned int upperLimitOnIndicies = 0;
247  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
248 
249  //talk with output modules first
250  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
251  auto comm = iHolder->createOutputModuleCommunicator();
252  if (comm) {
253  if (!branchToReadingWorker.empty()) {
254  //If an OutputModule needs a product, we can't delete it early
255  // so we should remove it from our list
256  SelectedProductsForBranchType const& kept = comm->keptProducts();
257  for (auto const& item : kept[InEvent]) {
258  BranchDescription const& desc = *item.first;
259  auto found = branchToReadingWorker.equal_range(desc.branchName());
260  if (found.first != found.second) {
261  --nUniqueBranchesToDelete;
262  branchToReadingWorker.erase(found.first, found.second);
263  }
264  }
265  }
266  }
267  });
268 
269  if (branchToReadingWorker.empty()) {
270  return;
271  }
272 
273  for (auto w : allWorkers()) {
274  //determine if this module could read a branch we want to delete early
275  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
276  if (nullptr != pset) {
277  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
278  if (not branches.empty()) {
279  ++upperLimitOnReadingWorker;
280  }
281  for (auto const& branch : branches) {
282  auto found = branchToReadingWorker.equal_range(branch);
283  if (found.first != found.second) {
284  ++upperLimitOnIndicies;
285  ++reserveSizeForWorker[w];
286  if (nullptr == found.first->second) {
287  found.first->second = w;
288  } else {
289  branchToReadingWorker.insert(make_pair(found.first->first, w));
290  }
291  }
292  }
293  }
294  }
295  {
296  auto it = branchToReadingWorker.begin();
297  std::vector<std::string> unusedBranches;
298  while (it != branchToReadingWorker.end()) {
299  if (it->second == nullptr) {
300  unusedBranches.push_back(it->first);
301  //erasing the object invalidates the iterator so must advance it first
302  auto temp = it;
303  ++it;
304  branchToReadingWorker.erase(temp);
305  } else {
306  ++it;
307  }
308  }
309  if (not unusedBranches.empty()) {
310  LogWarning l("UnusedProductsForCanDeleteEarly");
311  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
312  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
313  for (auto const& n : unusedBranches) {
314  l << "\n " << n;
315  }
316  }
317  }
318  if (!branchToReadingWorker.empty()) {
319  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
320  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
321  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
322  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
323  std::string lastBranchName;
324  size_t nextOpenIndex = 0;
325  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
326  for (auto& branchAndWorker : branchToReadingWorker) {
327  if (lastBranchName != branchAndWorker.first) {
328  //have to put back the period we removed earlier in order to get the proper name
329  BranchID bid(branchAndWorker.first + ".");
330  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
331  lastBranchName = branchAndWorker.first;
332  }
333  auto found = alreadySeenWorkers.find(branchAndWorker.second);
334  if (alreadySeenWorkers.end() == found) {
335  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
336  // all the branches that might be read by this worker. However, initially we will only tell the
337  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
338  // EarlyDeleteHelper will automatically advance its internal end pointer.
339  size_t index = nextOpenIndex;
340  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
342  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
343  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
344  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
345  nextOpenIndex += nIndices;
346  } else {
347  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
348  }
349  }
350 
351  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
352  // space needed for each module
353  auto itLast = earlyDeleteHelpers_.begin();
354  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
355  if (itLast->end() != it->begin()) {
356  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
357  unsigned int delta = it->begin() - itLast->end();
358  it->shiftIndexPointers(delta);
359 
361  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
362  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
363  }
364  itLast = it;
365  }
367  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
369 
370  //now tell the paths about the deleters
371  for (auto& p : trig_paths_) {
372  p.setEarlyDeleteHelpers(alreadySeenWorkers);
373  }
374  for (auto& p : end_paths_) {
375  p.setEarlyDeleteHelpers(alreadySeenWorkers);
376  }
378  }
379  }
380 
382  ProductRegistry& preg,
384  std::shared_ptr<ProcessConfiguration const> processConfiguration,
385  std::string const& pathName,
386  bool ignoreFilters,
387  PathWorkers& out,
388  std::vector<std::string> const& endPathNames) {
389  vstring modnames = proc_pset.getParameter<vstring>(pathName);
390  PathWorkers tmpworkers;
391 
392  unsigned int placeInPath = 0;
393  for (auto const& name : modnames) {
394  //Modules except EDFilters are set to run concurrently by default
395  bool doNotRunConcurrently = false;
397  if (name[0] == '!') {
398  filterAction = WorkerInPath::Veto;
399  } else if (name[0] == '-' or name[0] == '+') {
400  filterAction = WorkerInPath::Ignore;
401  }
402  if (name[0] == '|' or name[0] == '+') {
403  //cms.wait was specified so do not run concurrently
404  doNotRunConcurrently = true;
405  }
406 
407  std::string moduleLabel = name;
408  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
409  moduleLabel.erase(0, 1);
410  }
411 
412  bool isTracked;
413  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
414  if (modpset == nullptr) {
415  std::string pathType("endpath");
416  if (!search_all(endPathNames, pathName)) {
417  pathType = std::string("path");
418  }
420  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
421  << "\"\n please check spelling or remove that label from the path.";
422  }
423  assert(isTracked);
424 
425  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
426  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
427  // We have a filter on an end path, and the filter is not explicitly ignored.
428  // See if the filter is allowed.
429  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
430  if (!search_all(allowed_filters, worker->description()->moduleName())) {
431  // Filter is not allowed. Ignore the result, and issue a warning.
432  filterAction = WorkerInPath::Ignore;
433  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
434  << "' with module label '" << moduleLabel << "' appears on EndPath '"
435  << pathName << "'.\n"
436  << "The return value of the filter will be ignored.\n"
437  << "To suppress this warning, either remove the filter from the endpath,\n"
438  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
439  }
440  }
441  bool runConcurrently = not doNotRunConcurrently;
442  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
443  runConcurrently = false;
444  }
445  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
446  ++placeInPath;
447  }
448 
449  out.swap(tmpworkers);
450  }
451 
453  ProductRegistry& preg,
455  std::shared_ptr<ProcessConfiguration const> processConfiguration,
456  int bitpos,
457  std::string const& name,
458  TrigResPtr trptr,
459  std::vector<std::string> const& endPathNames) {
460  PathWorkers tmpworkers;
461  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
462 
463  // an empty path will cause an extra bit that is not used
464  if (!tmpworkers.empty()) {
465  trig_paths_.emplace_back(bitpos,
466  name,
467  tmpworkers,
468  trptr,
469  actionTable(),
470  actReg_,
474  } else {
475  empty_trig_paths_.push_back(bitpos);
476  }
477  for (WorkerInPath const& workerInPath : tmpworkers) {
478  addToAllWorkers(workerInPath.getWorker());
479  }
480  }
481 
483  ProductRegistry& preg,
485  std::shared_ptr<ProcessConfiguration const> processConfiguration,
486  int bitpos,
487  std::string const& name,
488  std::vector<std::string> const& endPathNames) {
489  PathWorkers tmpworkers;
490  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
491 
492  if (!tmpworkers.empty()) {
493  //EndPaths are not supposed to stop if SkipEvent type exception happens
494  end_paths_.emplace_back(bitpos,
495  name,
496  tmpworkers,
497  TrigResPtr(),
498  actionTable(),
499  actReg_,
501  nullptr,
503  } else {
504  empty_end_paths_.push_back(bitpos);
505  }
506  for (WorkerInPath const& workerInPath : tmpworkers) {
507  addToAllWorkers(workerInPath.getWorker());
508  }
509  }
510 
512 
514 
516  Worker* found = nullptr;
517  for (auto const& worker : allWorkers()) {
518  if (worker->description()->moduleLabel() == iLabel) {
519  found = worker;
520  break;
521  }
522  }
523  if (nullptr == found) {
524  return;
525  }
526 
527  iMod->replaceModuleFor(found);
529  }
530 
532 
533  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
534  std::vector<ModuleDescription const*> result;
535  result.reserve(allWorkers().size());
536 
537  for (auto const& worker : allWorkers()) {
538  ModuleDescription const* p = worker->description();
539  result.push_back(p);
540  }
541  return result;
542  }
543 
545  WaitingTaskHolder iTask,
547  ServiceToken const& serviceToken,
548  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
549  EventPrincipal& ep = info.principal();
550 
551  // Caught exception is propagated via WaitingTaskHolder
552  CMS_SA_ALLOW try {
553  this->resetAll();
554 
556 
557  Traits::setStreamContext(streamContext_, ep);
558  //a service may want to communicate with another service
559  ServiceRegistry::Operate guard(serviceToken);
560  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
561 
562  HLTPathStatus hltPathStatus(hlt::Pass, 0);
563  for (int empty_trig_path : empty_trig_paths_) {
564  results_->at(empty_trig_path) = hltPathStatus;
565  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
566  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
569  if (except) {
570  iTask.doneWaiting(except);
571  return;
572  }
573  }
574  for (int empty_end_path : empty_end_paths_) {
575  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
578  if (except) {
579  iTask.doneWaiting(except);
580  return;
581  }
582  }
583 
586 
587  ++total_events_;
588 
589  //use to give priorities on an error to ones from Paths
590  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
591  auto pathErrorPtr = pathErrorHolder.get();
592  ServiceWeakToken weakToken = serviceToken;
593  auto allPathsDone = make_waiting_task(
594  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
595  ServiceRegistry::Operate operate(weakToken.lock());
596 
597  std::exception_ptr ptr;
598  if (pathError->load()) {
599  ptr = *pathError->load();
600  delete pathError->load();
601  }
602  if ((not ptr) and iPtr) {
603  ptr = *iPtr;
604  }
606  });
607  //The holder guarantees that if the paths finish before the loop ends
608  // that we do not start too soon. It also guarantees that the task will
609  // run under that condition.
610  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
611 
612  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
613  std::exception_ptr const* iPtr) mutable {
614  ServiceRegistry::Operate operate(weakToken.lock());
615 
616  if (iPtr) {
617  //this is used to prioritize this error over one
618  // that happens in EndPath or Accumulate
619  pathErrorPtr->store(new std::exception_ptr(*iPtr));
620  }
621  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
622  });
623 
624  //The holder guarantees that if the paths finish before the loop ends
625  // that we do not start too soon. It also guarantees that the task will
626  // run under that condition.
627  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
628 
629  //start end paths first so on single threaded the paths will run first
630  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
631  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
632  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
633  }
634 
635  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
636  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
637  }
638 
639  ParentContext parentContext(&streamContext_);
641  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
642  } catch (...) {
643  iTask.doneWaiting(std::current_exception());
644  }
645  }
646 
647  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
648  WaitingTaskHolder iWait,
650  if (iExcept) {
651  // Caught exception is propagated via WaitingTaskHolder
652  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
656  if (action == exception_actions::SkipEvent) {
657  edm::printCmsExceptionWarning("SkipEvent", e);
658  *(iExcept.load()) = std::exception_ptr();
659  } else {
660  *(iExcept.load()) = std::current_exception();
661  }
662  } catch (...) {
663  *(iExcept.load()) = std::current_exception();
664  }
665  }
666 
667  if ((not iExcept) and results_->accept()) {
668  ++total_passed_;
669  }
670 
671  if (nullptr != results_inserter_.get()) {
672  // Caught exception is propagated to the caller
673  CMS_SA_ALLOW try {
674  //Even if there was an exception, we need to allow results inserter
675  // to run since some module may be waiting on its results.
676  ParentContext parentContext(&streamContext_);
678 
679  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
680  if (expt) {
681  std::rethrow_exception(expt);
682  }
683  } catch (cms::Exception& ex) {
684  if (not iExcept) {
685  if (ex.context().empty()) {
686  std::ostringstream ost;
687  ost << "Processing Event " << info.principal().id();
688  ex.addContext(ost.str());
689  }
690  iExcept.store(new std::exception_ptr(std::current_exception()));
691  }
692  } catch (...) {
693  if (not iExcept) {
694  iExcept.store(new std::exception_ptr(std::current_exception()));
695  }
696  }
697  }
698  std::exception_ptr ptr;
699  if (iExcept) {
700  ptr = *iExcept.load();
701  }
702  iWait.doneWaiting(ptr);
703  }
704 
705  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
707 
708  if (iExcept) {
709  //add context information to the exception and print message
710  try {
711  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
712  } catch (cms::Exception& ex) {
713  bool const cleaningUpAfterException = false;
714  if (ex.context().empty()) {
715  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
716  } else {
717  addContextAndPrintException("", ex, cleaningUpAfterException);
718  }
719  iExcept = std::current_exception();
720  }
721 
722  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
723  }
724  // Caught exception is propagated to the caller
725  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
726  if (not iExcept) {
727  iExcept = std::current_exception();
728  }
729  }
730  if (not iExcept) {
732  }
733 
734  return iExcept;
735  }
736 
737  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
738  oLabelsToFill.reserve(trig_paths_.size());
739  std::transform(trig_paths_.begin(),
740  trig_paths_.end(),
741  std::back_inserter(oLabelsToFill),
742  std::bind(&Path::name, std::placeholders::_1));
743  }
744 
745  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
746  TrigPaths::const_iterator itFound = std::find_if(
747  trig_paths_.begin(),
748  trig_paths_.end(),
749  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
750  if (itFound != trig_paths_.end()) {
751  oLabelsToFill.reserve(itFound->size());
752  for (size_t i = 0; i < itFound->size(); ++i) {
753  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
754  }
755  }
756  }
757 
759  std::vector<ModuleDescription const*>& descriptions,
760  unsigned int hint) const {
761  descriptions.clear();
762  bool found = false;
763  TrigPaths::const_iterator itFound;
764 
765  if (hint < trig_paths_.size()) {
766  itFound = trig_paths_.begin() + hint;
767  if (itFound->name() == iPathLabel)
768  found = true;
769  }
770  if (!found) {
771  // if the hint did not work, do it the slow way
772  itFound = std::find_if(
773  trig_paths_.begin(),
774  trig_paths_.end(),
775  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
776  if (itFound != trig_paths_.end())
777  found = true;
778  }
779  if (found) {
780  descriptions.reserve(itFound->size());
781  for (size_t i = 0; i < itFound->size(); ++i) {
782  descriptions.push_back(itFound->getWorker(i)->description());
783  }
784  }
785  }
786 
788  std::vector<ModuleDescription const*>& descriptions,
789  unsigned int hint) const {
790  descriptions.clear();
791  bool found = false;
792  TrigPaths::const_iterator itFound;
793 
794  if (hint < end_paths_.size()) {
795  itFound = end_paths_.begin() + hint;
796  if (itFound->name() == iEndPathLabel)
797  found = true;
798  }
799  if (!found) {
800  // if the hint did not work, do it the slow way
801  itFound = std::find_if(
802  end_paths_.begin(),
803  end_paths_.end(),
804  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
805  if (itFound != end_paths_.end())
806  found = true;
807  }
808  if (found) {
809  descriptions.reserve(itFound->size());
810  for (size_t i = 0; i < itFound->size(); ++i) {
811  descriptions.push_back(itFound->getWorker(i)->description());
812  }
813  }
814  }
815 
816  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
817  sum.timesVisited += path.timesVisited(which);
818  sum.timesPassed += path.timesPassed(which);
819  sum.timesFailed += path.timesFailed(which);
820  sum.timesExcept += path.timesExcept(which);
821  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
822  }
823 
824  static void fillPathSummary(Path const& path, PathSummary& sum) {
825  sum.name = path.name();
826  sum.bitPosition = path.bitPosition();
827  sum.timesRun += path.timesRun();
828  sum.timesPassed += path.timesPassed();
829  sum.timesFailed += path.timesFailed();
830  sum.timesExcept += path.timesExcept();
831 
832  Path::size_type sz = path.size();
833  if (sum.moduleInPathSummaries.empty()) {
834  std::vector<ModuleInPathSummary> temp(sz);
835  for (size_t i = 0; i != sz; ++i) {
836  fillModuleInPathSummary(path, i, temp[i]);
837  }
838  sum.moduleInPathSummaries.swap(temp);
839  } else {
840  assert(sz == sum.moduleInPathSummaries.size());
841  for (size_t i = 0; i != sz; ++i) {
843  }
844  }
845  }
846 
847  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
848  sum.timesVisited += w.timesVisited();
849  sum.timesRun += w.timesRun();
850  sum.timesPassed += w.timesPassed();
851  sum.timesFailed += w.timesFailed();
852  sum.timesExcept += w.timesExcept();
853  sum.moduleLabel = w.description()->moduleLabel();
854  }
855 
856  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
857 
862 
863  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
864  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
865  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
866  }
867 
869  using std::placeholders::_1;
871  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
872  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
873  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
874  }
875 
877  skippingEvent_ = false;
878  results_->reset();
879  }
880 
882 
884  //must be sure we have cleared the count first
885  for (auto& count : earlyDeleteBranchToCount_) {
886  count.count = 0;
887  }
888  //now reset based on how many helpers use that branch
890  ++(earlyDeleteBranchToCount_[index].count);
891  }
892  for (auto& helper : earlyDeleteHelpers_) {
893  helper.reset();
894  }
895  }
896 
898  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
899  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
900  ExceptionToActionTable const& actions) {
901  int bitpos = 0;
902  unsigned int indexEmpty = 0;
903  unsigned int indexOfPath = 0;
904  for (auto& pathStatusInserter : pathStatusInserters) {
905  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
906  WorkerPtr workerPtr(
907  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
908  pathStatusInserterWorkers_.emplace_back(workerPtr);
909  workerPtr->setActivityRegistry(actReg_);
910  addToAllWorkers(workerPtr.get());
911 
912  // A little complexity here because a C++ Path object is not
913  // instantiated and put into end_paths if there are no modules
914  // on the configured path.
915  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
916  ++indexEmpty;
917  } else {
918  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
919  ++indexOfPath;
920  }
921  ++bitpos;
922  }
923 
924  bitpos = 0;
925  indexEmpty = 0;
926  indexOfPath = 0;
927  for (auto& endPathStatusInserter : endPathStatusInserters) {
928  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
929  WorkerPtr workerPtr(
930  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
931  endPathStatusInserterWorkers_.emplace_back(workerPtr);
932  workerPtr->setActivityRegistry(actReg_);
933  addToAllWorkers(workerPtr.get());
934 
935  // A little complexity here because a C++ Path object is not
936  // instantiated and put into end_paths if there are no modules
937  // on the configured path.
938  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
939  ++indexEmpty;
940  } else {
941  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
942  ++indexOfPath;
943  }
944  ++bitpos;
945  }
946  }
947 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:59
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
ServiceToken lock() const
Definition: ServiceToken.h:101
string rep
Definition: cuy.py:1189
pathNames_ & tns()), endPathNames_(&tns.getEndPaths()), wantSummary_(tns.wantSummary()
Definition: Schedule.cc:691
int timesRun() const
Definition: Path.h:78
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int bitPosition() const
Definition: Path.h:73
def which
Definition: eostools.py:336
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
ModuleDescription const * description() const
Definition: Worker.h:188
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
Strings const & getEndPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
int totalEventsFailed() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
EventPrincipal & principal()
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
int timesPassed() const
Definition: Worker.h:234
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
def unique
Definition: tier0.py:24
void setupOnDemandSystem(EventTransitionInfo const &)
void clearCounters()
Definition: Worker.h:222
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
processConfiguration
Definition: Schedule.cc:687
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t Func __host__ __device__ V int Func func
std::string const & category() const
Definition: Exception.cc:143
void addToAllWorkers(Worker *w)
int timesExcept() const
Definition: Path.h:81
exception_actions::ActionCodes find(const std::string &category) const
size_type size() const
Definition: Path.h:84
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:280
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:60
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
assert(be >=bs)
actions
Definition: Schedule.cc:687
std::string const & moduleLabel() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
tuple result
Definition: mps_fire.py:311
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
std::string moduleLabel
Definition: TriggerReport.h:51
oneapi::tbb::task_group * group() const noexcept
char const * label
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int timesExcept() const
Definition: Worker.h:236
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:58
EventSummary eventSummary
Definition: TriggerReport.h:57
accept
Definition: HLTenums.h:18
int timesVisited() const
Definition: Worker.h:233
std::string name
Definition: TriggerReport.h:40
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
def move
Definition: eostools.py:511
std::list< std::string > const & context() const
Definition: Exception.cc:147
static void fillPathSummary(Path const &path, PathSummary &sum)
int timesPassed() const
Definition: Path.h:79
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, StreamID streamID, ProcessContext const *processContext)
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
int timesRun() const
Definition: Worker.h:232
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
areg
Definition: Schedule.cc:687
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
unsigned int value() const
Definition: StreamID.h:43
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
int timesVisited(size_type i) const
Definition: Path.h:85
double b
Definition: hdecay.h:118
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
string action
Definition: mps_fire.py:183
constexpr element_type const * get() const
int timesFailed() const
Definition: Worker.h:235
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::atomic< bool > skippingEvent_
std::string const & name() const
Definition: Path.h:74
void setupResolvers(Principal &principal)
Strings const & getTrigPaths() const
Worker const * getWorker(size_type i) const
Definition: Path.h:89
string end
Definition: dataset.py:937
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:41
T w() const
auto wrap(F iFunc) -> decltype(iFunc())
Log< level::Warning, false > LogWarning
preg
Definition: Schedule.cc:687
TrigResConstPtr results() const
std::vector< std::string > vstring
Definition: Schedule.cc:667
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
tuple size
Write out results.
static Registry * instance()
Definition: Registry.cc:12
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void clearCounters()
Definition: Path.cc:198
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
prealloc
Definition: Schedule.cc:687
void addToAllWorkers(Worker *w)
int timesFailed() const
Definition: Path.h:80
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)