CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
29 
31 #include "processEDAliases.h"
32 
33 #include <algorithm>
34 #include <cassert>
35 #include <cstdlib>
36 #include <functional>
37 #include <iomanip>
38 #include <list>
39 #include <map>
40 #include <exception>
41 
42 namespace edm {
43 
44  namespace {
45 
// Walks the input range [begin, end) and the output sequence starting at
// 'out' in lock step, invoking func(input, output) for every pair. 'func'
// receives the input element and a reference to the output element it is
// expected to fill. The output sequence must already hold at least as many
// elements as the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
55 
// Fills the sequence 'to' from the sequence 'from' by applying 'func' to
// each (input, output) pair through transform_into.
//
// When 'to' already has the same size as 'from' it is written in place.
// Otherwise the values are computed into a freshly sized temporary which is
// swapped into 'to' afterwards, so in that case 'to' is only modified once
// every call to 'func' has returned.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }
  TO resized(from.size());
  transform_into(from.begin(), from.end(), resized.begin(), func);
  to.swap(resized);
}
71 
72  // -----------------------------
73 
74  // Here we make the trigger results inserter directly. This should
75  // probably be a utility in the WorkerRegistry or elsewhere.
76 
// Creates the worker for the trigger results inserter: the 'new' expression
// below builds a WorkerT around 'inserter', passing the inserter's own module
// description and the address of 'actions'. The activity registry is then
// attached to the worker and the pointer is returned.
// NOTE(review): the line that declares 'ptr' (source line 80) is absent from
// this listing; presumably it opens a StreamSchedule::WorkerPtr initializer
// around the 'new' expression -- confirm against the real file.
77  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
78  std::shared_ptr<ActivityRegistry> areg,
79  std::shared_ptr<TriggerResultInserter> inserter) {
81  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
82  ptr->setActivityRegistry(areg);
83  return ptr;
84  }
85 
86  void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
87  ProductRegistry const& preg,
88  std::multimap<std::string, Worker*>& branchToReadingWorker) {
89  auto vBranchesToDeleteEarly = branchesToDeleteEarly;
90  // Remove any duplicates
91  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
92  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
93  vBranchesToDeleteEarly.end());
94 
95  // Are the requested items in the product registry?
96  auto allBranchNames = preg.allBranchNames();
97  //the branch names all end with a period, which we do not want to compare with
98  for (auto& b : allBranchNames) {
99  b.resize(b.size() - 1);
100  }
101  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
102  std::vector<std::string> temp;
103  temp.reserve(vBranchesToDeleteEarly.size());
104 
105  std::set_intersection(vBranchesToDeleteEarly.begin(),
106  vBranchesToDeleteEarly.end(),
107  allBranchNames.begin(),
108  allBranchNames.end(),
109  std::back_inserter(temp));
110  vBranchesToDeleteEarly.swap(temp);
111  if (temp.size() != vBranchesToDeleteEarly.size()) {
112  std::vector<std::string> missingProducts;
113  std::set_difference(temp.begin(),
114  temp.end(),
115  vBranchesToDeleteEarly.begin(),
116  vBranchesToDeleteEarly.end(),
117  std::back_inserter(missingProducts));
118  LogInfo l("MissingProductsForCanDeleteEarly");
119  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
120  for (auto const& n : missingProducts) {
121  l << "\n " << n;
122  }
123  }
124  //set placeholder for the branch, we will remove the nullptr if a
125  // module actually wants the branch.
126  for (auto const& branch : vBranchesToDeleteEarly) {
127  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
128  }
129  }
130 
131  Worker* getWorker(std::string const& moduleLabel,
132  ParameterSet& proc_pset,
133  WorkerManager& workerManager,
134  ProductRegistry& preg,
135  PreallocationConfiguration const* prealloc,
136  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
137  bool isTracked;
138  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
139  if (modpset == nullptr) {
140  return nullptr;
141  }
142  assert(isTracked);
143 
144  return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
145  }
146  } // namespace
147 
148  // -----------------------------
149 
150  typedef std::vector<std::string> vstring;
151 
152  // -----------------------------
153 
// Body of a helper class that gathers, once per schedule, the information
// needed to place ConditionalTask modules: the alias map and the branches
// produced by the conditional modules.
// NOTE(review): the class header (source line 154), source line 156 and the
// first line of the constructor signature (source line 158) are absent from
// this listing. The class is presumably the 'ConditionalTaskHelper' that the
// StreamSchedule constructor instantiates -- confirm against the real file.
155  public:
157 
// Constructor (opening line not shown). Steps performed by the body:
//  1. for every trigger path, collect the labels that follow the "#" marker,
//     excluding the path's final entry, into 'allConditionalMods';
//  2. force creation of a worker for each collected label;
//  3. fill aliasMap_ and handle EDAliases used as SwitchProducer cases;
//  4. record every registry branch whose module label is a conditional module.
159  ProductRegistry& preg,
160  PreallocationConfiguration const* prealloc,
161  std::shared_ptr<ProcessConfiguration const> processConfiguration,
162  WorkerManager& workerManager,
163  std::vector<std::string> const& trigPathNames) {
164  std::unordered_set<std::string> allConditionalMods;
165  for (auto const& pathName : trigPathNames) {
166  auto const modnames = proc_pset.getParameter<vstring>(pathName);
167 
168  //Pull out ConditionalTask modules
// Paths without a "#" entry have no conditional modules and are skipped.
169  auto itCondBegin = std::find(modnames.begin(), modnames.end(), "#");
170  if (itCondBegin == modnames.end())
171  continue;
172 
173  //the last entry should be ignored since it is required to be "@"
174  allConditionalMods.insert(itCondBegin + 1, std::prev(modnames.end()));
175  }
176 
177  for (auto const& cond : allConditionalMods) {
178  //force the creation of the conditional modules so alias check can work
// The returned worker pointer is intentionally discarded.
179  (void)getWorker(cond, proc_pset, workerManager, preg, prealloc, processConfiguration);
180  }
181 
182  fillAliasMap(proc_pset, allConditionalMods);
183  processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
184 
185  //find branches created by the conditional modules
// Stores a pointer to the registry's entry for each matching product, keyed by module label.
186  for (auto const& prod : preg.productList()) {
187  if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
188  conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
189  }
190  }
191  }
192 
// Read-only access to the alias map built by fillAliasMap().
193  std::multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
194 
// Returns the subset of the recorded conditional-module branches whose
// module label is contained in 'conditionalmods'.
195  std::multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
196  std::unordered_set<std::string> const& conditionalmods) const {
197  std::multimap<std::string, edm::BranchDescription const*> ret;
198  for (auto const& mod : conditionalmods) {
199  auto range = conditionalModsBranches_.equal_range(mod);
200  ret.insert(range.first, range.second);
201  }
202  return ret;
203  }
204 
205  private:
// Fills aliasMap_ from the aliases listed in the "@all_aliases" parameter.
// Only aliased-to module labels that are non-empty, do not start with '@'
// and belong to 'allConditionalMods' are considered. For each alias
// parameter set one entry is added under the alias name; the "type",
// "toProductInstance" and "fromProductInstance" values default to "*" when
// the corresponding parameter does not exist.
206  void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
207  auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
208  std::string const star("*");
209  for (auto const& alias : aliases) {
210  auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
211  auto aliasedToModuleLabels = info.getParameterNames();
212  for (auto const& mod : aliasedToModuleLabels) {
213  if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
214  auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
215  for (auto const& aliasPSet : aliasVPSet) {
216  std::string type = star;
217  std::string instance = star;
218  std::string originalInstance = star;
219  if (aliasPSet.exists("type")) {
220  type = aliasPSet.getParameter<std::string>("type");
221  }
222  if (aliasPSet.exists("toProductInstance")) {
223  instance = aliasPSet.getParameter<std::string>("toProductInstance");
224  }
225  if (aliasPSet.exists("fromProductInstance")) {
226  originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
227  }
228 
229  aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
230  }
231  }
232  }
233  }
234  }
235 
// Collects the case labels of every module whose "@module_type" is
// "SwitchProducer" and that have at least one entry in aliasMap_, then
// passes them on for alias processing.
// NOTE(review): the line naming the function that receives the argument list
// below (source line 254) is absent from this listing.
236  void processSwitchEDAliases(ParameterSet const& proc_pset,
237  ProductRegistry& preg,
238  ProcessConfiguration const& processConfiguration,
239  std::unordered_set<std::string> const& allConditionalMods) {
240  auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
241  std::vector<std::string> switchEDAliases;
242  for (auto const& module : all_modules) {
243  auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
244  if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
245  auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
246  for (auto const& case_label : all_cases) {
247  auto range = aliasMap_.equal_range(case_label);
248  if (range.first != range.second) {
249  switchEDAliases.push_back(case_label);
250  }
251  }
252  }
253  }
255  switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
256  }
257 
// alias label -> alias details, restricted to aliases of conditional modules
258  std::multimap<std::string, AliasInfo> aliasMap_;
// conditional module label -> descriptions of the branches it produces
259  std::multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
260  };
261 
262  // -----------------------------
263 
// StreamSchedule constructor: builds the trigger paths and end paths for one
// stream, creates the trigger-results inserter worker when at least one
// trigger path exists, and creates workers for configured modules that are
// not on any path.
// NOTE(review): the opening line of the signature (source line 264), the
// parameter on source line 274 (presumably the 'actions' table used below)
// and source lines 319 and 358 are absent from this listing.
265  std::shared_ptr<TriggerResultInserter> inserter,
266  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
267  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
268  std::shared_ptr<ModuleRegistry> modReg,
269  ParameterSet& proc_pset,
270  service::TriggerNamesService const& tns,
271  PreallocationConfiguration const& prealloc,
272  ProductRegistry& preg,
273  BranchIDListHelper& branchIDListHelper,
275  std::shared_ptr<ActivityRegistry> areg,
276  std::shared_ptr<ProcessConfiguration> processConfiguration,
277  StreamID streamID,
278  ProcessContext const* processContext)
279  : workerManager_(modReg, areg, actions),
280  actReg_(areg),
// One trigger result slot per trigger path name.
281  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
282  results_inserter_(),
283  trig_paths_(),
284  end_paths_(),
285  total_events_(),
286  total_passed_(),
287  number_of_unscheduled_modules_(0),
288  streamID_(streamID),
289  streamContext_(streamID_, processContext),
290  skippingEvent_(false) {
291  bool hasPath = false;
292  std::vector<std::string> const& pathNames = tns.getTrigPaths();
293  std::vector<std::string> const& endPathNames = tns.getEndPaths();
294 
// Gathers alias and branch information about ConditionalTask modules once,
// for use by every fillTrigPath/fillEndPath call below.
295  ConditionalTaskHelper conditionalTaskHelper(
296  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
297 
// Trigger paths: the bit position is the index of the path name.
298  int trig_bitpos = 0;
299  trig_paths_.reserve(pathNames.size());
300  for (auto const& trig_name : pathNames) {
301  fillTrigPath(proc_pset,
302  preg,
303  &prealloc,
304  processConfiguration,
305  trig_bitpos,
306  trig_name,
307  results(),
308  endPathNames,
309  conditionalTaskHelper);
310  ++trig_bitpos;
311  hasPath = true;
312  }
313 
314  if (hasPath) {
315  // the results inserter stands alone
316  inserter->setTrigResultForStream(streamID.value(), results());
317 
318  results_inserter_ = makeInserter(actions, actReg_, inserter);
320  }
321 
322  // fill normal endpaths
323  int bitpos = 0;
324  end_paths_.reserve(endPathNames.size());
325  for (auto const& end_path_name : endPathNames) {
326  fillEndPath(
327  proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
328  ++bitpos;
329  }
330 
331  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
332 
333  //See if all modules were used
// Computes (modules listed in "@all_modules") minus (labels of workers
// created so far); std::set keeps both inputs sorted for set_difference.
334  std::set<std::string> usedWorkerLabels;
335  for (auto const& worker : allWorkers()) {
336  usedWorkerLabels.insert(worker->description()->moduleLabel());
337  }
338  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
339  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
340  std::vector<std::string> unusedLabels;
341  set_difference(modulesInConfigSet.begin(),
342  modulesInConfigSet.end(),
343  usedWorkerLabels.begin(),
344  usedWorkerLabels.end(),
345  back_inserter(unusedLabels));
346  std::set<std::string> unscheduledLabels;
347  std::vector<std::string> shouldBeUsedLabels;
348  if (!unusedLabels.empty()) {
349  //Need to
350  // 1) create worker
351  // 2) if it is a WorkerT<EDProducer>, add it to our list
352  // 3) hand list to our delayed reader
353  for (auto const& label : unusedLabels) {
354  bool isTracked;
355  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
356  assert(isTracked);
357  assert(modulePSet != nullptr);
// NOTE(review): the call that receives the arguments on the next line
// (source line 358) is not shown; it is handed both 'unscheduledLabels' and
// 'shouldBeUsedLabels', presumably to fill them -- confirm.
359  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
360  }
361  if (!shouldBeUsedLabels.empty()) {
// Build a comma separated, single-quoted list of the labels for the message.
362  std::ostringstream unusedStream;
363  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
364  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
365  itLabelEnd = shouldBeUsedLabels.end();
366  itLabel != itLabelEnd;
367  ++itLabel) {
368  unusedStream << ",'" << *itLabel << "'";
369  }
370  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
371  }
372  }
373  number_of_unscheduled_modules_ = unscheduledLabels.size();
374  } // StreamSchedule::StreamSchedule
375 
// Sets up the bookkeeping for early deletion of data products: determines
// which of the requested branches can be deleted early, which workers might
// read them, and creates one EarlyDeleteHelper per reading worker.
// NOTE(review): the opening line of the signature (source line 376, which
// presumably declares the 'modReg' parameter used below) and source lines
// 481, 500, 506, 508 and 517 are absent from this listing.
377  std::vector<std::string> const& branchesToDeleteEarly,
378  edm::ProductRegistry const& preg) {
379  // setup the list with those products actually registered for this job
380  std::multimap<std::string, Worker*> branchToReadingWorker;
381  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
382 
// kEmpty is the default returned when a module has no "mightGet" parameter.
383  const std::vector<std::string> kEmpty;
384  std::map<Worker*, unsigned int> reserveSizeForWorker;
385  unsigned int upperLimitOnReadingWorker = 0;
386  unsigned int upperLimitOnIndicies = 0;
387  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
388 
389  //talk with output modules first
// Only module holders that yield an output module communicator are examined.
390  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
391  auto comm = iHolder->createOutputModuleCommunicator();
392  if (comm) {
393  if (!branchToReadingWorker.empty()) {
394  //If an OutputModule needs a product, we can't delete it early
395  // so we should remove it from our list
396  SelectedProductsForBranchType const& kept = comm->keptProducts();
397  for (auto const& item : kept[InEvent]) {
398  BranchDescription const& desc = *item.first;
399  auto found = branchToReadingWorker.equal_range(desc.branchName());
400  if (found.first != found.second) {
401  --nUniqueBranchesToDelete;
402  branchToReadingWorker.erase(found.first, found.second);
403  }
404  }
405  }
406  }
407  });
408 
// Nothing left to delete early.
409  if (branchToReadingWorker.empty()) {
410  return;
411  }
412 
413  for (auto w : allWorkers()) {
414  //determine if this module could read a branch we want to delete early
415  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
416  if (nullptr != pset) {
417  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
418  if (not branches.empty()) {
419  ++upperLimitOnReadingWorker;
420  }
421  for (auto const& branch : branches) {
422  auto found = branchToReadingWorker.equal_range(branch);
423  if (found.first != found.second) {
424  ++upperLimitOnIndicies;
425  ++reserveSizeForWorker[w];
// The first reader replaces the nullptr placeholder; further readers of the
// same branch get an additional entry under the same key.
426  if (nullptr == found.first->second) {
427  found.first->second = w;
428  } else {
429  branchToReadingWorker.insert(make_pair(found.first->first, w));
430  }
431  }
432  }
433  }
434  }
// Drop the branches that still carry the nullptr placeholder (no module
// listed them in "mightGet") and report them in a single warning.
435  {
436  auto it = branchToReadingWorker.begin();
437  std::vector<std::string> unusedBranches;
438  while (it != branchToReadingWorker.end()) {
439  if (it->second == nullptr) {
440  unusedBranches.push_back(it->first);
441  //erasing the object invalidates the iterator so must advance it first
442  auto temp = it;
443  ++it;
444  branchToReadingWorker.erase(temp);
445  } else {
446  ++it;
447  }
448  }
449  if (not unusedBranches.empty()) {
450  LogWarning l("UnusedProductsForCanDeleteEarly");
451  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
452  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
453  for (auto const& n : unusedBranches) {
454  l << "\n " << n;
455  }
456  }
457  }
458  if (!branchToReadingWorker.empty()) {
459  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
460  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
461  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
462  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
463  std::string lastBranchName;
464  size_t nextOpenIndex = 0;
// Raw pointer to the first index slot; the helpers below are given pointers
// computed relative to it.
465  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
// The multimap iterates in key order, so all entries of one branch are
// adjacent; a new (BranchID, 0) count entry is added whenever the name changes.
466  for (auto& branchAndWorker : branchToReadingWorker) {
467  if (lastBranchName != branchAndWorker.first) {
468  //have to put back the period we removed earlier in order to get the proper name
469  BranchID bid(branchAndWorker.first + ".");
470  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
471  lastBranchName = branchAndWorker.first;
472  }
473  auto found = alreadySeenWorkers.find(branchAndWorker.second);
474  if (alreadySeenWorkers.end() == found) {
475  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
476  // all the branches that might be read by this worker. However, initially we will only tell the
477  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
478  // EarlyDeleteHelper will automatically advance its internal end pointer.
479  size_t index = nextOpenIndex;
480  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
// NOTE(review): source line 481 is not shown in this listing.
482  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
483  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
484  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
485  nextOpenIndex += nIndices;
486  } else {
// Worker already has a helper: append the index of the current branch.
487  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
488  }
489  }
490 
491  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
492  // space needed for each module
493  auto itLast = earlyDeleteHelpers_.begin();
494  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
495  if (itLast->end() != it->begin()) {
496  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
497  unsigned int delta = it->begin() - itLast->end();
498  it->shiftIndexPointers(delta);
499 
// NOTE(review): the call that receives the iterator range below (source
// line 500) is not shown in this listing.
501  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
502  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
503  }
504  itLast = it;
505  }
// NOTE(review): source lines 506 and 508, which surround the argument
// below, are not shown in this listing.
507  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
509 
510  //now tell the paths about the deleters
511  for (auto& p : trig_paths_) {
512  p.setEarlyDeleteHelpers(alreadySeenWorkers);
513  }
514  for (auto& p : end_paths_) {
515  p.setEarlyDeleteHelpers(alreadySeenWorkers);
516  }
518  }
519  }
520 
522  Worker* worker,
523  std::unordered_set<std::string>& conditionalModules,
524  std::multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
525  std::multimap<std::string, AliasInfo> const& aliasMap,
526  ParameterSet& proc_pset,
527  ProductRegistry& preg,
528  PreallocationConfiguration const* prealloc,
529  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
530  std::vector<Worker*> returnValue;
531  auto const& consumesInfo = worker->consumesInfo();
532  auto moduleLabel = worker->description()->moduleLabel();
533  using namespace productholderindexhelper;
534  for (auto const& ci : consumesInfo) {
535  if (not ci.skipCurrentProcess() and
536  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
537  auto productModuleLabel = ci.label();
538  if (productModuleLabel.empty()) {
539  //this is a consumesMany request
540  for (auto const& branch : conditionalModuleBranches) {
541  //check that the conditional module has not been used
542  if (conditionalModules.find(branch.first) == conditionalModules.end()) {
543  continue;
544  }
545  if (ci.kindOfType() == edm::PRODUCT_TYPE) {
546  if (branch.second->unwrappedTypeID() != ci.type()) {
547  continue;
548  }
549  } else {
550  if (not typeIsViewCompatible(
551  ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
552  continue;
553  }
554  }
555 
556  auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration);
557  assert(condWorker);
558 
559  conditionalModules.erase(branch.first);
560 
561  auto dependents = tryToPlaceConditionalModules(condWorker,
562  conditionalModules,
563  conditionalModuleBranches,
564  aliasMap,
565  proc_pset,
566  preg,
567  prealloc,
568  processConfiguration);
569  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
570  returnValue.push_back(condWorker);
571  }
572  } else {
573  //just a regular consumes
574  bool productFromConditionalModule = false;
575  auto itFound = conditionalModules.find(productModuleLabel);
576  if (itFound == conditionalModules.end()) {
577  //Check to see if this was an alias
578  auto findAlias = aliasMap.equal_range(productModuleLabel);
579  for (auto it = findAlias.first; it != findAlias.second; ++it) {
580  //this was previously filtered so only the conditional modules remain
581  productModuleLabel = it->second.originalModuleLabel;
582  if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) {
583  if (it->second.friendlyClassName == "*" or
584  (ci.type().friendlyClassName() == it->second.friendlyClassName)) {
585  productFromConditionalModule = true;
586  //need to check the rest of the data product info
587  break;
588  } else if (ci.kindOfType() == ELEMENT_TYPE) {
589  //consume is a View so need to do more intrusive search
590  //find matching branches in module
591  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
592  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
593  if (it->second.originalInstanceLabel == "*" or
594  itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
595  if (typeIsViewCompatible(ci.type(),
596  TypeID(itBranch->second->wrappedType().typeInfo()),
597  itBranch->second->className())) {
598  productFromConditionalModule = true;
599  break;
600  }
601  }
602  }
603  if (productFromConditionalModule) {
604  break;
605  }
606  }
607  }
608  }
609  if (productFromConditionalModule) {
610  itFound = conditionalModules.find(productModuleLabel);
611  //check that the alias-for conditional module has not been used
612  if (itFound == conditionalModules.end()) {
613  continue;
614  }
615  }
616  } else {
617  //need to check the rest of the data product info
618  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
619  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
620  if (itBranch->second->productInstanceName() == ci.instance()) {
621  if (ci.kindOfType() == PRODUCT_TYPE) {
622  if (ci.type() == itBranch->second->unwrappedTypeID()) {
623  productFromConditionalModule = true;
624  break;
625  }
626  } else {
627  //this is a view
628  if (typeIsViewCompatible(ci.type(),
629  TypeID(itBranch->second->wrappedType().typeInfo()),
630  itBranch->second->className())) {
631  productFromConditionalModule = true;
632  break;
633  }
634  }
635  }
636  }
637  }
638  if (productFromConditionalModule) {
639  auto condWorker =
640  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
641  assert(condWorker);
642 
643  conditionalModules.erase(itFound);
644 
645  auto dependents = tryToPlaceConditionalModules(condWorker,
646  conditionalModules,
647  conditionalModuleBranches,
648  aliasMap,
649  proc_pset,
650  preg,
651  prealloc,
652  processConfiguration);
653  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
654  returnValue.push_back(condWorker);
655  }
656  }
657  }
658  }
659  return returnValue;
660  }
661 
// Builds the ordered list of workers for the path 'pathName' and stores it in
// 'out'. The path's module list is read from 'proc_pset'; entries after a "#"
// marker are ConditionalTask modules, which are only inserted (ahead of the
// module that needs them) when a module on the path consumes their products.
// 'ignoreFilters' is passed as true by fillEndPath and false by fillTrigPath;
// when true, filters that are not explicitly ignored and not listed in
// "@filters_on_endpaths" have their result ignored and a warning is logged.
// NOTE(review): the opening line of the signature (source line 662) and
// source lines 699, 710 and 721 are absent from this listing.
663  ProductRegistry& preg,
664  PreallocationConfiguration const* prealloc,
665  std::shared_ptr<ProcessConfiguration const> processConfiguration,
666  std::string const& pathName,
667  bool ignoreFilters,
668  PathWorkers& out,
669  std::vector<std::string> const& endPathNames,
670  ConditionalTaskHelper const& conditionalTaskHelper) {
671  vstring modnames = proc_pset.getParameter<vstring>(pathName);
672  PathWorkers tmpworkers;
673 
674  //Pull out ConditionalTask modules
675  auto itCondBegin = std::find(modnames.begin(), modnames.end(), "#");
676 
677  std::unordered_set<std::string> conditionalmods;
678  //An EDAlias may be redirecting to a module on a ConditionalTask
679  std::multimap<std::string, AliasInfo> aliasMap;
680  std::multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
// conditional module label -> position value passed to its WorkerInPath
681  std::unordered_map<std::string, unsigned int> conditionalModOrder;
682  if (itCondBegin != modnames.end()) {
683  for (auto it = itCondBegin + 1; it != modnames.begin() + modnames.size() - 1; ++it) {
684  // ordering needs to skip the # token in the path list
685  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
686  }
687  //the last entry should be ignored since it is required to be "@"
// The labels are moved out of 'modnames'; that tail is erased right below.
688  conditionalmods = std::unordered_set<std::string>(
689  std::make_move_iterator(itCondBegin + 1), std::make_move_iterator(modnames.begin() + modnames.size() - 1));
690 
691  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
692  }
// Keep only the regular path entries (no-op when there was no "#" marker).
693  modnames.erase(itCondBegin, modnames.end());
694 
695  unsigned int placeInPath = 0;
696  for (auto const& name : modnames) {
697  //Modules except EDFilters are set to run concurrently by default
698  bool doNotRunConcurrently = false;
// NOTE(review): the declaration of 'filterAction' (source line 699) is not
// shown; the comparison further down suggests it starts out as
// WorkerInPath::Normal -- confirm.
// Leading character of the entry: '!' selects Veto, '-' and '+' select
// Ignore, '|' and '+' disable concurrent running.
700  if (name[0] == '!') {
701  filterAction = WorkerInPath::Veto;
702  } else if (name[0] == '-' or name[0] == '+') {
703  filterAction = WorkerInPath::Ignore;
704  }
705  if (name[0] == '|' or name[0] == '+') {
706  //cms.wait was specified so do not run concurrently
707  doNotRunConcurrently = true;
708  }
709 
// NOTE(review): the declaration of 'moduleLabel' (source line 710) is not
// shown. The prefix character, when present, is stripped from it here.
711  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
712  moduleLabel.erase(0, 1);
713  }
714 
715  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
716  if (worker == nullptr) {
717  std::string pathType("endpath");
718  if (!search_all(endPathNames, pathName)) {
719  pathType = std::string("path");
720  }
// NOTE(review): the start of the statement that consumes the message below
// (source line 721) is not shown; 'worker' is dereferenced afterwards, so
// presumably this statement throws -- confirm.
722  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
723  << "\"\n please check spelling or remove that label from the path.";
724  }
725 
726  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
727  // We have a filter on an end path, and the filter is not explicitly ignored.
728  // See if the filter is allowed.
729  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
730  if (!search_all(allowed_filters, worker->description()->moduleName())) {
731  // Filter is not allowed. Ignore the result, and issue a warning.
732  filterAction = WorkerInPath::Ignore;
733  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
734  << "' with module label '" << moduleLabel << "' appears on EndPath '"
735  << pathName << "'.\n"
736  << "The return value of the filter will be ignored.\n"
737  << "To suppress this warning, either remove the filter from the endpath,\n"
738  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
739  }
740  }
// A filter whose result is not ignored is never run concurrently.
741  bool runConcurrently = not doNotRunConcurrently;
742  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
743  runConcurrently = false;
744  }
745 
// Conditional modules needed by this worker go into the path before it, with
// their results ignored and concurrent running enabled.
746  auto condModules = tryToPlaceConditionalModules(worker,
747  conditionalmods,
748  conditionalModsBranches,
749  conditionalTaskHelper.aliasMap(),
750  proc_pset,
751  preg,
752  prealloc,
753  processConfiguration);
754  for (auto condMod : condModules) {
755  tmpworkers.emplace_back(
756  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
757  }
758 
759  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
760  ++placeInPath;
761  }
762 
// Hand the finished list to the caller only after every module was processed.
763  out.swap(tmpworkers);
764  }
765 
// Creates the trigger path called 'name' at bit position 'bitpos'. The
// workers are collected with fillWorkers (ignoreFilters == false); a path
// without workers is only recorded in empty_trig_paths_. Every worker of the
// path is also registered through addToAllWorkers.
// NOTE(review): the opening line of the signature (source line 766) and the
// remaining emplace_back arguments (source lines 787-789) are absent from
// this listing.
767  ProductRegistry& preg,
768  PreallocationConfiguration const* prealloc,
769  std::shared_ptr<ProcessConfiguration const> processConfiguration,
770  int bitpos,
771  std::string const& name,
772  TrigResPtr trptr,
773  std::vector<std::string> const& endPathNames,
774  ConditionalTaskHelper const& conditionalTaskHelper) {
775  PathWorkers tmpworkers;
776  fillWorkers(
777  proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
778 
779  // an empty path will cause an extra bit that is not used
780  if (!tmpworkers.empty()) {
781  trig_paths_.emplace_back(bitpos,
782  name,
783  tmpworkers,
784  trptr,
785  actionTable(),
786  actReg_,
790  } else {
791  empty_trig_paths_.push_back(bitpos);
792  }
793  for (WorkerInPath const& workerInPath : tmpworkers) {
794  addToAllWorkers(workerInPath.getWorker());
795  }
796  }
797 
// Creates the end path called 'name' at bit position 'bitpos'. The workers
// are collected with fillWorkers (ignoreFilters == true); a path without
// workers is only recorded in empty_end_paths_. Unlike a trigger path, the
// path is given a default-constructed TrigResPtr. Every worker of the path is
// also registered through addToAllWorkers.
// NOTE(review): the opening line of the signature (source line 798) and two
// of the emplace_back arguments (source lines 818 and 820) are absent from
// this listing.
799  ProductRegistry& preg,
800  PreallocationConfiguration const* prealloc,
801  std::shared_ptr<ProcessConfiguration const> processConfiguration,
802  int bitpos,
803  std::string const& name,
804  std::vector<std::string> const& endPathNames,
805  ConditionalTaskHelper const& conditionalTaskHelper) {
806  PathWorkers tmpworkers;
807  fillWorkers(
808  proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
809 
810  if (!tmpworkers.empty()) {
811  //EndPaths are not supposed to stop if SkipEvent type exception happens
812  end_paths_.emplace_back(bitpos,
813  name,
814  tmpworkers,
815  TrigResPtr(),
816  actionTable(),
817  actReg_,
819  nullptr,
821  } else {
822  empty_end_paths_.push_back(bitpos);
823  }
824  for (WorkerInPath const& workerInPath : tmpworkers) {
825  addToAllWorkers(workerInPath.getWorker());
826  }
827  }
828 
830 
832 
// NOTE(review): the header of this function (source line 833) is absent from
// this listing; the body uses 'iMod' and 'iLabel', presumably its parameters.
// Looks for the worker whose module label equals 'iLabel'. When none exists
// the function returns without doing anything; otherwise 'iMod' replaces the
// module of that worker and the worker's beginStream is called for this
// stream.
834  Worker* found = nullptr;
835  for (auto const& worker : allWorkers()) {
836  if (worker->description()->moduleLabel() == iLabel) {
837  found = worker;
838  break;
839  }
840  }
841  if (nullptr == found) {
842  return;
843  }
844 
845  iMod->replaceModuleFor(found);
846  found->beginStream(streamID_, streamContext_);
847  }
848 
850 
851  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
852  std::vector<ModuleDescription const*> result;
853  result.reserve(allWorkers().size());
854 
855  for (auto const& worker : allWorkers()) {
856  ModuleDescription const* p = worker->description();
857  result.push_back(p);
858  }
859  return result;
860  }
861 
863  WaitingTaskHolder iTask,
865  ServiceToken const& serviceToken,
866  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
// NOTE(review): this is the visible part of what is presumably
// StreamSchedule::processOneEventAsync. Several lines of the definition
// (including its first line) are missing from this listing, so the notes
// below describe only the statements that are shown.
867  EventPrincipal& ep = info.principal();
868 
869  // Caught exception is propagated via WaitingTaskHolder
870  CMS_SA_ALLOW try {
// Clear the per-event state before anything else runs.
871  this->resetAll();
872 
874 
875  Traits::setStreamContext(streamContext_, ep);
876  //a service may want to communicate with another service
877  ServiceRegistry::Operate guard(serviceToken);
878  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
879 
880  // Data dependencies need to be set up before marking empty
881  // (End)Paths complete in case something consumes the status of
882  // the empty (EndPath)
885 
// Every empty trigger path gets the same (hlt::Pass, 0) status, both in
// results_ and in its path status inserter.
886  HLTPathStatus hltPathStatus(hlt::Pass, 0);
887  for (int empty_trig_path : empty_trig_paths_) {
888  results_->at(empty_trig_path) = hltPathStatus;
889  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
// NOTE(review): the rest of this statement is missing from the listing.
890  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
// A non-null exception ends processing of this event early: it is handed to iTask.
893  if (except) {
894  iTask.doneWaiting(except);
895  return;
896  }
897  }
// Same early-exit handling for the status inserter workers of empty end paths.
898  for (int empty_end_path : empty_end_paths_) {
// NOTE(review): the rest of this statement is missing from the listing.
899  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
902  if (except) {
903  iTask.doneWaiting(except);
904  return;
905  }
906  }
907 
908  ++total_events_;
909 
910  //use to give priorities on an error to ones from Paths
// The atomic holds a heap-allocated std::exception_ptr* that starts out null.
// The holder itself is moved into the allPathsDone lambda, which deletes any
// pointer that was stored; pathsDone only captures the raw pathErrorPtr.
911  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
912  auto pathErrorPtr = pathErrorHolder.get();
913  ServiceWeakToken weakToken = serviceToken;
914  auto allPathsDone = make_waiting_task(
915  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
916  ServiceRegistry::Operate operate(weakToken.lock());
917 
// A stored path error takes precedence over the exception passed to this task.
918  std::exception_ptr ptr;
919  if (pathError->load()) {
920  ptr = *pathError->load();
921  delete pathError->load();
922  }
923  if ((not ptr) and iPtr) {
924  ptr = *iPtr;
925  }
// NOTE(review): the statement that consumes ptr is not shown in this listing.
927  });
928  //The holder guarantees that if the paths finish before the loop ends
929  // that we do not start too soon. It also guarantees that the task will
930  // run under that condition.
931  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
932 
// pathsDone is the task the trigger paths wait on (see taskHolder below); it
// records a path exception, if any, and then calls finishedPaths.
933  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
934  std::exception_ptr const* iPtr) mutable {
935  ServiceRegistry::Operate operate(weakToken.lock());
936 
937  if (iPtr) {
938  //this is used to prioritize this error over one
939  // that happens in EndPath or Accumulate
940  pathErrorPtr->store(new std::exception_ptr(*iPtr));
941  }
942  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
943  });
944 
945  //The holder guarantees that if the paths finish before the loop ends
946  // that we do not start too soon. It also guarantees that the task will
947  // run under that condition.
948  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
949 
950  //start end paths first so on single threaded the paths will run first
951  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
// Both path collections are launched in reverse order.
952  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
953  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
954  }
955 
956  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
957  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
958  }
959 
960  ParentContext parentContext(&streamContext_);
// NOTE(review): the first line of the call below is missing from this listing.
962  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
963  } catch (...) {
// Anything thrown while setting up the tasks is reported through iTask.
964  iTask.doneWaiting(std::current_exception());
965  }
966  }
967 
// Post-processes any exception stored in iExcept, updates the passed-event
// counter, runs results_inserter_ if there is one, and finally signals iWait
// with the resulting exception (or an empty std::exception_ptr).
// NOTE(review): several lines of this definition (the third parameter and
// part of the first catch handler) are missing from this listing.
968  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
969  WaitingTaskHolder iWait,
971  if (iExcept) {
972  // Caught exception is propagated via WaitingTaskHolder
// Rethrow the stored exception so its dynamic type can be inspected.
973  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
// NOTE(review): the condition guarding this branch is not shown. In this
// branch a "SkipEvent" warning is printed and the stored exception is cleared.
978  edm::printCmsExceptionWarning("SkipEvent", e);
979  *(iExcept.load()) = std::exception_ptr();
980  } else {
981  *(iExcept.load()) = std::current_exception();
982  }
983  } catch (...) {
984  *(iExcept.load()) = std::current_exception();
985  }
986  }
987 
// iExcept tests the stored pointer, not the exception_ptr it points to, so an
// event whose exception was cleared above is still not counted as passed.
988  if ((not iExcept) and results_->accept()) {
989  ++total_passed_;
990  }
991 
992  if (nullptr != results_inserter_.get()) {
993  // Caught exception is propagated to the caller
994  CMS_SA_ALLOW try {
995  //Even if there was an exception, we need to allow results inserter
996  // to run since some module may be waiting on its results.
997  ParentContext parentContext(&streamContext_);
999 
1000  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1001  if (expt) {
1002  std::rethrow_exception(expt);
1003  }
1004  } catch (cms::Exception& ex) {
// The inserter's exception is recorded only if no earlier one is stored.
1005  if (not iExcept) {
// Add the event id as context when the exception carries none.
1006  if (ex.context().empty()) {
1007  std::ostringstream ost;
1008  ost << "Processing Event " << info.principal().id();
1009  ex.addContext(ost.str());
1010  }
1011  iExcept.store(new std::exception_ptr(std::current_exception()));
1012  }
1013  } catch (...) {
1014  if (not iExcept) {
1015  iExcept.store(new std::exception_ptr(std::current_exception()));
1016  }
1017  }
1018  }
// Pass on a copy of the stored exception; it stays empty when nothing is stored.
1019  std::exception_ptr ptr;
1020  if (iExcept) {
1021  ptr = *iExcept.load();
1022  }
1023  iWait.doneWaiting(ptr);
1024  }
1025 
// Final bookkeeping for one event. If iExcept is set, context is added to
// it, it is printed, and the early-termination signal is emitted. The
// post-schedule signal is then emitted in every case, and the early-delete
// state is reset only when no exception is pending.
// Returns the exception to report, or an empty std::exception_ptr.
// NOTE(review): one line at the top of the body is missing from this listing.
1026  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1028 
1029  if (iExcept) {
1030  //add context information to the exception and print message
1031  try {
1032  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1033  } catch (cms::Exception& ex) {
1034  bool const cleaningUpAfterException = false;
// Only name this function as context when the exception has no context yet.
1035  if (ex.context().empty()) {
1036  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1037  } else {
1038  addContextAndPrintException("", ex, cleaningUpAfterException);
1039  }
// Keep the exception as caught here (with any added context) instead of the one passed in.
1040  iExcept = std::current_exception();
1041  }
1042 
1043  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1044  }
1045  // Caught exception is propagated to the caller
1046  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
// A failure of the post-schedule signal is reported only if nothing failed earlier.
1047  if (not iExcept) {
1048  iExcept = std::current_exception();
1049  }
1050  }
1051  if (not iExcept) {
1052  resetEarlyDelete();
1053  }
1054 
1055  return iExcept;
1056  }
1057 
1058  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1059  oLabelsToFill.reserve(trig_paths_.size());
1060  std::transform(trig_paths_.begin(),
1061  trig_paths_.end(),
1062  std::back_inserter(oLabelsToFill),
1063  std::bind(&Path::name, std::placeholders::_1));
1064  }
1065 
1066  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1067  TrigPaths::const_iterator itFound = std::find_if(
1068  trig_paths_.begin(),
1069  trig_paths_.end(),
1070  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1071  if (itFound != trig_paths_.end()) {
1072  oLabelsToFill.reserve(itFound->size());
1073  for (size_t i = 0; i < itFound->size(); ++i) {
1074  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1075  }
1076  }
1077  }
1078 
1080  std::vector<ModuleDescription const*>& descriptions,
1081  unsigned int hint) const {
// NOTE(review): the first line of this signature is missing from the listing;
// this is presumably StreamSchedule::moduleDescriptionsInPath, with
// iPathLabel as its first parameter.
// Replaces the contents of descriptions with the ModuleDescription pointers of
// the workers on the trigger path named iPathLabel, in the path's index order.
// descriptions is left empty when no trigger path has that name.
1082  descriptions.clear();
1083  bool found = false;
1084  TrigPaths::const_iterator itFound;
1085 
// Fast path: hint is tried as an index into trig_paths_ and accepted only if
// the path at that position has the requested name.
1086  if (hint < trig_paths_.size()) {
1087  itFound = trig_paths_.begin() + hint;
1088  if (itFound->name() == iPathLabel)
1089  found = true;
1090  }
1091  if (!found) {
1092  // if the hint did not work, do it the slow way
1093  itFound = std::find_if(
1094  trig_paths_.begin(),
1095  trig_paths_.end(),
1096  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1097  if (itFound != trig_paths_.end())
1098  found = true;
1099  }
1100  if (found) {
1101  descriptions.reserve(itFound->size());
1102  for (size_t i = 0; i < itFound->size(); ++i) {
1103  descriptions.push_back(itFound->getWorker(i)->description());
1104  }
1105  }
1106  }
1107 
1109  std::vector<ModuleDescription const*>& descriptions,
1110  unsigned int hint) const {
// NOTE(review): the first line of this signature is missing from the listing;
// this is presumably StreamSchedule::moduleDescriptionsInEndPath, with
// iEndPathLabel as its first parameter.
// Replaces the contents of descriptions with the ModuleDescription pointers of
// the workers on the end path named iEndPathLabel, in the path's index order.
// descriptions is left empty when no end path has that name.
1111  descriptions.clear();
1112  bool found = false;
// The TrigPaths iterator type is also used to iterate over end_paths_ here.
1113  TrigPaths::const_iterator itFound;
1114 
// Fast path: hint is tried as an index into end_paths_ and accepted only if
// the path at that position has the requested name.
1115  if (hint < end_paths_.size()) {
1116  itFound = end_paths_.begin() + hint;
1117  if (itFound->name() == iEndPathLabel)
1118  found = true;
1119  }
1120  if (!found) {
1121  // if the hint did not work, do it the slow way
1122  itFound = std::find_if(
1123  end_paths_.begin(),
1124  end_paths_.end(),
1125  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1126  if (itFound != end_paths_.end())
1127  found = true;
1128  }
1129  if (found) {
1130  descriptions.reserve(itFound->size());
1131  for (size_t i = 0; i < itFound->size(); ++i) {
1132  descriptions.push_back(itFound->getWorker(i)->description());
1133  }
1134  }
1135  }
1136 
1137  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1138  sum.timesVisited += path.timesVisited(which);
1139  sum.timesPassed += path.timesPassed(which);
1140  sum.timesFailed += path.timesFailed(which);
1141  sum.timesExcept += path.timesExcept(which);
1142  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1143  sum.bitPosition = path.bitPosition(which);
1144  }
1145 
// Folds the statistics of `path` into `sum`: name and bit position are
// overwritten, the run/passed/failed/except counters are added to the
// existing values, and the per-module summaries are processed in index order.
// NOTE(review): the bodies of both for-loops are missing from this listing;
// presumably each calls fillModuleInPathSummary for index i — TODO confirm.
1146  static void fillPathSummary(Path const& path, PathSummary& sum) {
1147  sum.name = path.name();
1148  sum.bitPosition = path.bitPosition();
1149  sum.timesRun += path.timesRun();
1150  sum.timesPassed += path.timesPassed();
1151  sum.timesFailed += path.timesFailed();
1152  sum.timesExcept += path.timesExcept();
1153 
1154  Path::size_type sz = path.size();
// When there are no per-module summaries yet, a fresh vector with one entry per
// module is built in a temporary and swapped in once the loop has finished.
1155  if (sum.moduleInPathSummaries.empty()) {
1156  std::vector<ModuleInPathSummary> temp(sz);
1157  for (size_t i = 0; i != sz; ++i) {
1159  }
1160  sum.moduleInPathSummaries.swap(temp);
1161  } else {
// Otherwise the existing summaries must already have one entry per module.
1162  assert(sz == sum.moduleInPathSummaries.size());
1163  for (size_t i = 0; i != sz; ++i) {
1165  }
1166  }
1167  }
1168 
1169  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1170  sum.timesVisited += w.timesVisited();
1171  sum.timesRun += w.timesRun();
1172  sum.timesPassed += w.timesPassed();
1173  sum.timesFailed += w.timesFailed();
1174  sum.timesExcept += w.timesExcept();
1175  sum.moduleLabel = w.description()->moduleLabel();
1176  }
1177 
1178  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1179 
// NOTE(review): body of what is presumably StreamSchedule::getTriggerReport(rep);
// the signature line is not present in this listing — TODO confirm.
// The event totals of this schedule are added (+=) to those already in rep.
1181  rep.eventSummary.totalEvents += totalEvents();
1182  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1183  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1184 
// fill_summary applies the given function to each (source, summary) pair; it
// builds the summary vector first when its size differs from the source's.
1185  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1186  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1187  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1188  }
1189 
// NOTE(review): body of what is presumably StreamSchedule::clearCounters; the
// signature line and one statement are not present in this listing.
1191  using std::placeholders::_1;
// Call clearCounters() on every trigger path, every end path and every worker.
1193  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1194  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1195  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1196  }
1197 
// NOTE(review): body of what is presumably StreamSchedule::resetAll; the
// signature line is not present in this listing.
// Clears the skipping flag and resets the trigger results object.
1199  skippingEvent_ = false;
1200  results_->reset();
1201  }
1202 
1204 
// NOTE(review): body of what is presumably StreamSchedule::resetEarlyDelete;
// the signature line and the header of the second loop are not present in
// this listing.
1206  //must be sure we have cleared the count first
1207  for (auto& count : earlyDeleteBranchToCount_) {
1208  count.count = 0;
1209  }
1210  //now reset based on how many helpers use that branch
// NOTE(review): the loop header that defines `index` is missing here.
1212  ++(earlyDeleteBranchToCount_[index].count);
1213  }
// Finally reset every early-delete helper.
1214  for (auto& helper : earlyDeleteHelpers_) {
1215  helper.reset();
1216  }
1217  }
1218 
1220  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1221  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
// NOTE(review): the first and third lines of this signature are missing from
// the listing; this is presumably StreamSchedule::makePathStatusInserters,
// whose last parameter is the `actions` table used below.
// For every status inserter a worker is created, stored, given the activity
// registry and added to all workers; the worker is then attached to the
// matching non-empty Path.
// bitpos counts all configured paths, indexEmpty walks the list of empty
// paths, and indexOfPath walks the Path objects that actually exist.
1223  int bitpos = 0;
1224  unsigned int indexEmpty = 0;
1225  unsigned int indexOfPath = 0;
1226  for (auto& pathStatusInserter : pathStatusInserters) {
1227  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1228  WorkerPtr workerPtr(
1229  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1230  pathStatusInserterWorkers_.emplace_back(workerPtr);
1231  workerPtr->setActivityRegistry(actReg_);
1232  addToAllWorkers(workerPtr.get());
1233 
1234  // A little complexity here because a C++ Path object is not
1235  // instantiated and put into trig_paths_ if there are no modules
1236  // on the configured path.
// NOTE(review): this matching appears to rely on empty_trig_paths_ holding bit
// positions in increasing order — TODO confirm.
1237  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1238  ++indexEmpty;
1239  } else {
1240  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1241  ++indexOfPath;
1242  }
1243  ++bitpos;
1244  }
1245 
// Same procedure for the end paths, starting the bookkeeping over.
1246  bitpos = 0;
1247  indexEmpty = 0;
1248  indexOfPath = 0;
1249  for (auto& endPathStatusInserter : endPathStatusInserters) {
1250  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1251  WorkerPtr workerPtr(
1252  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1253  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1254  workerPtr->setActivityRegistry(actReg_);
1255  addToAllWorkers(workerPtr.get());
1256 
1257  // A little complexity here because a C++ Path object is not
1258  // instantiated and put into end_paths if there are no modules
1259  // on the configured path.
1260  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1261  ++indexEmpty;
1262  } else {
// Unlike trigger paths, end paths get only the worker; the inserter argument is nullptr.
1263  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1264  ++indexOfPath;
1265  }
1266  ++bitpos;
1267  }
1268  }
1269 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
size
Write out results.
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::multimap< std::string, edm::BranchDescription const * > conditionalModsBranches_
Definition: helper.py:1
roAction_t actions[nactions]
Definition: GenABIO.cc:181
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
ProductList const & productList() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
static PFTauRenderPlugin instance
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
int totalEventsPassed() const
ret
prodAgent to be discontinued
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
void setupOnDemandSystem(EventTransitionInfo const &)
void clearCounters()
Definition: Worker.h:222
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
std::string const & moduleName() const
edm::propagate_const< WorkerPtr > results_inserter_
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
std::multimap< std::string, AliasInfo > aliasMap_
assert(be >=bs)
StreamID streamID() const
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
ConditionalTaskHelper(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, WorkerManager &workerManager, std::vector< std::string > const &trigPathNames)
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
T getUntrackedParameter(std::string const &, T const &) const
constexpr element_type const * get() const
std::string moduleLabel
Definition: TriggerReport.h:52
oneapi::tbb::task_group * group() const noexcept
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
char const * label
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
void doneWaiting(std::exception_ptr iExcept)
accept
Definition: HLTenums.h:18
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
std::multimap< std::string, edm::BranchDescription const * > conditionalModuleBranches(std::unordered_set< std::string > const &conditionalmods) const
void processSwitchEDAliases(ParameterSet const &proc_pset, ProductRegistry &preg, ProcessConfiguration const &processConfiguration, std::unordered_set< std::string > const &allConditionalMods)
std::string name
Definition: TriggerReport.h:41
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
Strings const & getTrigPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static void fillPathSummary(Path const &path, PathSummary &sum)
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, StreamID streamID, ProcessContext const *processContext)
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
int totalEvents() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
ModuleDescription const * description() const
Definition: Worker.h:188
std::string const & className() const
Definition: TypeID.cc:40
void clearCounters()
Clear all the counters in the trigger report.
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
void fillAliasMap(ParameterSet const &proc_pset, std::unordered_set< std::string > const &allConditionalMods)
Strings const & getEndPaths() const
void getTriggerReport(TriggerReport &rep) const
exception_actions::ActionCodes find(const std::string &category) const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
double b
Definition: hdecay.h:118
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::string const & processName() const
std::atomic< bool > skippingEvent_
Definition: plugin.cc:23
HLT enums.
void setupResolvers(Principal &principal)
std::multimap< std::string, AliasInfo > const & aliasMap() const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:336
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:42
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
Log< level::Warning, false > LogWarning
std::list< std::string > const & context() const
Definition: Exception.cc:147
std::string const & moduleLabel() const
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
std::string const & name() const
Definition: Path.h:74
std::vector< std::string > vstring
Definition: Schedule.cc:473
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
def move(src, dest)
Definition: eostools.py:511
static Registry * instance()
Definition: Registry.cc:12
void clearCounters()
Definition: Path.cc:198
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)