CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
StreamSchedule.cc
Go to the documentation of this file.
2 
28 
30 
31 #include <algorithm>
32 #include <cassert>
33 #include <cstdlib>
34 #include <functional>
35 #include <iomanip>
36 #include <list>
37 #include <map>
38 #include <exception>
39 
40 namespace edm {
41 
42  namespace {
43 
// Walks the input range [begin, end) and the output range starting at 'out'
// in lock step, calling 'func(input_element, output_element)' for each pair.
// 'func' receives the input by const reference and writes its result through
// the reference to the output element. The output range must hold at least
// as many elements as the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
53 
// Takes a sequence 'from', a sequence 'to' and a callable 'func', and applies
// transform_into so that 'func(element_of_from, element_of_to)' is invoked for
// every element pair.
//
// If 'to' does not have the same size as 'from', the values are calculated in
// a freshly constructed sequence of the right size, and 'to' is replaced by it
// only after all calls have returned (so 'to' is untouched if a call throws).
// If the sizes already match, 'func' is applied directly to the existing
// elements of 'to'.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }

  TO replacement(from.size());
  transform_into(from.begin(), from.end(), replacement.begin(), func);
  to.swap(replacement);
}
69 
70  // -----------------------------
71 
72  // Here we make the trigger results inserter directly. This should
73  // probably be a utility in the WorkerRegistry or elsewhere.
74 
75  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
76  std::shared_ptr<ActivityRegistry> areg,
77  std::shared_ptr<TriggerResultInserter> inserter) {
79  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
80  ptr->setActivityRegistry(areg);
81  return ptr;
82  }
83 
84  void initializeBranchToReadingWorker(ParameterSet const& opts,
85  ProductRegistry const& preg,
86  std::multimap<std::string, Worker*>& branchToReadingWorker) {
87  // See if any data has been marked to be deleted early (removing any duplicates)
88  auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
89  if (not vBranchesToDeleteEarly.empty()) {
90  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
91  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
92  vBranchesToDeleteEarly.end());
93 
94  // Are the requested items in the product registry?
95  auto allBranchNames = preg.allBranchNames();
96  //the branch names all end with a period, which we do not want to compare with
97  for (auto& b : allBranchNames) {
98  b.resize(b.size() - 1);
99  }
100  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
101  std::vector<std::string> temp;
102  temp.reserve(vBranchesToDeleteEarly.size());
103 
104  std::set_intersection(vBranchesToDeleteEarly.begin(),
105  vBranchesToDeleteEarly.end(),
106  allBranchNames.begin(),
107  allBranchNames.end(),
108  std::back_inserter(temp));
109  vBranchesToDeleteEarly.swap(temp);
110  if (temp.size() != vBranchesToDeleteEarly.size()) {
111  std::vector<std::string> missingProducts;
112  std::set_difference(temp.begin(),
113  temp.end(),
114  vBranchesToDeleteEarly.begin(),
115  vBranchesToDeleteEarly.end(),
116  std::back_inserter(missingProducts));
117  LogInfo l("MissingProductsForCanDeleteEarly");
118  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
119  for (auto const& n : missingProducts) {
120  l << "\n " << n;
121  }
122  }
123  //set placeholder for the branch, we will remove the nullptr if a
124  // module actually wants the branch.
125  for (auto const& branch : vBranchesToDeleteEarly) {
126  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
127  }
128  }
129  }
130  } // namespace
131 
132  // -----------------------------
133 
134  typedef std::vector<std::string> vstring;
135 
136  // -----------------------------
137 
139  std::shared_ptr<TriggerResultInserter> inserter,
140  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
141  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
142  std::shared_ptr<ModuleRegistry> modReg,
143  ParameterSet& proc_pset,
146  ProductRegistry& preg,
147  BranchIDListHelper& branchIDListHelper,
148  ExceptionToActionTable const& actions,
149  std::shared_ptr<ActivityRegistry> areg,
150  std::shared_ptr<ProcessConfiguration> processConfiguration,
151  bool allowEarlyDelete,
152  StreamID streamID,
153  ProcessContext const* processContext)
154  : workerManager_(modReg, areg, actions),
155  actReg_(areg),
156  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
157  results_inserter_(),
158  trig_paths_(),
159  end_paths_(),
160  total_events_(),
161  total_passed_(),
162  number_of_unscheduled_modules_(0),
163  streamID_(streamID),
164  streamContext_(streamID_, processContext),
165  skippingEvent_(false) {
166  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
167  bool hasPath = false;
168  std::vector<std::string> const& pathNames = tns.getTrigPaths();
169  std::vector<std::string> const& endPathNames = tns.getEndPaths();
170 
171  int trig_bitpos = 0;
172  trig_paths_.reserve(pathNames.size());
173  for (auto const& trig_name : pathNames) {
174  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
175  ++trig_bitpos;
176  hasPath = true;
177  }
178 
179  if (hasPath) {
180  // the results inserter stands alone
181  inserter->setTrigResultForStream(streamID.value(), results());
182 
183  results_inserter_ = makeInserter(actions, actReg_, inserter);
185  }
186 
187  // fill normal endpaths
188  int bitpos = 0;
189  end_paths_.reserve(endPathNames.size());
190  for (auto const& end_path_name : endPathNames) {
191  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
192  ++bitpos;
193  }
194 
195  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
196 
197  //See if all modules were used
198  std::set<std::string> usedWorkerLabels;
199  for (auto const& worker : allWorkers()) {
200  usedWorkerLabels.insert(worker->description()->moduleLabel());
201  }
202  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
203  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
204  std::vector<std::string> unusedLabels;
205  set_difference(modulesInConfigSet.begin(),
206  modulesInConfigSet.end(),
207  usedWorkerLabels.begin(),
208  usedWorkerLabels.end(),
209  back_inserter(unusedLabels));
210  std::set<std::string> unscheduledLabels;
211  std::vector<std::string> shouldBeUsedLabels;
212  if (!unusedLabels.empty()) {
213  //Need to
214  // 1) create worker
215  // 2) if it is a WorkerT<EDProducer>, add it to our list
216  // 3) hand list to our delayed reader
217  for (auto const& label : unusedLabels) {
218  bool isTracked;
219  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
220  assert(isTracked);
221  assert(modulePSet != nullptr);
223  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
224  }
225  if (!shouldBeUsedLabels.empty()) {
226  std::ostringstream unusedStream;
227  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
228  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
229  itLabelEnd = shouldBeUsedLabels.end();
230  itLabel != itLabelEnd;
231  ++itLabel) {
232  unusedStream << ",'" << *itLabel << "'";
233  }
234  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
235  }
236  }
237  number_of_unscheduled_modules_ = unscheduledLabels.size();
238 
239  initializeEarlyDelete(*modReg, opts, preg, allowEarlyDelete);
240 
241  } // StreamSchedule::StreamSchedule
242 
244  edm::ParameterSet const& opts,
245  edm::ProductRegistry const& preg,
246  bool allowEarlyDelete) {
247  //for now, if have a subProcess, don't allow early delete
248  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
249  if (not allowEarlyDelete)
250  return;
251 
252  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
253  // registered for this job
254  std::multimap<std::string, Worker*> branchToReadingWorker;
255  initializeBranchToReadingWorker(opts, preg, branchToReadingWorker);
256 
257  //If no delete early items have been specified we don't have to do anything
258  if (branchToReadingWorker.empty()) {
259  return;
260  }
261  const std::vector<std::string> kEmpty;
262  std::map<Worker*, unsigned int> reserveSizeForWorker;
263  unsigned int upperLimitOnReadingWorker = 0;
264  unsigned int upperLimitOnIndicies = 0;
265  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
266 
267  //talk with output modules first
268  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
269  auto comm = iHolder->createOutputModuleCommunicator();
270  if (comm) {
271  if (!branchToReadingWorker.empty()) {
272  //If an OutputModule needs a product, we can't delete it early
273  // so we should remove it from our list
274  SelectedProductsForBranchType const& kept = comm->keptProducts();
275  for (auto const& item : kept[InEvent]) {
276  BranchDescription const& desc = *item.first;
277  auto found = branchToReadingWorker.equal_range(desc.branchName());
278  if (found.first != found.second) {
279  --nUniqueBranchesToDelete;
280  branchToReadingWorker.erase(found.first, found.second);
281  }
282  }
283  }
284  }
285  });
286 
287  if (branchToReadingWorker.empty()) {
288  return;
289  }
290 
291  for (auto w : allWorkers()) {
292  //determine if this module could read a branch we want to delete early
293  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
294  if (nullptr != pset) {
295  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
296  if (not branches.empty()) {
297  ++upperLimitOnReadingWorker;
298  }
299  for (auto const& branch : branches) {
300  auto found = branchToReadingWorker.equal_range(branch);
301  if (found.first != found.second) {
302  ++upperLimitOnIndicies;
303  ++reserveSizeForWorker[w];
304  if (nullptr == found.first->second) {
305  found.first->second = w;
306  } else {
307  branchToReadingWorker.insert(make_pair(found.first->first, w));
308  }
309  }
310  }
311  }
312  }
313  {
314  auto it = branchToReadingWorker.begin();
315  std::vector<std::string> unusedBranches;
316  while (it != branchToReadingWorker.end()) {
317  if (it->second == nullptr) {
318  unusedBranches.push_back(it->first);
319  //erasing the object invalidates the iterator so must advance it first
320  auto temp = it;
321  ++it;
322  branchToReadingWorker.erase(temp);
323  } else {
324  ++it;
325  }
326  }
327  if (not unusedBranches.empty()) {
328  LogWarning l("UnusedProductsForCanDeleteEarly");
329  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
330  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
331  for (auto const& n : unusedBranches) {
332  l << "\n " << n;
333  }
334  }
335  }
336  if (!branchToReadingWorker.empty()) {
337  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
338  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
339  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
340  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
341  std::string lastBranchName;
342  size_t nextOpenIndex = 0;
343  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
344  for (auto& branchAndWorker : branchToReadingWorker) {
345  if (lastBranchName != branchAndWorker.first) {
346  //have to put back the period we removed earlier in order to get the proper name
347  BranchID bid(branchAndWorker.first + ".");
348  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
349  lastBranchName = branchAndWorker.first;
350  }
351  auto found = alreadySeenWorkers.find(branchAndWorker.second);
352  if (alreadySeenWorkers.end() == found) {
353  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
354  // all the branches that might be read by this worker. However, initially we will only tell the
355  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
356  // EarlyDeleteHelper will automatically advance its internal end pointer.
357  size_t index = nextOpenIndex;
358  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
360  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
361  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
362  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
363  nextOpenIndex += nIndices;
364  } else {
365  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
366  }
367  }
368 
369  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
370  // space needed for each module
371  auto itLast = earlyDeleteHelpers_.begin();
372  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
373  if (itLast->end() != it->begin()) {
374  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
375  unsigned int delta = it->begin() - itLast->end();
376  it->shiftIndexPointers(delta);
377 
379  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
380  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
381  }
382  itLast = it;
383  }
385  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
387 
388  //now tell the paths about the deleters
389  for (auto& p : trig_paths_) {
390  p.setEarlyDeleteHelpers(alreadySeenWorkers);
391  }
392  for (auto& p : end_paths_) {
393  p.setEarlyDeleteHelpers(alreadySeenWorkers);
394  }
396  }
397  }
398 
400  ProductRegistry& preg,
402  std::shared_ptr<ProcessConfiguration const> processConfiguration,
403  std::string const& pathName,
404  bool ignoreFilters,
405  PathWorkers& out,
406  std::vector<std::string> const& endPathNames) {
407  vstring modnames = proc_pset.getParameter<vstring>(pathName);
408  PathWorkers tmpworkers;
409 
410  unsigned int placeInPath = 0;
411  for (auto const& name : modnames) {
412  //Modules except EDFilters are set to run concurrently by default
413  bool doNotRunConcurrently = false;
415  if (name[0] == '!') {
416  filterAction = WorkerInPath::Veto;
417  } else if (name[0] == '-' or name[0] == '+') {
418  filterAction = WorkerInPath::Ignore;
419  }
420  if (name[0] == '|' or name[0] == '+') {
421  //cms.wait was specified so do not run concurrently
422  doNotRunConcurrently = true;
423  }
424 
425  std::string moduleLabel = name;
426  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
427  moduleLabel.erase(0, 1);
428  }
429 
430  bool isTracked;
431  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
432  if (modpset == nullptr) {
433  std::string pathType("endpath");
434  if (!search_all(endPathNames, pathName)) {
435  pathType = std::string("path");
436  }
438  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
439  << "\"\n please check spelling or remove that label from the path.";
440  }
441  assert(isTracked);
442 
443  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
444  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
445  // We have a filter on an end path, and the filter is not explicitly ignored.
446  // See if the filter is allowed.
447  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
448  if (!search_all(allowed_filters, worker->description()->moduleName())) {
449  // Filter is not allowed. Ignore the result, and issue a warning.
450  filterAction = WorkerInPath::Ignore;
451  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
452  << "' with module label '" << moduleLabel << "' appears on EndPath '"
453  << pathName << "'.\n"
454  << "The return value of the filter will be ignored.\n"
455  << "To suppress this warning, either remove the filter from the endpath,\n"
456  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
457  }
458  }
459  bool runConcurrently = not doNotRunConcurrently;
460  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
461  runConcurrently = false;
462  }
463  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
464  ++placeInPath;
465  }
466 
467  out.swap(tmpworkers);
468  }
469 
471  ProductRegistry& preg,
473  std::shared_ptr<ProcessConfiguration const> processConfiguration,
474  int bitpos,
475  std::string const& name,
476  TrigResPtr trptr,
477  std::vector<std::string> const& endPathNames) {
478  PathWorkers tmpworkers;
479  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
480 
481  // an empty path will cause an extra bit that is not used
482  if (!tmpworkers.empty()) {
483  trig_paths_.emplace_back(bitpos,
484  name,
485  tmpworkers,
486  trptr,
487  actionTable(),
488  actReg_,
492  } else {
493  empty_trig_paths_.push_back(bitpos);
494  }
495  for (WorkerInPath const& workerInPath : tmpworkers) {
496  addToAllWorkers(workerInPath.getWorker());
497  }
498  }
499 
501  ProductRegistry& preg,
503  std::shared_ptr<ProcessConfiguration const> processConfiguration,
504  int bitpos,
505  std::string const& name,
506  std::vector<std::string> const& endPathNames) {
507  PathWorkers tmpworkers;
508  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
509 
510  if (!tmpworkers.empty()) {
511  //EndPaths are not supposed to stop if SkipEvent type exception happens
512  end_paths_.emplace_back(bitpos,
513  name,
514  tmpworkers,
515  TrigResPtr(),
516  actionTable(),
517  actReg_,
519  nullptr,
521  } else {
522  empty_end_paths_.push_back(bitpos);
523  }
524  for (WorkerInPath const& workerInPath : tmpworkers) {
525  addToAllWorkers(workerInPath.getWorker());
526  }
527  }
528 
530 
532 
534  Worker* found = nullptr;
535  for (auto const& worker : allWorkers()) {
536  if (worker->description()->moduleLabel() == iLabel) {
537  found = worker;
538  break;
539  }
540  }
541  if (nullptr == found) {
542  return;
543  }
544 
545  iMod->replaceModuleFor(found);
547  }
548 
550 
551  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
552  std::vector<ModuleDescription const*> result;
553  result.reserve(allWorkers().size());
554 
555  for (auto const& worker : allWorkers()) {
556  ModuleDescription const* p = worker->description();
557  result.push_back(p);
558  }
559  return result;
560  }
561 
563  WaitingTaskHolder iTask,
565  ServiceToken const& serviceToken,
566  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
567  EventPrincipal& ep = info.principal();
568 
569  // Caught exception is propagated via WaitingTaskHolder
570  CMS_SA_ALLOW try {
571  this->resetAll();
572 
574 
575  Traits::setStreamContext(streamContext_, ep);
576  //a service may want to communicate with another service
577  ServiceRegistry::Operate guard(serviceToken);
578  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
579 
580  HLTPathStatus hltPathStatus(hlt::Pass, 0);
581  for (int empty_trig_path : empty_trig_paths_) {
582  results_->at(empty_trig_path) = hltPathStatus;
583  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
584  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
587  if (except) {
588  iTask.doneWaiting(except);
589  return;
590  }
591  }
592  for (int empty_end_path : empty_end_paths_) {
593  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
596  if (except) {
597  iTask.doneWaiting(except);
598  return;
599  }
600  }
601 
604 
605  ++total_events_;
606 
607  //use to give priorities on an error to ones from Paths
608  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
609  auto pathErrorPtr = pathErrorHolder.get();
610  ServiceWeakToken weakToken = serviceToken;
611  auto allPathsDone = make_waiting_task(
612  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
613  ServiceRegistry::Operate operate(weakToken.lock());
614 
615  std::exception_ptr ptr;
616  if (pathError->load()) {
617  ptr = *pathError->load();
618  delete pathError->load();
619  }
620  if ((not ptr) and iPtr) {
621  ptr = *iPtr;
622  }
624  });
625  //The holder guarantees that if the paths finish before the loop ends
626  // that we do not start too soon. It also guarantees that the task will
627  // run under that condition.
628  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
629 
630  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
631  std::exception_ptr const* iPtr) mutable {
632  ServiceRegistry::Operate operate(weakToken.lock());
633 
634  if (iPtr) {
635  //this is used to prioritize this error over one
636  // that happens in EndPath or Accumulate
637  pathErrorPtr->store(new std::exception_ptr(*iPtr));
638  }
639  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
640  });
641 
642  //The holder guarantees that if the paths finish before the loop ends
643  // that we do not start too soon. It also guarantees that the task will
644  // run under that condition.
645  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
646 
647  //start end paths first so on single threaded the paths will run first
648  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
649  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
650  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
651  }
652 
653  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
654  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
655  }
656 
657  ParentContext parentContext(&streamContext_);
659  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
660  } catch (...) {
661  iTask.doneWaiting(std::current_exception());
662  }
663  }
664 
665  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
666  WaitingTaskHolder iWait,
668  if (iExcept) {
669  // Caught exception is propagated via WaitingTaskHolder
670  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
674  if (action == exception_actions::SkipEvent) {
675  edm::printCmsExceptionWarning("SkipEvent", e);
676  *(iExcept.load()) = std::exception_ptr();
677  } else {
678  *(iExcept.load()) = std::current_exception();
679  }
680  } catch (...) {
681  *(iExcept.load()) = std::current_exception();
682  }
683  }
684 
685  if ((not iExcept) and results_->accept()) {
686  ++total_passed_;
687  }
688 
689  if (nullptr != results_inserter_.get()) {
690  // Caught exception is propagated to the caller
691  CMS_SA_ALLOW try {
692  //Even if there was an exception, we need to allow results inserter
693  // to run since some module may be waiting on its results.
694  ParentContext parentContext(&streamContext_);
696 
697  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
698  if (expt) {
699  std::rethrow_exception(expt);
700  }
701  } catch (cms::Exception& ex) {
702  if (not iExcept) {
703  if (ex.context().empty()) {
704  std::ostringstream ost;
705  ost << "Processing Event " << info.principal().id();
706  ex.addContext(ost.str());
707  }
708  iExcept.store(new std::exception_ptr(std::current_exception()));
709  }
710  } catch (...) {
711  if (not iExcept) {
712  iExcept.store(new std::exception_ptr(std::current_exception()));
713  }
714  }
715  }
716  std::exception_ptr ptr;
717  if (iExcept) {
718  ptr = *iExcept.load();
719  }
720  iWait.doneWaiting(ptr);
721  }
722 
723  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
725 
726  if (iExcept) {
727  //add context information to the exception and print message
728  try {
729  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
730  } catch (cms::Exception& ex) {
731  bool const cleaningUpAfterException = false;
732  if (ex.context().empty()) {
733  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
734  } else {
735  addContextAndPrintException("", ex, cleaningUpAfterException);
736  }
737  iExcept = std::current_exception();
738  }
739 
740  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
741  }
742  // Caught exception is propagated to the caller
743  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
744  if (not iExcept) {
745  iExcept = std::current_exception();
746  }
747  }
748  if (not iExcept) {
750  }
751 
752  return iExcept;
753  }
754 
755  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
756  oLabelsToFill.reserve(trig_paths_.size());
757  std::transform(trig_paths_.begin(),
758  trig_paths_.end(),
759  std::back_inserter(oLabelsToFill),
760  std::bind(&Path::name, std::placeholders::_1));
761  }
762 
763  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
764  TrigPaths::const_iterator itFound = std::find_if(
765  trig_paths_.begin(),
766  trig_paths_.end(),
767  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
768  if (itFound != trig_paths_.end()) {
769  oLabelsToFill.reserve(itFound->size());
770  for (size_t i = 0; i < itFound->size(); ++i) {
771  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
772  }
773  }
774  }
775 
777  std::vector<ModuleDescription const*>& descriptions,
778  unsigned int hint) const {
779  descriptions.clear();
780  bool found = false;
781  TrigPaths::const_iterator itFound;
782 
783  if (hint < trig_paths_.size()) {
784  itFound = trig_paths_.begin() + hint;
785  if (itFound->name() == iPathLabel)
786  found = true;
787  }
788  if (!found) {
789  // if the hint did not work, do it the slow way
790  itFound = std::find_if(
791  trig_paths_.begin(),
792  trig_paths_.end(),
793  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
794  if (itFound != trig_paths_.end())
795  found = true;
796  }
797  if (found) {
798  descriptions.reserve(itFound->size());
799  for (size_t i = 0; i < itFound->size(); ++i) {
800  descriptions.push_back(itFound->getWorker(i)->description());
801  }
802  }
803  }
804 
806  std::vector<ModuleDescription const*>& descriptions,
807  unsigned int hint) const {
808  descriptions.clear();
809  bool found = false;
810  TrigPaths::const_iterator itFound;
811 
812  if (hint < end_paths_.size()) {
813  itFound = end_paths_.begin() + hint;
814  if (itFound->name() == iEndPathLabel)
815  found = true;
816  }
817  if (!found) {
818  // if the hint did not work, do it the slow way
819  itFound = std::find_if(
820  end_paths_.begin(),
821  end_paths_.end(),
822  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
823  if (itFound != end_paths_.end())
824  found = true;
825  }
826  if (found) {
827  descriptions.reserve(itFound->size());
828  for (size_t i = 0; i < itFound->size(); ++i) {
829  descriptions.push_back(itFound->getWorker(i)->description());
830  }
831  }
832  }
833 
834  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
835  sum.timesVisited += path.timesVisited(which);
836  sum.timesPassed += path.timesPassed(which);
837  sum.timesFailed += path.timesFailed(which);
838  sum.timesExcept += path.timesExcept(which);
839  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
840  }
841 
842  static void fillPathSummary(Path const& path, PathSummary& sum) {
843  sum.name = path.name();
844  sum.bitPosition = path.bitPosition();
845  sum.timesRun += path.timesRun();
846  sum.timesPassed += path.timesPassed();
847  sum.timesFailed += path.timesFailed();
848  sum.timesExcept += path.timesExcept();
849 
850  Path::size_type sz = path.size();
851  if (sum.moduleInPathSummaries.empty()) {
852  std::vector<ModuleInPathSummary> temp(sz);
853  for (size_t i = 0; i != sz; ++i) {
854  fillModuleInPathSummary(path, i, temp[i]);
855  }
856  sum.moduleInPathSummaries.swap(temp);
857  } else {
858  assert(sz == sum.moduleInPathSummaries.size());
859  for (size_t i = 0; i != sz; ++i) {
861  }
862  }
863  }
864 
865  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
866  sum.timesVisited += w.timesVisited();
867  sum.timesRun += w.timesRun();
868  sum.timesPassed += w.timesPassed();
869  sum.timesFailed += w.timesFailed();
870  sum.timesExcept += w.timesExcept();
871  sum.moduleLabel = w.description()->moduleLabel();
872  }
873 
874  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
875 
880 
881  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
882  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
883  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
884  }
885 
887  using std::placeholders::_1;
889  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
890  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
891  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
892  }
893 
895  skippingEvent_ = false;
896  results_->reset();
897  }
898 
900 
902  //must be sure we have cleared the count first
903  for (auto& count : earlyDeleteBranchToCount_) {
904  count.count = 0;
905  }
906  //now reset based on how many helpers use that branch
908  ++(earlyDeleteBranchToCount_[index].count);
909  }
910  for (auto& helper : earlyDeleteHelpers_) {
911  helper.reset();
912  }
913  }
914 
916  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
917  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
918  ExceptionToActionTable const& actions) {
919  int bitpos = 0;
920  unsigned int indexEmpty = 0;
921  unsigned int indexOfPath = 0;
922  for (auto& pathStatusInserter : pathStatusInserters) {
923  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
924  WorkerPtr workerPtr(
925  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
926  pathStatusInserterWorkers_.emplace_back(workerPtr);
927  workerPtr->setActivityRegistry(actReg_);
928  addToAllWorkers(workerPtr.get());
929 
930  // A little complexity here because a C++ Path object is not
931  // instantiated and put into end_paths if there are no modules
932  // on the configured path.
933  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
934  ++indexEmpty;
935  } else {
936  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
937  ++indexOfPath;
938  }
939  ++bitpos;
940  }
941 
942  bitpos = 0;
943  indexEmpty = 0;
944  indexOfPath = 0;
945  for (auto& endPathStatusInserter : endPathStatusInserters) {
946  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
947  WorkerPtr workerPtr(
948  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
949  endPathStatusInserterWorkers_.emplace_back(workerPtr);
950  workerPtr->setActivityRegistry(actReg_);
951  addToAllWorkers(workerPtr.get());
952 
953  // A little complexity here because a C++ Path object is not
954  // instantiated and put into end_paths if there are no modules
955  // on the configured path.
956  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
957  ++indexEmpty;
958  } else {
959  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
960  ++indexOfPath;
961  }
962  ++bitpos;
963  }
964  }
965 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:59
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
ServiceToken lock() const
Definition: ServiceToken.h:101
string rep
Definition: cuy.py:1189
pathNames_ & tns()), endPathNames_(&tns.getEndPaths()), wantSummary_(tns.wantSummary()
Definition: Schedule.cc:691
int timesRun() const
Definition: Path.h:78
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int bitPosition() const
Definition: Path.h:73
def which
Definition: eostools.py:336
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
ModuleDescription const * description() const
Definition: Worker.h:188
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
virtual void replaceModuleFor(Worker *) const =0
Strings const & getEndPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
int totalEventsFailed() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
EventPrincipal & principal()
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
int timesPassed() const
Definition: Worker.h:234
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
def unique
Definition: tier0.py:24
void setupOnDemandSystem(EventTransitionInfo const &)
void clearCounters()
Definition: Worker.h:222
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
processConfiguration
Definition: Schedule.cc:687
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t Func __host__ __device__ V int Func func
std::string const & category() const
Definition: Exception.cc:143
void addToAllWorkers(Worker *w)
int timesExcept() const
Definition: Path.h:81
exception_actions::ActionCodes find(const std::string &category) const
size_type size() const
Definition: Path.h:84
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:328
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:60
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
assert(be >=bs)
actions
Definition: Schedule.cc:687
std::string const & moduleLabel() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
tuple result
Definition: mps_fire.py:311
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
std::string moduleLabel
Definition: TriggerReport.h:51
char const * label
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int timesExcept() const
Definition: Worker.h:236
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:58
EventSummary eventSummary
Definition: TriggerReport.h:57
accept
Definition: HLTenums.h:18
int timesVisited() const
Definition: Worker.h:233
std::string name
Definition: TriggerReport.h:40
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
def move
Definition: eostools.py:511
std::list< std::string > const & context() const
Definition: Exception.cc:147
static void fillPathSummary(Path const &path, PathSummary &sum)
int timesPassed() const
Definition: Path.h:79
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
int timesRun() const
Definition: Worker.h:232
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
areg
Definition: Schedule.cc:687
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
unsigned int value() const
Definition: StreamID.h:43
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
int timesVisited(size_type i) const
Definition: Path.h:85
double b
Definition: hdecay.h:118
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
string action
Definition: mps_fire.py:183
tbb::task_group * group() const noexcept
constexpr element_type const * get() const
int timesFailed() const
Definition: Worker.h:235
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
std::atomic< bool > skippingEvent_
std::string const & name() const
Definition: Path.h:74
void setupResolvers(Principal &principal)
Strings const & getTrigPaths() const
Worker const * getWorker(size_type i) const
Definition: Path.h:89
string end
Definition: dataset.py:937
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:41
auto wrap(F iFunc) -> decltype(iFunc())
Log< level::Warning, false > LogWarning
preg
Definition: Schedule.cc:687
TrigResConstPtr results() const
std::vector< std::string > vstring
Definition: Schedule.cc:667
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
tuple size
Write out results.
static Registry * instance()
Definition: Registry.cc:12
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void clearCounters()
Definition: Path.cc:198
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
prealloc
Definition: Schedule.cc:687
void addToAllWorkers(Worker *w)
int timesFailed() const
Definition: Path.h:80
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)