CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
32 
34 #include "processEDAliases.h"
35 
36 #include <algorithm>
37 #include <cassert>
38 #include <cstdlib>
39 #include <functional>
40 #include <iomanip>
41 #include <limits>
42 #include <list>
43 #include <map>
44 #include <fmt/format.h>
45 
46 namespace edm {
47 
48  namespace {
49 
50  // Function template to transform each element in the input range to
51  // a value placed into the output range. The supplied function
52  // should take a const_reference to the 'input', and write to a
53  // reference to the 'output'.
54  template <typename InputIterator, typename ForwardIterator, typename Func>
55  void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
56  for (; begin != end; ++begin, ++out)
57  func(*begin, *out);
58  }
59 
60  // Function template that takes a sequence 'from', a sequence
61  // 'to', and a callable object 'func'. It and applies
62  // transform_into to fill the 'to' sequence with the values
63  // calcuated by the callable object, taking care to fill the
64  // outupt only if all calls succeed.
65  template <typename FROM, typename TO, typename FUNC>
66  void fill_summary(FROM const& from, TO& to, FUNC func) {
67  if (to.size() != from.size()) {
68  TO temp(from.size());
69  transform_into(from.begin(), from.end(), temp.begin(), func);
70  to.swap(temp);
71  } else {
72  transform_into(from.begin(), from.end(), to.begin(), func);
73  }
74  }
75 
76  class BeginStreamTraits {
77  public:
78  static void preScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
79  activityRegistry->preBeginStreamSignal_(*streamContext);
80  }
81  static void postScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
82  activityRegistry->postBeginStreamSignal_(*streamContext);
83  }
84  };
85 
86  class EndStreamTraits {
87  public:
88  static void preScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
89  activityRegistry->preEndStreamSignal_(*streamContext);
90  }
91  static void postScheduleSignal(ActivityRegistry* activityRegistry, StreamContext const* streamContext) {
92  activityRegistry->postEndStreamSignal_(*streamContext);
93  }
94  };
95 
96  // -----------------------------
97 
98  // Here we make the trigger results inserter directly. This should
99  // probably be a utility in the WorkerRegistry or elsewhere.
100 
101  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
102  std::shared_ptr<ActivityRegistry> areg,
103  std::shared_ptr<TriggerResultInserter> inserter) {
105  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
106  ptr->setActivityRegistry(areg);
107  return ptr;
108  }
109 
110  void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
111  ProductRegistry const& preg,
112  std::multimap<std::string, Worker*>& branchToReadingWorker) {
113  auto vBranchesToDeleteEarly = branchesToDeleteEarly;
114  // Remove any duplicates
115  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
116  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
117  vBranchesToDeleteEarly.end());
118 
119  // Are the requested items in the product registry?
120  auto allBranchNames = preg.allBranchNames();
121  //the branch names all end with a period, which we do not want to compare with
122  for (auto& b : allBranchNames) {
123  b.resize(b.size() - 1);
124  }
125  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
126  std::vector<std::string> temp;
127  temp.reserve(vBranchesToDeleteEarly.size());
128 
129  std::set_intersection(vBranchesToDeleteEarly.begin(),
130  vBranchesToDeleteEarly.end(),
131  allBranchNames.begin(),
132  allBranchNames.end(),
133  std::back_inserter(temp));
134  vBranchesToDeleteEarly.swap(temp);
135  if (temp.size() != vBranchesToDeleteEarly.size()) {
136  std::vector<std::string> missingProducts;
137  std::set_difference(temp.begin(),
138  temp.end(),
139  vBranchesToDeleteEarly.begin(),
140  vBranchesToDeleteEarly.end(),
141  std::back_inserter(missingProducts));
142  LogInfo l("MissingProductsForCanDeleteEarly");
143  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
144  for (auto const& n : missingProducts) {
145  l << "\n " << n;
146  }
147  }
148  //set placeholder for the branch, we will remove the nullptr if a
149  // module actually wants the branch.
150  for (auto const& branch : vBranchesToDeleteEarly) {
151  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
152  }
153  }
154 
155  Worker* getWorker(std::string const& moduleLabel,
156  ParameterSet& proc_pset,
157  WorkerManager& workerManager,
158  ProductRegistry& preg,
159  PreallocationConfiguration const* prealloc,
160  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
161  bool isTracked;
162  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
163  if (modpset == nullptr) {
164  return nullptr;
165  }
166  assert(isTracked);
167 
168  return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
169  }
170 
171  // If ConditionalTask modules exist in the container of module
172  // names, returns the range (std::pair) for the modules. The range
173  // excludes the special markers '#' (right before the
174  // ConditionalTask modules) and '@' (last element).
175  // If the module name container does not contain ConditionalTask
176  // modules, returns std::pair of end iterators.
177  template <typename T>
178  auto findConditionalTaskModulesRange(T& modnames) {
179  auto beg = std::find(modnames.begin(), modnames.end(), "#");
180  if (beg == modnames.end()) {
181  return std::pair(modnames.end(), modnames.end());
182  }
183  return std::pair(beg + 1, std::prev(modnames.end()));
184  }
185 
186  std::optional<std::string> findBestMatchingAlias(
187  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
188  std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
189  std::string const& productModuleLabel,
190  ConsumesInfo const& consumesInfo) {
191  std::optional<std::string> best;
192  int wildcardsInBest = std::numeric_limits<int>::max();
193  bool bestIsAmbiguous = false;
194 
195  auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
196  std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
197  int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
198  if (wildcards == 0) {
199  bestIsAmbiguous = false;
200  return true;
201  }
202  if (not best or wildcards < wildcardsInBest) {
203  best = label;
204  wildcardsInBest = wildcards;
205  bestIsAmbiguous = false;
206  } else if (best and *best != label and wildcardsInBest == wildcards) {
207  bestIsAmbiguous = true;
208  }
209  return false;
210  };
211 
212  auto findAlias = aliasMap.equal_range(productModuleLabel);
213  for (auto it = findAlias.first; it != findAlias.second; ++it) {
214  std::string const& aliasInstanceLabel =
215  it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
216  bool const instanceIsWildcard = (aliasInstanceLabel == "*");
217  if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
218  bool const typeIsWildcard = it->second.friendlyClassName == "*";
219  if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
220  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
221  return it->second.originalModuleLabel;
222  }
223  } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
224  //consume is a View so need to do more intrusive search
225  //find matching branches in module
226  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
227  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
228  if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
229  if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
230  TypeID(itBranch->second->wrappedType().typeInfo()),
231  itBranch->second->className())) {
232  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
233  return it->second.originalModuleLabel;
234  }
235  }
236  }
237  }
238  }
239  }
240  }
241  if (bestIsAmbiguous) {
243  << "Encountered ambiguity when trying to find a best-matching alias for\n"
244  << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
245  << " module label " << productModuleLabel << "\n"
246  << " product instance name " << consumesInfo.instance() << "\n"
247  << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
248  "wildcards ("
249  << wildcardsInBest << ")";
250  }
251  return best;
252  }
253  } // namespace
254 
255  // -----------------------------
256 
257  typedef std::vector<std::string> vstring;
258 
259  // -----------------------------
260 
262  public:
264 
266  ProductRegistry& preg,
267  PreallocationConfiguration const* prealloc,
268  std::shared_ptr<ProcessConfiguration const> processConfiguration,
269  WorkerManager& workerManagerLumisAndEvents,
270  std::vector<std::string> const& trigPathNames) {
271  std::unordered_set<std::string> allConditionalMods;
272  for (auto const& pathName : trigPathNames) {
273  auto const modnames = proc_pset.getParameter<vstring>(pathName);
274 
275  //Pull out ConditionalTask modules
276  auto condRange = findConditionalTaskModulesRange(modnames);
277  if (condRange.first == condRange.second)
278  continue;
279 
280  //the last entry should be ignored since it is required to be "@"
281  allConditionalMods.insert(condRange.first, condRange.second);
282  }
283 
284  for (auto const& cond : allConditionalMods) {
285  //force the creation of the conditional modules so alias check can work
286  (void)getWorker(cond, proc_pset, workerManagerLumisAndEvents, preg, prealloc, processConfiguration);
287  }
288 
289  fillAliasMap(proc_pset, allConditionalMods);
290  processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
291 
292  //find branches created by the conditional modules
293  for (auto const& prod : preg.productList()) {
294  if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
295  conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
296  }
297  }
298  }
299 
300  std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
301 
302  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
303  std::unordered_set<std::string> const& conditionalmods) const {
304  std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
305  for (auto const& mod : conditionalmods) {
306  auto range = conditionalModsBranches_.equal_range(mod);
307  ret.insert(range.first, range.second);
308  }
309  return ret;
310  }
311 
312  private:
313  void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
314  auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
315  std::string const star("*");
316  for (auto const& alias : aliases) {
317  auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
318  auto aliasedToModuleLabels = info.getParameterNames();
319  for (auto const& mod : aliasedToModuleLabels) {
320  if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
321  auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
322  for (auto const& aliasPSet : aliasVPSet) {
323  std::string type = star;
324  std::string instance = star;
325  std::string originalInstance = star;
326  if (aliasPSet.exists("type")) {
327  type = aliasPSet.getParameter<std::string>("type");
328  }
329  if (aliasPSet.exists("toProductInstance")) {
330  instance = aliasPSet.getParameter<std::string>("toProductInstance");
331  }
332  if (aliasPSet.exists("fromProductInstance")) {
333  originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
334  }
335 
336  aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
337  }
338  }
339  }
340  }
341  }
342 
343  void processSwitchEDAliases(ParameterSet const& proc_pset,
344  ProductRegistry& preg,
345  ProcessConfiguration const& processConfiguration,
346  std::unordered_set<std::string> const& allConditionalMods) {
347  auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
348  std::vector<std::string> switchEDAliases;
349  for (auto const& module : all_modules) {
350  auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
351  if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
352  auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
353  for (auto const& case_label : all_cases) {
354  auto range = aliasMap_.equal_range(case_label);
355  if (range.first != range.second) {
356  switchEDAliases.push_back(case_label);
357  }
358  }
359  }
360  }
362  switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
363  }
364 
365  std::unordered_multimap<std::string, AliasInfo> aliasMap_;
366  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
367  };
368 
369  // -----------------------------
370 
372  std::shared_ptr<TriggerResultInserter> inserter,
373  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
374  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
375  std::shared_ptr<ModuleRegistry> modReg,
376  ParameterSet& proc_pset,
377  service::TriggerNamesService const& tns,
378  PreallocationConfiguration const& prealloc,
379  ProductRegistry& preg,
381  std::shared_ptr<ActivityRegistry> areg,
382  std::shared_ptr<ProcessConfiguration const> processConfiguration,
383  StreamID streamID,
384  ProcessContext const* processContext)
385  : workerManagerBeginEnd_(modReg, areg, actions),
386  workerManagerRuns_(modReg, areg, actions),
387  workerManagerLumisAndEvents_(modReg, areg, actions),
388  actReg_(areg),
389  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
390  results_inserter_(),
391  trig_paths_(),
392  end_paths_(),
393  total_events_(),
394  total_passed_(),
395  number_of_unscheduled_modules_(0),
396  streamID_(streamID),
397  streamContext_(streamID_, processContext) {
398  bool hasPath = false;
399  std::vector<std::string> const& pathNames = tns.getTrigPaths();
400  std::vector<std::string> const& endPathNames = tns.getEndPaths();
401 
402  ConditionalTaskHelper conditionalTaskHelper(
403  proc_pset, preg, &prealloc, processConfiguration, workerManagerLumisAndEvents_, pathNames);
404  std::unordered_set<std::string> conditionalModules;
405 
406  int trig_bitpos = 0;
407  trig_paths_.reserve(pathNames.size());
408  for (auto const& trig_name : pathNames) {
409  fillTrigPath(proc_pset,
410  preg,
411  &prealloc,
412  processConfiguration,
413  trig_bitpos,
414  trig_name,
415  results(),
416  endPathNames,
417  conditionalTaskHelper,
418  conditionalModules);
419  ++trig_bitpos;
420  hasPath = true;
421  }
422 
423  if (hasPath) {
424  // the results inserter stands alone
425  inserter->setTrigResultForStream(streamID.value(), results());
426 
427  results_inserter_ = makeInserter(actions, actReg_, inserter);
429  }
430 
431  // fill normal endpaths
432  int bitpos = 0;
433  end_paths_.reserve(endPathNames.size());
434  for (auto const& end_path_name : endPathNames) {
435  fillEndPath(proc_pset,
436  preg,
437  &prealloc,
438  processConfiguration,
439  bitpos,
440  end_path_name,
441  endPathNames,
442  conditionalTaskHelper,
443  conditionalModules);
444  ++bitpos;
445  }
446 
447  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
448 
449  //See if all modules were used
450  std::set<std::string> usedWorkerLabels;
451  for (auto const& worker : allWorkersLumisAndEvents()) {
452  usedWorkerLabels.insert(worker->description()->moduleLabel());
453  }
454  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
455  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
456  std::vector<std::string> unusedLabels;
457  set_difference(modulesInConfigSet.begin(),
458  modulesInConfigSet.end(),
459  usedWorkerLabels.begin(),
460  usedWorkerLabels.end(),
461  back_inserter(unusedLabels));
462  std::set<std::string> unscheduledLabels;
463  std::vector<std::string> shouldBeUsedLabels;
464  if (!unusedLabels.empty()) {
465  //Need to
466  // 1) create worker
467  // 2) if it is a WorkerT<EDProducer>, add it to our list
468  // 3) hand list to our delayed reader
469  for (auto const& label : unusedLabels) {
470  bool isTracked;
471  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
472  assert(isTracked);
473  assert(modulePSet != nullptr);
475  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
476  }
477  if (!shouldBeUsedLabels.empty()) {
478  std::ostringstream unusedStream;
479  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
480  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
481  itLabelEnd = shouldBeUsedLabels.end();
482  itLabel != itLabelEnd;
483  ++itLabel) {
484  unusedStream << ",'" << *itLabel << "'";
485  }
486  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
487  }
488  }
489  number_of_unscheduled_modules_ = unscheduledLabels.size();
490 
491  // Print conditional modules that were not consumed in any of their associated Paths
492  if (streamID.value() == 0 and not conditionalModules.empty()) {
493  // Intersection of unscheduled and ConditionalTask modules gives
494  // directly the set of conditional modules that were not
495  // consumed by anything in the Paths associated to the
496  // corresponding ConditionalTask.
497  std::vector<std::string_view> labelsToPrint;
498  std::copy_if(
499  unscheduledLabels.begin(),
500  unscheduledLabels.end(),
501  std::back_inserter(labelsToPrint),
502  [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
503 
504  if (not labelsToPrint.empty()) {
505  edm::LogWarning log("NonConsumedConditionalModules");
506  log << "The following modules were part of some ConditionalTask, but were not\n"
507  << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
508  << "was associated. Perhaps they should be either removed from the\n"
509  << "job, or moved to a Task to make it explicit they are unscheduled.\n";
510  for (auto const& modLabel : labelsToPrint) {
511  log.format("\n {}", modLabel);
512  }
513  }
514  }
515 
516  for (auto const& worker : allWorkersLumisAndEvents()) {
517  std::string const& moduleLabel = worker->description()->moduleLabel();
518 
519  // The new worker pointers will be null for the TriggerResultsInserter, PathStatusInserter, and
520  // EndPathStatusInserter because there are no ParameterSets for those in the configuration.
521  // We could add special code to create workers for those, but instead we skip them because they
522  // do not have beginStream, endStream, or run/lumi begin/end stream transition functions.
523 
524  Worker* workerBeginEnd =
525  getWorker(moduleLabel, proc_pset, workerManagerBeginEnd_, preg, &prealloc, processConfiguration);
526  if (workerBeginEnd) {
527  workerManagerBeginEnd_.addToAllWorkers(workerBeginEnd);
528  }
529 
530  Worker* workerRuns = getWorker(moduleLabel, proc_pset, workerManagerRuns_, preg, &prealloc, processConfiguration);
531  if (workerRuns) {
533  }
534  }
535 
536  } // StreamSchedule::StreamSchedule
537 
539  std::vector<std::string> const& branchesToDeleteEarly,
540  std::multimap<std::string, std::string> const& referencesToBranches,
541  std::vector<std::string> const& modulesToSkip,
542  edm::ProductRegistry const& preg) {
543  // setup the list with those products actually registered for this job
544  std::multimap<std::string, Worker*> branchToReadingWorker;
545  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
546 
547  const std::vector<std::string> kEmpty;
548  std::map<Worker*, unsigned int> reserveSizeForWorker;
549  unsigned int upperLimitOnReadingWorker = 0;
550  unsigned int upperLimitOnIndicies = 0;
551  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
552 
553  //talk with output modules first
554  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
555  auto comm = iHolder->createOutputModuleCommunicator();
556  if (comm) {
557  if (!branchToReadingWorker.empty()) {
558  //If an OutputModule needs a product, we can't delete it early
559  // so we should remove it from our list
560  SelectedProductsForBranchType const& kept = comm->keptProducts();
561  for (auto const& item : kept[InEvent]) {
562  BranchDescription const& desc = *item.first;
563  auto found = branchToReadingWorker.equal_range(desc.branchName());
564  if (found.first != found.second) {
565  --nUniqueBranchesToDelete;
566  branchToReadingWorker.erase(found.first, found.second);
567  }
568  }
569  }
570  }
571  });
572 
573  if (branchToReadingWorker.empty()) {
574  return;
575  }
576 
577  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
578  for (auto w : allWorkersLumisAndEvents()) {
579  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
580  continue;
581  }
582  //determine if this module could read a branch we want to delete early
583  auto consumes = w->consumesInfo();
584  if (not consumes.empty()) {
585  bool foundAtLeastOneMatchingBranch = false;
586  for (auto const& product : consumes) {
587  std::string branch = fmt::format("{}_{}_{}_{}",
588  product.type().friendlyClassName(),
589  product.label().data(),
590  product.instance().data(),
591  product.process().data());
592  {
593  //Handle case where worker directly consumes product
594  auto found = branchToReadingWorker.end();
595  if (product.process().empty()) {
596  auto startFound = branchToReadingWorker.lower_bound(branch);
597  if (startFound != branchToReadingWorker.end()) {
598  if (startFound->first.substr(0, branch.size()) == branch) {
599  //match all processNames here, even if it means multiple matches will happen
600  found = startFound;
601  }
602  }
603  } else {
604  auto exactFound = branchToReadingWorker.equal_range(branch);
605  if (exactFound.first != exactFound.second) {
606  found = exactFound.first;
607  }
608  }
609  if (found != branchToReadingWorker.end()) {
610  if (not foundAtLeastOneMatchingBranch) {
611  ++upperLimitOnReadingWorker;
612  foundAtLeastOneMatchingBranch = true;
613  }
614  ++upperLimitOnIndicies;
615  ++reserveSizeForWorker[w];
616  if (nullptr == found->second) {
617  found->second = w;
618  } else {
619  branchToReadingWorker.insert(make_pair(found->first, w));
620  }
621  }
622  }
623  {
624  //Handle case where indirectly consumes product
625  auto found = referencesToBranches.end();
626  if (product.process().empty()) {
627  auto startFound = referencesToBranches.lower_bound(branch);
628  if (startFound != referencesToBranches.end()) {
629  if (startFound->first.substr(0, branch.size()) == branch) {
630  //match all processNames here, even if it means multiple matches will happen
631  found = startFound;
632  }
633  }
634  } else {
635  //can match exactly
636  auto exactFound = referencesToBranches.equal_range(branch);
637  if (exactFound.first != exactFound.second) {
638  found = exactFound.first;
639  }
640  }
641  if (found != referencesToBranches.end()) {
642  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
643  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
644  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
645  continue;
646  }
647  if (not foundAtLeastOneMatchingBranch) {
648  ++upperLimitOnReadingWorker;
649  foundAtLeastOneMatchingBranch = true;
650  }
651  ++upperLimitOnIndicies;
652  ++reserveSizeForWorker[w];
653  if (nullptr == foundInBranchToReadingWorker->second) {
654  foundInBranchToReadingWorker->second = w;
655  } else {
656  branchToReadingWorker.insert(make_pair(itr->second, w));
657  }
658  }
659  }
660  }
661  }
662  }
663  }
664  {
665  auto it = branchToReadingWorker.begin();
666  std::vector<std::string> unusedBranches;
667  while (it != branchToReadingWorker.end()) {
668  if (it->second == nullptr) {
669  unusedBranches.push_back(it->first);
670  //erasing the object invalidates the iterator so must advance it first
671  auto temp = it;
672  ++it;
673  branchToReadingWorker.erase(temp);
674  } else {
675  ++it;
676  }
677  }
678  if (not unusedBranches.empty()) {
679  LogWarning l("UnusedProductsForCanDeleteEarly");
680  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
681  " If possible, remove the producer from the job.";
682  for (auto const& n : unusedBranches) {
683  l << "\n " << n;
684  }
685  }
686  }
687  if (!branchToReadingWorker.empty()) {
688  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
689  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
690  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
691  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
692  std::string lastBranchName;
693  size_t nextOpenIndex = 0;
694  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
695  for (auto& branchAndWorker : branchToReadingWorker) {
696  if (lastBranchName != branchAndWorker.first) {
697  //have to put back the period we removed earlier in order to get the proper name
698  BranchID bid(branchAndWorker.first + ".");
699  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
700  lastBranchName = branchAndWorker.first;
701  }
702  auto found = alreadySeenWorkers.find(branchAndWorker.second);
703  if (alreadySeenWorkers.end() == found) {
704  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
705  // all the branches that might be read by this worker. However, initially we will only tell the
706  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
707  // EarlyDeleteHelper will automatically advance its internal end pointer.
708  size_t index = nextOpenIndex;
709  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
712  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
713  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
714  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
715  nextOpenIndex += nIndices;
716  } else {
717  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
718  }
719  }
720 
721  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
722  // space needed for each module
723  auto itLast = earlyDeleteHelpers_.begin();
724  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
725  if (itLast->end() != it->begin()) {
726  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
727  unsigned int delta = it->begin() - itLast->end();
728  it->shiftIndexPointers(delta);
729 
731  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
732  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
733  }
734  itLast = it;
735  }
737  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
739 
740  //now tell the paths about the deleters
741  for (auto& p : trig_paths_) {
742  p.setEarlyDeleteHelpers(alreadySeenWorkers);
743  }
744  for (auto& p : end_paths_) {
745  p.setEarlyDeleteHelpers(alreadySeenWorkers);
746  }
748  }
749  }
750 
752  Worker* worker,
753  std::unordered_set<std::string>& conditionalModules,
754  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
755  std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
756  ParameterSet& proc_pset,
757  ProductRegistry& preg,
758  PreallocationConfiguration const* prealloc,
759  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
760  std::vector<Worker*> returnValue;
761  auto const& consumesInfo = worker->consumesInfo();
762  auto moduleLabel = worker->description()->moduleLabel();
763  using namespace productholderindexhelper;
764  for (auto const& ci : consumesInfo) {
765  if (not ci.skipCurrentProcess() and
766  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
767  auto productModuleLabel = std::string(ci.label());
768  bool productFromConditionalModule = false;
769  auto itFound = conditionalModules.find(productModuleLabel);
770  if (itFound == conditionalModules.end()) {
771  //Check to see if this was an alias
772  //note that aliasMap was previously filtered so only the conditional modules remain there
773  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
774  if (foundAlias) {
775  productModuleLabel = *foundAlias;
776  productFromConditionalModule = true;
777  itFound = conditionalModules.find(productModuleLabel);
778  //check that the alias-for conditional module has not been used
779  if (itFound == conditionalModules.end()) {
780  continue;
781  }
782  }
783  } else {
784  //need to check the rest of the data product info
785  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
786  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
787  if (itBranch->second->productInstanceName() == ci.instance()) {
788  if (ci.kindOfType() == PRODUCT_TYPE) {
789  if (ci.type() == itBranch->second->unwrappedTypeID()) {
790  productFromConditionalModule = true;
791  break;
792  }
793  } else {
794  //this is a view
796  ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
797  productFromConditionalModule = true;
798  break;
799  }
800  }
801  }
802  }
803  }
804  if (productFromConditionalModule) {
805  auto condWorker = getWorker(
806  productModuleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
807  assert(condWorker);
808 
809  conditionalModules.erase(itFound);
810 
811  auto dependents = tryToPlaceConditionalModules(condWorker,
812  conditionalModules,
813  conditionalModuleBranches,
814  aliasMap,
815  proc_pset,
816  preg,
817  prealloc,
818  processConfiguration);
819  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
820  returnValue.push_back(condWorker);
821  }
822  }
823  }
824  return returnValue;
825  }
826 
828  ProductRegistry& preg,
829  PreallocationConfiguration const* prealloc,
830  std::shared_ptr<ProcessConfiguration const> processConfiguration,
831  std::string const& pathName,
832  bool ignoreFilters,
833  PathWorkers& out,
834  std::vector<std::string> const& endPathNames,
835  ConditionalTaskHelper const& conditionalTaskHelper,
836  std::unordered_set<std::string>& allConditionalModules) {
837  vstring modnames = proc_pset.getParameter<vstring>(pathName);
838  PathWorkers tmpworkers;
839 
840  //Pull out ConditionalTask modules
841  auto condRange = findConditionalTaskModulesRange(modnames);
842 
843  std::unordered_set<std::string> conditionalmods;
844  //An EDAlias may be redirecting to a module on a ConditionalTask
845  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
846  std::unordered_map<std::string, unsigned int> conditionalModOrder;
847  if (condRange.first != condRange.second) {
848  for (auto it = condRange.first; it != condRange.second; ++it) {
849  // ordering needs to skip the # token in the path list
850  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
851  }
852  //the last entry should be ignored since it is required to be "@"
853  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
854  std::make_move_iterator(condRange.second));
855 
856  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
857  modnames.erase(std::prev(condRange.first), modnames.end());
858 
859  // Make a union of all conditional modules from all Paths
860  allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
861  }
862 
863  unsigned int placeInPath = 0;
864  for (auto const& name : modnames) {
865  //Modules except EDFilters are set to run concurrently by default
866  bool doNotRunConcurrently = false;
868  if (name[0] == '!') {
869  filterAction = WorkerInPath::Veto;
870  } else if (name[0] == '-' or name[0] == '+') {
871  filterAction = WorkerInPath::Ignore;
872  }
873  if (name[0] == '|' or name[0] == '+') {
874  //cms.wait was specified so do not run concurrently
875  doNotRunConcurrently = true;
876  }
877 
879  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
880  moduleLabel.erase(0, 1);
881  }
882 
883  Worker* worker =
884  getWorker(moduleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
885  if (worker == nullptr) {
886  std::string pathType("endpath");
887  if (!search_all(endPathNames, pathName)) {
888  pathType = std::string("path");
889  }
891  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
892  << "\"\n please check spelling or remove that label from the path.";
893  }
894 
895  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
896  // We have a filter on an end path, and the filter is not explicitly ignored.
897  // See if the filter is allowed.
898  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
899  if (!search_all(allowed_filters, worker->description()->moduleName())) {
900  // Filter is not allowed. Ignore the result, and issue a warning.
901  filterAction = WorkerInPath::Ignore;
902  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
903  << "' with module label '" << moduleLabel << "' appears on EndPath '"
904  << pathName << "'.\n"
905  << "The return value of the filter will be ignored.\n"
906  << "To suppress this warning, either remove the filter from the endpath,\n"
907  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
908  }
909  }
910  bool runConcurrently = not doNotRunConcurrently;
911  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
912  runConcurrently = false;
913  }
914 
915  auto condModules = tryToPlaceConditionalModules(worker,
916  conditionalmods,
917  conditionalModsBranches,
918  conditionalTaskHelper.aliasMap(),
919  proc_pset,
920  preg,
921  prealloc,
922  processConfiguration);
923  for (auto condMod : condModules) {
924  tmpworkers.emplace_back(
925  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
926  }
927 
928  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
929  ++placeInPath;
930  }
931 
932  out.swap(tmpworkers);
933  }
934 
936  ProductRegistry& preg,
937  PreallocationConfiguration const* prealloc,
938  std::shared_ptr<ProcessConfiguration const> processConfiguration,
939  int bitpos,
940  std::string const& name,
941  TrigResPtr trptr,
942  std::vector<std::string> const& endPathNames,
943  ConditionalTaskHelper const& conditionalTaskHelper,
944  std::unordered_set<std::string>& allConditionalModules) {
945  PathWorkers tmpworkers;
946  fillWorkers(proc_pset,
947  preg,
948  prealloc,
949  processConfiguration,
950  name,
951  false,
952  tmpworkers,
953  endPathNames,
954  conditionalTaskHelper,
955  allConditionalModules);
956 
957  // an empty path will cause an extra bit that is not used
958  if (!tmpworkers.empty()) {
959  trig_paths_.emplace_back(
960  bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
961  } else {
962  empty_trig_paths_.push_back(bitpos);
963  }
964  for (WorkerInPath const& workerInPath : tmpworkers) {
965  addToAllWorkers(workerInPath.getWorker());
966  }
967  }
968 
970  ProductRegistry& preg,
971  PreallocationConfiguration const* prealloc,
972  std::shared_ptr<ProcessConfiguration const> processConfiguration,
973  int bitpos,
974  std::string const& name,
975  std::vector<std::string> const& endPathNames,
976  ConditionalTaskHelper const& conditionalTaskHelper,
977  std::unordered_set<std::string>& allConditionalModules) {
978  PathWorkers tmpworkers;
979  fillWorkers(proc_pset,
980  preg,
981  prealloc,
982  processConfiguration,
983  name,
984  true,
985  tmpworkers,
986  endPathNames,
987  conditionalTaskHelper,
988  allConditionalModules);
989 
990  if (!tmpworkers.empty()) {
991  end_paths_.emplace_back(bitpos,
992  name,
993  tmpworkers,
994  TrigResPtr(),
995  actionTable(),
996  actReg_,
999  } else {
1000  empty_end_paths_.push_back(bitpos);
1001  }
1002  for (WorkerInPath const& workerInPath : tmpworkers) {
1003  addToAllWorkers(workerInPath.getWorker());
1004  }
1005  }
1006 
1009  streamContext_.setEventID(EventID(0, 0, 0));
1013 
1014  std::exception_ptr exceptionInStream;
1015  CMS_SA_ALLOW try {
1016  preScheduleSignal<BeginStreamTraits>(&streamContext_);
1018  } catch (...) {
1019  exceptionInStream = std::current_exception();
1020  }
1021 
1022  postScheduleSignal<BeginStreamTraits>(&streamContext_, exceptionInStream);
1023 
1024  if (exceptionInStream) {
1025  bool cleaningUpAfterException = false;
1026  handleException(streamContext_, cleaningUpAfterException, exceptionInStream);
1027  }
1029 
1030  if (exceptionInStream) {
1031  std::rethrow_exception(exceptionInStream);
1032  }
1033  }
1034 
1035  void StreamSchedule::endStream(ExceptionCollector& collector, std::mutex& collectorMutex) noexcept {
1037  streamContext_.setEventID(EventID(0, 0, 0));
1041 
1042  std::exception_ptr exceptionInStream;
1043  CMS_SA_ALLOW try {
1044  preScheduleSignal<EndStreamTraits>(&streamContext_);
1045  workerManagerBeginEnd_.endStream(streamID_, streamContext_, collector, collectorMutex);
1046  } catch (...) {
1047  exceptionInStream = std::current_exception();
1048  }
1049 
1050  postScheduleSignal<EndStreamTraits>(&streamContext_, exceptionInStream);
1051 
1052  if (exceptionInStream) {
1053  std::lock_guard<std::mutex> collectorLock(collectorMutex);
1054  collector.call([&exceptionInStream]() { std::rethrow_exception(exceptionInStream); });
1055  }
1057  }
1058 
1060  for (auto const& worker : allWorkersBeginEnd()) {
1061  if (worker->description()->moduleLabel() == iLabel) {
1062  iMod->replaceModuleFor(worker);
1063 
1065  streamContext_.setEventID(EventID(0, 0, 0));
1069  try {
1070  worker->beginStream(streamID_, streamContext_);
1071  } catch (cms::Exception& ex) {
1073  ex.addContext("Executing StreamSchedule::replaceModule");
1074  throw;
1075  }
1077  break;
1078  }
1079  }
1080 
1081  for (auto const& worker : allWorkersRuns()) {
1082  if (worker->description()->moduleLabel() == iLabel) {
1083  iMod->replaceModuleFor(worker);
1084  break;
1085  }
1086  }
1087 
1088  for (auto const& worker : allWorkersLumisAndEvents()) {
1089  if (worker->description()->moduleLabel() == iLabel) {
1090  iMod->replaceModuleFor(worker);
1091  break;
1092  }
1093  }
1094  }
1095 
1100  }
1101 
1102  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
1103  std::vector<ModuleDescription const*> result;
1104  result.reserve(allWorkersLumisAndEvents().size());
1105 
1106  for (auto const& worker : allWorkersLumisAndEvents()) {
1107  ModuleDescription const* p = worker->description();
1108  result.push_back(p);
1109  }
1110  return result;
1111  }
1112 
1114  WaitingTaskHolder iTask,
1116  ServiceToken const& serviceToken,
1117  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
1118  EventPrincipal& ep = info.principal();
1119 
1120  // Caught exception is propagated via WaitingTaskHolder
1121  CMS_SA_ALLOW try {
1122  this->resetAll();
1123 
1125 
1126  Traits::setStreamContext(streamContext_, ep);
1127  //a service may want to communicate with another service
1128  ServiceRegistry::Operate guard(serviceToken);
1129  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1130 
1131  // Data dependencies need to be set up before marking empty
1132  // (End)Paths complete in case something consumes the status of
1133  // the empty (EndPath)
1136 
1137  HLTPathStatus hltPathStatus(hlt::Pass, 0);
1138  for (int empty_trig_path : empty_trig_paths_) {
1139  results_->at(empty_trig_path) = hltPathStatus;
1140  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1141  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1144  if (except) {
1145  iTask.doneWaiting(except);
1146  return;
1147  }
1148  }
1149  if (not endPathStatusInserterWorkers_.empty()) {
1150  for (int empty_end_path : empty_end_paths_) {
1151  std::exception_ptr except =
1152  endPathStatusInserterWorkers_[empty_end_path]
1155  if (except) {
1156  iTask.doneWaiting(except);
1157  return;
1158  }
1159  }
1160  }
1161 
1162  ++total_events_;
1163 
1164  //use to give priorities on an error to ones from Paths
1165  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1166  auto pathErrorPtr = pathErrorHolder.get();
1167  ServiceWeakToken weakToken = serviceToken;
1168  auto allPathsDone = make_waiting_task(
1169  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1170  ServiceRegistry::Operate operate(weakToken.lock());
1171 
1172  std::exception_ptr ptr;
1173  if (pathError->load()) {
1174  ptr = *pathError->load();
1175  delete pathError->load();
1176  }
1177  if ((not ptr) and iPtr) {
1178  ptr = *iPtr;
1179  }
1180  iTask.doneWaiting(finishProcessOneEvent(ptr));
1181  });
1182  //The holder guarantees that if the paths finish before the loop ends
1183  // that we do not start too soon. It also guarantees that the task will
1184  // run under that condition.
1185  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1186 
1187  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1188  std::exception_ptr const* iPtr) mutable {
1189  ServiceRegistry::Operate operate(weakToken.lock());
1190 
1191  if (iPtr) {
1192  // free previous value of pathErrorPtr, if any;
1193  // prioritize this error over one that happens in EndPath or Accumulate
1194  auto currentPtr = pathErrorPtr->exchange(new std::exception_ptr(*iPtr));
1195  assert(currentPtr == nullptr);
1196  }
1197  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1198  });
1199 
1200  //The holder guarantees that if the paths finish before the loop ends
1201  // that we do not start too soon. It also guarantees that the task will
1202  // run under that condition.
1203  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1204 
1205  //start end paths first so on single threaded the paths will run first
1206  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1207  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1208  it->processEventUsingPathAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1209  }
1210 
1211  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1212  it->processEventUsingPathAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1213  }
1214 
1215  ParentContext parentContext(&streamContext_);
1217  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1218  } catch (...) {
1219  iTask.doneWaiting(std::current_exception());
1220  }
1221  }
1222 
1223  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1224  WaitingTaskHolder iWait,
1226  if (iExcept) {
1227  // Caught exception is propagated via WaitingTaskHolder
1228  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1232  edm::printCmsExceptionWarning("TryToContinue", e);
1233  *(iExcept.load()) = std::exception_ptr();
1234  } else {
1235  *(iExcept.load()) = std::current_exception();
1236  }
1237  } catch (...) {
1238  *(iExcept.load()) = std::current_exception();
1239  }
1240  }
1241 
1242  if ((not iExcept) and results_->accept()) {
1243  ++total_passed_;
1244  }
1245 
1246  if (nullptr != results_inserter_.get()) {
1247  // Caught exception is propagated to the caller
1248  CMS_SA_ALLOW try {
1249  //Even if there was an exception, we need to allow results inserter
1250  // to run since some module may be waiting on its results.
1251  ParentContext parentContext(&streamContext_);
1253 
1254  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1255  if (expt) {
1256  std::rethrow_exception(expt);
1257  }
1258  } catch (cms::Exception& ex) {
1259  if (not iExcept) {
1260  if (ex.context().empty()) {
1261  std::ostringstream ost;
1262  ost << "Processing Event " << info.principal().id();
1263  ex.addContext(ost.str());
1264  }
1265  iExcept.store(new std::exception_ptr(std::current_exception()));
1266  }
1267  } catch (...) {
1268  if (not iExcept) {
1269  iExcept.store(new std::exception_ptr(std::current_exception()));
1270  }
1271  }
1272  }
1273  std::exception_ptr ptr;
1274  if (iExcept) {
1275  ptr = *iExcept.load();
1276  }
1277  iWait.doneWaiting(ptr);
1278  }
1279 
1280  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1282 
1283  if (iExcept) {
1284  //add context information to the exception and print message
1285  try {
1286  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1287  } catch (cms::Exception& ex) {
1288  bool const cleaningUpAfterException = false;
1289  if (ex.context().empty()) {
1290  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1291  } else {
1292  addContextAndPrintException("", ex, cleaningUpAfterException);
1293  }
1294  iExcept = std::current_exception();
1295  }
1296 
1297  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1298  }
1299  // Caught exception is propagated to the caller
1300  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1301  if (not iExcept) {
1302  iExcept = std::current_exception();
1303  }
1304  }
1305  if (not iExcept) {
1306  resetEarlyDelete();
1307  }
1308 
1309  return iExcept;
1310  }
1311 
1312  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1313  oLabelsToFill.reserve(trig_paths_.size());
1314  std::transform(trig_paths_.begin(),
1315  trig_paths_.end(),
1316  std::back_inserter(oLabelsToFill),
1317  std::bind(&Path::name, std::placeholders::_1));
1318  }
1319 
1320  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1321  TrigPaths::const_iterator itFound = std::find_if(
1322  trig_paths_.begin(),
1323  trig_paths_.end(),
1324  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1325  if (itFound != trig_paths_.end()) {
1326  oLabelsToFill.reserve(itFound->size());
1327  for (size_t i = 0; i < itFound->size(); ++i) {
1328  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1329  }
1330  }
1331  }
1332 
1334  std::vector<ModuleDescription const*>& descriptions,
1335  unsigned int hint) const {
1336  descriptions.clear();
1337  bool found = false;
1338  TrigPaths::const_iterator itFound;
1339 
1340  if (hint < trig_paths_.size()) {
1341  itFound = trig_paths_.begin() + hint;
1342  if (itFound->name() == iPathLabel)
1343  found = true;
1344  }
1345  if (!found) {
1346  // if the hint did not work, do it the slow way
1347  itFound = std::find_if(
1348  trig_paths_.begin(),
1349  trig_paths_.end(),
1350  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1351  if (itFound != trig_paths_.end())
1352  found = true;
1353  }
1354  if (found) {
1355  descriptions.reserve(itFound->size());
1356  for (size_t i = 0; i < itFound->size(); ++i) {
1357  descriptions.push_back(itFound->getWorker(i)->description());
1358  }
1359  }
1360  }
1361 
1363  std::vector<ModuleDescription const*>& descriptions,
1364  unsigned int hint) const {
1365  descriptions.clear();
1366  bool found = false;
1367  TrigPaths::const_iterator itFound;
1368 
1369  if (hint < end_paths_.size()) {
1370  itFound = end_paths_.begin() + hint;
1371  if (itFound->name() == iEndPathLabel)
1372  found = true;
1373  }
1374  if (!found) {
1375  // if the hint did not work, do it the slow way
1376  itFound = std::find_if(
1377  end_paths_.begin(),
1378  end_paths_.end(),
1379  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1380  if (itFound != end_paths_.end())
1381  found = true;
1382  }
1383  if (found) {
1384  descriptions.reserve(itFound->size());
1385  for (size_t i = 0; i < itFound->size(); ++i) {
1386  descriptions.push_back(itFound->getWorker(i)->description());
1387  }
1388  }
1389  }
1390 
1391  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1392  sum.timesVisited += path.timesVisited(which);
1393  sum.timesPassed += path.timesPassed(which);
1394  sum.timesFailed += path.timesFailed(which);
1395  sum.timesExcept += path.timesExcept(which);
1396  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1397  sum.bitPosition = path.bitPosition(which);
1398  }
1399 
1400  static void fillPathSummary(Path const& path, PathSummary& sum) {
1401  sum.name = path.name();
1402  sum.bitPosition = path.bitPosition();
1403  sum.timesRun += path.timesRun();
1404  sum.timesPassed += path.timesPassed();
1405  sum.timesFailed += path.timesFailed();
1406  sum.timesExcept += path.timesExcept();
1407 
1408  Path::size_type sz = path.size();
1409  if (sum.moduleInPathSummaries.empty()) {
1410  std::vector<ModuleInPathSummary> temp(sz);
1411  for (size_t i = 0; i != sz; ++i) {
1413  }
1414  sum.moduleInPathSummaries.swap(temp);
1415  } else {
1416  assert(sz == sum.moduleInPathSummaries.size());
1417  for (size_t i = 0; i != sz; ++i) {
1419  }
1420  }
1421  }
1422 
1423  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1424  sum.timesVisited += w.timesVisited();
1425  sum.timesRun += w.timesRun();
1426  sum.timesPassed += w.timesPassed();
1427  sum.timesFailed += w.timesFailed();
1428  sum.timesExcept += w.timesExcept();
1429  sum.moduleLabel = w.description()->moduleLabel();
1430  }
1431 
1432  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1433 
1435  rep.eventSummary.totalEvents += totalEvents();
1436  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1437  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1438 
1439  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1440  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1441  fill_summary(allWorkersLumisAndEvents(), rep.workerSummaries, &fillWorkerSummary);
1442  }
1443 
1445  using std::placeholders::_1;
1447  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1448  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1450  }
1451 
1452  void StreamSchedule::resetAll() { results_->reset(); }
1453 
1455 
1457  //must be sure we have cleared the count first
1458  for (auto& count : earlyDeleteBranchToCount_) {
1459  count.count = 0;
1460  }
1461  //now reset based on how many helpers use that branch
1463  ++(earlyDeleteBranchToCount_[index].count);
1464  }
1465  for (auto& helper : earlyDeleteHelpers_) {
1466  helper.reset();
1467  }
1468  }
1469 
1471  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1472  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1474  int bitpos = 0;
1475  unsigned int indexEmpty = 0;
1476  unsigned int indexOfPath = 0;
1477  for (auto& pathStatusInserter : pathStatusInserters) {
1478  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1479  WorkerPtr workerPtr(
1480  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1481  pathStatusInserterWorkers_.emplace_back(workerPtr);
1482  workerPtr->setActivityRegistry(actReg_);
1483  addToAllWorkers(workerPtr.get());
1484 
1485  // A little complexity here because a C++ Path object is not
1486  // instantiated and put into end_paths if there are no modules
1487  // on the configured path.
1488  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1489  ++indexEmpty;
1490  } else {
1491  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1492  ++indexOfPath;
1493  }
1494  ++bitpos;
1495  }
1496 
1497  bitpos = 0;
1498  indexEmpty = 0;
1499  indexOfPath = 0;
1500  for (auto& endPathStatusInserter : endPathStatusInserters) {
1501  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1502  WorkerPtr workerPtr(
1503  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1504  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1505  workerPtr->setActivityRegistry(actReg_);
1506  addToAllWorkers(workerPtr.get());
1507 
1508  // A little complexity here because a C++ Path object is not
1509  // instantiated and put into end_paths if there are no modules
1510  // on the configured path.
1511  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1512  ++indexEmpty;
1513  } else {
1514  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1515  ++indexOfPath;
1516  }
1517  ++bitpos;
1518  }
1519  }
1520 
1522  bool cleaningUpAfterException,
1523  std::exception_ptr& excpt) const noexcept {
1524  //add context information to the exception and print message
1525  try {
1526  convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
1527  } catch (cms::Exception& ex) {
1528  std::ostringstream ost;
1529  // In most cases the exception will already have context at this point,
1530  // but add some context here in those rare cases where it does not.
1531  if (ex.context().empty()) {
1532  exceptionContext(ost, streamContext);
1533  }
1534  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
1535  excpt = std::current_exception();
1536  }
1537  // We are already handling an earlier exception, so ignore it
1538  // if this signal results in another exception being thrown.
1539  CMS_SA_ALLOW try {
1540  actReg_->preStreamEarlyTerminationSignal_(streamContext, TerminationOrigin::ExceptionFromThisContext);
1541  } catch (...) {
1542  }
1543  }
1544 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
AllWorkers const & allWorkersBeginEnd() const
returns the collection of pointers to workers
dictionary aliases
Definition: autoCond.py:112
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
static const TGPicture * info(bool iBackgroundIsBlack)
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:70
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
Definition: helper.py:1
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
virtual void replaceModuleFor(Worker *) const =0
ProductList const & productList() const
std::unordered_multimap< std::string, AliasInfo > const & aliasMap() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
static PFTauRenderPlugin instance
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
int totalEventsPassed() const
ret
prodAgent to be discontinued
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
static std::mutex mutex
Definition: Proxy.cc:8
void endStream(ExceptionCollector &collector, std::mutex &collectorMutex) noexcept
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
void clearCounters() noexcept
Definition: Worker.h:234
std::unordered_multimap< std::string, AliasInfo > aliasMap_
void setupOnDemandSystem(EventTransitionInfo const &)
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
ConditionalTaskHelper(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, WorkerManager &workerManagerLumisAndEvents, std::vector< std::string > const &trigPathNames)
void addToAllWorkers(Worker *w)
std::string const & moduleName() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
StreamID streamID() const
WorkerManager workerManagerBeginEnd_
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
std::shared_ptr< ActivityRegistry > actReg_
void setTransition(Transition v)
Definition: StreamContext.h:66
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
T getUntrackedParameter(std::string const &, T const &) const
constexpr element_type const * get() const
std::string moduleLabel
Definition: TriggerReport.h:52
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
oneapi::tbb::task_group * group() const noexcept
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
char const * label
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
WorkerManager workerManagerRuns_
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:69
void processSwitchEDAliases(ParameterSet const &proc_pset, ProductRegistry &preg, ProcessConfiguration const &processConfiguration, std::unordered_set< std::string > const &allConditionalMods)
std::string name
Definition: TriggerReport.h:41
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
Strings const & getTrigPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static void fillPathSummary(Path const &path, PathSummary &sum)
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
ModuleDescription const * description() const noexcept
Definition: Worker.h:200
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
void doneWaiting(std::exception_ptr iExcept) noexcept
int totalEvents() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
std::string const & className() const
Definition: TypeID.cc:40
void clearCounters()
Clear all the counters in the trigger report.
void fillAliasMap(ParameterSet const &proc_pset, std::unordered_set< std::string > const &allConditionalMods)
Strings const & getEndPaths() const
void getTriggerReport(TriggerReport &rep) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
exception_actions::ActionCodes find(const std::string &category) const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
void handleException(StreamContext const &, bool cleaningUpAfterException, std::exception_ptr &) const noexcept
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
AllWorkers const & allWorkersLumisAndEvents() const
double b
Definition: hdecay.h:120
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::string const & processName() const
void setEventID(EventID const &v)
Definition: StreamContext.h:67
HLT enums.
void setupResolvers(Principal &principal)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:336
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModuleBranches(std::unordered_set< std::string > const &conditionalmods) const
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:42
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
void endStream(StreamID, StreamContext const &, ExceptionCollector &, std::mutex &collectorMutex) noexcept
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModsBranches_
Log< level::Warning, false > LogWarning
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T
std::string const & moduleLabel() const
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
std::string const & name() const
Definition: Path.h:66
WorkerManager workerManagerLumisAndEvents_
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:68
void beginStream(StreamID, StreamContext const &)
std::vector< std::string > vstring
Definition: Schedule.cc:482
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
AllWorkers const & allWorkersRuns() const
def move(src, dest)
Definition: eostools.py:511
void clearCounters()
Definition: Path.cc:183
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)