CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
29 
31 #include "processEDAliases.h"
32 
33 #include <algorithm>
34 #include <cassert>
35 #include <cstdlib>
36 #include <functional>
37 #include <iomanip>
38 #include <limits>
39 #include <list>
40 #include <map>
41 #include <exception>
42 #include <fmt/format.h>
43 
44 namespace edm {
45 
46  namespace {
47 
48  // Function template to transform each element in the input range to
49  // a value placed into the output range. The supplied function
50  // should take a const_reference to the 'input', and write to a
51  // reference to the 'output'.
52  template <typename InputIterator, typename ForwardIterator, typename Func>
53  void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
54  for (; begin != end; ++begin, ++out)
55  func(*begin, *out);
56  }
57 
58  // Function template that takes a sequence 'from', a sequence
59  // 'to', and a callable object 'func'. It and applies
60  // transform_into to fill the 'to' sequence with the values
61  // calcuated by the callable object, taking care to fill the
62  // outupt only if all calls succeed.
63  template <typename FROM, typename TO, typename FUNC>
64  void fill_summary(FROM const& from, TO& to, FUNC func) {
65  if (to.size() != from.size()) {
66  TO temp(from.size());
67  transform_into(from.begin(), from.end(), temp.begin(), func);
68  to.swap(temp);
69  } else {
70  transform_into(from.begin(), from.end(), to.begin(), func);
71  }
72  }
73 
74  // -----------------------------
75 
76  // Here we make the trigger results inserter directly. This should
77  // probably be a utility in the WorkerRegistry or elsewhere.
78 
79  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
80  std::shared_ptr<ActivityRegistry> areg,
81  std::shared_ptr<TriggerResultInserter> inserter) {
83  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
84  ptr->setActivityRegistry(areg);
85  return ptr;
86  }
87 
88  void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
89  ProductRegistry const& preg,
90  std::multimap<std::string, Worker*>& branchToReadingWorker) {
91  auto vBranchesToDeleteEarly = branchesToDeleteEarly;
92  // Remove any duplicates
93  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
94  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
95  vBranchesToDeleteEarly.end());
96 
97  // Are the requested items in the product registry?
98  auto allBranchNames = preg.allBranchNames();
99  //the branch names all end with a period, which we do not want to compare with
100  for (auto& b : allBranchNames) {
101  b.resize(b.size() - 1);
102  }
103  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
104  std::vector<std::string> temp;
105  temp.reserve(vBranchesToDeleteEarly.size());
106 
107  std::set_intersection(vBranchesToDeleteEarly.begin(),
108  vBranchesToDeleteEarly.end(),
109  allBranchNames.begin(),
110  allBranchNames.end(),
111  std::back_inserter(temp));
112  vBranchesToDeleteEarly.swap(temp);
113  if (temp.size() != vBranchesToDeleteEarly.size()) {
114  std::vector<std::string> missingProducts;
115  std::set_difference(temp.begin(),
116  temp.end(),
117  vBranchesToDeleteEarly.begin(),
118  vBranchesToDeleteEarly.end(),
119  std::back_inserter(missingProducts));
120  LogInfo l("MissingProductsForCanDeleteEarly");
121  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
122  for (auto const& n : missingProducts) {
123  l << "\n " << n;
124  }
125  }
126  //set placeholder for the branch, we will remove the nullptr if a
127  // module actually wants the branch.
128  for (auto const& branch : vBranchesToDeleteEarly) {
129  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
130  }
131  }
132 
133  Worker* getWorker(std::string const& moduleLabel,
134  ParameterSet& proc_pset,
135  WorkerManager& workerManager,
136  ProductRegistry& preg,
137  PreallocationConfiguration const* prealloc,
138  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
139  bool isTracked;
140  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
141  if (modpset == nullptr) {
142  return nullptr;
143  }
144  assert(isTracked);
145 
146  return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
147  }
148 
149  // If ConditionalTask modules exist in the container of module
150  // names, returns the range (std::pair) for the modules. The range
151  // excludes the special markers '#' (right before the
152  // ConditionalTask modules) and '@' (last element).
153  // If the module name container does not contain ConditionalTask
154  // modules, returns std::pair of end iterators.
155  template <typename T>
156  auto findConditionalTaskModulesRange(T& modnames) {
157  auto beg = std::find(modnames.begin(), modnames.end(), "#");
158  if (beg == modnames.end()) {
159  return std::pair(modnames.end(), modnames.end());
160  }
161  return std::pair(beg + 1, std::prev(modnames.end()));
162  }
163 
164  std::optional<std::string> findBestMatchingAlias(
165  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
166  std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
167  std::string const& productModuleLabel,
168  ConsumesInfo const& consumesInfo) {
169  std::optional<std::string> best;
170  int wildcardsInBest = std::numeric_limits<int>::max();
171  bool bestIsAmbiguous = false;
172 
173  auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
174  std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
175  int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
176  if (wildcards == 0) {
177  bestIsAmbiguous = false;
178  return true;
179  }
180  if (not best or wildcards < wildcardsInBest) {
181  best = label;
182  wildcardsInBest = wildcards;
183  bestIsAmbiguous = false;
184  } else if (best and *best != label and wildcardsInBest == wildcards) {
185  bestIsAmbiguous = true;
186  }
187  return false;
188  };
189 
190  auto findAlias = aliasMap.equal_range(productModuleLabel);
191  for (auto it = findAlias.first; it != findAlias.second; ++it) {
192  std::string const& aliasInstanceLabel =
193  it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
194  bool const instanceIsWildcard = (aliasInstanceLabel == "*");
195  if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
196  bool const typeIsWildcard = it->second.friendlyClassName == "*";
197  if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
198  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
199  return it->second.originalModuleLabel;
200  }
201  } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
202  //consume is a View so need to do more intrusive search
203  //find matching branches in module
204  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
205  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
206  if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
207  if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
208  TypeID(itBranch->second->wrappedType().typeInfo()),
209  itBranch->second->className())) {
210  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
211  return it->second.originalModuleLabel;
212  }
213  }
214  }
215  }
216  }
217  }
218  }
219  if (bestIsAmbiguous) {
221  << "Encountered ambiguity when trying to find a best-matching alias for\n"
222  << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
223  << " module label " << productModuleLabel << "\n"
224  << " product instance name " << consumesInfo.instance() << "\n"
225  << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
226  "wildcards ("
227  << wildcardsInBest << ")";
228  }
229  return best;
230  }
231  } // namespace
232 
233  // -----------------------------
234 
235  typedef std::vector<std::string> vstring;
236 
237  // -----------------------------
238 
240  public:
242 
244  ProductRegistry& preg,
245  PreallocationConfiguration const* prealloc,
246  std::shared_ptr<ProcessConfiguration const> processConfiguration,
247  WorkerManager& workerManager,
248  std::vector<std::string> const& trigPathNames) {
249  std::unordered_set<std::string> allConditionalMods;
250  for (auto const& pathName : trigPathNames) {
251  auto const modnames = proc_pset.getParameter<vstring>(pathName);
252 
253  //Pull out ConditionalTask modules
254  auto condRange = findConditionalTaskModulesRange(modnames);
255  if (condRange.first == condRange.second)
256  continue;
257 
258  //the last entry should be ignored since it is required to be "@"
259  allConditionalMods.insert(condRange.first, condRange.second);
260  }
261 
262  for (auto const& cond : allConditionalMods) {
263  //force the creation of the conditional modules so alias check can work
264  (void)getWorker(cond, proc_pset, workerManager, preg, prealloc, processConfiguration);
265  }
266 
267  fillAliasMap(proc_pset, allConditionalMods);
268  processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
269 
270  //find branches created by the conditional modules
271  for (auto const& prod : preg.productList()) {
272  if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
273  conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
274  }
275  }
276  }
277 
278  std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
279 
280  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
281  std::unordered_set<std::string> const& conditionalmods) const {
282  std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
283  for (auto const& mod : conditionalmods) {
284  auto range = conditionalModsBranches_.equal_range(mod);
285  ret.insert(range.first, range.second);
286  }
287  return ret;
288  }
289 
290  private:
291  void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
292  auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
293  std::string const star("*");
294  for (auto const& alias : aliases) {
295  auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
296  auto aliasedToModuleLabels = info.getParameterNames();
297  for (auto const& mod : aliasedToModuleLabels) {
298  if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
299  auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
300  for (auto const& aliasPSet : aliasVPSet) {
301  std::string type = star;
302  std::string instance = star;
303  std::string originalInstance = star;
304  if (aliasPSet.exists("type")) {
305  type = aliasPSet.getParameter<std::string>("type");
306  }
307  if (aliasPSet.exists("toProductInstance")) {
308  instance = aliasPSet.getParameter<std::string>("toProductInstance");
309  }
310  if (aliasPSet.exists("fromProductInstance")) {
311  originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
312  }
313 
314  aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
315  }
316  }
317  }
318  }
319  }
320 
321  void processSwitchEDAliases(ParameterSet const& proc_pset,
322  ProductRegistry& preg,
323  ProcessConfiguration const& processConfiguration,
324  std::unordered_set<std::string> const& allConditionalMods) {
325  auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
326  std::vector<std::string> switchEDAliases;
327  for (auto const& module : all_modules) {
328  auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
329  if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
330  auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
331  for (auto const& case_label : all_cases) {
332  auto range = aliasMap_.equal_range(case_label);
333  if (range.first != range.second) {
334  switchEDAliases.push_back(case_label);
335  }
336  }
337  }
338  }
340  switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
341  }
342 
343  std::unordered_multimap<std::string, AliasInfo> aliasMap_;
344  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
345  };
346 
347  // -----------------------------
348 
350  std::shared_ptr<TriggerResultInserter> inserter,
351  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
352  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
353  std::shared_ptr<ModuleRegistry> modReg,
354  ParameterSet& proc_pset,
355  service::TriggerNamesService const& tns,
356  PreallocationConfiguration const& prealloc,
357  ProductRegistry& preg,
359  std::shared_ptr<ActivityRegistry> areg,
360  std::shared_ptr<ProcessConfiguration const> processConfiguration,
361  StreamID streamID,
362  ProcessContext const* processContext)
363  : workerManager_(modReg, areg, actions),
364  actReg_(areg),
365  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
366  results_inserter_(),
367  trig_paths_(),
368  end_paths_(),
369  total_events_(),
370  total_passed_(),
371  number_of_unscheduled_modules_(0),
372  streamID_(streamID),
373  streamContext_(streamID_, processContext),
374  skippingEvent_(false) {
375  bool hasPath = false;
376  std::vector<std::string> const& pathNames = tns.getTrigPaths();
377  std::vector<std::string> const& endPathNames = tns.getEndPaths();
378 
379  ConditionalTaskHelper conditionalTaskHelper(
380  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
381 
382  int trig_bitpos = 0;
383  trig_paths_.reserve(pathNames.size());
384  for (auto const& trig_name : pathNames) {
385  fillTrigPath(proc_pset,
386  preg,
387  &prealloc,
388  processConfiguration,
389  trig_bitpos,
390  trig_name,
391  results(),
392  endPathNames,
393  conditionalTaskHelper);
394  ++trig_bitpos;
395  hasPath = true;
396  }
397 
398  if (hasPath) {
399  // the results inserter stands alone
400  inserter->setTrigResultForStream(streamID.value(), results());
401 
402  results_inserter_ = makeInserter(actions, actReg_, inserter);
404  }
405 
406  // fill normal endpaths
407  int bitpos = 0;
408  end_paths_.reserve(endPathNames.size());
409  for (auto const& end_path_name : endPathNames) {
410  fillEndPath(
411  proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
412  ++bitpos;
413  }
414 
415  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
416 
417  //See if all modules were used
418  std::set<std::string> usedWorkerLabels;
419  for (auto const& worker : allWorkers()) {
420  usedWorkerLabels.insert(worker->description()->moduleLabel());
421  }
422  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
423  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
424  std::vector<std::string> unusedLabels;
425  set_difference(modulesInConfigSet.begin(),
426  modulesInConfigSet.end(),
427  usedWorkerLabels.begin(),
428  usedWorkerLabels.end(),
429  back_inserter(unusedLabels));
430  std::set<std::string> unscheduledLabels;
431  std::vector<std::string> shouldBeUsedLabels;
432  if (!unusedLabels.empty()) {
433  //Need to
434  // 1) create worker
435  // 2) if it is a WorkerT<EDProducer>, add it to our list
436  // 3) hand list to our delayed reader
437  for (auto const& label : unusedLabels) {
438  bool isTracked;
439  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
440  assert(isTracked);
441  assert(modulePSet != nullptr);
443  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
444  }
445  if (!shouldBeUsedLabels.empty()) {
446  std::ostringstream unusedStream;
447  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
448  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
449  itLabelEnd = shouldBeUsedLabels.end();
450  itLabel != itLabelEnd;
451  ++itLabel) {
452  unusedStream << ",'" << *itLabel << "'";
453  }
454  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
455  }
456  }
457  number_of_unscheduled_modules_ = unscheduledLabels.size();
458  } // StreamSchedule::StreamSchedule
459 
461  std::vector<std::string> const& branchesToDeleteEarly,
462  std::multimap<std::string, std::string> const& referencesToBranches,
463  std::vector<std::string> const& modulesToSkip,
464  edm::ProductRegistry const& preg) {
465  // setup the list with those products actually registered for this job
466  std::multimap<std::string, Worker*> branchToReadingWorker;
467  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
468 
469  const std::vector<std::string> kEmpty;
470  std::map<Worker*, unsigned int> reserveSizeForWorker;
471  unsigned int upperLimitOnReadingWorker = 0;
472  unsigned int upperLimitOnIndicies = 0;
473  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
474 
475  //talk with output modules first
476  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
477  auto comm = iHolder->createOutputModuleCommunicator();
478  if (comm) {
479  if (!branchToReadingWorker.empty()) {
480  //If an OutputModule needs a product, we can't delete it early
481  // so we should remove it from our list
482  SelectedProductsForBranchType const& kept = comm->keptProducts();
483  for (auto const& item : kept[InEvent]) {
484  BranchDescription const& desc = *item.first;
485  auto found = branchToReadingWorker.equal_range(desc.branchName());
486  if (found.first != found.second) {
487  --nUniqueBranchesToDelete;
488  branchToReadingWorker.erase(found.first, found.second);
489  }
490  }
491  }
492  }
493  });
494 
495  if (branchToReadingWorker.empty()) {
496  return;
497  }
498 
499  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
500  for (auto w : allWorkers()) {
501  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
502  continue;
503  }
504  //determine if this module could read a branch we want to delete early
505  auto consumes = w->consumesInfo();
506  if (not consumes.empty()) {
507  bool foundAtLeastOneMatchingBranch = false;
508  for (auto const& product : consumes) {
509  std::string branch = fmt::format("{}_{}_{}_{}",
510  product.type().friendlyClassName(),
511  product.label().data(),
512  product.instance().data(),
513  product.process().data());
514  {
515  //Handle case where worker directly consumes product
516  auto found = branchToReadingWorker.end();
517  if (product.process().empty()) {
518  auto startFound = branchToReadingWorker.lower_bound(branch);
519  if (startFound != branchToReadingWorker.end()) {
520  if (startFound->first.substr(0, branch.size()) == branch) {
521  //match all processNames here, even if it means multiple matches will happen
522  found = startFound;
523  }
524  }
525  } else {
526  auto exactFound = branchToReadingWorker.equal_range(branch);
527  if (exactFound.first != exactFound.second) {
528  found = exactFound.first;
529  }
530  }
531  if (found != branchToReadingWorker.end()) {
532  if (not foundAtLeastOneMatchingBranch) {
533  ++upperLimitOnReadingWorker;
534  foundAtLeastOneMatchingBranch = true;
535  }
536  ++upperLimitOnIndicies;
537  ++reserveSizeForWorker[w];
538  if (nullptr == found->second) {
539  found->second = w;
540  } else {
541  branchToReadingWorker.insert(make_pair(found->first, w));
542  }
543  }
544  }
545  {
546  //Handle case where indirectly consumes product
547  auto found = referencesToBranches.end();
548  if (product.process().empty()) {
549  auto startFound = referencesToBranches.lower_bound(branch);
550  if (startFound != referencesToBranches.end()) {
551  if (startFound->first.substr(0, branch.size()) == branch) {
552  //match all processNames here, even if it means multiple matches will happen
553  found = startFound;
554  }
555  }
556  } else {
557  //can match exactly
558  auto exactFound = referencesToBranches.equal_range(branch);
559  if (exactFound.first != exactFound.second) {
560  found = exactFound.first;
561  }
562  }
563  if (found != referencesToBranches.end()) {
564  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
565  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
566  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
567  continue;
568  }
569  if (not foundAtLeastOneMatchingBranch) {
570  ++upperLimitOnReadingWorker;
571  foundAtLeastOneMatchingBranch = true;
572  }
573  ++upperLimitOnIndicies;
574  ++reserveSizeForWorker[w];
575  if (nullptr == foundInBranchToReadingWorker->second) {
576  foundInBranchToReadingWorker->second = w;
577  } else {
578  branchToReadingWorker.insert(make_pair(itr->second, w));
579  }
580  }
581  }
582  }
583  }
584  }
585  }
586  {
587  auto it = branchToReadingWorker.begin();
588  std::vector<std::string> unusedBranches;
589  while (it != branchToReadingWorker.end()) {
590  if (it->second == nullptr) {
591  unusedBranches.push_back(it->first);
592  //erasing the object invalidates the iterator so must advance it first
593  auto temp = it;
594  ++it;
595  branchToReadingWorker.erase(temp);
596  } else {
597  ++it;
598  }
599  }
600  if (not unusedBranches.empty()) {
601  LogWarning l("UnusedProductsForCanDeleteEarly");
602  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
603  " If possible, remove the producer from the job.";
604  for (auto const& n : unusedBranches) {
605  l << "\n " << n;
606  }
607  }
608  }
609  if (!branchToReadingWorker.empty()) {
610  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
611  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
612  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
613  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
614  std::string lastBranchName;
615  size_t nextOpenIndex = 0;
616  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
617  for (auto& branchAndWorker : branchToReadingWorker) {
618  if (lastBranchName != branchAndWorker.first) {
619  //have to put back the period we removed earlier in order to get the proper name
620  BranchID bid(branchAndWorker.first + ".");
621  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
622  lastBranchName = branchAndWorker.first;
623  }
624  auto found = alreadySeenWorkers.find(branchAndWorker.second);
625  if (alreadySeenWorkers.end() == found) {
626  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
627  // all the branches that might be read by this worker. However, initially we will only tell the
628  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
629  // EarlyDeleteHelper will automatically advance its internal end pointer.
630  size_t index = nextOpenIndex;
631  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
634  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
635  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
636  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
637  nextOpenIndex += nIndices;
638  } else {
639  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
640  }
641  }
642 
643  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
644  // space needed for each module
645  auto itLast = earlyDeleteHelpers_.begin();
646  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
647  if (itLast->end() != it->begin()) {
648  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
649  unsigned int delta = it->begin() - itLast->end();
650  it->shiftIndexPointers(delta);
651 
653  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
654  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
655  }
656  itLast = it;
657  }
659  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
661 
662  //now tell the paths about the deleters
663  for (auto& p : trig_paths_) {
664  p.setEarlyDeleteHelpers(alreadySeenWorkers);
665  }
666  for (auto& p : end_paths_) {
667  p.setEarlyDeleteHelpers(alreadySeenWorkers);
668  }
670  }
671  }
672 
674  Worker* worker,
675  std::unordered_set<std::string>& conditionalModules,
676  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
677  std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
678  ParameterSet& proc_pset,
679  ProductRegistry& preg,
680  PreallocationConfiguration const* prealloc,
681  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
682  std::vector<Worker*> returnValue;
683  auto const& consumesInfo = worker->consumesInfo();
684  auto moduleLabel = worker->description()->moduleLabel();
685  using namespace productholderindexhelper;
686  for (auto const& ci : consumesInfo) {
687  if (not ci.skipCurrentProcess() and
688  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
689  auto productModuleLabel = std::string(ci.label());
690  bool productFromConditionalModule = false;
691  auto itFound = conditionalModules.find(productModuleLabel);
692  if (itFound == conditionalModules.end()) {
693  //Check to see if this was an alias
694  //note that aliasMap was previously filtered so only the conditional modules remain there
695  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
696  if (foundAlias) {
697  productModuleLabel = *foundAlias;
698  productFromConditionalModule = true;
699  itFound = conditionalModules.find(productModuleLabel);
700  //check that the alias-for conditional module has not been used
701  if (itFound == conditionalModules.end()) {
702  continue;
703  }
704  }
705  } else {
706  //need to check the rest of the data product info
707  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
708  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
709  if (itBranch->second->productInstanceName() == ci.instance()) {
710  if (ci.kindOfType() == PRODUCT_TYPE) {
711  if (ci.type() == itBranch->second->unwrappedTypeID()) {
712  productFromConditionalModule = true;
713  break;
714  }
715  } else {
716  //this is a view
718  ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
719  productFromConditionalModule = true;
720  break;
721  }
722  }
723  }
724  }
725  }
726  if (productFromConditionalModule) {
727  auto condWorker =
728  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
729  assert(condWorker);
730 
731  conditionalModules.erase(itFound);
732 
733  auto dependents = tryToPlaceConditionalModules(condWorker,
734  conditionalModules,
735  conditionalModuleBranches,
736  aliasMap,
737  proc_pset,
738  preg,
739  prealloc,
740  processConfiguration);
741  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
742  returnValue.push_back(condWorker);
743  }
744  }
745  }
746  return returnValue;
747  }
748 
750  ProductRegistry& preg,
751  PreallocationConfiguration const* prealloc,
752  std::shared_ptr<ProcessConfiguration const> processConfiguration,
753  std::string const& pathName,
754  bool ignoreFilters,
755  PathWorkers& out,
756  std::vector<std::string> const& endPathNames,
757  ConditionalTaskHelper const& conditionalTaskHelper) {
758  vstring modnames = proc_pset.getParameter<vstring>(pathName);
759  PathWorkers tmpworkers;
760 
761  //Pull out ConditionalTask modules
762  auto condRange = findConditionalTaskModulesRange(modnames);
763 
764  std::unordered_set<std::string> conditionalmods;
765  //An EDAlias may be redirecting to a module on a ConditionalTask
766  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
767  std::unordered_map<std::string, unsigned int> conditionalModOrder;
768  if (condRange.first != condRange.second) {
769  for (auto it = condRange.first; it != condRange.second; ++it) {
770  // ordering needs to skip the # token in the path list
771  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
772  }
773  //the last entry should be ignored since it is required to be "@"
774  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
775  std::make_move_iterator(condRange.second));
776 
777  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
778  modnames.erase(std::prev(condRange.first), modnames.end());
779  }
780 
781  unsigned int placeInPath = 0;
782  for (auto const& name : modnames) {
783  //Modules except EDFilters are set to run concurrently by default
784  bool doNotRunConcurrently = false;
786  if (name[0] == '!') {
787  filterAction = WorkerInPath::Veto;
788  } else if (name[0] == '-' or name[0] == '+') {
789  filterAction = WorkerInPath::Ignore;
790  }
791  if (name[0] == '|' or name[0] == '+') {
792  //cms.wait was specified so do not run concurrently
793  doNotRunConcurrently = true;
794  }
795 
797  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
798  moduleLabel.erase(0, 1);
799  }
800 
801  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
802  if (worker == nullptr) {
803  std::string pathType("endpath");
804  if (!search_all(endPathNames, pathName)) {
805  pathType = std::string("path");
806  }
808  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
809  << "\"\n please check spelling or remove that label from the path.";
810  }
811 
812  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
813  // We have a filter on an end path, and the filter is not explicitly ignored.
814  // See if the filter is allowed.
815  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
816  if (!search_all(allowed_filters, worker->description()->moduleName())) {
817  // Filter is not allowed. Ignore the result, and issue a warning.
818  filterAction = WorkerInPath::Ignore;
819  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
820  << "' with module label '" << moduleLabel << "' appears on EndPath '"
821  << pathName << "'.\n"
822  << "The return value of the filter will be ignored.\n"
823  << "To suppress this warning, either remove the filter from the endpath,\n"
824  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
825  }
826  }
827  bool runConcurrently = not doNotRunConcurrently;
828  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
829  runConcurrently = false;
830  }
831 
832  auto condModules = tryToPlaceConditionalModules(worker,
833  conditionalmods,
834  conditionalModsBranches,
835  conditionalTaskHelper.aliasMap(),
836  proc_pset,
837  preg,
838  prealloc,
839  processConfiguration);
840  for (auto condMod : condModules) {
841  tmpworkers.emplace_back(
842  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
843  }
844 
845  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
846  ++placeInPath;
847  }
848 
849  out.swap(tmpworkers);
850  }
851 
853  ProductRegistry& preg,
854  PreallocationConfiguration const* prealloc,
855  std::shared_ptr<ProcessConfiguration const> processConfiguration,
856  int bitpos,
857  std::string const& name,
858  TrigResPtr trptr,
859  std::vector<std::string> const& endPathNames,
860  ConditionalTaskHelper const& conditionalTaskHelper) {
861  PathWorkers tmpworkers;
862  fillWorkers(
863  proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
864 
865  // an empty path will cause an extra bit that is not used
866  if (!tmpworkers.empty()) {
867  trig_paths_.emplace_back(bitpos,
868  name,
869  tmpworkers,
870  trptr,
871  actionTable(),
872  actReg_,
876  } else {
877  empty_trig_paths_.push_back(bitpos);
878  }
879  for (WorkerInPath const& workerInPath : tmpworkers) {
880  addToAllWorkers(workerInPath.getWorker());
881  }
882  }
883 
885  ProductRegistry& preg,
886  PreallocationConfiguration const* prealloc,
887  std::shared_ptr<ProcessConfiguration const> processConfiguration,
888  int bitpos,
889  std::string const& name,
890  std::vector<std::string> const& endPathNames,
891  ConditionalTaskHelper const& conditionalTaskHelper) {
892  PathWorkers tmpworkers;
893  fillWorkers(
894  proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
895 
896  if (!tmpworkers.empty()) {
897  //EndPaths are not supposed to stop if SkipEvent type exception happens
898  end_paths_.emplace_back(bitpos,
899  name,
900  tmpworkers,
901  TrigResPtr(),
902  actionTable(),
903  actReg_,
905  nullptr,
907  } else {
908  empty_end_paths_.push_back(bitpos);
909  }
910  for (WorkerInPath const& workerInPath : tmpworkers) {
911  addToAllWorkers(workerInPath.getWorker());
912  }
913  }
914 
916 
918 
920  Worker* found = nullptr;
921  for (auto const& worker : allWorkers()) {
922  if (worker->description()->moduleLabel() == iLabel) {
923  found = worker;
924  break;
925  }
926  }
927  if (nullptr == found) {
928  return;
929  }
930 
931  iMod->replaceModuleFor(found);
932  found->beginStream(streamID_, streamContext_);
933  }
934 
936 
937  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
938  std::vector<ModuleDescription const*> result;
939  result.reserve(allWorkers().size());
940 
941  for (auto const& worker : allWorkers()) {
942  ModuleDescription const* p = worker->description();
943  result.push_back(p);
944  }
945  return result;
946  }
947 
949  WaitingTaskHolder iTask,
951  ServiceToken const& serviceToken,
952  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
953  EventPrincipal& ep = info.principal();
954 
955  // Caught exception is propagated via WaitingTaskHolder
956  CMS_SA_ALLOW try {
957  this->resetAll();
958 
960 
961  Traits::setStreamContext(streamContext_, ep);
962  //a service may want to communicate with another service
963  ServiceRegistry::Operate guard(serviceToken);
964  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
965 
966  // Data dependencies need to be set up before marking empty
967  // (End)Paths complete in case something consumes the status of
968  // the empty (EndPath)
971 
972  HLTPathStatus hltPathStatus(hlt::Pass, 0);
973  for (int empty_trig_path : empty_trig_paths_) {
974  results_->at(empty_trig_path) = hltPathStatus;
975  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
976  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
979  if (except) {
980  iTask.doneWaiting(except);
981  return;
982  }
983  }
984  for (int empty_end_path : empty_end_paths_) {
985  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
988  if (except) {
989  iTask.doneWaiting(except);
990  return;
991  }
992  }
993 
994  ++total_events_;
995 
996  //use to give priorities on an error to ones from Paths
997  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
998  auto pathErrorPtr = pathErrorHolder.get();
999  ServiceWeakToken weakToken = serviceToken;
1000  auto allPathsDone = make_waiting_task(
1001  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1002  ServiceRegistry::Operate operate(weakToken.lock());
1003 
1004  std::exception_ptr ptr;
1005  if (pathError->load()) {
1006  ptr = *pathError->load();
1007  delete pathError->load();
1008  }
1009  if ((not ptr) and iPtr) {
1010  ptr = *iPtr;
1011  }
1012  iTask.doneWaiting(finishProcessOneEvent(ptr));
1013  });
1014  //The holder guarantees that if the paths finish before the loop ends
1015  // that we do not start too soon. It also guarantees that the task will
1016  // run under that condition.
1017  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1018 
1019  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1020  std::exception_ptr const* iPtr) mutable {
1021  ServiceRegistry::Operate operate(weakToken.lock());
1022 
1023  if (iPtr) {
1024  //this is used to prioritize this error over one
1025  // that happens in EndPath or Accumulate
1026  pathErrorPtr->store(new std::exception_ptr(*iPtr));
1027  }
1028  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1029  });
1030 
1031  //The holder guarantees that if the paths finish before the loop ends
1032  // that we do not start too soon. It also guarantees that the task will
1033  // run under that condition.
1034  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1035 
1036  //start end paths first so on single threaded the paths will run first
1037  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1038  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1039  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1040  }
1041 
1042  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1043  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1044  }
1045 
1046  ParentContext parentContext(&streamContext_);
1048  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1049  } catch (...) {
1050  iTask.doneWaiting(std::current_exception());
1051  }
1052  }
1053 
1054  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1055  WaitingTaskHolder iWait,
1057  if (iExcept) {
1058  // Caught exception is propagated via WaitingTaskHolder
1059  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1064  edm::printCmsExceptionWarning("SkipEvent", e);
1065  *(iExcept.load()) = std::exception_ptr();
1066  } else {
1067  *(iExcept.load()) = std::current_exception();
1068  }
1069  } catch (...) {
1070  *(iExcept.load()) = std::current_exception();
1071  }
1072  }
1073 
1074  if ((not iExcept) and results_->accept()) {
1075  ++total_passed_;
1076  }
1077 
1078  if (nullptr != results_inserter_.get()) {
1079  // Caught exception is propagated to the caller
1080  CMS_SA_ALLOW try {
1081  //Even if there was an exception, we need to allow results inserter
1082  // to run since some module may be waiting on its results.
1083  ParentContext parentContext(&streamContext_);
1085 
1086  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1087  if (expt) {
1088  std::rethrow_exception(expt);
1089  }
1090  } catch (cms::Exception& ex) {
1091  if (not iExcept) {
1092  if (ex.context().empty()) {
1093  std::ostringstream ost;
1094  ost << "Processing Event " << info.principal().id();
1095  ex.addContext(ost.str());
1096  }
1097  iExcept.store(new std::exception_ptr(std::current_exception()));
1098  }
1099  } catch (...) {
1100  if (not iExcept) {
1101  iExcept.store(new std::exception_ptr(std::current_exception()));
1102  }
1103  }
1104  }
1105  std::exception_ptr ptr;
1106  if (iExcept) {
1107  ptr = *iExcept.load();
1108  }
1109  iWait.doneWaiting(ptr);
1110  }
1111 
1112  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1114 
1115  if (iExcept) {
1116  //add context information to the exception and print message
1117  try {
1118  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1119  } catch (cms::Exception& ex) {
1120  bool const cleaningUpAfterException = false;
1121  if (ex.context().empty()) {
1122  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1123  } else {
1124  addContextAndPrintException("", ex, cleaningUpAfterException);
1125  }
1126  iExcept = std::current_exception();
1127  }
1128 
1129  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1130  }
1131  // Caught exception is propagated to the caller
1132  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1133  if (not iExcept) {
1134  iExcept = std::current_exception();
1135  }
1136  }
1137  if (not iExcept) {
1138  resetEarlyDelete();
1139  }
1140 
1141  return iExcept;
1142  }
1143 
1144  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1145  oLabelsToFill.reserve(trig_paths_.size());
1146  std::transform(trig_paths_.begin(),
1147  trig_paths_.end(),
1148  std::back_inserter(oLabelsToFill),
1149  std::bind(&Path::name, std::placeholders::_1));
1150  }
1151 
1152  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1153  TrigPaths::const_iterator itFound = std::find_if(
1154  trig_paths_.begin(),
1155  trig_paths_.end(),
1156  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1157  if (itFound != trig_paths_.end()) {
1158  oLabelsToFill.reserve(itFound->size());
1159  for (size_t i = 0; i < itFound->size(); ++i) {
1160  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1161  }
1162  }
1163  }
1164 
1166  std::vector<ModuleDescription const*>& descriptions,
1167  unsigned int hint) const {
1168  descriptions.clear();
1169  bool found = false;
1170  TrigPaths::const_iterator itFound;
1171 
1172  if (hint < trig_paths_.size()) {
1173  itFound = trig_paths_.begin() + hint;
1174  if (itFound->name() == iPathLabel)
1175  found = true;
1176  }
1177  if (!found) {
1178  // if the hint did not work, do it the slow way
1179  itFound = std::find_if(
1180  trig_paths_.begin(),
1181  trig_paths_.end(),
1182  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1183  if (itFound != trig_paths_.end())
1184  found = true;
1185  }
1186  if (found) {
1187  descriptions.reserve(itFound->size());
1188  for (size_t i = 0; i < itFound->size(); ++i) {
1189  descriptions.push_back(itFound->getWorker(i)->description());
1190  }
1191  }
1192  }
1193 
1195  std::vector<ModuleDescription const*>& descriptions,
1196  unsigned int hint) const {
1197  descriptions.clear();
1198  bool found = false;
1199  TrigPaths::const_iterator itFound;
1200 
1201  if (hint < end_paths_.size()) {
1202  itFound = end_paths_.begin() + hint;
1203  if (itFound->name() == iEndPathLabel)
1204  found = true;
1205  }
1206  if (!found) {
1207  // if the hint did not work, do it the slow way
1208  itFound = std::find_if(
1209  end_paths_.begin(),
1210  end_paths_.end(),
1211  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1212  if (itFound != end_paths_.end())
1213  found = true;
1214  }
1215  if (found) {
1216  descriptions.reserve(itFound->size());
1217  for (size_t i = 0; i < itFound->size(); ++i) {
1218  descriptions.push_back(itFound->getWorker(i)->description());
1219  }
1220  }
1221  }
1222 
1223  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1224  sum.timesVisited += path.timesVisited(which);
1225  sum.timesPassed += path.timesPassed(which);
1226  sum.timesFailed += path.timesFailed(which);
1227  sum.timesExcept += path.timesExcept(which);
1228  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1229  sum.bitPosition = path.bitPosition(which);
1230  }
1231 
1232  static void fillPathSummary(Path const& path, PathSummary& sum) {
1233  sum.name = path.name();
1234  sum.bitPosition = path.bitPosition();
1235  sum.timesRun += path.timesRun();
1236  sum.timesPassed += path.timesPassed();
1237  sum.timesFailed += path.timesFailed();
1238  sum.timesExcept += path.timesExcept();
1239 
1240  Path::size_type sz = path.size();
1241  if (sum.moduleInPathSummaries.empty()) {
1242  std::vector<ModuleInPathSummary> temp(sz);
1243  for (size_t i = 0; i != sz; ++i) {
1245  }
1246  sum.moduleInPathSummaries.swap(temp);
1247  } else {
1248  assert(sz == sum.moduleInPathSummaries.size());
1249  for (size_t i = 0; i != sz; ++i) {
1251  }
1252  }
1253  }
1254 
1255  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1256  sum.timesVisited += w.timesVisited();
1257  sum.timesRun += w.timesRun();
1258  sum.timesPassed += w.timesPassed();
1259  sum.timesFailed += w.timesFailed();
1260  sum.timesExcept += w.timesExcept();
1261  sum.moduleLabel = w.description()->moduleLabel();
1262  }
1263 
1264  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1265 
1267  rep.eventSummary.totalEvents += totalEvents();
1268  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1269  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1270 
1271  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1272  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1273  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1274  }
1275 
1277  using std::placeholders::_1;
1279  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1280  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1281  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1282  }
1283 
1285  skippingEvent_ = false;
1286  results_->reset();
1287  }
1288 
1290 
1292  //must be sure we have cleared the count first
1293  for (auto& count : earlyDeleteBranchToCount_) {
1294  count.count = 0;
1295  }
1296  //now reset based on how many helpers use that branch
1298  ++(earlyDeleteBranchToCount_[index].count);
1299  }
1300  for (auto& helper : earlyDeleteHelpers_) {
1301  helper.reset();
1302  }
1303  }
1304 
1306  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1307  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1309  int bitpos = 0;
1310  unsigned int indexEmpty = 0;
1311  unsigned int indexOfPath = 0;
1312  for (auto& pathStatusInserter : pathStatusInserters) {
1313  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1314  WorkerPtr workerPtr(
1315  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1316  pathStatusInserterWorkers_.emplace_back(workerPtr);
1317  workerPtr->setActivityRegistry(actReg_);
1318  addToAllWorkers(workerPtr.get());
1319 
1320  // A little complexity here because a C++ Path object is not
1321  // instantiated and put into end_paths if there are no modules
1322  // on the configured path.
1323  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1324  ++indexEmpty;
1325  } else {
1326  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1327  ++indexOfPath;
1328  }
1329  ++bitpos;
1330  }
1331 
1332  bitpos = 0;
1333  indexEmpty = 0;
1334  indexOfPath = 0;
1335  for (auto& endPathStatusInserter : endPathStatusInserters) {
1336  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1337  WorkerPtr workerPtr(
1338  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1339  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1340  workerPtr->setActivityRegistry(actReg_);
1341  addToAllWorkers(workerPtr.get());
1342 
1343  // A little complexity here because a C++ Path object is not
1344  // instantiated and put into end_paths if there are no modules
1345  // on the configured path.
1346  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1347  ++indexEmpty;
1348  } else {
1349  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1350  ++indexOfPath;
1351  }
1352  ++bitpos;
1353  }
1354  }
1355 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
size
Write out results.
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
Definition: helper.py:1
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
ProductList const & productList() const
std::unordered_multimap< std::string, AliasInfo > const & aliasMap() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
static PFTauRenderPlugin instance
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
int totalEventsPassed() const
ret
prodAgent to be discontinued
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::unordered_multimap< std::string, AliasInfo > aliasMap_
void setupOnDemandSystem(EventTransitionInfo const &)
void clearCounters()
Definition: Worker.h:232
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
std::string const & moduleName() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
StreamID streamID() const
ConditionalTaskHelper(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, WorkerManager &workerManager, std::vector< std::string > const &trigPathNames)
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
T getUntrackedParameter(std::string const &, T const &) const
constexpr element_type const * get() const
std::string moduleLabel
Definition: TriggerReport.h:52
oneapi::tbb::task_group * group() const noexcept
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
char const * label
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
void doneWaiting(std::exception_ptr iExcept)
accept
Definition: HLTenums.h:18
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
void processSwitchEDAliases(ParameterSet const &proc_pset, ProductRegistry &preg, ProcessConfiguration const &processConfiguration, std::unordered_set< std::string > const &allConditionalMods)
std::string name
Definition: TriggerReport.h:41
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
Strings const & getTrigPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static void fillPathSummary(Path const &path, PathSummary &sum)
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
int totalEvents() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
ModuleDescription const * description() const
Definition: Worker.h:198
std::string const & className() const
Definition: TypeID.cc:40
void clearCounters()
Clear all the counters in the trigger report.
void fillAliasMap(ParameterSet const &proc_pset, std::unordered_set< std::string > const &allConditionalMods)
Strings const & getEndPaths() const
void getTriggerReport(TriggerReport &rep) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
exception_actions::ActionCodes find(const std::string &category) const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
double b
Definition: hdecay.h:120
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::string const & processName() const
std::atomic< bool > skippingEvent_
Definition: plugin.cc:23
HLT enums.
void setupResolvers(Principal &principal)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:336
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModuleBranches(std::unordered_set< std::string > const &conditionalmods) const
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:42
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModsBranches_
Log< level::Warning, false > LogWarning
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T
std::string const & moduleLabel() const
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
std::string const & name() const
Definition: Path.h:74
std::vector< std::string > vstring
Definition: Schedule.cc:482
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
def move(src, dest)
Definition: eostools.py:511
void clearCounters()
Definition: Path.cc:198
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)