CMS 3D CMS Logo

Schedule.cc
Go to the documentation of this file.
2 
38 
39 #include <array>
40 #include <algorithm>
41 #include <cassert>
42 #include <cstdlib>
43 #include <functional>
44 #include <iomanip>
45 #include <list>
46 #include <map>
47 #include <set>
48 #include <exception>
49 #include <sstream>
50 
52 #include "processEDAliases.h"
53 
54 namespace edm {
55 
56  class Maker;
57 
58  namespace {
59  using std::placeholders::_1;
60 
61  bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
62  return std::binary_search(v.begin(), v.end(), s);
63  }
64 
65  // Here we make the trigger results inserter directly. This should
66  // probably be a utility in the WorkerRegistry or elsewhere.
67 
68  std::shared_ptr<TriggerResultInserter> makeInserter(
69  ParameterSet& proc_pset,
70  PreallocationConfiguration const& iPrealloc,
71  ProductRegistry& preg,
72  ExceptionToActionTable const& actions,
73  std::shared_ptr<ActivityRegistry> areg,
74  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
75  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
76  trig_pset->registerIt();
77 
78  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
79  ModuleDescription md(trig_pset->id(),
80  "TriggerResultInserter",
81  "TriggerResults",
82  processConfiguration.get(),
84 
85  areg->preModuleConstructionSignal_(md);
86  bool postCalled = false;
87  std::shared_ptr<TriggerResultInserter> returnValue;
88  // Caught exception is rethrown
89  CMS_SA_ALLOW try {
90  maker::ModuleHolderT<TriggerResultInserter> holder(
91  make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),
92  static_cast<Maker const*>(nullptr));
93  holder.setModuleDescription(md);
94  holder.registerProductsAndCallbacks(&preg);
95  returnValue = holder.module();
96  postCalled = true;
97  // if exception then post will be called in the catch block
98  areg->postModuleConstructionSignal_(md);
99  } catch (...) {
100  if (!postCalled) {
101  CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
102  // If post throws an exception ignore it because we are already handling another exception
103  }
104  }
105  throw;
106  }
107  return returnValue;
108  }
109 
110  template <typename T>
111  void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
112  std::vector<std::string> const& pathNames,
113  PreallocationConfiguration const& iPrealloc,
114  ProductRegistry& preg,
115  std::shared_ptr<ActivityRegistry> areg,
116  std::shared_ptr<ProcessConfiguration const> processConfiguration,
117  std::string const& moduleTypeName) {
119  pset.addParameter<std::string>("@module_type", moduleTypeName);
120  pset.addParameter<std::string>("@module_edm_type", "EDProducer");
121  pset.registerIt();
122 
123  pathStatusInserters.reserve(pathNames.size());
124 
125  for (auto const& pathName : pathNames) {
126  ModuleDescription md(
127  pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
128 
129  areg->preModuleConstructionSignal_(md);
130  bool postCalled = false;
131  // Caught exception is rethrown
132  CMS_SA_ALLOW try {
133  maker::ModuleHolderT<T> holder(make_shared_noexcept_false<T>(iPrealloc.numberOfStreams()),
134  static_cast<Maker const*>(nullptr));
135  holder.setModuleDescription(md);
136  holder.registerProductsAndCallbacks(&preg);
137  pathStatusInserters.emplace_back(holder.module());
138  postCalled = true;
139  // if exception then post will be called in the catch block
140  areg->postModuleConstructionSignal_(md);
141  } catch (...) {
142  if (!postCalled) {
143  CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
144  // If post throws an exception ignore it because we are already handling another exception
145  }
146  }
147  throw;
148  }
149  }
150  }
151 
152  typedef std::vector<std::string> vstring;
153 
154  void processSwitchProducers(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
155  // Update Switch BranchDescriptions for the chosen case
156  struct BranchesCases {
157  BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
158  std::vector<BranchKey> chosenBranches;
159  std::vector<std::string> caseLabels;
160  };
161  std::map<std::string, BranchesCases> switchMap;
162  for (auto& prod : preg.productListUpdator()) {
163  if (prod.second.isSwitchAlias()) {
164  auto it = switchMap.find(prod.second.moduleLabel());
165  if (it == switchMap.end()) {
166  auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
167  auto inserted = switchMap.emplace(prod.second.moduleLabel(),
168  switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
169  assert(inserted.second);
170  it = inserted.first;
171  }
172 
173  bool found = false;
174  for (auto const& productIter : preg.productList()) {
175  BranchKey const& branchKey = productIter.first;
176  // The alias-for product must be in the same process as
177  // the SwitchProducer (earlier processes or SubProcesses
178  // may contain products with same type, module label, and
179  // instance name)
180  if (branchKey.processName() != processName) {
181  continue;
182  }
183 
184  BranchDescription const& desc = productIter.second;
185  if (desc.branchType() == prod.second.branchType() and
186  desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
187  branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
188  branchKey.productInstanceName() == prod.second.productInstanceName()) {
189  prod.second.setSwitchAliasForBranch(desc);
190  it->second.chosenBranches.push_back(prod.first); // with moduleLabel of the Switch
191  found = true;
192  }
193  }
194  if (not found) {
196  ex << "Trying to find a BranchDescription to be aliased-for by SwitchProducer with\n"
197  << " friendly class name = " << prod.second.friendlyClassName() << "\n"
198  << " module label = " << prod.second.moduleLabel() << "\n"
199  << " product instance name = " << prod.second.productInstanceName() << "\n"
200  << " process name = " << processName
201  << "\n\nbut did not find any. Please contact a framework developer.";
202  ex.addContext("Calling Schedule.cc:processSwitchProducers()");
203  throw ex;
204  }
205  }
206  }
207  if (switchMap.empty())
208  return;
209 
210  for (auto& elem : switchMap) {
211  std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
212  }
213 
214  auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
215  std::map<std::string, std::vector<BranchKey>> caseBranches;
216  for (auto const& item : preg.productList()) {
217  if (item.first.processName() != processName)
218  continue;
219 
220  if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
221  found != caseLabels.end()) {
222  caseBranches[*found].push_back(item.first);
223  }
224  }
225 
226  for (auto const& caseLabel : caseLabels) {
227  ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
228  auto& branches = caseBranches[caseLabel];
229  std::sort(branches.begin(), branches.end());
230  for (auto const& branch : branches) {
231  ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
232  }
233  }
234  };
235 
236  // Check that non-chosen cases declare exactly the same branches
237  // Also set the alias-for branches to transient
238  std::vector<bool> foundBranches;
239  for (auto const& switchItem : switchMap) {
240  auto const& switchLabel = switchItem.first;
241  auto const& chosenBranches = switchItem.second.chosenBranches;
242  auto const& caseLabels = switchItem.second.caseLabels;
243  foundBranches.resize(chosenBranches.size());
244  for (auto const& caseLabel : caseLabels) {
245  std::fill(foundBranches.begin(), foundBranches.end(), false);
246  for (auto& nonConstItem : preg.productListUpdator()) {
247  auto const& item = nonConstItem;
248  if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
249  // Set the alias-for branch as transient so it gets fully ignored in output.
250  // I tried first to implicitly drop all branches with
251  // '@' in ProductSelector, but that gave problems on
252  // input (those branches would be implicitly dropped on
253  // input as well, leading to the SwitchProducer branches
254  // do be dropped as dependent ones, as the alias
255  // detection logic in RootFile says that the
256  // SwitchProducer branches are not alias branches)
257  nonConstItem.second.setTransient(true);
258 
259  auto range = std::equal_range(chosenBranches.begin(),
260  chosenBranches.end(),
261  BranchKey(item.first.friendlyClassName(),
262  switchLabel,
263  item.first.productInstanceName(),
264  item.first.processName()));
265  if (range.first == range.second) {
267  ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
268  << item.first << " that is not produced by the chosen case "
269  << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
270  .getUntrackedParameter<std::string>("@chosen_case")
271  << ". If the intention is to produce only a subset of the products listed below, each case with "
272  "more products needs to be replaced with an EDAlias to only the necessary products, and the "
273  "EDProducer itself needs to be moved to a Task.\n\n";
274  addProductsToException(caseLabels, ex);
275  throw ex;
276  }
277  assert(std::distance(range.first, range.second) == 1);
278  foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
279 
280  // Check that there are no BranchAliases for any of the cases
281  auto const& bd = item.second;
282  if (not bd.branchAliases().empty()) {
284  << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
285  "aliases for SwitchProducer with label "
286  << switchLabel << " for case " << caseLabel << ":";
287  for (auto const& branchAlias : bd.branchAliases()) {
288  ex << " " << branchAlias;
289  }
290  throw ex;
291  }
292  }
293  }
294 
295  for (size_t i = 0; i < chosenBranches.size(); i++) {
296  if (not foundBranches[i]) {
297  auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
298  .getUntrackedParameter<std::string>("@chosen_case");
300  ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
301  << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
302  << chosenLabel
303  << ". If the intention is to produce only a subset of the products listed below, each case with more "
304  "products needs to be replaced with an EDAlias to only the necessary products, and the "
305  "EDProducer itself needs to be moved to a Task.\n\n";
306  addProductsToException(caseLabels, ex);
307  throw ex;
308  }
309  }
310  }
311  }
312  }
313 
314  void reduceParameterSet(ParameterSet& proc_pset,
315  vstring const& end_path_name_list,
316  vstring& modulesInConfig,
317  std::set<std::string> const& usedModuleLabels,
318  std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
319  // Before calculating the ParameterSetID of the top level ParameterSet or
320  // saving it in the registry drop from the top level ParameterSet all
321  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
322  // production is not enabled also drop all the EDFilters and EDProducers
323  // that are not scheduled. Drop the ParameterSet used to configure the module
324  // itself. Also drop the other traces of these labels in the top level
325  // ParameterSet: Remove that labels from @all_modules and from all the
326  // end paths. If this makes any end paths empty, then remove the end path
327  // name from @end_paths, and @paths.
328 
329  // First make a list of labels to drop
330  vstring outputModuleLabels;
331  std::string edmType;
332  std::string const moduleEdmType("@module_edm_type");
333  std::string const outputModule("OutputModule");
334  std::string const edAnalyzer("EDAnalyzer");
335  std::string const edFilter("EDFilter");
336  std::string const edProducer("EDProducer");
337 
338  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
339 
340  //need a list of all modules on paths in order to determine
341  // if an EDAnalyzer only appears on an end path
342  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
343  std::set<std::string> modulesOnPaths;
344  {
345  std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
346  for (auto const& endPath : end_path_name_list) {
347  noEndPaths.erase(endPath);
348  }
349  {
350  vstring labels;
351  for (auto const& path : noEndPaths) {
352  labels = proc_pset.getParameter<vstring>(path);
353  modulesOnPaths.insert(labels.begin(), labels.end());
354  }
355  }
356  }
357  //Initially fill labelsToBeDropped with all module mentioned in
358  // the configuration but which are not being used by the system
359  std::vector<std::string> labelsToBeDropped;
360  labelsToBeDropped.reserve(modulesInConfigSet.size());
361  std::set_difference(modulesInConfigSet.begin(),
362  modulesInConfigSet.end(),
363  usedModuleLabels.begin(),
364  usedModuleLabels.end(),
365  std::back_inserter(labelsToBeDropped));
366 
367  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
368  for (auto const& modLabel : usedModuleLabels) {
369  // Do nothing for modules that do not have a ParameterSet. Modules of type
370  // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
371  if (proc_pset.existsAs<ParameterSet>(modLabel)) {
372  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
373  if (edmType == outputModule) {
374  outputModuleLabels.push_back(modLabel);
375  labelsToBeDropped.push_back(modLabel);
376  }
377  if (edmType == edAnalyzer) {
378  if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
379  labelsToBeDropped.push_back(modLabel);
380  }
381  }
382  }
383  }
384  //labelsToBeDropped must be sorted
385  std::inplace_merge(
386  labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
387 
388  // drop the parameter sets used to configure the modules
389  for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
390 
391  // drop the labels from @all_modules
392  vstring::iterator endAfterRemove =
393  std::remove_if(modulesInConfig.begin(),
394  modulesInConfig.end(),
395  std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
396  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
397  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
398 
399  // drop the labels from all end paths
400  vstring endPathsToBeDropped;
401  vstring labels;
402  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
403  iEndPath != endEndPath;
404  ++iEndPath) {
405  labels = proc_pset.getParameter<vstring>(*iEndPath);
406  vstring::iterator iSave = labels.begin();
407  vstring::iterator iBegin = labels.begin();
408 
409  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
410  if (binary_search_string(labelsToBeDropped, *iLabel)) {
411  if (binary_search_string(outputModuleLabels, *iLabel)) {
412  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
413  }
414  } else {
415  if (iSave != iLabel) {
416  iSave->swap(*iLabel);
417  }
418  ++iSave;
419  }
420  }
421  labels.erase(iSave, labels.end());
422  if (labels.empty()) {
423  // remove empty end paths and save their names
424  proc_pset.eraseSimpleParameter(*iEndPath);
425  endPathsToBeDropped.push_back(*iEndPath);
426  } else {
427  proc_pset.addParameter<vstring>(*iEndPath, labels);
428  }
429  }
430  sort_all(endPathsToBeDropped);
431 
432  // remove empty end paths from @paths
433  endAfterRemove = std::remove_if(scheduledPaths.begin(),
434  scheduledPaths.end(),
435  std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
436  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
437  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
438 
439  // remove empty end paths from @end_paths
440  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
441  endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
442  scheduledEndPaths.end(),
443  std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
444  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
445  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
446  }
447 
448  class RngEDConsumer : public EDConsumerBase {
449  public:
450  explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
452  if (rng.isAvailable()) {
453  rng->consumes(consumesCollector());
454  for (auto const& consumesInfo : this->consumesInfo()) {
455  typesConsumed.emplace(consumesInfo.type());
456  }
457  }
458  }
459  };
460 
461  template <typename F>
462  auto doCleanup(F&& iF) {
463  auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
464  CMS_SA_ALLOW try { f(); } catch (...) {
465  }
466  if (iPtr) {
467  iTask.doneWaiting(*iPtr);
468  }
469  };
470  return wrapped;
471  }
472  } // namespace
473  // -----------------------------
474 
475  typedef std::vector<std::string> vstring;
476 
477  // -----------------------------
478 
480  service::TriggerNamesService const& tns,
481  ProductRegistry& preg,
483  std::shared_ptr<ActivityRegistry> areg,
484  std::shared_ptr<ProcessConfiguration const> processConfiguration,
485  PreallocationConfiguration const& prealloc,
486  ProcessContext const* processContext)
487  : //Only create a resultsInserter if there is a trigger path
488  resultsInserter_{tns.getTrigPaths().empty()
489  ? std::shared_ptr<TriggerResultInserter>{}
490  : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
491  moduleRegistry_(new ModuleRegistry()),
492  all_output_communicators_(),
493  preallocConfig_(prealloc),
494  pathNames_(&tns.getTrigPaths()),
495  endPathNames_(&tns.getEndPaths()),
496  wantSummary_(tns.wantSummary()) {
497  makePathStatusInserters(pathStatusInserters_,
498  *pathNames_,
499  prealloc,
500  preg,
501  areg,
502  processConfiguration,
503  std::string("PathStatusInserter"));
504 
505  makePathStatusInserters(endPathStatusInserters_,
506  *endPathNames_,
507  prealloc,
508  preg,
509  areg,
510  processConfiguration,
511  std::string("EndPathStatusInserter"));
512 
513  assert(0 < prealloc.numberOfStreams());
514  streamSchedules_.reserve(prealloc.numberOfStreams());
515  for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
516  streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
517  pathStatusInserters_,
518  endPathStatusInserters_,
519  moduleRegistry(),
520  proc_pset,
521  tns,
522  prealloc,
523  preg,
524  actions,
525  areg,
526  processConfiguration,
527  StreamID{i},
528  processContext));
529  }
530 
531  //TriggerResults are injected automatically by StreamSchedules and are
532  // unknown to the ModuleRegistry
533  const std::string kTriggerResults("TriggerResults");
534  std::vector<std::string> modulesToUse;
535  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
536  for (auto const& worker : streamSchedules_[0]->allWorkers()) {
537  if (worker->description()->moduleLabel() != kTriggerResults) {
538  modulesToUse.push_back(worker->description()->moduleLabel());
539  }
540  }
541  //The unscheduled modules are at the end of the list, but we want them at the front
542  unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
543  if (nUnscheduledModules > 0) {
544  std::vector<std::string> temp;
545  temp.reserve(modulesToUse.size());
546  auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
547  std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
548  std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
549  temp.swap(modulesToUse);
550  }
551 
552  // propagate_const<T> has no reset() function
553  globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
554  pathStatusInserters_,
555  endPathStatusInserters_,
556  moduleRegistry(),
557  modulesToUse,
558  proc_pset,
559  preg,
560  prealloc,
561  actions,
562  areg,
563  processConfiguration,
564  processContext);
565  }
566 
568  service::TriggerNamesService const& tns,
569  ProductRegistry& preg,
570  BranchIDListHelper& branchIDListHelper,
571  ProcessBlockHelperBase& processBlockHelper,
572  ThinnedAssociationsHelper& thinnedAssociationsHelper,
573  SubProcessParentageHelper const* subProcessParentageHelper,
574  std::shared_ptr<ActivityRegistry> areg,
575  std::shared_ptr<ProcessConfiguration> processConfiguration,
576  bool hasSubprocesses,
577  PreallocationConfiguration const& prealloc,
578  ProcessContext const* processContext) {
579  //TriggerResults is not in the top level ParameterSet so the call to
580  // reduceParameterSet would fail to find it. Just remove it up front.
581  const std::string kTriggerResults("TriggerResults");
582 
583  std::set<std::string> usedModuleLabels;
584  for (auto const& worker : allWorkers()) {
585  if (worker->description()->moduleLabel() != kTriggerResults) {
586  usedModuleLabels.insert(worker->description()->moduleLabel());
587  }
588  }
589  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
590  std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
591  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
592  {
593  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
594  detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
595  }
596 
597  // At this point all BranchDescriptions are created. Mark now the
598  // ones of unscheduled workers to be on-demand.
599  {
600  auto const& unsched = streamSchedules_[0]->unscheduledWorkers();
601  if (not unsched.empty()) {
602  std::set<std::string> unscheduledModules;
603  std::transform(unsched.begin(),
604  unsched.end(),
605  std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
606  [](auto worker) { return worker->description()->moduleLabel(); });
607  preg.setUnscheduledProducts(unscheduledModules);
608  }
609  }
610 
611  processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
612  proc_pset.registerIt();
613  processConfiguration->setParameterSetID(proc_pset.id());
614  processConfiguration->setProcessConfigurationID();
615 
616  // This is used for a little sanity-check to make sure no code
617  // modifications alter the number of workers at a later date.
618  size_t all_workers_count = allWorkers().size();
619 
620  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
621  auto comm = iHolder->createOutputModuleCommunicator();
622  if (comm) {
623  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
624  }
625  });
626  // Now that the output workers are filled in, set any output limits or information.
627  limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
628 
629  // Sanity check: make sure nobody has added a worker after we've
630  // already relied on the WorkerManager being full.
631  assert(all_workers_count == allWorkers().size());
632 
633  branchIDListHelper.updateFromRegistry(preg);
634 
635  for (auto const& worker : streamSchedules_[0]->allWorkers()) {
636  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
637  }
638 
639  processBlockHelper.updateForNewProcess(preg, processConfiguration->processName());
640 
641  // The output modules consume products in kept branches.
642  // So we must set this up before freezing.
643  for (auto& c : all_output_communicators_) {
644  c->selectProducts(preg, thinnedAssociationsHelper, processBlockHelper);
645  }
646 
647  for (auto& product : preg.productListUpdator()) {
648  setIsMergeable(product.second);
649  }
650 
651  {
652  // We now get a collection of types that may be consumed.
653  std::set<TypeID> productTypesConsumed;
654  std::set<TypeID> elementTypesConsumed;
655  // Loop over all modules
656  for (auto const& worker : allWorkers()) {
657  for (auto const& consumesInfo : worker->consumesInfo()) {
658  if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
659  productTypesConsumed.emplace(consumesInfo.type());
660  } else {
661  elementTypesConsumed.emplace(consumesInfo.type());
662  }
663  }
664  }
665  // The SubProcess class is not a module, yet it may consume.
666  if (hasSubprocesses) {
667  productTypesConsumed.emplace(typeid(TriggerResults));
668  }
669  // The RandomNumberGeneratorService is not a module, yet it consumes.
670  { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
671  preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
672  }
673 
674  for (auto& c : all_output_communicators_) {
675  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
676  }
677 
678  if (wantSummary_) {
679  std::vector<const ModuleDescription*> modDesc;
680  const auto& workers = allWorkers();
681  modDesc.reserve(workers.size());
682 
683  std::transform(workers.begin(),
684  workers.end(),
685  std::back_inserter(modDesc),
686  [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
687 
688  // propagate_const<T> has no reset() function
689  summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
690  auto timeKeeperPtr = summaryTimeKeeper_.get();
691 
692  areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
693 
694  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
695  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
696  areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
697  areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
698  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
699  areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
700 
701  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
702  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
703 
704  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
705  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
706 
707  areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
708  areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
709  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
710  //timeKeeperPtr->startModuleEvent(iContext,iMod);
711  //});
712  }
713 
714  } // Schedule::Schedule
715 
716  void Schedule::limitOutput(ParameterSet const& proc_pset,
717  BranchIDLists const& branchIDLists,
718  SubProcessParentageHelper const* subProcessParentageHelper) {
719  std::string const output("output");
720 
721  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
722  int maxEventSpecs = 0;
723  int maxEventsOut = -1;
724  ParameterSet const* vMaxEventsOut = nullptr;
725  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
726  if (search_all(intNamesE, output)) {
727  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
728  ++maxEventSpecs;
729  }
730  std::vector<std::string> psetNamesE;
731  maxEventsPSet.getParameterSetNames(psetNamesE, false);
732  if (search_all(psetNamesE, output)) {
733  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
734  ++maxEventSpecs;
735  }
736 
737  if (maxEventSpecs > 1) {
739  << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
740  }
741 
742  for (auto& c : all_output_communicators_) {
743  OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
744  if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
745  std::string const& moduleLabel = c->description().moduleLabel();
746  try {
747  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
748  } catch (Exception const&) {
750  << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
751  }
752  }
753  c->configure(desc);
754  }
755  }
756 
757  bool Schedule::terminate() const {
758  if (all_output_communicators_.empty()) {
759  return false;
760  }
761  for (auto& c : all_output_communicators_) {
762  if (!c->limitReached()) {
763  // Found an output module that has not reached output event count.
764  return false;
765  }
766  }
767  LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
768  << "has reached its configured limit.\n";
769  return true;
770  }
771 
773  globalSchedule_->endJob(collector);
774  if (collector.hasThrown()) {
775  return;
776  }
777 
778  if (wantSummary_ == false)
779  return;
780  {
781  TriggerReport tr;
782  getTriggerReport(tr);
783 
784  // The trigger report (pass/fail etc.):
785 
786  LogFwkVerbatim("FwkSummary") << "";
787  if (streamSchedules_[0]->context().processContext()->isSubProcess()) {
788  LogFwkVerbatim("FwkSummary") << "TrigReport Process: "
789  << streamSchedules_[0]->context().processContext()->processName();
790  }
791  LogFwkVerbatim("FwkSummary") << "TrigReport "
792  << "---------- Event Summary ------------";
793  if (!tr.trigPathSummaries.empty()) {
794  LogFwkVerbatim("FwkSummary") << "TrigReport"
795  << " Events total = " << tr.eventSummary.totalEvents
796  << " passed = " << tr.eventSummary.totalEventsPassed
797  << " failed = " << tr.eventSummary.totalEventsFailed << "";
798  } else {
799  LogFwkVerbatim("FwkSummary") << "TrigReport"
800  << " Events total = " << tr.eventSummary.totalEvents
801  << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
802  }
803 
804  LogFwkVerbatim("FwkSummary") << "";
805  LogFwkVerbatim("FwkSummary") << "TrigReport "
806  << "---------- Path Summary ------------";
807  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
808  << " " << std::right << std::setw(10) << "Executed"
809  << " " << std::right << std::setw(10) << "Passed"
810  << " " << std::right << std::setw(10) << "Failed"
811  << " " << std::right << std::setw(10) << "Error"
812  << " "
813  << "Name"
814  << "";
815  for (auto const& p : tr.trigPathSummaries) {
816  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
817  << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
818  << std::right << std::setw(10) << p.timesPassed << " " << std::right
819  << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
820  << p.timesExcept << " " << p.name << "";
821  }
822 
823  /*
824  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
825  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
826  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
827  for (; epi != epe; ++epi, ++epn) {
828 
829  LogFwkVerbatim("FwkSummary") << "TrigReport "
830  << std::right << std::setw(5) << 1
831  << std::right << std::setw(5) << *epi << " "
832  << std::right << std::setw(10) << totalEvents() << " "
833  << std::right << std::setw(10) << totalEvents() << " "
834  << std::right << std::setw(10) << 0 << " "
835  << std::right << std::setw(10) << 0 << " "
836  << *epn << "";
837  }
838  */
839 
840  LogFwkVerbatim("FwkSummary") << "";
841  LogFwkVerbatim("FwkSummary") << "TrigReport "
842  << "-------End-Path Summary ------------";
843  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
844  << " " << std::right << std::setw(10) << "Executed"
845  << " " << std::right << std::setw(10) << "Passed"
846  << " " << std::right << std::setw(10) << "Failed"
847  << " " << std::right << std::setw(10) << "Error"
848  << " "
849  << "Name"
850  << "";
851  for (auto const& p : tr.endPathSummaries) {
852  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
853  << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
854  << std::right << std::setw(10) << p.timesPassed << " " << std::right
855  << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
856  << p.timesExcept << " " << p.name << "";
857  }
858 
859  for (auto const& p : tr.trigPathSummaries) {
860  LogFwkVerbatim("FwkSummary") << "";
861  LogFwkVerbatim("FwkSummary") << "TrigReport "
862  << "---------- Modules in Path: " << p.name << " ------------";
863  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
864  << " " << std::right << std::setw(10) << "Visited"
865  << " " << std::right << std::setw(10) << "Passed"
866  << " " << std::right << std::setw(10) << "Failed"
867  << " " << std::right << std::setw(10) << "Error"
868  << " "
869  << "Name"
870  << "";
871 
872  for (auto const& mod : p.moduleInPathSummaries) {
873  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
874  << mod.bitPosition << " " << std::right << std::setw(10) << mod.timesVisited
875  << " " << std::right << std::setw(10) << mod.timesPassed << " " << std::right
876  << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
877  << mod.timesExcept << " " << mod.moduleLabel << "";
878  }
879  }
880 
881  for (auto const& p : tr.endPathSummaries) {
882  LogFwkVerbatim("FwkSummary") << "";
883  LogFwkVerbatim("FwkSummary") << "TrigReport "
884  << "------ Modules in End-Path: " << p.name << " ------------";
885  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
886  << " " << std::right << std::setw(10) << "Visited"
887  << " " << std::right << std::setw(10) << "Passed"
888  << " " << std::right << std::setw(10) << "Failed"
889  << " " << std::right << std::setw(10) << "Error"
890  << " "
891  << "Name"
892  << "";
893 
894  unsigned int bitpos = 0;
895  for (auto const& mod : p.moduleInPathSummaries) {
896  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
897  << bitpos << " " << std::right << std::setw(10) << mod.timesVisited << " "
898  << std::right << std::setw(10) << mod.timesPassed << " " << std::right
899  << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
900  << mod.timesExcept << " " << mod.moduleLabel << "";
901  ++bitpos;
902  }
903  }
904 
905  LogFwkVerbatim("FwkSummary") << "";
906  LogFwkVerbatim("FwkSummary") << "TrigReport "
907  << "---------- Module Summary ------------";
908  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Visited"
909  << " " << std::right << std::setw(10) << "Executed"
910  << " " << std::right << std::setw(10) << "Passed"
911  << " " << std::right << std::setw(10) << "Failed"
912  << " " << std::right << std::setw(10) << "Error"
913  << " "
914  << "Name"
915  << "";
916  for (auto const& worker : tr.workerSummaries) {
917  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " "
918  << std::right << std::setw(10) << worker.timesRun << " " << std::right
919  << std::setw(10) << worker.timesPassed << " " << std::right << std::setw(10)
920  << worker.timesFailed << " " << std::right << std::setw(10) << worker.timesExcept
921  << " " << worker.moduleLabel << "";
922  }
923  LogFwkVerbatim("FwkSummary") << "";
924  }
925  // The timing report (CPU and Real Time):
928 
929  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
930 
931  LogFwkVerbatim("FwkSummary") << "TimeReport "
932  << "---------- Event Summary ---[sec]----";
933  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
934  << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
935  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
936  << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
937  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
938  << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
939  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
940  << " efficiency CPU/Real/thread = "
943 
944  constexpr int kColumn1Size = 10;
945  constexpr int kColumn2Size = 12;
946  constexpr int kColumn3Size = 12;
947  LogFwkVerbatim("FwkSummary") << "";
948  LogFwkVerbatim("FwkSummary") << "TimeReport "
949  << "---------- Path Summary ---[Real sec]----";
950  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
951  << " " << std::right << std::setw(kColumn2Size) << "per exec"
952  << " Name";
953  for (auto const& p : tr.trigPathSummaries) {
954  const int timesRun = std::max(1, p.timesRun);
955  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
956  << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
957  << std::setw(kColumn2Size) << p.realTime / timesRun << " " << p.name << "";
958  }
959  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
960  << " " << std::right << std::setw(kColumn2Size) << "per exec"
961  << " Name"
962  << "";
963 
964  LogFwkVerbatim("FwkSummary") << "";
965  LogFwkVerbatim("FwkSummary") << "TimeReport "
966  << "-------End-Path Summary ---[Real sec]----";
967  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
968  << " " << std::right << std::setw(kColumn2Size) << "per exec"
969  << " Name"
970  << "";
971  for (auto const& p : tr.endPathSummaries) {
972  const int timesRun = std::max(1, p.timesRun);
973 
974  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
975  << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
976  << std::setw(kColumn2Size) << p.realTime / timesRun << " " << p.name << "";
977  }
978  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
979  << " " << std::right << std::setw(kColumn2Size) << "per exec"
980  << " Name"
981  << "";
982 
983  for (auto const& p : tr.trigPathSummaries) {
984  LogFwkVerbatim("FwkSummary") << "";
985  LogFwkVerbatim("FwkSummary") << "TimeReport "
986  << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
987  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
988  << " " << std::right << std::setw(kColumn2Size) << "per visit"
989  << " Name"
990  << "";
991  for (auto const& mod : p.moduleInPathSummaries) {
992  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
993  << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
994  << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << " "
995  << mod.moduleLabel << "";
996  }
997  }
998  if (not tr.trigPathSummaries.empty()) {
999  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1000  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1001  << " Name"
1002  << "";
1003  }
1004  for (auto const& p : tr.endPathSummaries) {
1005  LogFwkVerbatim("FwkSummary") << "";
1006  LogFwkVerbatim("FwkSummary") << "TimeReport "
1007  << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
1008  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1009  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1010  << " Name"
1011  << "";
1012  for (auto const& mod : p.moduleInPathSummaries) {
1013  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1014  << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
1015  << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << " "
1016  << mod.moduleLabel << "";
1017  }
1018  }
1019  if (not tr.endPathSummaries.empty()) {
1020  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1021  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1022  << " Name"
1023  << "";
1024  }
1025  LogFwkVerbatim("FwkSummary") << "";
1026  LogFwkVerbatim("FwkSummary") << "TimeReport "
1027  << "---------- Module Summary ---[Real sec]----";
1028  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1029  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1030  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1031  << " Name"
1032  << "";
1033  for (auto const& worker : tr.workerSummaries) {
1034  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1035  << std::setw(kColumn1Size) << worker.realTime / totalEvents << " " << std::right
1036  << std::setw(kColumn2Size) << worker.realTime / std::max(1, worker.timesRun) << " "
1037  << std::right << std::setw(kColumn3Size)
1038  << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel
1039  << "";
1040  }
1041  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1042  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1043  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1044  << " Name"
1045  << "";
1046 
1047  LogFwkVerbatim("FwkSummary") << "";
1048  LogFwkVerbatim("FwkSummary") << "T---Report end!"
1049  << "";
1050  LogFwkVerbatim("FwkSummary") << "";
1051  }
1052 
1054  using std::placeholders::_1;
1056  for (auto& worker : allWorkers()) {
1057  worker->respondToCloseOutputFile();
1058  }
1059  }
1060 
1062  using std::placeholders::_1;
1063  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1064  }
1065 
1067  RunPrincipal const& rp,
1068  ProcessContext const* processContext,
1069  ActivityRegistry* activityRegistry,
1070  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1073  LuminosityBlockID(rp.run(), 0),
1074  rp.index(),
1076  rp.endTime(),
1077  processContext);
1078 
1079  using namespace edm::waiting_task;
1080  chain::first([&](auto nextTask) {
1081  //services can depend on other services
1083 
1084  // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1085  CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1086  }
1087  for (auto& c : all_output_communicators_) {
1088  c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1089  }
1090  }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1091  //services can depend on other services
1093 
1094  activityRegistry->postGlobalWriteRunSignal_(globalContext);
1095  })) |
1097  }
1098 
1100  ProcessBlockPrincipal const& pbp,
1101  ProcessContext const* processContext,
1102  ActivityRegistry* activityRegistry) {
1109  processContext);
1110 
1111  using namespace edm::waiting_task;
1112  chain::first([&](auto nextTask) {
1113  // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1115  CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1116  }
1117  for (auto& c : all_output_communicators_) {
1118  c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1119  }
1120  }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1121  //services can depend on other services
1123 
1124  activityRegistry->postWriteProcessBlockSignal_(globalContext);
1125  })) |
1127  }
1128 
1130  LuminosityBlockPrincipal const& lbp,
1131  ProcessContext const* processContext,
1132  ActivityRegistry* activityRegistry) {
1135  lbp.id(),
1136  lbp.runPrincipal().index(),
1137  lbp.index(),
1138  lbp.beginTime(),
1139  processContext);
1140 
1141  using namespace edm::waiting_task;
1142  chain::first([&](auto nextTask) {
1144  CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1145  }
1146  for (auto& c : all_output_communicators_) {
1147  c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1148  }
1149  }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1150  //services can depend on other services
1152 
1153  activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1154  })) |
1156  }
1157 
1159  using std::placeholders::_1;
1160  // Return true iff at least one output module returns true.
1161  return (std::find_if(all_output_communicators_.begin(),
1165  }
1166 
1168  using std::placeholders::_1;
1169  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1170  }
1171 
1173  using std::placeholders::_1;
1174  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1175  }
1176 
1177  void Schedule::beginJob(ProductRegistry const& iRegistry,
1178  eventsetup::ESRecordsToProxyIndices const& iESIndices,
1179  ProcessBlockHelperBase const& processBlockHelperBase) {
1180  globalSchedule_->beginJob(iRegistry, iESIndices, processBlockHelperBase);
1181  }
1182 
1183  void Schedule::beginStream(unsigned int iStreamID) {
1184  assert(iStreamID < streamSchedules_.size());
1185  streamSchedules_[iStreamID]->beginStream();
1186  }
1187 
1188  void Schedule::endStream(unsigned int iStreamID) {
1189  assert(iStreamID < streamSchedules_.size());
1190  streamSchedules_[iStreamID]->endStream();
1191  }
1192 
1194  unsigned int iStreamID,
1196  ServiceToken const& token) {
1197  assert(iStreamID < streamSchedules_.size());
1198  streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1199  }
1200 
1202  ParameterSet const& iPSet,
1203  const ProductRegistry& iRegistry,
1204  eventsetup::ESRecordsToProxyIndices const& iIndices) {
1205  Worker* found = nullptr;
1206  for (auto const& worker : allWorkers()) {
1207  if (worker->description()->moduleLabel() == iLabel) {
1208  found = worker;
1209  break;
1210  }
1211  }
1212  if (nullptr == found) {
1213  return false;
1214  }
1215 
1216  auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1217 
1218  globalSchedule_->replaceModule(newMod, iLabel);
1219 
1220  for (auto& s : streamSchedules_) {
1221  s->replaceModule(newMod, iLabel);
1222  }
1223 
1224  {
1225  //Need to updateLookup in order to make getByToken work
1226  auto const processBlockLookup = iRegistry.productLookup(InProcess);
1227  auto const runLookup = iRegistry.productLookup(InRun);
1228  auto const lumiLookup = iRegistry.productLookup(InLumi);
1229  auto const eventLookup = iRegistry.productLookup(InEvent);
1230  found->updateLookup(InProcess, *runLookup);
1231  found->updateLookup(InRun, *runLookup);
1232  found->updateLookup(InLumi, *lumiLookup);
1233  found->updateLookup(InEvent, *eventLookup);
1234  found->updateLookup(iIndices);
1235 
1236  auto const& processName = newMod->moduleDescription().processName();
1237  auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1238  auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1239  auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1240  auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1241  found->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1242  found->resolvePutIndicies(InRun, runModuleToIndicies);
1243  found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1244  found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1245  }
1246 
1247  return true;
1248  }
1249 
1251  globalSchedule_->deleteModule(iLabel);
1252  for (auto& stream : streamSchedules_) {
1253  stream->deleteModule(iLabel);
1254  }
1255  moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1256  }
1257 
1258  void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1259  std::multimap<std::string, std::string> const& referencesToBranches,
1260  std::vector<std::string> const& modulesToSkip,
1261  edm::ProductRegistry const& preg) {
1262  for (auto& stream : streamSchedules_) {
1263  stream->initializeEarlyDelete(
1264  *moduleRegistry(), branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1265  }
1266  }
1267 
1268  std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1269  std::vector<ModuleDescription const*> result;
1270  result.reserve(allWorkers().size());
1271 
1272  for (auto const& worker : allWorkers()) {
1273  ModuleDescription const* p = worker->description();
1274  result.push_back(p);
1275  }
1276  return result;
1277  }
1278 
1279  Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1280 
1282  for (auto const& worker : allWorkers()) {
1283  worker->convertCurrentProcessAlias(processName);
1284  }
1285  }
1286 
1287  void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1288  streamSchedules_[0]->availablePaths(oLabelsToFill);
1289  }
1290 
1291  void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1292 
1293  void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1294 
1295  void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1296  streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1297  }
1298 
1300  std::vector<ModuleDescription const*>& descriptions,
1301  unsigned int hint) const {
1302  streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1303  }
1304 
1306  std::vector<ModuleDescription const*>& descriptions,
1307  unsigned int hint) const {
1308  streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1309  }
1310 
1312  std::vector<ModuleDescription const*>& allModuleDescriptions,
1313  std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
1314  std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>& modulesWhoseProductsAreConsumedBy,
1315  std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
1316  ProductRegistry const& preg) const {
1317  allModuleDescriptions.clear();
1318  moduleIDToIndex.clear();
1319  for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1320  modulesWhoseProductsAreConsumedBy[iBranchType].clear();
1321  }
1322  modulesInPreviousProcessesWhoseProductsAreConsumedBy.clear();
1323 
1324  allModuleDescriptions.reserve(allWorkers().size());
1325  moduleIDToIndex.reserve(allWorkers().size());
1326  for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1327  modulesWhoseProductsAreConsumedBy[iBranchType].resize(allWorkers().size());
1328  }
1329  modulesInPreviousProcessesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1330 
1331  std::map<std::string, ModuleDescription const*> labelToDesc;
1332  unsigned int i = 0;
1333  for (auto const& worker : allWorkers()) {
1334  ModuleDescription const* p = worker->description();
1335  allModuleDescriptions.push_back(p);
1336  moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1337  labelToDesc[p->moduleLabel()] = p;
1338  ++i;
1339  }
1340  sort_all(moduleIDToIndex);
1341 
1342  i = 0;
1343  for (auto const& worker : allWorkers()) {
1344  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes> modules;
1345  for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1346  modules[iBranchType] = &modulesWhoseProductsAreConsumedBy[iBranchType].at(i);
1347  }
1348 
1349  std::vector<ModuleProcessName>& modulesInPreviousProcesses =
1350  modulesInPreviousProcessesWhoseProductsAreConsumedBy.at(i);
1351  try {
1352  worker->modulesWhoseProductsAreConsumed(modules, modulesInPreviousProcesses, preg, labelToDesc);
1353  } catch (cms::Exception& ex) {
1354  ex.addContext("Calling Worker::modulesWhoseProductsAreConsumed() for module " +
1355  worker->description()->moduleLabel());
1356  throw;
1357  }
1358  ++i;
1359  }
1360  }
1361 
1363  rep.eventSummary.totalEvents = 0;
1364  rep.eventSummary.totalEventsPassed = 0;
1365  rep.eventSummary.totalEventsFailed = 0;
1366  for (auto& s : streamSchedules_) {
1367  s->getTriggerReport(rep);
1368  }
1369  sort_all(rep.workerSummaries);
1370  }
1371 
1373  rep.eventSummary.totalEvents = 0;
1374  rep.eventSummary.cpuTime = 0.;
1375  rep.eventSummary.realTime = 0.;
1376  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1377  }
1378 
1380  int returnValue = 0;
1381  for (auto& s : streamSchedules_) {
1382  returnValue += s->totalEvents();
1383  }
1384  return returnValue;
1385  }
1386 
1388  int returnValue = 0;
1389  for (auto& s : streamSchedules_) {
1390  returnValue += s->totalEventsPassed();
1391  }
1392  return returnValue;
1393  }
1394 
1396  int returnValue = 0;
1397  for (auto& s : streamSchedules_) {
1398  returnValue += s->totalEventsFailed();
1399  }
1400  return returnValue;
1401  }
1402 
1404  for (auto& s : streamSchedules_) {
1405  s->clearCounters();
1406  }
1407  }
1408 } // namespace edm
size
Write out results.
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
Definition: Schedule.cc:1293
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:60
std::vector< PathTimingSummary > endPathSummaries
PostModuleDestruction postModuleDestructionSignal_
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
static const TGPicture * info(bool iBackgroundIsBlack)
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:757
#define CMS_SA_ALLOW
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
void stopEvent(StreamContext const &)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
virtual void openFile(FileBlock const &fb)=0
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1372
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void writeProcessBlockAsync(WaitingTaskHolder iTask, ProcessBlockPrincipal const &, ProcessContext const *, ActivityRegistry *)
Definition: Schedule.cc:1099
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:1172
int totalEvents() const
Definition: Schedule.cc:1379
vector< string > vstring
Definition: ExoticaDQM.cc:8
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::vector< Worker * > AllWorkers
Definition: Schedule.h:126
RunPrincipal const & runPrincipal() const
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
Definition: Schedule.cc:1299
void removeModuleIfExists(ModuleDescription const &module)
std::vector< std::string > const * pathNames_
Definition: Schedule.h:324
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1362
void convertCurrentProcessAlias(std::string const &processName)
Convert "@currentProcess" in InputTag process names to the actual current process name...
Definition: Schedule.cc:1281
RunNumber_t run() const
Definition: RunPrincipal.h:61
void endStream(unsigned int)
Definition: Schedule.cc:1188
Schedule(ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:479
BranchIDLists const & branchIDLists() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
PostGlobalEndLumi postGlobalWriteLumiSignal_
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:61
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
Definition: Schedule.h:322
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
PreModuleDestruction preModuleDestructionSignal_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
void processOneEventAsync(WaitingTaskHolder iTask, unsigned int iStreamID, EventTransitionInfo &, ServiceToken const &token)
Definition: Schedule.cc:1193
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
void updateFromRegistry(ProductRegistry const &reg)
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:317
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
bool anyProductProduced() const
T getUntrackedParameter(std::string const &, T const &) const
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
void deleteModule(std::string const &iLabel, ActivityRegistry *areg)
Deletes module with label iLabel.
Definition: Schedule.cc:1250
ParameterSetID id() const
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:1268
void updateForNewProcess(ProductRegistry const &, std::string const &processName)
ParameterSet const & registerIt()
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:59
EventSummary eventSummary
Definition: TriggerReport.h:58
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProxyIndices const &, ProcessBlockHelperBase const &)
Definition: Schedule.cc:1177
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter > > > pathStatusInserters_
Definition: Schedule.h:312
void fillModuleAndConsumesInfo(std::vector< ModuleDescription const *> &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int >> &moduleIDToIndex, std::array< std::vector< std::vector< ModuleDescription const *>>, NumBranchTypes > &modulesWhoseProductsAreConsumedBy, std::vector< std::vector< ModuleProcessName >> &modulesInPreviousProcessesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
Definition: Schedule.cc:1311
bool empty() const
Definition: ParameterSet.h:201
EventTimingSummary eventSummary
Strings const & getTrigPaths() const
void setUnscheduledProducts(std::set< std::string > const &unscheduledLabels)
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1403
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:179
PostGlobalWriteRun postGlobalWriteRunSignal_
static ServiceRegistry & instance()
std::vector< PathTimingSummary > trigPathSummaries
RunIndex index() const
Definition: RunPrincipal.h:56
double f[11][100]
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists, SubProcessParentageHelper const *subProcessParentageHelper)
Definition: Schedule.cc:716
void setFrozen(bool initializeLookupInfo=true)
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:1167
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:314
Timestamp const & beginTime() const
rep
Definition: cuy.py:1189
void writeRunAsync(WaitingTaskHolder iTask, RunPrincipal const &rp, ProcessContext const *, ActivityRegistry *, MergeableRunProductMetadata const *)
Definition: Schedule.cc:1066
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
int totalEventsPassed() const
Definition: Schedule.cc:1387
std::shared_ptr< ModuleRegistry const > moduleRegistry() const
Definition: Schedule.h:308
static LuminosityBlockIndex invalidLuminosityBlockIndex()
PreGlobalWriteRun preGlobalWriteRunSignal_
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
virtual bool shouldWeCloseFile() const =0
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:1287
Log< level::Info, false > LogInfo
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:320
Log< level::FwkInfo, true > LogFwkVerbatim
PreWriteProcessBlock preWriteProcessBlockSignal_
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:315
Strings const & getEndPaths() const
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:92
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1279
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:183
void startPath(StreamContext const &, PathContext const &)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
PostWriteProcessBlock postWriteProcessBlockSignal_
bool shouldWeCloseOutput() const
Definition: Schedule.cc:1158
ProductList & productListUpdator()
void addContext(std::string const &context)
Definition: Exception.cc:165
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
Definition: Schedule.cc:1305
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:319
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
PreGlobalEndLumi preGlobalWriteLumiSignal_
void beginStream(unsigned int)
Definition: Schedule.cc:1183
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:184
bool wantSummary_
Definition: Schedule.h:326
std::vector< std::string > const * endPathNames_
Definition: Schedule.h:325
HLT enums.
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:1295
LuminosityBlockIndex index() const
void finishSetup(ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &preg, BranchIDListHelper &branchIDListHelper, ProcessBlockHelperBase &processBlockHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, SubProcessParentageHelper const *subProcessParentageHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool hasSubprocesses, PreallocationConfiguration const &prealloc, ProcessContext const *processContext)
Definition: Schedule.cc:567
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:1061
void writeLumiAsync(WaitingTaskHolder iTask, LuminosityBlockPrincipal const &lbp, ProcessContext const *, ActivityRegistry *)
Definition: Schedule.cc:1129
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void initializeEarlyDelete(std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
Definition: Schedule.cc:1258
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:772
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry, eventsetup::ESRecordsToProxyIndices const &)
Definition: Schedule.cc:1201
std::vector< std::string > vstring
Definition: Schedule.cc:475
ServiceToken presentToken() const
void setIsMergeable(BranchDescription &)
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
int totalEventsFailed() const
Definition: Schedule.cc:1395
def move(src, dest)
Definition: eostools.py:511
void closeOutputFiles()
Definition: Schedule.cc:1053
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
Definition: Schedule.cc:1291
unsigned transform(const HcalDetId &id, unsigned transformCode)