CMS 3D CMS Logo

Schedule.cc
Go to the documentation of this file.
2 
35 
36 #include <algorithm>
37 #include <cassert>
38 #include <cstdlib>
39 #include <functional>
40 #include <iomanip>
41 #include <list>
42 #include <map>
43 #include <set>
44 #include <exception>
45 #include <sstream>
46 
48 
49 namespace edm {
50 
51  class Maker;
52 
53  namespace {
54  using std::placeholders::_1;
55 
56  bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
57  return std::binary_search(v.begin(), v.end(), s);
58  }
59 
60  // Here we make the trigger results inserter directly. This should
61  // probably be a utility in the WorkerRegistry or elsewhere.
62 
63  std::shared_ptr<TriggerResultInserter> makeInserter(ParameterSet& proc_pset,
64  PreallocationConfiguration const& iPrealloc,
65  ProductRegistry& preg,
66  ExceptionToActionTable const& actions,
67  std::shared_ptr<ActivityRegistry> areg,
68  std::shared_ptr<ProcessConfiguration> processConfiguration) {
69  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
70  trig_pset->registerIt();
71 
72  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
73  ModuleDescription md(trig_pset->id(),
74  "TriggerResultInserter",
75  "TriggerResults",
76  processConfiguration.get(),
78 
79  areg->preModuleConstructionSignal_(md);
80  bool postCalled = false;
81  std::shared_ptr<TriggerResultInserter> returnValue;
82  try {
83  maker::ModuleHolderT<TriggerResultInserter> holder(
84  make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),
85  static_cast<Maker const*>(nullptr));
86  holder.setModuleDescription(md);
87  holder.registerProductsAndCallbacks(&preg);
88  returnValue = holder.module();
89  postCalled = true;
90  // if exception then post will be called in the catch block
91  areg->postModuleConstructionSignal_(md);
92  } catch (...) {
93  if (!postCalled) {
94  try {
95  areg->postModuleConstructionSignal_(md);
96  } catch (...) {
97  // If post throws an exception ignore it because we are already handling another exception
98  }
99  }
100  throw;
101  }
102  return returnValue;
103  }
104 
105  template <typename T>
106  void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
107  std::vector<std::string> const& pathNames,
108  PreallocationConfiguration const& iPrealloc,
109  ProductRegistry& preg,
110  std::shared_ptr<ActivityRegistry> areg,
111  std::shared_ptr<ProcessConfiguration> processConfiguration,
112  std::string const& moduleTypeName) {
114  pset.addParameter<std::string>("@module_type", moduleTypeName);
115  pset.addParameter<std::string>("@module_edm_type", "EDProducer");
116  pset.registerIt();
117 
118  pathStatusInserters.reserve(pathNames.size());
119 
120  for (auto const& pathName : pathNames) {
121  ModuleDescription md(
122  pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
123 
124  areg->preModuleConstructionSignal_(md);
125  bool postCalled = false;
126 
127  try {
128  maker::ModuleHolderT<T> holder(make_shared_noexcept_false<T>(iPrealloc.numberOfStreams()),
129  static_cast<Maker const*>(nullptr));
130  holder.setModuleDescription(md);
131  holder.registerProductsAndCallbacks(&preg);
132  pathStatusInserters.emplace_back(holder.module());
133  postCalled = true;
134  // if exception then post will be called in the catch block
135  areg->postModuleConstructionSignal_(md);
136  } catch (...) {
137  if (!postCalled) {
138  try {
139  areg->postModuleConstructionSignal_(md);
140  } catch (...) {
141  // If post throws an exception ignore it because we are already handling another exception
142  }
143  }
144  throw;
145  }
146  }
147  }
148 
149  void checkAndInsertAlias(std::string const& friendlyClassName,
150  std::string const& moduleLabel,
151  std::string const& productInstanceName,
152  std::string const& processName,
153  std::string const& alias,
154  std::string const& instanceAlias,
155  ProductRegistry const& preg,
156  std::multimap<BranchKey, BranchKey>& aliasMap,
157  std::map<BranchKey, BranchKey>& aliasKeys) {
158  std::string const star("*");
159 
160  BranchKey key(friendlyClassName, moduleLabel, productInstanceName, processName);
161  if (preg.productList().find(key) == preg.productList().end()) {
162  // No product was found matching the alias.
163  // We throw an exception only if a module with the specified module label was created in this process.
164  for (auto const& product : preg.productList()) {
165  if (moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
166  throw Exception(errors::Configuration, "EDAlias does not match data\n")
167  << "There are no products of type '" << friendlyClassName << "'\n"
168  << "with module label '" << moduleLabel << "' and instance name '" << productInstanceName << "'.\n";
169  }
170  }
171  }
172 
173  if (auto iter = aliasMap.find(key); iter != aliasMap.end()) {
174  // If the same EDAlias defines multiple products pointing to the same product, throw
175  if (iter->second.moduleLabel() == alias) {
176  throw Exception(errors::Configuration, "EDAlias conflict\n")
177  << "The module label alias '" << alias << "' is used for multiple products of type '" << friendlyClassName
178  << "' with module label '" << moduleLabel << "' and instance name '" << productInstanceName
179  << "'. One alias has the instance name '" << iter->first.productInstanceName()
180  << "' and the other has the instance name '" << instanceAlias << "'.";
181  }
182  }
183 
184  std::string const& theInstanceAlias(instanceAlias == star ? productInstanceName : instanceAlias);
185  BranchKey aliasKey(friendlyClassName, alias, theInstanceAlias, processName);
186  if (preg.productList().find(aliasKey) != preg.productList().end()) {
187  throw Exception(errors::Configuration, "EDAlias conflicts with data\n")
188  << "A product of type '" << friendlyClassName << "'\n"
189  << "with module label '" << alias << "' and instance name '" << theInstanceAlias << "'\n"
190  << "already exists.\n";
191  }
192  auto iter = aliasKeys.find(aliasKey);
193  if (iter != aliasKeys.end()) {
194  // The alias matches a previous one. If the same alias is used for different product, throw.
195  if (iter->second != key) {
196  throw Exception(errors::Configuration, "EDAlias conflict\n")
197  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
198  << "are used for multiple products of type '" << friendlyClassName << "'\n"
199  << "One has module label '" << moduleLabel << "' and product instance name '" << productInstanceName
200  << "',\n"
201  << "the other has module label '" << iter->second.moduleLabel() << "' and product instance name '"
202  << iter->second.productInstanceName() << "'.\n";
203  }
204  } else {
205  auto prodIter = preg.productList().find(key);
206  if (prodIter != preg.productList().end()) {
207  if (!prodIter->second.produced()) {
208  throw Exception(errors::Configuration, "EDAlias\n")
209  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
210  << "are used for a product of type '" << friendlyClassName << "'\n"
211  << "with module label '" << moduleLabel << "' and product instance name '" << productInstanceName
212  << "',\n"
213  << "An EDAlias can only be used for products produced in the current process. This one is not.\n";
214  }
215  aliasMap.insert(std::make_pair(key, aliasKey));
216  aliasKeys.insert(std::make_pair(aliasKey, key));
217  }
218  }
219  }
220 
221  void processEDAliases(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
222  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
223  if (aliases.empty()) {
224  return;
225  }
226  std::string const star("*");
227  std::string const empty("");
229  desc.add<std::string>("type");
230  desc.add<std::string>("fromProductInstance", star);
231  desc.add<std::string>("toProductInstance", star);
232 
233  std::multimap<BranchKey, BranchKey> aliasMap;
234 
235  std::map<BranchKey, BranchKey> aliasKeys; // Used to search for duplicates or clashes.
236 
237  // Now, loop over the alias information and store it in aliasMap.
238  for (std::string const& alias : aliases) {
239  ParameterSet const& aliasPSet = proc_pset.getParameterSet(alias);
240  std::vector<std::string> vPSetNames = aliasPSet.getParameterNamesForType<VParameterSet>();
241  for (std::string const& moduleLabel : vPSetNames) {
242  VParameterSet vPSet = aliasPSet.getParameter<VParameterSet>(moduleLabel);
243  for (ParameterSet& pset : vPSet) {
244  desc.validate(pset);
245  std::string friendlyClassName = pset.getParameter<std::string>("type");
246  std::string productInstanceName = pset.getParameter<std::string>("fromProductInstance");
247  std::string instanceAlias = pset.getParameter<std::string>("toProductInstance");
248  if (productInstanceName == star) {
249  bool match = false;
250  BranchKey lowerBound(friendlyClassName, moduleLabel, empty, empty);
251  for (ProductRegistry::ProductList::const_iterator it = preg.productList().lower_bound(lowerBound);
252  it != preg.productList().end() && it->first.friendlyClassName() == friendlyClassName &&
253  it->first.moduleLabel() == moduleLabel;
254  ++it) {
255  if (it->first.processName() != processName) {
256  continue;
257  }
258  match = true;
259 
260  checkAndInsertAlias(friendlyClassName,
261  moduleLabel,
262  it->first.productInstanceName(),
263  processName,
264  alias,
265  instanceAlias,
266  preg,
267  aliasMap,
268  aliasKeys);
269  }
270  if (!match) {
271  // No product was found matching the alias.
272  // We throw an exception only if a module with the specified module label was created in this process.
273  for (auto const& product : preg.productList()) {
274  if (moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
275  throw Exception(errors::Configuration, "EDAlias parameter set mismatch\n")
276  << "There are no products of type '" << friendlyClassName << "'\n"
277  << "with module label '" << moduleLabel << "'.\n";
278  }
279  }
280  }
281  } else {
282  checkAndInsertAlias(friendlyClassName,
283  moduleLabel,
284  productInstanceName,
285  processName,
286  alias,
287  instanceAlias,
288  preg,
289  aliasMap,
290  aliasKeys);
291  }
292  }
293  }
294  }
295 
296  // Now add the new alias entries to the product registry.
297  for (auto const& aliasEntry : aliasMap) {
298  ProductRegistry::ProductList::const_iterator it = preg.productList().find(aliasEntry.first);
299  assert(it != preg.productList().end());
300  preg.addLabelAlias(it->second, aliasEntry.second.moduleLabel(), aliasEntry.second.productInstanceName());
301  }
302  }
303 
304  typedef std::vector<std::string> vstring;
305 
306  void processSwitchProducers(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
307  // Update Switch BranchDescriptions for the chosen case
308  struct BranchesCases {
309  BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
310  std::vector<BranchKey> chosenBranches;
311  std::vector<std::string> caseLabels;
312  };
313  std::map<std::string, BranchesCases> switchMap;
314  for (auto& prod : preg.productListUpdator()) {
315  if (prod.second.isSwitchAlias()) {
316  auto it = switchMap.find(prod.second.moduleLabel());
317  if (it == switchMap.end()) {
318  auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
319  auto inserted = switchMap.emplace(prod.second.moduleLabel(),
320  switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
321  assert(inserted.second);
322  it = inserted.first;
323  }
324 
325  for (auto const& productIter : preg.productList()) {
326  BranchKey const& branchKey = productIter.first;
327  BranchDescription const& desc = productIter.second;
328  if (desc.branchType() == prod.second.branchType() and
329  desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
330  branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
331  branchKey.productInstanceName() == prod.second.productInstanceName()) {
332  if (branchKey.processName() != processName) {
334  << "Encountered a BranchDescription that is aliased-for by SwitchProducer, and whose processName "
335  << branchKey.processName() << " differs from current process " << processName
336  << ". Module label is " << branchKey.moduleLabel() << ".\nPlease contact a framework developer.";
337  }
338  prod.second.setSwitchAliasForBranch(desc);
339  it->second.chosenBranches.push_back(prod.first); // with moduleLabel of the Switch
340  }
341  }
342  }
343  }
344  if (switchMap.empty())
345  return;
346 
347  for (auto& elem : switchMap) {
348  std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
349  }
350 
351  // Check that non-chosen cases declare exactly the same branches
352  // Also set the alias-for branches to transient
353  std::vector<bool> foundBranches;
354  for (auto const& switchItem : switchMap) {
355  auto const& switchLabel = switchItem.first;
356  auto const& chosenBranches = switchItem.second.chosenBranches;
357  auto const& caseLabels = switchItem.second.caseLabels;
358  foundBranches.resize(chosenBranches.size());
359  for (auto const& caseLabel : caseLabels) {
360  std::fill(foundBranches.begin(), foundBranches.end(), false);
361  for (auto& nonConstItem : preg.productListUpdator()) {
362  auto const& item = nonConstItem;
363  if (item.first.moduleLabel() == caseLabel) {
364  // Set the alias-for branch as transient so it gets fully ignored in output.
365  // I tried first to implicitly drop all branches with
366  // '@' in ProductSelector, but that gave problems on
367  // input (those branches would be implicitly dropped on
368  // input as well, leading to the SwitchProducer branches
369  // do be dropped as dependent ones, as the alias
370  // detection logic in RootFile says that the
371  // SwitchProducer branches are not alias branches)
372  nonConstItem.second.setTransient(true);
373 
374  auto range = std::equal_range(chosenBranches.begin(),
375  chosenBranches.end(),
376  BranchKey(item.first.friendlyClassName(),
377  switchLabel,
378  item.first.productInstanceName(),
379  item.first.processName()));
380  if (range.first == range.second) {
382  << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
383  << item.first << " that is not produced by the chosen case "
384  << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
385  .getUntrackedParameter<std::string>("@chosen_case");
386  }
387  assert(std::distance(range.first, range.second) == 1);
388  foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
389 
390  // Check that there are no BranchAliases for any of the cases
391  auto const& bd = item.second;
392  if (not bd.branchAliases().empty()) {
394  << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
395  "aliases for SwitchProducer with label "
396  << switchLabel << " for case " << caseLabel << ":";
397  for (auto const& branchAlias : bd.branchAliases()) {
398  ex << " " << branchAlias;
399  }
400  throw ex;
401  }
402  }
403  }
404 
405  for (size_t i = 0; i < chosenBranches.size(); i++) {
406  if (not foundBranches[i]) {
408  << "SwitchProducer " << switchLabel << " has a case " << caseLabel
409  << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
410  << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
411  .getUntrackedParameter<std::string>("@chosen_case");
412  }
413  }
414  }
415  }
416  }
417 
418  void reduceParameterSet(ParameterSet& proc_pset,
419  vstring const& end_path_name_list,
420  vstring& modulesInConfig,
421  std::set<std::string> const& usedModuleLabels,
422  std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
423  // Before calculating the ParameterSetID of the top level ParameterSet or
424  // saving it in the registry drop from the top level ParameterSet all
425  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
426  // production is not enabled also drop all the EDFilters and EDProducers
427  // that are not scheduled. Drop the ParameterSet used to configure the module
428  // itself. Also drop the other traces of these labels in the top level
429  // ParameterSet: Remove that labels from @all_modules and from all the
430  // end paths. If this makes any end paths empty, then remove the end path
431  // name from @end_paths, and @paths.
432 
433  // First make a list of labels to drop
434  vstring outputModuleLabels;
435  std::string edmType;
436  std::string const moduleEdmType("@module_edm_type");
437  std::string const outputModule("OutputModule");
438  std::string const edAnalyzer("EDAnalyzer");
439  std::string const edFilter("EDFilter");
440  std::string const edProducer("EDProducer");
441 
442  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
443 
444  //need a list of all modules on paths in order to determine
445  // if an EDAnalyzer only appears on an end path
446  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
447  std::set<std::string> modulesOnPaths;
448  {
449  std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
450  for (auto const& endPath : end_path_name_list) {
451  noEndPaths.erase(endPath);
452  }
453  {
454  vstring labels;
455  for (auto const& path : noEndPaths) {
456  labels = proc_pset.getParameter<vstring>(path);
457  modulesOnPaths.insert(labels.begin(), labels.end());
458  }
459  }
460  }
461  //Initially fill labelsToBeDropped with all module mentioned in
462  // the configuration but which are not being used by the system
463  std::vector<std::string> labelsToBeDropped;
464  labelsToBeDropped.reserve(modulesInConfigSet.size());
465  std::set_difference(modulesInConfigSet.begin(),
466  modulesInConfigSet.end(),
467  usedModuleLabels.begin(),
468  usedModuleLabels.end(),
469  std::back_inserter(labelsToBeDropped));
470 
471  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
472  for (auto const& modLabel : usedModuleLabels) {
473  // Do nothing for modules that do not have a ParameterSet. Modules of type
474  // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
475  if (proc_pset.existsAs<ParameterSet>(modLabel)) {
476  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
477  if (edmType == outputModule) {
478  outputModuleLabels.push_back(modLabel);
479  labelsToBeDropped.push_back(modLabel);
480  }
481  if (edmType == edAnalyzer) {
482  if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
483  labelsToBeDropped.push_back(modLabel);
484  }
485  }
486  }
487  }
488  //labelsToBeDropped must be sorted
489  std::inplace_merge(
490  labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
491 
492  // drop the parameter sets used to configure the modules
493  for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
494 
495  // drop the labels from @all_modules
496  vstring::iterator endAfterRemove =
497  std::remove_if(modulesInConfig.begin(),
498  modulesInConfig.end(),
499  std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
500  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
501  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
502 
503  // drop the labels from all end paths
504  vstring endPathsToBeDropped;
505  vstring labels;
506  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
507  iEndPath != endEndPath;
508  ++iEndPath) {
509  labels = proc_pset.getParameter<vstring>(*iEndPath);
510  vstring::iterator iSave = labels.begin();
511  vstring::iterator iBegin = labels.begin();
512 
513  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
514  if (binary_search_string(labelsToBeDropped, *iLabel)) {
515  if (binary_search_string(outputModuleLabels, *iLabel)) {
516  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
517  }
518  } else {
519  if (iSave != iLabel) {
520  iSave->swap(*iLabel);
521  }
522  ++iSave;
523  }
524  }
525  labels.erase(iSave, labels.end());
526  if (labels.empty()) {
527  // remove empty end paths and save their names
528  proc_pset.eraseSimpleParameter(*iEndPath);
529  endPathsToBeDropped.push_back(*iEndPath);
530  } else {
531  proc_pset.addParameter<vstring>(*iEndPath, labels);
532  }
533  }
534  sort_all(endPathsToBeDropped);
535 
536  // remove empty end paths from @paths
537  endAfterRemove = std::remove_if(scheduledPaths.begin(),
538  scheduledPaths.end(),
539  std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
540  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
541  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
542 
543  // remove empty end paths from @end_paths
544  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
545  endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
546  scheduledEndPaths.end(),
547  std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
548  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
549  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
550  }
551 
552  class RngEDConsumer : public EDConsumerBase {
553  public:
554  explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
556  if (rng.isAvailable()) {
557  rng->consumes(consumesCollector());
558  for (auto const& consumesInfo : this->consumesInfo()) {
559  typesConsumed.emplace(consumesInfo.type());
560  }
561  }
562  }
563  };
564  } // namespace
565  // -----------------------------
566 
567  typedef std::vector<std::string> vstring;
568 
569  // -----------------------------
570 
572  service::TriggerNamesService const& tns,
573  ProductRegistry& preg,
574  BranchIDListHelper& branchIDListHelper,
575  ThinnedAssociationsHelper& thinnedAssociationsHelper,
576  SubProcessParentageHelper const* subProcessParentageHelper,
577  ExceptionToActionTable const& actions,
578  std::shared_ptr<ActivityRegistry> areg,
579  std::shared_ptr<ProcessConfiguration> processConfiguration,
580  bool hasSubprocesses,
581  PreallocationConfiguration const& prealloc,
582  ProcessContext const* processContext)
583  : //Only create a resultsInserter if there is a trigger path
584  resultsInserter_{tns.getTrigPaths().empty()
585  ? std::shared_ptr<TriggerResultInserter>{}
586  : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
589  preallocConfig_(prealloc),
590  pathNames_(&tns.getTrigPaths()),
591  endPathNames_(&tns.getEndPaths()),
592  wantSummary_(tns.wantSummary()),
593  endpathsAreActive_(true) {
594  makePathStatusInserters(pathStatusInserters_,
595  *pathNames_,
596  prealloc,
597  preg,
598  areg,
599  processConfiguration,
600  std::string("PathStatusInserter"));
601 
602  makePathStatusInserters(endPathStatusInserters_,
603  *endPathNames_,
604  prealloc,
605  preg,
606  areg,
607  processConfiguration,
608  std::string("EndPathStatusInserter"));
609 
610  assert(0 < prealloc.numberOfStreams());
611  streamSchedules_.reserve(prealloc.numberOfStreams());
612  for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
613  streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
616  moduleRegistry(),
617  proc_pset,
618  tns,
619  prealloc,
620  preg,
621  branchIDListHelper,
622  actions,
623  areg,
624  processConfiguration,
625  !hasSubprocesses,
626  StreamID{i},
627  processContext));
628  }
629 
630  //TriggerResults are injected automatically by StreamSchedules and are
631  // unknown to the ModuleRegistry
632  const std::string kTriggerResults("TriggerResults");
633  std::vector<std::string> modulesToUse;
634  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
635  for (auto const& worker : streamSchedules_[0]->allWorkers()) {
636  if (worker->description().moduleLabel() != kTriggerResults) {
637  modulesToUse.push_back(worker->description().moduleLabel());
638  }
639  }
640  //The unscheduled modules are at the end of the list, but we want them at the front
641  unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
642  if (nUnscheduledModules > 0) {
643  std::vector<std::string> temp;
644  temp.reserve(modulesToUse.size());
645  auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
646  std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
647  std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
648  temp.swap(modulesToUse);
649  }
650 
651  // propagate_const<T> has no reset() function
652  globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
655  moduleRegistry(),
656  modulesToUse,
657  proc_pset,
658  preg,
659  prealloc,
660  actions,
661  areg,
662  processConfiguration,
663  processContext);
664 
665  //TriggerResults is not in the top level ParameterSet so the call to
666  // reduceParameterSet would fail to find it. Just remove it up front.
667  std::set<std::string> usedModuleLabels;
668  for (auto const& worker : allWorkers()) {
669  if (worker->description().moduleLabel() != kTriggerResults) {
670  usedModuleLabels.insert(worker->description().moduleLabel());
671  }
672  }
673  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
674  std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
675  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
676  processEDAliases(proc_pset, processConfiguration->processName(), preg);
677 
678  // At this point all BranchDescriptions are created. Mark now the
679  // ones of unscheduled workers to be on-demand.
680  if (nUnscheduledModules > 0) {
681  std::set<std::string> unscheduledModules(modulesToUse.begin(), modulesToUse.begin() + nUnscheduledModules);
682  preg.setUnscheduledProducts(unscheduledModules);
683  }
684 
685  processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
686  proc_pset.registerIt();
687  processConfiguration->setParameterSetID(proc_pset.id());
688  processConfiguration->setProcessConfigurationID();
689 
690  // This is used for a little sanity-check to make sure no code
691  // modifications alter the number of workers at a later date.
692  size_t all_workers_count = allWorkers().size();
693 
694  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
695  auto comm = iHolder->createOutputModuleCommunicator();
696  if (comm) {
697  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
698  }
699  });
700  // Now that the output workers are filled in, set any output limits or information.
701  limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
702 
703  // Sanity check: make sure nobody has added a worker after we've
704  // already relied on the WorkerManager being full.
705  assert(all_workers_count == allWorkers().size());
706 
707  branchIDListHelper.updateFromRegistry(preg);
708 
709  for (auto const& worker : streamSchedules_[0]->allWorkers()) {
710  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
711  }
712  thinnedAssociationsHelper.sort();
713 
714  // The output modules consume products in kept branches.
715  // So we must set this up before freezing.
716  for (auto& c : all_output_communicators_) {
717  c->selectProducts(preg, thinnedAssociationsHelper);
718  }
719 
720  for (auto& product : preg.productListUpdator()) {
721  setIsMergeable(product.second);
722  }
723 
724  {
725  // We now get a collection of types that may be consumed.
726  std::set<TypeID> productTypesConsumed;
727  std::set<TypeID> elementTypesConsumed;
728  // Loop over all modules
729  for (auto const& worker : allWorkers()) {
730  for (auto const& consumesInfo : worker->consumesInfo()) {
731  if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
732  productTypesConsumed.emplace(consumesInfo.type());
733  } else {
734  elementTypesConsumed.emplace(consumesInfo.type());
735  }
736  }
737  }
738  // The SubProcess class is not a module, yet it may consume.
739  if (hasSubprocesses) {
740  productTypesConsumed.emplace(typeid(TriggerResults));
741  }
742  // The RandomNumberGeneratorService is not a module, yet it consumes.
743  { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
744  preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
745  }
746 
747  for (auto& c : all_output_communicators_) {
748  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
749  }
750 
751  if (wantSummary_) {
752  std::vector<const ModuleDescription*> modDesc;
753  const auto& workers = allWorkers();
754  modDesc.reserve(workers.size());
755 
756  std::transform(workers.begin(),
757  workers.end(),
758  std::back_inserter(modDesc),
759  [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->descPtr(); });
760 
761  // propagate_const<T> has no reset() function
762  summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
763  auto timeKeeperPtr = summaryTimeKeeper_.get();
764 
765  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
766  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
767  areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
768  areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
769  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
770  areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
771 
772  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
773  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
774 
775  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
776  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
777 
778  areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
779  areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
780  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
781  //timeKeeperPtr->startModuleEvent(iContext,iMod);
782  //});
783  }
784 
785  } // Schedule::Schedule
786 
787  void Schedule::limitOutput(ParameterSet const& proc_pset,
788  BranchIDLists const& branchIDLists,
789  SubProcessParentageHelper const* subProcessParentageHelper) {
790  std::string const output("output");
791 
792  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
793  int maxEventSpecs = 0;
794  int maxEventsOut = -1;
795  ParameterSet const* vMaxEventsOut = nullptr;
796  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
797  if (search_all(intNamesE, output)) {
798  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
799  ++maxEventSpecs;
800  }
801  std::vector<std::string> psetNamesE;
802  maxEventsPSet.getParameterSetNames(psetNamesE, false);
803  if (search_all(psetNamesE, output)) {
804  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
805  ++maxEventSpecs;
806  }
807 
808  if (maxEventSpecs > 1) {
810  << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
811  }
812 
813  for (auto& c : all_output_communicators_) {
814  OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
815  if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
816  std::string const& moduleLabel = c->description().moduleLabel();
817  try {
818  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
819  } catch (Exception const&) {
821  << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
822  }
823  }
824  c->configure(desc);
825  }
826  }
827 
828  bool Schedule::terminate() const {
829  if (all_output_communicators_.empty()) {
830  return false;
831  }
832  for (auto& c : all_output_communicators_) {
833  if (!c->limitReached()) {
834  // Found an output module that has not reached output event count.
835  return false;
836  }
837  }
838  LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
839  << "has reached its configured limit.\n";
840  return true;
841  }
842 
844  globalSchedule_->endJob(collector);
845  if (collector.hasThrown()) {
846  return;
847  }
848 
849  if (wantSummary_ == false)
850  return;
851  {
852  TriggerReport tr;
853  getTriggerReport(tr);
854 
855  // The trigger report (pass/fail etc.):
856 
857  LogVerbatim("FwkSummary") << "";
858  if (streamSchedules_[0]->context().processContext()->isSubProcess()) {
859  LogVerbatim("FwkSummary") << "TrigReport Process: "
860  << streamSchedules_[0]->context().processContext()->processName();
861  }
862  LogVerbatim("FwkSummary") << "TrigReport "
863  << "---------- Event Summary ------------";
864  if (!tr.trigPathSummaries.empty()) {
865  LogVerbatim("FwkSummary") << "TrigReport"
866  << " Events total = " << tr.eventSummary.totalEvents
867  << " passed = " << tr.eventSummary.totalEventsPassed
868  << " failed = " << tr.eventSummary.totalEventsFailed << "";
869  } else {
870  LogVerbatim("FwkSummary") << "TrigReport"
871  << " Events total = " << tr.eventSummary.totalEvents
872  << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
873  }
874 
875  LogVerbatim("FwkSummary") << "";
876  LogVerbatim("FwkSummary") << "TrigReport "
877  << "---------- Path Summary ------------";
878  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
879  << " " << std::right << std::setw(10) << "Executed"
880  << " " << std::right << std::setw(10) << "Passed"
881  << " " << std::right << std::setw(10) << "Failed"
882  << " " << std::right << std::setw(10) << "Error"
883  << " "
884  << "Name"
885  << "";
886  for (auto const& p : tr.trigPathSummaries) {
887  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
888  << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
889  << std::right << std::setw(10) << p.timesPassed << " " << std::right << std::setw(10)
890  << p.timesFailed << " " << std::right << std::setw(10) << p.timesExcept << " "
891  << p.name << "";
892  }
893 
894  /*
895  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
896  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
897  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
898  for (; epi != epe; ++epi, ++epn) {
899 
900  LogVerbatim("FwkSummary") << "TrigReport "
901  << std::right << std::setw(5) << 1
902  << std::right << std::setw(5) << *epi << " "
903  << std::right << std::setw(10) << totalEvents() << " "
904  << std::right << std::setw(10) << totalEvents() << " "
905  << std::right << std::setw(10) << 0 << " "
906  << std::right << std::setw(10) << 0 << " "
907  << *epn << "";
908  }
909  */
910 
911  LogVerbatim("FwkSummary") << "";
912  LogVerbatim("FwkSummary") << "TrigReport "
913  << "-------End-Path Summary ------------";
914  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
915  << " " << std::right << std::setw(10) << "Executed"
916  << " " << std::right << std::setw(10) << "Passed"
917  << " " << std::right << std::setw(10) << "Failed"
918  << " " << std::right << std::setw(10) << "Error"
919  << " "
920  << "Name"
921  << "";
922  for (auto const& p : tr.endPathSummaries) {
923  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
924  << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
925  << std::right << std::setw(10) << p.timesPassed << " " << std::right << std::setw(10)
926  << p.timesFailed << " " << std::right << std::setw(10) << p.timesExcept << " "
927  << p.name << "";
928  }
929 
930  for (auto const& p : tr.trigPathSummaries) {
931  LogVerbatim("FwkSummary") << "";
932  LogVerbatim("FwkSummary") << "TrigReport "
933  << "---------- Modules in Path: " << p.name << " ------------";
934  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
935  << " " << std::right << std::setw(10) << "Visited"
936  << " " << std::right << std::setw(10) << "Passed"
937  << " " << std::right << std::setw(10) << "Failed"
938  << " " << std::right << std::setw(10) << "Error"
939  << " "
940  << "Name"
941  << "";
942 
943  unsigned int bitpos = 0;
944  for (auto const& mod : p.moduleInPathSummaries) {
945  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
946  << bitpos << " " << std::right << std::setw(10) << mod.timesVisited << " "
947  << std::right << std::setw(10) << mod.timesPassed << " " << std::right
948  << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
949  << mod.timesExcept << " " << mod.moduleLabel << "";
950  ++bitpos;
951  }
952  }
953 
954  for (auto const& p : tr.endPathSummaries) {
955  LogVerbatim("FwkSummary") << "";
956  LogVerbatim("FwkSummary") << "TrigReport "
957  << "------ Modules in End-Path: " << p.name << " ------------";
958  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
959  << " " << std::right << std::setw(10) << "Visited"
960  << " " << std::right << std::setw(10) << "Passed"
961  << " " << std::right << std::setw(10) << "Failed"
962  << " " << std::right << std::setw(10) << "Error"
963  << " "
964  << "Name"
965  << "";
966 
967  unsigned int bitpos = 0;
968  for (auto const& mod : p.moduleInPathSummaries) {
969  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
970  << bitpos << " " << std::right << std::setw(10) << mod.timesVisited << " "
971  << std::right << std::setw(10) << mod.timesPassed << " " << std::right
972  << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
973  << mod.timesExcept << " " << mod.moduleLabel << "";
974  ++bitpos;
975  }
976  }
977 
978  LogVerbatim("FwkSummary") << "";
979  LogVerbatim("FwkSummary") << "TrigReport "
980  << "---------- Module Summary ------------";
981  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Visited"
982  << " " << std::right << std::setw(10) << "Executed"
983  << " " << std::right << std::setw(10) << "Passed"
984  << " " << std::right << std::setw(10) << "Failed"
985  << " " << std::right << std::setw(10) << "Error"
986  << " "
987  << "Name"
988  << "";
989  for (auto const& worker : tr.workerSummaries) {
990  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " "
991  << std::right << std::setw(10) << worker.timesRun << " " << std::right
992  << std::setw(10) << worker.timesPassed << " " << std::right << std::setw(10)
993  << worker.timesFailed << " " << std::right << std::setw(10) << worker.timesExcept
994  << " " << worker.moduleLabel << "";
995  }
996  LogVerbatim("FwkSummary") << "";
997  }
998  // The timing report (CPU and Real Time):
1001 
1002  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
1003 
1004  LogVerbatim("FwkSummary") << "TimeReport "
1005  << "---------- Event Summary ---[sec]----";
1006  LogVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
1007  << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
1008  LogVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
1009  << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
1010  LogVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
1011  << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
1012  LogVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed << " efficiency CPU/Real/thread = "
1014 
1015  constexpr int kColumn1Size = 10;
1016  constexpr int kColumn2Size = 12;
1017  constexpr int kColumn3Size = 12;
1018  LogVerbatim("FwkSummary") << "";
1019  LogVerbatim("FwkSummary") << "TimeReport "
1020  << "---------- Path Summary ---[Real sec]----";
1021  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1022  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1023  << " Name";
1024  for (auto const& p : tr.trigPathSummaries) {
1025  const int timesRun = std::max(1, p.timesRun);
1026  LogVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1027  << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
1028  << std::setw(kColumn2Size) << p.realTime / timesRun << " " << p.name << "";
1029  }
1030  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1031  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1032  << " Name"
1033  << "";
1034 
1035  LogVerbatim("FwkSummary") << "";
1036  LogVerbatim("FwkSummary") << "TimeReport "
1037  << "-------End-Path Summary ---[Real sec]----";
1038  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1039  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1040  << " Name"
1041  << "";
1042  for (auto const& p : tr.endPathSummaries) {
1043  const int timesRun = std::max(1, p.timesRun);
1044 
1045  LogVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1046  << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
1047  << std::setw(kColumn2Size) << p.realTime / timesRun << " " << p.name << "";
1048  }
1049  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1050  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1051  << " Name"
1052  << "";
1053 
1054  for (auto const& p : tr.trigPathSummaries) {
1055  LogVerbatim("FwkSummary") << "";
1056  LogVerbatim("FwkSummary") << "TimeReport "
1057  << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
1058  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1059  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1060  << " Name"
1061  << "";
1062  for (auto const& mod : p.moduleInPathSummaries) {
1063  LogVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1064  << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
1065  << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << " "
1066  << mod.moduleLabel << "";
1067  }
1068  }
1069  if (not tr.trigPathSummaries.empty()) {
1070  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1071  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1072  << " Name"
1073  << "";
1074  }
1075  for (auto const& p : tr.endPathSummaries) {
1076  LogVerbatim("FwkSummary") << "";
1077  LogVerbatim("FwkSummary") << "TimeReport "
1078  << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
1079  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1080  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1081  << " Name"
1082  << "";
1083  for (auto const& mod : p.moduleInPathSummaries) {
1084  LogVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1085  << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
1086  << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << " "
1087  << mod.moduleLabel << "";
1088  }
1089  }
1090  if (not tr.endPathSummaries.empty()) {
1091  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1092  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1093  << " Name"
1094  << "";
1095  }
1096  LogVerbatim("FwkSummary") << "";
1097  LogVerbatim("FwkSummary") << "TimeReport "
1098  << "---------- Module Summary ---[Real sec]----";
1099  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1100  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1101  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1102  << " Name"
1103  << "";
1104  for (auto const& worker : tr.workerSummaries) {
1105  LogVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1106  << std::setw(kColumn1Size) << worker.realTime / totalEvents << " " << std::right
1107  << std::setw(kColumn2Size) << worker.realTime / std::max(1, worker.timesRun) << " "
1108  << std::right << std::setw(kColumn3Size)
1109  << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel
1110  << "";
1111  }
1112  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1113  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1114  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1115  << " Name"
1116  << "";
1117 
1118  LogVerbatim("FwkSummary") << "";
1119  LogVerbatim("FwkSummary") << "T---Report end!"
1120  << "";
1121  LogVerbatim("FwkSummary") << "";
1122  }
1123 
1125  using std::placeholders::_1;
1127  }
1128 
1130  using std::placeholders::_1;
1131  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1132  }
1133 
1135  RunPrincipal const& rp,
1136  ProcessContext const* processContext,
1137  ActivityRegistry* activityRegistry,
1138  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1139  for (auto& c : all_output_communicators_) {
1140  c->writeRunAsync(task, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1141  }
1142  }
1143 
1145  LuminosityBlockPrincipal const& lbp,
1146  ProcessContext const* processContext,
1147  ActivityRegistry* activityRegistry) {
1148  for (auto& c : all_output_communicators_) {
1149  c->writeLumiAsync(task, lbp, processContext, activityRegistry);
1150  }
1151  }
1152 
1154  using std::placeholders::_1;
1155  // Return true iff at least one output module returns true.
1156  return (std::find_if(all_output_communicators_.begin(),
1160  }
1161 
1163  using std::placeholders::_1;
1164  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1165  }
1166 
1168  using std::placeholders::_1;
1169  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1170  }
1171 
1172  void Schedule::beginJob(ProductRegistry const& iRegistry, eventsetup::ESRecordsToProxyIndices const& iESIndices) {
1173  globalSchedule_->beginJob(iRegistry, iESIndices);
1174  }
1175 
1176  void Schedule::beginStream(unsigned int iStreamID) {
1177  assert(iStreamID < streamSchedules_.size());
1178  streamSchedules_[iStreamID]->beginStream();
1179  }
1180 
1181  void Schedule::endStream(unsigned int iStreamID) {
1182  assert(iStreamID < streamSchedules_.size());
1183  streamSchedules_[iStreamID]->endStream();
1184  }
1185 
1187  unsigned int iStreamID,
1188  EventPrincipal& ep,
1189  EventSetupImpl const& es,
1190  ServiceToken const& token) {
1191  assert(iStreamID < streamSchedules_.size());
1192  streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), ep, es, token, pathStatusInserters_);
1193  }
1194 
1196  ParameterSet const& iPSet,
1197  const ProductRegistry& iRegistry,
1198  eventsetup::ESRecordsToProxyIndices const& iIndices) {
1199  Worker* found = nullptr;
1200  for (auto const& worker : allWorkers()) {
1201  if (worker->description().moduleLabel() == iLabel) {
1202  found = worker;
1203  break;
1204  }
1205  }
1206  if (nullptr == found) {
1207  return false;
1208  }
1209 
1210  auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1211 
1212  globalSchedule_->replaceModule(newMod, iLabel);
1213 
1214  for (auto& s : streamSchedules_) {
1215  s->replaceModule(newMod, iLabel);
1216  }
1217 
1218  {
1219  //Need to updateLookup in order to make getByToken work
1220  auto const runLookup = iRegistry.productLookup(InRun);
1221  auto const lumiLookup = iRegistry.productLookup(InLumi);
1222  auto const eventLookup = iRegistry.productLookup(InEvent);
1223  found->updateLookup(InRun, *runLookup);
1224  found->updateLookup(InLumi, *lumiLookup);
1225  found->updateLookup(InEvent, *eventLookup);
1226  found->updateLookup(iIndices);
1227 
1228  auto const& processName = newMod->moduleDescription().processName();
1229  auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1230  auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1231  auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1232  found->resolvePutIndicies(InRun, runModuleToIndicies);
1233  found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1234  found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1235  }
1236 
1237  return true;
1238  }
1239 
1240  std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1241  std::vector<ModuleDescription const*> result;
1242  result.reserve(allWorkers().size());
1243 
1244  for (auto const& worker : allWorkers()) {
1245  ModuleDescription const* p = worker->descPtr();
1246  result.push_back(p);
1247  }
1248  return result;
1249  }
1250 
1251  Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1252 
1254  for (auto const& worker : allWorkers()) {
1255  worker->convertCurrentProcessAlias(processName);
1256  }
1257  }
1258 
1259  void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1260  streamSchedules_[0]->availablePaths(oLabelsToFill);
1261  }
1262 
1263  void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1264 
1265  void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1266 
1267  void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1268  streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1269  }
1270 
1272  std::vector<ModuleDescription const*>& descriptions,
1273  unsigned int hint) const {
1274  streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1275  }
1276 
1278  std::vector<ModuleDescription const*>& descriptions,
1279  unsigned int hint) const {
1280  streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1281  }
1282 
1284  std::vector<ModuleDescription const*>& allModuleDescriptions,
1285  std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
1286  std::vector<std::vector<ModuleDescription const*>>& modulesWhoseProductsAreConsumedBy,
1287  ProductRegistry const& preg) const {
1288  allModuleDescriptions.clear();
1289  moduleIDToIndex.clear();
1290  modulesWhoseProductsAreConsumedBy.clear();
1291 
1292  allModuleDescriptions.reserve(allWorkers().size());
1293  moduleIDToIndex.reserve(allWorkers().size());
1294  modulesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1295 
1296  std::map<std::string, ModuleDescription const*> labelToDesc;
1297  unsigned int i = 0;
1298  for (auto const& worker : allWorkers()) {
1299  ModuleDescription const* p = worker->descPtr();
1300  allModuleDescriptions.push_back(p);
1301  moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1302  labelToDesc[p->moduleLabel()] = p;
1303  ++i;
1304  }
1305  sort_all(moduleIDToIndex);
1306 
1307  i = 0;
1308  for (auto const& worker : allWorkers()) {
1309  std::vector<ModuleDescription const*>& modules = modulesWhoseProductsAreConsumedBy.at(i);
1310  worker->modulesWhoseProductsAreConsumed(modules, preg, labelToDesc);
1311  ++i;
1312  }
1313  }
1314 
1315  void Schedule::enableEndPaths(bool active) {
1316  endpathsAreActive_ = active;
1317  for (auto& s : streamSchedules_) {
1318  s->enableEndPaths(active);
1319  }
1320  }
1321 
1323 
1325  rep.eventSummary.totalEvents = 0;
1328  for (auto& s : streamSchedules_) {
1329  s->getTriggerReport(rep);
1330  }
1332  }
1333 
1335  rep.eventSummary.totalEvents = 0;
1336  rep.eventSummary.cpuTime = 0.;
1337  rep.eventSummary.realTime = 0.;
1338  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1339  }
1340 
1342  int returnValue = 0;
1343  for (auto& s : streamSchedules_) {
1344  returnValue += s->totalEvents();
1345  }
1346  return returnValue;
1347  }
1348 
1350  int returnValue = 0;
1351  for (auto& s : streamSchedules_) {
1352  returnValue += s->totalEventsPassed();
1353  }
1354  return returnValue;
1355  }
1356 
1358  int returnValue = 0;
1359  for (auto& s : streamSchedules_) {
1360  returnValue += s->totalEventsFailed();
1361  }
1362  return returnValue;
1363  }
1364 
1366  for (auto& s : streamSchedules_) {
1367  s->clearCounters();
1368  }
1369  }
1370 } // namespace edm
size
Write out results.
bool empty() const
Definition: ParameterSet.h:190
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:59
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
void stopEvent(StreamContext const &)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
virtual void openFile(FileBlock const &fb)=0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1251
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:1259
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
bool endPathsEnabled() const
Definition: Schedule.cc:1322
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:1167
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::shared_ptr< ModuleRegistry const > moduleRegistry() const
Definition: Schedule.h:294
std::vector< Worker * > AllWorkers
Definition: Schedule.h:125
std::vector< ParameterSet > VParameterSet
Definition: ParameterSet.h:33
std::vector< std::string > const * pathNames_
Definition: Schedule.h:310
void convertCurrentProcessAlias(std::string const &processName)
Convert "@currentProcess" in InputTag process names to the actual current process name...
Definition: Schedule.cc:1253
virtual bool shouldWeCloseFile() const =0
void endStream(unsigned int)
Definition: Schedule.cc:1181
void enableEndPaths(bool active)
Definition: Schedule.cc:1315
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:1277
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:60
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
Definition: Schedule.h:308
std::string const & moduleLabel() const
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
int totalEventsFailed() const
Definition: Schedule.cc:1357
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:303
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void processOneEventAsync(WaitingTaskHolder iTask, unsigned int iStreamID, EventPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token)
Definition: Schedule.cc:1186
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:168
int totalEventsPassed() const
Definition: Schedule.cc:1349
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
Definition: Schedule.cc:1263
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:58
EventSummary eventSummary
Definition: TriggerReport.h:57
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter > > > pathStatusInserters_
Definition: Schedule.h:298
int totalEvents() const
Definition: Schedule.cc:1341
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
EventTimingSummary eventSummary
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1365
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProxyIndices const &)
Definition: Schedule.cc:1172
Schedule(ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, SubProcessParentageHelper const *subProcessParentageHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool hasSubprocesses, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:571
std::vector< PathTimingSummary > trigPathSummaries
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists, SubProcessParentageHelper const *subProcessParentageHelper)
Definition: Schedule.cc:787
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:828
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:1162
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:300
rep
Definition: cuy.py:1190
void writeRunAsync(WaitingTaskHolder iTask, RunPrincipal const &rp, ProcessContext const *, ActivityRegistry *, MergeableRunProductMetadata const *)
Definition: Schedule.cc:1134
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
def elem(elemtype, innerHTML='', html_class='', kwargs)
Definition: HTMLExport.py:19
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1324
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:306
void fillModuleAndConsumesInfo(std::vector< ModuleDescription const * > &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int >> &moduleIDToIndex, std::vector< std::vector< ModuleDescription const * >> &modulesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
Definition: Schedule.cc:1283
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:301
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:92
volatile bool endpathsAreActive_
Definition: Schedule.h:314
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:173
std::shared_ptr< TriggerResultInserter const > resultsInserter() const
Definition: Schedule.h:290
void startPath(StreamContext const &, PathContext const &)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter > > > endPathStatusInserters_
Definition: Schedule.h:299
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:305
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:1267
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
void beginStream(unsigned int)
Definition: Schedule.cc:1176
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:174
bool wantSummary_
Definition: Schedule.h:312
std::vector< std::string > const * endPathNames_
Definition: Schedule.h:311
HLT enums.
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:1240
Strings const & getTrigPaths() const
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:1129
void writeLumiAsync(WaitingTaskHolder iTask, LuminosityBlockPrincipal const &lbp, ProcessContext const *, ActivityRegistry *)
Definition: Schedule.cc:1144
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:843
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1334
bool shouldWeCloseOutput() const
Definition: Schedule.cc:1153
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry, eventsetup::ESRecordsToProxyIndices const &)
Definition: Schedule.cc:1195
std::vector< std::string > vstring
Definition: Schedule.cc:567
void setIsMergeable(BranchDescription &)
def move(src, dest)
Definition: eostools.py:511
#define constexpr
void closeOutputFiles()
Definition: Schedule.cc:1124
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
Definition: Schedule.cc:1265
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:1271
unsigned int id() const
std::string match(BranchDescription const &a, BranchDescription const &b, std::string const &fileName)
unsigned transform(const HcalDetId &id, unsigned transformCode)