CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
29 
31 #include "processEDAliases.h"
32 
33 #include <algorithm>
34 #include <cassert>
35 #include <cstdlib>
36 #include <functional>
37 #include <iomanip>
38 #include <limits>
39 #include <list>
40 #include <map>
41 #include <exception>
42 #include <fmt/format.h>
43 
44 namespace edm {
45 
46  namespace {
47 
48  // Function template to transform each element in the input range to
49  // a value placed into the output range. The supplied function
50  // should take a const_reference to the 'input', and write to a
51  // reference to the 'output'.
52  template <typename InputIterator, typename ForwardIterator, typename Func>
53  void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
54  for (; begin != end; ++begin, ++out)
55  func(*begin, *out);
56  }
57 
58  // Function template that takes a sequence 'from', a sequence
59  // 'to', and a callable object 'func'. It and applies
60  // transform_into to fill the 'to' sequence with the values
61  // calcuated by the callable object, taking care to fill the
62  // outupt only if all calls succeed.
63  template <typename FROM, typename TO, typename FUNC>
64  void fill_summary(FROM const& from, TO& to, FUNC func) {
65  if (to.size() != from.size()) {
66  TO temp(from.size());
67  transform_into(from.begin(), from.end(), temp.begin(), func);
68  to.swap(temp);
69  } else {
70  transform_into(from.begin(), from.end(), to.begin(), func);
71  }
72  }
73 
74  // -----------------------------
75 
76  // Here we make the trigger results inserter directly. This should
77  // probably be a utility in the WorkerRegistry or elsewhere.
78 
79  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
80  std::shared_ptr<ActivityRegistry> areg,
81  std::shared_ptr<TriggerResultInserter> inserter) {
83  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
84  ptr->setActivityRegistry(areg);
85  return ptr;
86  }
87 
88  void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
89  ProductRegistry const& preg,
90  std::multimap<std::string, Worker*>& branchToReadingWorker) {
91  auto vBranchesToDeleteEarly = branchesToDeleteEarly;
92  // Remove any duplicates
93  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
94  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
95  vBranchesToDeleteEarly.end());
96 
97  // Are the requested items in the product registry?
98  auto allBranchNames = preg.allBranchNames();
99  //the branch names all end with a period, which we do not want to compare with
100  for (auto& b : allBranchNames) {
101  b.resize(b.size() - 1);
102  }
103  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
104  std::vector<std::string> temp;
105  temp.reserve(vBranchesToDeleteEarly.size());
106 
107  std::set_intersection(vBranchesToDeleteEarly.begin(),
108  vBranchesToDeleteEarly.end(),
109  allBranchNames.begin(),
110  allBranchNames.end(),
111  std::back_inserter(temp));
112  vBranchesToDeleteEarly.swap(temp);
113  if (temp.size() != vBranchesToDeleteEarly.size()) {
114  std::vector<std::string> missingProducts;
115  std::set_difference(temp.begin(),
116  temp.end(),
117  vBranchesToDeleteEarly.begin(),
118  vBranchesToDeleteEarly.end(),
119  std::back_inserter(missingProducts));
120  LogInfo l("MissingProductsForCanDeleteEarly");
121  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
122  for (auto const& n : missingProducts) {
123  l << "\n " << n;
124  }
125  }
126  //set placeholder for the branch, we will remove the nullptr if a
127  // module actually wants the branch.
128  for (auto const& branch : vBranchesToDeleteEarly) {
129  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
130  }
131  }
132 
133  Worker* getWorker(std::string const& moduleLabel,
134  ParameterSet& proc_pset,
135  WorkerManager& workerManager,
136  ProductRegistry& preg,
137  PreallocationConfiguration const* prealloc,
138  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
139  bool isTracked;
140  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
141  if (modpset == nullptr) {
142  return nullptr;
143  }
144  assert(isTracked);
145 
146  return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
147  }
148 
149  // If ConditionalTask modules exist in the container of module
150  // names, returns the range (std::pair) for the modules. The range
151  // excludes the special markers '#' (right before the
152  // ConditionalTask modules) and '@' (last element).
153  // If the module name container does not contain ConditionalTask
154  // modules, returns std::pair of end iterators.
155  template <typename T>
156  auto findConditionalTaskModulesRange(T& modnames) {
157  auto beg = std::find(modnames.begin(), modnames.end(), "#");
158  if (beg == modnames.end()) {
159  return std::pair(modnames.end(), modnames.end());
160  }
161  return std::pair(beg + 1, std::prev(modnames.end()));
162  }
163 
164  std::optional<std::string> findBestMatchingAlias(
165  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
166  std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
167  std::string const& productModuleLabel,
168  ConsumesInfo const& consumesInfo) {
169  std::optional<std::string> best;
170  int wildcardsInBest = std::numeric_limits<int>::max();
171  bool bestIsAmbiguous = false;
172 
173  auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
174  std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
175  int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
176  if (wildcards == 0) {
177  bestIsAmbiguous = false;
178  return true;
179  }
180  if (not best or wildcards < wildcardsInBest) {
181  best = label;
182  wildcardsInBest = wildcards;
183  bestIsAmbiguous = false;
184  } else if (best and *best != label and wildcardsInBest == wildcards) {
185  bestIsAmbiguous = true;
186  }
187  return false;
188  };
189 
190  auto findAlias = aliasMap.equal_range(productModuleLabel);
191  for (auto it = findAlias.first; it != findAlias.second; ++it) {
192  std::string const& aliasInstanceLabel =
193  it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
194  bool const instanceIsWildcard = (aliasInstanceLabel == "*");
195  if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
196  bool const typeIsWildcard = it->second.friendlyClassName == "*";
197  if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
198  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
199  return it->second.originalModuleLabel;
200  }
201  } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
202  //consume is a View so need to do more intrusive search
203  //find matching branches in module
204  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
205  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
206  if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
207  if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
208  TypeID(itBranch->second->wrappedType().typeInfo()),
209  itBranch->second->className())) {
210  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
211  return it->second.originalModuleLabel;
212  }
213  }
214  }
215  }
216  }
217  }
218  }
219  if (bestIsAmbiguous) {
221  << "Encountered ambiguity when trying to find a best-matching alias for\n"
222  << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
223  << " module label " << productModuleLabel << "\n"
224  << " product instance name " << consumesInfo.instance() << "\n"
225  << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
226  "wildcards ("
227  << wildcardsInBest << ")";
228  }
229  return best;
230  }
231  } // namespace
232 
233  // -----------------------------
234 
235  typedef std::vector<std::string> vstring;
236 
237  // -----------------------------
238 
240  public:
242 
244  ProductRegistry& preg,
245  PreallocationConfiguration const* prealloc,
246  std::shared_ptr<ProcessConfiguration const> processConfiguration,
247  WorkerManager& workerManager,
248  std::vector<std::string> const& trigPathNames) {
249  std::unordered_set<std::string> allConditionalMods;
250  for (auto const& pathName : trigPathNames) {
251  auto const modnames = proc_pset.getParameter<vstring>(pathName);
252 
253  //Pull out ConditionalTask modules
254  auto condRange = findConditionalTaskModulesRange(modnames);
255  if (condRange.first == condRange.second)
256  continue;
257 
258  //the last entry should be ignored since it is required to be "@"
259  allConditionalMods.insert(condRange.first, condRange.second);
260  }
261 
262  for (auto const& cond : allConditionalMods) {
263  //force the creation of the conditional modules so alias check can work
264  (void)getWorker(cond, proc_pset, workerManager, preg, prealloc, processConfiguration);
265  }
266 
267  fillAliasMap(proc_pset, allConditionalMods);
268  processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
269 
270  //find branches created by the conditional modules
271  for (auto const& prod : preg.productList()) {
272  if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
273  conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
274  }
275  }
276  }
277 
278  std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
279 
280  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
281  std::unordered_set<std::string> const& conditionalmods) const {
282  std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
283  for (auto const& mod : conditionalmods) {
284  auto range = conditionalModsBranches_.equal_range(mod);
285  ret.insert(range.first, range.second);
286  }
287  return ret;
288  }
289 
290  private:
291  void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
292  auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
293  std::string const star("*");
294  for (auto const& alias : aliases) {
295  auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
296  auto aliasedToModuleLabels = info.getParameterNames();
297  for (auto const& mod : aliasedToModuleLabels) {
298  if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
299  auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
300  for (auto const& aliasPSet : aliasVPSet) {
301  std::string type = star;
302  std::string instance = star;
303  std::string originalInstance = star;
304  if (aliasPSet.exists("type")) {
305  type = aliasPSet.getParameter<std::string>("type");
306  }
307  if (aliasPSet.exists("toProductInstance")) {
308  instance = aliasPSet.getParameter<std::string>("toProductInstance");
309  }
310  if (aliasPSet.exists("fromProductInstance")) {
311  originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
312  }
313 
314  aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
315  }
316  }
317  }
318  }
319  }
320 
321  void processSwitchEDAliases(ParameterSet const& proc_pset,
322  ProductRegistry& preg,
323  ProcessConfiguration const& processConfiguration,
324  std::unordered_set<std::string> const& allConditionalMods) {
325  auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
326  std::vector<std::string> switchEDAliases;
327  for (auto const& module : all_modules) {
328  auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
329  if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
330  auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
331  for (auto const& case_label : all_cases) {
332  auto range = aliasMap_.equal_range(case_label);
333  if (range.first != range.second) {
334  switchEDAliases.push_back(case_label);
335  }
336  }
337  }
338  }
340  switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
341  }
342 
343  std::unordered_multimap<std::string, AliasInfo> aliasMap_;
344  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
345  };
346 
347  // -----------------------------
348 
350  std::shared_ptr<TriggerResultInserter> inserter,
351  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
352  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
353  std::shared_ptr<ModuleRegistry> modReg,
354  ParameterSet& proc_pset,
355  service::TriggerNamesService const& tns,
356  PreallocationConfiguration const& prealloc,
357  ProductRegistry& preg,
359  std::shared_ptr<ActivityRegistry> areg,
360  std::shared_ptr<ProcessConfiguration const> processConfiguration,
361  StreamID streamID,
362  ProcessContext const* processContext)
363  : workerManager_(modReg, areg, actions),
364  actReg_(areg),
365  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
366  results_inserter_(),
367  trig_paths_(),
368  end_paths_(),
369  total_events_(),
370  total_passed_(),
371  number_of_unscheduled_modules_(0),
372  streamID_(streamID),
373  streamContext_(streamID_, processContext) {
374  bool hasPath = false;
375  std::vector<std::string> const& pathNames = tns.getTrigPaths();
376  std::vector<std::string> const& endPathNames = tns.getEndPaths();
377 
378  ConditionalTaskHelper conditionalTaskHelper(
379  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
380 
381  int trig_bitpos = 0;
382  trig_paths_.reserve(pathNames.size());
383  for (auto const& trig_name : pathNames) {
384  fillTrigPath(proc_pset,
385  preg,
386  &prealloc,
387  processConfiguration,
388  trig_bitpos,
389  trig_name,
390  results(),
391  endPathNames,
392  conditionalTaskHelper);
393  ++trig_bitpos;
394  hasPath = true;
395  }
396 
397  if (hasPath) {
398  // the results inserter stands alone
399  inserter->setTrigResultForStream(streamID.value(), results());
400 
401  results_inserter_ = makeInserter(actions, actReg_, inserter);
403  }
404 
405  // fill normal endpaths
406  int bitpos = 0;
407  end_paths_.reserve(endPathNames.size());
408  for (auto const& end_path_name : endPathNames) {
409  fillEndPath(
410  proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
411  ++bitpos;
412  }
413 
414  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
415 
416  //See if all modules were used
417  std::set<std::string> usedWorkerLabels;
418  for (auto const& worker : allWorkers()) {
419  usedWorkerLabels.insert(worker->description()->moduleLabel());
420  }
421  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
422  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
423  std::vector<std::string> unusedLabels;
424  set_difference(modulesInConfigSet.begin(),
425  modulesInConfigSet.end(),
426  usedWorkerLabels.begin(),
427  usedWorkerLabels.end(),
428  back_inserter(unusedLabels));
429  std::set<std::string> unscheduledLabels;
430  std::vector<std::string> shouldBeUsedLabels;
431  if (!unusedLabels.empty()) {
432  //Need to
433  // 1) create worker
434  // 2) if it is a WorkerT<EDProducer>, add it to our list
435  // 3) hand list to our delayed reader
436  for (auto const& label : unusedLabels) {
437  bool isTracked;
438  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
439  assert(isTracked);
440  assert(modulePSet != nullptr);
442  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
443  }
444  if (!shouldBeUsedLabels.empty()) {
445  std::ostringstream unusedStream;
446  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
447  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
448  itLabelEnd = shouldBeUsedLabels.end();
449  itLabel != itLabelEnd;
450  ++itLabel) {
451  unusedStream << ",'" << *itLabel << "'";
452  }
453  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
454  }
455  }
456  number_of_unscheduled_modules_ = unscheduledLabels.size();
457  } // StreamSchedule::StreamSchedule
458 
460  std::vector<std::string> const& branchesToDeleteEarly,
461  std::multimap<std::string, std::string> const& referencesToBranches,
462  std::vector<std::string> const& modulesToSkip,
463  edm::ProductRegistry const& preg) {
464  // setup the list with those products actually registered for this job
465  std::multimap<std::string, Worker*> branchToReadingWorker;
466  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
467 
468  const std::vector<std::string> kEmpty;
469  std::map<Worker*, unsigned int> reserveSizeForWorker;
470  unsigned int upperLimitOnReadingWorker = 0;
471  unsigned int upperLimitOnIndicies = 0;
472  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
473 
474  //talk with output modules first
475  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
476  auto comm = iHolder->createOutputModuleCommunicator();
477  if (comm) {
478  if (!branchToReadingWorker.empty()) {
479  //If an OutputModule needs a product, we can't delete it early
480  // so we should remove it from our list
481  SelectedProductsForBranchType const& kept = comm->keptProducts();
482  for (auto const& item : kept[InEvent]) {
483  BranchDescription const& desc = *item.first;
484  auto found = branchToReadingWorker.equal_range(desc.branchName());
485  if (found.first != found.second) {
486  --nUniqueBranchesToDelete;
487  branchToReadingWorker.erase(found.first, found.second);
488  }
489  }
490  }
491  }
492  });
493 
494  if (branchToReadingWorker.empty()) {
495  return;
496  }
497 
498  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
499  for (auto w : allWorkers()) {
500  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
501  continue;
502  }
503  //determine if this module could read a branch we want to delete early
504  auto consumes = w->consumesInfo();
505  if (not consumes.empty()) {
506  bool foundAtLeastOneMatchingBranch = false;
507  for (auto const& product : consumes) {
508  std::string branch = fmt::format("{}_{}_{}_{}",
509  product.type().friendlyClassName(),
510  product.label().data(),
511  product.instance().data(),
512  product.process().data());
513  {
514  //Handle case where worker directly consumes product
515  auto found = branchToReadingWorker.end();
516  if (product.process().empty()) {
517  auto startFound = branchToReadingWorker.lower_bound(branch);
518  if (startFound != branchToReadingWorker.end()) {
519  if (startFound->first.substr(0, branch.size()) == branch) {
520  //match all processNames here, even if it means multiple matches will happen
521  found = startFound;
522  }
523  }
524  } else {
525  auto exactFound = branchToReadingWorker.equal_range(branch);
526  if (exactFound.first != exactFound.second) {
527  found = exactFound.first;
528  }
529  }
530  if (found != branchToReadingWorker.end()) {
531  if (not foundAtLeastOneMatchingBranch) {
532  ++upperLimitOnReadingWorker;
533  foundAtLeastOneMatchingBranch = true;
534  }
535  ++upperLimitOnIndicies;
536  ++reserveSizeForWorker[w];
537  if (nullptr == found->second) {
538  found->second = w;
539  } else {
540  branchToReadingWorker.insert(make_pair(found->first, w));
541  }
542  }
543  }
544  {
545  //Handle case where indirectly consumes product
546  auto found = referencesToBranches.end();
547  if (product.process().empty()) {
548  auto startFound = referencesToBranches.lower_bound(branch);
549  if (startFound != referencesToBranches.end()) {
550  if (startFound->first.substr(0, branch.size()) == branch) {
551  //match all processNames here, even if it means multiple matches will happen
552  found = startFound;
553  }
554  }
555  } else {
556  //can match exactly
557  auto exactFound = referencesToBranches.equal_range(branch);
558  if (exactFound.first != exactFound.second) {
559  found = exactFound.first;
560  }
561  }
562  if (found != referencesToBranches.end()) {
563  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
564  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
565  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
566  continue;
567  }
568  if (not foundAtLeastOneMatchingBranch) {
569  ++upperLimitOnReadingWorker;
570  foundAtLeastOneMatchingBranch = true;
571  }
572  ++upperLimitOnIndicies;
573  ++reserveSizeForWorker[w];
574  if (nullptr == foundInBranchToReadingWorker->second) {
575  foundInBranchToReadingWorker->second = w;
576  } else {
577  branchToReadingWorker.insert(make_pair(itr->second, w));
578  }
579  }
580  }
581  }
582  }
583  }
584  }
585  {
586  auto it = branchToReadingWorker.begin();
587  std::vector<std::string> unusedBranches;
588  while (it != branchToReadingWorker.end()) {
589  if (it->second == nullptr) {
590  unusedBranches.push_back(it->first);
591  //erasing the object invalidates the iterator so must advance it first
592  auto temp = it;
593  ++it;
594  branchToReadingWorker.erase(temp);
595  } else {
596  ++it;
597  }
598  }
599  if (not unusedBranches.empty()) {
600  LogWarning l("UnusedProductsForCanDeleteEarly");
601  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
602  " If possible, remove the producer from the job.";
603  for (auto const& n : unusedBranches) {
604  l << "\n " << n;
605  }
606  }
607  }
608  if (!branchToReadingWorker.empty()) {
609  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
610  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
611  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
612  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
613  std::string lastBranchName;
614  size_t nextOpenIndex = 0;
615  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
616  for (auto& branchAndWorker : branchToReadingWorker) {
617  if (lastBranchName != branchAndWorker.first) {
618  //have to put back the period we removed earlier in order to get the proper name
619  BranchID bid(branchAndWorker.first + ".");
620  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
621  lastBranchName = branchAndWorker.first;
622  }
623  auto found = alreadySeenWorkers.find(branchAndWorker.second);
624  if (alreadySeenWorkers.end() == found) {
625  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
626  // all the branches that might be read by this worker. However, initially we will only tell the
627  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
628  // EarlyDeleteHelper will automatically advance its internal end pointer.
629  size_t index = nextOpenIndex;
630  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
633  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
634  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
635  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
636  nextOpenIndex += nIndices;
637  } else {
638  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
639  }
640  }
641 
642  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
643  // space needed for each module
644  auto itLast = earlyDeleteHelpers_.begin();
645  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
646  if (itLast->end() != it->begin()) {
647  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
648  unsigned int delta = it->begin() - itLast->end();
649  it->shiftIndexPointers(delta);
650 
652  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
653  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
654  }
655  itLast = it;
656  }
658  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
660 
661  //now tell the paths about the deleters
662  for (auto& p : trig_paths_) {
663  p.setEarlyDeleteHelpers(alreadySeenWorkers);
664  }
665  for (auto& p : end_paths_) {
666  p.setEarlyDeleteHelpers(alreadySeenWorkers);
667  }
669  }
670  }
671 
673  Worker* worker,
674  std::unordered_set<std::string>& conditionalModules,
675  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
676  std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
677  ParameterSet& proc_pset,
678  ProductRegistry& preg,
679  PreallocationConfiguration const* prealloc,
680  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
681  std::vector<Worker*> returnValue;
682  auto const& consumesInfo = worker->consumesInfo();
683  auto moduleLabel = worker->description()->moduleLabel();
684  using namespace productholderindexhelper;
685  for (auto const& ci : consumesInfo) {
686  if (not ci.skipCurrentProcess() and
687  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
688  auto productModuleLabel = std::string(ci.label());
689  bool productFromConditionalModule = false;
690  auto itFound = conditionalModules.find(productModuleLabel);
691  if (itFound == conditionalModules.end()) {
692  //Check to see if this was an alias
693  //note that aliasMap was previously filtered so only the conditional modules remain there
694  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
695  if (foundAlias) {
696  productModuleLabel = *foundAlias;
697  productFromConditionalModule = true;
698  itFound = conditionalModules.find(productModuleLabel);
699  //check that the alias-for conditional module has not been used
700  if (itFound == conditionalModules.end()) {
701  continue;
702  }
703  }
704  } else {
705  //need to check the rest of the data product info
706  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
707  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
708  if (itBranch->second->productInstanceName() == ci.instance()) {
709  if (ci.kindOfType() == PRODUCT_TYPE) {
710  if (ci.type() == itBranch->second->unwrappedTypeID()) {
711  productFromConditionalModule = true;
712  break;
713  }
714  } else {
715  //this is a view
717  ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
718  productFromConditionalModule = true;
719  break;
720  }
721  }
722  }
723  }
724  }
725  if (productFromConditionalModule) {
726  auto condWorker =
727  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
728  assert(condWorker);
729 
730  conditionalModules.erase(itFound);
731 
732  auto dependents = tryToPlaceConditionalModules(condWorker,
733  conditionalModules,
734  conditionalModuleBranches,
735  aliasMap,
736  proc_pset,
737  preg,
738  prealloc,
739  processConfiguration);
740  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
741  returnValue.push_back(condWorker);
742  }
743  }
744  }
745  return returnValue;
746  }
747 
749  ProductRegistry& preg,
750  PreallocationConfiguration const* prealloc,
751  std::shared_ptr<ProcessConfiguration const> processConfiguration,
752  std::string const& pathName,
753  bool ignoreFilters,
754  PathWorkers& out,
755  std::vector<std::string> const& endPathNames,
756  ConditionalTaskHelper const& conditionalTaskHelper) {
757  vstring modnames = proc_pset.getParameter<vstring>(pathName);
758  PathWorkers tmpworkers;
759 
760  //Pull out ConditionalTask modules
761  auto condRange = findConditionalTaskModulesRange(modnames);
762 
763  std::unordered_set<std::string> conditionalmods;
764  //An EDAlias may be redirecting to a module on a ConditionalTask
765  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
766  std::unordered_map<std::string, unsigned int> conditionalModOrder;
767  if (condRange.first != condRange.second) {
768  for (auto it = condRange.first; it != condRange.second; ++it) {
769  // ordering needs to skip the # token in the path list
770  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
771  }
772  //the last entry should be ignored since it is required to be "@"
773  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
774  std::make_move_iterator(condRange.second));
775 
776  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
777  modnames.erase(std::prev(condRange.first), modnames.end());
778  }
779 
780  unsigned int placeInPath = 0;
781  for (auto const& name : modnames) {
782  //Modules except EDFilters are set to run concurrently by default
783  bool doNotRunConcurrently = false;
785  if (name[0] == '!') {
786  filterAction = WorkerInPath::Veto;
787  } else if (name[0] == '-' or name[0] == '+') {
788  filterAction = WorkerInPath::Ignore;
789  }
790  if (name[0] == '|' or name[0] == '+') {
791  //cms.wait was specified so do not run concurrently
792  doNotRunConcurrently = true;
793  }
794 
796  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
797  moduleLabel.erase(0, 1);
798  }
799 
800  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
801  if (worker == nullptr) {
802  std::string pathType("endpath");
803  if (!search_all(endPathNames, pathName)) {
804  pathType = std::string("path");
805  }
807  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
808  << "\"\n please check spelling or remove that label from the path.";
809  }
810 
811  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
812  // We have a filter on an end path, and the filter is not explicitly ignored.
813  // See if the filter is allowed.
814  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
815  if (!search_all(allowed_filters, worker->description()->moduleName())) {
816  // Filter is not allowed. Ignore the result, and issue a warning.
817  filterAction = WorkerInPath::Ignore;
818  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
819  << "' with module label '" << moduleLabel << "' appears on EndPath '"
820  << pathName << "'.\n"
821  << "The return value of the filter will be ignored.\n"
822  << "To suppress this warning, either remove the filter from the endpath,\n"
823  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
824  }
825  }
826  bool runConcurrently = not doNotRunConcurrently;
827  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
828  runConcurrently = false;
829  }
830 
831  auto condModules = tryToPlaceConditionalModules(worker,
832  conditionalmods,
833  conditionalModsBranches,
834  conditionalTaskHelper.aliasMap(),
835  proc_pset,
836  preg,
837  prealloc,
838  processConfiguration);
839  for (auto condMod : condModules) {
840  tmpworkers.emplace_back(
841  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
842  }
843 
844  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
845  ++placeInPath;
846  }
847 
848  out.swap(tmpworkers);
849  }
850 
852  ProductRegistry& preg,
853  PreallocationConfiguration const* prealloc,
854  std::shared_ptr<ProcessConfiguration const> processConfiguration,
855  int bitpos,
856  std::string const& name,
857  TrigResPtr trptr,
858  std::vector<std::string> const& endPathNames,
859  ConditionalTaskHelper const& conditionalTaskHelper) {
860  PathWorkers tmpworkers;
861  fillWorkers(
862  proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
863 
864  // an empty path will cause an extra bit that is not used
865  if (!tmpworkers.empty()) {
866  trig_paths_.emplace_back(
867  bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
868  } else {
869  empty_trig_paths_.push_back(bitpos);
870  }
871  for (WorkerInPath const& workerInPath : tmpworkers) {
872  addToAllWorkers(workerInPath.getWorker());
873  }
874  }
875 
877  ProductRegistry& preg,
878  PreallocationConfiguration const* prealloc,
879  std::shared_ptr<ProcessConfiguration const> processConfiguration,
880  int bitpos,
881  std::string const& name,
882  std::vector<std::string> const& endPathNames,
883  ConditionalTaskHelper const& conditionalTaskHelper) {
884  PathWorkers tmpworkers;
885  fillWorkers(
886  proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
887 
888  if (!tmpworkers.empty()) {
889  end_paths_.emplace_back(bitpos,
890  name,
891  tmpworkers,
892  TrigResPtr(),
893  actionTable(),
894  actReg_,
897  } else {
898  empty_end_paths_.push_back(bitpos);
899  }
900  for (WorkerInPath const& workerInPath : tmpworkers) {
901  addToAllWorkers(workerInPath.getWorker());
902  }
903  }
904 
906 
908 
910  Worker* found = nullptr;
911  for (auto const& worker : allWorkers()) {
912  if (worker->description()->moduleLabel() == iLabel) {
913  found = worker;
914  break;
915  }
916  }
917  if (nullptr == found) {
918  return;
919  }
920 
921  iMod->replaceModuleFor(found);
922  found->beginStream(streamID_, streamContext_);
923  }
924 
926 
927  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
928  std::vector<ModuleDescription const*> result;
929  result.reserve(allWorkers().size());
930 
931  for (auto const& worker : allWorkers()) {
932  ModuleDescription const* p = worker->description();
933  result.push_back(p);
934  }
935  return result;
936  }
937 
939  WaitingTaskHolder iTask,
941  ServiceToken const& serviceToken,
942  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
943  EventPrincipal& ep = info.principal();
944 
945  // Caught exception is propagated via WaitingTaskHolder
946  CMS_SA_ALLOW try {
947  this->resetAll();
948 
950 
951  Traits::setStreamContext(streamContext_, ep);
952  //a service may want to communicate with another service
953  ServiceRegistry::Operate guard(serviceToken);
954  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
955 
956  // Data dependencies need to be set up before marking empty
957  // (End)Paths complete in case something consumes the status of
958  // the empty (EndPath)
961 
962  HLTPathStatus hltPathStatus(hlt::Pass, 0);
963  for (int empty_trig_path : empty_trig_paths_) {
964  results_->at(empty_trig_path) = hltPathStatus;
965  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
966  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
969  if (except) {
970  iTask.doneWaiting(except);
971  return;
972  }
973  }
974  for (int empty_end_path : empty_end_paths_) {
975  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
978  if (except) {
979  iTask.doneWaiting(except);
980  return;
981  }
982  }
983 
984  ++total_events_;
985 
986  //use to give priorities on an error to ones from Paths
987  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
988  auto pathErrorPtr = pathErrorHolder.get();
989  ServiceWeakToken weakToken = serviceToken;
990  auto allPathsDone = make_waiting_task(
991  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
992  ServiceRegistry::Operate operate(weakToken.lock());
993 
994  std::exception_ptr ptr;
995  if (pathError->load()) {
996  ptr = *pathError->load();
997  delete pathError->load();
998  }
999  if ((not ptr) and iPtr) {
1000  ptr = *iPtr;
1001  }
1002  iTask.doneWaiting(finishProcessOneEvent(ptr));
1003  });
1004  //The holder guarantees that if the paths finish before the loop ends
1005  // that we do not start too soon. It also guarantees that the task will
1006  // run under that condition.
1007  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1008 
1009  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1010  std::exception_ptr const* iPtr) mutable {
1011  ServiceRegistry::Operate operate(weakToken.lock());
1012 
1013  if (iPtr) {
1014  //this is used to prioritize this error over one
1015  // that happens in EndPath or Accumulate
1016  pathErrorPtr->store(new std::exception_ptr(*iPtr));
1017  }
1018  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1019  });
1020 
1021  //The holder guarantees that if the paths finish before the loop ends
1022  // that we do not start too soon. It also guarantees that the task will
1023  // run under that condition.
1024  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1025 
1026  //start end paths first so on single threaded the paths will run first
1027  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1028  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1029  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1030  }
1031 
1032  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1033  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1034  }
1035 
1036  ParentContext parentContext(&streamContext_);
1038  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1039  } catch (...) {
1040  iTask.doneWaiting(std::current_exception());
1041  }
1042  }
1043 
1044  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1045  WaitingTaskHolder iWait,
1047  if (iExcept) {
1048  // Caught exception is propagated via WaitingTaskHolder
1049  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1053  edm::printCmsExceptionWarning("TryToContinue", e);
1054  *(iExcept.load()) = std::exception_ptr();
1055  } else {
1056  *(iExcept.load()) = std::current_exception();
1057  }
1058  } catch (...) {
1059  *(iExcept.load()) = std::current_exception();
1060  }
1061  }
1062 
1063  if ((not iExcept) and results_->accept()) {
1064  ++total_passed_;
1065  }
1066 
1067  if (nullptr != results_inserter_.get()) {
1068  // Caught exception is propagated to the caller
1069  CMS_SA_ALLOW try {
1070  //Even if there was an exception, we need to allow results inserter
1071  // to run since some module may be waiting on its results.
1072  ParentContext parentContext(&streamContext_);
1074 
1075  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1076  if (expt) {
1077  std::rethrow_exception(expt);
1078  }
1079  } catch (cms::Exception& ex) {
1080  if (not iExcept) {
1081  if (ex.context().empty()) {
1082  std::ostringstream ost;
1083  ost << "Processing Event " << info.principal().id();
1084  ex.addContext(ost.str());
1085  }
1086  iExcept.store(new std::exception_ptr(std::current_exception()));
1087  }
1088  } catch (...) {
1089  if (not iExcept) {
1090  iExcept.store(new std::exception_ptr(std::current_exception()));
1091  }
1092  }
1093  }
1094  std::exception_ptr ptr;
1095  if (iExcept) {
1096  ptr = *iExcept.load();
1097  }
1098  iWait.doneWaiting(ptr);
1099  }
1100 
1101  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1103 
1104  if (iExcept) {
1105  //add context information to the exception and print message
1106  try {
1107  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1108  } catch (cms::Exception& ex) {
1109  bool const cleaningUpAfterException = false;
1110  if (ex.context().empty()) {
1111  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1112  } else {
1113  addContextAndPrintException("", ex, cleaningUpAfterException);
1114  }
1115  iExcept = std::current_exception();
1116  }
1117 
1118  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1119  }
1120  // Caught exception is propagated to the caller
1121  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1122  if (not iExcept) {
1123  iExcept = std::current_exception();
1124  }
1125  }
1126  if (not iExcept) {
1127  resetEarlyDelete();
1128  }
1129 
1130  return iExcept;
1131  }
1132 
1133  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1134  oLabelsToFill.reserve(trig_paths_.size());
1135  std::transform(trig_paths_.begin(),
1136  trig_paths_.end(),
1137  std::back_inserter(oLabelsToFill),
1138  std::bind(&Path::name, std::placeholders::_1));
1139  }
1140 
1141  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1142  TrigPaths::const_iterator itFound = std::find_if(
1143  trig_paths_.begin(),
1144  trig_paths_.end(),
1145  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1146  if (itFound != trig_paths_.end()) {
1147  oLabelsToFill.reserve(itFound->size());
1148  for (size_t i = 0; i < itFound->size(); ++i) {
1149  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1150  }
1151  }
1152  }
1153 
1155  std::vector<ModuleDescription const*>& descriptions,
1156  unsigned int hint) const {
1157  descriptions.clear();
1158  bool found = false;
1159  TrigPaths::const_iterator itFound;
1160 
1161  if (hint < trig_paths_.size()) {
1162  itFound = trig_paths_.begin() + hint;
1163  if (itFound->name() == iPathLabel)
1164  found = true;
1165  }
1166  if (!found) {
1167  // if the hint did not work, do it the slow way
1168  itFound = std::find_if(
1169  trig_paths_.begin(),
1170  trig_paths_.end(),
1171  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1172  if (itFound != trig_paths_.end())
1173  found = true;
1174  }
1175  if (found) {
1176  descriptions.reserve(itFound->size());
1177  for (size_t i = 0; i < itFound->size(); ++i) {
1178  descriptions.push_back(itFound->getWorker(i)->description());
1179  }
1180  }
1181  }
1182 
1184  std::vector<ModuleDescription const*>& descriptions,
1185  unsigned int hint) const {
1186  descriptions.clear();
1187  bool found = false;
1188  TrigPaths::const_iterator itFound;
1189 
1190  if (hint < end_paths_.size()) {
1191  itFound = end_paths_.begin() + hint;
1192  if (itFound->name() == iEndPathLabel)
1193  found = true;
1194  }
1195  if (!found) {
1196  // if the hint did not work, do it the slow way
1197  itFound = std::find_if(
1198  end_paths_.begin(),
1199  end_paths_.end(),
1200  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1201  if (itFound != end_paths_.end())
1202  found = true;
1203  }
1204  if (found) {
1205  descriptions.reserve(itFound->size());
1206  for (size_t i = 0; i < itFound->size(); ++i) {
1207  descriptions.push_back(itFound->getWorker(i)->description());
1208  }
1209  }
1210  }
1211 
1212  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1213  sum.timesVisited += path.timesVisited(which);
1214  sum.timesPassed += path.timesPassed(which);
1215  sum.timesFailed += path.timesFailed(which);
1216  sum.timesExcept += path.timesExcept(which);
1217  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1218  sum.bitPosition = path.bitPosition(which);
1219  }
1220 
1221  static void fillPathSummary(Path const& path, PathSummary& sum) {
1222  sum.name = path.name();
1223  sum.bitPosition = path.bitPosition();
1224  sum.timesRun += path.timesRun();
1225  sum.timesPassed += path.timesPassed();
1226  sum.timesFailed += path.timesFailed();
1227  sum.timesExcept += path.timesExcept();
1228 
1229  Path::size_type sz = path.size();
1230  if (sum.moduleInPathSummaries.empty()) {
1231  std::vector<ModuleInPathSummary> temp(sz);
1232  for (size_t i = 0; i != sz; ++i) {
1234  }
1235  sum.moduleInPathSummaries.swap(temp);
1236  } else {
1237  assert(sz == sum.moduleInPathSummaries.size());
1238  for (size_t i = 0; i != sz; ++i) {
1240  }
1241  }
1242  }
1243 
1244  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1245  sum.timesVisited += w.timesVisited();
1246  sum.timesRun += w.timesRun();
1247  sum.timesPassed += w.timesPassed();
1248  sum.timesFailed += w.timesFailed();
1249  sum.timesExcept += w.timesExcept();
1250  sum.moduleLabel = w.description()->moduleLabel();
1251  }
1252 
1253  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1254 
1256  rep.eventSummary.totalEvents += totalEvents();
1257  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1258  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1259 
1260  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1261  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1262  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1263  }
1264 
1266  using std::placeholders::_1;
1268  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1269  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1270  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1271  }
1272 
1273  void StreamSchedule::resetAll() { results_->reset(); }
1274 
1276 
1278  //must be sure we have cleared the count first
1279  for (auto& count : earlyDeleteBranchToCount_) {
1280  count.count = 0;
1281  }
1282  //now reset based on how many helpers use that branch
1284  ++(earlyDeleteBranchToCount_[index].count);
1285  }
1286  for (auto& helper : earlyDeleteHelpers_) {
1287  helper.reset();
1288  }
1289  }
1290 
1292  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1293  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1295  int bitpos = 0;
1296  unsigned int indexEmpty = 0;
1297  unsigned int indexOfPath = 0;
1298  for (auto& pathStatusInserter : pathStatusInserters) {
1299  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1300  WorkerPtr workerPtr(
1301  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1302  pathStatusInserterWorkers_.emplace_back(workerPtr);
1303  workerPtr->setActivityRegistry(actReg_);
1304  addToAllWorkers(workerPtr.get());
1305 
1306  // A little complexity here because a C++ Path object is not
1307  // instantiated and put into end_paths if there are no modules
1308  // on the configured path.
1309  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1310  ++indexEmpty;
1311  } else {
1312  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1313  ++indexOfPath;
1314  }
1315  ++bitpos;
1316  }
1317 
1318  bitpos = 0;
1319  indexEmpty = 0;
1320  indexOfPath = 0;
1321  for (auto& endPathStatusInserter : endPathStatusInserters) {
1322  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1323  WorkerPtr workerPtr(
1324  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1325  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1326  workerPtr->setActivityRegistry(actReg_);
1327  addToAllWorkers(workerPtr.get());
1328 
1329  // A little complexity here because a C++ Path object is not
1330  // instantiated and put into end_paths if there are no modules
1331  // on the configured path.
1332  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1333  ++indexEmpty;
1334  } else {
1335  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1336  ++indexOfPath;
1337  }
1338  ++bitpos;
1339  }
1340  }
1341 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
Definition: helper.py:1
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
ProductList const & productList() const
std::unordered_multimap< std::string, AliasInfo > const & aliasMap() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
static PFTauRenderPlugin instance
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
int totalEventsPassed() const
ret
prodAgent to be discontinued
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::unordered_multimap< std::string, AliasInfo > aliasMap_
void setupOnDemandSystem(EventTransitionInfo const &)
void clearCounters()
Definition: Worker.h:232
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
std::string const & moduleName() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
StreamID streamID() const
ConditionalTaskHelper(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, WorkerManager &workerManager, std::vector< std::string > const &trigPathNames)
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
T getUntrackedParameter(std::string const &, T const &) const
constexpr element_type const * get() const
std::string moduleLabel
Definition: TriggerReport.h:52
oneapi::tbb::task_group * group() const noexcept
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
char const * label
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
void doneWaiting(std::exception_ptr iExcept)
accept
Definition: HLTenums.h:18
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
void processSwitchEDAliases(ParameterSet const &proc_pset, ProductRegistry &preg, ProcessConfiguration const &processConfiguration, std::unordered_set< std::string > const &allConditionalMods)
std::string name
Definition: TriggerReport.h:41
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
Strings const & getTrigPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static void fillPathSummary(Path const &path, PathSummary &sum)
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
int totalEvents() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
ModuleDescription const * description() const
Definition: Worker.h:198
std::string const & className() const
Definition: TypeID.cc:40
void clearCounters()
Clear all the counters in the trigger report.
void fillAliasMap(ParameterSet const &proc_pset, std::unordered_set< std::string > const &allConditionalMods)
Strings const & getEndPaths() const
void getTriggerReport(TriggerReport &rep) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
exception_actions::ActionCodes find(const std::string &category) const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
double b
Definition: hdecay.h:120
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::string const & processName() const
HLT enums.
void setupResolvers(Principal &principal)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:336
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModuleBranches(std::unordered_set< std::string > const &conditionalmods) const
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:42
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModsBranches_
Log< level::Warning, false > LogWarning
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T
std::string const & moduleLabel() const
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
std::string const & name() const
Definition: Path.h:73
std::vector< std::string > vstring
Definition: Schedule.cc:482
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
def move(src, dest)
Definition: eostools.py:511
void clearCounters()
Definition: Path.cc:183
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)