CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
29 
31 #include "processEDAliases.h"
32 
33 #include <algorithm>
34 #include <cassert>
35 #include <cstdlib>
36 #include <functional>
37 #include <iomanip>
38 #include <limits>
39 #include <list>
40 #include <map>
41 #include <exception>
42 #include <fmt/format.h>
43 
44 namespace edm {
45 
46  namespace {
47 
// Walks the input range [begin, end) and an output range starting at
// 'out' in lock step, calling func(input_element, output_element) for
// each pair. 'func' is expected to read the input element and write
// its result through the reference to the output element. The output
// range must provide at least as many elements as the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
57 
// Fills the sequence 'to' from the sequence 'from' by calling
// func(element_of_from, element_of_to) for each pair of elements
// (see transform_into).
//
// When the two sequences differ in size, the results are computed into
// a freshly constructed TO with from.size() elements and swapped into
// 'to' only after every call has returned, so 'to' keeps its old
// contents if a call throws. When the sizes already match, 'func' is
// applied directly to the existing elements of 'to'.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }
  TO scratch(from.size());
  transform_into(from.begin(), from.end(), scratch.begin(), func);
  to.swap(scratch);
}
73 
74  // -----------------------------
75 
76  // Here we make the trigger results inserter directly. This should
77  // probably be a utility in the WorkerRegistry or elsewhere.
78 
79  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
80  std::shared_ptr<ActivityRegistry> areg,
81  std::shared_ptr<TriggerResultInserter> inserter) {
83  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
84  ptr->setActivityRegistry(areg);
85  return ptr;
86  }
87 
88  void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
89  ProductRegistry const& preg,
90  std::multimap<std::string, Worker*>& branchToReadingWorker) {
91  auto vBranchesToDeleteEarly = branchesToDeleteEarly;
92  // Remove any duplicates
93  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
94  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
95  vBranchesToDeleteEarly.end());
96 
97  // Are the requested items in the product registry?
98  auto allBranchNames = preg.allBranchNames();
99  //the branch names all end with a period, which we do not want to compare with
100  for (auto& b : allBranchNames) {
101  b.resize(b.size() - 1);
102  }
103  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
104  std::vector<std::string> temp;
105  temp.reserve(vBranchesToDeleteEarly.size());
106 
107  std::set_intersection(vBranchesToDeleteEarly.begin(),
108  vBranchesToDeleteEarly.end(),
109  allBranchNames.begin(),
110  allBranchNames.end(),
111  std::back_inserter(temp));
112  vBranchesToDeleteEarly.swap(temp);
113  if (temp.size() != vBranchesToDeleteEarly.size()) {
114  std::vector<std::string> missingProducts;
115  std::set_difference(temp.begin(),
116  temp.end(),
117  vBranchesToDeleteEarly.begin(),
118  vBranchesToDeleteEarly.end(),
119  std::back_inserter(missingProducts));
120  LogInfo l("MissingProductsForCanDeleteEarly");
121  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
122  for (auto const& n : missingProducts) {
123  l << "\n " << n;
124  }
125  }
126  //set placeholder for the branch, we will remove the nullptr if a
127  // module actually wants the branch.
128  for (auto const& branch : vBranchesToDeleteEarly) {
129  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
130  }
131  }
132 
133  Worker* getWorker(std::string const& moduleLabel,
134  ParameterSet& proc_pset,
135  WorkerManager& workerManager,
136  ProductRegistry& preg,
137  PreallocationConfiguration const* prealloc,
138  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
139  bool isTracked;
140  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
141  if (modpset == nullptr) {
142  return nullptr;
143  }
144  assert(isTracked);
145 
146  return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
147  }
148 
// Locates the ConditionalTask modules inside a container of module names.
// Those modules sit between the special marker "#" and the final element,
// which is the marker "@"; neither marker is part of the returned range.
// Returns the half-open iterator range (as std::pair) of the
// ConditionalTask modules, or a pair of end iterators when the container
// has no "#" marker.
template <typename T>
auto findConditionalTaskModulesRange(T& modnames) {
  auto const last = modnames.end();
  auto const marker = std::find(modnames.begin(), last, "#");
  if (marker != last) {
    // skip the "#" in front and the trailing "@"
    return std::pair(std::next(marker), std::prev(last));
  }
  return std::pair(last, last);
}
163 
164  std::optional<std::string> findBestMatchingAlias(
165  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
166  std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
167  std::string const& productModuleLabel,
168  ConsumesInfo const& consumesInfo) {
169  std::optional<std::string> best;
170  int wildcardsInBest = std::numeric_limits<int>::max();
171  bool bestIsAmbiguous = false;
172 
173  auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
174  std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
175  int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
176  if (wildcards == 0) {
177  bestIsAmbiguous = false;
178  return true;
179  }
180  if (not best or wildcards < wildcardsInBest) {
181  best = label;
182  wildcardsInBest = wildcards;
183  bestIsAmbiguous = false;
184  } else if (best and *best != label and wildcardsInBest == wildcards) {
185  bestIsAmbiguous = true;
186  }
187  return false;
188  };
189 
190  auto findAlias = aliasMap.equal_range(productModuleLabel);
191  for (auto it = findAlias.first; it != findAlias.second; ++it) {
192  std::string const& aliasInstanceLabel =
193  it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
194  bool const instanceIsWildcard = (aliasInstanceLabel == "*");
195  if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
196  bool const typeIsWildcard = it->second.friendlyClassName == "*";
197  if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
198  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
199  return it->second.originalModuleLabel;
200  }
201  } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
202  //consume is a View so need to do more intrusive search
203  //find matching branches in module
204  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
205  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
206  if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
207  if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
208  TypeID(itBranch->second->wrappedType().typeInfo()),
209  itBranch->second->className())) {
210  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
211  return it->second.originalModuleLabel;
212  }
213  }
214  }
215  }
216  }
217  }
218  }
219  if (bestIsAmbiguous) {
221  << "Encountered ambiguity when trying to find a best-matching alias for\n"
222  << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
223  << " module label " << productModuleLabel << "\n"
224  << " product instance name " << consumesInfo.instance() << "\n"
225  << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
226  "wildcards ("
227  << wildcardsInBest << ")";
228  }
229  return best;
230  }
231  } // namespace
232 
233  // -----------------------------
234 
// Shorthand for a list of strings (module names, path names, ...).
using vstring = std::vector<std::string>;
236 
237  // -----------------------------
238 
240  public:
242 
244  ProductRegistry& preg,
245  PreallocationConfiguration const* prealloc,
246  std::shared_ptr<ProcessConfiguration const> processConfiguration,
247  WorkerManager& workerManager,
248  std::vector<std::string> const& trigPathNames) {
249  std::unordered_set<std::string> allConditionalMods;
250  for (auto const& pathName : trigPathNames) {
251  auto const modnames = proc_pset.getParameter<vstring>(pathName);
252 
253  //Pull out ConditionalTask modules
254  auto condRange = findConditionalTaskModulesRange(modnames);
255  if (condRange.first == condRange.second)
256  continue;
257 
258  //the last entry should be ignored since it is required to be "@"
259  allConditionalMods.insert(condRange.first, condRange.second);
260  }
261 
262  for (auto const& cond : allConditionalMods) {
263  //force the creation of the conditional modules so alias check can work
264  (void)getWorker(cond, proc_pset, workerManager, preg, prealloc, processConfiguration);
265  }
266 
267  fillAliasMap(proc_pset, allConditionalMods);
268  processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
269 
270  //find branches created by the conditional modules
271  for (auto const& prod : preg.productList()) {
272  if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
273  conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
274  }
275  }
276  }
277 
278  std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
279 
280  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
281  std::unordered_set<std::string> const& conditionalmods) const {
282  std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
283  for (auto const& mod : conditionalmods) {
284  auto range = conditionalModsBranches_.equal_range(mod);
285  ret.insert(range.first, range.second);
286  }
287  return ret;
288  }
289 
290  private:
291  void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
292  auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
293  std::string const star("*");
294  for (auto const& alias : aliases) {
295  auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
296  auto aliasedToModuleLabels = info.getParameterNames();
297  for (auto const& mod : aliasedToModuleLabels) {
298  if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
299  auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
300  for (auto const& aliasPSet : aliasVPSet) {
301  std::string type = star;
302  std::string instance = star;
303  std::string originalInstance = star;
304  if (aliasPSet.exists("type")) {
305  type = aliasPSet.getParameter<std::string>("type");
306  }
307  if (aliasPSet.exists("toProductInstance")) {
308  instance = aliasPSet.getParameter<std::string>("toProductInstance");
309  }
310  if (aliasPSet.exists("fromProductInstance")) {
311  originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
312  }
313 
314  aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
315  }
316  }
317  }
318  }
319  }
320 
// Collects, over every module in "@all_modules" whose "@module_type" is
// "SwitchProducer", the case labels ("@all_cases") that have at least one
// entry in aliasMap_, and passes that list on together with the conditional
// module labels, the process parameter set, the process name and the
// product registry.
// NOTE(review): the line that opens the final call (just before the argument
// list at the end of this function) is absent from this listing; presumably
// it names the function declared in "processEDAliases.h" -- confirm against
// the repository.
321  void processSwitchEDAliases(ParameterSet const& proc_pset,
322  ProductRegistry& preg,
323  ProcessConfiguration const& processConfiguration,
324  std::unordered_set<std::string> const& allConditionalMods) {
325  auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
326  std::vector<std::string> switchEDAliases;
327  for (auto const& module : all_modules) {
328  auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
329  if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
330  auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
331  for (auto const& case_label : all_cases) {
// a case label counts only if it is also an alias recorded in aliasMap_
332  auto range = aliasMap_.equal_range(case_label);
333  if (range.first != range.second) {
334  switchEDAliases.push_back(case_label);
335  }
336  }
337  }
338  }
340  switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
341  }
342 
// Alias entries keyed by alias label; filled by fillAliasMap.
343  std::unordered_multimap<std::string, AliasInfo> aliasMap_;
// Pointers to the product registry's branch descriptions, keyed by the label
// of the ConditionalTask module that produces the branch; filled by the
// constructor.
344  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
345  };
346 
347  // -----------------------------
348 
// StreamSchedule constructor: parameter list, member initializers and body.
// It fills the trigger paths and end paths named by the TriggerNamesService,
// wires up the trigger-results inserter and the path status inserters, and
// then deals with the modules of "@all_modules" that no worker uses yet.
// NOTE(review): several lines of this definition are absent from the listing
// (the opening of the signature, the declaration of the 'actions' parameter
// used below, one statement after the makeInserter call, and the opening of
// the call inside the unused-label loop). Recover them from the repository
// before changing this block.
350  std::shared_ptr<TriggerResultInserter> inserter,
351  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
352  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
353  std::shared_ptr<ModuleRegistry> modReg,
354  ParameterSet& proc_pset,
355  service::TriggerNamesService const& tns,
356  PreallocationConfiguration const& prealloc,
357  ProductRegistry& preg,
359  std::shared_ptr<ActivityRegistry> areg,
360  std::shared_ptr<ProcessConfiguration const> processConfiguration,
361  StreamID streamID,
362  ProcessContext const* processContext)
363  : workerManager_(modReg, areg, actions),
364  actReg_(areg),
365  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
366  results_inserter_(),
367  trig_paths_(),
368  end_paths_(),
369  total_events_(),
370  total_passed_(),
371  number_of_unscheduled_modules_(0),
372  streamID_(streamID),
373  streamContext_(streamID_, processContext),
374  skippingEvent_(false) {
375  bool hasPath = false;
376  std::vector<std::string> const& pathNames = tns.getTrigPaths();
377  std::vector<std::string> const& endPathNames = tns.getEndPaths();
378 
// Built once and shared by all fillTrigPath / fillEndPath calls below.
379  ConditionalTaskHelper conditionalTaskHelper(
380  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
381 
// One trigger path per name, numbered consecutively from 0.
382  int trig_bitpos = 0;
383  trig_paths_.reserve(pathNames.size());
384  for (auto const& trig_name : pathNames) {
385  fillTrigPath(proc_pset,
386  preg,
387  &prealloc,
388  processConfiguration,
389  trig_bitpos,
390  trig_name,
391  results(),
392  endPathNames,
393  conditionalTaskHelper);
394  ++trig_bitpos;
395  hasPath = true;
396  }
397 
398  if (hasPath) {
399  // the results inserter stands alone
400  inserter->setTrigResultForStream(streamID.value(), results());
401 
402  results_inserter_ = makeInserter(actions, actReg_, inserter);
// NOTE(review): one line is absent from the listing at this point.
404  }
405 
406  // fill normal endpaths
407  int bitpos = 0;
408  end_paths_.reserve(endPathNames.size());
409  for (auto const& end_path_name : endPathNames) {
410  fillEndPath(
411  proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
412  ++bitpos;
413  }
414 
415  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
416 
417  //See if all modules were used
// unusedLabels = labels in "@all_modules" minus labels of the existing workers
418  std::set<std::string> usedWorkerLabels;
419  for (auto const& worker : allWorkers()) {
420  usedWorkerLabels.insert(worker->description()->moduleLabel());
421  }
422  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
423  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
424  std::vector<std::string> unusedLabels;
425  set_difference(modulesInConfigSet.begin(),
426  modulesInConfigSet.end(),
427  usedWorkerLabels.begin(),
428  usedWorkerLabels.end(),
429  back_inserter(unusedLabels));
430  std::set<std::string> unscheduledLabels;
431  std::vector<std::string> shouldBeUsedLabels;
432  if (!unusedLabels.empty()) {
433  //Need to
434  // 1) create worker
435  // 2) if it is a WorkerT<EDProducer>, add it to our list
436  // 3) hand list to our delayed reader
437  for (auto const& label : unusedLabels) {
438  bool isTracked;
439  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
440  assert(isTracked);
441  assert(modulePSet != nullptr);
// NOTE(review): the opening of the call that receives the arguments on the
// next line is absent from the listing; judging by the arguments it fills
// 'unscheduledLabels' and 'shouldBeUsedLabels' -- confirm.
443  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
444  }
445  if (!shouldBeUsedLabels.empty()) {
// Build a comma-separated, single-quoted list of the labels for the message.
446  std::ostringstream unusedStream;
447  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
448  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
449  itLabelEnd = shouldBeUsedLabels.end();
450  itLabel != itLabelEnd;
451  ++itLabel) {
452  unusedStream << ",'" << *itLabel << "'";
453  }
454  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
455  }
456  }
457  number_of_unscheduled_modules_ = unscheduledLabels.size();
458  } // StreamSchedule::StreamSchedule
459 
// Sets up the EarlyDeleteHelper objects for the workers that consume any of
// the branches named in 'branchesToDeleteEarly':
//  1) keep only requested branches known to the product registry,
//  2) drop branches that an output module keeps,
//  3) find, for each worker not listed in 'modulesToSkip', the requested
//     branches it consumes directly or through 'referencesToBranches',
//  4) drop (and warn about) requested branches no worker consumes,
//  5) create one EarlyDeleteHelper per reading worker and hand the helpers to
//     the trigger paths and end paths.
// NOTE(review): the opening line of this definition (function name and the
// parameter that provides 'modReg') and several statements further down are
// absent from this listing; each gap is marked below. Recover them from the
// repository before changing this block.
461  std::vector<std::string> const& branchesToDeleteEarly,
462  std::multimap<std::string, std::string> const& referencesToBranches,
463  std::vector<std::string> const& modulesToSkip,
464  edm::ProductRegistry const& preg) {
465  // setup the list with those products actually registered for this job
466  std::multimap<std::string, Worker*> branchToReadingWorker;
467  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
468 
// NOTE(review): kEmpty is not referenced anywhere in the visible code.
469  const std::vector<std::string> kEmpty;
// Per-worker count of matching branches, plus totals used to size the
// containers filled further down.
470  std::map<Worker*, unsigned int> reserveSizeForWorker;
471  unsigned int upperLimitOnReadingWorker = 0;
472  unsigned int upperLimitOnIndicies = 0;
473  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
474 
475  //talk with output modules first
476  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
477  auto comm = iHolder->createOutputModuleCommunicator();
478  if (comm) {
479  if (!branchToReadingWorker.empty()) {
480  //If an OutputModule needs a product, we can't delete it early
481  // so we should remove it from our list
482  SelectedProductsForBranchType const& kept = comm->keptProducts();
483  for (auto const& item : kept[InEvent]) {
484  BranchDescription const& desc = *item.first;
485  auto found = branchToReadingWorker.equal_range(desc.branchName());
486  if (found.first != found.second) {
487  --nUniqueBranchesToDelete;
488  branchToReadingWorker.erase(found.first, found.second);
489  }
490  }
491  }
492  }
493  });
494 
495  if (branchToReadingWorker.empty()) {
496  return;
497  }
498 
499  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
500  for (auto w : allWorkers()) {
501  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
502  continue;
503  }
504  //determine if this module could read a branch we want to delete early
505  auto consumes = w->consumesInfo();
506  if (not consumes.empty()) {
507  bool foundAtLeastOneMatchingBranch = false;
508  for (auto const& product : consumes) {
// Name built as <friendly class name>_<label>_<instance>_<process>; with an
// empty process it is used as a prefix in the lookups below.
509  std::string branch = fmt::format("{}_{}_{}_{}",
510  product.type().friendlyClassName(),
511  product.label().data(),
512  product.instance().data(),
513  product.process().data());
514  {
515  //Handle case where worker directly consumes product
516  auto found = branchToReadingWorker.end();
517  if (product.process().empty()) {
518  auto startFound = branchToReadingWorker.lower_bound(branch);
519  if (startFound != branchToReadingWorker.end()) {
520  if (startFound->first.substr(0, branch.size()) == branch) {
521  //match all processNames here, even if it means multiple matches will happen
522  found = startFound;
523  }
524  }
525  } else {
526  auto exactFound = branchToReadingWorker.equal_range(branch);
527  if (exactFound.first != exactFound.second) {
528  found = exactFound.first;
529  }
530  }
531  if (found != branchToReadingWorker.end()) {
532  if (not foundAtLeastOneMatchingBranch) {
533  ++upperLimitOnReadingWorker;
534  foundAtLeastOneMatchingBranch = true;
535  }
536  ++upperLimitOnIndicies;
537  ++reserveSizeForWorker[w];
// Replace the nullptr placeholder with this worker, or add another entry
// for the same branch when a worker is already recorded.
538  if (nullptr == found->second) {
539  found->second = w;
540  } else {
541  branchToReadingWorker.insert(make_pair(found->first, w));
542  }
543  }
544  }
545  {
546  //Handle case where indirectly consumes product
547  auto found = referencesToBranches.end();
548  if (product.process().empty()) {
549  auto startFound = referencesToBranches.lower_bound(branch);
550  if (startFound != referencesToBranches.end()) {
551  if (startFound->first.substr(0, branch.size()) == branch) {
552  //match all processNames here, even if it means multiple matches will happen
553  found = startFound;
554  }
555  }
556  } else {
557  //can match exactly
558  auto exactFound = referencesToBranches.equal_range(branch);
559  if (exactFound.first != exactFound.second) {
560  found = exactFound.first;
561  }
562  }
563  if (found != referencesToBranches.end()) {
// Visit every entry stored under the same key as 'found'; only referenced
// branches that are in the early-delete list are recorded.
564  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
565  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
566  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
567  continue;
568  }
569  if (not foundAtLeastOneMatchingBranch) {
570  ++upperLimitOnReadingWorker;
571  foundAtLeastOneMatchingBranch = true;
572  }
573  ++upperLimitOnIndicies;
574  ++reserveSizeForWorker[w];
575  if (nullptr == foundInBranchToReadingWorker->second) {
576  foundInBranchToReadingWorker->second = w;
577  } else {
578  branchToReadingWorker.insert(make_pair(itr->second, w));
579  }
580  }
581  }
582  }
583  }
584  }
585  }
586  {
// Entries still holding the nullptr placeholder were consumed by no worker:
// remove them and report them in one warning.
587  auto it = branchToReadingWorker.begin();
588  std::vector<std::string> unusedBranches;
589  while (it != branchToReadingWorker.end()) {
590  if (it->second == nullptr) {
591  unusedBranches.push_back(it->first);
592  //erasing the object invalidates the iterator so must advance it first
593  auto temp = it;
594  ++it;
595  branchToReadingWorker.erase(temp);
596  } else {
597  ++it;
598  }
599  }
600  if (not unusedBranches.empty()) {
601  LogWarning l("UnusedProductsForCanDeleteEarly");
602  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
603  " If possible, remove the producer from the job.";
604  for (auto const& n : unusedBranches) {
605  l << "\n " << n;
606  }
607  }
608  }
609  if (!branchToReadingWorker.empty()) {
610  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
611  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
612  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
613  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
614  std::string lastBranchName;
615  size_t nextOpenIndex = 0;
// Helpers are given raw pointers into earlyDeleteHelperToBranchIndicies_,
// expressed relative to this address.
616  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
617  for (auto& branchAndWorker : branchToReadingWorker) {
// The multimap iterates in key order, so a change of name marks a new branch.
618  if (lastBranchName != branchAndWorker.first) {
619  //have to put back the period we removed earlier in order to get the proper name
620  BranchID bid(branchAndWorker.first + ".");
621  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
622  lastBranchName = branchAndWorker.first;
623  }
624  auto found = alreadySeenWorkers.find(branchAndWorker.second);
625  if (alreadySeenWorkers.end() == found) {
626  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
627  // all the branches that might be read by this worker. However, initially we will only tell the
628  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
629  // EarlyDeleteHelper will automatically advance its internal end pointer.
630  size_t index = nextOpenIndex;
631  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
// NOTE(review): two lines are absent from the listing at this point.
634  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
635  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
636  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
637  nextOpenIndex += nIndices;
638  } else {
639  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
640  }
641  }
642 
643  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
644  // space needed for each module
645  auto itLast = earlyDeleteHelpers_.begin();
646  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
647  if (itLast->end() != it->begin()) {
648  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
649  unsigned int delta = it->begin() - itLast->end();
650  it->shiftIndexPointers(delta);
651 
// NOTE(review): the opening of the call that takes the two iterator arguments
// below is absent from the listing.
653  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
654  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
655  }
656  itLast = it;
657  }
// NOTE(review): the lines directly before and after the next line are absent
// from the listing (the opening of a call and its last argument).
659  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
661 
662  //now tell the paths about the deleters
663  for (auto& p : trig_paths_) {
664  p.setEarlyDeleteHelpers(alreadySeenWorkers);
665  }
666  for (auto& p : end_paths_) {
667  p.setEarlyDeleteHelpers(alreadySeenWorkers);
668  }
// NOTE(review): one line is absent from the listing at this point.
670  }
671  }
672 
674  Worker* worker,
675  std::unordered_set<std::string>& conditionalModules,
676  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
677  std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
678  ParameterSet& proc_pset,
679  ProductRegistry& preg,
680  PreallocationConfiguration const* prealloc,
681  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
682  std::vector<Worker*> returnValue;
683  auto const& consumesInfo = worker->consumesInfo();
684  auto moduleLabel = worker->description()->moduleLabel();
685  using namespace productholderindexhelper;
686  for (auto const& ci : consumesInfo) {
687  if (not ci.skipCurrentProcess() and
688  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
689  auto productModuleLabel = std::string(ci.label());
690  if (productModuleLabel.empty()) {
691  //this is a consumesMany request
692  for (auto const& branch : conditionalModuleBranches) {
693  //check that the conditional module has not been used
694  if (conditionalModules.find(branch.first) == conditionalModules.end()) {
695  continue;
696  }
697  if (ci.kindOfType() == edm::PRODUCT_TYPE) {
698  if (branch.second->unwrappedTypeID() != ci.type()) {
699  continue;
700  }
701  } else {
702  if (not typeIsViewCompatible(
703  ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
704  continue;
705  }
706  }
707 
708  auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration);
709  assert(condWorker);
710 
711  conditionalModules.erase(branch.first);
712 
713  auto dependents = tryToPlaceConditionalModules(condWorker,
714  conditionalModules,
715  conditionalModuleBranches,
716  aliasMap,
717  proc_pset,
718  preg,
719  prealloc,
720  processConfiguration);
721  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
722  returnValue.push_back(condWorker);
723  }
724  } else {
725  //just a regular consumes
726  bool productFromConditionalModule = false;
727  auto itFound = conditionalModules.find(productModuleLabel);
728  if (itFound == conditionalModules.end()) {
729  //Check to see if this was an alias
730  //note that aliasMap was previously filtered so only the conditional modules remain there
731  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
732  if (foundAlias) {
733  productModuleLabel = *foundAlias;
734  productFromConditionalModule = true;
735  itFound = conditionalModules.find(productModuleLabel);
736  //check that the alias-for conditional module has not been used
737  if (itFound == conditionalModules.end()) {
738  continue;
739  }
740  }
741  } else {
742  //need to check the rest of the data product info
743  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
744  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
745  if (itBranch->second->productInstanceName() == ci.instance()) {
746  if (ci.kindOfType() == PRODUCT_TYPE) {
747  if (ci.type() == itBranch->second->unwrappedTypeID()) {
748  productFromConditionalModule = true;
749  break;
750  }
751  } else {
752  //this is a view
753  if (typeIsViewCompatible(ci.type(),
754  TypeID(itBranch->second->wrappedType().typeInfo()),
755  itBranch->second->className())) {
756  productFromConditionalModule = true;
757  break;
758  }
759  }
760  }
761  }
762  }
763  if (productFromConditionalModule) {
764  auto condWorker =
765  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
766  assert(condWorker);
767 
768  conditionalModules.erase(itFound);
769 
770  auto dependents = tryToPlaceConditionalModules(condWorker,
771  conditionalModules,
772  conditionalModuleBranches,
773  aliasMap,
774  proc_pset,
775  preg,
776  prealloc,
777  processConfiguration);
778  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
779  returnValue.push_back(condWorker);
780  }
781  }
782  }
783  }
784  return returnValue;
785  }
786 
// Builds the ordered list of workers for the path 'pathName' and stores it in
// 'out'. The path's module list is read from the process parameter set; any
// ConditionalTask section (from the "#" marker to the end) is split off, and
// the conditional modules a path module depends on are inserted just before
// that module.
// NOTE(review): the opening line of this definition (function name and the
// parameter that provides 'proc_pset') and three statements in the loop body
// are absent from this listing; each gap is marked below. Recover them from
// the repository before changing this block.
788  ProductRegistry& preg,
789  PreallocationConfiguration const* prealloc,
790  std::shared_ptr<ProcessConfiguration const> processConfiguration,
791  std::string const& pathName,
792  bool ignoreFilters,
793  PathWorkers& out,
794  std::vector<std::string> const& endPathNames,
795  ConditionalTaskHelper const& conditionalTaskHelper) {
796  vstring modnames = proc_pset.getParameter<vstring>(pathName);
797  PathWorkers tmpworkers;
798 
799  //Pull out ConditionalTask modules
800  auto condRange = findConditionalTaskModulesRange(modnames);
801 
802  std::unordered_set<std::string> conditionalmods;
803  //An EDAlias may be redirecting to a module on a ConditionalTask
804  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
805  std::unordered_map<std::string, unsigned int> conditionalModOrder;
806  if (condRange.first != condRange.second) {
807  for (auto it = condRange.first; it != condRange.second; ++it) {
808  // ordering needs to skip the # token in the path list
809  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
810  }
811  //the last entry should be ignored since it is required to be "@"
// The names are moved out of modnames here; the moved-from entries are
// erased from modnames below, together with the "#" and "@" markers.
812  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
813  std::make_move_iterator(condRange.second));
814 
815  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
816  modnames.erase(std::prev(condRange.first), modnames.end());
817  }
818 
819  unsigned int placeInPath = 0;
820  for (auto const& name : modnames) {
821  //Modules except EDFilters are set to run concurrently by default
822  bool doNotRunConcurrently = false;
// NOTE(review): the declaration of 'filterAction' is absent from the listing
// at this point; the check further down suggests it starts out as
// WorkerInPath::Normal -- confirm.
// The first character of the name selects the action: '!' -> Veto,
// '-' or '+' -> Ignore; '|' or '+' additionally disables concurrent running.
824  if (name[0] == '!') {
825  filterAction = WorkerInPath::Veto;
826  } else if (name[0] == '-' or name[0] == '+') {
827  filterAction = WorkerInPath::Ignore;
828  }
829  if (name[0] == '|' or name[0] == '+') {
830  //cms.wait was specified so do not run concurrently
831  doNotRunConcurrently = true;
832  }
833 
// NOTE(review): the declaration of 'moduleLabel' is absent from the listing
// at this point; presumably it is a copy of 'name', from which the prefix
// character is stripped below -- confirm.
835  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
836  moduleLabel.erase(0, 1);
837  }
838 
839  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
840  if (worker == nullptr) {
841  std::string pathType("endpath");
842  if (!search_all(endPathNames, pathName)) {
843  pathType = std::string("path");
844  }
// NOTE(review): the line opening the statement that streams the message below
// is absent from the listing; 'worker' is dereferenced right after this
// block, so presumably that statement throws -- confirm.
846  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
847  << "\"\n please check spelling or remove that label from the path.";
848  }
849 
850  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
851  // We have a filter on an end path, and the filter is not explicitly ignored.
852  // See if the filter is allowed.
853  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
854  if (!search_all(allowed_filters, worker->description()->moduleName())) {
855  // Filter is not allowed. Ignore the result, and issue a warning.
856  filterAction = WorkerInPath::Ignore;
857  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
858  << "' with module label '" << moduleLabel << "' appears on EndPath '"
859  << pathName << "'.\n"
860  << "The return value of the filter will be ignored.\n"
861  << "To suppress this warning, either remove the filter from the endpath,\n"
862  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
863  }
864  }
// A filter whose result is not ignored is never run concurrently.
865  bool runConcurrently = not doNotRunConcurrently;
866  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
867  runConcurrently = false;
868  }
869 
870  auto condModules = tryToPlaceConditionalModules(worker,
871  conditionalmods,
872  conditionalModsBranches,
873  conditionalTaskHelper.aliasMap(),
874  proc_pset,
875  preg,
876  prealloc,
877  processConfiguration);
// Conditional modules go in front of the module that needs them, with their
// results ignored and the position recorded in conditionalModOrder.
878  for (auto condMod : condModules) {
879  tmpworkers.emplace_back(
880  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
881  }
882 
883  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
884  ++placeInPath;
885  }
886 
887  out.swap(tmpworkers);
888  }
889 
891  ProductRegistry& preg,
892  PreallocationConfiguration const* prealloc,
893  std::shared_ptr<ProcessConfiguration const> processConfiguration,
894  int bitpos,
895  std::string const& name,
896  TrigResPtr trptr,
897  std::vector<std::string> const& endPathNames,
898  ConditionalTaskHelper const& conditionalTaskHelper) {
899  PathWorkers tmpworkers;
900  fillWorkers(
901  proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
902 
903  // an empty path will cause an extra bit that is not used
904  if (!tmpworkers.empty()) {
905  trig_paths_.emplace_back(bitpos,
906  name,
907  tmpworkers,
908  trptr,
909  actionTable(),
910  actReg_,
914  } else {
915  empty_trig_paths_.push_back(bitpos);
916  }
917  for (WorkerInPath const& workerInPath : tmpworkers) {
918  addToAllWorkers(workerInPath.getWorker());
919  }
920  }
921 
923  ProductRegistry& preg,
924  PreallocationConfiguration const* prealloc,
925  std::shared_ptr<ProcessConfiguration const> processConfiguration,
926  int bitpos,
927  std::string const& name,
928  std::vector<std::string> const& endPathNames,
929  ConditionalTaskHelper const& conditionalTaskHelper) {
930  PathWorkers tmpworkers;
931  fillWorkers(
932  proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
933 
934  if (!tmpworkers.empty()) {
935  //EndPaths are not supposed to stop if SkipEvent type exception happens
936  end_paths_.emplace_back(bitpos,
937  name,
938  tmpworkers,
939  TrigResPtr(),
940  actionTable(),
941  actReg_,
943  nullptr,
945  } else {
946  empty_end_paths_.push_back(bitpos);
947  }
948  for (WorkerInPath const& workerInPath : tmpworkers) {
949  addToAllWorkers(workerInPath.getWorker());
950  }
951  }
952 
954 
956 
958  Worker* found = nullptr;
959  for (auto const& worker : allWorkers()) {
960  if (worker->description()->moduleLabel() == iLabel) {
961  found = worker;
962  break;
963  }
964  }
965  if (nullptr == found) {
966  return;
967  }
968 
969  iMod->replaceModuleFor(found);
970  found->beginStream(streamID_, streamContext_);
971  }
972 
974 
975  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
976  std::vector<ModuleDescription const*> result;
977  result.reserve(allWorkers().size());
978 
979  for (auto const& worker : allWorkers()) {
980  ModuleDescription const* p = worker->description();
981  result.push_back(p);
982  }
983  return result;
984  }
985 
987  WaitingTaskHolder iTask,
989  ServiceToken const& serviceToken,
990  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
991  EventPrincipal& ep = info.principal();
992 
993  // Caught exception is propagated via WaitingTaskHolder
994  CMS_SA_ALLOW try {
995  this->resetAll();
996 
998 
999  Traits::setStreamContext(streamContext_, ep);
1000  //a service may want to communicate with another service
1001  ServiceRegistry::Operate guard(serviceToken);
1002  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1003 
1004  // Data dependencies need to be set up before marking empty
1005  // (End)Paths complete in case something consumes the status of
1006  // the empty (EndPath)
1009 
1010  HLTPathStatus hltPathStatus(hlt::Pass, 0);
1011  for (int empty_trig_path : empty_trig_paths_) {
1012  results_->at(empty_trig_path) = hltPathStatus;
1013  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1014  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1017  if (except) {
1018  iTask.doneWaiting(except);
1019  return;
1020  }
1021  }
1022  for (int empty_end_path : empty_end_paths_) {
1023  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
1026  if (except) {
1027  iTask.doneWaiting(except);
1028  return;
1029  }
1030  }
1031 
1032  ++total_events_;
1033 
1034  //use to give priorities on an error to ones from Paths
1035  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1036  auto pathErrorPtr = pathErrorHolder.get();
1037  ServiceWeakToken weakToken = serviceToken;
1038  auto allPathsDone = make_waiting_task(
1039  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1040  ServiceRegistry::Operate operate(weakToken.lock());
1041 
1042  std::exception_ptr ptr;
1043  if (pathError->load()) {
1044  ptr = *pathError->load();
1045  delete pathError->load();
1046  }
1047  if ((not ptr) and iPtr) {
1048  ptr = *iPtr;
1049  }
1050  iTask.doneWaiting(finishProcessOneEvent(ptr));
1051  });
1052  //The holder guarantees that if the paths finish before the loop ends
1053  // that we do not start too soon. It also guarantees that the task will
1054  // run under that condition.
1055  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1056 
1057  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1058  std::exception_ptr const* iPtr) mutable {
1059  ServiceRegistry::Operate operate(weakToken.lock());
1060 
1061  if (iPtr) {
1062  //this is used to prioritize this error over one
1063  // that happens in EndPath or Accumulate
1064  pathErrorPtr->store(new std::exception_ptr(*iPtr));
1065  }
1066  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1067  });
1068 
1069  //The holder guarantees that if the paths finish before the loop ends
1070  // that we do not start too soon. It also guarantees that the task will
1071  // run under that condition.
1072  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1073 
1074  //start end paths first so on single threaded the paths will run first
1075  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1076  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1077  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1078  }
1079 
1080  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1081  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1082  }
1083 
1084  ParentContext parentContext(&streamContext_);
1086  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1087  } catch (...) {
1088  iTask.doneWaiting(std::current_exception());
1089  }
1090  }
1091 
1092  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1093  WaitingTaskHolder iWait,
1095  if (iExcept) {
1096  // Caught exception is propagated via WaitingTaskHolder
1097  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1102  edm::printCmsExceptionWarning("SkipEvent", e);
1103  *(iExcept.load()) = std::exception_ptr();
1104  } else {
1105  *(iExcept.load()) = std::current_exception();
1106  }
1107  } catch (...) {
1108  *(iExcept.load()) = std::current_exception();
1109  }
1110  }
1111 
1112  if ((not iExcept) and results_->accept()) {
1113  ++total_passed_;
1114  }
1115 
1116  if (nullptr != results_inserter_.get()) {
1117  // Caught exception is propagated to the caller
1118  CMS_SA_ALLOW try {
1119  //Even if there was an exception, we need to allow results inserter
1120  // to run since some module may be waiting on its results.
1121  ParentContext parentContext(&streamContext_);
1123 
1124  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1125  if (expt) {
1126  std::rethrow_exception(expt);
1127  }
1128  } catch (cms::Exception& ex) {
1129  if (not iExcept) {
1130  if (ex.context().empty()) {
1131  std::ostringstream ost;
1132  ost << "Processing Event " << info.principal().id();
1133  ex.addContext(ost.str());
1134  }
1135  iExcept.store(new std::exception_ptr(std::current_exception()));
1136  }
1137  } catch (...) {
1138  if (not iExcept) {
1139  iExcept.store(new std::exception_ptr(std::current_exception()));
1140  }
1141  }
1142  }
1143  std::exception_ptr ptr;
1144  if (iExcept) {
1145  ptr = *iExcept.load();
1146  }
1147  iWait.doneWaiting(ptr);
1148  }
1149 
1150  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1152 
1153  if (iExcept) {
1154  //add context information to the exception and print message
1155  try {
1156  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1157  } catch (cms::Exception& ex) {
1158  bool const cleaningUpAfterException = false;
1159  if (ex.context().empty()) {
1160  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1161  } else {
1162  addContextAndPrintException("", ex, cleaningUpAfterException);
1163  }
1164  iExcept = std::current_exception();
1165  }
1166 
1167  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1168  }
1169  // Caught exception is propagated to the caller
1170  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1171  if (not iExcept) {
1172  iExcept = std::current_exception();
1173  }
1174  }
1175  if (not iExcept) {
1176  resetEarlyDelete();
1177  }
1178 
1179  return iExcept;
1180  }
1181 
1182  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1183  oLabelsToFill.reserve(trig_paths_.size());
1184  std::transform(trig_paths_.begin(),
1185  trig_paths_.end(),
1186  std::back_inserter(oLabelsToFill),
1187  std::bind(&Path::name, std::placeholders::_1));
1188  }
1189 
1190  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1191  TrigPaths::const_iterator itFound = std::find_if(
1192  trig_paths_.begin(),
1193  trig_paths_.end(),
1194  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1195  if (itFound != trig_paths_.end()) {
1196  oLabelsToFill.reserve(itFound->size());
1197  for (size_t i = 0; i < itFound->size(); ++i) {
1198  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1199  }
1200  }
1201  }
1202 
1204  std::vector<ModuleDescription const*>& descriptions,
1205  unsigned int hint) const {
1206  descriptions.clear();
1207  bool found = false;
1208  TrigPaths::const_iterator itFound;
1209 
1210  if (hint < trig_paths_.size()) {
1211  itFound = trig_paths_.begin() + hint;
1212  if (itFound->name() == iPathLabel)
1213  found = true;
1214  }
1215  if (!found) {
1216  // if the hint did not work, do it the slow way
1217  itFound = std::find_if(
1218  trig_paths_.begin(),
1219  trig_paths_.end(),
1220  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1221  if (itFound != trig_paths_.end())
1222  found = true;
1223  }
1224  if (found) {
1225  descriptions.reserve(itFound->size());
1226  for (size_t i = 0; i < itFound->size(); ++i) {
1227  descriptions.push_back(itFound->getWorker(i)->description());
1228  }
1229  }
1230  }
1231 
1233  std::vector<ModuleDescription const*>& descriptions,
1234  unsigned int hint) const {
1235  descriptions.clear();
1236  bool found = false;
1237  TrigPaths::const_iterator itFound;
1238 
1239  if (hint < end_paths_.size()) {
1240  itFound = end_paths_.begin() + hint;
1241  if (itFound->name() == iEndPathLabel)
1242  found = true;
1243  }
1244  if (!found) {
1245  // if the hint did not work, do it the slow way
1246  itFound = std::find_if(
1247  end_paths_.begin(),
1248  end_paths_.end(),
1249  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1250  if (itFound != end_paths_.end())
1251  found = true;
1252  }
1253  if (found) {
1254  descriptions.reserve(itFound->size());
1255  for (size_t i = 0; i < itFound->size(); ++i) {
1256  descriptions.push_back(itFound->getWorker(i)->description());
1257  }
1258  }
1259  }
1260 
1261  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1262  sum.timesVisited += path.timesVisited(which);
1263  sum.timesPassed += path.timesPassed(which);
1264  sum.timesFailed += path.timesFailed(which);
1265  sum.timesExcept += path.timesExcept(which);
1266  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1267  sum.bitPosition = path.bitPosition(which);
1268  }
1269 
1270  static void fillPathSummary(Path const& path, PathSummary& sum) {
1271  sum.name = path.name();
1272  sum.bitPosition = path.bitPosition();
1273  sum.timesRun += path.timesRun();
1274  sum.timesPassed += path.timesPassed();
1275  sum.timesFailed += path.timesFailed();
1276  sum.timesExcept += path.timesExcept();
1277 
1278  Path::size_type sz = path.size();
1279  if (sum.moduleInPathSummaries.empty()) {
1280  std::vector<ModuleInPathSummary> temp(sz);
1281  for (size_t i = 0; i != sz; ++i) {
1283  }
1284  sum.moduleInPathSummaries.swap(temp);
1285  } else {
1286  assert(sz == sum.moduleInPathSummaries.size());
1287  for (size_t i = 0; i != sz; ++i) {
1289  }
1290  }
1291  }
1292 
1293  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1294  sum.timesVisited += w.timesVisited();
1295  sum.timesRun += w.timesRun();
1296  sum.timesPassed += w.timesPassed();
1297  sum.timesFailed += w.timesFailed();
1298  sum.timesExcept += w.timesExcept();
1299  sum.moduleLabel = w.description()->moduleLabel();
1300  }
1301 
1302  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1303 
1305  rep.eventSummary.totalEvents += totalEvents();
1306  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1307  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1308 
1309  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1310  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1311  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1312  }
1313 
1315  using std::placeholders::_1;
1317  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1318  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1319  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1320  }
1321 
1323  skippingEvent_ = false;
1324  results_->reset();
1325  }
1326 
1328 
1330  //must be sure we have cleared the count first
1331  for (auto& count : earlyDeleteBranchToCount_) {
1332  count.count = 0;
1333  }
1334  //now reset based on how many helpers use that branch
1336  ++(earlyDeleteBranchToCount_[index].count);
1337  }
1338  for (auto& helper : earlyDeleteHelpers_) {
1339  helper.reset();
1340  }
1341  }
1342 
1344  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1345  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1347  int bitpos = 0;
1348  unsigned int indexEmpty = 0;
1349  unsigned int indexOfPath = 0;
1350  for (auto& pathStatusInserter : pathStatusInserters) {
1351  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1352  WorkerPtr workerPtr(
1353  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1354  pathStatusInserterWorkers_.emplace_back(workerPtr);
1355  workerPtr->setActivityRegistry(actReg_);
1356  addToAllWorkers(workerPtr.get());
1357 
1358  // A little complexity here because a C++ Path object is not
1359  // instantiated and put into end_paths if there are no modules
1360  // on the configured path.
1361  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1362  ++indexEmpty;
1363  } else {
1364  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1365  ++indexOfPath;
1366  }
1367  ++bitpos;
1368  }
1369 
1370  bitpos = 0;
1371  indexEmpty = 0;
1372  indexOfPath = 0;
1373  for (auto& endPathStatusInserter : endPathStatusInserters) {
1374  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1375  WorkerPtr workerPtr(
1376  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1377  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1378  workerPtr->setActivityRegistry(actReg_);
1379  addToAllWorkers(workerPtr.get());
1380 
1381  // A little complexity here because a C++ Path object is not
1382  // instantiated and put into end_paths if there are no modules
1383  // on the configured path.
1384  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1385  ++indexEmpty;
1386  } else {
1387  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1388  ++indexOfPath;
1389  }
1390  ++bitpos;
1391  }
1392  }
1393 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
size
Write out results.
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
Definition: helper.py:1
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
ProductList const & productList() const
std::unordered_multimap< std::string, AliasInfo > const & aliasMap() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
static PFTauRenderPlugin instance
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
int totalEventsPassed() const
ret
prodAgent to be discontinued
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::unordered_multimap< std::string, AliasInfo > aliasMap_
void setupOnDemandSystem(EventTransitionInfo const &)
void clearCounters()
Definition: Worker.h:232
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
std::string const & moduleName() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
StreamID streamID() const
ConditionalTaskHelper(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, WorkerManager &workerManager, std::vector< std::string > const &trigPathNames)
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
T getUntrackedParameter(std::string const &, T const &) const
constexpr element_type const * get() const
std::string moduleLabel
Definition: TriggerReport.h:52
oneapi::tbb::task_group * group() const noexcept
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
char const * label
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
void doneWaiting(std::exception_ptr iExcept)
accept
Definition: HLTenums.h:18
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
void processSwitchEDAliases(ParameterSet const &proc_pset, ProductRegistry &preg, ProcessConfiguration const &processConfiguration, std::unordered_set< std::string > const &allConditionalMods)
std::string name
Definition: TriggerReport.h:41
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
Strings const & getTrigPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static void fillPathSummary(Path const &path, PathSummary &sum)
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
int totalEvents() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
ModuleDescription const * description() const
Definition: Worker.h:198
std::string const & className() const
Definition: TypeID.cc:40
void clearCounters()
Clear all the counters in the trigger report.
void fillAliasMap(ParameterSet const &proc_pset, std::unordered_set< std::string > const &allConditionalMods)
Strings const & getEndPaths() const
void getTriggerReport(TriggerReport &rep) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
exception_actions::ActionCodes find(const std::string &category) const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
double b
Definition: hdecay.h:118
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::string const & processName() const
std::atomic< bool > skippingEvent_
Definition: plugin.cc:23
HLT enums.
void setupResolvers(Principal &principal)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:336
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModuleBranches(std::unordered_set< std::string > const &conditionalmods) const
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:42
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModsBranches_
Log< level::Warning, false > LogWarning
std::list< std::string > const & context() const
Definition: Exception.cc:147
long double T
std::string const & moduleLabel() const
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
std::string const & name() const
Definition: Path.h:74
std::vector< std::string > vstring
Definition: Schedule.cc:482
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
def move(src, dest)
Definition: eostools.py:511
void clearCounters()
Definition: Path.cc:198
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)