CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
28 
30 
31 #include <algorithm>
32 #include <cassert>
33 #include <cstdlib>
34 #include <functional>
35 #include <iomanip>
36 #include <list>
37 #include <map>
38 #include <exception>
39 
40 namespace edm {
41  namespace {
42 
// Walks the input range [begin, end) and the output range starting at
// 'out' in lock-step, calling func(input, output) for every pair.
// 'func' is expected to read the input element (const reference) and
// write its result into the output element (reference). The output
// range must provide at least as many elements as the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
52 
// Fills the sequence 'to' from the sequence 'from' by applying
// transform_into with the callable 'func' to each pair of elements.
//
// If 'to' already holds one element per element of 'from', 'func' is
// applied directly to the existing elements of 'to'. Otherwise a fresh
// TO of the right size is filled first and only swapped into 'to'
// once every call has returned, so 'to' is not left partially filled.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    // sizes already agree: update the existing output elements in place
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }
  TO scratch(from.size());
  transform_into(from.begin(), from.end(), scratch.begin(), func);
  to.swap(scratch);
}
68 
69  // -----------------------------
70 
71  // Here we make the trigger results inserter directly. This should
72  // probably be a utility in the WorkerRegistry or elsewhere.
73 
74  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
75  std::shared_ptr<ActivityRegistry> areg,
76  std::shared_ptr<TriggerResultInserter> inserter) {
78  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
79  ptr->setActivityRegistry(areg);
80  return ptr;
81  }
82 
83  void initializeBranchToReadingWorker(ParameterSet const& opts,
84  ProductRegistry const& preg,
85  std::multimap<std::string, Worker*>& branchToReadingWorker) {
86  // See if any data has been marked to be deleted early (removing any duplicates)
87  auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
88  if (not vBranchesToDeleteEarly.empty()) {
89  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
90  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
91  vBranchesToDeleteEarly.end());
92 
93  // Are the requested items in the product registry?
94  auto allBranchNames = preg.allBranchNames();
95  //the branch names all end with a period, which we do not want to compare with
96  for (auto& b : allBranchNames) {
97  b.resize(b.size() - 1);
98  }
99  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
100  std::vector<std::string> temp;
101  temp.reserve(vBranchesToDeleteEarly.size());
102 
103  std::set_intersection(vBranchesToDeleteEarly.begin(),
104  vBranchesToDeleteEarly.end(),
105  allBranchNames.begin(),
106  allBranchNames.end(),
107  std::back_inserter(temp));
108  vBranchesToDeleteEarly.swap(temp);
109  if (temp.size() != vBranchesToDeleteEarly.size()) {
110  std::vector<std::string> missingProducts;
111  std::set_difference(temp.begin(),
112  temp.end(),
113  vBranchesToDeleteEarly.begin(),
114  vBranchesToDeleteEarly.end(),
115  std::back_inserter(missingProducts));
116  LogInfo l("MissingProductsForCanDeleteEarly");
117  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
118  for (auto const& n : missingProducts) {
119  l << "\n " << n;
120  }
121  }
122  //set placeholder for the branch, we will remove the nullptr if a
123  // module actually wants the branch.
124  for (auto const& branch : vBranchesToDeleteEarly) {
125  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
126  }
127  }
128  }
129  } // namespace
130 
131  // -----------------------------
132 
// Shorthand for a list of strings (e.g. the module labels of a path).
using vstring = std::vector<std::string>;
134 
135  // -----------------------------
136 
138  std::shared_ptr<TriggerResultInserter> inserter,
139  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
140  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
141  std::shared_ptr<ModuleRegistry> modReg,
142  ParameterSet& proc_pset,
143  service::TriggerNamesService const& tns,
144  PreallocationConfiguration const& prealloc,
145  ProductRegistry& preg,
146  BranchIDListHelper& branchIDListHelper,
148  std::shared_ptr<ActivityRegistry> areg,
149  std::shared_ptr<ProcessConfiguration> processConfiguration,
150  bool allowEarlyDelete,
151  StreamID streamID,
152  ProcessContext const* processContext)
153  : workerManager_(modReg, areg, actions),
154  actReg_(areg),
155  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
156  results_inserter_(),
157  trig_paths_(),
158  end_paths_(),
159  total_events_(),
160  total_passed_(),
161  number_of_unscheduled_modules_(0),
162  streamID_(streamID),
163  streamContext_(streamID_, processContext),
164  endpathsAreActive_(true),
165  skippingEvent_(false) {
166  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
167  bool hasPath = false;
168  std::vector<std::string> const& pathNames = tns.getTrigPaths();
169  std::vector<std::string> const& endPathNames = tns.getEndPaths();
170 
171  int trig_bitpos = 0;
172  trig_paths_.reserve(pathNames.size());
173  for (auto const& trig_name : pathNames) {
174  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
175  ++trig_bitpos;
176  hasPath = true;
177  }
178 
179  if (hasPath) {
180  // the results inserter stands alone
181  inserter->setTrigResultForStream(streamID.value(), results());
182 
183  results_inserter_ = makeInserter(actions, actReg_, inserter);
185  }
186 
187  // fill normal endpaths
188  int bitpos = 0;
189  end_paths_.reserve(endPathNames.size());
190  for (auto const& end_path_name : endPathNames) {
191  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
192  ++bitpos;
193  }
194 
195  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
196 
197  //See if all modules were used
198  std::set<std::string> usedWorkerLabels;
199  for (auto const& worker : allWorkers()) {
200  usedWorkerLabels.insert(worker->description().moduleLabel());
201  }
202  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
203  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
204  std::vector<std::string> unusedLabels;
205  set_difference(modulesInConfigSet.begin(),
206  modulesInConfigSet.end(),
207  usedWorkerLabels.begin(),
208  usedWorkerLabels.end(),
209  back_inserter(unusedLabels));
210  std::set<std::string> unscheduledLabels;
211  std::vector<std::string> shouldBeUsedLabels;
212  if (!unusedLabels.empty()) {
213  //Need to
214  // 1) create worker
215  // 2) if it is a WorkerT<EDProducer>, add it to our list
216  // 3) hand list to our delayed reader
217  for (auto const& label : unusedLabels) {
218  bool isTracked;
219  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
220  assert(isTracked);
221  assert(modulePSet != nullptr);
223  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
224  }
225  if (!shouldBeUsedLabels.empty()) {
226  std::ostringstream unusedStream;
227  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
228  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
229  itLabelEnd = shouldBeUsedLabels.end();
230  itLabel != itLabelEnd;
231  ++itLabel) {
232  unusedStream << ",'" << *itLabel << "'";
233  }
234  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
235  }
236  }
237  number_of_unscheduled_modules_ = unscheduledLabels.size();
238 
239  initializeEarlyDelete(*modReg, opts, preg, allowEarlyDelete);
240 
241  } // StreamSchedule::StreamSchedule
242 
244  edm::ParameterSet const& opts,
245  edm::ProductRegistry const& preg,
246  bool allowEarlyDelete) {
247  //for now, if have a subProcess, don't allow early delete
248  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
249  if (not allowEarlyDelete)
250  return;
251 
252  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
253  // registered for this job
254  std::multimap<std::string, Worker*> branchToReadingWorker;
255  initializeBranchToReadingWorker(opts, preg, branchToReadingWorker);
256 
257  //If no delete early items have been specified we don't have to do anything
258  if (branchToReadingWorker.empty()) {
259  return;
260  }
261  const std::vector<std::string> kEmpty;
262  std::map<Worker*, unsigned int> reserveSizeForWorker;
263  unsigned int upperLimitOnReadingWorker = 0;
264  unsigned int upperLimitOnIndicies = 0;
265  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
266 
267  //talk with output modules first
268  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
269  auto comm = iHolder->createOutputModuleCommunicator();
270  if (comm) {
271  if (!branchToReadingWorker.empty()) {
272  //If an OutputModule needs a product, we can't delete it early
273  // so we should remove it from our list
274  SelectedProductsForBranchType const& kept = comm->keptProducts();
275  for (auto const& item : kept[InEvent]) {
276  BranchDescription const& desc = *item.first;
277  auto found = branchToReadingWorker.equal_range(desc.branchName());
278  if (found.first != found.second) {
279  --nUniqueBranchesToDelete;
280  branchToReadingWorker.erase(found.first, found.second);
281  }
282  }
283  }
284  }
285  });
286 
287  if (branchToReadingWorker.empty()) {
288  return;
289  }
290 
291  for (auto w : allWorkers()) {
292  //determine if this module could read a branch we want to delete early
293  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
294  if (nullptr != pset) {
295  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
296  if (not branches.empty()) {
297  ++upperLimitOnReadingWorker;
298  }
299  for (auto const& branch : branches) {
300  auto found = branchToReadingWorker.equal_range(branch);
301  if (found.first != found.second) {
302  ++upperLimitOnIndicies;
303  ++reserveSizeForWorker[w];
304  if (nullptr == found.first->second) {
305  found.first->second = w;
306  } else {
307  branchToReadingWorker.insert(make_pair(found.first->first, w));
308  }
309  }
310  }
311  }
312  }
313  {
314  auto it = branchToReadingWorker.begin();
315  std::vector<std::string> unusedBranches;
316  while (it != branchToReadingWorker.end()) {
317  if (it->second == nullptr) {
318  unusedBranches.push_back(it->first);
319  //erasing the object invalidates the iterator so must advance it first
320  auto temp = it;
321  ++it;
322  branchToReadingWorker.erase(temp);
323  } else {
324  ++it;
325  }
326  }
327  if (not unusedBranches.empty()) {
328  LogWarning l("UnusedProductsForCanDeleteEarly");
329  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
330  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
331  for (auto const& n : unusedBranches) {
332  l << "\n " << n;
333  }
334  }
335  }
336  if (!branchToReadingWorker.empty()) {
337  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
338  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
339  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
340  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
341  std::string lastBranchName;
342  size_t nextOpenIndex = 0;
343  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
344  for (auto& branchAndWorker : branchToReadingWorker) {
345  if (lastBranchName != branchAndWorker.first) {
346  //have to put back the period we removed earlier in order to get the proper name
347  BranchID bid(branchAndWorker.first + ".");
348  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
349  lastBranchName = branchAndWorker.first;
350  }
351  auto found = alreadySeenWorkers.find(branchAndWorker.second);
352  if (alreadySeenWorkers.end() == found) {
353  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
354  // all the branches that might be read by this worker. However, initially we will only tell the
355  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
356  // EarlyDeleteHelper will automatically advance its internal end pointer.
357  size_t index = nextOpenIndex;
358  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
360  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
361  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
362  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
363  nextOpenIndex += nIndices;
364  } else {
365  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
366  }
367  }
368 
369  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
370  // space needed for each module
371  auto itLast = earlyDeleteHelpers_.begin();
372  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
373  if (itLast->end() != it->begin()) {
374  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
375  unsigned int delta = it->begin() - itLast->end();
376  it->shiftIndexPointers(delta);
377 
379  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
380  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
381  }
382  itLast = it;
383  }
385  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
387 
388  //now tell the paths about the deleters
389  for (auto& p : trig_paths_) {
390  p.setEarlyDeleteHelpers(alreadySeenWorkers);
391  }
392  for (auto& p : end_paths_) {
393  p.setEarlyDeleteHelpers(alreadySeenWorkers);
394  }
396  }
397  }
398 
400  ProductRegistry& preg,
401  PreallocationConfiguration const* prealloc,
402  std::shared_ptr<ProcessConfiguration const> processConfiguration,
403  std::string const& pathName,
404  bool ignoreFilters,
405  PathWorkers& out,
406  std::vector<std::string> const& endPathNames) {
407  vstring modnames = proc_pset.getParameter<vstring>(pathName);
408  PathWorkers tmpworkers;
409 
410  unsigned int placeInPath = 0;
411  for (auto const& name : modnames) {
412  //Modules except EDFilters are set to run concurrently by default
413  bool doNotRunConcurrently = false;
415  if (name[0] == '!') {
416  filterAction = WorkerInPath::Veto;
417  } else if (name[0] == '-' or name[0] == '+') {
418  filterAction = WorkerInPath::Ignore;
419  }
420  if (name[0] == '|' or name[0] == '+') {
421  //cms.wait was specified so do not run concurrently
422  doNotRunConcurrently = true;
423  }
424 
426  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
427  moduleLabel.erase(0, 1);
428  }
429 
430  bool isTracked;
431  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
432  if (modpset == nullptr) {
433  std::string pathType("endpath");
434  if (!search_all(endPathNames, pathName)) {
435  pathType = std::string("path");
436  }
438  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
439  << "\"\n please check spelling or remove that label from the path.";
440  }
441  assert(isTracked);
442 
443  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
444  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
445  // We have a filter on an end path, and the filter is not explicitly ignored.
446  // See if the filter is allowed.
447  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
448  if (!search_all(allowed_filters, worker->description().moduleName())) {
449  // Filter is not allowed. Ignore the result, and issue a warning.
450  filterAction = WorkerInPath::Ignore;
451  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description().moduleName()
452  << "' with module label '" << moduleLabel << "' appears on EndPath '"
453  << pathName << "'.\n"
454  << "The return value of the filter will be ignored.\n"
455  << "To suppress this warning, either remove the filter from the endpath,\n"
456  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
457  }
458  }
459  bool runConcurrently = not doNotRunConcurrently;
460  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
461  runConcurrently = false;
462  }
463  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
464  ++placeInPath;
465  }
466 
467  out.swap(tmpworkers);
468  }
469 
471  ProductRegistry& preg,
472  PreallocationConfiguration const* prealloc,
473  std::shared_ptr<ProcessConfiguration const> processConfiguration,
474  int bitpos,
475  std::string const& name,
476  TrigResPtr trptr,
477  std::vector<std::string> const& endPathNames) {
478  PathWorkers tmpworkers;
479  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
480 
481  // an empty path will cause an extra bit that is not used
482  if (!tmpworkers.empty()) {
483  trig_paths_.emplace_back(bitpos,
484  name,
485  tmpworkers,
486  trptr,
487  actionTable(),
488  actReg_,
492  } else {
493  empty_trig_paths_.push_back(bitpos);
494  }
495  for (WorkerInPath const& workerInPath : tmpworkers) {
496  addToAllWorkers(workerInPath.getWorker());
497  }
498  }
499 
501  ProductRegistry& preg,
502  PreallocationConfiguration const* prealloc,
503  std::shared_ptr<ProcessConfiguration const> processConfiguration,
504  int bitpos,
505  std::string const& name,
506  std::vector<std::string> const& endPathNames) {
507  PathWorkers tmpworkers;
508  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
509 
510  if (!tmpworkers.empty()) {
511  //EndPaths are not supposed to stop if SkipEvent type exception happens
512  end_paths_.emplace_back(bitpos,
513  name,
514  tmpworkers,
515  TrigResPtr(),
516  actionTable(),
517  actReg_,
519  nullptr,
521  } else {
522  empty_end_paths_.push_back(bitpos);
523  }
524  for (WorkerInPath const& workerInPath : tmpworkers) {
525  addToAllWorkers(workerInPath.getWorker());
526  }
527  }
528 
530 
532 
534  Worker* found = nullptr;
535  for (auto const& worker : allWorkers()) {
536  if (worker->description().moduleLabel() == iLabel) {
537  found = worker;
538  break;
539  }
540  }
541  if (nullptr == found) {
542  return;
543  }
544 
545  iMod->replaceModuleFor(found);
546  found->beginStream(streamID_, streamContext_);
547  }
548 
549  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
550  std::vector<ModuleDescription const*> result;
551  result.reserve(allWorkers().size());
552 
553  for (auto const& worker : allWorkers()) {
554  ModuleDescription const* p = worker->descPtr();
555  result.push_back(p);
556  }
557  return result;
558  }
559 
561  WaitingTaskHolder iTask,
563  EventSetupImpl const& es,
564  ServiceToken const& serviceToken,
565  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
566  // Caught exception is propagated via WaitingTaskHolder
567  CMS_SA_ALLOW try {
568  this->resetAll();
569 
571 
572  Traits::setStreamContext(streamContext_, ep);
573  //a service may want to communicate with another service
574  ServiceRegistry::Operate guard(serviceToken);
575  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
576 
577  HLTPathStatus hltPathStatus(hlt::Pass, 0);
578  for (int empty_trig_path : empty_trig_paths_) {
579  results_->at(empty_trig_path) = hltPathStatus;
580  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
581  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
584  if (except) {
585  iTask.doneWaiting(except);
586  return;
587  }
588  }
589  for (int empty_end_path : empty_end_paths_) {
590  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
593  if (except) {
594  iTask.doneWaiting(except);
595  return;
596  }
597  }
598 
599  // This call takes care of the unscheduled processing.
601 
602  ++total_events_;
603 
604  //use to give priorities on an error to ones from Paths
605  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
606  auto pathErrorPtr = pathErrorHolder.get();
607  auto allPathsDone = make_waiting_task(
608  tbb::task::allocate_root(),
609  [iTask, this, serviceToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
610  ServiceRegistry::Operate operate(serviceToken);
611 
612  std::exception_ptr ptr;
613  if (pathError->load()) {
614  ptr = *pathError->load();
615  delete pathError->load();
616  }
617  if ((not ptr) and iPtr) {
618  ptr = *iPtr;
619  }
621  });
622  //The holder guarantees that if the paths finish before the loop ends
623  // that we do not start too soon. It also guarantees that the task will
624  // run under that condition.
625  WaitingTaskHolder allPathsHolder(allPathsDone);
626 
627  auto pathsDone = make_waiting_task(
628  tbb::task::allocate_root(),
629  [allPathsHolder, pathErrorPtr, &ep, &es, this, serviceToken](std::exception_ptr const* iPtr) mutable {
630  ServiceRegistry::Operate operate(serviceToken);
631 
632  if (iPtr) {
633  //this is used to prioritize this error over one
634  // that happens in EndPath or Accumulate
635  pathErrorPtr->store(new std::exception_ptr(*iPtr));
636  }
637  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), ep, es);
638  });
639 
640  //The holder guarantees that if the paths finish before the loop ends
641  // that we do not start too soon. It also guarantees that the task will
642  // run under that condition.
643  WaitingTaskHolder taskHolder(pathsDone);
644 
645  //start end paths first so on single threaded the paths will run first
646  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
647  it->processOneOccurrenceAsync(allPathsDone, ep, es, serviceToken, streamID_, &streamContext_);
648  }
649 
650  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
651  it->processOneOccurrenceAsync(pathsDone, ep, es, serviceToken, streamID_, &streamContext_);
652  }
653 
654  ParentContext parentContext(&streamContext_);
656  allPathsDone, ep, es, serviceToken, streamID_, parentContext, &streamContext_);
657  } catch (...) {
658  iTask.doneWaiting(std::current_exception());
659  }
660  }
661 
662  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
663  WaitingTaskHolder iWait,
665  EventSetupImpl const& es) {
666  if (iExcept) {
667  // Caught exception is propagated via WaitingTaskHolder
668  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
673  edm::printCmsExceptionWarning("SkipEvent", e);
674  *(iExcept.load()) = std::exception_ptr();
675  } else {
676  *(iExcept.load()) = std::current_exception();
677  }
678  } catch (...) {
679  *(iExcept.load()) = std::current_exception();
680  }
681  }
682 
683  if ((not iExcept) and results_->accept()) {
684  ++total_passed_;
685  }
686 
687  if (nullptr != results_inserter_.get()) {
688  // Caught exception is propagated to the caller
689  CMS_SA_ALLOW try {
690  //Even if there was an exception, we need to allow results inserter
691  // to run since some module may be waiting on its results.
692  ParentContext parentContext(&streamContext_);
694 
695  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
696  } catch (cms::Exception& ex) {
697  if (not iExcept) {
698  if (ex.context().empty()) {
699  std::ostringstream ost;
700  ost << "Processing Event " << ep.id();
701  ex.addContext(ost.str());
702  }
703  iExcept.store(new std::exception_ptr(std::current_exception()));
704  }
705  } catch (...) {
706  if (not iExcept) {
707  iExcept.store(new std::exception_ptr(std::current_exception()));
708  }
709  }
710  }
711  std::exception_ptr ptr;
712  if (iExcept) {
713  ptr = *iExcept.load();
714  }
715  iWait.doneWaiting(ptr);
716  }
717 
718  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
720 
721  if (iExcept) {
722  //add context information to the exception and print message
723  try {
724  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
725  } catch (cms::Exception& ex) {
726  bool const cleaningUpAfterException = false;
727  if (ex.context().empty()) {
728  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
729  } else {
730  addContextAndPrintException("", ex, cleaningUpAfterException);
731  }
732  iExcept = std::current_exception();
733  }
734 
735  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
736  }
737  // Caught exception is propagated to the caller
738  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
739  if (not iExcept) {
740  iExcept = std::current_exception();
741  }
742  }
743  if (not iExcept) {
745  }
746 
747  return iExcept;
748  }
749 
750  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
751  oLabelsToFill.reserve(trig_paths_.size());
752  std::transform(trig_paths_.begin(),
753  trig_paths_.end(),
754  std::back_inserter(oLabelsToFill),
755  std::bind(&Path::name, std::placeholders::_1));
756  }
757 
758  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
759  TrigPaths::const_iterator itFound = std::find_if(
760  trig_paths_.begin(),
761  trig_paths_.end(),
762  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
763  if (itFound != trig_paths_.end()) {
764  oLabelsToFill.reserve(itFound->size());
765  for (size_t i = 0; i < itFound->size(); ++i) {
766  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
767  }
768  }
769  }
770 
772  std::vector<ModuleDescription const*>& descriptions,
773  unsigned int hint) const {
774  descriptions.clear();
775  bool found = false;
776  TrigPaths::const_iterator itFound;
777 
778  if (hint < trig_paths_.size()) {
779  itFound = trig_paths_.begin() + hint;
780  if (itFound->name() == iPathLabel)
781  found = true;
782  }
783  if (!found) {
784  // if the hint did not work, do it the slow way
785  itFound = std::find_if(
786  trig_paths_.begin(),
787  trig_paths_.end(),
788  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
789  if (itFound != trig_paths_.end())
790  found = true;
791  }
792  if (found) {
793  descriptions.reserve(itFound->size());
794  for (size_t i = 0; i < itFound->size(); ++i) {
795  descriptions.push_back(itFound->getWorker(i)->descPtr());
796  }
797  }
798  }
799 
801  std::vector<ModuleDescription const*>& descriptions,
802  unsigned int hint) const {
803  descriptions.clear();
804  bool found = false;
805  TrigPaths::const_iterator itFound;
806 
807  if (hint < end_paths_.size()) {
808  itFound = end_paths_.begin() + hint;
809  if (itFound->name() == iEndPathLabel)
810  found = true;
811  }
812  if (!found) {
813  // if the hint did not work, do it the slow way
814  itFound = std::find_if(
815  end_paths_.begin(),
816  end_paths_.end(),
817  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
818  if (itFound != end_paths_.end())
819  found = true;
820  }
821  if (found) {
822  descriptions.reserve(itFound->size());
823  for (size_t i = 0; i < itFound->size(); ++i) {
824  descriptions.push_back(itFound->getWorker(i)->descPtr());
825  }
826  }
827  }
828 
829  void StreamSchedule::enableEndPaths(bool active) { endpathsAreActive_ = active; }
830 
832 
833  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
834  sum.timesVisited += path.timesVisited(which);
835  sum.timesPassed += path.timesPassed(which);
836  sum.timesFailed += path.timesFailed(which);
837  sum.timesExcept += path.timesExcept(which);
838  sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
839  }
840 
841  static void fillPathSummary(Path const& path, PathSummary& sum) {
842  sum.name = path.name();
843  sum.bitPosition = path.bitPosition();
844  sum.timesRun += path.timesRun();
845  sum.timesPassed += path.timesPassed();
846  sum.timesFailed += path.timesFailed();
847  sum.timesExcept += path.timesExcept();
848 
849  Path::size_type sz = path.size();
850  if (sum.moduleInPathSummaries.empty()) {
851  std::vector<ModuleInPathSummary> temp(sz);
852  for (size_t i = 0; i != sz; ++i) {
854  }
855  sum.moduleInPathSummaries.swap(temp);
856  } else {
857  assert(sz == sum.moduleInPathSummaries.size());
858  for (size_t i = 0; i != sz; ++i) {
860  }
861  }
862  }
863 
864  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
865  sum.timesVisited += w.timesVisited();
866  sum.timesRun += w.timesRun();
867  sum.timesPassed += w.timesPassed();
868  sum.timesFailed += w.timesFailed();
869  sum.timesExcept += w.timesExcept();
870  sum.moduleLabel = w.description().moduleLabel();
871  }
872 
873  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
874 
876  rep.eventSummary.totalEvents += totalEvents();
877  rep.eventSummary.totalEventsPassed += totalEventsPassed();
878  rep.eventSummary.totalEventsFailed += totalEventsFailed();
879 
880  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
881  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
882  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
883  }
884 
886  using std::placeholders::_1;
888  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
889  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
890  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
891  }
892 
894  skippingEvent_ = false;
895  results_->reset();
896  }
897 
899 
901  //must be sure we have cleared the count first
902  for (auto& count : earlyDeleteBranchToCount_) {
903  count.count = 0;
904  }
905  //now reset based on how many helpers use that branch
907  ++(earlyDeleteBranchToCount_[index].count);
908  }
909  for (auto& helper : earlyDeleteHelpers_) {
910  helper.reset();
911  }
912  }
913 
915  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
916  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
918  int bitpos = 0;
919  unsigned int indexEmpty = 0;
920  unsigned int indexOfPath = 0;
921  for (auto& pathStatusInserter : pathStatusInserters) {
922  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
923  WorkerPtr workerPtr(
924  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
925  pathStatusInserterWorkers_.emplace_back(workerPtr);
926  workerPtr->setActivityRegistry(actReg_);
927  addToAllWorkers(workerPtr.get());
928 
929  // A little complexity here because a C++ Path object is not
930  // instantiated and put into end_paths if there are no modules
931  // on the configured path.
932  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
933  ++indexEmpty;
934  } else {
935  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
936  ++indexOfPath;
937  }
938  ++bitpos;
939  }
940 
941  bitpos = 0;
942  indexEmpty = 0;
943  indexOfPath = 0;
944  for (auto& endPathStatusInserter : endPathStatusInserters) {
945  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
946  WorkerPtr workerPtr(
947  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
948  endPathStatusInserterWorkers_.emplace_back(workerPtr);
949  workerPtr->setActivityRegistry(actReg_);
950  addToAllWorkers(workerPtr.get());
951 
952  // A little complexity here because a C++ Path object is not
953  // instantiated and put into end_paths if there are no modules
954  // on the configured path.
955  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
956  ++indexEmpty;
957  } else {
958  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
959  ++indexOfPath;
960  }
961  ++bitpos;
962  }
963  }
964 } // namespace edm
edm::WorkerSummary::timesVisited
int timesVisited
Definition: TriggerReport.h:45
edm::Path::size_type
WorkersInPath::size_type size_type
Definition: Path.h:49
edm::StreamSchedule::fillEndPath
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:500
edm::StreamSchedule::addToAllWorkers
void addToAllWorkers(Worker *w)
Definition: StreamSchedule.cc:898
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
edm::fillWorkerSummaryAux
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
Definition: StreamSchedule.cc:864
edm::PathContext::PathType::kPath
edm::StreamID
Definition: StreamID.h:30
edm::WorkerManager::addToAllWorkers
void addToAllWorkers(Worker *w)
Definition: WorkerManager.cc:119
edm::TerminationOrigin::ExceptionFromThisContext
mps_fire.i
i
Definition: mps_fire.py:355
edm::StreamSchedule::endPathStatusInserterWorkers_
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
Definition: StreamSchedule.h:344
edm::WorkerSummary::moduleLabel
std::string moduleLabel
Definition: TriggerReport.h:51
edm::StreamSchedule::empty_end_paths_
std::vector< int > empty_end_paths_
Definition: StreamSchedule.h:349
edm::StreamSchedule::clearCounters
void clearCounters()
Clear all the counters in the trigger report.
Definition: StreamSchedule.cc:885
edm::ModuleInPathSummary::timesFailed
int timesFailed
Definition: TriggerReport.h:27
edm::StreamSchedule::getTriggerReport
void getTriggerReport(TriggerReport &rep) const
Definition: StreamSchedule.cc:875
MessageLogger.h
funct::false
false
Definition: Factorize.h:34
dqmiodumpmetadata.n
n
Definition: dqmiodumpmetadata.py:28
edm::StreamSchedule::beginStream
void beginStream()
Definition: StreamSchedule.cc:529
edm::StreamSchedule::results
TrigResConstPtr results() const
Definition: StreamSchedule.h:329
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::StreamSchedule::makePathStatusInserters
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
Definition: StreamSchedule.cc:914
WorkerT.h
edm::PathSummary::timesFailed
int timesFailed
Definition: TriggerReport.h:37
edm::ParameterSet::getUntrackedParameterSet
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: ParameterSet.cc:2129
edm::EventSetupImpl
Definition: EventSetupImpl.h:44
BranchIDListHelper.h
TriggerNamesService.h
WaitingTaskHolder.h
edm::StreamSchedule::fillWorkers
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:399
edm::StreamSchedule::workerManager_
WorkerManager workerManager_
Definition: StreamSchedule.h:337
MicroEventContent_cff.branch
branch
Definition: MicroEventContent_cff.py:152
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::ProcessContext
Definition: ProcessContext.h:27
edm::StreamSchedule::streamContext_
StreamContext streamContext_
Definition: StreamSchedule.h:371
edm::printCmsExceptionWarning
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
Definition: ExceptionMessages.cc:25
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::StreamSchedule::earlyDeleteBranchToCount_
std::vector< BranchToCount > earlyDeleteBranchToCount_
Definition: StreamSchedule.h:354
edm::StreamSchedule::total_events_
int total_events_
Definition: StreamSchedule.h:366
edm::ModuleDescription::moduleName
std::string const & moduleName() const
Definition: ModuleDescription.h:42
edm::LogInfo
Definition: MessageLogger.h:254
edm::WorkerManager::addToUnscheduledWorkers
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
Definition: WorkerManager.cc:42
edm::ModuleInPathSummary::timesExcept
int timesExcept
Definition: TriggerReport.h:28
TriggerTimingReport.h
edm::StreamSchedule::PathWorkers
std::vector< WorkerInPath > PathWorkers
Definition: StreamSchedule.h:164
Algorithms.h
edm::StreamSchedule::totalEventsPassed
int totalEventsPassed() const
Definition: StreamSchedule.h:232
edm::fillWorkerSummary
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
Definition: StreamSchedule.cc:873
edm::HLTGlobalStatus
Definition: HLTGlobalStatus.h:25
to
cms::cuda::assert
assert(be >=bs)
edm::StreamSchedule::StreamSchedule
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
Definition: StreamSchedule.cc:137
edm::StreamSchedule::allWorkers
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: StreamSchedule.h:257
edm::maker::ModuleHolder
Definition: ModuleHolder.h:37
edm::ParameterSet::getUntrackedParameter
T getUntrackedParameter(std::string const &, T const &) const
edm::StreamID::value
unsigned int value() const
Definition: StreamID.h:42
ProductRegistry.h
edm::WorkerT
Definition: Frameworkfwd.h:54
WorkerInPath.h
edm::StreamSchedule::WorkerPtr
std::shared_ptr< Worker > WorkerPtr
Definition: StreamSchedule.h:159
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:75
edm::get_underlying
T & get_underlying(propagate_const< T > &)
Definition: propagate_const.h:103
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
groupFilesInBlocks.temp
list temp
Definition: groupFilesInBlocks.py:142
edm::ModuleInPathSummary::timesPassed
int timesPassed
Definition: TriggerReport.h:26
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::PathSummary::moduleInPathSummaries
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:41
edm::StreamSchedule::vstring
std::vector< std::string > vstring
Definition: StreamSchedule.h:154
edm::ModuleDescription
Definition: ModuleDescription.h:21
end
#define end
Definition: vmac.h:39
edm::PathSummary::name
std::string name
Definition: TriggerReport.h:40
edm::StreamSchedule::results_inserter_
edm::propagate_const< WorkerPtr > results_inserter_
Definition: StreamSchedule.h:342
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::ProductRegistry
Definition: ProductRegistry.h:34
TriggerReport.h
edm::ServiceToken
Definition: ServiceToken.h:40
edm::propagate_const::get
element_type const * get() const
Definition: propagate_const.h:64
edm::propagate_const
Definition: propagate_const.h:32
OutputModuleCommunicator.h
edm::vstring
std::vector< std::string > vstring
Definition: Schedule.cc:580
edm::Worker::kFilter
Definition: Worker.h:86
edm::StreamSchedule::number_of_unscheduled_modules_
unsigned int number_of_unscheduled_modules_
Definition: StreamSchedule.h:368
edm::WorkerSummary::timesExcept
int timesExcept
Definition: TriggerReport.h:49
edm::EventPrincipal
Definition: EventPrincipal.h:46
OutputModuleDescription.h
edm::StreamSchedule::totalEvents
int totalEvents() const
Definition: StreamSchedule.h:228
edm::StreamSchedule::moduleDescriptionsInEndPath
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: StreamSchedule.cc:800
edm::maker::ModuleHolder::createOutputModuleCommunicator
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
edm::ModuleInPathSummary::moduleLabel
std::string moduleLabel
Definition: TriggerReport.h:30
ProcessConfiguration.h
hltMonBTagIPClient_cfi.pathName
pathName
Definition: hltMonBTagIPClient_cfi.py:5
edm::StreamSchedule::endPathsEnabled
bool endPathsEnabled() const
Definition: StreamSchedule.cc:831
w
const double w
Definition: UKUtility.cc:23
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::BranchID
Definition: BranchID.h:14
edm::PathSummary::bitPosition
int bitPosition
Definition: TriggerReport.h:34
edm::WorkerInPath::Ignore
Definition: WorkerInPath.h:27
edm::exception_actions::FailPath
Definition: ExceptionActions.h:11
edm::StreamSchedule::enableEndPaths
void enableEndPaths(bool active)
Definition: StreamSchedule.cc:829
edm::WorkerSummary::timesRun
int timesRun
Definition: TriggerReport.h:46
edm::OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:34
edm::StreamSchedule::replaceModule
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
Definition: StreamSchedule.cc:533
HcalDetIdTransform::transform
unsigned transform(const HcalDetId &id, unsigned transformCode)
Definition: HcalDetIdTransform.cc:7
edm::StreamSchedule::actionTable
ExceptionToActionTable const & actionTable() const
returns the action table
Definition: StreamSchedule.h:285
edm::InEvent
Definition: BranchType.h:11
ConvertException.h
edm::exception_actions::SkipEvent
Definition: ExceptionActions.h:11
ExceptionCollector.h
ParameterSetDescription.h
b
double b
Definition: hdecay.h:118
edm::Worker
Definition: Worker.h:83
edm::WorkerInPath::Normal
Definition: WorkerInPath.h:27
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::StreamSchedule::results_
edm::propagate_const< TrigResPtr > results_
Definition: StreamSchedule.h:340
edm::StreamSchedule::empty_trig_paths_
std::vector< int > empty_trig_paths_
Definition: StreamSchedule.h:348
edm::WorkerSummary
Definition: TriggerReport.h:44
edm::StreamSchedule::initializeEarlyDelete
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
Definition: StreamSchedule.cc:243
edm::ModuleRegistry::forAllModuleHolders
void forAllModuleHolders(F iFunc)
Definition: ModuleRegistry.h:52
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
edm::ParentContext
Definition: ParentContext.h:27
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::WorkerInPath::FilterAction
FilterAction
Definition: WorkerInPath.h:27
edm::WorkerManager::beginStream
void beginStream(StreamID iID, StreamContext &streamContext)
Definition: WorkerManager.cc:105
edm::LogWarning
Definition: MessageLogger.h:141
edm::WorkerSummary::timesPassed
int timesPassed
Definition: TriggerReport.h:47
funct::true
true
Definition: Factorize.h:173
edm::StreamSchedule::moduleDescriptionsInPath
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: StreamSchedule.cc:771
StreamSchedule.h
edm::ParameterSet
Definition: ParameterSet.h:36
cms::Exception::context
std::list< std::string > const & context() const
Definition: Exception.cc:147
ParameterSet
Definition: Functions.h:16
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:30
edm::StreamSchedule::skippingEvent_
std::atomic< bool > skippingEvent_
Definition: StreamSchedule.h:373
edm::StreamSchedule::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: StreamSchedule.h:338
KineDebug3::count
void count()
Definition: KinematicConstrainedVertexUpdatorT.h:21
LuminosityBlockProcessingStatus.h
dumpMFGeometry_cfg.delta
delta
Definition: dumpMFGeometry_cfg.py:25
helper
Definition: helper.py:1
TriggerResultInserter.h
edm::Path
Definition: Path.h:44
edm::HLTPathStatus
Definition: HLTPathStatus.h:33
edm::BranchIDListHelper
Definition: BranchIDListHelper.h:15
edm::addContextAndPrintException
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
Definition: ExceptionHelpers.cc:11
edm::StreamSchedule::fillTrigPath
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:470
edm::WorkerManager::processAccumulatorsAsync
void processAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: WorkerManager.h:149
writedatasetfile.action
action
Definition: writedatasetfile.py:8
cuy.rep
rep
Definition: cuy.py:1190
edm::StreamSchedule::end_paths_
TrigPaths end_paths_
Definition: StreamSchedule.h:347
edm::BranchDescription::branchName
std::string const & branchName() const
Definition: BranchDescription.h:119
B2GTnPMonitor_cfi.item
item
Definition: B2GTnPMonitor_cfi.py:147
edm::fillPathSummary
static void fillPathSummary(Path const &path, PathSummary &sum)
Definition: StreamSchedule.cc:841
edm::ExceptionToActionTable
Definition: ExceptionActions.h:16
edm::Worker::description
ModuleDescription const & description() const
Definition: Worker.h:190
TrackCollections2monitor_cff.func
func
Definition: TrackCollections2monitor_cff.py:359
PathContext.h
PathStatusInserter.h
edm::TriggerReport
Definition: TriggerReport.h:56
edm::StreamSchedule::finishedPaths
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventPrincipal &ep, EventSetupImpl const &es)
Definition: StreamSchedule.cc:662
edm::StreamSchedule::streamID_
StreamID streamID_
Definition: StreamSchedule.h:370
edm::Worker::moduleType
virtual Types moduleType() const =0
cmsLHEtoEOSManager.l
l
Definition: cmsLHEtoEOSManager.py:193
EndPathStatusInserter.h
edm::StreamSchedule::getAllModuleDescriptions
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: StreamSchedule.cc:549
edm::service::TriggerNamesService
Definition: TriggerNamesService.h:42
Registry.h
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
edm::PreallocationConfiguration
Definition: PreallocationConfiguration.h:27
edm::ModuleInPathSummary::timesVisited
int timesVisited
Definition: TriggerReport.h:25
patCandidatesForDimuonsSequences_cff.pathNames
pathNames
Definition: patCandidatesForDimuonsSequences_cff.py:109
edm::StreamSchedule::resetAll
void resetAll()
Definition: StreamSchedule.cc:893
ModuleRegistry.h
edm::Worker::clearCounters
void clearCounters()
Definition: Worker.h:217
edm::StreamSchedule::earlyDeleteHelpers_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
Definition: StreamSchedule.h:364
edm::maker::ModuleHolder::replaceModuleFor
virtual void replaceModuleFor(Worker *) const =0
edm::SelectedProductsForBranchType
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
Definition: SelectedProducts.h:13
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::ModuleInPathSummary
Definition: TriggerReport.h:24
edm::ModuleRegistry
Definition: ModuleRegistry.h:40
edm::StreamSchedule::TrigResPtr
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: StreamSchedule.h:157
tier0.unique
def unique(seq, keepstr=True)
Definition: tier0.py:24
edm::Path::name
std::string const & name() const
Definition: Path.h:80
edm::service::TriggerNamesService::getTrigPaths
Strings const & getTrigPaths() const
Definition: TriggerNamesService.h:55
edm::PathSummary
Definition: TriggerReport.h:33
edm::fillModuleInPathSummary
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
Definition: StreamSchedule.cc:833
ModuleHolder.h
edm::PathSummary::timesRun
int timesRun
Definition: TriggerReport.h:35
edm::service::TriggerNamesService::getEndPaths
Strings const & getEndPaths() const
Definition: TriggerNamesService.h:74
edm::WorkerSummary::timesFailed
int timesFailed
Definition: TriggerReport.h:48
edm::StreamSchedule::processOneEventAsync
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetupImpl const &es, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
Definition: StreamSchedule.cc:560
Exception
Definition: hltDiff.cc:246
edm::PathSummary::timesPassed
int timesPassed
Definition: TriggerReport.h:36
edm::StreamSchedule::trig_paths_
TrigPaths trig_paths_
Definition: StreamSchedule.h:346
edm::WorkerInPath
Definition: WorkerInPath.h:25
edm::PathContext::PathType::kEndPath
edm::PathSummary::timesExcept
int timesExcept
Definition: TriggerReport.h:38
edm::WorkerInPath::Veto
Definition: WorkerInPath.h:27
edm::StreamSchedule::resetEarlyDelete
void resetEarlyDelete()
Definition: StreamSchedule.cc:900
Skims_PA_cff.name
name
Definition: Skims_PA_cff.py:17
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::WorkerManager::setupOnDemandSystem
void setupOnDemandSystem(Principal &principal, EventSetupImpl const &es)
Definition: WorkerManager.cc:125
edm::ExceptionToActionTable::find
exception_actions::ActionCodes find(const std::string &category) const
Definition: ExceptionActions.cc:85
edm::WorkerManager::endStream
void endStream(StreamID iID, StreamContext &streamContext)
Definition: WorkerManager.cc:111
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::StreamSchedule::endStream
void endStream()
Definition: StreamSchedule.cc:531
edm::WorkerManager::getWorker
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
Definition: WorkerManager.cc:33
edm::BranchDescription
Definition: BranchDescription.h:32
MillePedeFileConverter_cfg.out
out
Definition: MillePedeFileConverter_cfg.py:31
actions
roAction_t actions[nactions]
Definition: GenABIO.cc:181
mps_fire.result
result
Definition: mps_fire.py:303
edm::search_all
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
cms::Exception
Definition: Exception.h:70
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
edm::StreamSchedule::streamID
StreamID streamID() const
Definition: StreamSchedule.h:201
ParameterSet.h
HerwigMaxPtPartonFilter_cfi.moduleLabel
moduleLabel
Definition: HerwigMaxPtPartonFilter_cfi.py:4
edm::StreamSchedule::pathStatusInserterWorkers_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
Definition: StreamSchedule.h:343
edm::hlt::Pass
accept
Definition: HLTenums.h:18
edm::StreamSchedule::availablePaths
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: StreamSchedule.cc:750
edm::exception_actions::IgnoreCompletely
Definition: ExceptionActions.h:11
edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
Definition: StreamSchedule.h:361
Factory.h
edm::Path::clearCounters
void clearCounters()
Definition: Path.cc:196
edm::ParameterSet::getPSetForUpdate
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
Definition: ParameterSet.cc:450
edm::StreamSchedule::finishProcessOneEvent
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
Definition: StreamSchedule.cc:718
eostools.which
def which(cmd)
Definition: eostools.py:336
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::StreamSchedule::totalEventsFailed
int totalEventsFailed() const
Definition: StreamSchedule.h:236
edm::pset::Registry::getMapped
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
edm::errors::Configuration
Definition: EDMException.h:36
edm::StreamSchedule::modulesInPath
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
Definition: StreamSchedule.cc:758
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
begin
#define begin
Definition: vmac.h:32
label
const char * label
Definition: PFTauDecayModeTools.cc:11
edm::StreamSchedule::endpathsAreActive_
volatile bool endpathsAreActive_
Definition: StreamSchedule.h:372
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
geometryDiff.opts
opts
Definition: geometryDiff.py:11
edm::StreamSchedule::total_passed_
int total_passed_
Definition: StreamSchedule.h:367
edm::exception_actions::ActionCodes
ActionCodes
Definition: ExceptionActions.h:11
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37