CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
29 
31 #include "processEDAliases.h"
32 
33 #include <algorithm>
34 #include <cassert>
35 #include <cstdlib>
36 #include <functional>
37 #include <iomanip>
38 #include <list>
39 #include <map>
40 #include <exception>
41 
42 namespace edm {
43 
44  namespace {
45 
46  // Function template to transform each element in the input range to
47  // a value placed into the output range. The supplied function
48  // should take a const_reference to the 'input', and write to a
49  // reference to the 'output'.
50  template <typename InputIterator, typename ForwardIterator, typename Func>
51  void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
52  for (; begin != end; ++begin, ++out)
53  func(*begin, *out);
54  }
55 
56  // Function template that takes a sequence 'from', a sequence
57  // 'to', and a callable object 'func'. It and applies
58  // transform_into to fill the 'to' sequence with the values
59  // calcuated by the callable object, taking care to fill the
60  // outupt only if all calls succeed.
61  template <typename FROM, typename TO, typename FUNC>
62  void fill_summary(FROM const& from, TO& to, FUNC func) {
63  if (to.size() != from.size()) {
64  TO temp(from.size());
65  transform_into(from.begin(), from.end(), temp.begin(), func);
66  to.swap(temp);
67  } else {
68  transform_into(from.begin(), from.end(), to.begin(), func);
69  }
70  }
71 
72  // -----------------------------
73 
74  // Here we make the trigger results inserter directly. This should
75  // probably be a utility in the WorkerRegistry or elsewhere.
76 
77  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
78  std::shared_ptr<ActivityRegistry> areg,
79  std::shared_ptr<TriggerResultInserter> inserter) {
81  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
82  ptr->setActivityRegistry(areg);
83  return ptr;
84  }
85 
86  void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
87  ProductRegistry const& preg,
88  std::multimap<std::string, Worker*>& branchToReadingWorker) {
89  auto vBranchesToDeleteEarly = branchesToDeleteEarly;
90  // Remove any duplicates
91  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
92  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
93  vBranchesToDeleteEarly.end());
94 
95  // Are the requested items in the product registry?
96  auto allBranchNames = preg.allBranchNames();
97  //the branch names all end with a period, which we do not want to compare with
98  for (auto& b : allBranchNames) {
99  b.resize(b.size() - 1);
100  }
101  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
102  std::vector<std::string> temp;
103  temp.reserve(vBranchesToDeleteEarly.size());
104 
105  std::set_intersection(vBranchesToDeleteEarly.begin(),
106  vBranchesToDeleteEarly.end(),
107  allBranchNames.begin(),
108  allBranchNames.end(),
109  std::back_inserter(temp));
110  vBranchesToDeleteEarly.swap(temp);
111  if (temp.size() != vBranchesToDeleteEarly.size()) {
112  std::vector<std::string> missingProducts;
113  std::set_difference(temp.begin(),
114  temp.end(),
115  vBranchesToDeleteEarly.begin(),
116  vBranchesToDeleteEarly.end(),
117  std::back_inserter(missingProducts));
118  LogInfo l("MissingProductsForCanDeleteEarly");
119  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
120  for (auto const& n : missingProducts) {
121  l << "\n " << n;
122  }
123  }
124  //set placeholder for the branch, we will remove the nullptr if a
125  // module actually wants the branch.
126  for (auto const& branch : vBranchesToDeleteEarly) {
127  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
128  }
129  }
130 
131  Worker* getWorker(std::string const& moduleLabel,
132  ParameterSet& proc_pset,
133  WorkerManager& workerManager,
134  ProductRegistry& preg,
135  PreallocationConfiguration const* prealloc,
136  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
137  bool isTracked;
138  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
139  if (modpset == nullptr) {
140  return nullptr;
141  }
142  assert(isTracked);
143 
144  return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
145  }
146  } // namespace
147 
148  // -----------------------------
149 
150  typedef std::vector<std::string> vstring;
151 
152  // -----------------------------
153 
155  public:
157 
159  ProductRegistry& preg,
160  PreallocationConfiguration const* prealloc,
161  std::shared_ptr<ProcessConfiguration const> processConfiguration,
162  WorkerManager& workerManager,
163  std::vector<std::string> const& trigPathNames) {
164  std::unordered_set<std::string> allConditionalMods;
165  for (auto const& pathName : trigPathNames) {
166  auto const modnames = proc_pset.getParameter<vstring>(pathName);
167 
168  //Pull out ConditionalTask modules
169  auto itCondBegin = std::find(modnames.begin(), modnames.end(), "#");
170  if (itCondBegin == modnames.end())
171  continue;
172 
173  //the last entry should be ignored since it is required to be "@"
174  allConditionalMods.insert(itCondBegin + 1, std::prev(modnames.end()));
175  }
176 
177  for (auto const& cond : allConditionalMods) {
178  //force the creation of the conditional modules so alias check can work
179  (void)getWorker(cond, proc_pset, workerManager, preg, prealloc, processConfiguration);
180  }
181 
182  fillAliasMap(proc_pset, allConditionalMods);
183  processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);
184 
185  //find branches created by the conditional modules
186  for (auto const& prod : preg.productList()) {
187  if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
188  conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
189  }
190  }
191  }
192 
193  std::multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }
194 
195  std::multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
196  std::unordered_set<std::string> const& conditionalmods) const {
197  std::multimap<std::string, edm::BranchDescription const*> ret;
198  for (auto const& mod : conditionalmods) {
199  auto range = conditionalModsBranches_.equal_range(mod);
200  ret.insert(range.first, range.second);
201  }
202  return ret;
203  }
204 
205  private:
206  void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
207  auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
208  std::string const star("*");
209  for (auto const& alias : aliases) {
210  auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
211  auto aliasedToModuleLabels = info.getParameterNames();
212  for (auto const& mod : aliasedToModuleLabels) {
213  if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
214  auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
215  for (auto const& aliasPSet : aliasVPSet) {
216  std::string type = star;
217  std::string instance = star;
218  std::string originalInstance = star;
219  if (aliasPSet.exists("type")) {
220  type = aliasPSet.getParameter<std::string>("type");
221  }
222  if (aliasPSet.exists("toProductInstance")) {
223  instance = aliasPSet.getParameter<std::string>("toProductInstance");
224  }
225  if (aliasPSet.exists("fromProductInstance")) {
226  originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
227  }
228 
229  aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
230  }
231  }
232  }
233  }
234  }
235 
236  void processSwitchEDAliases(ParameterSet const& proc_pset,
237  ProductRegistry& preg,
238  ProcessConfiguration const& processConfiguration,
239  std::unordered_set<std::string> const& allConditionalMods) {
240  auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
241  std::vector<std::string> switchEDAliases;
242  for (auto const& module : all_modules) {
243  auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
244  if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
245  auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
246  for (auto const& case_label : all_cases) {
247  auto range = aliasMap_.equal_range(case_label);
248  if (range.first != range.second) {
249  switchEDAliases.push_back(case_label);
250  }
251  }
252  }
253  }
255  switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
256  }
257 
258  std::multimap<std::string, AliasInfo> aliasMap_;
259  std::multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
260  };
261 
262  // -----------------------------
263 
265  std::shared_ptr<TriggerResultInserter> inserter,
266  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
267  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
268  std::shared_ptr<ModuleRegistry> modReg,
269  ParameterSet& proc_pset,
270  service::TriggerNamesService const& tns,
271  PreallocationConfiguration const& prealloc,
272  ProductRegistry& preg,
273  BranchIDListHelper& branchIDListHelper,
275  std::shared_ptr<ActivityRegistry> areg,
276  std::shared_ptr<ProcessConfiguration> processConfiguration,
277  StreamID streamID,
278  ProcessContext const* processContext)
279  : workerManager_(modReg, areg, actions),
280  actReg_(areg),
281  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
282  results_inserter_(),
283  trig_paths_(),
284  end_paths_(),
285  total_events_(),
286  total_passed_(),
287  number_of_unscheduled_modules_(0),
288  streamID_(streamID),
289  streamContext_(streamID_, processContext),
290  skippingEvent_(false) {
291  bool hasPath = false;
292  std::vector<std::string> const& pathNames = tns.getTrigPaths();
293  std::vector<std::string> const& endPathNames = tns.getEndPaths();
294 
295  ConditionalTaskHelper conditionalTaskHelper(
296  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
297 
298  int trig_bitpos = 0;
299  trig_paths_.reserve(pathNames.size());
300  for (auto const& trig_name : pathNames) {
301  fillTrigPath(proc_pset,
302  preg,
303  &prealloc,
304  processConfiguration,
305  trig_bitpos,
306  trig_name,
307  results(),
308  endPathNames,
309  conditionalTaskHelper);
310  ++trig_bitpos;
311  hasPath = true;
312  }
313 
314  if (hasPath) {
315  // the results inserter stands alone
316  inserter->setTrigResultForStream(streamID.value(), results());
317 
318  results_inserter_ = makeInserter(actions, actReg_, inserter);
320  }
321 
322  // fill normal endpaths
323  int bitpos = 0;
324  end_paths_.reserve(endPathNames.size());
325  for (auto const& end_path_name : endPathNames) {
326  fillEndPath(
327  proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
328  ++bitpos;
329  }
330 
331  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
332 
333  //See if all modules were used
334  std::set<std::string> usedWorkerLabels;
335  for (auto const& worker : allWorkers()) {
336  usedWorkerLabels.insert(worker->description()->moduleLabel());
337  }
338  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
339  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
340  std::vector<std::string> unusedLabels;
341  set_difference(modulesInConfigSet.begin(),
342  modulesInConfigSet.end(),
343  usedWorkerLabels.begin(),
344  usedWorkerLabels.end(),
345  back_inserter(unusedLabels));
346  std::set<std::string> unscheduledLabels;
347  std::vector<std::string> shouldBeUsedLabels;
348  if (!unusedLabels.empty()) {
349  //Need to
350  // 1) create worker
351  // 2) if it is a WorkerT<EDProducer>, add it to our list
352  // 3) hand list to our delayed reader
353  for (auto const& label : unusedLabels) {
354  bool isTracked;
355  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
356  assert(isTracked);
357  assert(modulePSet != nullptr);
359  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
360  }
361  if (!shouldBeUsedLabels.empty()) {
362  std::ostringstream unusedStream;
363  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
364  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
365  itLabelEnd = shouldBeUsedLabels.end();
366  itLabel != itLabelEnd;
367  ++itLabel) {
368  unusedStream << ",'" << *itLabel << "'";
369  }
370  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
371  }
372  }
373  number_of_unscheduled_modules_ = unscheduledLabels.size();
374  } // StreamSchedule::StreamSchedule
375 
377  std::vector<std::string> const& branchesToDeleteEarly,
378  edm::ProductRegistry const& preg) {
379  // setup the list with those products actually registered for this job
380  std::multimap<std::string, Worker*> branchToReadingWorker;
381  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
382 
383  const std::vector<std::string> kEmpty;
384  std::map<Worker*, unsigned int> reserveSizeForWorker;
385  unsigned int upperLimitOnReadingWorker = 0;
386  unsigned int upperLimitOnIndicies = 0;
387  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
388 
389  //talk with output modules first
390  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
391  auto comm = iHolder->createOutputModuleCommunicator();
392  if (comm) {
393  if (!branchToReadingWorker.empty()) {
394  //If an OutputModule needs a product, we can't delete it early
395  // so we should remove it from our list
396  SelectedProductsForBranchType const& kept = comm->keptProducts();
397  for (auto const& item : kept[InEvent]) {
398  BranchDescription const& desc = *item.first;
399  auto found = branchToReadingWorker.equal_range(desc.branchName());
400  if (found.first != found.second) {
401  --nUniqueBranchesToDelete;
402  branchToReadingWorker.erase(found.first, found.second);
403  }
404  }
405  }
406  }
407  });
408 
409  if (branchToReadingWorker.empty()) {
410  return;
411  }
412 
413  for (auto w : allWorkers()) {
414  //determine if this module could read a branch we want to delete early
415  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
416  if (nullptr != pset) {
417  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
418  if (not branches.empty()) {
419  ++upperLimitOnReadingWorker;
420  }
421  for (auto const& branch : branches) {
422  auto found = branchToReadingWorker.equal_range(branch);
423  if (found.first != found.second) {
424  ++upperLimitOnIndicies;
425  ++reserveSizeForWorker[w];
426  if (nullptr == found.first->second) {
427  found.first->second = w;
428  } else {
429  branchToReadingWorker.insert(make_pair(found.first->first, w));
430  }
431  }
432  }
433  }
434  }
435  {
436  auto it = branchToReadingWorker.begin();
437  std::vector<std::string> unusedBranches;
438  while (it != branchToReadingWorker.end()) {
439  if (it->second == nullptr) {
440  unusedBranches.push_back(it->first);
441  //erasing the object invalidates the iterator so must advance it first
442  auto temp = it;
443  ++it;
444  branchToReadingWorker.erase(temp);
445  } else {
446  ++it;
447  }
448  }
449  if (not unusedBranches.empty()) {
450  LogWarning l("UnusedProductsForCanDeleteEarly");
451  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
452  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
453  for (auto const& n : unusedBranches) {
454  l << "\n " << n;
455  }
456  }
457  }
458  if (!branchToReadingWorker.empty()) {
459  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
460  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
461  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
462  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
463  std::string lastBranchName;
464  size_t nextOpenIndex = 0;
465  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
466  for (auto& branchAndWorker : branchToReadingWorker) {
467  if (lastBranchName != branchAndWorker.first) {
468  //have to put back the period we removed earlier in order to get the proper name
469  BranchID bid(branchAndWorker.first + ".");
470  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
471  lastBranchName = branchAndWorker.first;
472  }
473  auto found = alreadySeenWorkers.find(branchAndWorker.second);
474  if (alreadySeenWorkers.end() == found) {
475  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
476  // all the branches that might be read by this worker. However, initially we will only tell the
477  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
478  // EarlyDeleteHelper will automatically advance its internal end pointer.
479  size_t index = nextOpenIndex;
480  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
482  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
483  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
484  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
485  nextOpenIndex += nIndices;
486  } else {
487  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
488  }
489  }
490 
491  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
492  // space needed for each module
493  auto itLast = earlyDeleteHelpers_.begin();
494  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
495  if (itLast->end() != it->begin()) {
496  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
497  unsigned int delta = it->begin() - itLast->end();
498  it->shiftIndexPointers(delta);
499 
501  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
502  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
503  }
504  itLast = it;
505  }
507  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
509 
510  //now tell the paths about the deleters
511  for (auto& p : trig_paths_) {
512  p.setEarlyDeleteHelpers(alreadySeenWorkers);
513  }
514  for (auto& p : end_paths_) {
515  p.setEarlyDeleteHelpers(alreadySeenWorkers);
516  }
518  }
519  }
520 
522  Worker* worker,
523  std::unordered_set<std::string>& conditionalModules,
524  std::multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
525  std::multimap<std::string, AliasInfo> const& aliasMap,
526  ParameterSet& proc_pset,
527  ProductRegistry& preg,
528  PreallocationConfiguration const* prealloc,
529  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
530  std::vector<Worker*> returnValue;
531  auto const& consumesInfo = worker->consumesInfo();
532  auto moduleLabel = worker->description()->moduleLabel();
533  using namespace productholderindexhelper;
534  for (auto const& ci : consumesInfo) {
535  if (not ci.skipCurrentProcess() and
536  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
537  auto productModuleLabel = ci.label();
538  if (productModuleLabel.empty()) {
539  //this is a consumesMany request
540  for (auto const& branch : conditionalModuleBranches) {
541  //check that the conditional module has not been used
542  if (conditionalModules.find(branch.first) == conditionalModules.end()) {
543  continue;
544  }
545  if (ci.kindOfType() == edm::PRODUCT_TYPE) {
546  if (branch.second->unwrappedTypeID() != ci.type()) {
547  continue;
548  }
549  } else {
550  if (not typeIsViewCompatible(
551  ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
552  continue;
553  }
554  }
555 
556  auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration);
557  assert(condWorker);
558 
559  conditionalModules.erase(branch.first);
560 
561  auto dependents = tryToPlaceConditionalModules(condWorker,
562  conditionalModules,
563  conditionalModuleBranches,
564  aliasMap,
565  proc_pset,
566  preg,
567  prealloc,
568  processConfiguration);
569  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
570  returnValue.push_back(condWorker);
571  }
572  } else {
573  //just a regular consumes
574  bool productFromConditionalModule = false;
575  auto itFound = conditionalModules.find(productModuleLabel);
576  if (itFound == conditionalModules.end()) {
577  //Check to see if this was an alias
578  auto findAlias = aliasMap.equal_range(productModuleLabel);
579  for (auto it = findAlias.first; it != findAlias.second; ++it) {
580  //this was previously filtered so only the conditional modules remain
581  productModuleLabel = it->second.originalModuleLabel;
582  if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) {
583  if (it->second.friendlyClassName == "*" or
584  (ci.type().friendlyClassName() == it->second.friendlyClassName)) {
585  productFromConditionalModule = true;
586  //need to check the rest of the data product info
587  break;
588  } else if (ci.kindOfType() == ELEMENT_TYPE) {
589  //consume is a View so need to do more intrusive search
590  //find matching branches in module
591  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
592  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
593  if (it->second.originalInstanceLabel == "*" or
594  itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
595  if (typeIsViewCompatible(ci.type(),
596  TypeID(itBranch->second->wrappedType().typeInfo()),
597  itBranch->second->className())) {
598  productFromConditionalModule = true;
599  break;
600  }
601  }
602  }
603  if (productFromConditionalModule) {
604  break;
605  }
606  }
607  }
608  }
609  if (productFromConditionalModule) {
610  itFound = conditionalModules.find(productModuleLabel);
611  //check that the alias-for conditional module has not been used
612  if (itFound == conditionalModules.end()) {
613  continue;
614  }
615  }
616  } else {
617  //need to check the rest of the data product info
618  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
619  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
620  if (itBranch->second->productInstanceName() == ci.instance()) {
621  if (ci.kindOfType() == PRODUCT_TYPE) {
622  if (ci.type() == itBranch->second->unwrappedTypeID()) {
623  productFromConditionalModule = true;
624  break;
625  }
626  } else {
627  //this is a view
628  if (typeIsViewCompatible(ci.type(),
629  TypeID(itBranch->second->wrappedType().typeInfo()),
630  itBranch->second->className())) {
631  productFromConditionalModule = true;
632  break;
633  }
634  }
635  }
636  }
637  }
638  if (productFromConditionalModule) {
639  auto condWorker =
640  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
641  assert(condWorker);
642 
643  conditionalModules.erase(itFound);
644 
645  auto dependents = tryToPlaceConditionalModules(condWorker,
646  conditionalModules,
647  conditionalModuleBranches,
648  aliasMap,
649  proc_pset,
650  preg,
651  prealloc,
652  processConfiguration);
653  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
654  returnValue.push_back(condWorker);
655  }
656  }
657  }
658  }
659  return returnValue;
660  }
661 
663  ProductRegistry& preg,
664  PreallocationConfiguration const* prealloc,
665  std::shared_ptr<ProcessConfiguration const> processConfiguration,
666  std::string const& pathName,
667  bool ignoreFilters,
668  PathWorkers& out,
669  std::vector<std::string> const& endPathNames,
670  ConditionalTaskHelper const& conditionalTaskHelper) {
671  vstring modnames = proc_pset.getParameter<vstring>(pathName);
672  PathWorkers tmpworkers;
673 
674  //Pull out ConditionalTask modules
675  auto itCondBegin = std::find(modnames.begin(), modnames.end(), "#");
676 
677  std::unordered_set<std::string> conditionalmods;
678  //An EDAlias may be redirecting to a module on a ConditionalTask
679  std::multimap<std::string, AliasInfo> aliasMap;
680  std::multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
681  std::unordered_map<std::string, unsigned int> conditionalModOrder;
682  if (itCondBegin != modnames.end()) {
683  for (auto it = itCondBegin + 1; it != modnames.begin() + modnames.size() - 1; ++it) {
684  // ordering needs to skip the # token in the path list
685  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
686  }
687  //the last entry should be ignored since it is required to be "@"
688  conditionalmods = std::unordered_set<std::string>(
689  std::make_move_iterator(itCondBegin + 1), std::make_move_iterator(modnames.begin() + modnames.size() - 1));
690 
691  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
692  }
693  modnames.erase(itCondBegin, modnames.end());
694 
695  unsigned int placeInPath = 0;
696  for (auto const& name : modnames) {
697  //Modules except EDFilters are set to run concurrently by default
698  bool doNotRunConcurrently = false;
700  if (name[0] == '!') {
701  filterAction = WorkerInPath::Veto;
702  } else if (name[0] == '-' or name[0] == '+') {
703  filterAction = WorkerInPath::Ignore;
704  }
705  if (name[0] == '|' or name[0] == '+') {
706  //cms.wait was specified so do not run concurrently
707  doNotRunConcurrently = true;
708  }
709 
711  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
712  moduleLabel.erase(0, 1);
713  }
714 
715  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
716  if (worker == nullptr) {
717  std::string pathType("endpath");
718  if (!search_all(endPathNames, pathName)) {
719  pathType = std::string("path");
720  }
722  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
723  << "\"\n please check spelling or remove that label from the path.";
724  }
725 
726  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
727  // We have a filter on an end path, and the filter is not explicitly ignored.
728  // See if the filter is allowed.
729  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
730  if (!search_all(allowed_filters, worker->description()->moduleName())) {
731  // Filter is not allowed. Ignore the result, and issue a warning.
732  filterAction = WorkerInPath::Ignore;
733  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
734  << "' with module label '" << moduleLabel << "' appears on EndPath '"
735  << pathName << "'.\n"
736  << "The return value of the filter will be ignored.\n"
737  << "To suppress this warning, either remove the filter from the endpath,\n"
738  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
739  }
740  }
741  bool runConcurrently = not doNotRunConcurrently;
742  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
743  runConcurrently = false;
744  }
745 
746  auto condModules = tryToPlaceConditionalModules(worker,
747  conditionalmods,
748  conditionalModsBranches,
749  conditionalTaskHelper.aliasMap(),
750  proc_pset,
751  preg,
752  prealloc,
753  processConfiguration);
754  for (auto condMod : condModules) {
755  tmpworkers.emplace_back(
756  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
757  }
758 
759  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
760  ++placeInPath;
761  }
762 
763  out.swap(tmpworkers);
764  }
765 
767  ProductRegistry& preg,
768  PreallocationConfiguration const* prealloc,
769  std::shared_ptr<ProcessConfiguration const> processConfiguration,
770  int bitpos,
771  std::string const& name,
772  TrigResPtr trptr,
773  std::vector<std::string> const& endPathNames,
774  ConditionalTaskHelper const& conditionalTaskHelper) {
775  PathWorkers tmpworkers;
776  fillWorkers(
777  proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
778 
779  // an empty path will cause an extra bit that is not used
780  if (!tmpworkers.empty()) {
781  trig_paths_.emplace_back(bitpos,
782  name,
783  tmpworkers,
784  trptr,
785  actionTable(),
786  actReg_,
790  } else {
791  empty_trig_paths_.push_back(bitpos);
792  }
793  for (WorkerInPath const& workerInPath : tmpworkers) {
794  addToAllWorkers(workerInPath.getWorker());
795  }
796  }
797 
799  ProductRegistry& preg,
800  PreallocationConfiguration const* prealloc,
801  std::shared_ptr<ProcessConfiguration const> processConfiguration,
802  int bitpos,
803  std::string const& name,
804  std::vector<std::string> const& endPathNames,
805  ConditionalTaskHelper const& conditionalTaskHelper) {
806  PathWorkers tmpworkers;
807  fillWorkers(
808  proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
809 
810  if (!tmpworkers.empty()) {
811  //EndPaths are not supposed to stop if SkipEvent type exception happens
812  end_paths_.emplace_back(bitpos,
813  name,
814  tmpworkers,
815  TrigResPtr(),
816  actionTable(),
817  actReg_,
819  nullptr,
821  } else {
822  empty_end_paths_.push_back(bitpos);
823  }
824  for (WorkerInPath const& workerInPath : tmpworkers) {
825  addToAllWorkers(workerInPath.getWorker());
826  }
827  }
828 
830 
832 
834  Worker* found = nullptr;
835  for (auto const& worker : allWorkers()) {
836  if (worker->description()->moduleLabel() == iLabel) {
837  found = worker;
838  break;
839  }
840  }
841  if (nullptr == found) {
842  return;
843  }
844 
845  iMod->replaceModuleFor(found);
846  found->beginStream(streamID_, streamContext_);
847  }
848 
850 
851  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
852  std::vector<ModuleDescription const*> result;
853  result.reserve(allWorkers().size());
854 
855  for (auto const& worker : allWorkers()) {
856  ModuleDescription const* p = worker->description();
857  result.push_back(p);
858  }
859  return result;
860  }
861 
863  WaitingTaskHolder iTask,
865  ServiceToken const& serviceToken,
866  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
867  EventPrincipal& ep = info.principal();
868 
869  // Caught exception is propagated via WaitingTaskHolder
870  CMS_SA_ALLOW try {
871  this->resetAll();
872 
874 
875  Traits::setStreamContext(streamContext_, ep);
876  //a service may want to communicate with another service
877  ServiceRegistry::Operate guard(serviceToken);
878  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
879 
880  HLTPathStatus hltPathStatus(hlt::Pass, 0);
881  for (int empty_trig_path : empty_trig_paths_) {
882  results_->at(empty_trig_path) = hltPathStatus;
883  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
884  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
887  if (except) {
888  iTask.doneWaiting(except);
889  return;
890  }
891  }
892  for (int empty_end_path : empty_end_paths_) {
893  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
896  if (except) {
897  iTask.doneWaiting(except);
898  return;
899  }
900  }
901 
904 
905  ++total_events_;
906 
907  //use to give priorities on an error to ones from Paths
908  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
909  auto pathErrorPtr = pathErrorHolder.get();
910  ServiceWeakToken weakToken = serviceToken;
911  auto allPathsDone = make_waiting_task(
912  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
913  ServiceRegistry::Operate operate(weakToken.lock());
914 
915  std::exception_ptr ptr;
916  if (pathError->load()) {
917  ptr = *pathError->load();
918  delete pathError->load();
919  }
920  if ((not ptr) and iPtr) {
921  ptr = *iPtr;
922  }
924  });
925  //The holder guarantees that if the paths finish before the loop ends
926  // that we do not start too soon. It also guarantees that the task will
927  // run under that condition.
928  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
929 
930  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
931  std::exception_ptr const* iPtr) mutable {
932  ServiceRegistry::Operate operate(weakToken.lock());
933 
934  if (iPtr) {
935  //this is used to prioritize this error over one
936  // that happens in EndPath or Accumulate
937  pathErrorPtr->store(new std::exception_ptr(*iPtr));
938  }
939  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
940  });
941 
942  //The holder guarantees that if the paths finish before the loop ends
943  // that we do not start too soon. It also guarantees that the task will
944  // run under that condition.
945  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
946 
947  //start end paths first so on single threaded the paths will run first
948  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
949  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
950  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
951  }
952 
953  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
954  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
955  }
956 
957  ParentContext parentContext(&streamContext_);
959  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
960  } catch (...) {
961  iTask.doneWaiting(std::current_exception());
962  }
963  }
964 
965  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
966  WaitingTaskHolder iWait,
968  if (iExcept) {
969  // Caught exception is propagated via WaitingTaskHolder
970  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
975  edm::printCmsExceptionWarning("SkipEvent", e);
976  *(iExcept.load()) = std::exception_ptr();
977  } else {
978  *(iExcept.load()) = std::current_exception();
979  }
980  } catch (...) {
981  *(iExcept.load()) = std::current_exception();
982  }
983  }
984 
985  if ((not iExcept) and results_->accept()) {
986  ++total_passed_;
987  }
988 
989  if (nullptr != results_inserter_.get()) {
990  // Caught exception is propagated to the caller
991  CMS_SA_ALLOW try {
992  //Even if there was an exception, we need to allow results inserter
993  // to run since some module may be waiting on its results.
994  ParentContext parentContext(&streamContext_);
996 
997  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
998  if (expt) {
999  std::rethrow_exception(expt);
1000  }
1001  } catch (cms::Exception& ex) {
1002  if (not iExcept) {
1003  if (ex.context().empty()) {
1004  std::ostringstream ost;
1005  ost << "Processing Event " << info.principal().id();
1006  ex.addContext(ost.str());
1007  }
1008  iExcept.store(new std::exception_ptr(std::current_exception()));
1009  }
1010  } catch (...) {
1011  if (not iExcept) {
1012  iExcept.store(new std::exception_ptr(std::current_exception()));
1013  }
1014  }
1015  }
1016  std::exception_ptr ptr;
1017  if (iExcept) {
1018  ptr = *iExcept.load();
1019  }
1020  iWait.doneWaiting(ptr);
1021  }
1022 
1023  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
1025 
1026  if (iExcept) {
1027  //add context information to the exception and print message
1028  try {
1029  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1030  } catch (cms::Exception& ex) {
1031  bool const cleaningUpAfterException = false;
1032  if (ex.context().empty()) {
1033  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1034  } else {
1035  addContextAndPrintException("", ex, cleaningUpAfterException);
1036  }
1037  iExcept = std::current_exception();
1038  }
1039 
1040  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1041  }
1042  // Caught exception is propagated to the caller
1043  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1044  if (not iExcept) {
1045  iExcept = std::current_exception();
1046  }
1047  }
1048  if (not iExcept) {
1049  resetEarlyDelete();
1050  }
1051 
1052  return iExcept;
1053  }
1054 
1055  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1056  oLabelsToFill.reserve(trig_paths_.size());
1057  std::transform(trig_paths_.begin(),
1058  trig_paths_.end(),
1059  std::back_inserter(oLabelsToFill),
1060  std::bind(&Path::name, std::placeholders::_1));
1061  }
1062 
1063  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1064  TrigPaths::const_iterator itFound = std::find_if(
1065  trig_paths_.begin(),
1066  trig_paths_.end(),
1067  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1068  if (itFound != trig_paths_.end()) {
1069  oLabelsToFill.reserve(itFound->size());
1070  for (size_t i = 0; i < itFound->size(); ++i) {
1071  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1072  }
1073  }
1074  }
1075 
1077  std::vector<ModuleDescription const*>& descriptions,
1078  unsigned int hint) const {
1079  descriptions.clear();
1080  bool found = false;
1081  TrigPaths::const_iterator itFound;
1082 
1083  if (hint < trig_paths_.size()) {
1084  itFound = trig_paths_.begin() + hint;
1085  if (itFound->name() == iPathLabel)
1086  found = true;
1087  }
1088  if (!found) {
1089  // if the hint did not work, do it the slow way
1090  itFound = std::find_if(
1091  trig_paths_.begin(),
1092  trig_paths_.end(),
1093  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1094  if (itFound != trig_paths_.end())
1095  found = true;
1096  }
1097  if (found) {
1098  descriptions.reserve(itFound->size());
1099  for (size_t i = 0; i < itFound->size(); ++i) {
1100  descriptions.push_back(itFound->getWorker(i)->description());
1101  }
1102  }
1103  }
1104 
1106  std::vector<ModuleDescription const*>& descriptions,
1107  unsigned int hint) const {
1108  descriptions.clear();
1109  bool found = false;
1110  TrigPaths::const_iterator itFound;
1111 
1112  if (hint < end_paths_.size()) {
1113  itFound = end_paths_.begin() + hint;
1114  if (itFound->name() == iEndPathLabel)
1115  found = true;
1116  }
1117  if (!found) {
1118  // if the hint did not work, do it the slow way
1119  itFound = std::find_if(
1120  end_paths_.begin(),
1121  end_paths_.end(),
1122  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1123  if (itFound != end_paths_.end())
1124  found = true;
1125  }
1126  if (found) {
1127  descriptions.reserve(itFound->size());
1128  for (size_t i = 0; i < itFound->size(); ++i) {
1129  descriptions.push_back(itFound->getWorker(i)->description());
1130  }
1131  }
1132  }
1133 
1134  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1135  sum.timesVisited += path.timesVisited(which);
1136  sum.timesPassed += path.timesPassed(which);
1137  sum.timesFailed += path.timesFailed(which);
1138  sum.timesExcept += path.timesExcept(which);
1139  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1140  sum.bitPosition = path.bitPosition(which);
1141  }
1142 
1143  static void fillPathSummary(Path const& path, PathSummary& sum) {
1144  sum.name = path.name();
1145  sum.bitPosition = path.bitPosition();
1146  sum.timesRun += path.timesRun();
1147  sum.timesPassed += path.timesPassed();
1148  sum.timesFailed += path.timesFailed();
1149  sum.timesExcept += path.timesExcept();
1150 
1151  Path::size_type sz = path.size();
1152  if (sum.moduleInPathSummaries.empty()) {
1153  std::vector<ModuleInPathSummary> temp(sz);
1154  for (size_t i = 0; i != sz; ++i) {
1156  }
1157  sum.moduleInPathSummaries.swap(temp);
1158  } else {
1159  assert(sz == sum.moduleInPathSummaries.size());
1160  for (size_t i = 0; i != sz; ++i) {
1162  }
1163  }
1164  }
1165 
1166  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1167  sum.timesVisited += w.timesVisited();
1168  sum.timesRun += w.timesRun();
1169  sum.timesPassed += w.timesPassed();
1170  sum.timesFailed += w.timesFailed();
1171  sum.timesExcept += w.timesExcept();
1172  sum.moduleLabel = w.description()->moduleLabel();
1173  }
1174 
1175  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1176 
1178  rep.eventSummary.totalEvents += totalEvents();
1179  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1180  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1181 
1182  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1183  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1184  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1185  }
1186 
1188  using std::placeholders::_1;
1190  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1191  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1192  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1193  }
1194 
1196  skippingEvent_ = false;
1197  results_->reset();
1198  }
1199 
1201 
1203  //must be sure we have cleared the count first
1204  for (auto& count : earlyDeleteBranchToCount_) {
1205  count.count = 0;
1206  }
1207  //now reset based on how many helpers use that branch
1209  ++(earlyDeleteBranchToCount_[index].count);
1210  }
1211  for (auto& helper : earlyDeleteHelpers_) {
1212  helper.reset();
1213  }
1214  }
1215 
1217  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1218  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1220  int bitpos = 0;
1221  unsigned int indexEmpty = 0;
1222  unsigned int indexOfPath = 0;
1223  for (auto& pathStatusInserter : pathStatusInserters) {
1224  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1225  WorkerPtr workerPtr(
1226  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1227  pathStatusInserterWorkers_.emplace_back(workerPtr);
1228  workerPtr->setActivityRegistry(actReg_);
1229  addToAllWorkers(workerPtr.get());
1230 
1231  // A little complexity here because a C++ Path object is not
1232  // instantiated and put into end_paths if there are no modules
1233  // on the configured path.
1234  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1235  ++indexEmpty;
1236  } else {
1237  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1238  ++indexOfPath;
1239  }
1240  ++bitpos;
1241  }
1242 
1243  bitpos = 0;
1244  indexEmpty = 0;
1245  indexOfPath = 0;
1246  for (auto& endPathStatusInserter : endPathStatusInserters) {
1247  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1248  WorkerPtr workerPtr(
1249  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1250  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1251  workerPtr->setActivityRegistry(actReg_);
1252  addToAllWorkers(workerPtr.get());
1253 
1254  // A little complexity here because a C++ Path object is not
1255  // instantiated and put into end_paths if there are no modules
1256  // on the configured path.
1257  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1258  ++indexEmpty;
1259  } else {
1260  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1261  ++indexOfPath;
1262  }
1263  ++bitpos;
1264  }
1265  }
1266 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
size
Write out results.
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::multimap< std::string, edm::BranchDescription const * > conditionalModsBranches_
Definition: helper.py:1
roAction_t actions[nactions]
Definition: GenABIO.cc:181
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
ProductList const & productList() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
static PFTauRenderPlugin instance
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
int totalEventsPassed() const
ret
prodAgent to be discontinued
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
void setupOnDemandSystem(EventTransitionInfo const &)
void clearCounters()
Definition: Worker.h:222
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
std::string const & moduleName() const
edm::propagate_const< WorkerPtr > results_inserter_
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
std::multimap< std::string, AliasInfo > aliasMap_
assert(be >=bs)
StreamID streamID() const
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
ConditionalTaskHelper(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, WorkerManager &workerManager, std::vector< std::string > const &trigPathNames)
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
T getUntrackedParameter(std::string const &, T const &) const
constexpr element_type const * get() const
std::string moduleLabel
Definition: TriggerReport.h:52
oneapi::tbb::task_group * group() const noexcept
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
char const * label
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
void doneWaiting(std::exception_ptr iExcept)
accept
Definition: HLTenums.h:18
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
std::multimap< std::string, edm::BranchDescription const * > conditionalModuleBranches(std::unordered_set< std::string > const &conditionalmods) const
void processSwitchEDAliases(ParameterSet const &proc_pset, ProductRegistry &preg, ProcessConfiguration const &processConfiguration, std::unordered_set< std::string > const &allConditionalMods)
std::string name
Definition: TriggerReport.h:41
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
Strings const & getTrigPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static void fillPathSummary(Path const &path, PathSummary &sum)
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, StreamID streamID, ProcessContext const *processContext)
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
int totalEvents() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
ModuleDescription const * description() const
Definition: Worker.h:188
std::string const & className() const
Definition: TypeID.cc:40
void clearCounters()
Clear all the counters in the trigger report.
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
void fillAliasMap(ParameterSet const &proc_pset, std::unordered_set< std::string > const &allConditionalMods)
Strings const & getEndPaths() const
void getTriggerReport(TriggerReport &rep) const
exception_actions::ActionCodes find(const std::string &category) const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
double b
Definition: hdecay.h:118
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::string const & processName() const
std::atomic< bool > skippingEvent_
Definition: plugin.cc:23
HLT enums.
void setupResolvers(Principal &principal)
std::multimap< std::string, AliasInfo > const & aliasMap() const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:336
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:42
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
Log< level::Warning, false > LogWarning
std::list< std::string > const & context() const
Definition: Exception.cc:147
std::string const & moduleLabel() const
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
std::string const & name() const
Definition: Path.h:74
std::vector< std::string > vstring
Definition: Schedule.cc:473
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
def move(src, dest)
Definition: eostools.py:511
static Registry * instance()
Definition: Registry.cc:12
void clearCounters()
Definition: Path.cc:198
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)