CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
28 
30 
31 #include <algorithm>
32 #include <cassert>
33 #include <cstdlib>
34 #include <functional>
35 #include <iomanip>
36 #include <list>
37 #include <map>
38 #include <exception>
39 
40 namespace edm {
41 
42  namespace {
43 
44  // Function template to transform each element in the input range to
45  // a value placed into the output range. The supplied function
46  // should take a const_reference to the 'input', and write to a
47  // reference to the 'output'.
48  template <typename InputIterator, typename ForwardIterator, typename Func>
49  void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
50  for (; begin != end; ++begin, ++out)
51  func(*begin, *out);
52  }
53 
54  // Function template that takes a sequence 'from', a sequence
55  // 'to', and a callable object 'func'. It and applies
56  // transform_into to fill the 'to' sequence with the values
57  // calcuated by the callable object, taking care to fill the
58  // outupt only if all calls succeed.
59  template <typename FROM, typename TO, typename FUNC>
60  void fill_summary(FROM const& from, TO& to, FUNC func) {
61  if (to.size() != from.size()) {
62  TO temp(from.size());
63  transform_into(from.begin(), from.end(), temp.begin(), func);
64  to.swap(temp);
65  } else {
66  transform_into(from.begin(), from.end(), to.begin(), func);
67  }
68  }
69 
70  // -----------------------------
71 
72  // Here we make the trigger results inserter directly. This should
73  // probably be a utility in the WorkerRegistry or elsewhere.
74 
75  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
76  std::shared_ptr<ActivityRegistry> areg,
77  std::shared_ptr<TriggerResultInserter> inserter) {
79  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
80  ptr->setActivityRegistry(areg);
81  return ptr;
82  }
83 
84  void initializeBranchToReadingWorker(ParameterSet const& opts,
85  ProductRegistry const& preg,
86  std::multimap<std::string, Worker*>& branchToReadingWorker) {
87  // See if any data has been marked to be deleted early (removing any duplicates)
88  auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
89  if (not vBranchesToDeleteEarly.empty()) {
90  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
91  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
92  vBranchesToDeleteEarly.end());
93 
94  // Are the requested items in the product registry?
95  auto allBranchNames = preg.allBranchNames();
96  //the branch names all end with a period, which we do not want to compare with
97  for (auto& b : allBranchNames) {
98  b.resize(b.size() - 1);
99  }
100  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
101  std::vector<std::string> temp;
102  temp.reserve(vBranchesToDeleteEarly.size());
103 
104  std::set_intersection(vBranchesToDeleteEarly.begin(),
105  vBranchesToDeleteEarly.end(),
106  allBranchNames.begin(),
107  allBranchNames.end(),
108  std::back_inserter(temp));
109  vBranchesToDeleteEarly.swap(temp);
110  if (temp.size() != vBranchesToDeleteEarly.size()) {
111  std::vector<std::string> missingProducts;
112  std::set_difference(temp.begin(),
113  temp.end(),
114  vBranchesToDeleteEarly.begin(),
115  vBranchesToDeleteEarly.end(),
116  std::back_inserter(missingProducts));
117  LogInfo l("MissingProductsForCanDeleteEarly");
118  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
119  for (auto const& n : missingProducts) {
120  l << "\n " << n;
121  }
122  }
123  //set placeholder for the branch, we will remove the nullptr if a
124  // module actually wants the branch.
125  for (auto const& branch : vBranchesToDeleteEarly) {
126  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
127  }
128  }
129  }
130  } // namespace
131 
132  // -----------------------------
133 
134  typedef std::vector<std::string> vstring;
135 
136  // -----------------------------
137 
139  std::shared_ptr<TriggerResultInserter> inserter,
140  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
141  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
142  std::shared_ptr<ModuleRegistry> modReg,
143  ParameterSet& proc_pset,
144  service::TriggerNamesService const& tns,
145  PreallocationConfiguration const& prealloc,
146  ProductRegistry& preg,
147  BranchIDListHelper& branchIDListHelper,
149  std::shared_ptr<ActivityRegistry> areg,
150  std::shared_ptr<ProcessConfiguration> processConfiguration,
151  bool allowEarlyDelete,
152  StreamID streamID,
153  ProcessContext const* processContext)
154  : workerManager_(modReg, areg, actions),
155  actReg_(areg),
156  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
157  results_inserter_(),
158  trig_paths_(),
159  end_paths_(),
160  total_events_(),
161  total_passed_(),
162  number_of_unscheduled_modules_(0),
163  streamID_(streamID),
164  streamContext_(streamID_, processContext),
165  endpathsAreActive_(true),
166  skippingEvent_(false) {
167  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
168  bool hasPath = false;
169  std::vector<std::string> const& pathNames = tns.getTrigPaths();
170  std::vector<std::string> const& endPathNames = tns.getEndPaths();
171 
172  int trig_bitpos = 0;
173  trig_paths_.reserve(pathNames.size());
174  for (auto const& trig_name : pathNames) {
175  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
176  ++trig_bitpos;
177  hasPath = true;
178  }
179 
180  if (hasPath) {
181  // the results inserter stands alone
182  inserter->setTrigResultForStream(streamID.value(), results());
183 
184  results_inserter_ = makeInserter(actions, actReg_, inserter);
186  }
187 
188  // fill normal endpaths
189  int bitpos = 0;
190  end_paths_.reserve(endPathNames.size());
191  for (auto const& end_path_name : endPathNames) {
192  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
193  ++bitpos;
194  }
195 
196  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
197 
198  //See if all modules were used
199  std::set<std::string> usedWorkerLabels;
200  for (auto const& worker : allWorkers()) {
201  usedWorkerLabels.insert(worker->description()->moduleLabel());
202  }
203  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
204  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
205  std::vector<std::string> unusedLabels;
206  set_difference(modulesInConfigSet.begin(),
207  modulesInConfigSet.end(),
208  usedWorkerLabels.begin(),
209  usedWorkerLabels.end(),
210  back_inserter(unusedLabels));
211  std::set<std::string> unscheduledLabels;
212  std::vector<std::string> shouldBeUsedLabels;
213  if (!unusedLabels.empty()) {
214  //Need to
215  // 1) create worker
216  // 2) if it is a WorkerT<EDProducer>, add it to our list
217  // 3) hand list to our delayed reader
218  for (auto const& label : unusedLabels) {
219  bool isTracked;
220  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
221  assert(isTracked);
222  assert(modulePSet != nullptr);
224  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
225  }
226  if (!shouldBeUsedLabels.empty()) {
227  std::ostringstream unusedStream;
228  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
229  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
230  itLabelEnd = shouldBeUsedLabels.end();
231  itLabel != itLabelEnd;
232  ++itLabel) {
233  unusedStream << ",'" << *itLabel << "'";
234  }
235  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
236  }
237  }
238  number_of_unscheduled_modules_ = unscheduledLabels.size();
239 
240  initializeEarlyDelete(*modReg, opts, preg, allowEarlyDelete);
241 
242  } // StreamSchedule::StreamSchedule
243 
245  edm::ParameterSet const& opts,
246  edm::ProductRegistry const& preg,
247  bool allowEarlyDelete) {
248  //for now, if have a subProcess, don't allow early delete
249  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
250  if (not allowEarlyDelete)
251  return;
252 
253  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
254  // registered for this job
255  std::multimap<std::string, Worker*> branchToReadingWorker;
256  initializeBranchToReadingWorker(opts, preg, branchToReadingWorker);
257 
258  //If no delete early items have been specified we don't have to do anything
259  if (branchToReadingWorker.empty()) {
260  return;
261  }
262  const std::vector<std::string> kEmpty;
263  std::map<Worker*, unsigned int> reserveSizeForWorker;
264  unsigned int upperLimitOnReadingWorker = 0;
265  unsigned int upperLimitOnIndicies = 0;
266  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
267 
268  //talk with output modules first
269  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
270  auto comm = iHolder->createOutputModuleCommunicator();
271  if (comm) {
272  if (!branchToReadingWorker.empty()) {
273  //If an OutputModule needs a product, we can't delete it early
274  // so we should remove it from our list
275  SelectedProductsForBranchType const& kept = comm->keptProducts();
276  for (auto const& item : kept[InEvent]) {
277  BranchDescription const& desc = *item.first;
278  auto found = branchToReadingWorker.equal_range(desc.branchName());
279  if (found.first != found.second) {
280  --nUniqueBranchesToDelete;
281  branchToReadingWorker.erase(found.first, found.second);
282  }
283  }
284  }
285  }
286  });
287 
288  if (branchToReadingWorker.empty()) {
289  return;
290  }
291 
292  for (auto w : allWorkers()) {
293  //determine if this module could read a branch we want to delete early
294  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
295  if (nullptr != pset) {
296  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
297  if (not branches.empty()) {
298  ++upperLimitOnReadingWorker;
299  }
300  for (auto const& branch : branches) {
301  auto found = branchToReadingWorker.equal_range(branch);
302  if (found.first != found.second) {
303  ++upperLimitOnIndicies;
304  ++reserveSizeForWorker[w];
305  if (nullptr == found.first->second) {
306  found.first->second = w;
307  } else {
308  branchToReadingWorker.insert(make_pair(found.first->first, w));
309  }
310  }
311  }
312  }
313  }
314  {
315  auto it = branchToReadingWorker.begin();
316  std::vector<std::string> unusedBranches;
317  while (it != branchToReadingWorker.end()) {
318  if (it->second == nullptr) {
319  unusedBranches.push_back(it->first);
320  //erasing the object invalidates the iterator so must advance it first
321  auto temp = it;
322  ++it;
323  branchToReadingWorker.erase(temp);
324  } else {
325  ++it;
326  }
327  }
328  if (not unusedBranches.empty()) {
329  LogWarning l("UnusedProductsForCanDeleteEarly");
330  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
331  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
332  for (auto const& n : unusedBranches) {
333  l << "\n " << n;
334  }
335  }
336  }
337  if (!branchToReadingWorker.empty()) {
338  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
339  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
340  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
341  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
342  std::string lastBranchName;
343  size_t nextOpenIndex = 0;
344  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
345  for (auto& branchAndWorker : branchToReadingWorker) {
346  if (lastBranchName != branchAndWorker.first) {
347  //have to put back the period we removed earlier in order to get the proper name
348  BranchID bid(branchAndWorker.first + ".");
349  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
350  lastBranchName = branchAndWorker.first;
351  }
352  auto found = alreadySeenWorkers.find(branchAndWorker.second);
353  if (alreadySeenWorkers.end() == found) {
354  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
355  // all the branches that might be read by this worker. However, initially we will only tell the
356  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
357  // EarlyDeleteHelper will automatically advance its internal end pointer.
358  size_t index = nextOpenIndex;
359  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
361  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
362  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
363  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
364  nextOpenIndex += nIndices;
365  } else {
366  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
367  }
368  }
369 
370  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
371  // space needed for each module
372  auto itLast = earlyDeleteHelpers_.begin();
373  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
374  if (itLast->end() != it->begin()) {
375  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
376  unsigned int delta = it->begin() - itLast->end();
377  it->shiftIndexPointers(delta);
378 
380  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
381  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
382  }
383  itLast = it;
384  }
386  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
388 
389  //now tell the paths about the deleters
390  for (auto& p : trig_paths_) {
391  p.setEarlyDeleteHelpers(alreadySeenWorkers);
392  }
393  for (auto& p : end_paths_) {
394  p.setEarlyDeleteHelpers(alreadySeenWorkers);
395  }
397  }
398  }
399 
401  ProductRegistry& preg,
402  PreallocationConfiguration const* prealloc,
403  std::shared_ptr<ProcessConfiguration const> processConfiguration,
404  std::string const& pathName,
405  bool ignoreFilters,
406  PathWorkers& out,
407  std::vector<std::string> const& endPathNames) {
408  vstring modnames = proc_pset.getParameter<vstring>(pathName);
409  PathWorkers tmpworkers;
410 
411  unsigned int placeInPath = 0;
412  for (auto const& name : modnames) {
413  //Modules except EDFilters are set to run concurrently by default
414  bool doNotRunConcurrently = false;
416  if (name[0] == '!') {
417  filterAction = WorkerInPath::Veto;
418  } else if (name[0] == '-' or name[0] == '+') {
419  filterAction = WorkerInPath::Ignore;
420  }
421  if (name[0] == '|' or name[0] == '+') {
422  //cms.wait was specified so do not run concurrently
423  doNotRunConcurrently = true;
424  }
425 
427  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
428  moduleLabel.erase(0, 1);
429  }
430 
431  bool isTracked;
432  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
433  if (modpset == nullptr) {
434  std::string pathType("endpath");
435  if (!search_all(endPathNames, pathName)) {
436  pathType = std::string("path");
437  }
439  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
440  << "\"\n please check spelling or remove that label from the path.";
441  }
442  assert(isTracked);
443 
444  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
445  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
446  // We have a filter on an end path, and the filter is not explicitly ignored.
447  // See if the filter is allowed.
448  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
449  if (!search_all(allowed_filters, worker->description()->moduleName())) {
450  // Filter is not allowed. Ignore the result, and issue a warning.
451  filterAction = WorkerInPath::Ignore;
452  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
453  << "' with module label '" << moduleLabel << "' appears on EndPath '"
454  << pathName << "'.\n"
455  << "The return value of the filter will be ignored.\n"
456  << "To suppress this warning, either remove the filter from the endpath,\n"
457  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
458  }
459  }
460  bool runConcurrently = not doNotRunConcurrently;
461  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
462  runConcurrently = false;
463  }
464  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
465  ++placeInPath;
466  }
467 
468  out.swap(tmpworkers);
469  }
470 
472  ProductRegistry& preg,
473  PreallocationConfiguration const* prealloc,
474  std::shared_ptr<ProcessConfiguration const> processConfiguration,
475  int bitpos,
476  std::string const& name,
477  TrigResPtr trptr,
478  std::vector<std::string> const& endPathNames) {
479  PathWorkers tmpworkers;
480  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
481 
482  // an empty path will cause an extra bit that is not used
483  if (!tmpworkers.empty()) {
484  trig_paths_.emplace_back(bitpos,
485  name,
486  tmpworkers,
487  trptr,
488  actionTable(),
489  actReg_,
493  } else {
494  empty_trig_paths_.push_back(bitpos);
495  }
496  for (WorkerInPath const& workerInPath : tmpworkers) {
497  addToAllWorkers(workerInPath.getWorker());
498  }
499  }
500 
502  ProductRegistry& preg,
503  PreallocationConfiguration const* prealloc,
504  std::shared_ptr<ProcessConfiguration const> processConfiguration,
505  int bitpos,
506  std::string const& name,
507  std::vector<std::string> const& endPathNames) {
508  PathWorkers tmpworkers;
509  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
510 
511  if (!tmpworkers.empty()) {
512  //EndPaths are not supposed to stop if SkipEvent type exception happens
513  end_paths_.emplace_back(bitpos,
514  name,
515  tmpworkers,
516  TrigResPtr(),
517  actionTable(),
518  actReg_,
520  nullptr,
522  } else {
523  empty_end_paths_.push_back(bitpos);
524  }
525  for (WorkerInPath const& workerInPath : tmpworkers) {
526  addToAllWorkers(workerInPath.getWorker());
527  }
528  }
529 
531 
533 
535  Worker* found = nullptr;
536  for (auto const& worker : allWorkers()) {
537  if (worker->description()->moduleLabel() == iLabel) {
538  found = worker;
539  break;
540  }
541  }
542  if (nullptr == found) {
543  return;
544  }
545 
546  iMod->replaceModuleFor(found);
547  found->beginStream(streamID_, streamContext_);
548  }
549 
551 
552  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
553  std::vector<ModuleDescription const*> result;
554  result.reserve(allWorkers().size());
555 
556  for (auto const& worker : allWorkers()) {
557  ModuleDescription const* p = worker->description();
558  result.push_back(p);
559  }
560  return result;
561  }
562 
564  WaitingTaskHolder iTask,
566  ServiceToken const& serviceToken,
567  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
568  EventPrincipal& ep = info.principal();
569 
570  // Caught exception is propagated via WaitingTaskHolder
571  CMS_SA_ALLOW try {
572  this->resetAll();
573 
575 
576  Traits::setStreamContext(streamContext_, ep);
577  //a service may want to communicate with another service
578  ServiceRegistry::Operate guard(serviceToken);
579  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
580 
581  HLTPathStatus hltPathStatus(hlt::Pass, 0);
582  for (int empty_trig_path : empty_trig_paths_) {
583  results_->at(empty_trig_path) = hltPathStatus;
584  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
585  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
588  if (except) {
589  iTask.doneWaiting(except);
590  return;
591  }
592  }
593  for (int empty_end_path : empty_end_paths_) {
594  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
597  if (except) {
598  iTask.doneWaiting(except);
599  return;
600  }
601  }
602 
605 
606  ++total_events_;
607 
608  //use to give priorities on an error to ones from Paths
609  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
610  auto pathErrorPtr = pathErrorHolder.get();
611  ServiceWeakToken weakToken = serviceToken;
612  auto allPathsDone = make_waiting_task(
613  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
614  ServiceRegistry::Operate operate(weakToken.lock());
615 
616  std::exception_ptr ptr;
617  if (pathError->load()) {
618  ptr = *pathError->load();
619  delete pathError->load();
620  }
621  if ((not ptr) and iPtr) {
622  ptr = *iPtr;
623  }
625  });
626  //The holder guarantees that if the paths finish before the loop ends
627  // that we do not start too soon. It also guarantees that the task will
628  // run under that condition.
629  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
630 
631  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
632  std::exception_ptr const* iPtr) mutable {
633  ServiceRegistry::Operate operate(weakToken.lock());
634 
635  if (iPtr) {
636  //this is used to prioritize this error over one
637  // that happens in EndPath or Accumulate
638  pathErrorPtr->store(new std::exception_ptr(*iPtr));
639  }
640  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
641  });
642 
643  //The holder guarantees that if the paths finish before the loop ends
644  // that we do not start too soon. It also guarantees that the task will
645  // run under that condition.
646  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
647 
648  //start end paths first so on single threaded the paths will run first
649  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
650  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
651  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
652  }
653 
654  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
655  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
656  }
657 
658  ParentContext parentContext(&streamContext_);
660  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
661  } catch (...) {
662  iTask.doneWaiting(std::current_exception());
663  }
664  }
665 
666  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
667  WaitingTaskHolder iWait,
669  if (iExcept) {
670  // Caught exception is propagated via WaitingTaskHolder
671  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
676  edm::printCmsExceptionWarning("SkipEvent", e);
677  *(iExcept.load()) = std::exception_ptr();
678  } else {
679  *(iExcept.load()) = std::current_exception();
680  }
681  } catch (...) {
682  *(iExcept.load()) = std::current_exception();
683  }
684  }
685 
686  if ((not iExcept) and results_->accept()) {
687  ++total_passed_;
688  }
689 
690  if (nullptr != results_inserter_.get()) {
691  // Caught exception is propagated to the caller
692  CMS_SA_ALLOW try {
693  //Even if there was an exception, we need to allow results inserter
694  // to run since some module may be waiting on its results.
695  ParentContext parentContext(&streamContext_);
697 
698  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
699  if (expt) {
700  std::rethrow_exception(expt);
701  }
702  } catch (cms::Exception& ex) {
703  if (not iExcept) {
704  if (ex.context().empty()) {
705  std::ostringstream ost;
706  ost << "Processing Event " << info.principal().id();
707  ex.addContext(ost.str());
708  }
709  iExcept.store(new std::exception_ptr(std::current_exception()));
710  }
711  } catch (...) {
712  if (not iExcept) {
713  iExcept.store(new std::exception_ptr(std::current_exception()));
714  }
715  }
716  }
717  std::exception_ptr ptr;
718  if (iExcept) {
719  ptr = *iExcept.load();
720  }
721  iWait.doneWaiting(ptr);
722  }
723 
724  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
726 
727  if (iExcept) {
728  //add context information to the exception and print message
729  try {
730  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
731  } catch (cms::Exception& ex) {
732  bool const cleaningUpAfterException = false;
733  if (ex.context().empty()) {
734  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
735  } else {
736  addContextAndPrintException("", ex, cleaningUpAfterException);
737  }
738  iExcept = std::current_exception();
739  }
740 
741  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
742  }
743  // Caught exception is propagated to the caller
744  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
745  if (not iExcept) {
746  iExcept = std::current_exception();
747  }
748  }
749  if (not iExcept) {
751  }
752 
753  return iExcept;
754  }
755 
756  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
757  oLabelsToFill.reserve(trig_paths_.size());
758  std::transform(trig_paths_.begin(),
759  trig_paths_.end(),
760  std::back_inserter(oLabelsToFill),
761  std::bind(&Path::name, std::placeholders::_1));
762  }
763 
764  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
765  TrigPaths::const_iterator itFound = std::find_if(
766  trig_paths_.begin(),
767  trig_paths_.end(),
768  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
769  if (itFound != trig_paths_.end()) {
770  oLabelsToFill.reserve(itFound->size());
771  for (size_t i = 0; i < itFound->size(); ++i) {
772  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
773  }
774  }
775  }
776 
778  std::vector<ModuleDescription const*>& descriptions,
779  unsigned int hint) const {
780  descriptions.clear();
781  bool found = false;
782  TrigPaths::const_iterator itFound;
783 
784  if (hint < trig_paths_.size()) {
785  itFound = trig_paths_.begin() + hint;
786  if (itFound->name() == iPathLabel)
787  found = true;
788  }
789  if (!found) {
790  // if the hint did not work, do it the slow way
791  itFound = std::find_if(
792  trig_paths_.begin(),
793  trig_paths_.end(),
794  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
795  if (itFound != trig_paths_.end())
796  found = true;
797  }
798  if (found) {
799  descriptions.reserve(itFound->size());
800  for (size_t i = 0; i < itFound->size(); ++i) {
801  descriptions.push_back(itFound->getWorker(i)->description());
802  }
803  }
804  }
805 
807  std::vector<ModuleDescription const*>& descriptions,
808  unsigned int hint) const {
809  descriptions.clear();
810  bool found = false;
811  TrigPaths::const_iterator itFound;
812 
813  if (hint < end_paths_.size()) {
814  itFound = end_paths_.begin() + hint;
815  if (itFound->name() == iEndPathLabel)
816  found = true;
817  }
818  if (!found) {
819  // if the hint did not work, do it the slow way
820  itFound = std::find_if(
821  end_paths_.begin(),
822  end_paths_.end(),
823  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
824  if (itFound != end_paths_.end())
825  found = true;
826  }
827  if (found) {
828  descriptions.reserve(itFound->size());
829  for (size_t i = 0; i < itFound->size(); ++i) {
830  descriptions.push_back(itFound->getWorker(i)->description());
831  }
832  }
833  }
834 
835  void StreamSchedule::enableEndPaths(bool active) { endpathsAreActive_ = active; }
836 
838 
839  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
840  sum.timesVisited += path.timesVisited(which);
841  sum.timesPassed += path.timesPassed(which);
842  sum.timesFailed += path.timesFailed(which);
843  sum.timesExcept += path.timesExcept(which);
844  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
845  }
846 
847  static void fillPathSummary(Path const& path, PathSummary& sum) {
848  sum.name = path.name();
849  sum.bitPosition = path.bitPosition();
850  sum.timesRun += path.timesRun();
851  sum.timesPassed += path.timesPassed();
852  sum.timesFailed += path.timesFailed();
853  sum.timesExcept += path.timesExcept();
854 
855  Path::size_type sz = path.size();
856  if (sum.moduleInPathSummaries.empty()) {
857  std::vector<ModuleInPathSummary> temp(sz);
858  for (size_t i = 0; i != sz; ++i) {
860  }
861  sum.moduleInPathSummaries.swap(temp);
862  } else {
863  assert(sz == sum.moduleInPathSummaries.size());
864  for (size_t i = 0; i != sz; ++i) {
866  }
867  }
868  }
869 
870  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
871  sum.timesVisited += w.timesVisited();
872  sum.timesRun += w.timesRun();
873  sum.timesPassed += w.timesPassed();
874  sum.timesFailed += w.timesFailed();
875  sum.timesExcept += w.timesExcept();
876  sum.moduleLabel = w.description()->moduleLabel();
877  }
878 
879  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
880 
882  rep.eventSummary.totalEvents += totalEvents();
883  rep.eventSummary.totalEventsPassed += totalEventsPassed();
884  rep.eventSummary.totalEventsFailed += totalEventsFailed();
885 
886  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
887  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
888  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
889  }
890 
892  using std::placeholders::_1;
894  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
895  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
896  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
897  }
898 
900  skippingEvent_ = false;
901  results_->reset();
902  }
903 
905 
907  //must be sure we have cleared the count first
908  for (auto& count : earlyDeleteBranchToCount_) {
909  count.count = 0;
910  }
911  //now reset based on how many helpers use that branch
913  ++(earlyDeleteBranchToCount_[index].count);
914  }
915  for (auto& helper : earlyDeleteHelpers_) {
916  helper.reset();
917  }
918  }
919 
921  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
922  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
924  int bitpos = 0;
925  unsigned int indexEmpty = 0;
926  unsigned int indexOfPath = 0;
927  for (auto& pathStatusInserter : pathStatusInserters) {
928  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
929  WorkerPtr workerPtr(
930  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
931  pathStatusInserterWorkers_.emplace_back(workerPtr);
932  workerPtr->setActivityRegistry(actReg_);
933  addToAllWorkers(workerPtr.get());
934 
935  // A little complexity here because a C++ Path object is not
936  // instantiated and put into end_paths if there are no modules
937  // on the configured path.
938  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
939  ++indexEmpty;
940  } else {
941  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
942  ++indexOfPath;
943  }
944  ++bitpos;
945  }
946 
947  bitpos = 0;
948  indexEmpty = 0;
949  indexOfPath = 0;
950  for (auto& endPathStatusInserter : endPathStatusInserters) {
951  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
952  WorkerPtr workerPtr(
953  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
954  endPathStatusInserterWorkers_.emplace_back(workerPtr);
955  workerPtr->setActivityRegistry(actReg_);
956  addToAllWorkers(workerPtr.get());
957 
958  // A little complexity here because a C++ Path object is not
959  // instantiated and put into end_paths if there are no modules
960  // on the configured path.
961  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
962  ++indexEmpty;
963  } else {
964  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
965  ++indexOfPath;
966  }
967  ++bitpos;
968  }
969  }
970 } // namespace edm
edm::WorkerSummary::timesVisited
int timesVisited
Definition: TriggerReport.h:45
edm::Path::size_type
WorkersInPath::size_type size_type
Definition: Path.h:46
edm::StreamSchedule::fillEndPath
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:501
edm::EventTransitionInfo
Definition: TransitionInfoTypes.h:26
edm::StreamSchedule::addToAllWorkers
void addToAllWorkers(Worker *w)
Definition: StreamSchedule.cc:904
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
edm::fillWorkerSummaryAux
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
Definition: StreamSchedule.cc:870
edm::PathContext::PathType::kPath
edm::StreamID
Definition: StreamID.h:30
edm::WorkerManager::addToAllWorkers
void addToAllWorkers(Worker *w)
Definition: WorkerManager.cc:135
edm::TerminationOrigin::ExceptionFromThisContext
mps_fire.i
i
Definition: mps_fire.py:428
edm::StreamSchedule::endPathStatusInserterWorkers_
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
Definition: StreamSchedule.h:343
edm::WorkerSummary::moduleLabel
std::string moduleLabel
Definition: TriggerReport.h:51
edm::StreamSchedule::empty_end_paths_
std::vector< int > empty_end_paths_
Definition: StreamSchedule.h:348
edm::StreamSchedule::clearCounters
void clearCounters()
Clear all the counters in the trigger report.
Definition: StreamSchedule.cc:891
edm::ModuleInPathSummary::timesFailed
int timesFailed
Definition: TriggerReport.h:27
edm::StreamSchedule::getTriggerReport
void getTriggerReport(TriggerReport &rep) const
Definition: StreamSchedule.cc:881
MessageLogger.h
funct::false
false
Definition: Factorize.h:29
dqmiodumpmetadata.n
n
Definition: dqmiodumpmetadata.py:28
edm::StreamSchedule::beginStream
void beginStream()
Definition: StreamSchedule.cc:530
edm::StreamSchedule::results
TrigResConstPtr results() const
Definition: StreamSchedule.h:328
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::StreamSchedule::makePathStatusInserters
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
Definition: StreamSchedule.cc:920
edm::ServiceWeakToken
Definition: ServiceToken.h:86
WorkerT.h
edm::PathSummary::timesFailed
int timesFailed
Definition: TriggerReport.h:37
edm::ParameterSet::getUntrackedParameterSet
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: ParameterSet.cc:2136
BranchIDListHelper.h
TriggerNamesService.h
WaitingTaskHolder.h
edm::StreamSchedule::fillWorkers
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:400
edm::StreamSchedule::workerManager_
WorkerManager workerManager_
Definition: StreamSchedule.h:336
MicroEventContent_cff.branch
branch
Definition: MicroEventContent_cff.py:174
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::ProcessContext
Definition: ProcessContext.h:27
edm::StreamSchedule::streamContext_
StreamContext streamContext_
Definition: StreamSchedule.h:370
edm::printCmsExceptionWarning
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
Definition: ExceptionMessages.cc:25
edm::StreamSchedule::earlyDeleteBranchToCount_
std::vector< BranchToCount > earlyDeleteBranchToCount_
Definition: StreamSchedule.h:353
edm::StreamSchedule::total_events_
int total_events_
Definition: StreamSchedule.h:365
edm::ModuleDescription::moduleName
std::string const & moduleName() const
Definition: ModuleDescription.h:42
edm::WorkerManager::addToUnscheduledWorkers
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
Definition: WorkerManager.cc:52
edm::ModuleInPathSummary::timesExcept
int timesExcept
Definition: TriggerReport.h:28
TriggerTimingReport.h
edm::StreamSchedule::PathWorkers
std::vector< WorkerInPath > PathWorkers
Definition: StreamSchedule.h:164
Algorithms.h
edm::StreamSchedule::totalEventsPassed
int totalEventsPassed() const
Definition: StreamSchedule.h:230
edm::fillWorkerSummary
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
Definition: StreamSchedule.cc:879
edm::HLTGlobalStatus
Definition: HLTGlobalStatus.h:25
to
cms::cuda::assert
assert(be >=bs)
edm::propagate_const::get
constexpr element_type const * get() const
Definition: propagate_const.h:64
edm::StreamSchedule::StreamSchedule
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
Definition: StreamSchedule.cc:138
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
edm::StreamSchedule::allWorkers
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: StreamSchedule.h:258
edm::maker::ModuleHolder
Definition: ModuleHolder.h:37
edm::ParameterSet::getUntrackedParameter
T getUntrackedParameter(std::string const &, T const &) const
edm::StreamID::value
unsigned int value() const
Definition: StreamID.h:43
ProductRegistry.h
edm::WorkerT
Definition: Frameworkfwd.h:63
WorkerInPath.h
edm::StreamSchedule::WorkerPtr
std::shared_ptr< Worker > WorkerPtr
Definition: StreamSchedule.h:159
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:93
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
groupFilesInBlocks.temp
list temp
Definition: groupFilesInBlocks.py:142
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
edm::ModuleInPathSummary::timesPassed
int timesPassed
Definition: TriggerReport.h:26
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::PathSummary::moduleInPathSummaries
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:41
edm::StreamSchedule::vstring
std::vector< std::string > vstring
Definition: StreamSchedule.h:154
edm::ModuleDescription
Definition: ModuleDescription.h:21
edm::PathSummary::name
std::string name
Definition: TriggerReport.h:40
edm::StreamSchedule::results_inserter_
edm::propagate_const< WorkerPtr > results_inserter_
Definition: StreamSchedule.h:341
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::ProductRegistry
Definition: ProductRegistry.h:37
edm::Worker::description
ModuleDescription const * description() const
Definition: Worker.h:188
TriggerReport.h
edm::ServiceToken
Definition: ServiceToken.h:42
edm::propagate_const
Definition: propagate_const.h:32
OutputModuleCommunicator.h
edm::vstring
std::vector< std::string > vstring
Definition: Schedule.cc:654
edm::Worker::kFilter
Definition: Worker.h:94
edm::StreamSchedule::number_of_unscheduled_modules_
unsigned int number_of_unscheduled_modules_
Definition: StreamSchedule.h:367
edm::WorkerSummary::timesExcept
int timesExcept
Definition: TriggerReport.h:49
edm::EventPrincipal
Definition: EventPrincipal.h:48
OutputModuleDescription.h
edm::StreamSchedule::totalEvents
int totalEvents() const
Definition: StreamSchedule.h:226
edm::StreamSchedule::moduleDescriptionsInEndPath
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: StreamSchedule.cc:806
edm::maker::ModuleHolder::createOutputModuleCommunicator
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
edm::ModuleInPathSummary::moduleLabel
std::string moduleLabel
Definition: TriggerReport.h:30
ProcessConfiguration.h
hltMonBTagIPClient_cfi.pathName
pathName
Definition: hltMonBTagIPClient_cfi.py:5
edm::StreamSchedule::processOneEventAsync
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
Definition: StreamSchedule.cc:563
edm::StreamSchedule::endPathsEnabled
bool endPathsEnabled() const
Definition: StreamSchedule.cc:837
w
const double w
Definition: UKUtility.cc:23
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::BranchID
Definition: BranchID.h:14
edm::PathSummary::bitPosition
int bitPosition
Definition: TriggerReport.h:34
edm::WorkerInPath::Ignore
Definition: WorkerInPath.h:27
mps_fire.end
end
Definition: mps_fire.py:242
edm::exception_actions::FailPath
Definition: ExceptionActions.h:11
edm::StreamSchedule::enableEndPaths
void enableEndPaths(bool active)
Definition: StreamSchedule.cc:835
edm::WorkerSummary::timesRun
int timesRun
Definition: TriggerReport.h:46
edm::OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:38
submitPVResolutionJobs.count
count
Definition: submitPVResolutionJobs.py:352
edm::StreamSchedule::replaceModule
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
Definition: StreamSchedule.cc:534
HcalDetIdTransform::transform
unsigned transform(const HcalDetId &id, unsigned transformCode)
Definition: HcalDetIdTransform.cc:7
edm::StreamSchedule::actionTable
ExceptionToActionTable const & actionTable() const
returns the action table
Definition: StreamSchedule.h:286
edm::InEvent
Definition: BranchType.h:11
ConvertException.h
edm::exception_actions::SkipEvent
Definition: ExceptionActions.h:11
ExceptionCollector.h
ParameterSetDescription.h
b
double b
Definition: hdecay.h:118
edm::Worker
Definition: Worker.h:91
edm::WorkerInPath::Normal
Definition: WorkerInPath.h:27
edm::StreamSchedule::results_
edm::propagate_const< TrigResPtr > results_
Definition: StreamSchedule.h:339
edm::StreamSchedule::empty_trig_paths_
std::vector< int > empty_trig_paths_
Definition: StreamSchedule.h:347
edm::WorkerSummary
Definition: TriggerReport.h:44
edm::StreamSchedule::initializeEarlyDelete
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
Definition: StreamSchedule.cc:244
edm::ModuleRegistry::forAllModuleHolders
void forAllModuleHolders(F iFunc)
Definition: ModuleRegistry.h:56
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
edm::ParentContext
Definition: ParentContext.h:27
edm::WorkerInPath::FilterAction
FilterAction
Definition: WorkerInPath.h:27
edm::WorkerManager::beginStream
void beginStream(StreamID iID, StreamContext &streamContext)
Definition: WorkerManager.cc:121
edm::WorkerSummary::timesPassed
int timesPassed
Definition: TriggerReport.h:47
funct::true
true
Definition: Factorize.h:173
edm::StreamSchedule::moduleDescriptionsInPath
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: StreamSchedule.cc:777
StreamSchedule.h
edm::ParameterSet
Definition: ParameterSet.h:47
cms::Exception::context
std::list< std::string > const & context() const
Definition: Exception.cc:147
AlCaHLTBitMon_ParallelJobs.p
def p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
ParameterSet
Definition: Functions.h:16
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:32
edm::StreamSchedule::skippingEvent_
std::atomic< bool > skippingEvent_
Definition: StreamSchedule.h:372
edm::WorkerManager::setupOnDemandSystem
void setupOnDemandSystem(EventTransitionInfo const &)
Definition: WorkerManager.cc:150
edm::StreamSchedule::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: StreamSchedule.h:337
LuminosityBlockProcessingStatus.h
jetUpdater_cfi.sort
sort
Definition: jetUpdater_cfi.py:29
dumpMFGeometry_cfg.delta
delta
Definition: dumpMFGeometry_cfg.py:25
helper
Definition: helper.py:1
TriggerResultInserter.h
edm::Path
Definition: Path.h:41
edm::HLTPathStatus
Definition: HLTPathStatus.h:33
edm::get_underlying
constexpr T & get_underlying(propagate_const< T > &)
Definition: propagate_const.h:103
edm::BranchIDListHelper
Definition: BranchIDListHelper.h:15
edm::addContextAndPrintException
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
Definition: ExceptionHelpers.cc:11
edm::StreamSchedule::fillTrigPath
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:471
writedatasetfile.action
action
Definition: writedatasetfile.py:8
cuy.rep
rep
Definition: cuy.py:1189
edm::StreamSchedule::end_paths_
TrigPaths end_paths_
Definition: StreamSchedule.h:346
trackerHitRTTI::vector
Definition: trackerHitRTTI.h:21
edm::WorkerManager::processAccumulatorsAsync
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: WorkerManager.h:119
edm::ServiceWeakToken::lock
ServiceToken lock() const
Definition: ServiceToken.h:101
B2GTnPMonitor_cfi.item
item
Definition: B2GTnPMonitor_cfi.py:147
edm::fillPathSummary
static void fillPathSummary(Path const &path, PathSummary &sum)
Definition: StreamSchedule.cc:847
edm::ExceptionToActionTable
Definition: ExceptionActions.h:16
TrackCollections2monitor_cff.func
func
Definition: TrackCollections2monitor_cff.py:359
PathContext.h
PathStatusInserter.h
edm::TriggerReport
Definition: TriggerReport.h:56
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::StreamSchedule::streamID_
StreamID streamID_
Definition: StreamSchedule.h:369
edm::Worker::moduleType
virtual Types moduleType() const =0
cmsLHEtoEOSManager.l
l
Definition: cmsLHEtoEOSManager.py:204
EndPathStatusInserter.h
edm::StreamSchedule::getAllModuleDescriptions
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: StreamSchedule.cc:552
edm::service::TriggerNamesService
Definition: TriggerNamesService.h:42
Registry.h
edm::PreallocationConfiguration
Definition: PreallocationConfiguration.h:27
edm::ModuleInPathSummary::timesVisited
int timesVisited
Definition: TriggerReport.h:25
edm::StreamSchedule::resetAll
void resetAll()
Definition: StreamSchedule.cc:899
edm::StreamSchedule::finishedPaths
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
Definition: StreamSchedule.cc:666
ModuleRegistry.h
edm::Worker::clearCounters
void clearCounters()
Definition: Worker.h:222
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
edm::StreamSchedule::earlyDeleteHelpers_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
Definition: StreamSchedule.h:363
edm::maker::ModuleHolder::replaceModuleFor
virtual void replaceModuleFor(Worker *) const =0
edm::SelectedProductsForBranchType
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
Definition: SelectedProducts.h:13
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::ModuleInPathSummary
Definition: TriggerReport.h:24
edm::ModuleRegistry
Definition: ModuleRegistry.h:40
edm::StreamSchedule::TrigResPtr
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: StreamSchedule.h:157
tier0.unique
def unique(seq, keepstr=True)
Definition: tier0.py:24
edm::Path::name
std::string const & name() const
Definition: Path.h:74
edm::service::TriggerNamesService::getTrigPaths
Strings const & getTrigPaths() const
Definition: TriggerNamesService.h:55
edm::PathSummary
Definition: TriggerReport.h:33
edm::fillModuleInPathSummary
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
Definition: StreamSchedule.cc:839
ModuleHolder.h
edm::PathSummary::timesRun
int timesRun
Definition: TriggerReport.h:35
edm::service::TriggerNamesService::getEndPaths
Strings const & getEndPaths() const
Definition: TriggerNamesService.h:74
edm::WorkerSummary::timesFailed
int timesFailed
Definition: TriggerReport.h:48
Exception
Definition: hltDiff.cc:245
edm::PathSummary::timesPassed
int timesPassed
Definition: TriggerReport.h:36
edm::StreamSchedule::trig_paths_
TrigPaths trig_paths_
Definition: StreamSchedule.h:345
edm::WorkerInPath
Definition: WorkerInPath.h:25
edm::PathContext::PathType::kEndPath
edm::PathSummary::timesExcept
int timesExcept
Definition: TriggerReport.h:38
edm::WorkerInPath::Veto
Definition: WorkerInPath.h:27
edm::StreamSchedule::resetEarlyDelete
void resetEarlyDelete()
Definition: StreamSchedule.cc:906
Skims_PA_cff.name
name
Definition: Skims_PA_cff.py:17
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
edm::ExceptionToActionTable::find
exception_actions::ActionCodes find(const std::string &category) const
Definition: ExceptionActions.cc:85
edm::WorkerManager::setupResolvers
void setupResolvers(Principal &principal)
Definition: WorkerManager.cc:141
edm::WorkerManager::endStream
void endStream(StreamID iID, StreamContext &streamContext)
Definition: WorkerManager.cc:127
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::StreamSchedule::endStream
void endStream()
Definition: StreamSchedule.cc:532
edm::WorkerManager::getWorker
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
Definition: WorkerManager.cc:43
edm::BranchDescription
Definition: BranchDescription.h:32
MillePedeFileConverter_cfg.out
out
Definition: MillePedeFileConverter_cfg.py:31
actions
roAction_t actions[nactions]
Definition: GenABIO.cc:181
mps_fire.result
result
Definition: mps_fire.py:311
edm::search_all
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
cms::Exception
Definition: Exception.h:70
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
edm::StreamSchedule::streamID
StreamID streamID() const
Definition: StreamSchedule.h:199
ParameterSet.h
HerwigMaxPtPartonFilter_cfi.moduleLabel
moduleLabel
Definition: HerwigMaxPtPartonFilter_cfi.py:4
edm::StreamSchedule::pathStatusInserterWorkers_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
Definition: StreamSchedule.h:342
edm::hlt::Pass
accept
Definition: HLTenums.h:18
edm::StreamSchedule::availablePaths
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: StreamSchedule.cc:756
edm::exception_actions::IgnoreCompletely
Definition: ExceptionActions.h:11
edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
Definition: StreamSchedule.h:360
Factory.h
edm::Path::clearCounters
void clearCounters()
Definition: Path.cc:198
edm::WorkerManager::deleteModuleIfExists
void deleteModuleIfExists(std::string const &moduleLabel)
Definition: WorkerManager.cc:33
edm::ParameterSet::getPSetForUpdate
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
Definition: ParameterSet.cc:457
edm::StreamSchedule::finishProcessOneEvent
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
Definition: StreamSchedule.cc:724
edm::Log
Definition: MessageLogger.h:70
eostools.which
def which(cmd)
Definition: eostools.py:336
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::StreamSchedule::totalEventsFailed
int totalEventsFailed() const
Definition: StreamSchedule.h:234
edm::pset::Registry::getMapped
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
edm::errors::Configuration
Definition: EDMException.h:36
edm::StreamSchedule::modulesInPath
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
Definition: StreamSchedule.cc:764
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
label
const char * label
Definition: PFTauDecayModeTools.cc:11
edm::StreamSchedule::endpathsAreActive_
volatile bool endpathsAreActive_
Definition: StreamSchedule.h:371
edm::WaitingTaskHolder::group
tbb::task_group * group() const noexcept
Definition: WaitingTaskHolder.h:77
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
geometryDiff.opts
opts
Definition: geometryDiff.py:11
edm::StreamSchedule::total_passed_
int total_passed_
Definition: StreamSchedule.h:366
edm::exception_actions::ActionCodes
ActionCodes
Definition: ExceptionActions.h:11
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
edm::StreamSchedule::deleteModule
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
Definition: StreamSchedule.cc:550