CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
29 
31 #include "processEDAliases.h"
32 
33 #include <algorithm>
34 #include <cassert>
35 #include <cstdlib>
36 #include <functional>
37 #include <iomanip>
38 #include <limits>
39 #include <list>
40 #include <map>
41 #include <exception>
42 #include <fmt/format.h>
43 
44 namespace edm {
45 
46  namespace {
47 
// Walks the input range [begin, end) and the output range starting at
// 'out' in lock step, calling func(input, output) once per position.
// 'func' receives the input element and a reference to the output
// element, and is expected to store its result through that reference.
// The output range must provide at least as many elements as the input.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
57 
// Fills the sequence 'to' with values computed from the sequence 'from':
// 'func' is applied, through transform_into, to every element of 'from'
// together with the element of 'to' at the same position.
//
// If 'to' already has the size of 'from', its elements are written in
// place. Otherwise the values are computed into a temporary of the right
// size which is then swapped into 'to', so in that case 'to' is modified
// only once every call to 'func' has returned.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }
  TO scratch(from.size());
  transform_into(from.begin(), from.end(), scratch.begin(), func);
  to.swap(scratch);
}
73 
74  // -----------------------------
75 
76  // Here we make the trigger results inserter directly. This should
77  // probably be a utility in the WorkerRegistry or elsewhere.
78 
// Creates the Worker (an edm::WorkerT for TriggerResultInserter::ModuleType) that wraps
// 'inserter', gives it the ActivityRegistry 'areg' and returns it.
// NOTE(review): the statement that declares `ptr` is not present in this listing (the
// embedded line numbering jumps from 81 to 83); restore it from the upstream file.
79  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
80  std::shared_ptr<ActivityRegistry> areg,
81  std::shared_ptr<TriggerResultInserter> inserter) {
83  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
84  ptr->setActivityRegistry(areg);
85  return ptr;
86  }
87 
88  void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
89  ProductRegistry const& preg,
90  std::multimap<std::string, Worker*>& branchToReadingWorker) {
91  auto vBranchesToDeleteEarly = branchesToDeleteEarly;
92  // Remove any duplicates
93  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
94  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
95  vBranchesToDeleteEarly.end());
96 
97  // Are the requested items in the product registry?
98  auto allBranchNames = preg.allBranchNames();
99  //the branch names all end with a period, which we do not want to compare with
100  for (auto& b : allBranchNames) {
101  b.resize(b.size() - 1);
102  }
103  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
104  std::vector<std::string> temp;
105  temp.reserve(vBranchesToDeleteEarly.size());
106 
107  std::set_intersection(vBranchesToDeleteEarly.begin(),
108  vBranchesToDeleteEarly.end(),
109  allBranchNames.begin(),
110  allBranchNames.end(),
111  std::back_inserter(temp));
112  vBranchesToDeleteEarly.swap(temp);
113  if (temp.size() != vBranchesToDeleteEarly.size()) {
114  std::vector<std::string> missingProducts;
115  std::set_difference(temp.begin(),
116  temp.end(),
117  vBranchesToDeleteEarly.begin(),
118  vBranchesToDeleteEarly.end(),
119  std::back_inserter(missingProducts));
120  LogInfo l("MissingProductsForCanDeleteEarly");
121  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
122  for (auto const& n : missingProducts) {
123  l << "\n " << n;
124  }
125  }
126  //set placeholder for the branch, we will remove the nullptr if a
127  // module actually wants the branch.
128  for (auto const& branch : vBranchesToDeleteEarly) {
129  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
130  }
131  }
132 
133  Worker* getWorker(std::string const& moduleLabel,
134  ParameterSet& proc_pset,
135  WorkerManager& workerManager,
136  ProductRegistry& preg,
137  PreallocationConfiguration const* prealloc,
138  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
139  bool isTracked;
140  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
141  if (modpset == nullptr) {
142  return nullptr;
143  }
144  assert(isTracked);
145 
146  return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
147  }
148 
// Locates the ConditionalTask modules inside a container of module names.
// Those modules sit between two special markers: '#', placed right before
// them, and '@', which is the last element of the container. The returned
// std::pair of iterators delimits the modules only, i.e. both markers are
// left out. When the container holds no '#' marker, both iterators of the
// returned pair are the end iterator.
template <typename T>
auto findConditionalTaskModulesRange(T& modnames) {
  auto const last = modnames.end();
  auto const marker = std::find(modnames.begin(), last, "#");
  if (marker != last) {
    return std::pair(std::next(marker), std::prev(last));
  }
  return std::pair(last, last);
}
163 
164  std::optional<std::string> findBestMatchingAlias(
165  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
166  std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
167  std::string const& productModuleLabel,
168  ConsumesInfo const& consumesInfo) {
169  std::optional<std::string> best;
170  int wildcardsInBest = std::numeric_limits<int>::max();
171  bool bestIsAmbiguous = false;
172 
173  auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
174  std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
175  int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
176  if (wildcards == 0) {
177  bestIsAmbiguous = false;
178  return true;
179  }
180  if (not best or wildcards < wildcardsInBest) {
181  best = label;
182  wildcardsInBest = wildcards;
183  bestIsAmbiguous = false;
184  } else if (best and *best != label and wildcardsInBest == wildcards) {
185  bestIsAmbiguous = true;
186  }
187  return false;
188  };
189 
190  auto findAlias = aliasMap.equal_range(productModuleLabel);
191  for (auto it = findAlias.first; it != findAlias.second; ++it) {
192  std::string const& aliasInstanceLabel =
193  it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
194  bool const instanceIsWildcard = (aliasInstanceLabel == "*");
195  if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
196  bool const typeIsWildcard = it->second.friendlyClassName == "*";
197  if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
198  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
199  return it->second.originalModuleLabel;
200  }
201  } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
202  //consume is a View so need to do more intrusive search
203  //find matching branches in module
204  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
205  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
206  if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
207  if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
208  TypeID(itBranch->second->wrappedType().typeInfo()),
209  itBranch->second->className())) {
210  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
211  return it->second.originalModuleLabel;
212  }
213  }
214  }
215  }
216  }
217  }
218  }
219  if (bestIsAmbiguous) {
221  << "Encountered ambiguity when trying to find a best-matching alias for\n"
222  << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
223  << " module label " << productModuleLabel << "\n"
224  << " product instance name " << consumesInfo.instance() << "\n"
225  << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
226  "wildcards ("
227  << wildcardsInBest << ")";
228  }
229  return best;
230  }
231  } // namespace
232 
233  // -----------------------------
234 
// Shorthand for a list of strings, e.g. the module names of a path.
using vstring = std::vector<std::string>;
236 
237  // -----------------------------
238 
// Helper that gathers, over all trigger Paths, the labels of the modules placed in
// ConditionalTasks, the EDAliases that refer to those modules (aliasMap_) and the
// branches those modules produce (conditionalModsBranches_).
// NOTE(review): the line introducing the class and the first lines of the constructor
// are absent from this listing (the embedded numbering skips 239, 241 and 243). The
// class name ConditionalTaskHelper and the `proc_pset` parameter are inferred from
// their uses elsewhere in the file — confirm against the upstream source.
240  public:
242 
// Constructor (its first lines are missing, see the note above). In order, it:
//  1. collects the ConditionalTask module labels of every path in trigPathNames,
//  2. creates the Worker of each of those modules,
//  3. fills aliasMap_ and handles the EDAliases used as SwitchProducer cases,
//  4. records the branches of the product registry made by those modules.
244  ProductRegistry& preg,
245  PreallocationConfiguration const* prealloc,
246  std::shared_ptr<ProcessConfiguration const> processConfiguration,
247  WorkerManager& workerManager,
248  std::vector<std::string> const& trigPathNames) {
249  std::unordered_set<std::string> allConditionalMods;
250  for (auto const& pathName : trigPathNames) {
251  auto const modnames = proc_pset.getParameter<vstring>(pathName);
252 
253  //Pull out ConditionalTask modules
254  auto condRange = findConditionalTaskModulesRange(modnames);
// an empty range means this path has no ConditionalTask modules
255  if (condRange.first == condRange.second)
256  continue;
257 
258  //the last entry should be ignored since it is required to be "@"
259  allConditionalMods.insert(condRange.first, condRange.second);
260  }
261 
262  for (auto const& cond : allConditionalMods) {
263  //force the creation of the conditional modules so alias check can work
264  (void)getWorker(cond, proc_pset, workerManager, preg, prealloc, processConfiguration);
265  }
266 
267  fillAliasMap(proc_pset, allConditionalMods);
268  processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
269 
270  //find branches created by the conditional modules
// keyed by module label; the stored pointers refer to entries of preg.productList()
271  for (auto const& prod : preg.productList()) {
272  if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
273  conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
274  }
275  }
276  }
277 
// Aliases that refer to ConditionalTask modules, keyed by alias name.
278  std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
279 
// Returns the recorded branches restricted to the module labels in 'conditionalmods',
// as a new multimap keyed by module label.
280  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
281  std::unordered_set<std::string> const& conditionalmods) const {
282  std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
283  for (auto const& mod : conditionalmods) {
284  auto range = conditionalModsBranches_.equal_range(mod);
285  ret.insert(range.first, range.second);
286  }
287  return ret;
288  }
289 
290  private:
// Fills aliasMap_ from the aliases listed in the "@all_aliases" parameter. Only alias
// entries whose aliased-to module label is in allConditionalMods are kept. Each entry
// is stored under the alias name as AliasInfo{type, instance, originalInstance, mod}.
291  void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
292  auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
293  std::string const star("*");
294  for (auto const& alias : aliases) {
295  auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
296  auto aliasedToModuleLabels = info.getParameterNames();
297  for (auto const& mod : aliasedToModuleLabels) {
// skip empty names and names starting with '@'
298  if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
299  auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
300  for (auto const& aliasPSet : aliasVPSet) {
// each of the three fields stays "*" (wildcard) when the parameter is absent
301  std::string type = star;
302  std::string instance = star;
303  std::string originalInstance = star;
304  if (aliasPSet.exists("type")) {
305  type = aliasPSet.getParameter<std::string>("type");
306  }
307  if (aliasPSet.exists("toProductInstance")) {
308  instance = aliasPSet.getParameter<std::string>("toProductInstance");
309  }
310  if (aliasPSet.exists("fromProductInstance")) {
311  originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
312  }
313 
314  aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
315  }
316  }
317  }
318  }
319  }
320 
// Collects the case labels of all SwitchProducer modules that have at least one entry
// in aliasMap_, i.e. cases that are EDAliases to ConditionalTask modules.
// NOTE(review): the line naming the function that receives this list is absent from
// this listing (numbering skips 339); only its argument list is visible below.
321  void processSwitchEDAliases(ParameterSet const& proc_pset,
322  ProductRegistry& preg,
323  ProcessConfiguration const& processConfiguration,
324  std::unordered_set<std::string> const& allConditionalMods) {
325  auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
326  std::vector<std::string> switchEDAliases;
327  for (auto const& module : all_modules) {
328  auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
329  if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
330  auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
331  for (auto const& case_label : all_cases) {
332  auto range = aliasMap_.equal_range(case_label);
333  if (range.first != range.second) {
334  switchEDAliases.push_back(case_label);
335  }
336  }
337  }
338  }
340  switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
341  }
342 
// alias name -> information about the ConditionalTask module product it refers to
343  std::unordered_multimap<std::string, AliasInfo> aliasMap_;
// ConditionalTask module label -> descriptions of the branches it produces
344  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
345  };
346 
347  // -----------------------------
348 
// StreamSchedule constructor (named by the comment closing its body). It fills the
// trigger paths and end paths of this stream, creates the trigger results inserter when
// at least one trigger path exists, and creates workers for the modules of the
// configuration that are on no path.
// NOTE(review): the line opening the definition and one parameter line are absent from
// this listing (the embedded numbering skips 349 and 358); `actions`, used below, is
// presumably declared on the missing parameter line — confirm against upstream.
350  std::shared_ptr<TriggerResultInserter> inserter,
351  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
352  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
353  std::shared_ptr<ModuleRegistry> modReg,
354  ParameterSet& proc_pset,
355  service::TriggerNamesService const& tns,
356  PreallocationConfiguration const& prealloc,
357  ProductRegistry& preg,
359  std::shared_ptr<ActivityRegistry> areg,
360  std::shared_ptr<ProcessConfiguration const> processConfiguration,
361  StreamID streamID,
362  ProcessContext const* processContext)
363  : workerManager_(modReg, areg, actions),
364  actReg_(areg),
// one trigger result slot per trigger path
365  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
366  results_inserter_(),
367  trig_paths_(),
368  end_paths_(),
369  total_events_(),
370  total_passed_(),
371  number_of_unscheduled_modules_(0),
372  streamID_(streamID),
373  streamContext_(streamID_, processContext) {
374  bool hasPath = false;
375  std::vector<std::string> const& pathNames = tns.getTrigPaths();
376  std::vector<std::string> const& endPathNames = tns.getEndPaths();
377 
// Built from the trigger path names only; shared by all fillTrigPath/fillEndPath calls.
378  ConditionalTaskHelper conditionalTaskHelper(
379  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
// Passed to every fillTrigPath/fillEndPath call below and read afterwards for the
// "NonConsumedConditionalModules" report.
380  std::unordered_set<std::string> conditionalModules;
381 
// fill trigger paths; trig_bitpos is the index of the path in pathNames
382  int trig_bitpos = 0;
383  trig_paths_.reserve(pathNames.size());
384  for (auto const& trig_name : pathNames) {
385  fillTrigPath(proc_pset,
386  preg,
387  &prealloc,
388  processConfiguration,
389  trig_bitpos,
390  trig_name,
391  results(),
392  endPathNames,
393  conditionalTaskHelper,
394  conditionalModules);
395  ++trig_bitpos;
396  hasPath = true;
397  }
398 
399  if (hasPath) {
400  // the results inserter stands alone
401  inserter->setTrigResultForStream(streamID.value(), results());
402 
403  results_inserter_ = makeInserter(actions, actReg_, inserter);
// NOTE(review): one statement is absent from this listing here (numbering skips 404).
405  }
406 
407  // fill normal endpaths
408  int bitpos = 0;
409  end_paths_.reserve(endPathNames.size());
410  for (auto const& end_path_name : endPathNames) {
411  fillEndPath(proc_pset,
412  preg,
413  &prealloc,
414  processConfiguration,
415  bitpos,
416  end_path_name,
417  endPathNames,
418  conditionalTaskHelper,
419  conditionalModules);
420  ++bitpos;
421  }
422 
423  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
424 
425  //See if all modules were used
// unusedLabels = labels listed in "@all_modules" minus labels of the workers created so far
426  std::set<std::string> usedWorkerLabels;
427  for (auto const& worker : allWorkers()) {
428  usedWorkerLabels.insert(worker->description()->moduleLabel());
429  }
430  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
431  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
432  std::vector<std::string> unusedLabels;
433  set_difference(modulesInConfigSet.begin(),
434  modulesInConfigSet.end(),
435  usedWorkerLabels.begin(),
436  usedWorkerLabels.end(),
437  back_inserter(unusedLabels));
438  std::set<std::string> unscheduledLabels;
439  std::vector<std::string> shouldBeUsedLabels;
440  if (!unusedLabels.empty()) {
441  //Need to
442  // 1) create worker
443  // 2) if it is a WorkerT<EDProducer>, add it to our list
444  // 3) hand list to our delayed reader
445  for (auto const& label : unusedLabels) {
446  bool isTracked;
447  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
448  assert(isTracked);
449  assert(modulePSet != nullptr);
// NOTE(review): the line naming the function called with the arguments below is absent
// from this listing (numbering skips 450); it receives unscheduledLabels and
// shouldBeUsedLabels, which are read right after this loop.
451  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
452  }
453  if (!shouldBeUsedLabels.empty()) {
// build a comma separated list of quoted labels; begin() + 1 is valid because the
// vector was just checked to be non-empty
454  std::ostringstream unusedStream;
455  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
456  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
457  itLabelEnd = shouldBeUsedLabels.end();
458  itLabel != itLabelEnd;
459  ++itLabel) {
460  unusedStream << ",'" << *itLabel << "'";
461  }
462  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
463  }
464  }
465  number_of_unscheduled_modules_ = unscheduledLabels.size();
466 
467  // Print conditional modules that were not consumed in any of their associated Paths
// only the stream with id 0 emits this warning
468  if (streamID.value() == 0 and not conditionalModules.empty()) {
469  // Intersection of unscheduled and ConditionalTask modules gives
470  // directly the set of conditional modules that were not
471  // consumed by anything in the Paths associated to the
472  // corresponding ConditionalTask.
// the string_views refer to the strings owned by unscheduledLabels, which outlives them
473  std::vector<std::string_view> labelsToPrint;
474  std::copy_if(
475  unscheduledLabels.begin(),
476  unscheduledLabels.end(),
477  std::back_inserter(labelsToPrint),
478  [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
479 
480  if (not labelsToPrint.empty()) {
481  edm::LogWarning log("NonConsumedConditionalModules");
482  log << "The following modules were part of some ConditionalTask, but were not\n"
483  << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
484  << "was associated. Perhaps they should be either removed from the\n"
485  << "job, or moved to a Task to make it explicit they are unscheduled.\n";
486  for (auto const& modLabel : labelsToPrint) {
487  log.format("\n {}", modLabel);
488  }
489  }
490  }
491  } // StreamSchedule::StreamSchedule
492 
// Sets up the early deletion of data products for this stream: works out which workers
// read each branch of 'branchesToDeleteEarly' (directly, or indirectly through
// 'referencesToBranches'), then creates one EarlyDeleteHelper per reading worker and
// hands the helpers to the trigger paths and end paths.
// NOTE(review): the line opening the definition is absent from this listing (the
// embedded numbering skips 493), so the function name is not visible here; `modReg`,
// used below, is presumably a parameter declared on that line. Further lines are
// missing inside the body and are marked where they occur.
494  std::vector<std::string> const& branchesToDeleteEarly,
495  std::multimap<std::string, std::string> const& referencesToBranches,
496  std::vector<std::string> const& modulesToSkip,
497  edm::ProductRegistry const& preg) {
498  // setup the list with those products actually registered for this job
499  std::multimap<std::string, Worker*> branchToReadingWorker;
500  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
501 
502  const std::vector<std::string> kEmpty;
// number of branch slots to set aside for each worker
503  std::map<Worker*, unsigned int> reserveSizeForWorker;
// upper limits: number of workers reading at least one branch, and total number of
// (worker, branch) matches found
504  unsigned int upperLimitOnReadingWorker = 0;
505  unsigned int upperLimitOnIndicies = 0;
// at this point the multimap holds exactly one placeholder entry per branch
506  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
507 
508  //talk with output modules first
509  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
510  auto comm = iHolder->createOutputModuleCommunicator();
511  if (comm) {
512  if (!branchToReadingWorker.empty()) {
513  //If an OutputModule needs a product, we can't delete it early
514  // so we should remove it from our list
515  SelectedProductsForBranchType const& kept = comm->keptProducts();
516  for (auto const& item : kept[InEvent]) {
517  BranchDescription const& desc = *item.first;
518  auto found = branchToReadingWorker.equal_range(desc.branchName());
519  if (found.first != found.second) {
520  --nUniqueBranchesToDelete;
521  branchToReadingWorker.erase(found.first, found.second);
522  }
523  }
524  }
525  }
526  });
527 
528  if (branchToReadingWorker.empty()) {
529  return;
530  }
531 
532  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
533  for (auto w : allWorkers()) {
534  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
535  continue;
536  }
537  //determine if this module could read a branch we want to delete early
538  auto consumes = w->consumesInfo();
539  if (not consumes.empty()) {
540  bool foundAtLeastOneMatchingBranch = false;
541  for (auto const& product : consumes) {
// branch name built as <friendly class name>_<label>_<instance>_<process>, without
// the trailing period (the keys of branchToReadingWorker have it stripped too)
542  std::string branch = fmt::format("{}_{}_{}_{}",
543  product.type().friendlyClassName(),
544  product.label().data(),
545  product.instance().data(),
546  product.process().data());
547  {
548  //Handle case where worker directly consumes product
549  auto found = branchToReadingWorker.end();
// with no process name given, 'branch' ends in '_' and is only a prefix of the
// full branch name, hence the lower_bound + prefix comparison
550  if (product.process().empty()) {
551  auto startFound = branchToReadingWorker.lower_bound(branch);
552  if (startFound != branchToReadingWorker.end()) {
553  if (startFound->first.substr(0, branch.size()) == branch) {
554  //match all processNames here, even if it means multiple matches will happen
555  found = startFound;
556  }
557  }
558  } else {
559  auto exactFound = branchToReadingWorker.equal_range(branch);
560  if (exactFound.first != exactFound.second) {
561  found = exactFound.first;
562  }
563  }
564  if (found != branchToReadingWorker.end()) {
565  if (not foundAtLeastOneMatchingBranch) {
566  ++upperLimitOnReadingWorker;
567  foundAtLeastOneMatchingBranch = true;
568  }
569  ++upperLimitOnIndicies;
570  ++reserveSizeForWorker[w];
// replace the nullptr placeholder with the first reader; further readers get
// their own entry under the same branch name
571  if (nullptr == found->second) {
572  found->second = w;
573  } else {
574  branchToReadingWorker.insert(make_pair(found->first, w));
575  }
576  }
577  }
578  {
579  //Handle case where indirectly consumes product
580  auto found = referencesToBranches.end();
581  if (product.process().empty()) {
582  auto startFound = referencesToBranches.lower_bound(branch);
583  if (startFound != referencesToBranches.end()) {
584  if (startFound->first.substr(0, branch.size()) == branch) {
585  //match all processNames here, even if it means multiple matches will happen
586  found = startFound;
587  }
588  }
589  } else {
590  //can match exactly
591  auto exactFound = referencesToBranches.equal_range(branch);
592  if (exactFound.first != exactFound.second) {
593  found = exactFound.first;
594  }
595  }
596  if (found != referencesToBranches.end()) {
// visit every entry sharing the key of 'found'; itr->second is the name of the
// branch that is referenced
597  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
598  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
599  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
600  continue;
601  }
602  if (not foundAtLeastOneMatchingBranch) {
603  ++upperLimitOnReadingWorker;
604  foundAtLeastOneMatchingBranch = true;
605  }
606  ++upperLimitOnIndicies;
607  ++reserveSizeForWorker[w];
608  if (nullptr == foundInBranchToReadingWorker->second) {
609  foundInBranchToReadingWorker->second = w;
610  } else {
611  branchToReadingWorker.insert(make_pair(itr->second, w));
612  }
613  }
614  }
615  }
616  }
617  }
618  }
619  {
// drop the branches that still carry the nullptr placeholder (no reader was found)
// and report them
620  auto it = branchToReadingWorker.begin();
621  std::vector<std::string> unusedBranches;
622  while (it != branchToReadingWorker.end()) {
623  if (it->second == nullptr) {
624  unusedBranches.push_back(it->first);
625  //erasing the object invalidates the iterator so must advance it first
626  auto temp = it;
627  ++it;
628  branchToReadingWorker.erase(temp);
629  } else {
630  ++it;
631  }
632  }
633  if (not unusedBranches.empty()) {
634  LogWarning l("UnusedProductsForCanDeleteEarly");
635  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
636  " If possible, remove the producer from the job.";
637  for (auto const& n : unusedBranches) {
638  l << "\n " << n;
639  }
640  }
641  }
642  if (!branchToReadingWorker.empty()) {
643  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
644  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
645  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
646  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
647  std::string lastBranchName;
648  size_t nextOpenIndex = 0;
// the helpers below are handed pointers computed from this address into
// earlyDeleteHelperToBranchIndicies_
649  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
// the multimap is ordered by branch name, so a change of name marks a new branch
650  for (auto& branchAndWorker : branchToReadingWorker) {
651  if (lastBranchName != branchAndWorker.first) {
652  //have to put back the period we removed earlier in order to get the proper name
653  BranchID bid(branchAndWorker.first + ".");
654  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
655  lastBranchName = branchAndWorker.first;
656  }
657  auto found = alreadySeenWorkers.find(branchAndWorker.second);
658  if (alreadySeenWorkers.end() == found) {
659  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
660  // all the branches that might be read by this worker. However, initially we will only tell the
661  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
662  // EarlyDeleteHelper will automatically advance its internal end pointer.
663  size_t index = nextOpenIndex;
664  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
// NOTE(review): two lines are absent from this listing here (numbering skips 665-666).
667  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
668  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
669  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
670  nextOpenIndex += nIndices;
671  } else {
// worker already has a helper: add the index of the current (last added) branch
672  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
673  }
674  }
675 
676  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
677  // space needed for each module
678  auto itLast = earlyDeleteHelpers_.begin();
679  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
680  if (itLast->end() != it->begin()) {
681  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
682  unsigned int delta = it->begin() - itLast->end();
683  it->shiftIndexPointers(delta);
684 
// NOTE(review): the line naming the operation applied to the iterator range below is
// absent from this listing (numbering skips 685).
686  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
687  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
688  }
689  itLast = it;
690  }
// NOTE(review): lines 691 and 693 of the original numbering are absent from this
// listing; only the middle line of that statement is visible.
692  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
694 
695  //now tell the paths about the deleters
696  for (auto& p : trig_paths_) {
697  p.setEarlyDeleteHelpers(alreadySeenWorkers);
698  }
699  for (auto& p : end_paths_) {
700  p.setEarlyDeleteHelpers(alreadySeenWorkers);
701  }
// NOTE(review): one line is absent from this listing here (numbering skips 702).
703  }
704  }
705 
// For the given 'worker', finds the not-yet-placed ConditionalTask modules whose
// products it consumes (directly or through an EDAlias) and returns their workers. The
// search recurses into each module found, and the workers a module depends on are put
// in the returned vector before the module's own worker. Every module found is removed
// from 'conditionalModules'.
// NOTE(review): the line opening the definition is absent from this listing (the
// embedded numbering skips 706); the name tryToPlaceConditionalModules is taken from the
// recursive call in the body.
707  Worker* worker,
708  std::unordered_set<std::string>& conditionalModules,
709  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
710  std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
711  ParameterSet& proc_pset,
712  ProductRegistry& preg,
713  PreallocationConfiguration const* prealloc,
714  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
715  std::vector<Worker*> returnValue;
716  auto const& consumesInfo = worker->consumesInfo();
717  auto moduleLabel = worker->description()->moduleLabel();
718  using namespace productholderindexhelper;
719  for (auto const& ci : consumesInfo) {
// only consumes that can be satisfied by the current process are of interest
720  if (not ci.skipCurrentProcess() and
721  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
722  auto productModuleLabel = std::string(ci.label());
723  bool productFromConditionalModule = false;
724  auto itFound = conditionalModules.find(productModuleLabel);
725  if (itFound == conditionalModules.end()) {
726  //Check to see if this was an alias
727  //note that aliasMap was previously filtered so only the conditional modules remain there
728  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
729  if (foundAlias) {
// from here on productModuleLabel is the label of the aliased-to module
730  productModuleLabel = *foundAlias;
731  productFromConditionalModule = true;
732  itFound = conditionalModules.find(productModuleLabel);
733  //check that the alias-for conditional module has not been used
734  if (itFound == conditionalModules.end()) {
735  continue;
736  }
737  }
738  } else {
739  //need to check the rest of the data product info
740  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
741  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
742  if (itBranch->second->productInstanceName() == ci.instance()) {
743  if (ci.kindOfType() == PRODUCT_TYPE) {
744  if (ci.type() == itBranch->second->unwrappedTypeID()) {
745  productFromConditionalModule = true;
746  break;
747  }
748  } else {
749  //this is a view
// NOTE(review): the line opening the condition whose arguments follow is absent from
// this listing (numbering skips 750).
751  ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
752  productFromConditionalModule = true;
753  break;
754  }
755  }
756  }
757  }
758  }
759  if (productFromConditionalModule) {
760  auto condWorker =
761  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
762  assert(condWorker);
763 
// remove the module before recursing so it is not considered again
764  conditionalModules.erase(itFound);
765 
766  auto dependents = tryToPlaceConditionalModules(condWorker,
767  conditionalModules,
768  conditionalModuleBranches,
769  aliasMap,
770  proc_pset,
771  preg,
772  prealloc,
773  processConfiguration);
// workers found by the recursive call go before condWorker itself
774  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
775  returnValue.push_back(condWorker);
776  }
777  }
778  }
779  return returnValue;
780  }
781 
783  ProductRegistry& preg,
784  PreallocationConfiguration const* prealloc,
785  std::shared_ptr<ProcessConfiguration const> processConfiguration,
786  std::string const& pathName,
787  bool ignoreFilters,
788  PathWorkers& out,
789  std::vector<std::string> const& endPathNames,
790  ConditionalTaskHelper const& conditionalTaskHelper,
791  std::unordered_set<std::string>& allConditionalModules) {
792  vstring modnames = proc_pset.getParameter<vstring>(pathName);
793  PathWorkers tmpworkers;
794 
795  //Pull out ConditionalTask modules
796  auto condRange = findConditionalTaskModulesRange(modnames);
797 
798  std::unordered_set<std::string> conditionalmods;
799  //An EDAlias may be redirecting to a module on a ConditionalTask
800  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
801  std::unordered_map<std::string, unsigned int> conditionalModOrder;
802  if (condRange.first != condRange.second) {
803  for (auto it = condRange.first; it != condRange.second; ++it) {
804  // ordering needs to skip the # token in the path list
805  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
806  }
807  //the last entry should be ignored since it is required to be "@"
808  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
809  std::make_move_iterator(condRange.second));
810 
811  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
812  modnames.erase(std::prev(condRange.first), modnames.end());
813 
814  // Make a union of all conditional modules from all Paths
815  allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
816  }
817 
818  unsigned int placeInPath = 0;
819  for (auto const& name : modnames) {
820  //Modules except EDFilters are set to run concurrently by default
821  bool doNotRunConcurrently = false;
823  if (name[0] == '!') {
824  filterAction = WorkerInPath::Veto;
825  } else if (name[0] == '-' or name[0] == '+') {
826  filterAction = WorkerInPath::Ignore;
827  }
828  if (name[0] == '|' or name[0] == '+') {
829  //cms.wait was specified so do not run concurrently
830  doNotRunConcurrently = true;
831  }
832 
834  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
835  moduleLabel.erase(0, 1);
836  }
837 
838  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
839  if (worker == nullptr) {
840  std::string pathType("endpath");
841  if (!search_all(endPathNames, pathName)) {
842  pathType = std::string("path");
843  }
845  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
846  << "\"\n please check spelling or remove that label from the path.";
847  }
848 
849  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
850  // We have a filter on an end path, and the filter is not explicitly ignored.
851  // See if the filter is allowed.
852  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
853  if (!search_all(allowed_filters, worker->description()->moduleName())) {
854  // Filter is not allowed. Ignore the result, and issue a warning.
855  filterAction = WorkerInPath::Ignore;
856  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
857  << "' with module label '" << moduleLabel << "' appears on EndPath '"
858  << pathName << "'.\n"
859  << "The return value of the filter will be ignored.\n"
860  << "To suppress this warning, either remove the filter from the endpath,\n"
861  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
862  }
863  }
864  bool runConcurrently = not doNotRunConcurrently;
865  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
866  runConcurrently = false;
867  }
868 
869  auto condModules = tryToPlaceConditionalModules(worker,
870  conditionalmods,
871  conditionalModsBranches,
872  conditionalTaskHelper.aliasMap(),
873  proc_pset,
874  preg,
875  prealloc,
876  processConfiguration);
877  for (auto condMod : condModules) {
878  tmpworkers.emplace_back(
879  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
880  }
881 
882  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
883  ++placeInPath;
884  }
885 
886  out.swap(tmpworkers);
887  }
888 
890  ProductRegistry& preg,
891  PreallocationConfiguration const* prealloc,
892  std::shared_ptr<ProcessConfiguration const> processConfiguration,
893  int bitpos,
894  std::string const& name,
895  TrigResPtr trptr,
896  std::vector<std::string> const& endPathNames,
897  ConditionalTaskHelper const& conditionalTaskHelper,
898  std::unordered_set<std::string>& allConditionalModules) {
899  PathWorkers tmpworkers;
900  fillWorkers(proc_pset,
901  preg,
902  prealloc,
903  processConfiguration,
904  name,
905  false,
906  tmpworkers,
907  endPathNames,
908  conditionalTaskHelper,
909  allConditionalModules);
910 
911  // an empty path will cause an extra bit that is not used
912  if (!tmpworkers.empty()) {
913  trig_paths_.emplace_back(
914  bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
915  } else {
916  empty_trig_paths_.push_back(bitpos);
917  }
918  for (WorkerInPath const& workerInPath : tmpworkers) {
919  addToAllWorkers(workerInPath.getWorker());
920  }
921  }
922 
924  ProductRegistry& preg,
925  PreallocationConfiguration const* prealloc,
926  std::shared_ptr<ProcessConfiguration const> processConfiguration,
927  int bitpos,
928  std::string const& name,
929  std::vector<std::string> const& endPathNames,
930  ConditionalTaskHelper const& conditionalTaskHelper,
931  std::unordered_set<std::string>& allConditionalModules) {
932  PathWorkers tmpworkers;
933  fillWorkers(proc_pset,
934  preg,
935  prealloc,
936  processConfiguration,
937  name,
938  true,
939  tmpworkers,
940  endPathNames,
941  conditionalTaskHelper,
942  allConditionalModules);
943 
944  if (!tmpworkers.empty()) {
945  end_paths_.emplace_back(bitpos,
946  name,
947  tmpworkers,
948  TrigResPtr(),
949  actionTable(),
950  actReg_,
953  } else {
954  empty_end_paths_.push_back(bitpos);
955  }
956  for (WorkerInPath const& workerInPath : tmpworkers) {
957  addToAllWorkers(workerInPath.getWorker());
958  }
959  }
960 
962 
964 
966  Worker* found = nullptr;
967  for (auto const& worker : allWorkers()) {
968  if (worker->description()->moduleLabel() == iLabel) {
969  found = worker;
970  break;
971  }
972  }
973  if (nullptr == found) {
974  return;
975  }
976 
977  iMod->replaceModuleFor(found);
978  found->beginStream(streamID_, streamContext_);
979  }
980 
982 
983  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
984  std::vector<ModuleDescription const*> result;
985  result.reserve(allWorkers().size());
986 
987  for (auto const& worker : allWorkers()) {
988  ModuleDescription const* p = worker->description();
989  result.push_back(p);
990  }
991  return result;
992  }
993 
995  WaitingTaskHolder iTask,
997  ServiceToken const& serviceToken,
998  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
999  EventPrincipal& ep = info.principal();
1000 
1001  // Caught exception is propagated via WaitingTaskHolder
1002  CMS_SA_ALLOW try {
1003  this->resetAll();
1004 
1006 
1007  Traits::setStreamContext(streamContext_, ep);
1008  //a service may want to communicate with another service
1009  ServiceRegistry::Operate guard(serviceToken);
1010  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1011 
1012  // Data dependencies need to be set up before marking empty
1013  // (End)Paths complete in case something consumes the status of
1014  // the empty (EndPath)
1017 
1018  HLTPathStatus hltPathStatus(hlt::Pass, 0);
1019  for (int empty_trig_path : empty_trig_paths_) {
1020  results_->at(empty_trig_path) = hltPathStatus;
1021  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1022  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1025  if (except) {
1026  iTask.doneWaiting(except);
1027  return;
1028  }
1029  }
1030  if (not endPathStatusInserterWorkers_.empty()) {
1031  for (int empty_end_path : empty_end_paths_) {
1032  std::exception_ptr except =
1033  endPathStatusInserterWorkers_[empty_end_path]
1036  if (except) {
1037  iTask.doneWaiting(except);
1038  return;
1039  }
1040  }
1041  }
1042 
1043  ++total_events_;
1044 
1045  //use to give priorities on an error to ones from Paths
1046  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1047  auto pathErrorPtr = pathErrorHolder.get();
1048  ServiceWeakToken weakToken = serviceToken;
1049  auto allPathsDone = make_waiting_task(
1050  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1051  ServiceRegistry::Operate operate(weakToken.lock());
1052 
1053  std::exception_ptr ptr;
1054  if (pathError->load()) {
1055  ptr = *pathError->load();
1056  delete pathError->load();
1057  }
1058  if ((not ptr) and iPtr) {
1059  ptr = *iPtr;
1060  }
1061  iTask.doneWaiting(finishProcessOneEvent(ptr));
1062  });
1063  //The holder guarantees that if the paths finish before the loop ends
1064  // that we do not start too soon. It also guarantees that the task will
1065  // run under that condition.
1066  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1067 
1068  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1069  std::exception_ptr const* iPtr) mutable {
1070  ServiceRegistry::Operate operate(weakToken.lock());
1071 
1072  if (iPtr) {
1073  //this is used to prioritize this error over one
1074  // that happens in EndPath or Accumulate
1075  pathErrorPtr->store(new std::exception_ptr(*iPtr));
1076  }
1077  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1078  });
1079 
1080  //The holder guarantees that if the paths finish before the loop ends
1081  // that we do not start too soon. It also guarantees that the task will
1082  // run under that condition.
1083  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1084 
1085  //start end paths first so on single threaded the paths will run first
1086  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1087  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1088  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1089  }
1090 
1091  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1092  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1093  }
1094 
1095  ParentContext parentContext(&streamContext_);
1097  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1098  } catch (...) {
1099  iTask.doneWaiting(std::current_exception());
1100  }
1101  }
1102 
// Called from the `pathsDone` task created in processOneEventAsync once the
// trigger paths have finished. Post-processes any exception recorded for the
// paths, updates the passed-event counter, runs the results inserter (if there
// is one) and finally signals `iWait`.
//
// iExcept: atomic pointer to a heap-allocated std::exception_ptr; a null
//          pointer means no exception has been recorded so far. This function
//          may overwrite the pointee or store a newly allocated one.
// iWait:   holder that receives the final exception (or an empty
//          exception_ptr) through doneWaiting().
// NOTE(review): this listing skips line 1105, which closes the parameter list;
// the `info` object used further down is presumably the third parameter
// declared there - confirm against the real file.
1103  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1104  WaitingTaskHolder iWait,
1106  if (iExcept) {
1107  // Caught exception is propagated via WaitingTaskHolder
      // Rethrow the stored exception so it can be classified by type.
1108  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
      // NOTE(review): lines 1109-1111 are missing from this listing; the
      // condition that selects between the two branches below is not visible.
      // Visible branch 1: warn with the "TryToContinue" label and clear the
      // stored exception_ptr (the pointee, not the atomic pointer itself).
1112  edm::printCmsExceptionWarning("TryToContinue", e);
1113  *(iExcept.load()) = std::exception_ptr();
1114  } else {
      // Visible branch 2: keep the exception by re-capturing it.
1115  *(iExcept.load()) = std::current_exception();
1116  }
1117  } catch (...) {
      // Any non-cms exception is kept as-is.
1118  *(iExcept.load()) = std::current_exception();
1119  }
1120  }
1121 
      // `not iExcept` tests the atomic pointer, not the pointee.
      // NOTE(review): after the clearing branch above the pointer is still
      // non-null, so this counter is not incremented in that case - confirm
      // that is intended.
1122  if ((not iExcept) and results_->accept()) {
1123  ++total_passed_;
1124  }
1125 
1126  if (nullptr != results_inserter_.get()) {
1127  // Caught exception is propagated to the caller
1128  CMS_SA_ALLOW try {
1129  //Even if there was an exception, we need to allow results inserter
1130  // to run since some module may be waiting on its results.
1131  ParentContext parentContext(&streamContext_);
      // NOTE(review): line 1132 is missing from this listing; `Traits` used on
      // the next statement is presumably declared there - confirm.
1133 
1134  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1135  if (expt) {
1136  std::rethrow_exception(expt);
1137  }
1138  } catch (cms::Exception& ex) {
      // An inserter failure is recorded only when no exception pointer has
      // been stored yet, so an earlier path error takes precedence.
1139  if (not iExcept) {
      // Add event context only if the exception carries none.
1140  if (ex.context().empty()) {
1141  std::ostringstream ost;
1142  ost << "Processing Event " << info.principal().id();
1143  ex.addContext(ost.str());
1144  }
      // Heap-allocated here; the allPathsDone task in processOneEventAsync
      // deletes the pointer it loads from the same atomic.
1145  iExcept.store(new std::exception_ptr(std::current_exception()));
1146  }
1147  } catch (...) {
1148  if (not iExcept) {
1149  iExcept.store(new std::exception_ptr(std::current_exception()));
1150  }
1151  }
1152  }
      // Hand the final state to the waiting task: the stored exception_ptr if
      // a pointer is set (it may be empty), otherwise an empty exception_ptr.
1153  std::exception_ptr ptr;
1154  if (iExcept) {
1155  ptr = *iExcept.load();
1156  }
1157  iWait.doneWaiting(ptr);
1158  }
1159 
// Last step of processing one event; called from the allPathsDone task in
// processOneEventAsync with the exception (if any) gathered so far.
//
// If `iExcept` is set, context is added to it, it is printed, and the
// early-termination signal is emitted. The post-schedule signal is emitted in
// every case. The early-delete bookkeeping is reset only when no exception is
// pending at the end.
//
// Returns the exception to report to the caller: the incoming one (possibly
// re-captured after context was added), one thrown by the post-schedule
// signal, or an empty exception_ptr.
1160  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
      // NOTE(review): line 1161 is missing from this listing; `Traits` used
      // below is presumably declared there - confirm.
1162 
1163  if (iExcept) {
1164  //add context information to the exception and print message
1165  try {
      // Rethrow inside convertException::wrap so the handler below can catch
      // it as a cms::Exception (presumably wrap converts other exception
      // types - confirm against its definition).
1166  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1167  } catch (cms::Exception& ex) {
      // Passed as the `disablePrint` argument of addContextAndPrintException,
      // so printing stays enabled.
1168  bool const cleaningUpAfterException = false;
      // Supply a default context only when the exception has none.
1169  if (ex.context().empty()) {
1170  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1171  } else {
1172  addContextAndPrintException("", ex, cleaningUpAfterException);
1173  }
      // Re-capture so the returned pointer refers to the exception as handled
      // in this catch block.
1174  iExcept = std::current_exception();
1175  }
1176 
1177  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1178  }
1179  // Caught exception is propagated to the caller
      // A failure of the post-schedule signal is reported only if there is no
      // earlier exception to report.
1180  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1181  if (not iExcept) {
1182  iExcept = std::current_exception();
1183  }
1184  }
      // Early-delete counters are reset only after a fully successful event.
1185  if (not iExcept) {
1186  resetEarlyDelete();
1187  }
1188 
1189  return iExcept;
1190  }
1191 
1192  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1193  oLabelsToFill.reserve(trig_paths_.size());
1194  std::transform(trig_paths_.begin(),
1195  trig_paths_.end(),
1196  std::back_inserter(oLabelsToFill),
1197  std::bind(&Path::name, std::placeholders::_1));
1198  }
1199 
1200  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1201  TrigPaths::const_iterator itFound = std::find_if(
1202  trig_paths_.begin(),
1203  trig_paths_.end(),
1204  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1205  if (itFound != trig_paths_.end()) {
1206  oLabelsToFill.reserve(itFound->size());
1207  for (size_t i = 0; i < itFound->size(); ++i) {
1208  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1209  }
1210  }
1211  }
1212 
1214  std::vector<ModuleDescription const*>& descriptions,
1215  unsigned int hint) const {
1216  descriptions.clear();
1217  bool found = false;
1218  TrigPaths::const_iterator itFound;
1219 
1220  if (hint < trig_paths_.size()) {
1221  itFound = trig_paths_.begin() + hint;
1222  if (itFound->name() == iPathLabel)
1223  found = true;
1224  }
1225  if (!found) {
1226  // if the hint did not work, do it the slow way
1227  itFound = std::find_if(
1228  trig_paths_.begin(),
1229  trig_paths_.end(),
1230  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1231  if (itFound != trig_paths_.end())
1232  found = true;
1233  }
1234  if (found) {
1235  descriptions.reserve(itFound->size());
1236  for (size_t i = 0; i < itFound->size(); ++i) {
1237  descriptions.push_back(itFound->getWorker(i)->description());
1238  }
1239  }
1240  }
1241 
1243  std::vector<ModuleDescription const*>& descriptions,
1244  unsigned int hint) const {
1245  descriptions.clear();
1246  bool found = false;
1247  TrigPaths::const_iterator itFound;
1248 
1249  if (hint < end_paths_.size()) {
1250  itFound = end_paths_.begin() + hint;
1251  if (itFound->name() == iEndPathLabel)
1252  found = true;
1253  }
1254  if (!found) {
1255  // if the hint did not work, do it the slow way
1256  itFound = std::find_if(
1257  end_paths_.begin(),
1258  end_paths_.end(),
1259  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1260  if (itFound != end_paths_.end())
1261  found = true;
1262  }
1263  if (found) {
1264  descriptions.reserve(itFound->size());
1265  for (size_t i = 0; i < itFound->size(); ++i) {
1266  descriptions.push_back(itFound->getWorker(i)->description());
1267  }
1268  }
1269  }
1270 
1271  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1272  sum.timesVisited += path.timesVisited(which);
1273  sum.timesPassed += path.timesPassed(which);
1274  sum.timesFailed += path.timesFailed(which);
1275  sum.timesExcept += path.timesExcept(which);
1276  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1277  sum.bitPosition = path.bitPosition(which);
1278  }
1279 
// Folds the statistics of `path` into `sum`: name and bit position are
// overwritten, the path-level counters are added to the values already in
// `sum`, and sum.moduleInPathSummaries is brought to one entry per module
// position of the path.
1280  static void fillPathSummary(Path const& path, PathSummary& sum) {
1281  sum.name = path.name();
1282  sum.bitPosition = path.bitPosition();
1283  sum.timesRun += path.timesRun();
1284  sum.timesPassed += path.timesPassed();
1285  sum.timesFailed += path.timesFailed();
1286  sum.timesExcept += path.timesExcept();
1287 
1288  Path::size_type sz = path.size();
1289  if (sum.moduleInPathSummaries.empty()) {
      // First fill: build the per-module vector on the side and swap it in
      // once the loop has completed.
1290  std::vector<ModuleInPathSummary> temp(sz);
1291  for (size_t i = 0; i != sz; ++i) {
      // NOTE(review): line 1292 (the loop body) is missing from this listing;
      // presumably it fills temp[i] via fillModuleInPathSummary - confirm.
1293  }
1294  sum.moduleInPathSummaries.swap(temp);
1295  } else {
      // Later fills require the existing vector to have exactly one entry per
      // module position of this path.
1296  assert(sz == sum.moduleInPathSummaries.size());
1297  for (size_t i = 0; i != sz; ++i) {
      // NOTE(review): line 1298 (the loop body) is missing from this listing;
      // presumably it updates sum.moduleInPathSummaries[i] in place - confirm.
1299  }
1300  }
1301  }
1302 
1303  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1304  sum.timesVisited += w.timesVisited();
1305  sum.timesRun += w.timesRun();
1306  sum.timesPassed += w.timesPassed();
1307  sum.timesFailed += w.timesFailed();
1308  sum.timesExcept += w.timesExcept();
1309  sum.moduleLabel = w.description()->moduleLabel();
1310  }
1311 
1312  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1313 
1315  rep.eventSummary.totalEvents += totalEvents();
1316  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1317  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1318 
1319  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1320  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1321  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1322  }
1323 
1325  using std::placeholders::_1;
1327  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1328  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1329  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1330  }
1331 
1332  void StreamSchedule::resetAll() { results_->reset(); }
1333 
1335 
1337  //must be sure we have cleared the count first
1338  for (auto& count : earlyDeleteBranchToCount_) {
1339  count.count = 0;
1340  }
1341  //now reset based on how many helpers use that branch
1343  ++(earlyDeleteBranchToCount_[index].count);
1344  }
1345  for (auto& helper : earlyDeleteHelpers_) {
1346  helper.reset();
1347  }
1348  }
1349 
1351  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1352  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1354  int bitpos = 0;
1355  unsigned int indexEmpty = 0;
1356  unsigned int indexOfPath = 0;
1357  for (auto& pathStatusInserter : pathStatusInserters) {
1358  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1359  WorkerPtr workerPtr(
1360  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1361  pathStatusInserterWorkers_.emplace_back(workerPtr);
1362  workerPtr->setActivityRegistry(actReg_);
1363  addToAllWorkers(workerPtr.get());
1364 
1365  // A little complexity here because a C++ Path object is not
1366  // instantiated and put into trig_paths_ if there are no modules
1367  // on the configured path.
1368  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1369  ++indexEmpty;
1370  } else {
1371  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1372  ++indexOfPath;
1373  }
1374  ++bitpos;
1375  }
1376 
1377  bitpos = 0;
1378  indexEmpty = 0;
1379  indexOfPath = 0;
1380  for (auto& endPathStatusInserter : endPathStatusInserters) {
1381  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1382  WorkerPtr workerPtr(
1383  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1384  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1385  workerPtr->setActivityRegistry(actReg_);
1386  addToAllWorkers(workerPtr.get());
1387 
1388  // A little complexity here because a C++ Path object is not
1389  // instantiated and put into end_paths if there are no modules
1390  // on the configured path.
1391  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1392  ++indexEmpty;
1393  } else {
1394  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1395  ++indexOfPath;
1396  }
1397  ++bitpos;
1398  }
1399  }
1400 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
Definition: helper.py:1
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
ProductList const & productList() const
std::unordered_multimap< std::string, AliasInfo > const & aliasMap() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
static PFTauRenderPlugin instance
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
int totalEventsPassed() const
ret
prodAgent to be discontinued
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::unordered_multimap< std::string, AliasInfo > aliasMap_
void setupOnDemandSystem(EventTransitionInfo const &)
void clearCounters()
Definition: Worker.h:232
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
std::string const & moduleName() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
StreamID streamID() const
ConditionalTaskHelper(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, WorkerManager &workerManager, std::vector< std::string > const &trigPathNames)
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
T getUntrackedParameter(std::string const &, T const &) const
constexpr element_type const * get() const
std::string moduleLabel
Definition: TriggerReport.h:52
oneapi::tbb::task_group * group() const noexcept
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
char const * label
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
void doneWaiting(std::exception_ptr iExcept)
accept
Definition: HLTenums.h:18
void processSwitchEDAliases(ParameterSet const &proc_pset, ProductRegistry &preg, ProcessConfiguration const &processConfiguration, std::unordered_set< std::string > const &allConditionalMods)
std::string name
Definition: TriggerReport.h:41
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
Strings const & getTrigPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static void fillPathSummary(Path const &path, PathSummary &sum)
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
int totalEvents() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
ModuleDescription const * description() const
Definition: Worker.h:198
std::string const & className() const
Definition: TypeID.cc:40
void clearCounters()
Clear all the counters in the trigger report.
void fillAliasMap(ParameterSet const &proc_pset, std::unordered_set< std::string > const &allConditionalMods)
Strings const & getEndPaths() const
void getTriggerReport(TriggerReport &rep) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
exception_actions::ActionCodes find(const std::string &category) const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
double b
Definition: hdecay.h:120
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::string const & processName() const
HLT enums.
void setupResolvers(Principal &principal)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:336
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModuleBranches(std::unordered_set< std::string > const &conditionalmods) const
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:42
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModsBranches_
Log< level::Warning, false > LogWarning
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T
std::string const & moduleLabel() const
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
std::string const & name() const
Definition: Path.h:73
std::vector< std::string > vstring
Definition: Schedule.cc:482
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
def move(src, dest)
Definition: eostools.py:511
void clearCounters()
Definition: Path.cc:183
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)