CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
29 
31 #include "processEDAliases.h"
32 
33 #include <algorithm>
34 #include <cassert>
35 #include <cstdlib>
36 #include <functional>
37 #include <iomanip>
38 #include <limits>
39 #include <list>
40 #include <map>
41 #include <exception>
42 #include <fmt/format.h>
43 
44 namespace edm {
45 
46  namespace {
47 
48  // Function template to transform each element in the input range to
49  // a value placed into the output range. The supplied function
50  // should take a const_reference to the 'input', and write to a
51  // reference to the 'output'.
52  template <typename InputIterator, typename ForwardIterator, typename Func>
53  void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
54  for (; begin != end; ++begin, ++out)
55  func(*begin, *out);
56  }
57 
58  // Function template that takes a sequence 'from', a sequence
59  // 'to', and a callable object 'func'. It and applies
60  // transform_into to fill the 'to' sequence with the values
61  // calcuated by the callable object, taking care to fill the
62  // outupt only if all calls succeed.
63  template <typename FROM, typename TO, typename FUNC>
64  void fill_summary(FROM const& from, TO& to, FUNC func) {
65  if (to.size() != from.size()) {
66  TO temp(from.size());
67  transform_into(from.begin(), from.end(), temp.begin(), func);
68  to.swap(temp);
69  } else {
70  transform_into(from.begin(), from.end(), to.begin(), func);
71  }
72  }
73 
74  // -----------------------------
75 
76  // Here we make the trigger results inserter directly. This should
77  // probably be a utility in the WorkerRegistry or elsewhere.
78 
79  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
80  std::shared_ptr<ActivityRegistry> areg,
81  std::shared_ptr<TriggerResultInserter> inserter) {
83  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
84  ptr->setActivityRegistry(areg);
85  return ptr;
86  }
87 
88  void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
89  ProductRegistry const& preg,
90  std::multimap<std::string, Worker*>& branchToReadingWorker) {
91  auto vBranchesToDeleteEarly = branchesToDeleteEarly;
92  // Remove any duplicates
93  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
94  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
95  vBranchesToDeleteEarly.end());
96 
97  // Are the requested items in the product registry?
98  auto allBranchNames = preg.allBranchNames();
99  //the branch names all end with a period, which we do not want to compare with
100  for (auto& b : allBranchNames) {
101  b.resize(b.size() - 1);
102  }
103  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
104  std::vector<std::string> temp;
105  temp.reserve(vBranchesToDeleteEarly.size());
106 
107  std::set_intersection(vBranchesToDeleteEarly.begin(),
108  vBranchesToDeleteEarly.end(),
109  allBranchNames.begin(),
110  allBranchNames.end(),
111  std::back_inserter(temp));
112  vBranchesToDeleteEarly.swap(temp);
113  if (temp.size() != vBranchesToDeleteEarly.size()) {
114  std::vector<std::string> missingProducts;
115  std::set_difference(temp.begin(),
116  temp.end(),
117  vBranchesToDeleteEarly.begin(),
118  vBranchesToDeleteEarly.end(),
119  std::back_inserter(missingProducts));
120  LogInfo l("MissingProductsForCanDeleteEarly");
121  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
122  for (auto const& n : missingProducts) {
123  l << "\n " << n;
124  }
125  }
126  //set placeholder for the branch, we will remove the nullptr if a
127  // module actually wants the branch.
128  for (auto const& branch : vBranchesToDeleteEarly) {
129  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
130  }
131  }
132 
133  Worker* getWorker(std::string const& moduleLabel,
134  ParameterSet& proc_pset,
135  WorkerManager& workerManager,
136  ProductRegistry& preg,
137  PreallocationConfiguration const* prealloc,
138  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
139  bool isTracked;
140  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
141  if (modpset == nullptr) {
142  return nullptr;
143  }
144  assert(isTracked);
145 
146  return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
147  }
148 
149  // If ConditionalTask modules exist in the container of module
150  // names, returns the range (std::pair) for the modules. The range
151  // excludes the special markers '#' (right before the
152  // ConditionalTask modules) and '@' (last element).
153  // If the module name container does not contain ConditionalTask
154  // modules, returns std::pair of end iterators.
155  template <typename T>
156  auto findConditionalTaskModulesRange(T& modnames) {
157  auto beg = std::find(modnames.begin(), modnames.end(), "#");
158  if (beg == modnames.end()) {
159  return std::pair(modnames.end(), modnames.end());
160  }
161  return std::pair(beg + 1, std::prev(modnames.end()));
162  }
163 
164  std::optional<std::string> findBestMatchingAlias(
165  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
166  std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
167  std::string const& productModuleLabel,
168  ConsumesInfo const& consumesInfo) {
169  std::optional<std::string> best;
170  int wildcardsInBest = std::numeric_limits<int>::max();
171  bool bestIsAmbiguous = false;
172 
173  auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
174  std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
175  int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
176  if (wildcards == 0) {
177  bestIsAmbiguous = false;
178  return true;
179  }
180  if (not best or wildcards < wildcardsInBest) {
181  best = label;
182  wildcardsInBest = wildcards;
183  bestIsAmbiguous = false;
184  } else if (best and *best != label and wildcardsInBest == wildcards) {
185  bestIsAmbiguous = true;
186  }
187  return false;
188  };
189 
190  auto findAlias = aliasMap.equal_range(productModuleLabel);
191  for (auto it = findAlias.first; it != findAlias.second; ++it) {
192  std::string const& aliasInstanceLabel =
193  it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
194  bool const instanceIsWildcard = (aliasInstanceLabel == "*");
195  if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
196  bool const typeIsWildcard = it->second.friendlyClassName == "*";
197  if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
198  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
199  return it->second.originalModuleLabel;
200  }
201  } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
202  //consume is a View so need to do more intrusive search
203  //find matching branches in module
204  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
205  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
206  if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
207  if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
208  TypeID(itBranch->second->wrappedType().typeInfo()),
209  itBranch->second->className())) {
210  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
211  return it->second.originalModuleLabel;
212  }
213  }
214  }
215  }
216  }
217  }
218  }
219  if (bestIsAmbiguous) {
221  << "Encountered ambiguity when trying to find a best-matching alias for\n"
222  << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
223  << " module label " << productModuleLabel << "\n"
224  << " product instance name " << consumesInfo.instance() << "\n"
225  << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
226  "wildcards ("
227  << wildcardsInBest << ")";
228  }
229  return best;
230  }
231  } // namespace
232 
233  // -----------------------------
234 
235  typedef std::vector<std::string> vstring;
236 
237  // -----------------------------
238 
240  public:
242 
244  ProductRegistry& preg,
245  PreallocationConfiguration const* prealloc,
246  std::shared_ptr<ProcessConfiguration const> processConfiguration,
247  WorkerManager& workerManager,
248  std::vector<std::string> const& trigPathNames) {
249  std::unordered_set<std::string> allConditionalMods;
250  for (auto const& pathName : trigPathNames) {
251  auto const modnames = proc_pset.getParameter<vstring>(pathName);
252 
253  //Pull out ConditionalTask modules
254  auto condRange = findConditionalTaskModulesRange(modnames);
255  if (condRange.first == condRange.second)
256  continue;
257 
258  //the last entry should be ignored since it is required to be "@"
259  allConditionalMods.insert(condRange.first, condRange.second);
260  }
261 
262  for (auto const& cond : allConditionalMods) {
263  //force the creation of the conditional modules so alias check can work
264  (void)getWorker(cond, proc_pset, workerManager, preg, prealloc, processConfiguration);
265  }
266 
267  fillAliasMap(proc_pset, allConditionalMods);
268  processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
269 
270  //find branches created by the conditional modules
271  for (auto const& prod : preg.productList()) {
272  if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
273  conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
274  }
275  }
276  }
277 
278  std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
279 
280  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
281  std::unordered_set<std::string> const& conditionalmods) const {
282  std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
283  for (auto const& mod : conditionalmods) {
284  auto range = conditionalModsBranches_.equal_range(mod);
285  ret.insert(range.first, range.second);
286  }
287  return ret;
288  }
289 
290  private:
291  void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
292  auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
293  std::string const star("*");
294  for (auto const& alias : aliases) {
295  auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
296  auto aliasedToModuleLabels = info.getParameterNames();
297  for (auto const& mod : aliasedToModuleLabels) {
298  if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
299  auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
300  for (auto const& aliasPSet : aliasVPSet) {
301  std::string type = star;
302  std::string instance = star;
303  std::string originalInstance = star;
304  if (aliasPSet.exists("type")) {
305  type = aliasPSet.getParameter<std::string>("type");
306  }
307  if (aliasPSet.exists("toProductInstance")) {
308  instance = aliasPSet.getParameter<std::string>("toProductInstance");
309  }
310  if (aliasPSet.exists("fromProductInstance")) {
311  originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
312  }
313 
314  aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
315  }
316  }
317  }
318  }
319  }
320 
321  void processSwitchEDAliases(ParameterSet const& proc_pset,
322  ProductRegistry& preg,
323  ProcessConfiguration const& processConfiguration,
324  std::unordered_set<std::string> const& allConditionalMods) {
325  auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
326  std::vector<std::string> switchEDAliases;
327  for (auto const& module : all_modules) {
328  auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
329  if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
330  auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
331  for (auto const& case_label : all_cases) {
332  auto range = aliasMap_.equal_range(case_label);
333  if (range.first != range.second) {
334  switchEDAliases.push_back(case_label);
335  }
336  }
337  }
338  }
340  switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
341  }
342 
343  std::unordered_multimap<std::string, AliasInfo> aliasMap_;
344  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
345  };
346 
347  // -----------------------------
348 
350  std::shared_ptr<TriggerResultInserter> inserter,
351  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
352  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
353  std::shared_ptr<ModuleRegistry> modReg,
354  ParameterSet& proc_pset,
355  service::TriggerNamesService const& tns,
356  PreallocationConfiguration const& prealloc,
357  ProductRegistry& preg,
359  std::shared_ptr<ActivityRegistry> areg,
360  std::shared_ptr<ProcessConfiguration const> processConfiguration,
361  StreamID streamID,
362  ProcessContext const* processContext)
363  : workerManager_(modReg, areg, actions),
364  actReg_(areg),
365  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
366  results_inserter_(),
367  trig_paths_(),
368  end_paths_(),
369  total_events_(),
370  total_passed_(),
371  number_of_unscheduled_modules_(0),
372  streamID_(streamID),
373  streamContext_(streamID_, processContext) {
374  bool hasPath = false;
375  std::vector<std::string> const& pathNames = tns.getTrigPaths();
376  std::vector<std::string> const& endPathNames = tns.getEndPaths();
377 
378  ConditionalTaskHelper conditionalTaskHelper(
379  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
380  std::unordered_set<std::string> conditionalModules;
381 
382  int trig_bitpos = 0;
383  trig_paths_.reserve(pathNames.size());
384  for (auto const& trig_name : pathNames) {
385  fillTrigPath(proc_pset,
386  preg,
387  &prealloc,
388  processConfiguration,
389  trig_bitpos,
390  trig_name,
391  results(),
392  endPathNames,
393  conditionalTaskHelper,
394  conditionalModules);
395  ++trig_bitpos;
396  hasPath = true;
397  }
398 
399  if (hasPath) {
400  // the results inserter stands alone
401  inserter->setTrigResultForStream(streamID.value(), results());
402 
403  results_inserter_ = makeInserter(actions, actReg_, inserter);
405  }
406 
407  // fill normal endpaths
408  int bitpos = 0;
409  end_paths_.reserve(endPathNames.size());
410  for (auto const& end_path_name : endPathNames) {
411  fillEndPath(proc_pset,
412  preg,
413  &prealloc,
414  processConfiguration,
415  bitpos,
416  end_path_name,
417  endPathNames,
418  conditionalTaskHelper,
419  conditionalModules);
420  ++bitpos;
421  }
422 
423  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
424 
425  //See if all modules were used
426  std::set<std::string> usedWorkerLabels;
427  for (auto const& worker : allWorkers()) {
428  usedWorkerLabels.insert(worker->description()->moduleLabel());
429  }
430  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
431  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
432  std::vector<std::string> unusedLabels;
433  set_difference(modulesInConfigSet.begin(),
434  modulesInConfigSet.end(),
435  usedWorkerLabels.begin(),
436  usedWorkerLabels.end(),
437  back_inserter(unusedLabels));
438  std::set<std::string> unscheduledLabels;
439  std::vector<std::string> shouldBeUsedLabels;
440  if (!unusedLabels.empty()) {
441  //Need to
442  // 1) create worker
443  // 2) if it is a WorkerT<EDProducer>, add it to our list
444  // 3) hand list to our delayed reader
445  for (auto const& label : unusedLabels) {
446  bool isTracked;
447  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
448  assert(isTracked);
449  assert(modulePSet != nullptr);
451  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
452  }
453  if (!shouldBeUsedLabels.empty()) {
454  std::ostringstream unusedStream;
455  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
456  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
457  itLabelEnd = shouldBeUsedLabels.end();
458  itLabel != itLabelEnd;
459  ++itLabel) {
460  unusedStream << ",'" << *itLabel << "'";
461  }
462  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
463  }
464  }
465  number_of_unscheduled_modules_ = unscheduledLabels.size();
466 
467  // Print conditional modules that were not consumed in any of their associated Paths
468  if (streamID.value() == 0 and not conditionalModules.empty()) {
469  // Intersection of unscheduled and ConditionalTask modules gives
470  // directly the set of conditional modules that were not
471  // consumed by anything in the Paths associated to the
472  // corresponding ConditionalTask.
473  std::vector<std::string_view> labelsToPrint;
474  std::copy_if(
475  unscheduledLabels.begin(),
476  unscheduledLabels.end(),
477  std::back_inserter(labelsToPrint),
478  [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
479 
480  if (not labelsToPrint.empty()) {
481  edm::LogWarning log("NonConsumedConditionalModules");
482  log << "The following modules were part of some ConditionalTask, but were not\n"
483  << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
484  << "was associated. Perhaps they should be either removed from the\n"
485  << "job, or moved to a Task to make it explicit they are unscheduled.\n";
486  for (auto const& modLabel : labelsToPrint) {
487  log.format("\n {}", modLabel);
488  }
489  }
490  }
491  } // StreamSchedule::StreamSchedule
492 
494  std::vector<std::string> const& branchesToDeleteEarly,
495  std::multimap<std::string, std::string> const& referencesToBranches,
496  std::vector<std::string> const& modulesToSkip,
497  edm::ProductRegistry const& preg) {
498  // setup the list with those products actually registered for this job
499  std::multimap<std::string, Worker*> branchToReadingWorker;
500  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
501 
502  const std::vector<std::string> kEmpty;
503  std::map<Worker*, unsigned int> reserveSizeForWorker;
504  unsigned int upperLimitOnReadingWorker = 0;
505  unsigned int upperLimitOnIndicies = 0;
506  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
507 
508  //talk with output modules first
509  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
510  auto comm = iHolder->createOutputModuleCommunicator();
511  if (comm) {
512  if (!branchToReadingWorker.empty()) {
513  //If an OutputModule needs a product, we can't delete it early
514  // so we should remove it from our list
515  SelectedProductsForBranchType const& kept = comm->keptProducts();
516  for (auto const& item : kept[InEvent]) {
517  BranchDescription const& desc = *item.first;
518  auto found = branchToReadingWorker.equal_range(desc.branchName());
519  if (found.first != found.second) {
520  --nUniqueBranchesToDelete;
521  branchToReadingWorker.erase(found.first, found.second);
522  }
523  }
524  }
525  }
526  });
527 
528  if (branchToReadingWorker.empty()) {
529  return;
530  }
531 
532  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
533  for (auto w : allWorkers()) {
534  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
535  continue;
536  }
537  //determine if this module could read a branch we want to delete early
538  auto consumes = w->consumesInfo();
539  if (not consumes.empty()) {
540  bool foundAtLeastOneMatchingBranch = false;
541  for (auto const& product : consumes) {
542  std::string branch = fmt::format("{}_{}_{}_{}",
543  product.type().friendlyClassName(),
544  product.label().data(),
545  product.instance().data(),
546  product.process().data());
547  {
548  //Handle case where worker directly consumes product
549  auto found = branchToReadingWorker.end();
550  if (product.process().empty()) {
551  auto startFound = branchToReadingWorker.lower_bound(branch);
552  if (startFound != branchToReadingWorker.end()) {
553  if (startFound->first.substr(0, branch.size()) == branch) {
554  //match all processNames here, even if it means multiple matches will happen
555  found = startFound;
556  }
557  }
558  } else {
559  auto exactFound = branchToReadingWorker.equal_range(branch);
560  if (exactFound.first != exactFound.second) {
561  found = exactFound.first;
562  }
563  }
564  if (found != branchToReadingWorker.end()) {
565  if (not foundAtLeastOneMatchingBranch) {
566  ++upperLimitOnReadingWorker;
567  foundAtLeastOneMatchingBranch = true;
568  }
569  ++upperLimitOnIndicies;
570  ++reserveSizeForWorker[w];
571  if (nullptr == found->second) {
572  found->second = w;
573  } else {
574  branchToReadingWorker.insert(make_pair(found->first, w));
575  }
576  }
577  }
578  {
579  //Handle case where indirectly consumes product
580  auto found = referencesToBranches.end();
581  if (product.process().empty()) {
582  auto startFound = referencesToBranches.lower_bound(branch);
583  if (startFound != referencesToBranches.end()) {
584  if (startFound->first.substr(0, branch.size()) == branch) {
585  //match all processNames here, even if it means multiple matches will happen
586  found = startFound;
587  }
588  }
589  } else {
590  //can match exactly
591  auto exactFound = referencesToBranches.equal_range(branch);
592  if (exactFound.first != exactFound.second) {
593  found = exactFound.first;
594  }
595  }
596  if (found != referencesToBranches.end()) {
597  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
598  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
599  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
600  continue;
601  }
602  if (not foundAtLeastOneMatchingBranch) {
603  ++upperLimitOnReadingWorker;
604  foundAtLeastOneMatchingBranch = true;
605  }
606  ++upperLimitOnIndicies;
607  ++reserveSizeForWorker[w];
608  if (nullptr == foundInBranchToReadingWorker->second) {
609  foundInBranchToReadingWorker->second = w;
610  } else {
611  branchToReadingWorker.insert(make_pair(itr->second, w));
612  }
613  }
614  }
615  }
616  }
617  }
618  }
619  {
620  auto it = branchToReadingWorker.begin();
621  std::vector<std::string> unusedBranches;
622  while (it != branchToReadingWorker.end()) {
623  if (it->second == nullptr) {
624  unusedBranches.push_back(it->first);
625  //erasing the object invalidates the iterator so must advance it first
626  auto temp = it;
627  ++it;
628  branchToReadingWorker.erase(temp);
629  } else {
630  ++it;
631  }
632  }
633  if (not unusedBranches.empty()) {
634  LogWarning l("UnusedProductsForCanDeleteEarly");
635  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
636  " If possible, remove the producer from the job.";
637  for (auto const& n : unusedBranches) {
638  l << "\n " << n;
639  }
640  }
641  }
642  if (!branchToReadingWorker.empty()) {
643  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
644  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
645  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
646  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
647  std::string lastBranchName;
648  size_t nextOpenIndex = 0;
649  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
650  for (auto& branchAndWorker : branchToReadingWorker) {
651  if (lastBranchName != branchAndWorker.first) {
652  //have to put back the period we removed earlier in order to get the proper name
653  BranchID bid(branchAndWorker.first + ".");
654  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
655  lastBranchName = branchAndWorker.first;
656  }
657  auto found = alreadySeenWorkers.find(branchAndWorker.second);
658  if (alreadySeenWorkers.end() == found) {
659  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
660  // all the branches that might be read by this worker. However, initially we will only tell the
661  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
662  // EarlyDeleteHelper will automatically advance its internal end pointer.
663  size_t index = nextOpenIndex;
664  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
667  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
668  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
669  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
670  nextOpenIndex += nIndices;
671  } else {
672  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
673  }
674  }
675 
676  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
677  // space needed for each module
678  auto itLast = earlyDeleteHelpers_.begin();
679  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
680  if (itLast->end() != it->begin()) {
681  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
682  unsigned int delta = it->begin() - itLast->end();
683  it->shiftIndexPointers(delta);
684 
686  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
687  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
688  }
689  itLast = it;
690  }
692  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
694 
695  //now tell the paths about the deleters
696  for (auto& p : trig_paths_) {
697  p.setEarlyDeleteHelpers(alreadySeenWorkers);
698  }
699  for (auto& p : end_paths_) {
700  p.setEarlyDeleteHelpers(alreadySeenWorkers);
701  }
703  }
704  }
705 
707  Worker* worker,
708  std::unordered_set<std::string>& conditionalModules,
709  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
710  std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
711  ParameterSet& proc_pset,
712  ProductRegistry& preg,
713  PreallocationConfiguration const* prealloc,
714  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
715  std::vector<Worker*> returnValue;
716  auto const& consumesInfo = worker->consumesInfo();
717  auto moduleLabel = worker->description()->moduleLabel();
718  using namespace productholderindexhelper;
719  for (auto const& ci : consumesInfo) {
720  if (not ci.skipCurrentProcess() and
721  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
722  auto productModuleLabel = std::string(ci.label());
723  bool productFromConditionalModule = false;
724  auto itFound = conditionalModules.find(productModuleLabel);
725  if (itFound == conditionalModules.end()) {
726  //Check to see if this was an alias
727  //note that aliasMap was previously filtered so only the conditional modules remain there
728  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
729  if (foundAlias) {
730  productModuleLabel = *foundAlias;
731  productFromConditionalModule = true;
732  itFound = conditionalModules.find(productModuleLabel);
733  //check that the alias-for conditional module has not been used
734  if (itFound == conditionalModules.end()) {
735  continue;
736  }
737  }
738  } else {
739  //need to check the rest of the data product info
740  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
741  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
742  if (itBranch->second->productInstanceName() == ci.instance()) {
743  if (ci.kindOfType() == PRODUCT_TYPE) {
744  if (ci.type() == itBranch->second->unwrappedTypeID()) {
745  productFromConditionalModule = true;
746  break;
747  }
748  } else {
749  //this is a view
751  ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
752  productFromConditionalModule = true;
753  break;
754  }
755  }
756  }
757  }
758  }
759  if (productFromConditionalModule) {
760  auto condWorker =
761  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
762  assert(condWorker);
763 
764  conditionalModules.erase(itFound);
765 
766  auto dependents = tryToPlaceConditionalModules(condWorker,
767  conditionalModules,
768  conditionalModuleBranches,
769  aliasMap,
770  proc_pset,
771  preg,
772  prealloc,
773  processConfiguration);
774  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
775  returnValue.push_back(condWorker);
776  }
777  }
778  }
779  return returnValue;
780  }
781 
783  ProductRegistry& preg,
784  PreallocationConfiguration const* prealloc,
785  std::shared_ptr<ProcessConfiguration const> processConfiguration,
786  std::string const& pathName,
787  bool ignoreFilters,
788  PathWorkers& out,
789  std::vector<std::string> const& endPathNames,
790  ConditionalTaskHelper const& conditionalTaskHelper,
791  std::unordered_set<std::string>& allConditionalModules) {
792  vstring modnames = proc_pset.getParameter<vstring>(pathName);
793  PathWorkers tmpworkers;
794 
795  //Pull out ConditionalTask modules
796  auto condRange = findConditionalTaskModulesRange(modnames);
797 
798  std::unordered_set<std::string> conditionalmods;
799  //An EDAlias may be redirecting to a module on a ConditionalTask
800  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
801  std::unordered_map<std::string, unsigned int> conditionalModOrder;
802  if (condRange.first != condRange.second) {
803  for (auto it = condRange.first; it != condRange.second; ++it) {
804  // ordering needs to skip the # token in the path list
805  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
806  }
807  //the last entry should be ignored since it is required to be "@"
808  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
809  std::make_move_iterator(condRange.second));
810 
811  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
812  modnames.erase(std::prev(condRange.first), modnames.end());
813 
814  // Make a union of all conditional modules from all Paths
815  allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
816  }
817 
818  unsigned int placeInPath = 0;
819  for (auto const& name : modnames) {
820  //Modules except EDFilters are set to run concurrently by default
821  bool doNotRunConcurrently = false;
823  if (name[0] == '!') {
824  filterAction = WorkerInPath::Veto;
825  } else if (name[0] == '-' or name[0] == '+') {
826  filterAction = WorkerInPath::Ignore;
827  }
828  if (name[0] == '|' or name[0] == '+') {
829  //cms.wait was specified so do not run concurrently
830  doNotRunConcurrently = true;
831  }
832 
834  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
835  moduleLabel.erase(0, 1);
836  }
837 
838  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
839  if (worker == nullptr) {
840  std::string pathType("endpath");
841  if (!search_all(endPathNames, pathName)) {
842  pathType = std::string("path");
843  }
845  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
846  << "\"\n please check spelling or remove that label from the path.";
847  }
848 
849  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
850  // We have a filter on an end path, and the filter is not explicitly ignored.
851  // See if the filter is allowed.
852  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
853  if (!search_all(allowed_filters, worker->description()->moduleName())) {
854  // Filter is not allowed. Ignore the result, and issue a warning.
855  filterAction = WorkerInPath::Ignore;
856  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
857  << "' with module label '" << moduleLabel << "' appears on EndPath '"
858  << pathName << "'.\n"
859  << "The return value of the filter will be ignored.\n"
860  << "To suppress this warning, either remove the filter from the endpath,\n"
861  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
862  }
863  }
864  bool runConcurrently = not doNotRunConcurrently;
865  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
866  runConcurrently = false;
867  }
868 
869  auto condModules = tryToPlaceConditionalModules(worker,
870  conditionalmods,
871  conditionalModsBranches,
872  conditionalTaskHelper.aliasMap(),
873  proc_pset,
874  preg,
875  prealloc,
876  processConfiguration);
877  for (auto condMod : condModules) {
878  tmpworkers.emplace_back(
879  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
880  }
881 
882  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
883  ++placeInPath;
884  }
885 
886  out.swap(tmpworkers);
887  }
888 
890  ProductRegistry& preg,
891  PreallocationConfiguration const* prealloc,
892  std::shared_ptr<ProcessConfiguration const> processConfiguration,
893  int bitpos,
894  std::string const& name,
895  TrigResPtr trptr,
896  std::vector<std::string> const& endPathNames,
897  ConditionalTaskHelper const& conditionalTaskHelper,
898  std::unordered_set<std::string>& allConditionalModules) {
899  PathWorkers tmpworkers;
900  fillWorkers(proc_pset,
901  preg,
902  prealloc,
903  processConfiguration,
904  name,
905  false,
906  tmpworkers,
907  endPathNames,
908  conditionalTaskHelper,
909  allConditionalModules);
910 
911  // an empty path will cause an extra bit that is not used
912  if (!tmpworkers.empty()) {
913  trig_paths_.emplace_back(
914  bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
915  } else {
916  empty_trig_paths_.push_back(bitpos);
917  }
918  for (WorkerInPath const& workerInPath : tmpworkers) {
919  addToAllWorkers(workerInPath.getWorker());
920  }
921  }
922 
924  ProductRegistry& preg,
925  PreallocationConfiguration const* prealloc,
926  std::shared_ptr<ProcessConfiguration const> processConfiguration,
927  int bitpos,
928  std::string const& name,
929  std::vector<std::string> const& endPathNames,
930  ConditionalTaskHelper const& conditionalTaskHelper,
931  std::unordered_set<std::string>& allConditionalModules) {
932  PathWorkers tmpworkers;
933  fillWorkers(proc_pset,
934  preg,
935  prealloc,
936  processConfiguration,
937  name,
938  true,
939  tmpworkers,
940  endPathNames,
941  conditionalTaskHelper,
942  allConditionalModules);
943 
944  if (!tmpworkers.empty()) {
945  end_paths_.emplace_back(bitpos,
946  name,
947  tmpworkers,
948  TrigResPtr(),
949  actionTable(),
950  actReg_,
953  } else {
954  empty_end_paths_.push_back(bitpos);
955  }
956  for (WorkerInPath const& workerInPath : tmpworkers) {
957  addToAllWorkers(workerInPath.getWorker());
958  }
959  }
960 
962 
964 
966  Worker* found = nullptr;
967  for (auto const& worker : allWorkers()) {
968  if (worker->description()->moduleLabel() == iLabel) {
969  found = worker;
970  break;
971  }
972  }
973  if (nullptr == found) {
974  return;
975  }
976 
977  iMod->replaceModuleFor(found);
978  found->beginStream(streamID_, streamContext_);
979  }
980 
982 
983  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
984  std::vector<ModuleDescription const*> result;
985  result.reserve(allWorkers().size());
986 
987  for (auto const& worker : allWorkers()) {
988  ModuleDescription const* p = worker->description();
989  result.push_back(p);
990  }
991  return result;
992  }
993 
995  WaitingTaskHolder iTask,
997  ServiceToken const& serviceToken,
998  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
999  EventPrincipal& ep = info.principal();
1000 
1001  // Caught exception is propagated via WaitingTaskHolder
1002  CMS_SA_ALLOW try {
1003  this->resetAll();
1004 
1006 
1007  Traits::setStreamContext(streamContext_, ep);
1008  //a service may want to communicate with another service
1009  ServiceRegistry::Operate guard(serviceToken);
1010  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1011 
1012  // Data dependencies need to be set up before marking empty
1013  // (End)Paths complete in case something consumes the status of
1014  // the empty (EndPath)
1017 
1018  HLTPathStatus hltPathStatus(hlt::Pass, 0);
1019  for (int empty_trig_path : empty_trig_paths_) {
1020  results_->at(empty_trig_path) = hltPathStatus;
1021  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1022  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1025  if (except) {
1026  iTask.doneWaiting(except);
1027  return;
1028  }
1029  }
1030  for (int empty_end_path : empty_end_paths_) {
1031  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
1034  if (except) {
1035  iTask.doneWaiting(except);
1036  return;
1037  }
1038  }
1039 
1040  ++total_events_;
1041 
1042  //use to give priorities on an error to ones from Paths
1043  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1044  auto pathErrorPtr = pathErrorHolder.get();
1045  ServiceWeakToken weakToken = serviceToken;
1046  auto allPathsDone = make_waiting_task(
1047  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1048  ServiceRegistry::Operate operate(weakToken.lock());
1049 
1050  std::exception_ptr ptr;
1051  if (pathError->load()) {
1052  ptr = *pathError->load();
1053  delete pathError->load();
1054  }
1055  if ((not ptr) and iPtr) {
1056  ptr = *iPtr;
1057  }
1058  iTask.doneWaiting(finishProcessOneEvent(ptr));
1059  });
1060  //The holder guarantees that if the paths finish before the loop ends
1061  // that we do not start too soon. It also guarantees that the task will
1062  // run under that condition.
1063  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1064 
1065  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1066  std::exception_ptr const* iPtr) mutable {
1067  ServiceRegistry::Operate operate(weakToken.lock());
1068 
1069  if (iPtr) {
1070  //this is used to prioritize this error over one
1071  // that happens in EndPath or Accumulate
1072  pathErrorPtr->store(new std::exception_ptr(*iPtr));
1073  }
1074  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1075  });
1076 
1077  //The holder guarantees that if the paths finish before the loop ends
1078  // that we do not start too soon. It also guarantees that the task will
1079  // run under that condition.
1080  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1081 
1082  //start end paths first so on single threaded the paths will run first
1083  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1084  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1085  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1086  }
1087 
1088  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1089  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1090  }
1091 
1092  ParentContext parentContext(&streamContext_);
1094  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1095  } catch (...) {
1096  iTask.doneWaiting(std::current_exception());
1097  }
1098  }
1099 
1100  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1101  WaitingTaskHolder iWait,
1103  if (iExcept) {
1104  // Caught exception is propagated via WaitingTaskHolder
1105  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1109  edm::printCmsExceptionWarning("TryToContinue", e);
1110  *(iExcept.load()) = std::exception_ptr();
1111  } else {
1112  *(iExcept.load()) = std::current_exception();
1113  }
1114  } catch (...) {
1115  *(iExcept.load()) = std::current_exception();
1116  }
1117  }
1118 
1119  if ((not iExcept) and results_->accept()) {
1120  ++total_passed_;
1121  }
1122 
1123  if (nullptr != results_inserter_.get()) {
1124  // Caught exception is propagated to the caller
1125  CMS_SA_ALLOW try {
1126  //Even if there was an exception, we need to allow results inserter
1127  // to run since some module may be waiting on its results.
1128  ParentContext parentContext(&streamContext_);
1130 
1131  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1132  if (expt) {
1133  std::rethrow_exception(expt);
1134  }
1135  } catch (cms::Exception& ex) {
1136  if (not iExcept) {
1137  if (ex.context().empty()) {
1138  std::ostringstream ost;
1139  ost << "Processing Event " << info.principal().id();
1140  ex.addContext(ost.str());
1141  }
1142  iExcept.store(new std::exception_ptr(std::current_exception()));
1143  }
1144  } catch (...) {
1145  if (not iExcept) {
1146  iExcept.store(new std::exception_ptr(std::current_exception()));
1147  }
1148  }
1149  }
1150  std::exception_ptr ptr;
1151  if (iExcept) {
1152  ptr = *iExcept.load();
1153  }
1154  iWait.doneWaiting(ptr);
1155  }
1156 
1157  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1159 
1160  if (iExcept) {
1161  //add context information to the exception and print message
1162  try {
1163  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1164  } catch (cms::Exception& ex) {
1165  bool const cleaningUpAfterException = false;
1166  if (ex.context().empty()) {
1167  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1168  } else {
1169  addContextAndPrintException("", ex, cleaningUpAfterException);
1170  }
1171  iExcept = std::current_exception();
1172  }
1173 
1174  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1175  }
1176  // Caught exception is propagated to the caller
1177  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1178  if (not iExcept) {
1179  iExcept = std::current_exception();
1180  }
1181  }
1182  if (not iExcept) {
1183  resetEarlyDelete();
1184  }
1185 
1186  return iExcept;
1187  }
1188 
1189  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1190  oLabelsToFill.reserve(trig_paths_.size());
1191  std::transform(trig_paths_.begin(),
1192  trig_paths_.end(),
1193  std::back_inserter(oLabelsToFill),
1194  std::bind(&Path::name, std::placeholders::_1));
1195  }
1196 
1197  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1198  TrigPaths::const_iterator itFound = std::find_if(
1199  trig_paths_.begin(),
1200  trig_paths_.end(),
1201  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1202  if (itFound != trig_paths_.end()) {
1203  oLabelsToFill.reserve(itFound->size());
1204  for (size_t i = 0; i < itFound->size(); ++i) {
1205  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1206  }
1207  }
1208  }
1209 
1211  std::vector<ModuleDescription const*>& descriptions,
1212  unsigned int hint) const {
1213  descriptions.clear();
1214  bool found = false;
1215  TrigPaths::const_iterator itFound;
1216 
1217  if (hint < trig_paths_.size()) {
1218  itFound = trig_paths_.begin() + hint;
1219  if (itFound->name() == iPathLabel)
1220  found = true;
1221  }
1222  if (!found) {
1223  // if the hint did not work, do it the slow way
1224  itFound = std::find_if(
1225  trig_paths_.begin(),
1226  trig_paths_.end(),
1227  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1228  if (itFound != trig_paths_.end())
1229  found = true;
1230  }
1231  if (found) {
1232  descriptions.reserve(itFound->size());
1233  for (size_t i = 0; i < itFound->size(); ++i) {
1234  descriptions.push_back(itFound->getWorker(i)->description());
1235  }
1236  }
1237  }
1238 
1240  std::vector<ModuleDescription const*>& descriptions,
1241  unsigned int hint) const {
1242  descriptions.clear();
1243  bool found = false;
1244  TrigPaths::const_iterator itFound;
1245 
1246  if (hint < end_paths_.size()) {
1247  itFound = end_paths_.begin() + hint;
1248  if (itFound->name() == iEndPathLabel)
1249  found = true;
1250  }
1251  if (!found) {
1252  // if the hint did not work, do it the slow way
1253  itFound = std::find_if(
1254  end_paths_.begin(),
1255  end_paths_.end(),
1256  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1257  if (itFound != end_paths_.end())
1258  found = true;
1259  }
1260  if (found) {
1261  descriptions.reserve(itFound->size());
1262  for (size_t i = 0; i < itFound->size(); ++i) {
1263  descriptions.push_back(itFound->getWorker(i)->description());
1264  }
1265  }
1266  }
1267 
1268  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1269  sum.timesVisited += path.timesVisited(which);
1270  sum.timesPassed += path.timesPassed(which);
1271  sum.timesFailed += path.timesFailed(which);
1272  sum.timesExcept += path.timesExcept(which);
1273  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1274  sum.bitPosition = path.bitPosition(which);
1275  }
1276 
1277  static void fillPathSummary(Path const& path, PathSummary& sum) {
1278  sum.name = path.name();
1279  sum.bitPosition = path.bitPosition();
1280  sum.timesRun += path.timesRun();
1281  sum.timesPassed += path.timesPassed();
1282  sum.timesFailed += path.timesFailed();
1283  sum.timesExcept += path.timesExcept();
1284 
1285  Path::size_type sz = path.size();
1286  if (sum.moduleInPathSummaries.empty()) {
1287  std::vector<ModuleInPathSummary> temp(sz);
1288  for (size_t i = 0; i != sz; ++i) {
1290  }
1291  sum.moduleInPathSummaries.swap(temp);
1292  } else {
1293  assert(sz == sum.moduleInPathSummaries.size());
1294  for (size_t i = 0; i != sz; ++i) {
1296  }
1297  }
1298  }
1299 
1300  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1301  sum.timesVisited += w.timesVisited();
1302  sum.timesRun += w.timesRun();
1303  sum.timesPassed += w.timesPassed();
1304  sum.timesFailed += w.timesFailed();
1305  sum.timesExcept += w.timesExcept();
1306  sum.moduleLabel = w.description()->moduleLabel();
1307  }
1308 
1309  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1310 
1312  rep.eventSummary.totalEvents += totalEvents();
1313  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1314  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1315 
1316  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1317  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1318  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1319  }
1320 
1322  using std::placeholders::_1;
1324  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1325  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1326  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1327  }
1328 
1329  void StreamSchedule::resetAll() { results_->reset(); }
1330 
1332 
1334  //must be sure we have cleared the count first
1335  for (auto& count : earlyDeleteBranchToCount_) {
1336  count.count = 0;
1337  }
1338  //now reset based on how many helpers use that branch
1340  ++(earlyDeleteBranchToCount_[index].count);
1341  }
1342  for (auto& helper : earlyDeleteHelpers_) {
1343  helper.reset();
1344  }
1345  }
1346 
1348  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1349  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1351  int bitpos = 0;
1352  unsigned int indexEmpty = 0;
1353  unsigned int indexOfPath = 0;
1354  for (auto& pathStatusInserter : pathStatusInserters) {
1355  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1356  WorkerPtr workerPtr(
1357  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1358  pathStatusInserterWorkers_.emplace_back(workerPtr);
1359  workerPtr->setActivityRegistry(actReg_);
1360  addToAllWorkers(workerPtr.get());
1361 
1362  // A little complexity here because a C++ Path object is not
1363  // instantiated and put into end_paths if there are no modules
1364  // on the configured path.
1365  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1366  ++indexEmpty;
1367  } else {
1368  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1369  ++indexOfPath;
1370  }
1371  ++bitpos;
1372  }
1373 
1374  bitpos = 0;
1375  indexEmpty = 0;
1376  indexOfPath = 0;
1377  for (auto& endPathStatusInserter : endPathStatusInserters) {
1378  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1379  WorkerPtr workerPtr(
1380  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1381  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1382  workerPtr->setActivityRegistry(actReg_);
1383  addToAllWorkers(workerPtr.get());
1384 
1385  // A little complexity here because a C++ Path object is not
1386  // instantiated and put into end_paths if there are no modules
1387  // on the configured path.
1388  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1389  ++indexEmpty;
1390  } else {
1391  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1392  ++indexOfPath;
1393  }
1394  ++bitpos;
1395  }
1396  }
1397 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
Definition: helper.py:1
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
ProductList const & productList() const
std::unordered_multimap< std::string, AliasInfo > const & aliasMap() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
static PFTauRenderPlugin instance
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
int totalEventsPassed() const
ret
prodAgent to be discontinued
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::unordered_multimap< std::string, AliasInfo > aliasMap_
void setupOnDemandSystem(EventTransitionInfo const &)
void clearCounters()
Definition: Worker.h:232
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
std::string const & moduleName() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
StreamID streamID() const
ConditionalTaskHelper(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, WorkerManager &workerManager, std::vector< std::string > const &trigPathNames)
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
T getUntrackedParameter(std::string const &, T const &) const
constexpr element_type const * get() const
std::string moduleLabel
Definition: TriggerReport.h:52
oneapi::tbb::task_group * group() const noexcept
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
char const * label
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
void doneWaiting(std::exception_ptr iExcept)
accept
Definition: HLTenums.h:18
void processSwitchEDAliases(ParameterSet const &proc_pset, ProductRegistry &preg, ProcessConfiguration const &processConfiguration, std::unordered_set< std::string > const &allConditionalMods)
std::string name
Definition: TriggerReport.h:41
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
Strings const & getTrigPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static void fillPathSummary(Path const &path, PathSummary &sum)
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
int totalEvents() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
ModuleDescription const * description() const
Definition: Worker.h:198
std::string const & className() const
Definition: TypeID.cc:40
void clearCounters()
Clear all the counters in the trigger report.
void fillAliasMap(ParameterSet const &proc_pset, std::unordered_set< std::string > const &allConditionalMods)
Strings const & getEndPaths() const
void getTriggerReport(TriggerReport &rep) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
exception_actions::ActionCodes find(const std::string &category) const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
double b
Definition: hdecay.h:120
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::string const & processName() const
HLT enums.
void setupResolvers(Principal &principal)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:336
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModuleBranches(std::unordered_set< std::string > const &conditionalmods) const
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:42
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModsBranches_
Log< level::Warning, false > LogWarning
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T
std::string const & moduleLabel() const
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
std::string const & name() const
Definition: Path.h:73
std::vector< std::string > vstring
Definition: Schedule.cc:482
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
def move(src, dest)
Definition: eostools.py:511
void clearCounters()
Definition: Path.cc:183
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)