CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
29 
31 #include "processEDAliases.h"
32 
33 #include <algorithm>
34 #include <cassert>
35 #include <cstdlib>
36 #include <functional>
37 #include <iomanip>
38 #include <list>
39 #include <map>
40 #include <exception>
41 
42 namespace edm {
43 
44  namespace {
45 
// Walks the input range [begin, end) and the output sequence starting at
// 'out' in lock step, calling 'func(input, output)' for each pair of
// elements. 'func' receives the input element (expected to be taken by
// const reference) and a reference to the output element it should fill.
// The caller must make sure the output sequence is at least as long as
// the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
55 
// Fills the sequence 'to' from the sequence 'from' by calling
// 'func(input, output)' once for every pair of elements.
//
// If the two sequences differ in size, the values are first calculated
// into a temporary of the right size, which is swapped into 'to' only
// after every call has returned, so 'to' is left unchanged if a call
// throws. If the sizes already agree, the elements of 'to' are updated
// in place.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  // Applies 'func' pairwise, writing into the elements of 'target'.
  auto fillEach = [&from, &func](TO& target) {
    auto out = target.begin();
    for (auto in = from.begin(); in != from.end(); ++in, ++out) {
      func(*in, *out);
    }
  };

  if (to.size() == from.size()) {
    fillEach(to);
    return;
  }

  TO temp(from.size());
  fillEach(temp);
  to.swap(temp);
}
71 
72  // -----------------------------
73 
74  // Here we make the trigger results inserter directly. This should
75  // probably be a utility in the WorkerRegistry or elsewhere.
76 
77  StreamSchedule::WorkerPtr makeInserter(ExceptionToActionTable const& actions,
78  std::shared_ptr<ActivityRegistry> areg,
79  std::shared_ptr<TriggerResultInserter> inserter) {
81  new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
82  ptr->setActivityRegistry(areg);
83  return ptr;
84  }
85 
86  void initializeBranchToReadingWorker(std::vector<std::string> const& branchesToDeleteEarly,
87  ProductRegistry const& preg,
88  std::multimap<std::string, Worker*>& branchToReadingWorker) {
89  auto vBranchesToDeleteEarly = branchesToDeleteEarly;
90  // Remove any duplicates
91  std::sort(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end(), std::less<std::string>());
92  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(), vBranchesToDeleteEarly.end()),
93  vBranchesToDeleteEarly.end());
94 
95  // Are the requested items in the product registry?
96  auto allBranchNames = preg.allBranchNames();
97  //the branch names all end with a period, which we do not want to compare with
98  for (auto& b : allBranchNames) {
99  b.resize(b.size() - 1);
100  }
101  std::sort(allBranchNames.begin(), allBranchNames.end(), std::less<std::string>());
102  std::vector<std::string> temp;
103  temp.reserve(vBranchesToDeleteEarly.size());
104 
105  std::set_intersection(vBranchesToDeleteEarly.begin(),
106  vBranchesToDeleteEarly.end(),
107  allBranchNames.begin(),
108  allBranchNames.end(),
109  std::back_inserter(temp));
110  vBranchesToDeleteEarly.swap(temp);
111  if (temp.size() != vBranchesToDeleteEarly.size()) {
112  std::vector<std::string> missingProducts;
113  std::set_difference(temp.begin(),
114  temp.end(),
115  vBranchesToDeleteEarly.begin(),
116  vBranchesToDeleteEarly.end(),
117  std::back_inserter(missingProducts));
118  LogInfo l("MissingProductsForCanDeleteEarly");
119  l << "The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
120  for (auto const& n : missingProducts) {
121  l << "\n " << n;
122  }
123  }
124  //set placeholder for the branch, we will remove the nullptr if a
125  // module actually wants the branch.
126  for (auto const& branch : vBranchesToDeleteEarly) {
127  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
128  }
129  }
130  } // namespace
131 
132  // -----------------------------
133 
// Shorthand for a list of strings (e.g. the module labels stored in a path parameter).
using vstring = std::vector<std::string>;
135 
136  // -----------------------------
137 
139  std::shared_ptr<TriggerResultInserter> inserter,
140  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
141  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
142  std::shared_ptr<ModuleRegistry> modReg,
143  ParameterSet& proc_pset,
144  service::TriggerNamesService const& tns,
145  PreallocationConfiguration const& prealloc,
146  ProductRegistry& preg,
147  BranchIDListHelper& branchIDListHelper,
149  std::shared_ptr<ActivityRegistry> areg,
150  std::shared_ptr<ProcessConfiguration> processConfiguration,
151  StreamID streamID,
152  ProcessContext const* processContext)
153  : workerManager_(modReg, areg, actions),
154  actReg_(areg),
155  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
156  results_inserter_(),
157  trig_paths_(),
158  end_paths_(),
159  total_events_(),
160  total_passed_(),
161  number_of_unscheduled_modules_(0),
162  streamID_(streamID),
163  streamContext_(streamID_, processContext),
164  skippingEvent_(false) {
165  bool hasPath = false;
166  std::vector<std::string> const& pathNames = tns.getTrigPaths();
167  std::vector<std::string> const& endPathNames = tns.getEndPaths();
168 
169  int trig_bitpos = 0;
170  trig_paths_.reserve(pathNames.size());
171  for (auto const& trig_name : pathNames) {
172  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
173  ++trig_bitpos;
174  hasPath = true;
175  }
176 
177  if (hasPath) {
178  // the results inserter stands alone
179  inserter->setTrigResultForStream(streamID.value(), results());
180 
181  results_inserter_ = makeInserter(actions, actReg_, inserter);
183  }
184 
185  // fill normal endpaths
186  int bitpos = 0;
187  end_paths_.reserve(endPathNames.size());
188  for (auto const& end_path_name : endPathNames) {
189  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
190  ++bitpos;
191  }
192 
193  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
194 
195  //See if all modules were used
196  std::set<std::string> usedWorkerLabels;
197  for (auto const& worker : allWorkers()) {
198  usedWorkerLabels.insert(worker->description()->moduleLabel());
199  }
200  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
201  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
202  std::vector<std::string> unusedLabels;
203  set_difference(modulesInConfigSet.begin(),
204  modulesInConfigSet.end(),
205  usedWorkerLabels.begin(),
206  usedWorkerLabels.end(),
207  back_inserter(unusedLabels));
208  std::set<std::string> unscheduledLabels;
209  std::vector<std::string> shouldBeUsedLabels;
210  if (!unusedLabels.empty()) {
211  //Need to
212  // 1) create worker
213  // 2) if it is a WorkerT<EDProducer>, add it to our list
214  // 3) hand list to our delayed reader
215  for (auto const& label : unusedLabels) {
216  bool isTracked;
217  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
218  assert(isTracked);
219  assert(modulePSet != nullptr);
221  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
222  }
223  if (!shouldBeUsedLabels.empty()) {
224  std::ostringstream unusedStream;
225  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
226  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
227  itLabelEnd = shouldBeUsedLabels.end();
228  itLabel != itLabelEnd;
229  ++itLabel) {
230  unusedStream << ",'" << *itLabel << "'";
231  }
232  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
233  }
234  }
235  number_of_unscheduled_modules_ = unscheduledLabels.size();
236  } // StreamSchedule::StreamSchedule
237 
239  std::vector<std::string> const& branchesToDeleteEarly,
240  edm::ProductRegistry const& preg) {
241  // setup the list with those products actually registered for this job
242  std::multimap<std::string, Worker*> branchToReadingWorker;
243  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
244 
245  const std::vector<std::string> kEmpty;
246  std::map<Worker*, unsigned int> reserveSizeForWorker;
247  unsigned int upperLimitOnReadingWorker = 0;
248  unsigned int upperLimitOnIndicies = 0;
249  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
250 
251  //talk with output modules first
252  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
253  auto comm = iHolder->createOutputModuleCommunicator();
254  if (comm) {
255  if (!branchToReadingWorker.empty()) {
256  //If an OutputModule needs a product, we can't delete it early
257  // so we should remove it from our list
258  SelectedProductsForBranchType const& kept = comm->keptProducts();
259  for (auto const& item : kept[InEvent]) {
260  BranchDescription const& desc = *item.first;
261  auto found = branchToReadingWorker.equal_range(desc.branchName());
262  if (found.first != found.second) {
263  --nUniqueBranchesToDelete;
264  branchToReadingWorker.erase(found.first, found.second);
265  }
266  }
267  }
268  }
269  });
270 
271  if (branchToReadingWorker.empty()) {
272  return;
273  }
274 
275  for (auto w : allWorkers()) {
276  //determine if this module could read a branch we want to delete early
277  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
278  if (nullptr != pset) {
279  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
280  if (not branches.empty()) {
281  ++upperLimitOnReadingWorker;
282  }
283  for (auto const& branch : branches) {
284  auto found = branchToReadingWorker.equal_range(branch);
285  if (found.first != found.second) {
286  ++upperLimitOnIndicies;
287  ++reserveSizeForWorker[w];
288  if (nullptr == found.first->second) {
289  found.first->second = w;
290  } else {
291  branchToReadingWorker.insert(make_pair(found.first->first, w));
292  }
293  }
294  }
295  }
296  }
297  {
298  auto it = branchToReadingWorker.begin();
299  std::vector<std::string> unusedBranches;
300  while (it != branchToReadingWorker.end()) {
301  if (it->second == nullptr) {
302  unusedBranches.push_back(it->first);
303  //erasing the object invalidates the iterator so must advance it first
304  auto temp = it;
305  ++it;
306  branchToReadingWorker.erase(temp);
307  } else {
308  ++it;
309  }
310  }
311  if (not unusedBranches.empty()) {
312  LogWarning l("UnusedProductsForCanDeleteEarly");
313  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
314  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
315  for (auto const& n : unusedBranches) {
316  l << "\n " << n;
317  }
318  }
319  }
320  if (!branchToReadingWorker.empty()) {
321  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
322  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
323  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
324  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
325  std::string lastBranchName;
326  size_t nextOpenIndex = 0;
327  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
328  for (auto& branchAndWorker : branchToReadingWorker) {
329  if (lastBranchName != branchAndWorker.first) {
330  //have to put back the period we removed earlier in order to get the proper name
331  BranchID bid(branchAndWorker.first + ".");
332  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
333  lastBranchName = branchAndWorker.first;
334  }
335  auto found = alreadySeenWorkers.find(branchAndWorker.second);
336  if (alreadySeenWorkers.end() == found) {
337  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
338  // all the branches that might be read by this worker. However, initially we will only tell the
339  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
340  // EarlyDeleteHelper will automatically advance its internal end pointer.
341  size_t index = nextOpenIndex;
342  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
344  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
345  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
346  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
347  nextOpenIndex += nIndices;
348  } else {
349  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
350  }
351  }
352 
353  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
354  // space needed for each module
355  auto itLast = earlyDeleteHelpers_.begin();
356  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
357  if (itLast->end() != it->begin()) {
358  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
359  unsigned int delta = it->begin() - itLast->end();
360  it->shiftIndexPointers(delta);
361 
363  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
364  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
365  }
366  itLast = it;
367  }
369  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
371 
372  //now tell the paths about the deleters
373  for (auto& p : trig_paths_) {
374  p.setEarlyDeleteHelpers(alreadySeenWorkers);
375  }
376  for (auto& p : end_paths_) {
377  p.setEarlyDeleteHelpers(alreadySeenWorkers);
378  }
380  }
381  }
382 
384  ParameterSet& proc_pset,
385  WorkerManager& workerManager,
386  ProductRegistry& preg,
387  PreallocationConfiguration const* prealloc,
388  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
389  bool isTracked;
390  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
391  if (modpset == nullptr) {
392  return nullptr;
393  }
394  assert(isTracked);
395 
396  return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
397  }
398 
400  Worker* worker,
401  std::unordered_set<std::string>& conditionalModules,
402  std::multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
403  std::multimap<std::string, AliasInfo> const& aliasMap,
404  ParameterSet& proc_pset,
405  ProductRegistry& preg,
406  PreallocationConfiguration const* prealloc,
407  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
408  std::vector<Worker*> returnValue;
409  auto const& consumesInfo = worker->consumesInfo();
410  auto moduleLabel = worker->description()->moduleLabel();
411  using namespace productholderindexhelper;
412  for (auto const& ci : consumesInfo) {
413  if (not ci.skipCurrentProcess() and
414  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
415  auto productModuleLabel = ci.label();
416  if (productModuleLabel.empty()) {
417  //this is a consumesMany request
418  for (auto const& branch : conditionalModuleBranches) {
419  //check that the conditional module has not been used
420  if (conditionalModules.find(branch.first) == conditionalModules.end()) {
421  continue;
422  }
423  if (ci.kindOfType() == edm::PRODUCT_TYPE) {
424  if (branch.second->unwrappedTypeID() != ci.type()) {
425  continue;
426  }
427  } else {
428  if (not typeIsViewCompatible(
429  ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
430  continue;
431  }
432  }
433 
434  auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration);
435  assert(condWorker);
436 
437  conditionalModules.erase(branch.first);
438 
439  auto dependents = tryToPlaceConditionalModules(condWorker,
440  conditionalModules,
441  conditionalModuleBranches,
442  aliasMap,
443  proc_pset,
444  preg,
445  prealloc,
446  processConfiguration);
447  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
448  returnValue.push_back(condWorker);
449  }
450  } else {
451  //just a regular consumes
452  bool productFromConditionalModule = false;
453  auto itFound = conditionalModules.find(productModuleLabel);
454  if (itFound == conditionalModules.end()) {
455  //Check to see if this was an alias
456  auto findAlias = aliasMap.equal_range(productModuleLabel);
457  for (auto it = findAlias.first; it != findAlias.second; ++it) {
458  //this was previously filtered so only the conditional modules remain
459  productModuleLabel = it->second.originalModuleLabel;
460  if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) {
461  if (it->second.friendlyClassName == "*" or
462  (ci.type().friendlyClassName() == it->second.friendlyClassName)) {
463  productFromConditionalModule = true;
464  //need to check the rest of the data product info
465  break;
466  } else if (ci.kindOfType() == ELEMENT_TYPE) {
467  //consume is a View so need to do more intrusive search
468  //find matching branches in module
469  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
470  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
471  if (it->second.originalInstanceLabel == "*" or
472  itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
473  if (typeIsViewCompatible(ci.type(),
474  TypeID(itBranch->second->wrappedType().typeInfo()),
475  itBranch->second->className())) {
476  productFromConditionalModule = true;
477  break;
478  }
479  }
480  }
481  if (productFromConditionalModule) {
482  break;
483  }
484  }
485  }
486  }
487  if (productFromConditionalModule) {
488  itFound = conditionalModules.find(productModuleLabel);
489  //check that the alias-for conditional module has not been used
490  if (itFound == conditionalModules.end()) {
491  continue;
492  }
493  }
494  } else {
495  //need to check the rest of the data product info
496  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
497  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
498  if (itBranch->second->productInstanceName() == ci.instance()) {
499  if (ci.kindOfType() == PRODUCT_TYPE) {
500  if (ci.type() == itBranch->second->unwrappedTypeID()) {
501  productFromConditionalModule = true;
502  break;
503  }
504  } else {
505  //this is a view
506  if (typeIsViewCompatible(ci.type(),
507  TypeID(itBranch->second->wrappedType().typeInfo()),
508  itBranch->second->className())) {
509  productFromConditionalModule = true;
510  break;
511  }
512  }
513  }
514  }
515  }
516  if (productFromConditionalModule) {
517  auto condWorker =
518  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
519  assert(condWorker);
520 
521  conditionalModules.erase(itFound);
522 
523  auto dependents = tryToPlaceConditionalModules(condWorker,
524  conditionalModules,
525  conditionalModuleBranches,
526  aliasMap,
527  proc_pset,
528  preg,
529  prealloc,
530  processConfiguration);
531  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
532  returnValue.push_back(condWorker);
533  }
534  }
535  }
536  }
537  return returnValue;
538  }
539 
541  ProductRegistry& preg,
542  PreallocationConfiguration const* prealloc,
543  std::shared_ptr<ProcessConfiguration const> processConfiguration,
544  std::string const& pathName,
545  bool ignoreFilters,
546  PathWorkers& out,
547  std::vector<std::string> const& endPathNames) {
548  vstring modnames = proc_pset.getParameter<vstring>(pathName);
549  PathWorkers tmpworkers;
550 
551  //Pull out ConditionalTask modules
552  auto itCondBegin = std::find(modnames.begin(), modnames.end(), "#");
553 
554  std::unordered_set<std::string> conditionalmods;
555  //An EDAlias may be redirecting to a module on a ConditionalTask
556  std::multimap<std::string, AliasInfo> aliasMap;
557  std::multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
558  std::unordered_map<std::string, unsigned int> conditionalModOrder;
559  if (itCondBegin != modnames.end()) {
560  for (auto it = itCondBegin + 1; it != modnames.begin() + modnames.size() - 1; ++it) {
561  // ordering needs to skip the # token in the path list
562  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
563  }
564  //the last entry should be ignored since it is required to be "@"
565  conditionalmods = std::unordered_set<std::string>(
566  std::make_move_iterator(itCondBegin + 1), std::make_move_iterator(modnames.begin() + modnames.size() - 1));
567 
568  for (auto const& cond : conditionalmods) {
569  //force the creation of the conditional modules so alias check can work
570  (void)getWorker(cond, proc_pset, workerManager_, preg, prealloc, processConfiguration);
571  }
572  //find aliases
573  {
574  auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
575  std::string const star("*");
576  for (auto const& alias : aliases) {
577  auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
578  auto aliasedToModuleLabels = info.getParameterNames();
579  for (auto const& mod : aliasedToModuleLabels) {
580  if (not mod.empty() and mod[0] != '@' and conditionalmods.find(mod) != conditionalmods.end()) {
581  auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
582  for (auto const& aliasPSet : aliasVPSet) {
583  std::string type = star;
584  std::string instance = star;
585  std::string originalInstance = star;
586  if (aliasPSet.exists("type")) {
587  type = aliasPSet.getParameter<std::string>("type");
588  }
589  if (aliasPSet.exists("toProductInstance")) {
590  instance = aliasPSet.getParameter<std::string>("toProductInstance");
591  }
592  if (aliasPSet.exists("fromProductInstance")) {
593  originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
594  }
595 
596  aliasMap.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
597  }
598  }
599  }
600  }
601  }
602  //find SwitchProducers whose cases are aliases
603  {
604  auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
605  std::vector<std::string> switchEDAliases;
606  for (auto const& module : all_modules) {
607  auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
608  if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
609  auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
610  for (auto const& case_label : all_cases) {
611  auto range = aliasMap.equal_range(case_label);
612  if (range.first != range.second) {
613  switchEDAliases.push_back(case_label);
614  }
615  }
616  }
617  }
619  switchEDAliases, conditionalmods, proc_pset, processConfiguration->processName(), preg);
620  }
621  {
622  //find branches created by the conditional modules
623  for (auto const& prod : preg.productList()) {
624  if (conditionalmods.find(prod.first.moduleLabel()) != conditionalmods.end()) {
625  conditionalModsBranches.emplace(prod.first.moduleLabel(), &prod.second);
626  }
627  }
628  }
629  }
630  modnames.erase(itCondBegin, modnames.end());
631 
632  unsigned int placeInPath = 0;
633  for (auto const& name : modnames) {
634  //Modules except EDFilters are set to run concurrently by default
635  bool doNotRunConcurrently = false;
637  if (name[0] == '!') {
638  filterAction = WorkerInPath::Veto;
639  } else if (name[0] == '-' or name[0] == '+') {
640  filterAction = WorkerInPath::Ignore;
641  }
642  if (name[0] == '|' or name[0] == '+') {
643  //cms.wait was specified so do not run concurrently
644  doNotRunConcurrently = true;
645  }
646 
648  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
649  moduleLabel.erase(0, 1);
650  }
651 
652  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
653  if (worker == nullptr) {
654  std::string pathType("endpath");
655  if (!search_all(endPathNames, pathName)) {
656  pathType = std::string("path");
657  }
659  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
660  << "\"\n please check spelling or remove that label from the path.";
661  }
662 
663  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
664  // We have a filter on an end path, and the filter is not explicitly ignored.
665  // See if the filter is allowed.
666  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
667  if (!search_all(allowed_filters, worker->description()->moduleName())) {
668  // Filter is not allowed. Ignore the result, and issue a warning.
669  filterAction = WorkerInPath::Ignore;
670  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
671  << "' with module label '" << moduleLabel << "' appears on EndPath '"
672  << pathName << "'.\n"
673  << "The return value of the filter will be ignored.\n"
674  << "To suppress this warning, either remove the filter from the endpath,\n"
675  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
676  }
677  }
678  bool runConcurrently = not doNotRunConcurrently;
679  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
680  runConcurrently = false;
681  }
682 
683  auto condModules = tryToPlaceConditionalModules(
684  worker, conditionalmods, conditionalModsBranches, aliasMap, proc_pset, preg, prealloc, processConfiguration);
685  for (auto condMod : condModules) {
686  tmpworkers.emplace_back(
687  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
688  }
689 
690  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
691  ++placeInPath;
692  }
693 
694  out.swap(tmpworkers);
695  }
696 
698  ProductRegistry& preg,
699  PreallocationConfiguration const* prealloc,
700  std::shared_ptr<ProcessConfiguration const> processConfiguration,
701  int bitpos,
702  std::string const& name,
703  TrigResPtr trptr,
704  std::vector<std::string> const& endPathNames) {
705  PathWorkers tmpworkers;
706  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
707 
708  // an empty path will cause an extra bit that is not used
709  if (!tmpworkers.empty()) {
710  trig_paths_.emplace_back(bitpos,
711  name,
712  tmpworkers,
713  trptr,
714  actionTable(),
715  actReg_,
719  } else {
720  empty_trig_paths_.push_back(bitpos);
721  }
722  for (WorkerInPath const& workerInPath : tmpworkers) {
723  addToAllWorkers(workerInPath.getWorker());
724  }
725  }
726 
728  ProductRegistry& preg,
729  PreallocationConfiguration const* prealloc,
730  std::shared_ptr<ProcessConfiguration const> processConfiguration,
731  int bitpos,
732  std::string const& name,
733  std::vector<std::string> const& endPathNames) {
734  PathWorkers tmpworkers;
735  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
736 
737  if (!tmpworkers.empty()) {
738  //EndPaths are not supposed to stop if SkipEvent type exception happens
739  end_paths_.emplace_back(bitpos,
740  name,
741  tmpworkers,
742  TrigResPtr(),
743  actionTable(),
744  actReg_,
746  nullptr,
748  } else {
749  empty_end_paths_.push_back(bitpos);
750  }
751  for (WorkerInPath const& workerInPath : tmpworkers) {
752  addToAllWorkers(workerInPath.getWorker());
753  }
754  }
755 
757 
759 
761  Worker* found = nullptr;
762  for (auto const& worker : allWorkers()) {
763  if (worker->description()->moduleLabel() == iLabel) {
764  found = worker;
765  break;
766  }
767  }
768  if (nullptr == found) {
769  return;
770  }
771 
772  iMod->replaceModuleFor(found);
773  found->beginStream(streamID_, streamContext_);
774  }
775 
777 
778  std::vector<ModuleDescription const*> StreamSchedule::getAllModuleDescriptions() const {
779  std::vector<ModuleDescription const*> result;
780  result.reserve(allWorkers().size());
781 
782  for (auto const& worker : allWorkers()) {
783  ModuleDescription const* p = worker->description();
784  result.push_back(p);
785  }
786  return result;
787  }
788 
790  WaitingTaskHolder iTask,
792  ServiceToken const& serviceToken,
793  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
794  EventPrincipal& ep = info.principal();
795 
796  // Caught exception is propagated via WaitingTaskHolder
797  CMS_SA_ALLOW try {
798  this->resetAll();
799 
801 
802  Traits::setStreamContext(streamContext_, ep);
803  //a service may want to communicate with another service
804  ServiceRegistry::Operate guard(serviceToken);
805  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
806 
807  HLTPathStatus hltPathStatus(hlt::Pass, 0);
808  for (int empty_trig_path : empty_trig_paths_) {
809  results_->at(empty_trig_path) = hltPathStatus;
810  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
811  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
814  if (except) {
815  iTask.doneWaiting(except);
816  return;
817  }
818  }
819  for (int empty_end_path : empty_end_paths_) {
820  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
823  if (except) {
824  iTask.doneWaiting(except);
825  return;
826  }
827  }
828 
831 
832  ++total_events_;
833 
834  //use to give priorities on an error to ones from Paths
835  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
836  auto pathErrorPtr = pathErrorHolder.get();
837  ServiceWeakToken weakToken = serviceToken;
838  auto allPathsDone = make_waiting_task(
839  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
840  ServiceRegistry::Operate operate(weakToken.lock());
841 
842  std::exception_ptr ptr;
843  if (pathError->load()) {
844  ptr = *pathError->load();
845  delete pathError->load();
846  }
847  if ((not ptr) and iPtr) {
848  ptr = *iPtr;
849  }
851  });
852  //The holder guarantees that if the paths finish before the loop ends
853  // that we do not start too soon. It also guarantees that the task will
854  // run under that condition.
855  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
856 
857  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
858  std::exception_ptr const* iPtr) mutable {
859  ServiceRegistry::Operate operate(weakToken.lock());
860 
861  if (iPtr) {
862  //this is used to prioritize this error over one
863  // that happens in EndPath or Accumulate
864  pathErrorPtr->store(new std::exception_ptr(*iPtr));
865  }
866  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
867  });
868 
869  //The holder guarantees that if the paths finish before the loop ends
870  // that we do not start too soon. It also guarantees that the task will
871  // run under that condition.
872  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
873 
874  //start end paths first so on single threaded the paths will run first
875  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
876  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
877  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
878  }
879 
880  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
881  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
882  }
883 
884  ParentContext parentContext(&streamContext_);
886  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
887  } catch (...) {
888  iTask.doneWaiting(std::current_exception());
889  }
890  }
891 
// Called from the pathsDone task of processOneEventAsync once the trigger paths
// have finished. It resolves any path exception, updates the passed-event
// counter, runs the trigger-results inserter and finally releases iWait.
//
// iExcept  atomic pointer to a heap-allocated std::exception_ptr; it is non-null
//          once an exception has been recorded. The allocation is released by
//          the allPathsDone task in processOneEventAsync.
// iWait    holder that is signalled (doneWaiting) at the end with the surviving
//          exception, or with an empty exception_ptr when there is none.
// NOTE(review): this listing is missing lines 894, 898-901 and 922. Presumably
// they hold the third parameter (`info`; the declaration shows
// EventTransitionInfo&), the exception-action lookup that selects the
// "SkipEvent" branch, and the `Traits` alias used below. Confirm against the
// real source.
892  void StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept,
893  WaitingTaskHolder iWait,
895  if (iExcept) {
896  // Caught exception is propagated via WaitingTaskHolder
// Rethrow the stored exception so it can be classified by type.
897  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
// Skip-event branch: print a warning and replace the stored exception with an
// empty exception_ptr so nothing is propagated.
902  edm::printCmsExceptionWarning("SkipEvent", e);
903  *(iExcept.load()) = std::exception_ptr();
904  } else {
905  *(iExcept.load()) = std::current_exception();
906  }
907  } catch (...) {
908  *(iExcept.load()) = std::current_exception();
909  }
910  }
911 
// NOTE(review): `not iExcept` tests the pointer, not the pointed-to
// exception_ptr. After the SkipEvent branch above the pointer is still
// non-null, so a skipped event is never counted as passed.
912  if ((not iExcept) and results_->accept()) {
913  ++total_passed_;
914  }
915 
916  if (nullptr != results_inserter_.get()) {
917  // Caught exception is propagated to the caller
918  CMS_SA_ALLOW try {
919  //Even if there was an exception, we need to allow results inserter
920  // to run since some module may be waiting on its results.
921  ParentContext parentContext(&streamContext_);
923 
924  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
925  if (expt) {
926  std::rethrow_exception(expt);
927  }
928  } catch (cms::Exception& ex) {
// An inserter failure is recorded only when no exception pointer is set yet,
// so an earlier path error takes priority.
929  if (not iExcept) {
930  if (ex.context().empty()) {
931  std::ostringstream ost;
932  ost << "Processing Event " << info.principal().id();
933  ex.addContext(ost.str());
934  }
935  iExcept.store(new std::exception_ptr(std::current_exception()));
936  }
937  } catch (...) {
938  if (not iExcept) {
939  iExcept.store(new std::exception_ptr(std::current_exception()));
940  }
941  }
942  }
// Hand the final exception (possibly empty) to the waiting task.
943  std::exception_ptr ptr;
944  if (iExcept) {
945  ptr = *iExcept.load();
946  }
947  iWait.doneWaiting(ptr);
948  }
949 
// Final bookkeeping for one event: decorates and prints a pending exception,
// emits the early-termination and post-schedule signals, and returns the
// exception (possibly empty) that the caller should propagate.
//
// iExcept  exception produced while processing the event, or empty.
// returns  iExcept, possibly replaced by the context-decorated exception or by
//          an exception thrown from the post-schedule signal.
// NOTE(review): this listing is missing lines 951 and 976. Presumably 951
// defines the `Traits` alias used below, and 976 is the body of the final
// `if (not iExcept)`. Confirm against the real source.
950  std::exception_ptr StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
952 
953  if (iExcept) {
954  //add context information to the exception and print message
955  try {
// Rethrow through convertException::wrap so the handler below can catch the
// exception as a cms::Exception.
956  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
957  } catch (cms::Exception& ex) {
958  bool const cleaningUpAfterException = false;
// Add the generic context string only when the exception has no context yet.
959  if (ex.context().empty()) {
960  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
961  } else {
962  addContextAndPrintException("", ex, cleaningUpAfterException);
963  }
// Keep the decorated exception as the one to return.
964  iExcept = std::current_exception();
965  }
966 
967  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
968  }
969  // Caught exception is propagated to the caller
// A failure in the post-schedule signal is reported only when no earlier
// exception is pending.
970  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
971  if (not iExcept) {
972  iExcept = std::current_exception();
973  }
974  }
975  if (not iExcept) {
977  }
978 
979  return iExcept;
980  }
981 
982  void StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
983  oLabelsToFill.reserve(trig_paths_.size());
984  std::transform(trig_paths_.begin(),
985  trig_paths_.end(),
986  std::back_inserter(oLabelsToFill),
987  std::bind(&Path::name, std::placeholders::_1));
988  }
989 
990  void StreamSchedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
991  TrigPaths::const_iterator itFound = std::find_if(
992  trig_paths_.begin(),
993  trig_paths_.end(),
994  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
995  if (itFound != trig_paths_.end()) {
996  oLabelsToFill.reserve(itFound->size());
997  for (size_t i = 0; i < itFound->size(); ++i) {
998  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
999  }
1000  }
1001  }
1002 
1004  std::vector<ModuleDescription const*>& descriptions,
1005  unsigned int hint) const {
1006  descriptions.clear();
1007  bool found = false;
1008  TrigPaths::const_iterator itFound;
1009 
1010  if (hint < trig_paths_.size()) {
1011  itFound = trig_paths_.begin() + hint;
1012  if (itFound->name() == iPathLabel)
1013  found = true;
1014  }
1015  if (!found) {
1016  // if the hint did not work, do it the slow way
1017  itFound = std::find_if(
1018  trig_paths_.begin(),
1019  trig_paths_.end(),
1020  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1021  if (itFound != trig_paths_.end())
1022  found = true;
1023  }
1024  if (found) {
1025  descriptions.reserve(itFound->size());
1026  for (size_t i = 0; i < itFound->size(); ++i) {
1027  descriptions.push_back(itFound->getWorker(i)->description());
1028  }
1029  }
1030  }
1031 
1033  std::vector<ModuleDescription const*>& descriptions,
1034  unsigned int hint) const {
1035  descriptions.clear();
1036  bool found = false;
1037  TrigPaths::const_iterator itFound;
1038 
1039  if (hint < end_paths_.size()) {
1040  itFound = end_paths_.begin() + hint;
1041  if (itFound->name() == iEndPathLabel)
1042  found = true;
1043  }
1044  if (!found) {
1045  // if the hint did not work, do it the slow way
1046  itFound = std::find_if(
1047  end_paths_.begin(),
1048  end_paths_.end(),
1049  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1050  if (itFound != end_paths_.end())
1051  found = true;
1052  }
1053  if (found) {
1054  descriptions.reserve(itFound->size());
1055  for (size_t i = 0; i < itFound->size(); ++i) {
1056  descriptions.push_back(itFound->getWorker(i)->description());
1057  }
1058  }
1059  }
1060 
1061  static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
1062  sum.timesVisited += path.timesVisited(which);
1063  sum.timesPassed += path.timesPassed(which);
1064  sum.timesFailed += path.timesFailed(which);
1065  sum.timesExcept += path.timesExcept(which);
1066  sum.moduleLabel = path.getWorker(which)->description()->moduleLabel();
1067  sum.bitPosition = path.bitPosition(which);
1068  }
1069 
1070  static void fillPathSummary(Path const& path, PathSummary& sum) {
1071  sum.name = path.name();
1072  sum.bitPosition = path.bitPosition();
1073  sum.timesRun += path.timesRun();
1074  sum.timesPassed += path.timesPassed();
1075  sum.timesFailed += path.timesFailed();
1076  sum.timesExcept += path.timesExcept();
1077 
1078  Path::size_type sz = path.size();
1079  if (sum.moduleInPathSummaries.empty()) {
1080  std::vector<ModuleInPathSummary> temp(sz);
1081  for (size_t i = 0; i != sz; ++i) {
1083  }
1084  sum.moduleInPathSummaries.swap(temp);
1085  } else {
1086  assert(sz == sum.moduleInPathSummaries.size());
1087  for (size_t i = 0; i != sz; ++i) {
1089  }
1090  }
1091  }
1092 
1093  static void fillWorkerSummaryAux(Worker const& w, WorkerSummary& sum) {
1094  sum.timesVisited += w.timesVisited();
1095  sum.timesRun += w.timesRun();
1096  sum.timesPassed += w.timesPassed();
1097  sum.timesFailed += w.timesFailed();
1098  sum.timesExcept += w.timesExcept();
1099  sum.moduleLabel = w.description()->moduleLabel();
1100  }
1101 
1102  static void fillWorkerSummary(Worker const* pw, WorkerSummary& sum) { fillWorkerSummaryAux(*pw, sum); }
1103 
1105  rep.eventSummary.totalEvents += totalEvents();
1106  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1107  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1108 
1109  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1110  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1111  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1112  }
1113 
// Clears the counters of every trigger path, every end path and every worker
// by calling their clearCounters() members.
// NOTE(review): this listing is missing the function's signature line (1114)
// and line 1116, so any statement on line 1116 is not shown here. Confirm
// against the real source.
1115  using std::placeholders::_1;
1117  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1118  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1119  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1120  }
1121 
// Clears the skipping-event flag and resets the trigger results object.
// NOTE(review): the signature line (listing line 1122) is missing from this
// view; the function name must be confirmed against the real source.
1123  skippingEvent_ = false;
1124  results_->reset();
1125  }
1126 
1128 
// Re-initialises the early-delete bookkeeping: zeroes every branch count,
// increments the count of each referenced branch again, then resets every
// early-delete helper.
// NOTE(review): this listing is missing the signature line (1129) and the
// header of the second loop (1135). Presumably that loop iterates over the
// branch indices used by the helpers; confirm against the real source.
1130  //must be sure we have cleared the count first
1131  for (auto& count : earlyDeleteBranchToCount_) {
1132  count.count = 0;
1133  }
1134  //now reset based on how many helpers use that branch
1136  ++(earlyDeleteBranchToCount_[index].count);
1137  }
1138  for (auto& helper : earlyDeleteHelpers_) {
1139  helper.reset();
1140  }
1141  }
1142 
1144  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
1145  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
1147  int bitpos = 0;
1148  unsigned int indexEmpty = 0;
1149  unsigned int indexOfPath = 0;
1150  for (auto& pathStatusInserter : pathStatusInserters) {
1151  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1152  WorkerPtr workerPtr(
1153  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1154  pathStatusInserterWorkers_.emplace_back(workerPtr);
1155  workerPtr->setActivityRegistry(actReg_);
1156  addToAllWorkers(workerPtr.get());
1157 
1158  // A little complexity here because a C++ Path object is not
1159  // instantiated and put into end_paths if there are no modules
1160  // on the configured path.
1161  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1162  ++indexEmpty;
1163  } else {
1164  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1165  ++indexOfPath;
1166  }
1167  ++bitpos;
1168  }
1169 
1170  bitpos = 0;
1171  indexEmpty = 0;
1172  indexOfPath = 0;
1173  for (auto& endPathStatusInserter : endPathStatusInserters) {
1174  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1175  WorkerPtr workerPtr(
1176  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1177  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1178  workerPtr->setActivityRegistry(actReg_);
1179  addToAllWorkers(workerPtr.get());
1180 
1181  // A little complexity here because a C++ Path object is not
1182  // instantiated and put into end_paths if there are no modules
1183  // on the configured path.
1184  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1185  ++indexEmpty;
1186  } else {
1187  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1188  ++indexOfPath;
1189  }
1190  ++bitpos;
1191  }
1192  }
1193 } // namespace edm
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
size
Write out results.
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
Definition: helper.py:1
roAction_t actions[nactions]
Definition: GenABIO.cc:181
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
virtual void replaceModuleFor(Worker *) const =0
ProductList const & productList() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
static PFTauRenderPlugin instance
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
WorkersInPath::size_type size_type
Definition: Path.h:46
int totalEventsPassed() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
void setupOnDemandSystem(EventTransitionInfo const &)
void clearCounters()
Definition: Worker.h:222
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
std::string const & moduleName() const
edm::propagate_const< WorkerPtr > results_inserter_
void deleteModuleIfExists(std::string const &moduleLabel)
std::shared_ptr< Worker > WorkerPtr
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
StreamID streamID() const
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
T getUntrackedParameter(std::string const &, T const &) const
constexpr element_type const * get() const
std::string moduleLabel
Definition: TriggerReport.h:52
oneapi::tbb::task_group * group() const noexcept
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
char const * label
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
void doneWaiting(std::exception_ptr iExcept)
accept
Definition: HLTenums.h:18
std::string name
Definition: TriggerReport.h:41
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
Strings const & getTrigPaths() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:41
StreamContext streamContext_
std::vector< std::string > vstring
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static void fillPathSummary(Path const &path, PathSummary &sum)
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, StreamID streamID, ProcessContext const *processContext)
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
static Worker * getWorker(std::string const &moduleLabel, ParameterSet &proc_pset, WorkerManager &workerManager, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
int totalEvents() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
virtual Types moduleType() const =0
Log< level::Info, false > LogInfo
void beginStream(StreamID iID, StreamContext &streamContext)
ModuleDescription const * description() const
Definition: Worker.h:188
std::string const & className() const
Definition: TypeID.cc:40
void clearCounters()
Clear all the counters in the trigger report.
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
Strings const & getEndPaths() const
void getTriggerReport(TriggerReport &rep) const
exception_actions::ActionCodes find(const std::string &category) const
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
double b
Definition: hdecay.h:118
constexpr T & get_underlying(propagate_const< T > &)
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
std::atomic< bool > skippingEvent_
Definition: plugin.cc:23
HLT enums.
void setupResolvers(Principal &principal)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:336
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:42
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
Log< level::Warning, false > LogWarning
std::list< std::string > const & context() const
Definition: Exception.cc:147
std::string const & moduleLabel() const
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
std::string const & name() const
Definition: Path.h:74
std::vector< std::string > vstring
Definition: Schedule.cc:473
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
def move(src, dest)
Definition: eostools.py:511
static Registry * instance()
Definition: Registry.cc:12
void clearCounters()
Definition: Path.cc:198
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< std::string > set_intersection(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
unsigned transform(const HcalDetId &id, unsigned transformCode)