CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
29 
31 #include "processEDAliases.h"
32 
33 #include <algorithm>
34 #include <cassert>
35 #include <cstdlib>
36 #include <functional>
37 #include <iomanip>
38 #include <limits>
39 #include <list>
40 #include <map>
41 #include <exception>
42 #include <fmt/format.h>
43 
44 namespace edm {
45 
46  namespace {
47 
// Walks an input range and an output range in lock step and invokes
// 'func(input, output)' for every pair of elements. The callable is
// expected to read its first argument (a const reference to the input
// element) and to store its result through its second argument (a
// reference to the output element). The output range must provide at
// least as many elements as the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
57 
// Fills the sequence 'to' from the sequence 'from' by calling
// 'func(input, output)' on each pair of elements via transform_into.
// When the two sequences differ in size, the values are computed into
// a temporary of the right size which is swapped into 'to' only after
// every call has returned, so 'to' is left untouched if a call throws.
// When the sizes already agree, 'to' is updated in place.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }
  TO scratch(from.size());
  transform_into(from.begin(), from.end(), scratch.begin(), func);
  to.swap(scratch);
}
73 
74  // -----------------------------
75 
76  // Here we make the trigger results inserter directly. This should
77  // probably be a utility in the WorkerRegistry or elsewhere.
78 
// Builds the worker for the TriggerResultInserter: a WorkerT is created
// from the inserter module, its module description and a pointer to
// 'actions'; the worker is then given the activity registry 'areg' and
// returned to the caller.
// NOTE(review): the statement that opens the declaration of 'ptr'
// (listing line 82) is absent from this listing.
79  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
80  std::shared_ptr<ActivityRegistry> areg,
81  std::shared_ptr<TriggerResultInserter> inserter) {
83  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
84  ptr->setActivityRegistry(areg);
85  return ptr;
86  }
87 
88  void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
89  ProductRegistry const& preg,
90  std::multimap<std::string, Worker*>& branchToReadingWorker) {
91  auto vBranchesToDeleteEarly = branchesToDeleteEarly;
92  // Remove any duplicates
93  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
94  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
95  vBranchesToDeleteEarly.end());
96 
97  // Are the requested items in the product registry?
98  auto allBranchNames = preg.allBranchNames();
99  //the branch names all end with a period, which we do not want to compare with
100  for (auto& b : allBranchNames) {
101  b.resize(b.size() - 1);
102  }
103  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
104  std::vector<std::string> temp;
105  temp.reserve(vBranchesToDeleteEarly.size());
106 
107  std::set_intersection(vBranchesToDeleteEarly.begin(),
108  vBranchesToDeleteEarly.end(),
109  allBranchNames.begin(),
110  allBranchNames.end(),
111  std::back_inserter(temp));
112  vBranchesToDeleteEarly.swap(temp);
113  if (temp.size() != vBranchesToDeleteEarly.size()) {
114  std::vector<std::string> missingProducts;
115  std::set_difference(temp.begin(),
116  temp.end(),
117  vBranchesToDeleteEarly.begin(),
118  vBranchesToDeleteEarly.end(),
119  std::back_inserter(missingProducts));
120  LogInfo l("MissingProductsForCanDeleteEarly");
121  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
122  for (auto const& n : missingProducts) {
123  l << "\n " << n;
124  }
125  }
126  //set placeholder for the branch, we will remove the nullptr if a
127  // module actually wants the branch.
128  for (auto const& branch : vBranchesToDeleteEarly) {
129  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
130  }
131  }
132 
133  Worker* getWorker(std::string const& moduleLabel,
134  ParameterSet& proc_pset,
135  WorkerManager& workerManager,
136  ProductRegistry& preg,
137  PreallocationConfiguration const* prealloc,
138  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
139  bool isTracked;
140  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
141  if (modpset == nullptr) {
142  return nullptr;
143  }
144  assert(isTracked);
145 
146  return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
147  }
148 
// If ConditionalTask modules exist in the container of module names,
// returns the iterator range (std::pair) covering them. The range
// excludes the special markers '#' (placed right before the
// ConditionalTask modules) and '@' (the last element).
// If the container holds no '#' marker, or if nothing follows the
// marker, a pair of end iterators is returned, so callers can always
// detect "no conditional modules" with 'first == second' and can
// safely iterate over the result.
template <typename T>
auto findConditionalTaskModulesRange(T& modnames) {
  auto const last = modnames.end();
  auto const marker = std::find(modnames.begin(), last, "#");
  // A '#' in the very last position has no trailing '@'; without this
  // guard the result would be the inverted range (end, end - 1).
  if (marker == last or std::next(marker) == last) {
    return std::pair(last, last);
  }
  return std::pair(std::next(marker), std::prev(last));
}
163 
164  std::optional<std::string> findBestMatchingAlias(
165  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
166  std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
167  std::string const& productModuleLabel,
168  ConsumesInfo const& consumesInfo) {
169  std::optional<std::string> best;
170  int wildcardsInBest = std::numeric_limits<int>::max();
171  bool bestIsAmbiguous = false;
172 
173  auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
174  std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
175  int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
176  if (wildcards == 0) {
177  bestIsAmbiguous = false;
178  return true;
179  }
180  if (not best or wildcards < wildcardsInBest) {
181  best = label;
182  wildcardsInBest = wildcards;
183  bestIsAmbiguous = false;
184  } else if (best and *best != label and wildcardsInBest == wildcards) {
185  bestIsAmbiguous = true;
186  }
187  return false;
188  };
189 
190  auto findAlias = aliasMap.equal_range(productModuleLabel);
191  for (auto it = findAlias.first; it != findAlias.second; ++it) {
192  std::string const& aliasInstanceLabel =
193  it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
194  bool const instanceIsWildcard = (aliasInstanceLabel == "*");
195  if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
196  bool const typeIsWildcard = it->second.friendlyClassName == "*";
197  if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
198  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
199  return it->second.originalModuleLabel;
200  }
201  } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
202  //consume is a View so need to do more intrusive search
203  //find matching branches in module
204  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
205  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
206  if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
207  if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
208  TypeID(itBranch->second->wrappedType().typeInfo()),
209  itBranch->second->className())) {
210  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
211  return it->second.originalModuleLabel;
212  }
213  }
214  }
215  }
216  }
217  }
218  }
219  if (bestIsAmbiguous) {
221  << "Encountered ambiguity when trying to find a best-matching alias for\n"
222  << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
223  << " module label " << productModuleLabel << "\n"
224  << " product instance name " << consumesInfo.instance() << "\n"
225  << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
226  "wildcards ("
227  << wildcardsInBest << ")";
228  }
229  return best;
230  }
231  } // namespace
232 
233  // -----------------------------
234 
// Shorthand for a list of strings (e.g. the module names of a path).
using vstring = std::vector<std::string>;
236 
237  // -----------------------------
238 
240  public:
242 
244  ProductRegistry& preg,
245  PreallocationConfiguration const* prealloc,
246  std::shared_ptr<ProcessConfiguration const> processConfiguration,
247  WorkerManager& workerManagerLumisAndEvents,
248  std::vector<std::string> const& trigPathNames) {
// Constructor body. Gathers the ConditionalTask modules of all paths
// named in 'trigPathNames', makes sure a worker exists for each of
// them, fills the alias table for those modules, processes the
// SwitchProducer aliases and finally records which branches of the
// product registry are produced by the conditional modules.
// NOTE(review): the class header and the first line of this
// constructor's signature (listing lines 239-243) are absent from
// this listing.
249  std::unordered_set<std::string> allConditionalMods;
250  for (auto const& pathName : trigPathNames) {
251  auto const modnames = proc_pset.getParameter<vstring>(pathName);
252 
253  //Pull out ConditionalTask modules
254  auto condRange = findConditionalTaskModulesRange(modnames);
// Equal iterators mean this path has no ConditionalTask modules
255  if (condRange.first == condRange.second)
256  continue;
257 
258  //the last entry should be ignored since it is required to be "@"
259  allConditionalMods.insert(condRange.first, condRange.second);
260  }
261 
262  for (auto const& cond : allConditionalMods) {
263  //force the creation of the conditional modules so alias check can work
264  (void)getWorker(cond, proc_pset, workerManagerLumisAndEvents, preg, prealloc, processConfiguration);
265  }
266 
267  fillAliasMap(proc_pset, allConditionalMods);
268  processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
269 
270  //find branches created by the conditional modules
// The map stores pointers to the BranchDescription objects held by the
// registry's product list, keyed by the producing module's label.
271  for (auto const& prod : preg.productList()) {
272  if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
273  conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
274  }
275  }
276  }
277 
278  std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
279 
280  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
281  std::unordered_set<std::string> const& conditionalmods) const {
282  std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
283  for (auto const& mod : conditionalmods) {
284  auto range = conditionalModsBranches_.equal_range(mod);
285  ret.insert(range.first, range.second);
286  }
287  return ret;
288  }
289 
290  private:
291  void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
292  auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
293  std::string const star("*");
294  for (auto const& alias : aliases) {
295  auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
296  auto aliasedToModuleLabels = info.getParameterNames();
297  for (auto const& mod : aliasedToModuleLabels) {
298  if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
299  auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
300  for (auto const& aliasPSet : aliasVPSet) {
301  std::string type = star;
302  std::string instance = star;
303  std::string originalInstance = star;
304  if (aliasPSet.exists("type")) {
305  type = aliasPSet.getParameter<std::string>("type");
306  }
307  if (aliasPSet.exists("toProductInstance")) {
308  instance = aliasPSet.getParameter<std::string>("toProductInstance");
309  }
310  if (aliasPSet.exists("fromProductInstance")) {
311  originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
312  }
313 
314  aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
315  }
316  }
317  }
318  }
319  }
320 
// Collects the case labels of every module in "@all_modules" whose
// "@module_type" is "SwitchProducer" and for which aliasMap_ holds at
// least one entry, then forwards that list together with the
// conditional module names, the process ParameterSet, the process name
// and the product registry to the alias-processing call at the end.
// NOTE(review): the line naming the function called in the last
// statement (listing line 339) is absent from this listing.
321  void processSwitchEDAliases(ParameterSet const& proc_pset,
322  ProductRegistry& preg,
323  ProcessConfiguration const& processConfiguration,
324  std::unordered_set<std::string> const& allConditionalMods) {
325  auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
326  std::vector<std::string> switchEDAliases;
327  for (auto const& module : all_modules) {
328  auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
329  if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
330  auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
331  for (auto const& case_label : all_cases) {
// A case label counts only if it is the name of a recorded alias
332  auto range = aliasMap_.equal_range(case_label);
333  if (range.first != range.second) {
334  switchEDAliases.push_back(case_label);
335  }
336  }
337  }
338  }
340  switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
341  }
342 
343  std::unordered_multimap<std::string, AliasInfo> aliasMap_;
344  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
345  };
346 
347  // -----------------------------
348 
350  std::shared_ptr<TriggerResultInserter> inserter,
351  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
352  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
353  std::shared_ptr<ModuleRegistry> modReg,
354  ParameterSet& proc_pset,
355  service::TriggerNamesService const& tns,
356  PreallocationConfiguration const& prealloc,
357  ProductRegistry& preg,
359  std::shared_ptr<ActivityRegistry> areg,
360  std::shared_ptr<ProcessConfiguration const> processConfiguration,
361  StreamID streamID,
362  ProcessContext const* processContext)
363  : workerManagerBeginEnd_(modReg, areg, actions),
364  workerManagerRuns_(modReg, areg, actions),
365  workerManagerLumisAndEvents_(modReg, areg, actions),
366  actReg_(areg),
367  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
368  results_inserter_(),
369  trig_paths_(),
370  end_paths_(),
371  total_events_(),
372  total_passed_(),
373  number_of_unscheduled_modules_(0),
374  streamID_(streamID),
375  streamContext_(streamID_, processContext) {
// Constructor body, in order:
//  1. fill one trigger path per name reported by the TriggerNamesService,
//  2. if there is at least one trigger path, set up the trigger results
//     inserter,
//  3. fill the end paths and create the path status inserters,
//  4. handle modules that are in the configuration but have no worker
//     yet, and report the ones that should have been used,
//  5. on stream 0, warn about ConditionalTask modules nothing consumed,
//  6. request, for every lumi/event worker, the matching worker from
//     the begin/end and the run worker managers.
// NOTE(review): several lines of this function (listing lines 349,
// 358, 406, 452 and 510) are absent from this listing.
376  bool hasPath = false;
377  std::vector<std::string> const& pathNames = tns.getTrigPaths();
378  std::vector<std::string> const& endPathNames = tns.getEndPaths();
379 
380  ConditionalTaskHelper conditionalTaskHelper(
381  proc_pset, preg, &prealloc, processConfiguration, workerManagerLumisAndEvents_, pathNames);
// Filled by fillTrigPath/fillEndPath, which receive it by argument
382  std::unordered_set<std::string> conditionalModules;
383 
// trig_bitpos is the position of the path in 'pathNames'
384  int trig_bitpos = 0;
385  trig_paths_.reserve(pathNames.size());
386  for (auto const& trig_name : pathNames) {
387  fillTrigPath(proc_pset,
388  preg,
389  &prealloc,
390  processConfiguration,
391  trig_bitpos,
392  trig_name,
393  results(),
394  endPathNames,
395  conditionalTaskHelper,
396  conditionalModules);
397  ++trig_bitpos;
398  hasPath = true;
399  }
400 
401  if (hasPath) {
402  // the results inserter stands alone
403  inserter->setTrigResultForStream(streamID.value(), results());
404 
405  results_inserter_ = makeInserter(actions, actReg_, inserter);
407  }
408 
409  // fill normal endpaths
410  int bitpos = 0;
411  end_paths_.reserve(endPathNames.size());
412  for (auto const& end_path_name : endPathNames) {
413  fillEndPath(proc_pset,
414  preg,
415  &prealloc,
416  processConfiguration,
417  bitpos,
418  end_path_name,
419  endPathNames,
420  conditionalTaskHelper,
421  conditionalModules);
422  ++bitpos;
423  }
424 
425  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
426 
427  //See if all modules were used
// unusedLabels = labels in "@all_modules" minus labels of the workers
// created so far; both inputs are std::set, hence sorted as
// set_difference requires
428  std::set<std::string> usedWorkerLabels;
429  for (auto const& worker : allWorkersLumisAndEvents()) {
430  usedWorkerLabels.insert(worker->description()->moduleLabel());
431  }
432  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
433  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
434  std::vector<std::string> unusedLabels;
435  set_difference(modulesInConfigSet.begin(),
436  modulesInConfigSet.end(),
437  usedWorkerLabels.begin(),
438  usedWorkerLabels.end(),
439  back_inserter(unusedLabels));
440  std::set<std::string> unscheduledLabels;
441  std::vector<std::string> shouldBeUsedLabels;
442  if (!unusedLabels.empty()) {
443  //Need to
444  // 1) create worker
445  // 2) if it is a WorkerT<EDProducer>, add it to our list
446  // 3) hand list to our delayed reader
447  for (auto const& label : unusedLabels) {
448  bool isTracked;
449  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
450  assert(isTracked);
451  assert(modulePSet != nullptr);
453  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
454  }
455  if (!shouldBeUsedLabels.empty()) {
// Builds a comma separated list of quoted labels: 'a','b',...
456  std::ostringstream unusedStream;
457  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
458  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
459  itLabelEnd = shouldBeUsedLabels.end();
460  itLabel != itLabelEnd;
461  ++itLabel) {
462  unusedStream << ",'" << *itLabel << "'";
463  }
464  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
465  }
466  }
467  number_of_unscheduled_modules_ = unscheduledLabels.size();
468 
469  // Print conditional modules that were not consumed in any of their associated Paths
470  if (streamID.value() == 0 and not conditionalModules.empty()) {
471  // Intersection of unscheduled and ConditionalTask modules gives
472  // directly the set of conditional modules that were not
473  // consumed by anything in the Paths associated to the
474  // corresponding ConditionalTask.
// The string_views refer to the strings owned by 'unscheduledLabels',
// which outlives 'labelsToPrint'
475  std::vector<std::string_view> labelsToPrint;
476  std::copy_if(
477  unscheduledLabels.begin(),
478  unscheduledLabels.end(),
479  std::back_inserter(labelsToPrint),
480  [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
481 
482  if (not labelsToPrint.empty()) {
483  edm::LogWarning log("NonConsumedConditionalModules");
484  log << "The following modules were part of some ConditionalTask, but were not\n"
485  << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
486  << "was associated. Perhaps they should be either removed from the\n"
487  << "job, or moved to a Task to make it explicit they are unscheduled.\n";
488  for (auto const& modLabel : labelsToPrint) {
489  log.format("\n {}", modLabel);
490  }
491  }
492  }
493 
494  for (auto const& worker : allWorkersLumisAndEvents()) {
495  std::string const& moduleLabel = worker->description()->moduleLabel();
496 
497  // The new worker pointers will be null for the TriggerResultsInserter, PathStatusInserter, and
498  // EndPathStatusInserter because there are no ParameterSets for those in the configuration.
499  // We could add special code to create workers for those, but instead we skip them because they
500  // do not have beginStream, endStream, or run/lumi begin/end stream transition functions.
501 
502  Worker* workerBeginEnd =
503  getWorker(moduleLabel, proc_pset, workerManagerBeginEnd_, preg, &prealloc, processConfiguration);
504  if (workerBeginEnd) {
505  workerManagerBeginEnd_.addToAllWorkers(workerBeginEnd);
506  }
507 
508  Worker* workerRuns = getWorker(moduleLabel, proc_pset, workerManagerRuns_, preg, &prealloc, processConfiguration);
509  if (workerRuns) {
511  }
512  }
513 
514  } // StreamSchedule::StreamSchedule
515 
517  std::vector<std::string> const& branchesToDeleteEarly,
518  std::multimap<std::string, std::string> const& referencesToBranches,
519  std::vector<std::string> const& modulesToSkip,
520  edm::ProductRegistry const& preg) {
// Sets up the early deletion of data products. Steps:
//  1. start from the requested branches that exist in the registry,
//  2. drop the branches kept by an output module,
//  3. for every worker not listed in 'modulesToSkip', record which of
//     the remaining branches it consumes, either directly or through
//     'referencesToBranches',
//  4. drop (and warn about) branches no worker reads,
//  5. create one EarlyDeleteHelper per reading worker and hand the
//     helpers to the trigger paths and end paths.
// NOTE(review): several lines of this function (listing lines 516,
// 688-689, 708, 714, 716 and 725) are absent from this listing.
521  // setup the list with those products actually registered for this job
522  std::multimap<std::string, Worker*> branchToReadingWorker;
523  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
524 
525  const std::vector<std::string> kEmpty;
// Per worker: number of branch matches counted below, used later as the
// amount of index space set aside for that worker
526  std::map<Worker*, unsigned int> reserveSizeForWorker;
527  unsigned int upperLimitOnReadingWorker = 0;
528  unsigned int upperLimitOnIndicies = 0;
529  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
530 
531  //talk with output modules first
532  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
533  auto comm = iHolder->createOutputModuleCommunicator();
534  if (comm) {
535  if (!branchToReadingWorker.empty()) {
536  //If an OutputModule needs a product, we can't delete it early
537  // so we should remove it from our list
538  SelectedProductsForBranchType const& kept = comm->keptProducts();
539  for (auto const& item : kept[InEvent]) {
540  BranchDescription const& desc = *item.first;
541  auto found = branchToReadingWorker.equal_range(desc.branchName());
542  if (found.first != found.second) {
543  --nUniqueBranchesToDelete;
544  branchToReadingWorker.erase(found.first, found.second);
545  }
546  }
547  }
548  }
549  });
550 
551  if (branchToReadingWorker.empty()) {
552  return;
553  }
554 
555  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
556  for (auto w : allWorkersLumisAndEvents()) {
557  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
558  continue;
559  }
560  //determine if this module could read a branch we want to delete early
561  auto consumes = w->consumesInfo();
562  if (not consumes.empty()) {
563  bool foundAtLeastOneMatchingBranch = false;
564  for (auto const& product : consumes) {
// Name built as type_label_instance_process; when the consumer names no
// process the name ends with '_' and is used as a prefix below
565  std::string branch = fmt::format("{}_{}_{}_{}",
566  product.type().friendlyClassName(),
567  product.label().data(),
568  product.instance().data(),
569  product.process().data());
570  {
571  //Handle case where worker directly consumes product
572  auto found = branchToReadingWorker.end();
573  if (product.process().empty()) {
574  auto startFound = branchToReadingWorker.lower_bound(branch);
575  if (startFound != branchToReadingWorker.end()) {
576  if (startFound->first.substr(0, branch.size()) == branch) {
577  //match all processNames here, even if it means multiple matches will happen
578  found = startFound;
579  }
580  }
581  } else {
582  auto exactFound = branchToReadingWorker.equal_range(branch);
583  if (exactFound.first != exactFound.second) {
584  found = exactFound.first;
585  }
586  }
587  if (found != branchToReadingWorker.end()) {
588  if (not foundAtLeastOneMatchingBranch) {
589  ++upperLimitOnReadingWorker;
590  foundAtLeastOneMatchingBranch = true;
591  }
592  ++upperLimitOnIndicies;
593  ++reserveSizeForWorker[w];
// Reuse the placeholder entry if it is still unclaimed, otherwise add
// another entry for the same branch
594  if (nullptr == found->second) {
595  found->second = w;
596  } else {
597  branchToReadingWorker.insert(make_pair(found->first, w));
598  }
599  }
600  }
601  {
602  //Handle case where indirectly consumes product
603  auto found = referencesToBranches.end();
604  if (product.process().empty()) {
605  auto startFound = referencesToBranches.lower_bound(branch);
606  if (startFound != referencesToBranches.end()) {
607  if (startFound->first.substr(0, branch.size()) == branch) {
608  //match all processNames here, even if it means multiple matches will happen
609  found = startFound;
610  }
611  }
612  } else {
613  //can match exactly
614  auto exactFound = referencesToBranches.equal_range(branch);
615  if (exactFound.first != exactFound.second) {
616  found = exactFound.first;
617  }
618  }
619  if (found != referencesToBranches.end()) {
// Visit every referenced branch stored under the same key as 'found'
620  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
621  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
622  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
623  continue;
624  }
625  if (not foundAtLeastOneMatchingBranch) {
626  ++upperLimitOnReadingWorker;
627  foundAtLeastOneMatchingBranch = true;
628  }
629  ++upperLimitOnIndicies;
630  ++reserveSizeForWorker[w];
631  if (nullptr == foundInBranchToReadingWorker->second) {
632  foundInBranchToReadingWorker->second = w;
633  } else {
634  branchToReadingWorker.insert(make_pair(itr->second, w));
635  }
636  }
637  }
638  }
639  }
640  }
641  }
642  {
// Remove the placeholders that no worker claimed and report them
643  auto it = branchToReadingWorker.begin();
644  std::vector<std::string> unusedBranches;
645  while (it != branchToReadingWorker.end()) {
646  if (it->second == nullptr) {
647  unusedBranches.push_back(it->first);
648  //erasing the object invalidates the iterator so must advance it first
649  auto temp = it;
650  ++it;
651  branchToReadingWorker.erase(temp);
652  } else {
653  ++it;
654  }
655  }
656  if (not unusedBranches.empty()) {
657  LogWarning l("UnusedProductsForCanDeleteEarly");
658  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
659  " If possible, remove the producer from the job.";
660  for (auto const& n : unusedBranches) {
661  l << "\n " << n;
662  }
663  }
664  }
665  if (!branchToReadingWorker.empty()) {
666  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
667  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
668  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
669  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
670  std::string lastBranchName;
671  size_t nextOpenIndex = 0;
672  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
// The multimap is ordered by branch name, so a change of name marks the
// first entry of the next branch
673  for (auto& branchAndWorker : branchToReadingWorker) {
674  if (lastBranchName != branchAndWorker.first) {
675  //have to put back the period we removed earlier in order to get the proper name
676  BranchID bid(branchAndWorker.first + ".");
677  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
678  lastBranchName = branchAndWorker.first;
679  }
680  auto found = alreadySeenWorkers.find(branchAndWorker.second);
681  if (alreadySeenWorkers.end() == found) {
682  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
683  // all the branches that might be read by this worker. However, initially we will only tell the
684  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
685  // EarlyDeleteHelper will automatically advance its internal end pointer.
686  size_t index = nextOpenIndex;
687  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
690  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
691  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
692  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
693  nextOpenIndex += nIndices;
694  } else {
// Worker already has a helper: add the index of the current branch
695  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
696  }
697  }
698 
699  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
700  // space needed for each module
701  auto itLast = earlyDeleteHelpers_.begin();
702  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
703  if (itLast->end() != it->begin()) {
704  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
705  unsigned int delta = it->begin() - itLast->end();
706  it->shiftIndexPointers(delta);
707 
709  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
710  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
711  }
712  itLast = it;
713  }
715  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
717 
718  //now tell the paths about the deleters
719  for (auto& p : trig_paths_) {
720  p.setEarlyDeleteHelpers(alreadySeenWorkers);
721  }
722  for (auto& p : end_paths_) {
723  p.setEarlyDeleteHelpers(alreadySeenWorkers);
724  }
726  }
727  }
728 
730  Worker* worker,
731  std::unordered_set<std::string>& conditionalModules,
732  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
733  std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
734  ParameterSet& proc_pset,
735  ProductRegistry& preg,
736  PreallocationConfiguration const* prealloc,
737  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
// Returns the workers of the conditional modules whose products
// 'worker' consumes from the current process, either directly or
// through an EDAlias. The search is recursive: the workers a
// conditional module itself depends on are placed in the result before
// that module's own worker. Every module that is placed is removed
// from 'conditionalModules'.
// NOTE(review): the first line of the signature (listing line 729) and
// the line opening the view-compatibility test (listing line 773) are
// absent from this listing.
738  std::vector<Worker*> returnValue;
739  auto const& consumesInfo = worker->consumesInfo();
740  auto moduleLabel = worker->description()->moduleLabel();
741  using namespace productholderindexhelper;
742  for (auto const& ci : consumesInfo) {
// Only consumes that can be satisfied by the current process are
// of interest
743  if (not ci.skipCurrentProcess() and
744  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
745  auto productModuleLabel = std::string(ci.label());
746  bool productFromConditionalModule = false;
747  auto itFound = conditionalModules.find(productModuleLabel);
748  if (itFound == conditionalModules.end()) {
749  //Check to see if this was an alias
750  //note that aliasMap was previously filtered so only the conditional modules remain there
751  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
752  if (foundAlias) {
753  productModuleLabel = *foundAlias;
754  productFromConditionalModule = true;
755  itFound = conditionalModules.find(productModuleLabel);
756  //check that the alias-for conditional module has not been used
757  if (itFound == conditionalModules.end()) {
758  continue;
759  }
760  }
761  } else {
762  //need to check the rest of the data product info
763  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
764  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
765  if (itBranch->second->productInstanceName() == ci.instance()) {
766  if (ci.kindOfType() == PRODUCT_TYPE) {
767  if (ci.type() == itBranch->second->unwrappedTypeID()) {
768  productFromConditionalModule = true;
769  break;
770  }
771  } else {
772  //this is a view
774  ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
775  productFromConditionalModule = true;
776  break;
777  }
778  }
779  }
780  }
781  }
782  if (productFromConditionalModule) {
783  auto condWorker = getWorker(
784  productModuleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
785  assert(condWorker);
786 
// Remove the module before recursing so it is not placed a second time
787  conditionalModules.erase(itFound);
788 
789  auto dependents = tryToPlaceConditionalModules(condWorker,
790  conditionalModules,
791  conditionalModuleBranches,
792  aliasMap,
793  proc_pset,
794  preg,
795  prealloc,
796  processConfiguration);
797  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
798  returnValue.push_back(condWorker);
799  }
800  }
801  }
802  return returnValue;
803  }
804 
806  ProductRegistry& preg,
807  PreallocationConfiguration const* prealloc,
808  std::shared_ptr<ProcessConfiguration const> processConfiguration,
809  std::string const& pathName,
810  bool ignoreFilters,
811  PathWorkers& out,
812  std::vector<std::string> const& endPathNames,
813  ConditionalTaskHelper const& conditionalTaskHelper,
814  std::unordered_set<std::string>& allConditionalModules) {
      // Builds the ordered WorkerInPath list for the path called pathName and swaps it into 'out'.
      // The module names are read from proc_pset, ConditionalTask modules are split off, the
      // per-module prefix character is decoded, a Worker is obtained for every label, and any
      // conditional modules returned by tryToPlaceConditionalModules are inserted ahead of the
      // worker they were returned for. Names of conditional modules are also added to
      // allConditionalModules.
      // NOTE(review): this listing omits source lines 805, 845, 856 and 868. They presumably
      // hold the function name with the proc_pset parameter, the declarations of filterAction
      // and moduleLabel, and the start of the statement that reports an unknown label - confirm
      // against the real file.
815  vstring modnames = proc_pset.getParameter<vstring>(pathName);
816  PathWorkers tmpworkers;
817 
818  //Pull out ConditionalTask modules
819  auto condRange = findConditionalTaskModulesRange(modnames);
820 
821  std::unordered_set<std::string> conditionalmods;
822  //An EDAlias may be redirecting to a module on a ConditionalTask
823  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
      // Maps a conditional module's label to the position value later passed to WorkerInPath.
824  std::unordered_map<std::string, unsigned int> conditionalModOrder;
825  if (condRange.first != condRange.second) {
826  for (auto it = condRange.first; it != condRange.second; ++it) {
827  // ordering needs to skip the # token in the path list
828  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
829  }
830  //the last entry should be ignored since it is required to be "@"
      // The names are moved out of modnames here; conditionalModOrder was filled above while
      // the strings were still intact.
831  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
832  std::make_move_iterator(condRange.second));
833 
834  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
      // Drop the element just before the conditional range and everything after it, so that
      // only the regular path modules remain in modnames.
835  modnames.erase(std::prev(condRange.first), modnames.end());
836 
837  // Make a union of all conditional modules from all Paths
838  allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
839  }
840 
841  unsigned int placeInPath = 0;
842  for (auto const& name : modnames) {
843  //Modules except EDFilters are set to run concurrently by default
844  bool doNotRunConcurrently = false;
      // Prefix decoding: '!' selects Veto, '-' and '+' select Ignore, and '|' and '+'
      // additionally forbid concurrent running.
846  if (name[0] == '!') {
847  filterAction = WorkerInPath::Veto;
848  } else if (name[0] == '-' or name[0] == '+') {
849  filterAction = WorkerInPath::Ignore;
850  }
851  if (name[0] == '|' or name[0] == '+') {
852  //cms.wait was specified so do not run concurrently
853  doNotRunConcurrently = true;
854  }
855 
      // Strip the one-character prefix whenever one of the recognised prefixes was present.
857  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
858  moduleLabel.erase(0, 1);
859  }
860 
861  Worker* worker =
862  getWorker(moduleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
863  if (worker == nullptr) {
      // Unknown label: say whether the offending path is an endpath or a regular path.
864  std::string pathType("endpath");
865  if (!search_all(endPathNames, pathName)) {
866  pathType = std::string("path");
867  }
      // NOTE(review): the code below dereferences 'worker', so the statement starting on the
      // omitted line 868 presumably throws - confirm.
869  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
870  << "\"\n please check spelling or remove that label from the path.";
871  }
872 
873  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
874  // We have a filter on an end path, and the filter is not explicitly ignored.
875  // See if the filter is allowed.
876  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
877  if (!search_all(allowed_filters, worker->description()->moduleName())) {
878  // Filter is not allowed. Ignore the result, and issue a warning.
879  filterAction = WorkerInPath::Ignore;
880  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
881  << "' with module label '" << moduleLabel << "' appears on EndPath '"
882  << pathName << "'.\n"
883  << "The return value of the filter will be ignored.\n"
884  << "To suppress this warning, either remove the filter from the endpath,\n"
885  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
886  }
887  }
      // A filter whose result is not ignored is never run concurrently.
888  bool runConcurrently = not doNotRunConcurrently;
889  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
890  runConcurrently = false;
891  }
892 
      // Conditional modules returned for this worker are placed before it in the path.
893  auto condModules = tryToPlaceConditionalModules(worker,
894  conditionalmods,
895  conditionalModsBranches,
896  conditionalTaskHelper.aliasMap(),
897  proc_pset,
898  preg,
899  prealloc,
900  processConfiguration);
901  for (auto condMod : condModules) {
      // Conditional modules are always added with Ignore and runConcurrently == true; their
      // position comes from conditionalModOrder (operator[] yields 0 for a label not in the map).
902  tmpworkers.emplace_back(
903  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
904  }
905 
906  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
907  ++placeInPath;
908  }
909 
      // 'out' is only modified here, after the whole list has been built.
910  out.swap(tmpworkers);
911  }
912 
914  ProductRegistry& preg,
915  PreallocationConfiguration const* prealloc,
916  std::shared_ptr<ProcessConfiguration const> processConfiguration,
917  int bitpos,
918  std::string const& name,
919  TrigResPtr trptr,
920  std::vector<std::string> const& endPathNames,
921  ConditionalTaskHelper const& conditionalTaskHelper,
922  std::unordered_set<std::string>& allConditionalModules) {
      // Builds the worker list for the path 'name' via fillWorkers. A non-empty list becomes a
      // new entry in trig_paths_; for an empty list only bitpos is recorded in
      // empty_trig_paths_. Every worker of the path is passed to addToAllWorkers.
      // NOTE(review): source line 913 (presumably the function name and the proc_pset
      // parameter) is absent from this listing.
923  PathWorkers tmpworkers;
924  fillWorkers(proc_pset,
925  preg,
926  prealloc,
927  processConfiguration,
928  name,
929  false,  // ignoreFilters
930  tmpworkers,
931  endPathNames,
932  conditionalTaskHelper,
933  allConditionalModules);
934 
935  // an empty path will cause an extra bit that is not used
936  if (!tmpworkers.empty()) {
937  trig_paths_.emplace_back(
938  bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
939  } else {
940  empty_trig_paths_.push_back(bitpos);
941  }
942  for (WorkerInPath const& workerInPath : tmpworkers) {
943  addToAllWorkers(workerInPath.getWorker());
944  }
945  }
946 
948  ProductRegistry& preg,
949  PreallocationConfiguration const* prealloc,
950  std::shared_ptr<ProcessConfiguration const> processConfiguration,
951  int bitpos,
952  std::string const& name,
953  std::vector<std::string> const& endPathNames,
954  ConditionalTaskHelper const& conditionalTaskHelper,
955  std::unordered_set<std::string>& allConditionalModules) {
      // End-path counterpart of fillTrigPath: builds the worker list via fillWorkers with
      // ignoreFilters == true. A non-empty list becomes a new entry in end_paths_, constructed
      // with a default TrigResPtr; for an empty list only bitpos is recorded in
      // empty_end_paths_. Every worker of the path is passed to addToAllWorkers.
      // NOTE(review): source lines 947, 975 and 976 (presumably the function name with the
      // proc_pset parameter, and the last emplace_back arguments) are absent from this listing.
956  PathWorkers tmpworkers;
957  fillWorkers(proc_pset,
958  preg,
959  prealloc,
960  processConfiguration,
961  name,
962  true,  // ignoreFilters
963  tmpworkers,
964  endPathNames,
965  conditionalTaskHelper,
966  allConditionalModules);
967 
968  if (!tmpworkers.empty()) {
969  end_paths_.emplace_back(bitpos,
970  name,
971  tmpworkers,
972  TrigResPtr(),
973  actionTable(),
974  actReg_,
977  } else {
978  empty_end_paths_.push_back(bitpos);
979  }
980  for (WorkerInPath const& workerInPath : tmpworkers) {
981  addToAllWorkers(workerInPath.getWorker());
982  }
983  }
984 
986 
988 
// Body of the function that replaces the module behind the workers labelled iLabel.
// NOTE(review): the signature (source line 989) is absent from this listing; the
// cross-reference index of this page lists
// replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel), which matches the
// names used below - confirm.
// Each of the three worker collections is searched for the first worker whose module label
// equals iLabel, and iMod->replaceModuleFor is called on it. Only the worker from the
// begin/end collection additionally gets beginStream called.
990  for (auto const& worker : allWorkersBeginEnd()) {
991  if (worker->description()->moduleLabel() == iLabel) {
992  iMod->replaceModuleFor(worker);
993  worker->beginStream(streamID_, streamContext_);
994  break;
995  }
996  }
997 
998  for (auto const& worker : allWorkersRuns()) {
999  if (worker->description()->moduleLabel() == iLabel) {
1000  iMod->replaceModuleFor(worker);
1001  break;
1002  }
1003  }
1004 
1005  for (auto const& worker : allWorkersLumisAndEvents()) {
1006  if (worker->description()->moduleLabel() == iLabel) {
1007  iMod->replaceModuleFor(worker);
1008  break;
1009  }
1010  }
1011  }
1012 
1017  }
1018 
// Returns the ModuleDescription pointer of every worker in allWorkersLumisAndEvents(),
// in that collection's iteration order. The pointers are taken from Worker::description()
// and are not owned by the returned vector.
1019  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
1020  std::vector<ModuleDescription const*> result;
1021  result.reserve(allWorkersLumisAndEvents().size());
1022 
1023  for (auto const& worker : allWorkersLumisAndEvents()) {
1024  ModuleDescription const* p = worker->description();
1025  result.push_back(p);
1026  }
1027  return result;
1028  }
1029 
1031  WaitingTaskHolder iTask,
1033  ServiceToken const& serviceToken,
1034  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
      // Starts the asynchronous processing of one event on this stream. The trigger results are
      // reset, empty paths are marked as passed, and then the end paths and trigger paths are
      // launched with waiting tasks that eventually call finishProcessOneEvent and signal iTask.
      // An exception thrown synchronously in here is handed to iTask by the catch at the bottom.
      // NOTE(review): source lines 1030, 1032, 1041, 1051-1052, 1059-1060, 1070-1071 and 1132
      // are absent from this listing. 'info' and 'Traits' are presumably introduced on 1032 and
      // 1041 - confirm against the real file.
1035  EventPrincipal& ep = info.principal();
1036 
1037  // Caught exception is propagated via WaitingTaskHolder
1038  CMS_SA_ALLOW try {
1039  this->resetAll();
1040 
1042 
1043  Traits::setStreamContext(streamContext_, ep);
1044  //a service may want to communicate with another service
1045  ServiceRegistry::Operate guard(serviceToken);
1046  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1047 
1048  // Data dependencies need to be set up before marking empty
1049  // (End)Paths complete in case something consumes the status of
1050  // the empty (EndPath)
1053 
      // Empty trigger paths are recorded as passed in results_ and in their status inserter.
      // The call that yields 'except' is on the omitted lines 1059-1060.
1054  HLTPathStatus hltPathStatus(hlt::Pass, 0);
1055  for (int empty_trig_path : empty_trig_paths_) {
1056  results_->at(empty_trig_path) = hltPathStatus;
1057  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1058  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1061  if (except) {
      // Report the failure to the caller and stop; no path has been started yet.
1062  iTask.doneWaiting(except);
1063  return;
1064  }
1065  }
1066  if (not endPathStatusInserterWorkers_.empty()) {
1067  for (int empty_end_path : empty_end_paths_) {
1068  std::exception_ptr except =
1069  endPathStatusInserterWorkers_[empty_end_path]
1072  if (except) {
1073  iTask.doneWaiting(except);
1074  return;
1075  }
1076  }
1077  }
1078 
1079  ++total_events_;
1080 
1081  //use to give priorities on an error to ones from Paths
      // The atomic is owned by the allPathsDone lambda (moved in below); pathsDone only keeps
      // the raw pointer pathErrorPtr to it.
1082  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1083  auto pathErrorPtr = pathErrorHolder.get();
1084  ServiceWeakToken weakToken = serviceToken;
1085  auto allPathsDone = make_waiting_task(
1086  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1087  ServiceRegistry::Operate operate(weakToken.lock());
1088 
      // An exception stored through the atomic takes precedence over the one delivered to
      // this task; the heap-allocated exception_ptr is released here.
1089  std::exception_ptr ptr;
1090  if (pathError->load()) {
1091  ptr = *pathError->load();
1092  delete pathError->load();
1093  }
1094  if ((not ptr) and iPtr) {
1095  ptr = *iPtr;
1096  }
1097  iTask.doneWaiting(finishProcessOneEvent(ptr));
1098  });
1099  //The holder guarantees that if the paths finish before the loop ends
1100  // that we do not start too soon. It also guarantees that the task will
1101  // run under that condition.
1102  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1103 
1104  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1105  std::exception_ptr const* iPtr) mutable {
1106  ServiceRegistry::Operate operate(weakToken.lock());
1107 
1108  if (iPtr) {
1109  //this is used to prioritize this error over one
1110  // that happens in EndPath or Accumulate
1111  pathErrorPtr->store(new std::exception_ptr(*iPtr));
1112  }
1113  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1114  });
1115 
1116  //The holder guarantees that if the paths finish before the loop ends
1117  // that we do not start too soon. It also guarantees that the task will
1118  // run under that condition.
1119  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1120 
1121  //start end paths first so on single threaded the paths will run first
      // End paths wait on allPathsDone directly; trigger paths wait on pathsDone.
1122  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1123  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1124  it->processEventUsingPathAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1125  }
1126 
1127  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1128  it->processEventUsingPathAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1129  }
1130 
1131  ParentContext parentContext(&streamContext_);
      // NOTE(review): the callee name is on the omitted line 1132; the argument list matches
      // processAccumulatorsAsync in this page's cross-reference index - confirm.
1133  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1134  } catch (...) {
1135  iTask.doneWaiting(std::current_exception());
1136  }
1137  }
1138 
// Runs once the trigger paths are done. iExcept holds a pointer to the exception captured
// from the paths, or null when there was none. The function re-examines that exception,
// updates total_passed_, runs the results inserter when there is one, and finally signals
// iWait with the exception_ptr that iExcept points to (an empty one when iExcept is null).
// NOTE(review): source lines 1141, 1145-1147 and 1168 are absent from this listing. They
// presumably hold the last parameter ('info') and the condition guarding the
// "TryToContinue" branch - confirm against the real file.
1139  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1140  WaitingTaskHolder iWait,
1142  if (iExcept) {
1143  // Caught exception is propagated via WaitingTaskHolder
1144  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
      // In the first branch a warning is printed and the stored exception_ptr is cleared;
      // otherwise the exception is stored back.
1148  edm::printCmsExceptionWarning("TryToContinue", e);
1149  *(iExcept.load()) = std::exception_ptr();
1150  } else {
1151  *(iExcept.load()) = std::current_exception();
1152  }
1153  } catch (...) {
1154  *(iExcept.load()) = std::current_exception();
1155  }
1156  }
1157 
      // NOTE(review): 'not iExcept' tests the pointer held by the atomic, not the exception_ptr
      // it points to. After the "TryToContinue" branch the pointer is still non-null, so this
      // event is not counted in total_passed_ and an exception from the results inserter below
      // is not stored - confirm that this is intended.
1158  if ((not iExcept) and results_->accept()) {
1159  ++total_passed_;
1160  }
1161 
1162  if (nullptr != results_inserter_.get()) {
1163  // Caught exception is propagated to the caller
1164  CMS_SA_ALLOW try {
1165  //Even if there was an exception, we need to allow results inserter
1166  // to run since some module may be waiting on its results.
1167  ParentContext parentContext(&streamContext_);
1169 
1170  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1171  if (expt) {
1172  std::rethrow_exception(expt);
1173  }
1174  } catch (cms::Exception& ex) {
      // An exception from the inserter is recorded only when no earlier one is held; a
      // cms::Exception without context first gets the event id added.
1175  if (not iExcept) {
1176  if (ex.context().empty()) {
1177  std::ostringstream ost;
1178  ost << "Processing Event " << info.principal().id();
1179  ex.addContext(ost.str());
1180  }
1181  iExcept.store(new std::exception_ptr(std::current_exception()));
1182  }
1183  } catch (...) {
1184  if (not iExcept) {
1185  iExcept.store(new std::exception_ptr(std::current_exception()));
1186  }
1187  }
1188  }
1189  std::exception_ptr ptr;
1190  if (iExcept) {
1191  ptr = *iExcept.load();
1192  }
1193  iWait.doneWaiting(ptr);
1194  }
1195 
// Final step of processing one event. When iExcept is set, context is added to the
// exception, it is printed, and the early-termination signal is emitted. The post-schedule
// signal is always emitted. resetEarlyDelete() is called only when no exception is held.
// Returns the exception to report to the caller, or an empty exception_ptr on success.
// NOTE(review): source line 1197 is absent from this listing; it presumably introduces the
// 'Traits' name used below - confirm.
1196  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1198 
1199  if (iExcept) {
1200  //add context information to the exception and print message
1201  try {
1202  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1203  } catch (cms::Exception& ex) {
1204  bool const cleaningUpAfterException = false;
1205  if (ex.context().empty()) {
1206  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1207  } else {
1208  addContextAndPrintException("", ex, cleaningUpAfterException);
1209  }
      // Keep the exception as caught here, including any context added above.
1210  iExcept = std::current_exception();
1211  }
1212 
1213  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1214  }
1215  // Caught exception is propagated to the caller
1216  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
      // An exception from the signal is reported only when no earlier exception is held.
1217  if (not iExcept) {
1218  iExcept = std::current_exception();
1219  }
1220  }
1221  if (not iExcept) {
1222  resetEarlyDelete();
1223  }
1224 
1225  return iExcept;
1226  }
1227 
// Appends the name of every entry of trig_paths_ to oLabelsToFill, in container order.
// Existing content of oLabelsToFill is kept. Note that reserve() is given
// trig_paths_.size() only, without adding the elements already present.
1228  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1229  oLabelsToFill.reserve(trig_paths_.size());
1230  std::transform(trig_paths_.begin(),
1231  trig_paths_.end(),
1232  std::back_inserter(oLabelsToFill),
1233  std::bind(&Path::name, std::placeholders::_1));
1234  }
1235 
// Appends to oLabelsToFill the module label of every worker on the first path in
// trig_paths_ whose name equals iPathLabel, in index order. When no such path exists
// oLabelsToFill is left unchanged. Only trig_paths_ is searched, not end_paths_.
1236  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1237  TrigPaths::const_iterator itFound = std::find_if(
1238  trig_paths_.begin(),
1239  trig_paths_.end(),
1240  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1241  if (itFound != trig_paths_.end()) {
1242  oLabelsToFill.reserve(itFound->size());
1243  for (size_t i = 0; i < itFound->size(); ++i) {
1244  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1245  }
1246  }
1247  }
1248 
1250  std::vector<ModuleDescription const*>& descriptions,
1251  unsigned int hint) const {
      // Replaces the content of 'descriptions' with the ModuleDescription pointers of the
      // workers on the path in trig_paths_ named iPathLabel, in index order. 'hint' is tried
      // first as an index into trig_paths_; when it is out of range or names a different path,
      // a linear search is done. 'descriptions' stays empty when no path matches.
      // NOTE(review): source line 1249 (presumably the function name and the iPathLabel
      // parameter) is absent from this listing.
1252  descriptions.clear();
1253  bool found = false;
1254  TrigPaths::const_iterator itFound;
1255 
1256  if (hint < trig_paths_.size()) {
1257  itFound = trig_paths_.begin() + hint;
1258  if (itFound->name() == iPathLabel)
1259  found = true;
1260  }
1261  if (!found) {
1262  // if the hint did not work, do it the slow way
1263  itFound = std::find_if(
1264  trig_paths_.begin(),
1265  trig_paths_.end(),
1266  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1267  if (itFound != trig_paths_.end())
1268  found = true;
1269  }
1270  if (found) {
1271  descriptions.reserve(itFound->size());
1272  for (size_t i = 0; i < itFound->size(); ++i) {
1273  descriptions.push_back(itFound->getWorker(i)->description());
1274  }
1275  }
1276  }
1277 
1279  std::vector<ModuleDescription const*>& descriptions,
1280  unsigned int hint) const {
      // Replaces the content of 'descriptions' with the ModuleDescription pointers of the
      // workers on the path in end_paths_ named iEndPathLabel, in index order. 'hint' is tried
      // first as an index into end_paths_; when it is out of range or names a different path,
      // a linear search is done. 'descriptions' stays empty when no path matches.
      // NOTE(review): source line 1278 (presumably the function name and the iEndPathLabel
      // parameter) is absent from this listing.
1281  descriptions.clear();
1282  bool found = false;
1283  TrigPaths::const_iterator itFound;
1284 
1285  if (hint < end_paths_.size()) {
1286  itFound = end_paths_.begin() + hint;
1287  if (itFound->name() == iEndPathLabel)
1288  found = true;
1289  }
1290  if (!found) {
1291  // if the hint did not work, do it the slow way
1292  itFound = std::find_if(
1293  end_paths_.begin(),
1294  end_paths_.end(),
1295  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1296  if (itFound != end_paths_.end())
1297  found = true;
1298  }
1299  if (found) {
1300  descriptions.reserve(itFound->size());
1301  for (size_t i = 0; i < itFound->size(); ++i) {
1302  descriptions.push_back(itFound->getWorker(i)->description());
1303  }
1304  }
1305  }
1306 
// Folds the statistics of the module at index 'which' of 'path' into 'sum'. The four
// counters are added to the values already in 'sum'; moduleLabel and bitPosition are
// overwritten.
1307  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1308  sum.timesVisited += path.timesVisited(which);
1309  sum.timesPassed += path.timesPassed(which);
1310  sum.timesFailed += path.timesFailed(which);
1311  sum.timesExcept += path.timesExcept(which);
1312  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1313  sum.bitPosition = path.bitPosition(which);
1314  }
1315 
// Folds the statistics of 'path' into 'sum'. name and bitPosition are overwritten and the
// four counters are added to the values already in 'sum'. The per-module summaries are
// created on first use (when sum.moduleInPathSummaries is empty) and otherwise must already
// have one entry per module of the path, which the assert checks.
// NOTE(review): source lines 1328 and 1334, the bodies of the two loops, are absent from
// this listing; they presumably call fillModuleInPathSummary for index i - confirm.
1316  static void fillPathSummary(Path const& path, PathSummary& sum) {
1317  sum.name = path.name();
1318  sum.bitPosition = path.bitPosition();
1319  sum.timesRun += path.timesRun();
1320  sum.timesPassed += path.timesPassed();
1321  sum.timesFailed += path.timesFailed();
1322  sum.timesExcept += path.timesExcept();
1323 
1324  Path::size_type sz = path.size();
1325  if (sum.moduleInPathSummaries.empty()) {
      // Build into a temporary and swap it in once the loop has finished.
1326  std::vector<ModuleInPathSummary> temp(sz);
1327  for (size_t i = 0; i != sz; ++i) {
1329  }
1330  sum.moduleInPathSummaries.swap(temp);
1331  } else {
1332  assert(sz == sum.moduleInPathSummaries.size());
1333  for (size_t i = 0; i != sz; ++i) {
1335  }
1336  }
1337  }
1338 
// Folds the statistics of worker 'w' into 'sum'. The five counters are added to the values
// already in 'sum'; moduleLabel is overwritten with the worker's module label.
1339  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1340  sum.timesVisited += w.timesVisited();
1341  sum.timesRun += w.timesRun();
1342  sum.timesPassed += w.timesPassed();
1343  sum.timesFailed += w.timesFailed();
1344  sum.timesExcept += w.timesExcept();
1345  sum.moduleLabel = w.description()->moduleLabel();
1346  }
1347 
// Pointer-taking adapter around fillWorkerSummaryAux. pw is dereferenced without a check,
// so it must not be null.
1348  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1349 
// Adds this stream's statistics to 'rep'. The event totals are accumulated with +=, and the
// trigger-path, end-path and worker summaries are filled through fill_summary.
// NOTE(review): the signature (source line 1350) is absent from this listing; the
// cross-reference index of this page lists getTriggerReport(TriggerReport &rep) const.
1351  rep.eventSummary.totalEvents += totalEvents();
1352  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1353  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1354 
1355  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1356  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1357  fill_summary(allWorkersLumisAndEvents(), rep.workerSummaries, &fillWorkerSummary);
1358  }
1359 
// Calls Path::clearCounters on every entry of trig_paths_ and end_paths_.
// NOTE(review): source lines 1360, 1362 and 1365 are absent from this listing, so the
// signature and any further resets done here are not visible; the cross-reference index of
// this page lists a clearCounters() described as clearing the trigger report counters.
1361  using std::placeholders::_1;
1363  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1364  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1366  }
1367 
// Resets the trigger results object held in results_; nothing else is touched here.
1368  void StreamSchedule::resetAll() { results_->reset(); }
1369 
1371 
// Re-arms the early-delete bookkeeping: every count in earlyDeleteBranchToCount_ is zeroed,
// then incremented once per index visited by the loop whose header is missing, and finally
// each helper in earlyDeleteHelpers_ is reset.
// NOTE(review): source lines 1372 and 1378 (the signature and the header of the loop that
// yields 'index') are absent from this listing. This is presumably
// StreamSchedule::resetEarlyDelete iterating earlyDeleteHelperToBranchIndicies_ - confirm.
1373  //must be sure we have cleared the count first
1374  for (auto& count : earlyDeleteBranchToCount_) {
1375  count.count = 0;
1376  }
1377  //now reset based on how many helpers use that branch
1379  ++(earlyDeleteBranchToCount_[index].count);
1380  }
1381  for (auto& helper : earlyDeleteHelpers_) {
1382  helper.reset();
1383  }
1384  }
1385 
1387  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1388  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
      // Creates one worker per path status inserter and per end-path status inserter, stores it
      // in pathStatusInserterWorkers_ / endPathStatusInserterWorkers_, gives it the activity
      // registry and passes it to addToAllWorkers. Inserters whose bit position is not listed
      // as an empty path are then attached to the matching Path object.
      // NOTE(review): source lines 1386 and 1389 (presumably the function name and the
      // 'actions' parameter) are absent from this listing.
1390  int bitpos = 0;
1391  unsigned int indexEmpty = 0;
1392  unsigned int indexOfPath = 0;
1393  for (auto& pathStatusInserter : pathStatusInserters) {
1394  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1395  WorkerPtr workerPtr(
1396  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1397  pathStatusInserterWorkers_.emplace_back(workerPtr);
1398  workerPtr->setActivityRegistry(actReg_);
1399  addToAllWorkers(workerPtr.get());
1400 
1401  // A little complexity here because a C++ Path object is not
1402  // instantiated and put into trig_paths_ if there are no modules
1403  // on the configured path.
      // indexEmpty walks empty_trig_paths_ and indexOfPath walks trig_paths_; bitpos decides
      // which of the two the current inserter belongs to.
1404  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1405  ++indexEmpty;
1406  } else {
1407  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1408  ++indexOfPath;
1409  }
1410  ++bitpos;
1411  }
1412 
1413  bitpos = 0;
1414  indexEmpty = 0;
1415  indexOfPath = 0;
1416  for (auto& endPathStatusInserter : endPathStatusInserters) {
1417  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1418  WorkerPtr workerPtr(
1419  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1420  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1421  workerPtr->setActivityRegistry(actReg_);
1422  addToAllWorkers(workerPtr.get());
1423 
1424  // A little complexity here because a C++ Path object is not
1425  // instantiated and put into end_paths if there are no modules
1426  // on the configured path.
1427  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1428  ++indexEmpty;
1429  } else {
      // End paths are given only the worker; the inserter pointer argument is nullptr.
1430  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1431  ++indexOfPath;
1432  }
1433  ++bitpos;
1434  }
1435  }
1436 
1438  ServiceWeakToken const& weakToken,
1439  bool cleaningUpAfterException,
1440  std::exception_ptr& excpt) const noexcept {
      // Adds context to the exception held in excpt, prints it, stores the resulting exception
      // back into excpt, and then emits the early-termination signal. Exceptions thrown by the
      // signal are swallowed.
      // NOTE(review): source line 1437 (presumably the function name and the streamContext
      // parameter) is absent from this listing. The function is declared noexcept, so an
      // exception escaping the first try block would terminate the program.
1441  //add context information to the exception and print message
1442  try {
1443  convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
1444  } catch (cms::Exception& ex) {
1445  std::ostringstream ost;
1446  // In most cases the exception will already have context at this point,
1447  // but add some context here in those rare cases where it does not.
1448  if (ex.context().empty()) {
1449  exceptionContext(ost, streamContext);
1450  }
1451  ServiceRegistry::Operate op(weakToken.lock());
1452  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
1453  excpt = std::current_exception();
1454  }
1455  // We are already handling an earlier exception, so ignore it
1456  // if this signal results in another exception being thrown.
1457  CMS_SA_ALLOW try {
1458  ServiceRegistry::Operate op(weakToken.lock());
1459  actReg_->preStreamEarlyTerminationSignal_(streamContext, TerminationOrigin::ExceptionFromThisContext);
1460  } catch (...) {
1461  }
1462  }
1463 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
AllWorkers const & allWorkersBeginEnd() const
returns the collection of pointers to workers
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
Definition: helper.py:1
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
ProductList const & productList() const
std::unordered_multimap< std::string, AliasInfo > const & aliasMap() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
static PFTauRenderPlugin instance
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
int totalEventsPassed() const
ret
prodAgent to be discontinued
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
void clearCounters() noexcept
Definition: Worker.h:235
std::unordered_multimap< std::string, AliasInfo > aliasMap_
void setupOnDemandSystem(EventTransitionInfo const &)
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
ConditionalTaskHelper(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, WorkerManager &workerManagerLumisAndEvents, std::vector< std::string > const &trigPathNames)
void addToAllWorkers(Worker *w)
std::string const & moduleName() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
StreamID streamID() const
WorkerManager workerManagerBeginEnd_
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
T getUntrackedParameter(std::string const &, T const &) const
constexpr element_type const * get() const
std::string moduleLabel
Definition: TriggerReport.h:52
oneapi::tbb::task_group * group() const noexcept
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
char const * label
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
WorkerManager workerManagerRuns_
void processSwitchEDAliases(ParameterSet const &proc_pset, ProductRegistry &preg, ProcessConfiguration const &processConfiguration, std::unordered_set< std::string > const &allConditionalMods)
std::string name
Definition: TriggerReport.h:41
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
Strings const & getTrigPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static void fillPathSummary(Path const &path, PathSummary &sum)
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void handleException(StreamContext const &, ServiceWeakToken const &, bool cleaningUpAfterException, std::exception_ptr &) const noexcept
ModuleDescription const * description() const noexcept
Definition: Worker.h:201
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
void doneWaiting(std::exception_ptr iExcept) noexcept
int totalEvents() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
std::string const & className() const
Definition: TypeID.cc:40
void clearCounters()
Clear all the counters in the trigger report.
void fillAliasMap(ParameterSet const &proc_pset, std::unordered_set< std::string > const &allConditionalMods)
Strings const & getEndPaths() const
void getTriggerReport(TriggerReport &rep) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
exception_actions::ActionCodes find(const std::string &category) const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
AllWorkers const & allWorkersLumisAndEvents() const
double b
Definition: hdecay.h:120
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::string const & processName() const
HLT enums.
void setupResolvers(Principal &principal)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:336
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModuleBranches(std::unordered_set< std::string > const &conditionalmods) const
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:42
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModsBranches_
Log< level::Warning, false > LogWarning
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T
std::string const & moduleLabel() const
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
std::string const & name() const
Definition: Path.h:66
WorkerManager workerManagerLumisAndEvents_
std::vector< std::string > vstring
Definition: Schedule.cc:482
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
AllWorkers const & allWorkersRuns() const
def move(src, dest)
Definition: eostools.py:511
void clearCounters()
Definition: Path.cc:183
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)