CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
28 
30 
31 #include <algorithm>
32 #include <cassert>
33 #include <cstdlib>
34 #include <functional>
35 #include <iomanip>
36 #include <list>
37 #include <map>
38 #include <exception>
39 
40 namespace edm {
41 
42  namespace {
43 
44  // Function template to transform each element in the input range to
45  // a value placed into the output range. The supplied function
46  // should take a const_reference to the 'input', and write to a
47  // reference to the 'output'.
48  template <typename InputIterator, typename ForwardIterator, typename Func>
49  void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
50  for (; begin != end; ++begin, ++out)
51  func(*begin, *out);
52  }
53 
54  // Function template that takes a sequence 'from', a sequence
55  // 'to', and a callable object 'func'. It and applies
56  // transform_into to fill the 'to' sequence with the values
57  // calcuated by the callable object, taking care to fill the
58  // outupt only if all calls succeed.
59  template <typename FROM, typename TO, typename FUNC>
60  void fill_summary(FROM const& from, TO& to, FUNC func) {
61  if (to.size() != from.size()) {
62  TO temp(from.size());
63  transform_into(from.begin(), from.end(), temp.begin(), func);
64  to.swap(temp);
65  } else {
66  transform_into(from.begin(), from.end(), to.begin(), func);
67  }
68  }
69 
70  // -----------------------------
71 
72  // Here we make the trigger results inserter directly. This should
73  // probably be a utility in the WorkerRegistry or elsewhere.
74 
75  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
76  std::shared_ptr<ActivityRegistry> areg,
77  std::shared_ptr<TriggerResultInserter> inserter) {
79  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
80  ptr->setActivityRegistry(areg);
81  return ptr;
82  }
83 
84  void initializeBranchToReadingWorker(ParameterSet const& opts,
85  ProductRegistry const& preg,
86  std::multimap<std::string, Worker*>& branchToReadingWorker) {
87  // See if any data has been marked to be deleted early (removing any duplicates)
88  auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
89  if (not vBranchesToDeleteEarly.empty()) {
90  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
91  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
92  vBranchesToDeleteEarly.end());
93 
94  // Are the requested items in the product registry?
95  auto allBranchNames = preg.allBranchNames();
96  //the branch names all end with a period, which we do not want to compare with
97  for (auto& b : allBranchNames) {
98  b.resize(b.size() - 1);
99  }
100  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
101  std::vector<std::string> temp;
102  temp.reserve(vBranchesToDeleteEarly.size());
103 
104  std::set_intersection(vBranchesToDeleteEarly.begin(),
105  vBranchesToDeleteEarly.end(),
106  allBranchNames.begin(),
107  allBranchNames.end(),
108  std::back_inserter(temp));
109  vBranchesToDeleteEarly.swap(temp);
110  if (temp.size() != vBranchesToDeleteEarly.size()) {
111  std::vector<std::string> missingProducts;
112  std::set_difference(temp.begin(),
113  temp.end(),
114  vBranchesToDeleteEarly.begin(),
115  vBranchesToDeleteEarly.end(),
116  std::back_inserter(missingProducts));
117  LogInfo l("MissingProductsForCanDeleteEarly");
118  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
119  for (auto const& n : missingProducts) {
120  l << "\n " << n;
121  }
122  }
123  //set placeholder for the branch, we will remove the nullptr if a
124  // module actually wants the branch.
125  for (auto const& branch : vBranchesToDeleteEarly) {
126  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
127  }
128  }
129  }
130  } // namespace
131 
132  // -----------------------------
133 
134  typedef std::vector<std::string> vstring;
135 
136  // -----------------------------
137 
139  std::shared_ptr<TriggerResultInserter> inserter,
140  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
141  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
142  std::shared_ptr<ModuleRegistry> modReg,
143  ParameterSet& proc_pset,
144  service::TriggerNamesService const& tns,
145  PreallocationConfiguration const& prealloc,
146  ProductRegistry& preg,
147  BranchIDListHelper& branchIDListHelper,
149  std::shared_ptr<ActivityRegistry> areg,
150  std::shared_ptr<ProcessConfiguration> processConfiguration,
151  bool allowEarlyDelete,
152  StreamID streamID,
153  ProcessContext const* processContext)
154  : workerManager_(modReg, areg, actions),
155  actReg_(areg),
156  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
157  results_inserter_(),
158  trig_paths_(),
159  end_paths_(),
160  total_events_(),
161  total_passed_(),
162  number_of_unscheduled_modules_(0),
163  streamID_(streamID),
164  streamContext_(streamID_, processContext),
165  endpathsAreActive_(true),
166  skippingEvent_(false) {
167  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
168  bool hasPath = false;
169  std::vector<std::string> const& pathNames = tns.getTrigPaths();
170  std::vector<std::string> const& endPathNames = tns.getEndPaths();
171 
172  int trig_bitpos = 0;
173  trig_paths_.reserve(pathNames.size());
174  for (auto const& trig_name : pathNames) {
175  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
176  ++trig_bitpos;
177  hasPath = true;
178  }
179 
180  if (hasPath) {
181  // the results inserter stands alone
182  inserter->setTrigResultForStream(streamID.value(), results());
183 
184  results_inserter_ = makeInserter(actions, actReg_, inserter);
186  }
187 
188  // fill normal endpaths
189  int bitpos = 0;
190  end_paths_.reserve(endPathNames.size());
191  for (auto const& end_path_name : endPathNames) {
192  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
193  ++bitpos;
194  }
195 
196  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
197 
198  //See if all modules were used
199  std::set<std::string> usedWorkerLabels;
200  for (auto const& worker : allWorkers()) {
201  usedWorkerLabels.insert(worker->description().moduleLabel());
202  }
203  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
204  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
205  std::vector<std::string> unusedLabels;
206  set_difference(modulesInConfigSet.begin(),
207  modulesInConfigSet.end(),
208  usedWorkerLabels.begin(),
209  usedWorkerLabels.end(),
210  back_inserter(unusedLabels));
211  std::set<std::string> unscheduledLabels;
212  std::vector<std::string> shouldBeUsedLabels;
213  if (!unusedLabels.empty()) {
214  //Need to
215  // 1) create worker
216  // 2) if it is a WorkerT<EDProducer>, add it to our list
217  // 3) hand list to our delayed reader
218  for (auto const& label : unusedLabels) {
219  bool isTracked;
220  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
221  assert(isTracked);
222  assert(modulePSet != nullptr);
224  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
225  }
226  if (!shouldBeUsedLabels.empty()) {
227  std::ostringstream unusedStream;
228  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
229  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
230  itLabelEnd = shouldBeUsedLabels.end();
231  itLabel != itLabelEnd;
232  ++itLabel) {
233  unusedStream << ",'" << *itLabel << "'";
234  }
235  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
236  }
237  }
238  number_of_unscheduled_modules_ = unscheduledLabels.size();
239 
240  initializeEarlyDelete(*modReg, opts, preg, allowEarlyDelete);
241 
242  } // StreamSchedule::StreamSchedule
243 
245  edm::ParameterSet const& opts,
246  edm::ProductRegistry const& preg,
247  bool allowEarlyDelete) {
248  //for now, if have a subProcess, don't allow early delete
249  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
250  if (not allowEarlyDelete)
251  return;
252 
253  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
254  // registered for this job
255  std::multimap<std::string, Worker*> branchToReadingWorker;
256  initializeBranchToReadingWorker(opts, preg, branchToReadingWorker);
257 
258  //If no delete early items have been specified we don't have to do anything
259  if (branchToReadingWorker.empty()) {
260  return;
261  }
262  const std::vector<std::string> kEmpty;
263  std::map<Worker*, unsigned int> reserveSizeForWorker;
264  unsigned int upperLimitOnReadingWorker = 0;
265  unsigned int upperLimitOnIndicies = 0;
266  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
267 
268  //talk with output modules first
269  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
270  auto comm = iHolder->createOutputModuleCommunicator();
271  if (comm) {
272  if (!branchToReadingWorker.empty()) {
273  //If an OutputModule needs a product, we can't delete it early
274  // so we should remove it from our list
275  SelectedProductsForBranchType const& kept = comm->keptProducts();
276  for (auto const& item : kept[InEvent]) {
277  BranchDescription const& desc = *item.first;
278  auto found = branchToReadingWorker.equal_range(desc.branchName());
279  if (found.first != found.second) {
280  --nUniqueBranchesToDelete;
281  branchToReadingWorker.erase(found.first, found.second);
282  }
283  }
284  }
285  }
286  });
287 
288  if (branchToReadingWorker.empty()) {
289  return;
290  }
291 
292  for (auto w : allWorkers()) {
293  //determine if this module could read a branch we want to delete early
294  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
295  if (nullptr != pset) {
296  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
297  if (not branches.empty()) {
298  ++upperLimitOnReadingWorker;
299  }
300  for (auto const& branch : branches) {
301  auto found = branchToReadingWorker.equal_range(branch);
302  if (found.first != found.second) {
303  ++upperLimitOnIndicies;
304  ++reserveSizeForWorker[w];
305  if (nullptr == found.first->second) {
306  found.first->second = w;
307  } else {
308  branchToReadingWorker.insert(make_pair(found.first->first, w));
309  }
310  }
311  }
312  }
313  }
314  {
315  auto it = branchToReadingWorker.begin();
316  std::vector<std::string> unusedBranches;
317  while (it != branchToReadingWorker.end()) {
318  if (it->second == nullptr) {
319  unusedBranches.push_back(it->first);
320  //erasing the object invalidates the iterator so must advance it first
321  auto temp = it;
322  ++it;
323  branchToReadingWorker.erase(temp);
324  } else {
325  ++it;
326  }
327  }
328  if (not unusedBranches.empty()) {
329  LogWarning l("UnusedProductsForCanDeleteEarly");
330  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
331  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
332  for (auto const& n : unusedBranches) {
333  l << "\n " << n;
334  }
335  }
336  }
337  if (!branchToReadingWorker.empty()) {
338  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
339  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
340  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
341  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
342  std::string lastBranchName;
343  size_t nextOpenIndex = 0;
344  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
345  for (auto& branchAndWorker : branchToReadingWorker) {
346  if (lastBranchName != branchAndWorker.first) {
347  //have to put back the period we removed earlier in order to get the proper name
348  BranchID bid(branchAndWorker.first + ".");
349  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
350  lastBranchName = branchAndWorker.first;
351  }
352  auto found = alreadySeenWorkers.find(branchAndWorker.second);
353  if (alreadySeenWorkers.end() == found) {
354  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
355  // all the branches that might be read by this worker. However, initially we will only tell the
356  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
357  // EarlyDeleteHelper will automatically advance its internal end pointer.
358  size_t index = nextOpenIndex;
359  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
361  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
362  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
363  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
364  nextOpenIndex += nIndices;
365  } else {
366  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
367  }
368  }
369 
370  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
371  // space needed for each module
372  auto itLast = earlyDeleteHelpers_.begin();
373  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
374  if (itLast->end() != it->begin()) {
375  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
376  unsigned int delta = it->begin() - itLast->end();
377  it->shiftIndexPointers(delta);
378 
380  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
381  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
382  }
383  itLast = it;
384  }
386  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
388 
389  //now tell the paths about the deleters
390  for (auto& p : trig_paths_) {
391  p.setEarlyDeleteHelpers(alreadySeenWorkers);
392  }
393  for (auto& p : end_paths_) {
394  p.setEarlyDeleteHelpers(alreadySeenWorkers);
395  }
397  }
398  }
399 
401  ProductRegistry& preg,
402  PreallocationConfiguration const* prealloc,
403  std::shared_ptr<ProcessConfiguration const> processConfiguration,
404  std::string const& pathName,
405  bool ignoreFilters,
406  PathWorkers& out,
407  std::vector<std::string> const& endPathNames) {
408  vstring modnames = proc_pset.getParameter<vstring>(pathName);
409  PathWorkers tmpworkers;
410 
411  unsigned int placeInPath = 0;
412  for (auto const& name : modnames) {
413  //Modules except EDFilters are set to run concurrently by default
414  bool doNotRunConcurrently = false;
416  if (name[0] == '!') {
417  filterAction = WorkerInPath::Veto;
418  } else if (name[0] == '-' or name[0] == '+') {
419  filterAction = WorkerInPath::Ignore;
420  }
421  if (name[0] == '|' or name[0] == '+') {
422  //cms.wait was specified so do not run concurrently
423  doNotRunConcurrently = true;
424  }
425 
427  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
428  moduleLabel.erase(0, 1);
429  }
430 
431  bool isTracked;
432  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
433  if (modpset == nullptr) {
434  std::string pathType("endpath");
435  if (!search_all(endPathNames, pathName)) {
436  pathType = std::string("path");
437  }
439  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
440  << "\"\n please check spelling or remove that label from the path.";
441  }
442  assert(isTracked);
443 
444  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
445  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
446  // We have a filter on an end path, and the filter is not explicitly ignored.
447  // See if the filter is allowed.
448  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
449  if (!search_all(allowed_filters, worker->description().moduleName())) {
450  // Filter is not allowed. Ignore the result, and issue a warning.
451  filterAction = WorkerInPath::Ignore;
452  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description().moduleName()
453  << "' with module label '" << moduleLabel << "' appears on EndPath '"
454  << pathName << "'.\n"
455  << "The return value of the filter will be ignored.\n"
456  << "To suppress this warning, either remove the filter from the endpath,\n"
457  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
458  }
459  }
460  bool runConcurrently = not doNotRunConcurrently;
461  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
462  runConcurrently = false;
463  }
464  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
465  ++placeInPath;
466  }
467 
468  out.swap(tmpworkers);
469  }
470 
472  ProductRegistry& preg,
473  PreallocationConfiguration const* prealloc,
474  std::shared_ptr<ProcessConfiguration const> processConfiguration,
475  int bitpos,
476  std::string const& name,
477  TrigResPtr trptr,
478  std::vector<std::string> const& endPathNames) {
479  PathWorkers tmpworkers;
480  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
481 
482  // an empty path will cause an extra bit that is not used
483  if (!tmpworkers.empty()) {
484  trig_paths_.emplace_back(bitpos,
485  name,
486  tmpworkers,
487  trptr,
488  actionTable(),
489  actReg_,
493  } else {
494  empty_trig_paths_.push_back(bitpos);
495  }
496  for (WorkerInPath const& workerInPath : tmpworkers) {
497  addToAllWorkers(workerInPath.getWorker());
498  }
499  }
500 
502  ProductRegistry& preg,
503  PreallocationConfiguration const* prealloc,
504  std::shared_ptr<ProcessConfiguration const> processConfiguration,
505  int bitpos,
506  std::string const& name,
507  std::vector<std::string> const& endPathNames) {
508  PathWorkers tmpworkers;
509  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
510 
511  if (!tmpworkers.empty()) {
512  //EndPaths are not supposed to stop if SkipEvent type exception happens
513  end_paths_.emplace_back(bitpos,
514  name,
515  tmpworkers,
516  TrigResPtr(),
517  actionTable(),
518  actReg_,
520  nullptr,
522  } else {
523  empty_end_paths_.push_back(bitpos);
524  }
525  for (WorkerInPath const& workerInPath : tmpworkers) {
526  addToAllWorkers(workerInPath.getWorker());
527  }
528  }
529 
531 
533 
535  Worker* found = nullptr;
536  for (auto const& worker : allWorkers()) {
537  if (worker->description().moduleLabel() == iLabel) {
538  found = worker;
539  break;
540  }
541  }
542  if (nullptr == found) {
543  return;
544  }
545 
546  iMod->replaceModuleFor(found);
547  found->beginStream(streamID_, streamContext_);
548  }
549 
550  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
551  std::vector<ModuleDescription const*> result;
552  result.reserve(allWorkers().size());
553 
554  for (auto const& worker : allWorkers()) {
555  ModuleDescription const* p = worker->descPtr();
556  result.push_back(p);
557  }
558  return result;
559  }
560 
562  WaitingTaskHolder iTask,
564  ServiceToken const& serviceToken,
565  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
566  EventPrincipal& ep = info.principal();
567 
568  // Caught exception is propagated via WaitingTaskHolder
569  CMS_SA_ALLOW try {
570  this->resetAll();
571 
573 
574  Traits::setStreamContext(streamContext_, ep);
575  //a service may want to communicate with another service
576  ServiceRegistry::Operate guard(serviceToken);
577  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
578 
579  HLTPathStatus hltPathStatus(hlt::Pass, 0);
580  for (int empty_trig_path : empty_trig_paths_) {
581  results_->at(empty_trig_path) = hltPathStatus;
582  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
583  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
586  if (except) {
587  iTask.doneWaiting(except);
588  return;
589  }
590  }
591  for (int empty_end_path : empty_end_paths_) {
592  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
595  if (except) {
596  iTask.doneWaiting(except);
597  return;
598  }
599  }
600 
603 
604  ++total_events_;
605 
606  //use to give priorities on an error to ones from Paths
607  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
608  auto pathErrorPtr = pathErrorHolder.get();
609  auto allPathsDone = make_waiting_task(
610  tbb::task::allocate_root(),
611  [iTask, this, serviceToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
612  ServiceRegistry::Operate operate(serviceToken);
613 
614  std::exception_ptr ptr;
615  if (pathError->load()) {
616  ptr = *pathError->load();
617  delete pathError->load();
618  }
619  if ((not ptr) and iPtr) {
620  ptr = *iPtr;
621  }
623  });
624  //The holder guarantees that if the paths finish before the loop ends
625  // that we do not start too soon. It also guarantees that the task will
626  // run under that condition.
627  WaitingTaskHolder allPathsHolder(allPathsDone);
628 
629  auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
630  [allPathsHolder, pathErrorPtr, transitionInfo = info, this, serviceToken](
631  std::exception_ptr const* iPtr) mutable {
632  ServiceRegistry::Operate operate(serviceToken);
633 
634  if (iPtr) {
635  //this is used to prioritize this error over one
636  // that happens in EndPath or Accumulate
637  pathErrorPtr->store(new std::exception_ptr(*iPtr));
638  }
639  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
640  });
641 
642  //The holder guarantees that if the paths finish before the loop ends
643  // that we do not start too soon. It also guarantees that the task will
644  // run under that condition.
645  WaitingTaskHolder taskHolder(pathsDone);
646 
647  //start end paths first so on single threaded the paths will run first
648  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
649  it->processOneOccurrenceAsync(allPathsDone, info, serviceToken, streamID_, &streamContext_);
650  }
651 
652  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
653  it->processOneOccurrenceAsync(pathsDone, info, serviceToken, streamID_, &streamContext_);
654  }
655 
656  ParentContext parentContext(&streamContext_);
658  allPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
659  } catch (...) {
660  iTask.doneWaiting(std::current_exception());
661  }
662  }
663 
664  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
665  WaitingTaskHolder iWait,
667  if (iExcept) {
668  // Caught exception is propagated via WaitingTaskHolder
669  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
674  edm::printCmsExceptionWarning("SkipEvent", e);
675  *(iExcept.load()) = std::exception_ptr();
676  } else {
677  *(iExcept.load()) = std::current_exception();
678  }
679  } catch (...) {
680  *(iExcept.load()) = std::current_exception();
681  }
682  }
683 
684  if ((not iExcept) and results_->accept()) {
685  ++total_passed_;
686  }
687 
688  if (nullptr != results_inserter_.get()) {
689  // Caught exception is propagated to the caller
690  CMS_SA_ALLOW try {
691  //Even if there was an exception, we need to allow results inserter
692  // to run since some module may be waiting on its results.
693  ParentContext parentContext(&streamContext_);
695 
696  results_inserter_->doWork<Traits>(info, streamID_, parentContext, &streamContext_);
697  } catch (cms::Exception& ex) {
698  if (not iExcept) {
699  if (ex.context().empty()) {
700  std::ostringstream ost;
701  ost << "Processing Event " << info.principal().id();
702  ex.addContext(ost.str());
703  }
704  iExcept.store(new std::exception_ptr(std::current_exception()));
705  }
706  } catch (...) {
707  if (not iExcept) {
708  iExcept.store(new std::exception_ptr(std::current_exception()));
709  }
710  }
711  }
712  std::exception_ptr ptr;
713  if (iExcept) {
714  ptr = *iExcept.load();
715  }
716  iWait.doneWaiting(ptr);
717  }
718 
719  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
721 
722  if (iExcept) {
723  //add context information to the exception and print message
724  try {
725  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
726  } catch (cms::Exception& ex) {
727  bool const cleaningUpAfterException = false;
728  if (ex.context().empty()) {
729  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
730  } else {
731  addContextAndPrintException("", ex, cleaningUpAfterException);
732  }
733  iExcept = std::current_exception();
734  }
735 
736  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
737  }
738  // Caught exception is propagated to the caller
739  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
740  if (not iExcept) {
741  iExcept = std::current_exception();
742  }
743  }
744  if (not iExcept) {
746  }
747 
748  return iExcept;
749  }
750 
751  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
752  oLabelsToFill.reserve(trig_paths_.size());
753  std::transform(trig_paths_.begin(),
754  trig_paths_.end(),
755  std::back_inserter(oLabelsToFill),
756  std::bind(&Path::name, std::placeholders::_1));
757  }
758 
759  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
760  TrigPaths::const_iterator itFound = std::find_if(
761  trig_paths_.begin(),
762  trig_paths_.end(),
763  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
764  if (itFound != trig_paths_.end()) {
765  oLabelsToFill.reserve(itFound->size());
766  for (size_t i = 0; i < itFound->size(); ++i) {
767  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
768  }
769  }
770  }
771 
773  std::vector<ModuleDescription const*>& descriptions,
774  unsigned int hint) const {
775  descriptions.clear();
776  bool found = false;
777  TrigPaths::const_iterator itFound;
778 
779  if (hint < trig_paths_.size()) {
780  itFound = trig_paths_.begin() + hint;
781  if (itFound->name() == iPathLabel)
782  found = true;
783  }
784  if (!found) {
785  // if the hint did not work, do it the slow way
786  itFound = std::find_if(
787  trig_paths_.begin(),
788  trig_paths_.end(),
789  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
790  if (itFound != trig_paths_.end())
791  found = true;
792  }
793  if (found) {
794  descriptions.reserve(itFound->size());
795  for (size_t i = 0; i < itFound->size(); ++i) {
796  descriptions.push_back(itFound->getWorker(i)->descPtr());
797  }
798  }
799  }
800 
802  std::vector<ModuleDescription const*>& descriptions,
803  unsigned int hint) const {
804  descriptions.clear();
805  bool found = false;
806  TrigPaths::const_iterator itFound;
807 
808  if (hint < end_paths_.size()) {
809  itFound = end_paths_.begin() + hint;
810  if (itFound->name() == iEndPathLabel)
811  found = true;
812  }
813  if (!found) {
814  // if the hint did not work, do it the slow way
815  itFound = std::find_if(
816  end_paths_.begin(),
817  end_paths_.end(),
818  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
819  if (itFound != end_paths_.end())
820  found = true;
821  }
822  if (found) {
823  descriptions.reserve(itFound->size());
824  for (size_t i = 0; i < itFound->size(); ++i) {
825  descriptions.push_back(itFound->getWorker(i)->descPtr());
826  }
827  }
828  }
829 
830  void StreamSchedule::enableEndPaths(bool active) { endpathsAreActive_ = active; }
831 
833 
834  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
835  sum.timesVisited += path.timesVisited(which);
836  sum.timesPassed += path.timesPassed(which);
837  sum.timesFailed += path.timesFailed(which);
838  sum.timesExcept += path.timesExcept(which);
839  sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
840  }
841 
842  static void fillPathSummary(Path const& path, PathSummary& sum) {
843  sum.name = path.name();
844  sum.bitPosition = path.bitPosition();
845  sum.timesRun += path.timesRun();
846  sum.timesPassed += path.timesPassed();
847  sum.timesFailed += path.timesFailed();
848  sum.timesExcept += path.timesExcept();
849 
850  Path::size_type sz = path.size();
851  if (sum.moduleInPathSummaries.empty()) {
852  std::vector<ModuleInPathSummary> temp(sz);
853  for (size_t i = 0; i != sz; ++i) {
855  }
856  sum.moduleInPathSummaries.swap(temp);
857  } else {
858  assert(sz == sum.moduleInPathSummaries.size());
859  for (size_t i = 0; i != sz; ++i) {
861  }
862  }
863  }
864 
865  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
866  sum.timesVisited += w.timesVisited();
867  sum.timesRun += w.timesRun();
868  sum.timesPassed += w.timesPassed();
869  sum.timesFailed += w.timesFailed();
870  sum.timesExcept += w.timesExcept();
871  sum.moduleLabel = w.description().moduleLabel();
872  }
873 
874  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
875 
877  rep.eventSummary.totalEvents += totalEvents();
878  rep.eventSummary.totalEventsPassed += totalEventsPassed();
879  rep.eventSummary.totalEventsFailed += totalEventsFailed();
880 
881  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
882  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
883  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
884  }
885 
887  using std::placeholders::_1;
889  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
890  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
891  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
892  }
893 
895  skippingEvent_ = false;
896  results_->reset();
897  }
898 
900 
902  //must be sure we have cleared the count first
903  for (auto& count : earlyDeleteBranchToCount_) {
904  count.count = 0;
905  }
906  //now reset based on how many helpers use that branch
908  ++(earlyDeleteBranchToCount_[index].count);
909  }
910  for (auto& helper : earlyDeleteHelpers_) {
911  helper.reset();
912  }
913  }
914 
916  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
917  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
919  int bitpos = 0;
920  unsigned int indexEmpty = 0;
921  unsigned int indexOfPath = 0;
922  for (auto& pathStatusInserter : pathStatusInserters) {
923  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
924  WorkerPtr workerPtr(
925  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
926  pathStatusInserterWorkers_.emplace_back(workerPtr);
927  workerPtr->setActivityRegistry(actReg_);
928  addToAllWorkers(workerPtr.get());
929 
930  // A little complexity here because a C++ Path object is not
931  // instantiated and put into end_paths if there are no modules
932  // on the configured path.
933  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
934  ++indexEmpty;
935  } else {
936  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
937  ++indexOfPath;
938  }
939  ++bitpos;
940  }
941 
942  bitpos = 0;
943  indexEmpty = 0;
944  indexOfPath = 0;
945  for (auto& endPathStatusInserter : endPathStatusInserters) {
946  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
947  WorkerPtr workerPtr(
948  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
949  endPathStatusInserterWorkers_.emplace_back(workerPtr);
950  workerPtr->setActivityRegistry(actReg_);
951  addToAllWorkers(workerPtr.get());
952 
953  // A little complexity here because a C++ Path object is not
954  // instantiated and put into end_paths if there are no modules
955  // on the configured path.
956  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
957  ++indexEmpty;
958  } else {
959  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
960  ++indexOfPath;
961  }
962  ++bitpos;
963  }
964  }
965 } // namespace edm
edm::WorkerSummary::timesVisited
int timesVisited
Definition: TriggerReport.h:45
edm::Path::size_type
WorkersInPath::size_type size_type
Definition: Path.h:46
edm::StreamSchedule::fillEndPath
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:501
edm::EventTransitionInfo
Definition: TransitionInfoTypes.h:26
edm::StreamSchedule::addToAllWorkers
void addToAllWorkers(Worker *w)
Definition: StreamSchedule.cc:899
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
edm::fillWorkerSummaryAux
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
Definition: StreamSchedule.cc:865
edm::PathContext::PathType::kPath
edm::StreamID
Definition: StreamID.h:30
edm::WorkerManager::addToAllWorkers
void addToAllWorkers(Worker *w)
Definition: WorkerManager.cc:123
edm::TerminationOrigin::ExceptionFromThisContext
mps_fire.i
i
Definition: mps_fire.py:428
edm::StreamSchedule::endPathStatusInserterWorkers_
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
Definition: StreamSchedule.h:340
edm::WorkerSummary::moduleLabel
std::string moduleLabel
Definition: TriggerReport.h:51
edm::StreamSchedule::empty_end_paths_
std::vector< int > empty_end_paths_
Definition: StreamSchedule.h:345
edm::StreamSchedule::clearCounters
void clearCounters()
Clear all the counters in the trigger report.
Definition: StreamSchedule.cc:886
edm::ModuleInPathSummary::timesFailed
int timesFailed
Definition: TriggerReport.h:27
edm::StreamSchedule::getTriggerReport
void getTriggerReport(TriggerReport &rep) const
Definition: StreamSchedule.cc:876
MessageLogger.h
funct::false
false
Definition: Factorize.h:29
dqmiodumpmetadata.n
n
Definition: dqmiodumpmetadata.py:28
edm::StreamSchedule::beginStream
void beginStream()
Definition: StreamSchedule.cc:530
edm::StreamSchedule::results
TrigResConstPtr results() const
Definition: StreamSchedule.h:325
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::StreamSchedule::makePathStatusInserters
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
Definition: StreamSchedule.cc:915
WorkerT.h
edm::PathSummary::timesFailed
int timesFailed
Definition: TriggerReport.h:37
edm::ParameterSet::getUntrackedParameterSet
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: ParameterSet.cc:2136
BranchIDListHelper.h
TriggerNamesService.h
WaitingTaskHolder.h
edm::StreamSchedule::fillWorkers
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:400
edm::StreamSchedule::workerManager_
WorkerManager workerManager_
Definition: StreamSchedule.h:333
MicroEventContent_cff.branch
branch
Definition: MicroEventContent_cff.py:178
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::ProcessContext
Definition: ProcessContext.h:27
edm::StreamSchedule::streamContext_
StreamContext streamContext_
Definition: StreamSchedule.h:367
edm::printCmsExceptionWarning
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
Definition: ExceptionMessages.cc:25
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::StreamSchedule::earlyDeleteBranchToCount_
std::vector< BranchToCount > earlyDeleteBranchToCount_
Definition: StreamSchedule.h:350
edm::StreamSchedule::total_events_
int total_events_
Definition: StreamSchedule.h:362
edm::ModuleDescription::moduleName
std::string const & moduleName() const
Definition: ModuleDescription.h:42
edm::WorkerManager::addToUnscheduledWorkers
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
Definition: WorkerManager.cc:42
edm::ModuleInPathSummary::timesExcept
int timesExcept
Definition: TriggerReport.h:28
TriggerTimingReport.h
edm::StreamSchedule::PathWorkers
std::vector< WorkerInPath > PathWorkers
Definition: StreamSchedule.h:164
Algorithms.h
edm::StreamSchedule::totalEventsPassed
int totalEventsPassed() const
Definition: StreamSchedule.h:230
edm::fillWorkerSummary
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
Definition: StreamSchedule.cc:874
edm::HLTGlobalStatus
Definition: HLTGlobalStatus.h:25
to
cms::cuda::assert
assert(be >=bs)
edm::propagate_const::get
constexpr element_type const * get() const
Definition: propagate_const.h:64
edm::StreamSchedule::StreamSchedule
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
Definition: StreamSchedule.cc:138
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
edm::StreamSchedule::allWorkers
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: StreamSchedule.h:255
edm::maker::ModuleHolder
Definition: ModuleHolder.h:37
edm::ParameterSet::getUntrackedParameter
T getUntrackedParameter(std::string const &, T const &) const
edm::StreamID::value
unsigned int value() const
Definition: StreamID.h:43
ProductRegistry.h
edm::WorkerT
Definition: Frameworkfwd.h:62
WorkerInPath.h
edm::StreamSchedule::WorkerPtr
std::shared_ptr< Worker > WorkerPtr
Definition: StreamSchedule.h:159
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:75
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
groupFilesInBlocks.temp
list temp
Definition: groupFilesInBlocks.py:142
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
edm::ModuleInPathSummary::timesPassed
int timesPassed
Definition: TriggerReport.h:26
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::PathSummary::moduleInPathSummaries
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:41
edm::StreamSchedule::vstring
std::vector< std::string > vstring
Definition: StreamSchedule.h:154
edm::ModuleDescription
Definition: ModuleDescription.h:21
edm::PathSummary::name
std::string name
Definition: TriggerReport.h:40
edm::StreamSchedule::results_inserter_
edm::propagate_const< WorkerPtr > results_inserter_
Definition: StreamSchedule.h:338
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::ProductRegistry
Definition: ProductRegistry.h:37
TriggerReport.h
edm::ServiceToken
Definition: ServiceToken.h:40
edm::propagate_const
Definition: propagate_const.h:32
OutputModuleCommunicator.h
edm::vstring
std::vector< std::string > vstring
Definition: Schedule.cc:653
edm::Worker::kFilter
Definition: Worker.h:91
edm::StreamSchedule::number_of_unscheduled_modules_
unsigned int number_of_unscheduled_modules_
Definition: StreamSchedule.h:364
edm::WorkerSummary::timesExcept
int timesExcept
Definition: TriggerReport.h:49
edm::EventPrincipal
Definition: EventPrincipal.h:46
OutputModuleDescription.h
edm::StreamSchedule::totalEvents
int totalEvents() const
Definition: StreamSchedule.h:226
edm::StreamSchedule::moduleDescriptionsInEndPath
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: StreamSchedule.cc:801
edm::maker::ModuleHolder::createOutputModuleCommunicator
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
edm::ModuleInPathSummary::moduleLabel
std::string moduleLabel
Definition: TriggerReport.h:30
ProcessConfiguration.h
hltMonBTagIPClient_cfi.pathName
pathName
Definition: hltMonBTagIPClient_cfi.py:5
edm::StreamSchedule::processOneEventAsync
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
Definition: StreamSchedule.cc:561
edm::StreamSchedule::endPathsEnabled
bool endPathsEnabled() const
Definition: StreamSchedule.cc:832
w
const double w
Definition: UKUtility.cc:23
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::BranchID
Definition: BranchID.h:14
edm::PathSummary::bitPosition
int bitPosition
Definition: TriggerReport.h:34
edm::WorkerInPath::Ignore
Definition: WorkerInPath.h:27
mps_fire.end
end
Definition: mps_fire.py:242
edm::exception_actions::FailPath
Definition: ExceptionActions.h:11
edm::StreamSchedule::enableEndPaths
void enableEndPaths(bool active)
Definition: StreamSchedule.cc:830
edm::WorkerSummary::timesRun
int timesRun
Definition: TriggerReport.h:46
edm::OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:38
submitPVResolutionJobs.count
count
Definition: submitPVResolutionJobs.py:352
edm::StreamSchedule::replaceModule
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
Definition: StreamSchedule.cc:534
HcalDetIdTransform::transform
unsigned transform(const HcalDetId &id, unsigned transformCode)
Definition: HcalDetIdTransform.cc:7
edm::StreamSchedule::actionTable
ExceptionToActionTable const & actionTable() const
returns the action table
Definition: StreamSchedule.h:283
edm::InEvent
Definition: BranchType.h:11
ConvertException.h
edm::exception_actions::SkipEvent
Definition: ExceptionActions.h:11
ExceptionCollector.h
ParameterSetDescription.h
b
double b
Definition: hdecay.h:118
edm::Worker
Definition: Worker.h:88
edm::WorkerInPath::Normal
Definition: WorkerInPath.h:27
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::StreamSchedule::results_
edm::propagate_const< TrigResPtr > results_
Definition: StreamSchedule.h:336
edm::StreamSchedule::empty_trig_paths_
std::vector< int > empty_trig_paths_
Definition: StreamSchedule.h:344
edm::WorkerSummary
Definition: TriggerReport.h:44
edm::StreamSchedule::initializeEarlyDelete
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
Definition: StreamSchedule.cc:244
edm::ModuleRegistry::forAllModuleHolders
void forAllModuleHolders(F iFunc)
Definition: ModuleRegistry.h:52
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
edm::ParentContext
Definition: ParentContext.h:27
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::WorkerInPath::FilterAction
FilterAction
Definition: WorkerInPath.h:27
edm::WorkerManager::beginStream
void beginStream(StreamID iID, StreamContext &streamContext)
Definition: WorkerManager.cc:109
edm::WorkerSummary::timesPassed
int timesPassed
Definition: TriggerReport.h:47
funct::true
true
Definition: Factorize.h:173
edm::StreamSchedule::moduleDescriptionsInPath
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: StreamSchedule.cc:772
StreamSchedule.h
edm::ParameterSet
Definition: ParameterSet.h:47
cms::Exception::context
std::list< std::string > const & context() const
Definition: Exception.cc:147
ParameterSet
Definition: Functions.h:16
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:30
edm::StreamSchedule::skippingEvent_
std::atomic< bool > skippingEvent_
Definition: StreamSchedule.h:369
edm::WorkerManager::setupOnDemandSystem
void setupOnDemandSystem(EventTransitionInfo const &)
Definition: WorkerManager.cc:138
edm::StreamSchedule::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: StreamSchedule.h:334
LuminosityBlockProcessingStatus.h
dumpMFGeometry_cfg.delta
delta
Definition: dumpMFGeometry_cfg.py:25
helper
Definition: helper.py:1
TriggerResultInserter.h
edm::Path
Definition: Path.h:41
edm::HLTPathStatus
Definition: HLTPathStatus.h:33
edm::get_underlying
constexpr T & get_underlying(propagate_const< T > &)
Definition: propagate_const.h:103
edm::BranchIDListHelper
Definition: BranchIDListHelper.h:15
edm::addContextAndPrintException
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
Definition: ExceptionHelpers.cc:11
edm::StreamSchedule::fillTrigPath
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:471
writedatasetfile.action
action
Definition: writedatasetfile.py:8
cuy.rep
rep
Definition: cuy.py:1190
edm::StreamSchedule::end_paths_
TrigPaths end_paths_
Definition: StreamSchedule.h:343
trackerHitRTTI::vector
Definition: trackerHitRTTI.h:21
edm::WorkerManager::processAccumulatorsAsync
void processAccumulatorsAsync(WaitingTask *, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: WorkerManager.h:146
B2GTnPMonitor_cfi.item
item
Definition: B2GTnPMonitor_cfi.py:147
edm::fillPathSummary
static void fillPathSummary(Path const &path, PathSummary &sum)
Definition: StreamSchedule.cc:842
edm::ExceptionToActionTable
Definition: ExceptionActions.h:16
edm::Worker::description
ModuleDescription const & description() const
Definition: Worker.h:188
TrackCollections2monitor_cff.func
func
Definition: TrackCollections2monitor_cff.py:359
PathContext.h
PathStatusInserter.h
edm::TriggerReport
Definition: TriggerReport.h:56
edm::StreamSchedule::streamID_
StreamID streamID_
Definition: StreamSchedule.h:366
edm::Worker::moduleType
virtual Types moduleType() const =0
cmsLHEtoEOSManager.l
l
Definition: cmsLHEtoEOSManager.py:204
EndPathStatusInserter.h
edm::StreamSchedule::getAllModuleDescriptions
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: StreamSchedule.cc:550
edm::service::TriggerNamesService
Definition: TriggerNamesService.h:42
Registry.h
edm::PreallocationConfiguration
Definition: PreallocationConfiguration.h:27
edm::ModuleInPathSummary::timesVisited
int timesVisited
Definition: TriggerReport.h:25
patCandidatesForDimuonsSequences_cff.pathNames
pathNames
Definition: patCandidatesForDimuonsSequences_cff.py:109
edm::StreamSchedule::resetAll
void resetAll()
Definition: StreamSchedule.cc:894
edm::StreamSchedule::finishedPaths
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
Definition: StreamSchedule.cc:664
ModuleRegistry.h
edm::Worker::clearCounters
void clearCounters()
Definition: Worker.h:215
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
edm::StreamSchedule::earlyDeleteHelpers_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
Definition: StreamSchedule.h:360
edm::maker::ModuleHolder::replaceModuleFor
virtual void replaceModuleFor(Worker *) const =0
edm::SelectedProductsForBranchType
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
Definition: SelectedProducts.h:13
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::ModuleInPathSummary
Definition: TriggerReport.h:24
edm::ModuleRegistry
Definition: ModuleRegistry.h:40
edm::StreamSchedule::TrigResPtr
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: StreamSchedule.h:157
tier0.unique
def unique(seq, keepstr=True)
Definition: tier0.py:24
edm::Path::name
std::string const & name() const
Definition: Path.h:74
edm::service::TriggerNamesService::getTrigPaths
Strings const & getTrigPaths() const
Definition: TriggerNamesService.h:55
edm::PathSummary
Definition: TriggerReport.h:33
edm::fillModuleInPathSummary
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
Definition: StreamSchedule.cc:834
ModuleHolder.h
edm::PathSummary::timesRun
int timesRun
Definition: TriggerReport.h:35
edm::service::TriggerNamesService::getEndPaths
Strings const & getEndPaths() const
Definition: TriggerNamesService.h:74
edm::WorkerSummary::timesFailed
int timesFailed
Definition: TriggerReport.h:48
Exception
Definition: hltDiff.cc:246
edm::PathSummary::timesPassed
int timesPassed
Definition: TriggerReport.h:36
edm::StreamSchedule::trig_paths_
TrigPaths trig_paths_
Definition: StreamSchedule.h:342
edm::WorkerInPath
Definition: WorkerInPath.h:25
edm::PathContext::PathType::kEndPath
edm::PathSummary::timesExcept
int timesExcept
Definition: TriggerReport.h:38
edm::WorkerInPath::Veto
Definition: WorkerInPath.h:27
edm::StreamSchedule::resetEarlyDelete
void resetEarlyDelete()
Definition: StreamSchedule.cc:901
Skims_PA_cff.name
name
Definition: Skims_PA_cff.py:17
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
edm::ExceptionToActionTable::find
exception_actions::ActionCodes find(const std::string &category) const
Definition: ExceptionActions.cc:85
edm::WorkerManager::setupResolvers
void setupResolvers(Principal &principal)
Definition: WorkerManager.cc:129
edm::WorkerManager::endStream
void endStream(StreamID iID, StreamContext &streamContext)
Definition: WorkerManager.cc:115
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::StreamSchedule::endStream
void endStream()
Definition: StreamSchedule.cc:532
edm::WorkerManager::getWorker
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
Definition: WorkerManager.cc:33
edm::BranchDescription
Definition: BranchDescription.h:32
MillePedeFileConverter_cfg.out
out
Definition: MillePedeFileConverter_cfg.py:31
actions
roAction_t actions[nactions]
Definition: GenABIO.cc:181
mps_fire.result
result
Definition: mps_fire.py:311
edm::search_all
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
cms::Exception
Definition: Exception.h:70
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
edm::StreamSchedule::streamID
StreamID streamID() const
Definition: StreamSchedule.h:199
ParameterSet.h
HerwigMaxPtPartonFilter_cfi.moduleLabel
moduleLabel
Definition: HerwigMaxPtPartonFilter_cfi.py:4
edm::StreamSchedule::pathStatusInserterWorkers_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
Definition: StreamSchedule.h:339
edm::hlt::Pass
accept
Definition: HLTenums.h:18
edm::StreamSchedule::availablePaths
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: StreamSchedule.cc:751
edm::exception_actions::IgnoreCompletely
Definition: ExceptionActions.h:11
edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
Definition: StreamSchedule.h:357
Factory.h
edm::Path::clearCounters
void clearCounters()
Definition: Path.cc:198
edm::ParameterSet::getPSetForUpdate
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
Definition: ParameterSet.cc:457
edm::StreamSchedule::finishProcessOneEvent
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
Definition: StreamSchedule.cc:719
edm::Log
Definition: MessageLogger.h:70
eostools.which
def which(cmd)
Definition: eostools.py:336
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::StreamSchedule::totalEventsFailed
int totalEventsFailed() const
Definition: StreamSchedule.h:234
edm::pset::Registry::getMapped
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
edm::errors::Configuration
Definition: EDMException.h:36
edm::StreamSchedule::modulesInPath
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
Definition: StreamSchedule.cc:759
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
label
const char * label
Definition: PFTauDecayModeTools.cc:11
edm::StreamSchedule::endpathsAreActive_
volatile bool endpathsAreActive_
Definition: StreamSchedule.h:368
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
geometryDiff.opts
opts
Definition: geometryDiff.py:11
edm::StreamSchedule::total_passed_
int total_passed_
Definition: StreamSchedule.h:363
edm::exception_actions::ActionCodes
ActionCodes
Definition: ExceptionActions.h:11
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37