CMS 3D CMS Logo

Schedule.cc
Go to the documentation of this file.
2 
35 
36 #include <algorithm>
37 #include <cassert>
38 #include <cstdlib>
39 #include <functional>
40 #include <iomanip>
41 #include <list>
42 #include <map>
43 #include <set>
44 #include <exception>
45 #include <sstream>
46 
48 
49 namespace edm {
50 
51  class Maker;
52 
53  namespace {
54  using std::placeholders::_1;
55 
56  bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
57  return std::binary_search(v.begin(), v.end(), s);
58  }
59 
60  // Here we make the trigger results inserter directly. This should
61  // probably be a utility in the WorkerRegistry or elsewhere.
62 
63  std::shared_ptr<TriggerResultInserter> makeInserter(ParameterSet& proc_pset,
64  PreallocationConfiguration const& iPrealloc,
65  ProductRegistry& preg,
66  ExceptionToActionTable const& actions,
67  std::shared_ptr<ActivityRegistry> areg,
68  std::shared_ptr<ProcessConfiguration> processConfiguration) {
69  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
70  trig_pset->registerIt();
71 
72  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
73  ModuleDescription md(trig_pset->id(),
74  "TriggerResultInserter",
75  "TriggerResults",
76  processConfiguration.get(),
78 
79  areg->preModuleConstructionSignal_(md);
80  bool postCalled = false;
81  std::shared_ptr<TriggerResultInserter> returnValue;
82  try {
83  maker::ModuleHolderT<TriggerResultInserter> holder(
84  make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),
85  static_cast<Maker const*>(nullptr));
86  holder.setModuleDescription(md);
87  holder.registerProductsAndCallbacks(&preg);
88  returnValue = holder.module();
89  postCalled = true;
90  // if exception then post will be called in the catch block
91  areg->postModuleConstructionSignal_(md);
92  } catch (...) {
93  if (!postCalled) {
94  try {
95  areg->postModuleConstructionSignal_(md);
96  } catch (...) {
97  // If post throws an exception ignore it because we are already handling another exception
98  }
99  }
100  throw;
101  }
102  return returnValue;
103  }
104 
105  template <typename T>
106  void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
107  std::vector<std::string> const& pathNames,
108  PreallocationConfiguration const& iPrealloc,
109  ProductRegistry& preg,
110  std::shared_ptr<ActivityRegistry> areg,
111  std::shared_ptr<ProcessConfiguration> processConfiguration,
112  std::string const& moduleTypeName) {
114  pset.addParameter<std::string>("@module_type", moduleTypeName);
115  pset.addParameter<std::string>("@module_edm_type", "EDProducer");
116  pset.registerIt();
117 
118  pathStatusInserters.reserve(pathNames.size());
119 
120  for (auto const& pathName : pathNames) {
121  ModuleDescription md(
122  pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
123 
124  areg->preModuleConstructionSignal_(md);
125  bool postCalled = false;
126 
127  try {
128  maker::ModuleHolderT<T> holder(make_shared_noexcept_false<T>(iPrealloc.numberOfStreams()),
129  static_cast<Maker const*>(nullptr));
130  holder.setModuleDescription(md);
131  holder.registerProductsAndCallbacks(&preg);
132  pathStatusInserters.emplace_back(holder.module());
133  postCalled = true;
134  // if exception then post will be called in the catch block
135  areg->postModuleConstructionSignal_(md);
136  } catch (...) {
137  if (!postCalled) {
138  try {
139  areg->postModuleConstructionSignal_(md);
140  } catch (...) {
141  // If post throws an exception ignore it because we are already handling another exception
142  }
143  }
144  throw;
145  }
146  }
147  }
148 
149  void checkAndInsertAlias(std::string const& friendlyClassName,
150  std::string const& moduleLabel,
151  std::string const& productInstanceName,
152  std::string const& processName,
153  std::string const& alias,
154  std::string const& instanceAlias,
155  ProductRegistry const& preg,
156  std::multimap<BranchKey, BranchKey>& aliasMap,
157  std::map<BranchKey, BranchKey>& aliasKeys) {
158  std::string const star("*");
159 
160  BranchKey key(friendlyClassName, moduleLabel, productInstanceName, processName);
161  if (preg.productList().find(key) == preg.productList().end()) {
162  // No product was found matching the alias.
163  // We throw an exception only if a module with the specified module label was created in this process.
164  for (auto const& product : preg.productList()) {
165  if (moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
166  throw Exception(errors::Configuration, "EDAlias does not match data\n")
167  << "There are no products of type '" << friendlyClassName << "'\n"
168  << "with module label '" << moduleLabel << "' and instance name '" << productInstanceName << "'.\n";
169  }
170  }
171  }
172 
173  if (auto iter = aliasMap.find(key); iter != aliasMap.end()) {
174  // If the same EDAlias defines multiple products pointing to the same product, throw
175  if (iter->second.moduleLabel() == alias) {
176  throw Exception(errors::Configuration, "EDAlias conflict\n")
177  << "The module label alias '" << alias << "' is used for multiple products of type '" << friendlyClassName
178  << "' with module label '" << moduleLabel << "' and instance name '" << productInstanceName
179  << "'. One alias has the instance name '" << iter->first.productInstanceName()
180  << "' and the other has the instance name '" << instanceAlias << "'.";
181  }
182  }
183 
184  std::string const& theInstanceAlias(instanceAlias == star ? productInstanceName : instanceAlias);
185  BranchKey aliasKey(friendlyClassName, alias, theInstanceAlias, processName);
186  if (preg.productList().find(aliasKey) != preg.productList().end()) {
187  throw Exception(errors::Configuration, "EDAlias conflicts with data\n")
188  << "A product of type '" << friendlyClassName << "'\n"
189  << "with module label '" << alias << "' and instance name '" << theInstanceAlias << "'\n"
190  << "already exists.\n";
191  }
192  auto iter = aliasKeys.find(aliasKey);
193  if (iter != aliasKeys.end()) {
194  // The alias matches a previous one. If the same alias is used for different product, throw.
195  if (iter->second != key) {
196  throw Exception(errors::Configuration, "EDAlias conflict\n")
197  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
198  << "are used for multiple products of type '" << friendlyClassName << "'\n"
199  << "One has module label '" << moduleLabel << "' and product instance name '" << productInstanceName
200  << "',\n"
201  << "the other has module label '" << iter->second.moduleLabel() << "' and product instance name '"
202  << iter->second.productInstanceName() << "'.\n";
203  }
204  } else {
205  auto prodIter = preg.productList().find(key);
206  if (prodIter != preg.productList().end()) {
207  if (!prodIter->second.produced()) {
208  throw Exception(errors::Configuration, "EDAlias\n")
209  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
210  << "are used for a product of type '" << friendlyClassName << "'\n"
211  << "with module label '" << moduleLabel << "' and product instance name '" << productInstanceName
212  << "',\n"
213  << "An EDAlias can only be used for products produced in the current process. This one is not.\n";
214  }
215  aliasMap.insert(std::make_pair(key, aliasKey));
216  aliasKeys.insert(std::make_pair(aliasKey, key));
217  }
218  }
219  }
220 
221  void processEDAliases(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
222  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
223  if (aliases.empty()) {
224  return;
225  }
226  std::string const star("*");
227  std::string const empty("");
229  desc.add<std::string>("type");
230  desc.add<std::string>("fromProductInstance", star);
231  desc.add<std::string>("toProductInstance", star);
232 
233  std::multimap<BranchKey, BranchKey> aliasMap;
234 
235  std::map<BranchKey, BranchKey> aliasKeys; // Used to search for duplicates or clashes.
236 
237  // Now, loop over the alias information and store it in aliasMap.
238  for (std::string const& alias : aliases) {
239  ParameterSet const& aliasPSet = proc_pset.getParameterSet(alias);
240  std::vector<std::string> vPSetNames = aliasPSet.getParameterNamesForType<VParameterSet>();
241  for (std::string const& moduleLabel : vPSetNames) {
242  VParameterSet vPSet = aliasPSet.getParameter<VParameterSet>(moduleLabel);
243  for (ParameterSet& pset : vPSet) {
244  desc.validate(pset);
245  std::string friendlyClassName = pset.getParameter<std::string>("type");
246  std::string productInstanceName = pset.getParameter<std::string>("fromProductInstance");
247  std::string instanceAlias = pset.getParameter<std::string>("toProductInstance");
248  if (productInstanceName == star) {
249  bool match = false;
250  BranchKey lowerBound(friendlyClassName, moduleLabel, empty, empty);
251  for (ProductRegistry::ProductList::const_iterator it = preg.productList().lower_bound(lowerBound);
252  it != preg.productList().end() && it->first.friendlyClassName() == friendlyClassName &&
253  it->first.moduleLabel() == moduleLabel;
254  ++it) {
255  if (it->first.processName() != processName) {
256  continue;
257  }
258  match = true;
259 
260  checkAndInsertAlias(friendlyClassName,
261  moduleLabel,
262  it->first.productInstanceName(),
263  processName,
264  alias,
265  instanceAlias,
266  preg,
267  aliasMap,
268  aliasKeys);
269  }
270  if (!match) {
271  // No product was found matching the alias.
272  // We throw an exception only if a module with the specified module label was created in this process.
273  for (auto const& product : preg.productList()) {
274  if (moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
275  throw Exception(errors::Configuration, "EDAlias parameter set mismatch\n")
276  << "There are no products of type '" << friendlyClassName << "'\n"
277  << "with module label '" << moduleLabel << "'.\n";
278  }
279  }
280  }
281  } else {
282  checkAndInsertAlias(friendlyClassName,
283  moduleLabel,
284  productInstanceName,
285  processName,
286  alias,
287  instanceAlias,
288  preg,
289  aliasMap,
290  aliasKeys);
291  }
292  }
293  }
294  }
295 
296  // Now add the new alias entries to the product registry.
297  for (auto const& aliasEntry : aliasMap) {
298  ProductRegistry::ProductList::const_iterator it = preg.productList().find(aliasEntry.first);
299  assert(it != preg.productList().end());
300  preg.addLabelAlias(it->second, aliasEntry.second.moduleLabel(), aliasEntry.second.productInstanceName());
301  }
302  }
303 
304  typedef std::vector<std::string> vstring;
305 
306  void processSwitchProducers(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
307  // Update Switch BranchDescriptions for the chosen case
308  struct BranchesCases {
309  BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
310  std::vector<BranchKey> chosenBranches;
311  std::vector<std::string> caseLabels;
312  };
313  std::map<std::string, BranchesCases> switchMap;
314  for (auto& prod : preg.productListUpdator()) {
315  if (prod.second.isSwitchAlias()) {
316  auto it = switchMap.find(prod.second.moduleLabel());
317  if (it == switchMap.end()) {
318  auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
319  auto inserted = switchMap.emplace(prod.second.moduleLabel(),
320  switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
321  assert(inserted.second);
322  it = inserted.first;
323  }
324 
325  for (auto const& item : preg.productList()) {
326  if (item.second.branchType() == prod.second.branchType() and
327  item.second.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
328  item.first.moduleLabel() == prod.second.switchAliasModuleLabel() and
329  item.first.productInstanceName() == prod.second.productInstanceName()) {
330  if (item.first.processName() != processName) {
332  << "Encountered a BranchDescription that is aliased-for by SwitchProducer, and whose processName "
333  << item.first.processName() << " differs from current process " << processName
334  << ". Module label is " << item.first.moduleLabel() << ".\nPlease contact a framework developer.";
335  }
336  prod.second.setSwitchAliasForBranch(item.second);
337  it->second.chosenBranches.push_back(prod.first); // with moduleLabel of the Switch
338  }
339  }
340  }
341  }
342  if (switchMap.empty())
343  return;
344 
345  for (auto& elem : switchMap) {
346  std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
347  }
348 
349  // Check that non-chosen cases declare exactly the same branches
350  // Also set the alias-for branches to transient
351  std::vector<bool> foundBranches;
352  for (auto const& switchItem : switchMap) {
353  auto const& switchLabel = switchItem.first;
354  auto const& chosenBranches = switchItem.second.chosenBranches;
355  auto const& caseLabels = switchItem.second.caseLabels;
356  foundBranches.resize(chosenBranches.size());
357  for (auto const& caseLabel : caseLabels) {
358  std::fill(foundBranches.begin(), foundBranches.end(), false);
359  for (auto& nonConstItem : preg.productListUpdator()) {
360  auto const& item = nonConstItem;
361  if (item.first.moduleLabel() == caseLabel) {
362  // Set the alias-for branch as transient so it gets fully ignored in output.
363  // I tried first to implicitly drop all branches with
364  // '@' in ProductSelector, but that gave problems on
365  // input (those branches would be implicitly dropped on
366  // input as well, leading to the SwitchProducer branches
367  // do be dropped as dependent ones, as the alias
368  // detection logic in RootFile says that the
369  // SwitchProducer branches are not alias branches)
370  nonConstItem.second.setTransient(true);
371 
372  auto range = std::equal_range(chosenBranches.begin(),
373  chosenBranches.end(),
374  BranchKey(item.first.friendlyClassName(),
375  switchLabel,
376  item.first.productInstanceName(),
377  item.first.processName()));
378  if (range.first == range.second) {
380  << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
381  << item.first << " that is not produced by the chosen case "
382  << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
383  .getUntrackedParameter<std::string>("@chosen_case");
384  }
385  assert(std::distance(range.first, range.second) == 1);
386  foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
387 
388  // Check that there are no BranchAliases for any of the cases
389  auto const& bd = item.second;
390  if (not bd.branchAliases().empty()) {
392  << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
393  "aliases for SwitchProducer with label "
394  << switchLabel << " for case " << caseLabel << ":";
395  for (auto const& item : bd.branchAliases()) {
396  ex << " " << item;
397  }
398  throw ex;
399  }
400  }
401  }
402 
403  for (size_t i = 0; i < chosenBranches.size(); i++) {
404  if (not foundBranches[i]) {
406  << "SwitchProducer " << switchLabel << " has a case " << caseLabel
407  << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
408  << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
409  .getUntrackedParameter<std::string>("@chosen_case");
410  }
411  }
412  }
413  }
414  }
415 
416  void reduceParameterSet(ParameterSet& proc_pset,
417  vstring const& end_path_name_list,
418  vstring& modulesInConfig,
419  std::set<std::string> const& usedModuleLabels,
420  std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
421  // Before calculating the ParameterSetID of the top level ParameterSet or
422  // saving it in the registry drop from the top level ParameterSet all
423  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
424  // production is not enabled also drop all the EDFilters and EDProducers
425  // that are not scheduled. Drop the ParameterSet used to configure the module
426  // itself. Also drop the other traces of these labels in the top level
427  // ParameterSet: Remove that labels from @all_modules and from all the
428  // end paths. If this makes any end paths empty, then remove the end path
429  // name from @end_paths, and @paths.
430 
431  // First make a list of labels to drop
432  vstring outputModuleLabels;
433  std::string edmType;
434  std::string const moduleEdmType("@module_edm_type");
435  std::string const outputModule("OutputModule");
436  std::string const edAnalyzer("EDAnalyzer");
437  std::string const edFilter("EDFilter");
438  std::string const edProducer("EDProducer");
439 
440  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
441 
442  //need a list of all modules on paths in order to determine
443  // if an EDAnalyzer only appears on an end path
444  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
445  std::set<std::string> modulesOnPaths;
446  {
447  std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
448  for (auto const& endPath : end_path_name_list) {
449  noEndPaths.erase(endPath);
450  }
451  {
452  vstring labels;
453  for (auto const& path : noEndPaths) {
454  labels = proc_pset.getParameter<vstring>(path);
455  modulesOnPaths.insert(labels.begin(), labels.end());
456  }
457  }
458  }
459  //Initially fill labelsToBeDropped with all module mentioned in
460  // the configuration but which are not being used by the system
461  std::vector<std::string> labelsToBeDropped;
462  labelsToBeDropped.reserve(modulesInConfigSet.size());
463  std::set_difference(modulesInConfigSet.begin(),
464  modulesInConfigSet.end(),
465  usedModuleLabels.begin(),
466  usedModuleLabels.end(),
467  std::back_inserter(labelsToBeDropped));
468 
469  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
470  for (auto const& modLabel : usedModuleLabels) {
471  // Do nothing for modules that do not have a ParameterSet. Modules of type
472  // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
473  if (proc_pset.existsAs<ParameterSet>(modLabel)) {
474  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
475  if (edmType == outputModule) {
476  outputModuleLabels.push_back(modLabel);
477  labelsToBeDropped.push_back(modLabel);
478  }
479  if (edmType == edAnalyzer) {
480  if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
481  labelsToBeDropped.push_back(modLabel);
482  }
483  }
484  }
485  }
486  //labelsToBeDropped must be sorted
487  std::inplace_merge(
488  labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
489 
490  // drop the parameter sets used to configure the modules
491  for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
492 
493  // drop the labels from @all_modules
494  vstring::iterator endAfterRemove =
495  std::remove_if(modulesInConfig.begin(),
496  modulesInConfig.end(),
497  std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
498  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
499  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
500 
501  // drop the labels from all end paths
502  vstring endPathsToBeDropped;
503  vstring labels;
504  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
505  iEndPath != endEndPath;
506  ++iEndPath) {
507  labels = proc_pset.getParameter<vstring>(*iEndPath);
508  vstring::iterator iSave = labels.begin();
509  vstring::iterator iBegin = labels.begin();
510 
511  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
512  if (binary_search_string(labelsToBeDropped, *iLabel)) {
513  if (binary_search_string(outputModuleLabels, *iLabel)) {
514  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
515  }
516  } else {
517  if (iSave != iLabel) {
518  iSave->swap(*iLabel);
519  }
520  ++iSave;
521  }
522  }
523  labels.erase(iSave, labels.end());
524  if (labels.empty()) {
525  // remove empty end paths and save their names
526  proc_pset.eraseSimpleParameter(*iEndPath);
527  endPathsToBeDropped.push_back(*iEndPath);
528  } else {
529  proc_pset.addParameter<vstring>(*iEndPath, labels);
530  }
531  }
532  sort_all(endPathsToBeDropped);
533 
534  // remove empty end paths from @paths
535  endAfterRemove = std::remove_if(scheduledPaths.begin(),
536  scheduledPaths.end(),
537  std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
538  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
539  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
540 
541  // remove empty end paths from @end_paths
542  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
543  endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
544  scheduledEndPaths.end(),
545  std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
546  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
547  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
548  }
549 
550  class RngEDConsumer : public EDConsumerBase {
551  public:
552  explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
554  if (rng.isAvailable()) {
555  rng->consumes(consumesCollector());
556  for (auto const& consumesInfo : this->consumesInfo()) {
557  typesConsumed.emplace(consumesInfo.type());
558  }
559  }
560  }
561  };
562  } // namespace
563  // -----------------------------
564 
565  typedef std::vector<std::string> vstring;
566 
567  // -----------------------------
568 
570  service::TriggerNamesService const& tns,
571  ProductRegistry& preg,
572  BranchIDListHelper& branchIDListHelper,
573  ThinnedAssociationsHelper& thinnedAssociationsHelper,
574  SubProcessParentageHelper const* subProcessParentageHelper,
575  ExceptionToActionTable const& actions,
576  std::shared_ptr<ActivityRegistry> areg,
577  std::shared_ptr<ProcessConfiguration> processConfiguration,
578  bool hasSubprocesses,
579  PreallocationConfiguration const& prealloc,
580  ProcessContext const* processContext)
581  : //Only create a resultsInserter if there is a trigger path
582  resultsInserter_{tns.getTrigPaths().empty()
583  ? std::shared_ptr<TriggerResultInserter>{}
584  : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
587  preallocConfig_(prealloc),
588  pathNames_(&tns.getTrigPaths()),
589  endPathNames_(&tns.getEndPaths()),
590  wantSummary_(tns.wantSummary()),
591  endpathsAreActive_(true) {
592  makePathStatusInserters(pathStatusInserters_,
593  *pathNames_,
594  prealloc,
595  preg,
596  areg,
597  processConfiguration,
598  std::string("PathStatusInserter"));
599 
600  makePathStatusInserters(endPathStatusInserters_,
601  *endPathNames_,
602  prealloc,
603  preg,
604  areg,
605  processConfiguration,
606  std::string("EndPathStatusInserter"));
607 
608  assert(0 < prealloc.numberOfStreams());
609  streamSchedules_.reserve(prealloc.numberOfStreams());
610  for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
611  streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
614  moduleRegistry(),
615  proc_pset,
616  tns,
617  prealloc,
618  preg,
619  branchIDListHelper,
620  actions,
621  areg,
622  processConfiguration,
623  !hasSubprocesses,
624  StreamID{i},
625  processContext));
626  }
627 
628  //TriggerResults are injected automatically by StreamSchedules and are
629  // unknown to the ModuleRegistry
630  const std::string kTriggerResults("TriggerResults");
631  std::vector<std::string> modulesToUse;
632  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
633  for (auto const& worker : streamSchedules_[0]->allWorkers()) {
634  if (worker->description().moduleLabel() != kTriggerResults) {
635  modulesToUse.push_back(worker->description().moduleLabel());
636  }
637  }
638  //The unscheduled modules are at the end of the list, but we want them at the front
639  unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
640  if (nUnscheduledModules > 0) {
641  std::vector<std::string> temp;
642  temp.reserve(modulesToUse.size());
643  auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
644  std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
645  std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
646  temp.swap(modulesToUse);
647  }
648 
649  // propagate_const<T> has no reset() function
650  globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
653  moduleRegistry(),
654  modulesToUse,
655  proc_pset,
656  preg,
657  prealloc,
658  actions,
659  areg,
660  processConfiguration,
661  processContext);
662 
663  //TriggerResults is not in the top level ParameterSet so the call to
664  // reduceParameterSet would fail to find it. Just remove it up front.
665  std::set<std::string> usedModuleLabels;
666  for (auto const& worker : allWorkers()) {
667  if (worker->description().moduleLabel() != kTriggerResults) {
668  usedModuleLabels.insert(worker->description().moduleLabel());
669  }
670  }
671  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
672  std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
673  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
674  processEDAliases(proc_pset, processConfiguration->processName(), preg);
675 
676  // At this point all BranchDescriptions are created. Mark now the
677  // ones of unscheduled workers to be on-demand.
678  if (nUnscheduledModules > 0) {
679  std::set<std::string> unscheduledModules(modulesToUse.begin(), modulesToUse.begin() + nUnscheduledModules);
680  preg.setUnscheduledProducts(unscheduledModules);
681  }
682 
683  processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
684  proc_pset.registerIt();
685  processConfiguration->setParameterSetID(proc_pset.id());
686  processConfiguration->setProcessConfigurationID();
687 
688  // This is used for a little sanity-check to make sure no code
689  // modifications alter the number of workers at a later date.
690  size_t all_workers_count = allWorkers().size();
691 
692  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
693  auto comm = iHolder->createOutputModuleCommunicator();
694  if (comm) {
695  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
696  }
697  });
698  // Now that the output workers are filled in, set any output limits or information.
699  limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
700 
701  // Sanity check: make sure nobody has added a worker after we've
702  // already relied on the WorkerManager being full.
703  assert(all_workers_count == allWorkers().size());
704 
705  branchIDListHelper.updateFromRegistry(preg);
706 
707  for (auto const& worker : streamSchedules_[0]->allWorkers()) {
708  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
709  }
710  thinnedAssociationsHelper.sort();
711 
712  // The output modules consume products in kept branches.
713  // So we must set this up before freezing.
714  for (auto& c : all_output_communicators_) {
715  c->selectProducts(preg, thinnedAssociationsHelper);
716  }
717 
718  for (auto& product : preg.productListUpdator()) {
719  setIsMergeable(product.second);
720  }
721 
722  {
723  // We now get a collection of types that may be consumed.
724  std::set<TypeID> productTypesConsumed;
725  std::set<TypeID> elementTypesConsumed;
726  // Loop over all modules
727  for (auto const& worker : allWorkers()) {
728  for (auto const& consumesInfo : worker->consumesInfo()) {
729  if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
730  productTypesConsumed.emplace(consumesInfo.type());
731  } else {
732  elementTypesConsumed.emplace(consumesInfo.type());
733  }
734  }
735  }
736  // The SubProcess class is not a module, yet it may consume.
737  if (hasSubprocesses) {
738  productTypesConsumed.emplace(typeid(TriggerResults));
739  }
740  // The RandomNumberGeneratorService is not a module, yet it consumes.
741  { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
742  preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
743  }
744 
745  for (auto& c : all_output_communicators_) {
746  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
747  }
748 
749  if (wantSummary_) {
750  std::vector<const ModuleDescription*> modDesc;
751  const auto& workers = allWorkers();
752  modDesc.reserve(workers.size());
753 
754  std::transform(workers.begin(),
755  workers.end(),
756  std::back_inserter(modDesc),
757  [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->descPtr(); });
758 
759  // propagate_const<T> has no reset() function
760  summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
761  auto timeKeeperPtr = summaryTimeKeeper_.get();
762 
763  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
764  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
765  areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
766  areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
767  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
768  areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
769 
770  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
771  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
772 
773  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
774  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
775 
776  areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
777  areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
778  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
779  //timeKeeperPtr->startModuleEvent(iContext,iMod);
780  //});
781  }
782 
783  } // Schedule::Schedule
784 
785  void Schedule::limitOutput(ParameterSet const& proc_pset,
786  BranchIDLists const& branchIDLists,
787  SubProcessParentageHelper const* subProcessParentageHelper) {
788  std::string const output("output");
789 
790  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
791  int maxEventSpecs = 0;
792  int maxEventsOut = -1;
793  ParameterSet const* vMaxEventsOut = nullptr;
794  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
795  if (search_all(intNamesE, output)) {
796  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
797  ++maxEventSpecs;
798  }
799  std::vector<std::string> psetNamesE;
800  maxEventsPSet.getParameterSetNames(psetNamesE, false);
801  if (search_all(psetNamesE, output)) {
802  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
803  ++maxEventSpecs;
804  }
805 
806  if (maxEventSpecs > 1) {
808  << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
809  }
810 
811  for (auto& c : all_output_communicators_) {
812  OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
813  if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
814  std::string const& moduleLabel = c->description().moduleLabel();
815  try {
816  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
817  } catch (Exception const&) {
819  << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
820  }
821  }
822  c->configure(desc);
823  }
824  }
825 
826  bool Schedule::terminate() const {
827  if (all_output_communicators_.empty()) {
828  return false;
829  }
830  for (auto& c : all_output_communicators_) {
831  if (!c->limitReached()) {
832  // Found an output module that has not reached output event count.
833  return false;
834  }
835  }
836  LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
837  << "has reached its configured limit.\n";
838  return true;
839  }
840 
842  globalSchedule_->endJob(collector);
843  if (collector.hasThrown()) {
844  return;
845  }
846 
847  if (wantSummary_ == false)
848  return;
849  {
850  TriggerReport tr;
851  getTriggerReport(tr);
852 
853  // The trigger report (pass/fail etc.):
854 
855  LogVerbatim("FwkSummary") << "";
856  if (streamSchedules_[0]->context().processContext()->isSubProcess()) {
857  LogVerbatim("FwkSummary") << "TrigReport Process: "
858  << streamSchedules_[0]->context().processContext()->processName();
859  }
860  LogVerbatim("FwkSummary") << "TrigReport "
861  << "---------- Event Summary ------------";
862  if (!tr.trigPathSummaries.empty()) {
863  LogVerbatim("FwkSummary") << "TrigReport"
864  << " Events total = " << tr.eventSummary.totalEvents
865  << " passed = " << tr.eventSummary.totalEventsPassed
866  << " failed = " << tr.eventSummary.totalEventsFailed << "";
867  } else {
868  LogVerbatim("FwkSummary") << "TrigReport"
869  << " Events total = " << tr.eventSummary.totalEvents
870  << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
871  }
872 
873  LogVerbatim("FwkSummary") << "";
874  LogVerbatim("FwkSummary") << "TrigReport "
875  << "---------- Path Summary ------------";
876  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
877  << " " << std::right << std::setw(10) << "Executed"
878  << " " << std::right << std::setw(10) << "Passed"
879  << " " << std::right << std::setw(10) << "Failed"
880  << " " << std::right << std::setw(10) << "Error"
881  << " "
882  << "Name"
883  << "";
884  for (auto const& p : tr.trigPathSummaries) {
885  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
886  << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
887  << std::right << std::setw(10) << p.timesPassed << " " << std::right << std::setw(10)
888  << p.timesFailed << " " << std::right << std::setw(10) << p.timesExcept << " "
889  << p.name << "";
890  }
891 
892  /*
893  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
894  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
895  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
896  for (; epi != epe; ++epi, ++epn) {
897 
898  LogVerbatim("FwkSummary") << "TrigReport "
899  << std::right << std::setw(5) << 1
900  << std::right << std::setw(5) << *epi << " "
901  << std::right << std::setw(10) << totalEvents() << " "
902  << std::right << std::setw(10) << totalEvents() << " "
903  << std::right << std::setw(10) << 0 << " "
904  << std::right << std::setw(10) << 0 << " "
905  << *epn << "";
906  }
907  */
908 
909  LogVerbatim("FwkSummary") << "";
910  LogVerbatim("FwkSummary") << "TrigReport "
911  << "-------End-Path Summary ------------";
912  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
913  << " " << std::right << std::setw(10) << "Executed"
914  << " " << std::right << std::setw(10) << "Passed"
915  << " " << std::right << std::setw(10) << "Failed"
916  << " " << std::right << std::setw(10) << "Error"
917  << " "
918  << "Name"
919  << "";
920  for (auto const& p : tr.endPathSummaries) {
921  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
922  << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
923  << std::right << std::setw(10) << p.timesPassed << " " << std::right << std::setw(10)
924  << p.timesFailed << " " << std::right << std::setw(10) << p.timesExcept << " "
925  << p.name << "";
926  }
927 
928  for (auto const& p : tr.trigPathSummaries) {
929  LogVerbatim("FwkSummary") << "";
930  LogVerbatim("FwkSummary") << "TrigReport "
931  << "---------- Modules in Path: " << p.name << " ------------";
932  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
933  << " " << std::right << std::setw(10) << "Visited"
934  << " " << std::right << std::setw(10) << "Passed"
935  << " " << std::right << std::setw(10) << "Failed"
936  << " " << std::right << std::setw(10) << "Error"
937  << " "
938  << "Name"
939  << "";
940 
941  unsigned int bitpos = 0;
942  for (auto const& mod : p.moduleInPathSummaries) {
943  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
944  << bitpos << " " << std::right << std::setw(10) << mod.timesVisited << " "
945  << std::right << std::setw(10) << mod.timesPassed << " " << std::right
946  << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
947  << mod.timesExcept << " " << mod.moduleLabel << "";
948  ++bitpos;
949  }
950  }
951 
952  for (auto const& p : tr.endPathSummaries) {
953  LogVerbatim("FwkSummary") << "";
954  LogVerbatim("FwkSummary") << "TrigReport "
955  << "------ Modules in End-Path: " << p.name << " ------------";
956  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
957  << " " << std::right << std::setw(10) << "Visited"
958  << " " << std::right << std::setw(10) << "Passed"
959  << " " << std::right << std::setw(10) << "Failed"
960  << " " << std::right << std::setw(10) << "Error"
961  << " "
962  << "Name"
963  << "";
964 
965  unsigned int bitpos = 0;
966  for (auto const& mod : p.moduleInPathSummaries) {
967  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
968  << bitpos << " " << std::right << std::setw(10) << mod.timesVisited << " "
969  << std::right << std::setw(10) << mod.timesPassed << " " << std::right
970  << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
971  << mod.timesExcept << " " << mod.moduleLabel << "";
972  ++bitpos;
973  }
974  }
975 
976  LogVerbatim("FwkSummary") << "";
977  LogVerbatim("FwkSummary") << "TrigReport "
978  << "---------- Module Summary ------------";
979  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Visited"
980  << " " << std::right << std::setw(10) << "Executed"
981  << " " << std::right << std::setw(10) << "Passed"
982  << " " << std::right << std::setw(10) << "Failed"
983  << " " << std::right << std::setw(10) << "Error"
984  << " "
985  << "Name"
986  << "";
987  for (auto const& worker : tr.workerSummaries) {
988  LogVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " "
989  << std::right << std::setw(10) << worker.timesRun << " " << std::right
990  << std::setw(10) << worker.timesPassed << " " << std::right << std::setw(10)
991  << worker.timesFailed << " " << std::right << std::setw(10) << worker.timesExcept
992  << " " << worker.moduleLabel << "";
993  }
994  LogVerbatim("FwkSummary") << "";
995  }
996  // The timing report (CPU and Real Time):
999 
1000  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
1001 
1002  LogVerbatim("FwkSummary") << "TimeReport "
1003  << "---------- Event Summary ---[sec]----";
1004  LogVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
1005  << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
1006  LogVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
1007  << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
1008  LogVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
1009  << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
1010  LogVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed << " efficiency CPU/Real/thread = "
1012 
1013  constexpr int kColumn1Size = 10;
1014  constexpr int kColumn2Size = 12;
1015  constexpr int kColumn3Size = 12;
1016  LogVerbatim("FwkSummary") << "";
1017  LogVerbatim("FwkSummary") << "TimeReport "
1018  << "---------- Path Summary ---[Real sec]----";
1019  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1020  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1021  << " Name";
1022  for (auto const& p : tr.trigPathSummaries) {
1023  const int timesRun = std::max(1, p.timesRun);
1024  LogVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1025  << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
1026  << std::setw(kColumn2Size) << p.realTime / timesRun << " " << p.name << "";
1027  }
1028  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1029  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1030  << " Name"
1031  << "";
1032 
1033  LogVerbatim("FwkSummary") << "";
1034  LogVerbatim("FwkSummary") << "TimeReport "
1035  << "-------End-Path Summary ---[Real sec]----";
1036  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1037  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1038  << " Name"
1039  << "";
1040  for (auto const& p : tr.endPathSummaries) {
1041  const int timesRun = std::max(1, p.timesRun);
1042 
1043  LogVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1044  << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
1045  << std::setw(kColumn2Size) << p.realTime / timesRun << " " << p.name << "";
1046  }
1047  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1048  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1049  << " Name"
1050  << "";
1051 
1052  for (auto const& p : tr.trigPathSummaries) {
1053  LogVerbatim("FwkSummary") << "";
1054  LogVerbatim("FwkSummary") << "TimeReport "
1055  << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
1056  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1057  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1058  << " Name"
1059  << "";
1060  for (auto const& mod : p.moduleInPathSummaries) {
1061  LogVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1062  << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
1063  << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << " "
1064  << mod.moduleLabel << "";
1065  }
1066  }
1067  if (not tr.trigPathSummaries.empty()) {
1068  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1069  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1070  << " Name"
1071  << "";
1072  }
1073  for (auto const& p : tr.endPathSummaries) {
1074  LogVerbatim("FwkSummary") << "";
1075  LogVerbatim("FwkSummary") << "TimeReport "
1076  << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
1077  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1078  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1079  << " Name"
1080  << "";
1081  for (auto const& mod : p.moduleInPathSummaries) {
1082  LogVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1083  << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
1084  << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << " "
1085  << mod.moduleLabel << "";
1086  }
1087  }
1088  if (not tr.endPathSummaries.empty()) {
1089  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1090  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1091  << " Name"
1092  << "";
1093  }
1094  LogVerbatim("FwkSummary") << "";
1095  LogVerbatim("FwkSummary") << "TimeReport "
1096  << "---------- Module Summary ---[Real sec]----";
1097  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1098  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1099  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1100  << " Name"
1101  << "";
1102  for (auto const& worker : tr.workerSummaries) {
1103  LogVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1104  << std::setw(kColumn1Size) << worker.realTime / totalEvents << " " << std::right
1105  << std::setw(kColumn2Size) << worker.realTime / std::max(1, worker.timesRun) << " "
1106  << std::right << std::setw(kColumn3Size)
1107  << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel
1108  << "";
1109  }
1110  LogVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1111  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1112  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1113  << " Name"
1114  << "";
1115 
1116  LogVerbatim("FwkSummary") << "";
1117  LogVerbatim("FwkSummary") << "T---Report end!"
1118  << "";
1119  LogVerbatim("FwkSummary") << "";
1120  }
1121 
1123  using std::placeholders::_1;
1125  }
1126 
1128  using std::placeholders::_1;
1129  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1130  }
1131 
1133  RunPrincipal const& rp,
1134  ProcessContext const* processContext,
1135  ActivityRegistry* activityRegistry,
1136  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1137  for (auto& c : all_output_communicators_) {
1138  c->writeRunAsync(task, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1139  }
1140  }
1141 
1143  LuminosityBlockPrincipal const& lbp,
1144  ProcessContext const* processContext,
1145  ActivityRegistry* activityRegistry) {
1146  for (auto& c : all_output_communicators_) {
1147  c->writeLumiAsync(task, lbp, processContext, activityRegistry);
1148  }
1149  }
1150 
1152  using std::placeholders::_1;
1153  // Return true iff at least one output module returns true.
1154  return (std::find_if(all_output_communicators_.begin(),
1158  }
1159 
1161  using std::placeholders::_1;
1162  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1163  }
1164 
1166  using std::placeholders::_1;
1167  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1168  }
1169 
1170  void Schedule::beginJob(ProductRegistry const& iRegistry, eventsetup::ESRecordsToProxyIndices const& iESIndices) {
1171  globalSchedule_->beginJob(iRegistry, iESIndices);
1172  }
1173 
1174  void Schedule::beginStream(unsigned int iStreamID) {
1175  assert(iStreamID < streamSchedules_.size());
1176  streamSchedules_[iStreamID]->beginStream();
1177  }
1178 
1179  void Schedule::endStream(unsigned int iStreamID) {
1180  assert(iStreamID < streamSchedules_.size());
1181  streamSchedules_[iStreamID]->endStream();
1182  }
1183 
1185  unsigned int iStreamID,
1186  EventPrincipal& ep,
1187  EventSetupImpl const& es,
1188  ServiceToken const& token) {
1189  assert(iStreamID < streamSchedules_.size());
1190  streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), ep, es, token, pathStatusInserters_);
1191  }
1192 
1194  ParameterSet const& iPSet,
1195  const ProductRegistry& iRegistry,
1196  eventsetup::ESRecordsToProxyIndices const& iIndices) {
1197  Worker* found = nullptr;
1198  for (auto const& worker : allWorkers()) {
1199  if (worker->description().moduleLabel() == iLabel) {
1200  found = worker;
1201  break;
1202  }
1203  }
1204  if (nullptr == found) {
1205  return false;
1206  }
1207 
1208  auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1209 
1210  globalSchedule_->replaceModule(newMod, iLabel);
1211 
1212  for (auto& s : streamSchedules_) {
1213  s->replaceModule(newMod, iLabel);
1214  }
1215 
1216  {
1217  //Need to updateLookup in order to make getByToken work
1218  auto const runLookup = iRegistry.productLookup(InRun);
1219  auto const lumiLookup = iRegistry.productLookup(InLumi);
1220  auto const eventLookup = iRegistry.productLookup(InEvent);
1221  found->updateLookup(InRun, *runLookup);
1222  found->updateLookup(InLumi, *lumiLookup);
1223  found->updateLookup(InEvent, *eventLookup);
1224  found->updateLookup(iIndices);
1225 
1226  auto const& processName = newMod->moduleDescription().processName();
1227  auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1228  auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1229  auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1230  found->resolvePutIndicies(InRun, runModuleToIndicies);
1231  found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1232  found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1233  }
1234 
1235  return true;
1236  }
1237 
1238  std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1239  std::vector<ModuleDescription const*> result;
1240  result.reserve(allWorkers().size());
1241 
1242  for (auto const& worker : allWorkers()) {
1243  ModuleDescription const* p = worker->descPtr();
1244  result.push_back(p);
1245  }
1246  return result;
1247  }
1248 
1249  Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1250 
1252  for (auto const& worker : allWorkers()) {
1253  worker->convertCurrentProcessAlias(processName);
1254  }
1255  }
1256 
1257  void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1258  streamSchedules_[0]->availablePaths(oLabelsToFill);
1259  }
1260 
1261  void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1262 
1263  void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1264 
1265  void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1266  streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1267  }
1268 
1270  std::vector<ModuleDescription const*>& descriptions,
1271  unsigned int hint) const {
1272  streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1273  }
1274 
1276  std::vector<ModuleDescription const*>& descriptions,
1277  unsigned int hint) const {
1278  streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1279  }
1280 
1282  std::vector<ModuleDescription const*>& allModuleDescriptions,
1283  std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
1284  std::vector<std::vector<ModuleDescription const*>>& modulesWhoseProductsAreConsumedBy,
1285  ProductRegistry const& preg) const {
1286  allModuleDescriptions.clear();
1287  moduleIDToIndex.clear();
1288  modulesWhoseProductsAreConsumedBy.clear();
1289 
1290  allModuleDescriptions.reserve(allWorkers().size());
1291  moduleIDToIndex.reserve(allWorkers().size());
1292  modulesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1293 
1294  std::map<std::string, ModuleDescription const*> labelToDesc;
1295  unsigned int i = 0;
1296  for (auto const& worker : allWorkers()) {
1297  ModuleDescription const* p = worker->descPtr();
1298  allModuleDescriptions.push_back(p);
1299  moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1300  labelToDesc[p->moduleLabel()] = p;
1301  ++i;
1302  }
1303  sort_all(moduleIDToIndex);
1304 
1305  i = 0;
1306  for (auto const& worker : allWorkers()) {
1307  std::vector<ModuleDescription const*>& modules = modulesWhoseProductsAreConsumedBy.at(i);
1308  worker->modulesWhoseProductsAreConsumed(modules, preg, labelToDesc);
1309  ++i;
1310  }
1311  }
1312 
1313  void Schedule::enableEndPaths(bool active) {
1314  endpathsAreActive_ = active;
1315  for (auto& s : streamSchedules_) {
1316  s->enableEndPaths(active);
1317  }
1318  }
1319 
1321 
1323  rep.eventSummary.totalEvents = 0;
1326  for (auto& s : streamSchedules_) {
1327  s->getTriggerReport(rep);
1328  }
1330  }
1331 
1333  rep.eventSummary.totalEvents = 0;
1334  rep.eventSummary.cpuTime = 0.;
1335  rep.eventSummary.realTime = 0.;
1336  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1337  }
1338 
1340  int returnValue = 0;
1341  for (auto& s : streamSchedules_) {
1342  returnValue += s->totalEvents();
1343  }
1344  return returnValue;
1345  }
1346 
1348  int returnValue = 0;
1349  for (auto& s : streamSchedules_) {
1350  returnValue += s->totalEventsPassed();
1351  }
1352  return returnValue;
1353  }
1354 
1356  int returnValue = 0;
1357  for (auto& s : streamSchedules_) {
1358  returnValue += s->totalEventsFailed();
1359  }
1360  return returnValue;
1361  }
1362 
1364  for (auto& s : streamSchedules_) {
1365  s->clearCounters();
1366  }
1367  }
1368 } // namespace edm
size
Write out results.
bool empty() const
Definition: ParameterSet.h:191
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:59
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
void stopEvent(StreamContext const &)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
virtual void openFile(FileBlock const &fb)=0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1249
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:1257
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
bool endPathsEnabled() const
Definition: Schedule.cc:1320
def copy(args, dbName)
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:1165
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::shared_ptr< ModuleRegistry const > moduleRegistry() const
Definition: Schedule.h:294
std::vector< Worker * > AllWorkers
Definition: Schedule.h:125
std::vector< ParameterSet > VParameterSet
Definition: ParameterSet.h:33
std::vector< std::string > const * pathNames_
Definition: Schedule.h:310
void convertCurrentProcessAlias(std::string const &processName)
Convert "@currentProcess" in InputTag process names to the actual current process name...
Definition: Schedule.cc:1251
virtual bool shouldWeCloseFile() const =0
void endStream(unsigned int)
Definition: Schedule.cc:1179
void enableEndPaths(bool active)
Definition: Schedule.cc:1313
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:1275
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:60
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
Definition: Schedule.h:308
std::string const & moduleLabel() const
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
int totalEventsFailed() const
Definition: Schedule.cc:1355
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:303
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void processOneEventAsync(WaitingTaskHolder iTask, unsigned int iStreamID, EventPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token)
Definition: Schedule.cc:1184
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:169
int totalEventsPassed() const
Definition: Schedule.cc:1347
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
Definition: Schedule.cc:1261
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:58
EventSummary eventSummary
Definition: TriggerReport.h:57
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter > > > pathStatusInserters_
Definition: Schedule.h:298
int totalEvents() const
Definition: Schedule.cc:1339
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
EventTimingSummary eventSummary
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1363
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProxyIndices const &)
Definition: Schedule.cc:1170
Schedule(ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, SubProcessParentageHelper const *subProcessParentageHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool hasSubprocesses, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:569
std::vector< PathTimingSummary > trigPathSummaries
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists, SubProcessParentageHelper const *subProcessParentageHelper)
Definition: Schedule.cc:785
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:826
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:1160
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:300
rep
Definition: cuy.py:1190
void writeRunAsync(WaitingTaskHolder iTask, RunPrincipal const &rp, ProcessContext const *, ActivityRegistry *, MergeableRunProductMetadata const *)
Definition: Schedule.cc:1132
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
def elem(elemtype, innerHTML='', html_class='', kwargs)
Definition: HTMLExport.py:19
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1322
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:306
void fillModuleAndConsumesInfo(std::vector< ModuleDescription const * > &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int >> &moduleIDToIndex, std::vector< std::vector< ModuleDescription const * >> &modulesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
Definition: Schedule.cc:1281
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:301
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:92
volatile bool endpathsAreActive_
Definition: Schedule.h:314
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:173
std::shared_ptr< TriggerResultInserter const > resultsInserter() const
Definition: Schedule.h:290
void startPath(StreamContext const &, PathContext const &)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter > > > endPathStatusInserters_
Definition: Schedule.h:299
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:305
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:1265
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
void beginStream(unsigned int)
Definition: Schedule.cc:1174
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:174
bool wantSummary_
Definition: Schedule.h:312
std::vector< std::string > const * endPathNames_
Definition: Schedule.h:311
HLT enums.
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:1238
Strings const & getTrigPaths() const
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:1127
void writeLumiAsync(WaitingTaskHolder iTask, LuminosityBlockPrincipal const &lbp, ProcessContext const *, ActivityRegistry *)
Definition: Schedule.cc:1142
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:841
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1332
bool shouldWeCloseOutput() const
Definition: Schedule.cc:1151
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry, eventsetup::ESRecordsToProxyIndices const &)
Definition: Schedule.cc:1193
std::vector< std::string > vstring
Definition: Schedule.cc:565
void setIsMergeable(BranchDescription &)
def move(src, dest)
Definition: eostools.py:511
#define constexpr
void closeOutputFiles()
Definition: Schedule.cc:1122
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
Definition: Schedule.cc:1263
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:1269
unsigned int id() const
std::string match(BranchDescription const &a, BranchDescription const &b, std::string const &fileName)