CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
29 
31 #include "processEDAliases.h"
32 
33 #include <algorithm>
34 #include <cassert>
35 #include <cstdlib>
36 #include <functional>
37 #include <iomanip>
38 #include <limits>
39 #include <list>
40 #include <map>
41 #include <exception>
42 
43 namespace edm {
44 
45  namespace {
46 
// Walks the input range [begin, end) and an output range starting at 'out'
// in lock step. For every position 'func' is invoked with the current input
// element and the current output element, i.e. func(input, output); 'func'
// is expected to read the former and write the latter. The caller must make
// sure the output range is at least as long as the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
56 
// Fills the sequence 'to' from the sequence 'from' by applying
// transform_into with the callable 'func' (called as func(input, output)).
//  - If the two sequences differ in size, the results are first computed
//    into a temporary of the right size and then swapped into 'to', so
//    'to' is left untouched if any call to 'func' throws.
//  - If the sizes already match, the elements of 'to' are overwritten in
//    place, so 'func' sees the previous content of each output element.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }
  TO scratch(from.size());
  transform_into(from.begin(), from.end(), scratch.begin(), func);
  to.swap(scratch);
}
72 
73  // -----------------------------
74 
75  // Here we make the trigger results inserter directly. This should
76  // probably be a utility in the WorkerRegistry or elsewhere.
77 
78  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
79  std::shared_ptr<ActivityRegistry> areg,
80  std::shared_ptr<TriggerResultInserter> inserter) {
82  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
83  ptr->setActivityRegistry(areg);
84  return ptr;
85  }
86 
87  void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
88  ProductRegistry const& preg,
89  std::multimap<std::string, Worker*>& branchToReadingWorker) {
90  auto vBranchesToDeleteEarly = branchesToDeleteEarly;
91  // Remove any duplicates
92  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
93  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
94  vBranchesToDeleteEarly.end());
95 
96  // Are the requested items in the product registry?
97  auto allBranchNames = preg.allBranchNames();
98  //the branch names all end with a period, which we do not want to compare with
99  for (auto& b : allBranchNames) {
100  b.resize(b.size() - 1);
101  }
102  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
103  std::vector<std::string> temp;
104  temp.reserve(vBranchesToDeleteEarly.size());
105 
106  std::set_intersection(vBranchesToDeleteEarly.begin(),
107  vBranchesToDeleteEarly.end(),
108  allBranchNames.begin(),
109  allBranchNames.end(),
110  std::back_inserter(temp));
111  vBranchesToDeleteEarly.swap(temp);
112  if (temp.size() != vBranchesToDeleteEarly.size()) {
113  std::vector<std::string> missingProducts;
114  std::set_difference(temp.begin(),
115  temp.end(),
116  vBranchesToDeleteEarly.begin(),
117  vBranchesToDeleteEarly.end(),
118  std::back_inserter(missingProducts));
119  LogInfo l("MissingProductsForCanDeleteEarly");
120  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
121  for (auto const& n : missingProducts) {
122  l << "\n " << n;
123  }
124  }
125  //set placeholder for the branch, we will remove the nullptr if a
126  // module actually wants the branch.
127  for (auto const& branch : vBranchesToDeleteEarly) {
128  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
129  }
130  }
131 
132  Worker* getWorker(std::string const& moduleLabel,
133  ParameterSet& proc_pset,
134  WorkerManager& workerManager,
135  ProductRegistry& preg,
136  PreallocationConfiguration const* prealloc,
137  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
138  bool isTracked;
139  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
140  if (modpset == nullptr) {
141  return nullptr;
142  }
143  assert(isTracked);
144 
145  return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
146  }
147 
// If ConditionalTask modules exist in the container of module
// names, returns the range (std::pair of iterators) for the modules.
// The range excludes the special markers '#' (right before the
// ConditionalTask modules) and '@' (last element).
// If the module name container does not contain ConditionalTask
// modules, returns std::pair of end iterators. The same empty range
// is returned for a malformed container in which '#' is the final
// element (no trailing '@'), instead of an inverted iterator range.
template <typename T>
auto findConditionalTaskModulesRange(T& modnames) {
  auto const last = modnames.end();
  auto const marker = std::find(modnames.begin(), last, "#");
  if (marker == last or std::next(marker) == last) {
    return std::pair(last, last);
  }
  // first ConditionalTask module .. the '@' terminator (exclusive)
  return std::pair(std::next(marker), std::prev(last));
}
162 
163  std::optional<std::string> findBestMatchingAlias(
164  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
165  std::unordered_multimap<std::string, StreamSchedule::AliasInfo> const& aliasMap,
166  std::string const& productModuleLabel,
167  ConsumesInfo const& consumesInfo) {
168  std::optional<std::string> best;
169  int wildcardsInBest = std::numeric_limits<int>::max();
170  bool bestIsAmbiguous = false;
171 
172  auto updateBest = [&best, &wildcardsInBest, &bestIsAmbiguous](
173  std::string const& label, bool instanceIsWildcard, bool typeIsWildcard) {
174  int const wildcards = static_cast<int>(instanceIsWildcard) + static_cast<int>(typeIsWildcard);
175  if (wildcards == 0) {
176  bestIsAmbiguous = false;
177  return true;
178  }
179  if (not best or wildcards < wildcardsInBest) {
180  best = label;
181  wildcardsInBest = wildcards;
182  bestIsAmbiguous = false;
183  } else if (best and *best != label and wildcardsInBest == wildcards) {
184  bestIsAmbiguous = true;
185  }
186  return false;
187  };
188 
189  auto findAlias = aliasMap.equal_range(productModuleLabel);
190  for (auto it = findAlias.first; it != findAlias.second; ++it) {
191  std::string const& aliasInstanceLabel =
192  it->second.instanceLabel != "*" ? it->second.instanceLabel : it->second.originalInstanceLabel;
193  bool const instanceIsWildcard = (aliasInstanceLabel == "*");
194  if (instanceIsWildcard or consumesInfo.instance() == aliasInstanceLabel) {
195  bool const typeIsWildcard = it->second.friendlyClassName == "*";
196  if (typeIsWildcard or (consumesInfo.type().friendlyClassName() == it->second.friendlyClassName)) {
197  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
198  return it->second.originalModuleLabel;
199  }
200  } else if (consumesInfo.kindOfType() == ELEMENT_TYPE) {
201  //consume is a View so need to do more intrusive search
202  //find matching branches in module
203  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
204  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
205  if (typeIsWildcard or itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
206  if (productholderindexhelper::typeIsViewCompatible(consumesInfo.type(),
207  TypeID(itBranch->second->wrappedType().typeInfo()),
208  itBranch->second->className())) {
209  if (updateBest(it->second.originalModuleLabel, instanceIsWildcard, typeIsWildcard)) {
210  return it->second.originalModuleLabel;
211  }
212  }
213  }
214  }
215  }
216  }
217  }
218  if (bestIsAmbiguous) {
220  << "Encountered ambiguity when trying to find a best-matching alias for\n"
221  << " friendly class name " << consumesInfo.type().friendlyClassName() << "\n"
222  << " module label " << productModuleLabel << "\n"
223  << " product instance name " << consumesInfo.instance() << "\n"
224  << "when processing EDAliases for modules in ConditionalTasks. Two aliases have the same number of "
225  "wildcards ("
226  << wildcardsInBest << ")";
227  }
228  return best;
229  }
230  } // namespace
231 
232  // -----------------------------
233 
234  typedef std::vector<std::string> vstring;
235 
236  // -----------------------------
237 
239  public:
241 
243  ProductRegistry& preg,
244  PreallocationConfiguration const* prealloc,
245  std::shared_ptr<ProcessConfiguration const> processConfiguration,
246  WorkerManager& workerManager,
247  std::vector<std::string> const& trigPathNames) {
248  std::unordered_set<std::string> allConditionalMods;
249  for (auto const& pathName : trigPathNames) {
250  auto const modnames = proc_pset.getParameter<vstring>(pathName);
251 
252  //Pull out ConditionalTask modules
253  auto condRange = findConditionalTaskModulesRange(modnames);
254  if (condRange.first == condRange.second)
255  continue;
256 
257  //the last entry should be ignored since it is required to be "@"
258  allConditionalMods.insert(condRange.first, condRange.second);
259  }
260 
261  for (auto const& cond : allConditionalMods) {
262  //force the creation of the conditional modules so alias check can work
263  (void)getWorker(cond, proc_pset, workerManager, preg, prealloc, processConfiguration);
264  }
265 
266  fillAliasMap(proc_pset, allConditionalMods);
267  processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
268 
269  //find branches created by the conditional modules
270  for (auto const& prod : preg.productList()) {
271  if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
272  conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
273  }
274  }
275  }
276 
277  std::unordered_multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
278 
279  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
280  std::unordered_set<std::string> const& conditionalmods) const {
281  std::unordered_multimap<std::string, edm::BranchDescription const*> ret;
282  for (auto const& mod : conditionalmods) {
283  auto range = conditionalModsBranches_.equal_range(mod);
284  ret.insert(range.first, range.second);
285  }
286  return ret;
287  }
288 
289  private:
290  void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
291  auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
292  std::string const star("*");
293  for (auto const& alias : aliases) {
294  auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
295  auto aliasedToModuleLabels = info.getParameterNames();
296  for (auto const& mod : aliasedToModuleLabels) {
297  if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
298  auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
299  for (auto const& aliasPSet : aliasVPSet) {
300  std::string type = star;
301  std::string instance = star;
302  std::string originalInstance = star;
303  if (aliasPSet.exists("type")) {
304  type = aliasPSet.getParameter<std::string>("type");
305  }
306  if (aliasPSet.exists("toProductInstance")) {
307  instance = aliasPSet.getParameter<std::string>("toProductInstance");
308  }
309  if (aliasPSet.exists("fromProductInstance")) {
310  originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
311  }
312 
313  aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
314  }
315  }
316  }
317  }
318  }
319 
320  void processSwitchEDAliases(ParameterSet const& proc_pset,
321  ProductRegistry& preg,
322  ProcessConfiguration const& processConfiguration,
323  std::unordered_set<std::string> const& allConditionalMods) {
324  auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
325  std::vector<std::string> switchEDAliases;
326  for (auto const& module : all_modules) {
327  auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
328  if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
329  auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
330  for (auto const& case_label : all_cases) {
331  auto range = aliasMap_.equal_range(case_label);
332  if (range.first != range.second) {
333  switchEDAliases.push_back(case_label);
334  }
335  }
336  }
337  }
339  switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
340  }
341 
342  std::unordered_multimap<std::string, AliasInfo> aliasMap_;
343  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
344  };
345 
346  // -----------------------------
347 
349  std::shared_ptr<TriggerResultInserter> inserter,
350  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
351  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
352  std::shared_ptr<ModuleRegistry> modReg,
353  ParameterSet& proc_pset,
354  service::TriggerNamesService const& tns,
355  PreallocationConfiguration const& prealloc,
356  ProductRegistry& preg,
358  std::shared_ptr<ActivityRegistry> areg,
359  std::shared_ptr<ProcessConfiguration const> processConfiguration,
360  StreamID streamID,
361  ProcessContext const* processContext)
362  : workerManager_(modReg, areg, actions),
363  actReg_(areg),
364  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
365  results_inserter_(),
366  trig_paths_(),
367  end_paths_(),
368  total_events_(),
369  total_passed_(),
370  number_of_unscheduled_modules_(0),
371  streamID_(streamID),
372  streamContext_(streamID_, processContext),
373  skippingEvent_(false) {
374  bool hasPath = false;
375  std::vector<std::string> const& pathNames = tns.getTrigPaths();
376  std::vector<std::string> const& endPathNames = tns.getEndPaths();
377 
378  ConditionalTaskHelper conditionalTaskHelper(
379  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
380 
381  int trig_bitpos = 0;
382  trig_paths_.reserve(pathNames.size());
383  for (auto const& trig_name : pathNames) {
384  fillTrigPath(proc_pset,
385  preg,
386  &prealloc,
387  processConfiguration,
388  trig_bitpos,
389  trig_name,
390  results(),
391  endPathNames,
392  conditionalTaskHelper);
393  ++trig_bitpos;
394  hasPath = true;
395  }
396 
397  if (hasPath) {
398  // the results inserter stands alone
399  inserter->setTrigResultForStream(streamID.value(), results());
400 
401  results_inserter_ = makeInserter(actions, actReg_, inserter);
403  }
404 
405  // fill normal endpaths
406  int bitpos = 0;
407  end_paths_.reserve(endPathNames.size());
408  for (auto const& end_path_name : endPathNames) {
409  fillEndPath(
410  proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
411  ++bitpos;
412  }
413 
414  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
415 
416  //See if all modules were used
417  std::set<std::string> usedWorkerLabels;
418  for (auto const& worker : allWorkers()) {
419  usedWorkerLabels.insert(worker->description()->moduleLabel());
420  }
421  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
422  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
423  std::vector<std::string> unusedLabels;
424  set_difference(modulesInConfigSet.begin(),
425  modulesInConfigSet.end(),
426  usedWorkerLabels.begin(),
427  usedWorkerLabels.end(),
428  back_inserter(unusedLabels));
429  std::set<std::string> unscheduledLabels;
430  std::vector<std::string> shouldBeUsedLabels;
431  if (!unusedLabels.empty()) {
432  //Need to
433  // 1) create worker
434  // 2) if it is a WorkerT<EDProducer>, add it to our list
435  // 3) hand list to our delayed reader
436  for (auto const& label : unusedLabels) {
437  bool isTracked;
438  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
439  assert(isTracked);
440  assert(modulePSet != nullptr);
442  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
443  }
444  if (!shouldBeUsedLabels.empty()) {
445  std::ostringstream unusedStream;
446  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
447  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
448  itLabelEnd = shouldBeUsedLabels.end();
449  itLabel != itLabelEnd;
450  ++itLabel) {
451  unusedStream << ",'" << *itLabel << "'";
452  }
453  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
454  }
455  }
456  number_of_unscheduled_modules_ = unscheduledLabels.size();
457  } // StreamSchedule::StreamSchedule
458 
460  std::vector<std::string> const& branchesToDeleteEarly,
461  edm::ProductRegistry const& preg) {
462  // setup the list with those products actually registered for this job
463  std::multimap<std::string, Worker*> branchToReadingWorker;
464  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
465 
466  const std::vector<std::string> kEmpty;
467  std::map<Worker*, unsigned int> reserveSizeForWorker;
468  unsigned int upperLimitOnReadingWorker = 0;
469  unsigned int upperLimitOnIndicies = 0;
470  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
471 
472  //talk with output modules first
473  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
474  auto comm = iHolder->createOutputModuleCommunicator();
475  if (comm) {
476  if (!branchToReadingWorker.empty()) {
477  //If an OutputModule needs a product, we can't delete it early
478  // so we should remove it from our list
479  SelectedProductsForBranchType const& kept = comm->keptProducts();
480  for (auto const& item : kept[InEvent]) {
481  BranchDescription const& desc = *item.first;
482  auto found = branchToReadingWorker.equal_range(desc.branchName());
483  if (found.first != found.second) {
484  --nUniqueBranchesToDelete;
485  branchToReadingWorker.erase(found.first, found.second);
486  }
487  }
488  }
489  }
490  });
491 
492  if (branchToReadingWorker.empty()) {
493  return;
494  }
495 
496  for (auto w : allWorkers()) {
497  //determine if this module could read a branch we want to delete early
498  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
499  if (nullptr != pset) {
500  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
501  if (not branches.empty()) {
502  ++upperLimitOnReadingWorker;
503  }
504  for (auto const& branch : branches) {
505  auto found = branchToReadingWorker.equal_range(branch);
506  if (found.first != found.second) {
507  ++upperLimitOnIndicies;
508  ++reserveSizeForWorker[w];
509  if (nullptr == found.first->second) {
510  found.first->second = w;
511  } else {
512  branchToReadingWorker.insert(make_pair(found.first->first, w));
513  }
514  }
515  }
516  }
517  }
518  {
519  auto it = branchToReadingWorker.begin();
520  std::vector<std::string> unusedBranches;
521  while (it != branchToReadingWorker.end()) {
522  if (it->second == nullptr) {
523  unusedBranches.push_back(it->first);
524  //erasing the object invalidates the iterator so must advance it first
525  auto temp = it;
526  ++it;
527  branchToReadingWorker.erase(temp);
528  } else {
529  ++it;
530  }
531  }
532  if (not unusedBranches.empty()) {
533  LogWarning l("UnusedProductsForCanDeleteEarly");
534  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
535  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
536  for (auto const& n : unusedBranches) {
537  l << "\n " << n;
538  }
539  }
540  }
541  if (!branchToReadingWorker.empty()) {
542  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
543  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
544  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
545  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
546  std::string lastBranchName;
547  size_t nextOpenIndex = 0;
548  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
549  for (auto& branchAndWorker : branchToReadingWorker) {
550  if (lastBranchName != branchAndWorker.first) {
551  //have to put back the period we removed earlier in order to get the proper name
552  BranchID bid(branchAndWorker.first + ".");
553  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
554  lastBranchName = branchAndWorker.first;
555  }
556  auto found = alreadySeenWorkers.find(branchAndWorker.second);
557  if (alreadySeenWorkers.end() == found) {
558  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
559  // all the branches that might be read by this worker. However, initially we will only tell the
560  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
561  // EarlyDeleteHelper will automatically advance its internal end pointer.
562  size_t index = nextOpenIndex;
563  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
565  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
566  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
567  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
568  nextOpenIndex += nIndices;
569  } else {
570  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
571  }
572  }
573 
574  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
575  // space needed for each module
576  auto itLast = earlyDeleteHelpers_.begin();
577  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
578  if (itLast->end() != it->begin()) {
579  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
580  unsigned int delta = it->begin() - itLast->end();
581  it->shiftIndexPointers(delta);
582 
584  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
585  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
586  }
587  itLast = it;
588  }
590  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
592 
593  //now tell the paths about the deleters
594  for (auto& p : trig_paths_) {
595  p.setEarlyDeleteHelpers(alreadySeenWorkers);
596  }
597  for (auto& p : end_paths_) {
598  p.setEarlyDeleteHelpers(alreadySeenWorkers);
599  }
601  }
602  }
603 
605  Worker* worker,
606  std::unordered_set<std::string>& conditionalModules,
607  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
608  std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
609  ParameterSet& proc_pset,
610  ProductRegistry& preg,
611  PreallocationConfiguration const* prealloc,
612  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
613  std::vector<Worker*> returnValue;
614  auto const& consumesInfo = worker->consumesInfo();
615  auto moduleLabel = worker->description()->moduleLabel();
616  using namespace productholderindexhelper;
617  for (auto const& ci : consumesInfo) {
618  if (not ci.skipCurrentProcess() and
619  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
620  auto productModuleLabel = ci.label();
621  if (productModuleLabel.empty()) {
622  //this is a consumesMany request
623  for (auto const& branch : conditionalModuleBranches) {
624  //check that the conditional module has not been used
625  if (conditionalModules.find(branch.first) == conditionalModules.end()) {
626  continue;
627  }
628  if (ci.kindOfType() == edm::PRODUCT_TYPE) {
629  if (branch.second->unwrappedTypeID() != ci.type()) {
630  continue;
631  }
632  } else {
633  if (not typeIsViewCompatible(
634  ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
635  continue;
636  }
637  }
638 
639  auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration);
640  assert(condWorker);
641 
642  conditionalModules.erase(branch.first);
643 
644  auto dependents = tryToPlaceConditionalModules(condWorker,
645  conditionalModules,
646  conditionalModuleBranches,
647  aliasMap,
648  proc_pset,
649  preg,
650  prealloc,
651  processConfiguration);
652  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
653  returnValue.push_back(condWorker);
654  }
655  } else {
656  //just a regular consumes
657  bool productFromConditionalModule = false;
658  auto itFound = conditionalModules.find(productModuleLabel);
659  if (itFound == conditionalModules.end()) {
660  //Check to see if this was an alias
661  //note that aliasMap was previously filtered so only the conditional modules remain there
662  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
663  if (foundAlias) {
664  productModuleLabel = *foundAlias;
665  productFromConditionalModule = true;
666  itFound = conditionalModules.find(productModuleLabel);
667  //check that the alias-for conditional module has not been used
668  if (itFound == conditionalModules.end()) {
669  continue;
670  }
671  }
672  } else {
673  //need to check the rest of the data product info
674  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
675  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
676  if (itBranch->second->productInstanceName() == ci.instance()) {
677  if (ci.kindOfType() == PRODUCT_TYPE) {
678  if (ci.type() == itBranch->second->unwrappedTypeID()) {
679  productFromConditionalModule = true;
680  break;
681  }
682  } else {
683  //this is a view
684  if (typeIsViewCompatible(ci.type(),
685  TypeID(itBranch->second->wrappedType().typeInfo()),
686  itBranch->second->className())) {
687  productFromConditionalModule = true;
688  break;
689  }
690  }
691  }
692  }
693  }
694  if (productFromConditionalModule) {
695  auto condWorker =
696  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
697  assert(condWorker);
698 
699  conditionalModules.erase(itFound);
700 
701  auto dependents = tryToPlaceConditionalModules(condWorker,
702  conditionalModules,
703  conditionalModuleBranches,
704  aliasMap,
705  proc_pset,
706  preg,
707  prealloc,
708  processConfiguration);
709  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
710  returnValue.push_back(condWorker);
711  }
712  }
713  }
714  }
715  return returnValue;
716  }
717 
719  ProductRegistry& preg,
720  PreallocationConfiguration const* prealloc,
721  std::shared_ptr<ProcessConfiguration const> processConfiguration,
722  std::string const& pathName,
723  bool ignoreFilters,
724  PathWorkers& out,
725  std::vector<std::string> const& endPathNames,
726  ConditionalTaskHelper const& conditionalTaskHelper) {
727  vstring modnames = proc_pset.getParameter<vstring>(pathName);
728  PathWorkers tmpworkers;
729 
730  //Pull out ConditionalTask modules
731  auto condRange = findConditionalTaskModulesRange(modnames);
732 
733  std::unordered_set<std::string> conditionalmods;
734  //An EDAlias may be redirecting to a module on a ConditionalTask
735  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
736  std::unordered_map<std::string, unsigned int> conditionalModOrder;
737  if (condRange.first != condRange.second) {
738  for (auto it = condRange.first; it != condRange.second; ++it) {
739  // ordering needs to skip the # token in the path list
740  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
741  }
742  //the last entry should be ignored since it is required to be "@"
743  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
744  std::make_move_iterator(condRange.second));
745 
746  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
747  modnames.erase(std::prev(condRange.first), modnames.end());
748  }
749 
750  unsigned int placeInPath = 0;
751  for (auto const& name : modnames) {
752  //Modules except EDFilters are set to run concurrently by default
753  bool doNotRunConcurrently = false;
755  if (name[0] == '!') {
756  filterAction = WorkerInPath::Veto;
757  } else if (name[0] == '-' or name[0] == '+') {
758  filterAction = WorkerInPath::Ignore;
759  }
760  if (name[0] == '|' or name[0] == '+') {
761  //cms.wait was specified so do not run concurrently
762  doNotRunConcurrently = true;
763  }
764 
766  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
767  moduleLabel.erase(0, 1);
768  }
769 
770  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
771  if (worker == nullptr) {
772  std::string pathType("endpath");
773  if (!search_all(endPathNames, pathName)) {
774  pathType = std::string("path");
775  }
777  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
778  << "\"\n please check spelling or remove that label from the path.";
779  }
780 
781  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
782  // We have a filter on an end path, and the filter is not explicitly ignored.
783  // See if the filter is allowed.
784  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
785  if (!search_all(allowed_filters, worker->description()->moduleName())) {
786  // Filter is not allowed. Ignore the result, and issue a warning.
787  filterAction = WorkerInPath::Ignore;
788  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
789  << "' with module label '" << moduleLabel << "' appears on EndPath '"
790  << pathName << "'.\n"
791  << "The return value of the filter will be ignored.\n"
792  << "To suppress this warning, either remove the filter from the endpath,\n"
793  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
794  }
795  }
796  bool runConcurrently = not doNotRunConcurrently;
797  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
798  runConcurrently = false;
799  }
800 
801  auto condModules = tryToPlaceConditionalModules(worker,
802  conditionalmods,
803  conditionalModsBranches,
804  conditionalTaskHelper.aliasMap(),
805  proc_pset,
806  preg,
807  prealloc,
808  processConfiguration);
809  for (auto condMod : condModules) {
810  tmpworkers.emplace_back(
811  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
812  }
813 
814  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
815  ++placeInPath;
816  }
817 
818  out.swap(tmpworkers);
819  }
820 
822  ProductRegistry& preg,
823  PreallocationConfiguration const* prealloc,
824  std::shared_ptr<ProcessConfiguration const> processConfiguration,
825  int bitpos,
826  std::string const& name,
827  TrigResPtr trptr,
828  std::vector<std::string> const& endPathNames,
829  ConditionalTaskHelper const& conditionalTaskHelper) {
830  PathWorkers tmpworkers;
831  fillWorkers(
832  proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
833 
834  // an empty path will cause an extra bit that is not used
835  if (!tmpworkers.empty()) {
836  trig_paths_.emplace_back(bitpos,
837  name,
838  tmpworkers,
839  trptr,
840  actionTable(),
841  actReg_,
845  } else {
846  empty_trig_paths_.push_back(bitpos);
847  }
848  for (WorkerInPath const& workerInPath : tmpworkers) {
849  addToAllWorkers(workerInPath.getWorker());
850  }
851  }
852 
854  ProductRegistry& preg,
855  PreallocationConfiguration const* prealloc,
856  std::shared_ptr<ProcessConfiguration const> processConfiguration,
857  int bitpos,
858  std::string const& name,
859  std::vector<std::string> const& endPathNames,
860  ConditionalTaskHelper const& conditionalTaskHelper) {
861  PathWorkers tmpworkers;
862  fillWorkers(
863  proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
864 
865  if (!tmpworkers.empty()) {
866  //EndPaths are not supposed to stop if SkipEvent type exception happens
867  end_paths_.emplace_back(bitpos,
868  name,
869  tmpworkers,
870  TrigResPtr(),
871  actionTable(),
872  actReg_,
874  nullptr,
876  } else {
877  empty_end_paths_.push_back(bitpos);
878  }
879  for (WorkerInPath const& workerInPath : tmpworkers) {
880  addToAllWorkers(workerInPath.getWorker());
881  }
882  }
883 
885 
887 
889  Worker* found = nullptr;
890  for (auto const& worker : allWorkers()) {
891  if (worker->description()->moduleLabel() == iLabel) {
892  found = worker;
893  break;
894  }
895  }
896  if (nullptr == found) {
897  return;
898  }
899 
900  iMod->replaceModuleFor(found);
901  found->beginStream(streamID_, streamContext_);
902  }
903 
905 
906  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
907  std::vector<ModuleDescription const*> result;
908  result.reserve(allWorkers().size());
909 
910  for (auto const& worker : allWorkers()) {
911  ModuleDescription const* p = worker->description();
912  result.push_back(p);
913  }
914  return result;
915  }
916 
918  WaitingTaskHolder iTask,
920  ServiceToken const& serviceToken,
921  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
922  EventPrincipal& ep = info.principal();
923 
924  // Caught exception is propagated via WaitingTaskHolder
925  CMS_SA_ALLOW try {
926  this->resetAll();
927 
929 
930  Traits::setStreamContext(streamContext_, ep);
931  //a service may want to communicate with another service
932  ServiceRegistry::Operate guard(serviceToken);
933  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
934 
935  // Data dependencies need to be set up before marking empty
936  // (End)Paths complete in case something consumes the status of
937  // the empty (EndPath)
940 
941  HLTPathStatus hltPathStatus(hlt::Pass, 0);
942  for (int empty_trig_path : empty_trig_paths_) {
943  results_->at(empty_trig_path) = hltPathStatus;
944  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
945  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
948  if (except) {
949  iTask.doneWaiting(except);
950  return;
951  }
952  }
953  for (int empty_end_path : empty_end_paths_) {
954  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
957  if (except) {
958  iTask.doneWaiting(except);
959  return;
960  }
961  }
962 
963  ++total_events_;
964 
965  //use to give priorities on an error to ones from Paths
966  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
967  auto pathErrorPtr = pathErrorHolder.get();
968  ServiceWeakToken weakToken = serviceToken;
969  auto allPathsDone = make_waiting_task(
970  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
971  ServiceRegistry::Operate operate(weakToken.lock());
972 
973  std::exception_ptr ptr;
974  if (pathError->load()) {
975  ptr = *pathError->load();
976  delete pathError->load();
977  }
978  if ((not ptr) and iPtr) {
979  ptr = *iPtr;
980  }
982  });
983  //The holder guarantees that if the paths finish before the loop ends
984  // that we do not start too soon. It also guarantees that the task will
985  // run under that condition.
986  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
987 
988  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
989  std::exception_ptr const* iPtr) mutable {
990  ServiceRegistry::Operate operate(weakToken.lock());
991 
992  if (iPtr) {
993  //this is used to prioritize this error over one
994  // that happens in EndPath or Accumulate
995  pathErrorPtr->store(new std::exception_ptr(*iPtr));
996  }
997  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
998  });
999 
1000  //The holder guarantees that if the paths finish before the loop ends
1001  // that we do not start too soon. It also guarantees that the task will
1002  // run under that condition.
1003  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1004 
1005  //start end paths first so on single threaded the paths will run first
1006  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1007  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1008  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1009  }
1010 
1011  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1012  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1013  }
1014 
1015  ParentContext parentContext(&streamContext_);
1017  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1018  } catch (...) {
1019  iTask.doneWaiting(std::current_exception());
1020  }
1021  }
1022 
1023  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
1024  WaitingTaskHolder iWait,
1026  if (iExcept) {
1027  // Caught exception is propagated via WaitingTaskHolder
1028  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1033  edm::printCmsExceptionWarning("SkipEvent", e);
1034  *(iExcept.load()) = std::exception_ptr();
1035  } else {
1036  *(iExcept.load()) = std::current_exception();
1037  }
1038  } catch (...) {
1039  *(iExcept.load()) = std::current_exception();
1040  }
1041  }
1042 
1043  if ((not iExcept) and results_->accept()) {
1044  ++total_passed_;
1045  }
1046 
1047  if (nullptr != results_inserter_.get()) {
1048  // Caught exception is propagated to the caller
1049  CMS_SA_ALLOW try {
1050  //Even if there was an exception, we need to allow results inserter
1051  // to run since some module may be waiting on its results.
1052  ParentContext parentContext(&streamContext_);
1054 
1055  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1056  if (expt) {
1057  std::rethrow_exception(expt);
1058  }
1059  } catch (cms::Exception& ex) {
1060  if (not iExcept) {
1061  if (ex.context().empty()) {
1062  std::ostringstream ost;
1063  ost << "Processing Event " << info.principal().id();
1064  ex.addContext(ost.str());
1065  }
1066  iExcept.store(new std::exception_ptr(std::current_exception()));
1067  }
1068  } catch (...) {
1069  if (not iExcept) {
1070  iExcept.store(new std::exception_ptr(std::current_exception()));
1071  }
1072  }
1073  }
1074  std::exception_ptr ptr;
1075  if (iExcept) {
1076  ptr = *iExcept.load();
1077  }
1078  iWait.doneWaiting(ptr);
1079  }
1080 
1081  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1083 
1084  if (iExcept) {
1085  //add context information to the exception and print message
1086  try {
1087  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1088  } catch (cms::Exception& ex) {
1089  bool const cleaningUpAfterException = false;
1090  if (ex.context().empty()) {
1091  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1092  } else {
1093  addContextAndPrintException("", ex, cleaningUpAfterException);
1094  }
1095  iExcept = std::current_exception();
1096  }
1097 
1098  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1099  }
1100  // Caught exception is propagated to the caller
1101  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1102  if (not iExcept) {
1103  iExcept = std::current_exception();
1104  }
1105  }
1106  if (not iExcept) {
1107  resetEarlyDelete();
1108  }
1109 
1110  return iExcept;
1111  }
1112 
1113  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1114  oLabelsToFill.reserve(trig_paths_.size());
1115  std::transform(trig_paths_.begin(),
1116  trig_paths_.end(),
1117  std::back_inserter(oLabelsToFill),
1118  std::bind(&Path::name, std::placeholders::_1));
1119  }
1120 
1121  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1122  TrigPaths::const_iterator itFound = std::find_if(
1123  trig_paths_.begin(),
1124  trig_paths_.end(),
1125  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1126  if (itFound != trig_paths_.end()) {
1127  oLabelsToFill.reserve(itFound->size());
1128  for (size_t i = 0; i < itFound->size(); ++i) {
1129  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1130  }
1131  }
1132  }
1133 
1135  std::vector<ModuleDescription const*>& descriptions,
1136  unsigned int hint) const {
1137  descriptions.clear();
1138  bool found = false;
1139  TrigPaths::const_iterator itFound;
1140 
1141  if (hint < trig_paths_.size()) {
1142  itFound = trig_paths_.begin() + hint;
1143  if (itFound->name() == iPathLabel)
1144  found = true;
1145  }
1146  if (!found) {
1147  // if the hint did not work, do it the slow way
1148  itFound = std::find_if(
1149  trig_paths_.begin(),
1150  trig_paths_.end(),
1151  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1152  if (itFound != trig_paths_.end())
1153  found = true;
1154  }
1155  if (found) {
1156  descriptions.reserve(itFound->size());
1157  for (size_t i = 0; i < itFound->size(); ++i) {
1158  descriptions.push_back(itFound->getWorker(i)->description());
1159  }
1160  }
1161  }
1162 
1164  std::vector<ModuleDescription const*>& descriptions,
1165  unsigned int hint) const {
1166  descriptions.clear();
1167  bool found = false;
1168  TrigPaths::const_iterator itFound;
1169 
1170  if (hint < end_paths_.size()) {
1171  itFound = end_paths_.begin() + hint;
1172  if (itFound->name() == iEndPathLabel)
1173  found = true;
1174  }
1175  if (!found) {
1176  // if the hint did not work, do it the slow way
1177  itFound = std::find_if(
1178  end_paths_.begin(),
1179  end_paths_.end(),
1180  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1181  if (itFound != end_paths_.end())
1182  found = true;
1183  }
1184  if (found) {
1185  descriptions.reserve(itFound->size());
1186  for (size_t i = 0; i < itFound->size(); ++i) {
1187  descriptions.push_back(itFound->getWorker(i)->description());
1188  }
1189  }
1190  }
1191 
1192  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1193  sum.timesVisited += path.timesVisited(which);
1194  sum.timesPassed += path.timesPassed(which);
1195  sum.timesFailed += path.timesFailed(which);
1196  sum.timesExcept += path.timesExcept(which);
1197  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1198  sum.bitPosition = path.bitPosition(which);
1199  }
1200 
1201  static void fillPathSummary(Path const& path, PathSummary& sum) {
1202  sum.name = path.name();
1203  sum.bitPosition = path.bitPosition();
1204  sum.timesRun += path.timesRun();
1205  sum.timesPassed += path.timesPassed();
1206  sum.timesFailed += path.timesFailed();
1207  sum.timesExcept += path.timesExcept();
1208 
1209  Path::size_type sz = path.size();
1210  if (sum.moduleInPathSummaries.empty()) {
1211  std::vector<ModuleInPathSummary> temp(sz);
1212  for (size_t i = 0; i != sz; ++i) {
1214  }
1215  sum.moduleInPathSummaries.swap(temp);
1216  } else {
1217  assert(sz == sum.moduleInPathSummaries.size());
1218  for (size_t i = 0; i != sz; ++i) {
1220  }
1221  }
1222  }
1223 
1224  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1225  sum.timesVisited += w.timesVisited();
1226  sum.timesRun += w.timesRun();
1227  sum.timesPassed += w.timesPassed();
1228  sum.timesFailed += w.timesFailed();
1229  sum.timesExcept += w.timesExcept();
1230  sum.moduleLabel = w.description()->moduleLabel();
1231  }
1232 
1233  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1234 
1236  rep.eventSummary.totalEvents += totalEvents();
1237  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1238  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1239 
1240  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1241  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1242  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1243  }
1244 
1246  using std::placeholders::_1;
1248  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1249  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1250  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1251  }
1252 
1254  skippingEvent_ = false;
1255  results_->reset();
1256  }
1257 
1259 
1261  //must be sure we have cleared the count first
1262  for (auto& count : earlyDeleteBranchToCount_) {
1263  count.count = 0;
1264  }
1265  //now reset based on how many helpers use that branch
1267  ++(earlyDeleteBranchToCount_[index].count);
1268  }
1269  for (auto& helper : earlyDeleteHelpers_) {
1270  helper.reset();
1271  }
1272  }
1273 
1275  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1276  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1278  int bitpos = 0;
1279  unsigned int indexEmpty = 0;
1280  unsigned int indexOfPath = 0;
1281  for (auto& pathStatusInserter : pathStatusInserters) {
1282  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1283  WorkerPtr workerPtr(
1284  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1285  pathStatusInserterWorkers_.emplace_back(workerPtr);
1286  workerPtr->setActivityRegistry(actReg_);
1287  addToAllWorkers(workerPtr.get());
1288 
1289  // A little complexity here because a C++ Path object is not
1290  // instantiated and put into end_paths if there are no modules
1291  // on the configured path.
1292  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1293  ++indexEmpty;
1294  } else {
1295  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1296  ++indexOfPath;
1297  }
1298  ++bitpos;
1299  }
1300 
1301  bitpos = 0;
1302  indexEmpty = 0;
1303  indexOfPath = 0;
1304  for (auto& endPathStatusInserter : endPathStatusInserters) {
1305  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1306  WorkerPtr workerPtr(
1307  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1308  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1309  workerPtr->setActivityRegistry(actReg_);
1310  addToAllWorkers(workerPtr.get());
1311 
1312  // A little complexity here because a C++ Path object is not
1313  // instantiated and put into end_paths if there are no modules
1314  // on the configured path.
1315  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1316  ++indexEmpty;
1317  } else {
1318  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1319  ++indexOfPath;
1320  }
1321  ++bitpos;
1322  }
1323  }
1324 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
size
Write out results.
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
Definition: helper.py:1
roAction_t actions[nactions]
Definition: GenABIO.cc:181
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
ProductList const & productList() const
std::unordered_multimap< std::string, AliasInfo > const & aliasMap() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
static PFTauRenderPlugin instance
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
int totalEventsPassed() const
ret
prodAgent to be discontinued
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::unordered_multimap< std::string, AliasInfo > aliasMap_
void setupOnDemandSystem(EventTransitionInfo const &)
void clearCounters()
Definition: Worker.h:223
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
std::string const & moduleName() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
StreamID streamID() const
ConditionalTaskHelper(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, WorkerManager &workerManager, std::vector< std::string > const &trigPathNames)
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
T getUntrackedParameter(std::string const &, T const &) const
constexpr element_type const * get() const
std::string moduleLabel
Definition: TriggerReport.h:52
oneapi::tbb::task_group * group() const noexcept
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
char const * label
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
void doneWaiting(std::exception_ptr iExcept)
accept
Definition: HLTenums.h:18
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
void processSwitchEDAliases(ParameterSet const &proc_pset, ProductRegistry &preg, ProcessConfiguration const &processConfiguration, std::unordered_set< std::string > const &allConditionalMods)
std::string name
Definition: TriggerReport.h:41
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
Strings const & getTrigPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static void fillPathSummary(Path const &path, PathSummary &sum)
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
int totalEvents() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
ModuleDescription const * description() const
Definition: Worker.h:189
std::string const & className() const
Definition: TypeID.cc:40
void clearCounters()
Clear all the counters in the trigger report.
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
void fillAliasMap(ParameterSet const &proc_pset, std::unordered_set< std::string > const &allConditionalMods)
Strings const & getEndPaths() const
void getTriggerReport(TriggerReport &rep) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
exception_actions::ActionCodes find(const std::string &category) const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
double b
Definition: hdecay.h:118
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::string const & processName() const
std::atomic< bool > skippingEvent_
Definition: plugin.cc:23
HLT enums.
void setupResolvers(Principal &principal)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:336
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModuleBranches(std::unordered_set< std::string > const &conditionalmods) const
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:42
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
std::unordered_multimap< std::string, edm::BranchDescription const * > conditionalModsBranches_
Log< level::Warning, false > LogWarning
std::list< std::string > const & context() const
Definition: Exception.cc:147
long double T
std::string const & moduleLabel() const
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
std::string const & name() const
Definition: Path.h:74
std::vector< std::string > vstring
Definition: Schedule.cc:475
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
def move(src, dest)
Definition: eostools.py:511
static Registry * instance()
Definition: Registry.cc:12
void clearCounters()
Definition: Path.cc:198
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)