CMS 3D CMS Logo

Schedule.cc
Go to the documentation of this file.
2 
38 
39 #include <array>
40 #include <algorithm>
41 #include <cassert>
42 #include <cstdlib>
43 #include <functional>
44 #include <iomanip>
45 #include <list>
46 #include <map>
47 #include <set>
48 #include <exception>
49 #include <sstream>
50 
52 #include "processEDAliases.h"
53 
54 namespace edm {
55 
56  class Maker;
57 
58  namespace {
59  using std::placeholders::_1;
60 
61  bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
62  return std::binary_search(v.begin(), v.end(), s);
63  }
64 
65  // Here we make the trigger results inserter directly. This should
66  // probably be a utility in the WorkerRegistry or elsewhere.
67 
68  std::shared_ptr<TriggerResultInserter> makeInserter(
69  ParameterSet& proc_pset,
70  PreallocationConfiguration const& iPrealloc,
71  ProductRegistry& preg,
72  ExceptionToActionTable const& actions,
73  std::shared_ptr<ActivityRegistry> areg,
74  std::shared_ptr<ProcessConfiguration const> processConfiguration) {
75  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
76  trig_pset->registerIt();
77 
78  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
79  ModuleDescription md(trig_pset->id(),
80  "TriggerResultInserter",
81  "TriggerResults",
82  processConfiguration.get(),
84 
85  areg->preModuleConstructionSignal_(md);
86  bool postCalled = false;
87  std::shared_ptr<TriggerResultInserter> returnValue;
88  // Caught exception is rethrown
89  CMS_SA_ALLOW try {
90  maker::ModuleHolderT<TriggerResultInserter> holder(
91  make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),
92  static_cast<Maker const*>(nullptr));
93  holder.setModuleDescription(md);
94  holder.registerProductsAndCallbacks(&preg);
95  returnValue = holder.module();
96  postCalled = true;
97  // if exception then post will be called in the catch block
98  areg->postModuleConstructionSignal_(md);
99  } catch (...) {
100  if (!postCalled) {
101  CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
102  // If post throws an exception ignore it because we are already handling another exception
103  }
104  }
105  throw;
106  }
107  return returnValue;
108  }
109 
110  template <typename T>
111  void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
112  std::vector<std::string> const& pathNames,
113  PreallocationConfiguration const& iPrealloc,
114  ProductRegistry& preg,
115  std::shared_ptr<ActivityRegistry> areg,
116  std::shared_ptr<ProcessConfiguration const> processConfiguration,
117  std::string const& moduleTypeName) {
119  pset.addParameter<std::string>("@module_type", moduleTypeName);
120  pset.addParameter<std::string>("@module_edm_type", "EDProducer");
121  pset.registerIt();
122 
123  pathStatusInserters.reserve(pathNames.size());
124 
125  for (auto const& pathName : pathNames) {
126  ModuleDescription md(
127  pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
128 
129  areg->preModuleConstructionSignal_(md);
130  bool postCalled = false;
131  // Caught exception is rethrown
132  CMS_SA_ALLOW try {
133  maker::ModuleHolderT<T> holder(make_shared_noexcept_false<T>(iPrealloc.numberOfStreams()),
134  static_cast<Maker const*>(nullptr));
135  holder.setModuleDescription(md);
136  holder.registerProductsAndCallbacks(&preg);
137  pathStatusInserters.emplace_back(holder.module());
138  postCalled = true;
139  // if exception then post will be called in the catch block
140  areg->postModuleConstructionSignal_(md);
141  } catch (...) {
142  if (!postCalled) {
143  CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
144  // If post throws an exception ignore it because we are already handling another exception
145  }
146  }
147  throw;
148  }
149  }
150  }
151 
152  typedef std::vector<std::string> vstring;
153 
154  void processSwitchProducers(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
155  // Update Switch BranchDescriptions for the chosen case
156  struct BranchesCases {
157  BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
158  std::vector<BranchKey> chosenBranches;
159  std::vector<std::string> caseLabels;
160  };
161  std::map<std::string, BranchesCases> switchMap;
162  for (auto& prod : preg.productListUpdator()) {
163  if (prod.second.isSwitchAlias()) {
164  auto it = switchMap.find(prod.second.moduleLabel());
165  if (it == switchMap.end()) {
166  auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
167  auto inserted = switchMap.emplace(prod.second.moduleLabel(),
168  switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
169  assert(inserted.second);
170  it = inserted.first;
171  }
172 
173  bool found = false;
174  for (auto const& productIter : preg.productList()) {
175  BranchKey const& branchKey = productIter.first;
176  // The alias-for product must be in the same process as
177  // the SwitchProducer (earlier processes or SubProcesses
178  // may contain products with same type, module label, and
179  // instance name)
180  if (branchKey.processName() != processName) {
181  continue;
182  }
183 
184  BranchDescription const& desc = productIter.second;
185  if (desc.branchType() == prod.second.branchType() and
186  desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
187  branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
188  branchKey.productInstanceName() == prod.second.productInstanceName()) {
189  prod.second.setSwitchAliasForBranch(desc);
190  if (!prod.second.transient()) {
191  it->second.chosenBranches.push_back(prod.first); // with moduleLabel of the Switch
192  }
193  found = true;
194  }
195  }
196  if (not found) {
198  ex << "Trying to find a BranchDescription to be aliased-for by SwitchProducer with\n"
199  << " friendly class name = " << prod.second.friendlyClassName() << "\n"
200  << " module label = " << prod.second.moduleLabel() << "\n"
201  << " product instance name = " << prod.second.productInstanceName() << "\n"
202  << " process name = " << processName
203  << "\n\nbut did not find any. Please contact a framework developer.";
204  ex.addContext("Calling Schedule.cc:processSwitchProducers()");
205  throw ex;
206  }
207  }
208  }
209  if (switchMap.empty())
210  return;
211 
212  for (auto& elem : switchMap) {
213  std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
214  }
215 
216  auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
217  std::map<std::string, std::vector<BranchKey>> caseBranches;
218  for (auto const& item : preg.productList()) {
219  if (item.first.processName() != processName)
220  continue;
221 
222  if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
223  found != caseLabels.end()) {
224  caseBranches[*found].push_back(item.first);
225  }
226  }
227 
228  for (auto const& caseLabel : caseLabels) {
229  ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
230  auto& branches = caseBranches[caseLabel];
231  std::sort(branches.begin(), branches.end());
232  for (auto const& branch : branches) {
233  ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
234  }
235  }
236  };
237 
238  // Check that non-chosen cases declare exactly the same non-transient branches
239  // Also set the alias-for branches to transient
240  std::vector<bool> foundBranches;
241  for (auto const& switchItem : switchMap) {
242  auto const& switchLabel = switchItem.first;
243  auto const& chosenBranches = switchItem.second.chosenBranches;
244  auto const& caseLabels = switchItem.second.caseLabels;
245  foundBranches.resize(chosenBranches.size());
246  for (auto const& caseLabel : caseLabels) {
247  std::fill(foundBranches.begin(), foundBranches.end(), false);
248  for (auto& nonConstItem : preg.productListUpdator()) {
249  auto const& item = nonConstItem;
250  if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
251  // Check that products which are not transient in the dictionary are consistent between
252  // all the cases of a SwitchProducer.
253  if (!item.second.transient()) {
254  auto range = std::equal_range(chosenBranches.begin(),
255  chosenBranches.end(),
256  BranchKey(item.first.friendlyClassName(),
257  switchLabel,
258  item.first.productInstanceName(),
259  item.first.processName()));
260  if (range.first == range.second) {
262  ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
263  << item.first << " that is not produced by the chosen case "
264  << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
265  .getUntrackedParameter<std::string>("@chosen_case")
266  << " and that product is not transient. "
267  << "If the intention is to produce only a subset of the non-transient products listed below, each "
268  "case with more non-transient products needs to be replaced with an EDAlias to only the "
269  "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
270  addProductsToException(caseLabels, ex);
271  throw ex;
272  }
273  assert(std::distance(range.first, range.second) == 1);
274  foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
275  }
276 
277  // Set the alias-for branch as transient so it gets fully ignored in output.
278  // I tried first to implicitly drop all branches with
279  // '@' in ProductSelector, but that gave problems on
280  // input (those branches would be implicitly dropped on
281  // input as well, leading to the SwitchProducer branches
282  // do be dropped as dependent ones, as the alias
283  // detection logic in RootFile says that the
284  // SwitchProducer branches are not alias branches)
285  nonConstItem.second.setTransient(true);
286 
287  // Check that there are no BranchAliases for any of the cases
288  auto const& bd = item.second;
289  if (not bd.branchAliases().empty()) {
291  << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
292  "aliases for SwitchProducer with label "
293  << switchLabel << " for case " << caseLabel << ":";
294  for (auto const& branchAlias : bd.branchAliases()) {
295  ex << " " << branchAlias;
296  }
297  throw ex;
298  }
299  }
300  }
301 
302  for (size_t i = 0; i < chosenBranches.size(); i++) {
303  if (not foundBranches[i]) {
304  auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
305  .getUntrackedParameter<std::string>("@chosen_case");
307  ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
308  << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
309  << chosenLabel << " and that product is not transient. "
310  << "If the intention is to produce only a subset of the non-transient products listed below, each "
311  "case with more non-transient products needs to be replaced with an EDAlias to only the "
312  "necessary products, and the EDProducer itself needs to be moved to a Task.\n\n";
313  addProductsToException(caseLabels, ex);
314  throw ex;
315  }
316  }
317  }
318  }
319  }
320 
321  void reduceParameterSet(ParameterSet& proc_pset,
322  vstring const& end_path_name_list,
323  vstring& modulesInConfig,
324  std::set<std::string> const& usedModuleLabels,
325  std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
326  // Before calculating the ParameterSetID of the top level ParameterSet or
327  // saving it in the registry drop from the top level ParameterSet all
328  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
329  // production is not enabled also drop all the EDFilters and EDProducers
330  // that are not scheduled. Drop the ParameterSet used to configure the module
331  // itself. Also drop the other traces of these labels in the top level
332  // ParameterSet: Remove that labels from @all_modules and from all the
333  // end paths. If this makes any end paths empty, then remove the end path
334  // name from @end_paths, and @paths.
335 
336  // First make a list of labels to drop
337  vstring outputModuleLabels;
338  std::string edmType;
339  std::string const moduleEdmType("@module_edm_type");
340  std::string const outputModule("OutputModule");
341  std::string const edAnalyzer("EDAnalyzer");
342  std::string const edFilter("EDFilter");
343  std::string const edProducer("EDProducer");
344 
345  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
346 
347  //need a list of all modules on paths in order to determine
348  // if an EDAnalyzer only appears on an end path
349  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
350  std::set<std::string> modulesOnPaths;
351  {
352  std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
353  for (auto const& endPath : end_path_name_list) {
354  noEndPaths.erase(endPath);
355  }
356  {
357  vstring labels;
358  for (auto const& path : noEndPaths) {
359  labels = proc_pset.getParameter<vstring>(path);
360  modulesOnPaths.insert(labels.begin(), labels.end());
361  }
362  }
363  }
364  //Initially fill labelsToBeDropped with all module mentioned in
365  // the configuration but which are not being used by the system
366  std::vector<std::string> labelsToBeDropped;
367  labelsToBeDropped.reserve(modulesInConfigSet.size());
368  std::set_difference(modulesInConfigSet.begin(),
369  modulesInConfigSet.end(),
370  usedModuleLabels.begin(),
371  usedModuleLabels.end(),
372  std::back_inserter(labelsToBeDropped));
373 
374  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
375  for (auto const& modLabel : usedModuleLabels) {
376  // Do nothing for modules that do not have a ParameterSet. Modules of type
377  // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
378  if (proc_pset.existsAs<ParameterSet>(modLabel)) {
379  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
380  if (edmType == outputModule) {
381  outputModuleLabels.push_back(modLabel);
382  labelsToBeDropped.push_back(modLabel);
383  }
384  if (edmType == edAnalyzer) {
385  if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
386  labelsToBeDropped.push_back(modLabel);
387  }
388  }
389  }
390  }
391  //labelsToBeDropped must be sorted
392  std::inplace_merge(
393  labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
394 
395  // drop the parameter sets used to configure the modules
396  for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
397 
398  // drop the labels from @all_modules
399  vstring::iterator endAfterRemove =
400  std::remove_if(modulesInConfig.begin(),
401  modulesInConfig.end(),
402  std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
403  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
404  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
405 
406  // drop the labels from all end paths
407  vstring endPathsToBeDropped;
408  vstring labels;
409  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
410  iEndPath != endEndPath;
411  ++iEndPath) {
412  labels = proc_pset.getParameter<vstring>(*iEndPath);
413  vstring::iterator iSave = labels.begin();
414  vstring::iterator iBegin = labels.begin();
415 
416  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
417  if (binary_search_string(labelsToBeDropped, *iLabel)) {
418  if (binary_search_string(outputModuleLabels, *iLabel)) {
419  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
420  }
421  } else {
422  if (iSave != iLabel) {
423  iSave->swap(*iLabel);
424  }
425  ++iSave;
426  }
427  }
428  labels.erase(iSave, labels.end());
429  if (labels.empty()) {
430  // remove empty end paths and save their names
431  proc_pset.eraseSimpleParameter(*iEndPath);
432  endPathsToBeDropped.push_back(*iEndPath);
433  } else {
434  proc_pset.addParameter<vstring>(*iEndPath, labels);
435  }
436  }
437  sort_all(endPathsToBeDropped);
438 
439  // remove empty end paths from @paths
440  endAfterRemove = std::remove_if(scheduledPaths.begin(),
441  scheduledPaths.end(),
442  std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
443  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
444  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
445 
446  // remove empty end paths from @end_paths
447  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
448  endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
449  scheduledEndPaths.end(),
450  std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
451  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
452  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
453  }
454 
455  class RngEDConsumer : public EDConsumerBase {
456  public:
457  explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
459  if (rng.isAvailable()) {
460  rng->consumes(consumesCollector());
461  for (auto const& consumesInfo : this->consumesInfo()) {
462  typesConsumed.emplace(consumesInfo.type());
463  }
464  }
465  }
466  };
467 
468  template <typename F>
469  auto doCleanup(F&& iF) {
470  auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
471  CMS_SA_ALLOW try { f(); } catch (...) {
472  }
473  if (iPtr) {
474  iTask.doneWaiting(*iPtr);
475  }
476  };
477  return wrapped;
478  }
479  } // namespace
480  // -----------------------------
481 
482  typedef std::vector<std::string> vstring;
483 
484  // -----------------------------
485 
487  service::TriggerNamesService const& tns,
488  ProductRegistry& preg,
490  std::shared_ptr<ActivityRegistry> areg,
491  std::shared_ptr<ProcessConfiguration const> processConfiguration,
492  PreallocationConfiguration const& prealloc,
493  ProcessContext const* processContext,
494  ModuleTypeResolverMaker const* resolverMaker)
495  : //Only create a resultsInserter if there is a trigger path
496  resultsInserter_{tns.getTrigPaths().empty()
497  ? std::shared_ptr<TriggerResultInserter>{}
498  : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
499  moduleRegistry_(std::make_shared<ModuleRegistry>(resolverMaker)),
500  all_output_communicators_(),
501  preallocConfig_(prealloc),
502  pathNames_(&tns.getTrigPaths()),
503  endPathNames_(&tns.getEndPaths()),
504  wantSummary_(tns.wantSummary()) {
505  makePathStatusInserters(pathStatusInserters_,
506  *pathNames_,
507  prealloc,
508  preg,
509  areg,
510  processConfiguration,
511  std::string("PathStatusInserter"));
512 
513  if (endPathNames_->size() > 1) {
514  //NOTE: FinalPaths are a type of EndPath
515  makePathStatusInserters(endPathStatusInserters_,
516  *endPathNames_,
517  prealloc,
518  preg,
519  areg,
520  processConfiguration,
521  std::string("EndPathStatusInserter"));
522  }
523 
524  assert(0 < prealloc.numberOfStreams());
525  streamSchedules_.reserve(prealloc.numberOfStreams());
526  for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
527  streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
528  pathStatusInserters_,
529  endPathStatusInserters_,
530  moduleRegistry(),
531  proc_pset,
532  tns,
533  prealloc,
534  preg,
535  actions,
536  areg,
537  processConfiguration,
538  StreamID{i},
539  processContext));
540  }
541 
542  //TriggerResults are injected automatically by StreamSchedules and are
543  // unknown to the ModuleRegistry
544  const std::string kTriggerResults("TriggerResults");
545  std::vector<std::string> modulesToUse;
546  modulesToUse.reserve(streamSchedules_[0]->allWorkersLumisAndEvents().size());
547  for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
548  if (worker->description()->moduleLabel() != kTriggerResults) {
549  modulesToUse.push_back(worker->description()->moduleLabel());
550  }
551  }
552  //The unscheduled modules are at the end of the list, but we want them at the front
553  unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
554  if (nUnscheduledModules > 0) {
555  std::vector<std::string> temp;
556  temp.reserve(modulesToUse.size());
557  auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
558  std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
559  std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
560  temp.swap(modulesToUse);
561  }
562 
563  // propagate_const<T> has no reset() function
564  globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
565  pathStatusInserters_,
566  endPathStatusInserters_,
567  moduleRegistry(),
568  modulesToUse,
569  proc_pset,
570  preg,
571  prealloc,
572  actions,
573  areg,
574  processConfiguration,
575  processContext);
576  }
577 
579  service::TriggerNamesService const& tns,
580  ProductRegistry& preg,
581  BranchIDListHelper& branchIDListHelper,
582  ProcessBlockHelperBase& processBlockHelper,
583  ThinnedAssociationsHelper& thinnedAssociationsHelper,
584  SubProcessParentageHelper const* subProcessParentageHelper,
585  std::shared_ptr<ActivityRegistry> areg,
586  std::shared_ptr<ProcessConfiguration> processConfiguration,
587  bool hasSubprocesses,
588  PreallocationConfiguration const& prealloc,
589  ProcessContext const* processContext) {
590  //TriggerResults is not in the top level ParameterSet so the call to
591  // reduceParameterSet would fail to find it. Just remove it up front.
592  const std::string kTriggerResults("TriggerResults");
593 
594  std::set<std::string> usedModuleLabels;
595  for (auto const& worker : allWorkers()) {
596  if (worker->description()->moduleLabel() != kTriggerResults) {
597  usedModuleLabels.insert(worker->description()->moduleLabel());
598  }
599  }
600  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
601  std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
602  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
603  {
604  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
605  detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
606  }
607 
608  // At this point all BranchDescriptions are created. Mark now the
609  // ones of unscheduled workers to be on-demand.
610  {
611  auto const& unsched = streamSchedules_[0]->unscheduledWorkersLumisAndEvents();
612  if (not unsched.empty()) {
613  std::set<std::string> unscheduledModules;
614  std::transform(unsched.begin(),
615  unsched.end(),
616  std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
617  [](auto worker) { return worker->description()->moduleLabel(); });
618  preg.setUnscheduledProducts(unscheduledModules);
619  }
620  }
621 
622  processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
623  proc_pset.registerIt();
624  processConfiguration->setParameterSetID(proc_pset.id());
625  processConfiguration->setProcessConfigurationID();
626 
627  // This is used for a little sanity-check to make sure no code
628  // modifications alter the number of workers at a later date.
629  size_t all_workers_count = allWorkers().size();
630 
631  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
632  auto comm = iHolder->createOutputModuleCommunicator();
633  if (comm) {
634  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
635  }
636  });
637  // Now that the output workers are filled in, set any output limits or information.
638  limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
639 
640  // Sanity check: make sure nobody has added a worker after we've
641  // already relied on the WorkerManager being full.
642  assert(all_workers_count == allWorkers().size());
643 
644  branchIDListHelper.updateFromRegistry(preg);
645 
646  for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
647  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
648  }
649 
650  processBlockHelper.updateForNewProcess(preg, processConfiguration->processName());
651 
652  // The output modules consume products in kept branches.
653  // So we must set this up before freezing.
654  for (auto& c : all_output_communicators_) {
655  c->selectProducts(preg, thinnedAssociationsHelper, processBlockHelper);
656  }
657 
658  for (auto& product : preg.productListUpdator()) {
659  setIsMergeable(product.second);
660  }
661 
662  {
663  // We now get a collection of types that may be consumed.
664  std::set<TypeID> productTypesConsumed;
665  std::set<TypeID> elementTypesConsumed;
666  // Loop over all modules
667  for (auto const& worker : allWorkers()) {
668  for (auto const& consumesInfo : worker->consumesInfo()) {
669  if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
670  productTypesConsumed.emplace(consumesInfo.type());
671  } else {
672  elementTypesConsumed.emplace(consumesInfo.type());
673  }
674  }
675  }
676  // The SubProcess class is not a module, yet it may consume.
677  if (hasSubprocesses) {
678  productTypesConsumed.emplace(typeid(TriggerResults));
679  }
680  // The RandomNumberGeneratorService is not a module, yet it consumes.
681  { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
682  preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
683  }
684 
685  for (auto& c : all_output_communicators_) {
686  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
687  }
688 
689  if (wantSummary_) {
690  std::vector<const ModuleDescription*> modDesc;
691  const auto& workers = allWorkers();
692  modDesc.reserve(workers.size());
693 
694  std::transform(workers.begin(),
695  workers.end(),
696  std::back_inserter(modDesc),
697  [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
698 
699  // propagate_const<T> has no reset() function
700  summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
701  auto timeKeeperPtr = summaryTimeKeeper_.get();
702 
703  areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
704 
705  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
706  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
707  areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
708  areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
709  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
710  areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
711 
712  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
713  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
714 
715  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
716  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
717 
718  areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
719  areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
720  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
721  //timeKeeperPtr->startModuleEvent(iContext,iMod);
722  //});
723  }
724 
725  } // Schedule::Schedule
726 
727  void Schedule::limitOutput(ParameterSet const& proc_pset,
728  BranchIDLists const& branchIDLists,
729  SubProcessParentageHelper const* subProcessParentageHelper) {
730  std::string const output("output");
731 
732  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
733  int maxEventSpecs = 0;
734  int maxEventsOut = -1;
735  ParameterSet const* vMaxEventsOut = nullptr;
736  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
737  if (search_all(intNamesE, output)) {
738  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
739  ++maxEventSpecs;
740  }
741  std::vector<std::string> psetNamesE;
742  maxEventsPSet.getParameterSetNames(psetNamesE, false);
743  if (search_all(psetNamesE, output)) {
744  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
745  ++maxEventSpecs;
746  }
747 
748  if (maxEventSpecs > 1) {
750  << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
751  }
752 
753  for (auto& c : all_output_communicators_) {
754  OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
755  if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
756  std::string const& moduleLabel = c->description().moduleLabel();
757  try {
758  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
759  } catch (Exception const&) {
761  << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
762  }
763  }
764  c->configure(desc);
765  }
766  }
767 
768  bool Schedule::terminate() const {
769  if (all_output_communicators_.empty()) {
770  return false;
771  }
772  for (auto& c : all_output_communicators_) {
773  if (!c->limitReached()) {
774  // Found an output module that has not reached output event count.
775  return false;
776  }
777  }
778  LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
779  << "has reached its configured limit.\n";
780  return true;
781  }
782 
784  globalSchedule_->endJob(collector);
785  if (collector.hasThrown()) {
786  return;
787  }
788 
789  if (wantSummary_) {
790  try {
792  } catch (cms::Exception const& ex) {
793  collector.addException(ex);
794  }
795  }
796  }
797 
799  //Function to loop over items in a container and periodically
800  // flush to the message logger.
801  auto logForEach = [](auto const& iContainer, auto iMessage) {
802  auto logger = LogFwkVerbatim("FwkSummary");
803  int count = 0;
804  for (auto const& element : iContainer) {
805  iMessage(logger, element);
806  if (++count == 10) {
807  logger = LogFwkVerbatim("FwkSummary");
808  count = 0;
809  } else {
810  logger << "\n";
811  }
812  }
813  };
814 
815  {
816  TriggerReport tr;
817  getTriggerReport(tr);
818 
819  // The trigger report (pass/fail etc.):
820 
821  LogFwkVerbatim("FwkSummary") << "";
822  if (streamSchedules_[0]->context().processContext()->isSubProcess()) {
823  LogFwkVerbatim("FwkSummary") << "TrigReport Process: "
824  << streamSchedules_[0]->context().processContext()->processName();
825  }
826  LogFwkVerbatim("FwkSummary") << "TrigReport "
827  << "---------- Event Summary ------------";
828  if (!tr.trigPathSummaries.empty()) {
829  LogFwkVerbatim("FwkSummary") << "TrigReport"
830  << " Events total = " << tr.eventSummary.totalEvents
831  << " passed = " << tr.eventSummary.totalEventsPassed
832  << " failed = " << tr.eventSummary.totalEventsFailed << "";
833  } else {
834  LogFwkVerbatim("FwkSummary") << "TrigReport"
835  << " Events total = " << tr.eventSummary.totalEvents
836  << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
837  }
838 
839  LogFwkVerbatim("FwkSummary") << "";
840  LogFwkVerbatim("FwkSummary") << "TrigReport "
841  << "---------- Path Summary ------------";
842  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
843  << " " << std::right << std::setw(10) << "Executed"
844  << " " << std::right << std::setw(10) << "Passed"
845  << " " << std::right << std::setw(10) << "Failed"
846  << " " << std::right << std::setw(10) << "Error"
847  << " "
848  << "Name"
849  << "";
850  logForEach(tr.trigPathSummaries, [](auto& logger, auto const& p) {
851  logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << p.bitPosition << " "
852  << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
853  << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
854  << p.timesExcept << " " << p.name;
855  });
856 
857  /*
858  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
859  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
860  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
861  for (; epi != epe; ++epi, ++epn) {
862 
863  LogFwkVerbatim("FwkSummary") << "TrigReport "
864  << std::right << std::setw(5) << 1
865  << std::right << std::setw(5) << *epi << " "
866  << std::right << std::setw(10) << totalEvents() << " "
867  << std::right << std::setw(10) << totalEvents() << " "
868  << std::right << std::setw(10) << 0 << " "
869  << std::right << std::setw(10) << 0 << " "
870  << *epn << "";
871  }
872  */
873 
874  LogFwkVerbatim("FwkSummary") << "\n"
875  << "TrigReport "
876  << "-------End-Path Summary ------------\n"
877  << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
878  << " " << std::right << std::setw(10) << "Executed"
879  << " " << std::right << std::setw(10) << "Passed"
880  << " " << std::right << std::setw(10) << "Failed"
881  << " " << std::right << std::setw(10) << "Error"
882  << " "
883  << "Name";
884  logForEach(tr.endPathSummaries, [](auto& logger, auto const& p) {
885  logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << p.bitPosition << " "
886  << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
887  << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
888  << p.timesExcept << " " << p.name;
889  });
890 
891  for (auto const& p : tr.trigPathSummaries) {
892  LogFwkVerbatim("FwkSummary") << "\n"
893  << "TrigReport "
894  << "---------- Modules in Path: " << p.name << " ------------\n"
895  << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
896  << " " << std::right << std::setw(10) << "Visited"
897  << " " << std::right << std::setw(10) << "Passed"
898  << " " << std::right << std::setw(10) << "Failed"
899  << " " << std::right << std::setw(10) << "Error"
900  << " "
901  << "Name";
902 
903  logForEach(p.moduleInPathSummaries, [](auto& logger, auto const& mod) {
904  logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << mod.bitPosition
905  << " " << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
906  << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
907  << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
908  });
909  }
910 
911  for (auto const& p : tr.endPathSummaries) {
912  LogFwkVerbatim("FwkSummary") << "\n"
913  << "TrigReport "
914  << "------ Modules in End-Path: " << p.name << " ------------\n"
915  << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
916  << " " << std::right << std::setw(10) << "Visited"
917  << " " << std::right << std::setw(10) << "Passed"
918  << " " << std::right << std::setw(10) << "Failed"
919  << " " << std::right << std::setw(10) << "Error"
920  << " "
921  << "Name";
922 
923  unsigned int bitpos = 0;
924  logForEach(p.moduleInPathSummaries, [&bitpos](auto& logger, auto const& mod) {
925  logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << bitpos << " "
926  << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
927  << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
928  << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
929  ++bitpos;
930  });
931  }
932 
933  LogFwkVerbatim("FwkSummary") << "\n"
934  << "TrigReport "
935  << "---------- Module Summary ------------\n"
936  << "TrigReport " << std::right << std::setw(10) << "Visited"
937  << " " << std::right << std::setw(10) << "Executed"
938  << " " << std::right << std::setw(10) << "Passed"
939  << " " << std::right << std::setw(10) << "Failed"
940  << " " << std::right << std::setw(10) << "Error"
941  << " "
942  << "Name";
943 
944  logForEach(tr.workerSummaries, [](auto& logger, auto const& worker) {
945  logger << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " " << std::right
946  << std::setw(10) << worker.timesRun << " " << std::right << std::setw(10) << worker.timesPassed << " "
947  << std::right << std::setw(10) << worker.timesFailed << " " << std::right << std::setw(10)
948  << worker.timesExcept << " " << worker.moduleLabel;
949  });
950  LogFwkVerbatim("FwkSummary") << "";
951  }
952  // The timing report (CPU and Real Time):
955 
956  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
957 
958  LogFwkVerbatim("FwkSummary") << "TimeReport "
959  << "---------- Event Summary ---[sec]----";
960  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
961  << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
962  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
963  << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
964  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
965  << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
966  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
967  << " efficiency CPU/Real/thread = "
970 
971  constexpr int kColumn1Size = 10;
972  constexpr int kColumn2Size = 12;
973  constexpr int kColumn3Size = 12;
974  LogFwkVerbatim("FwkSummary") << "";
975  LogFwkVerbatim("FwkSummary") << "TimeReport "
976  << "---------- Path Summary ---[Real sec]----";
977  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
978  << " " << std::right << std::setw(kColumn2Size) << "per exec"
979  << " Name";
980  logForEach(tr.trigPathSummaries, [&](auto& logger, auto const& p) {
981  const int timesRun = std::max(1, p.timesRun);
982  logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
983  << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
984  << " " << p.name;
985  });
986  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
987  << " " << std::right << std::setw(kColumn2Size) << "per exec"
988  << " Name";
989 
990  LogFwkVerbatim("FwkSummary") << "\n"
991  << "TimeReport "
992  << "-------End-Path Summary ---[Real sec]----\n"
993  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
994  << " " << std::right << std::setw(kColumn2Size) << "per exec"
995  << " Name";
996  logForEach(tr.endPathSummaries, [&](auto& logger, auto const& p) {
997  const int timesRun = std::max(1, p.timesRun);
998 
999  logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1000  << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
1001  << " " << p.name;
1002  });
1003  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1004  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1005  << " Name";
1006 
1007  for (auto const& p : tr.trigPathSummaries) {
1008  LogFwkVerbatim("FwkSummary") << "\n"
1009  << "TimeReport "
1010  << "---------- Modules in Path: " << p.name << " ---[Real sec]----\n"
1011  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1012  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1013  << " Name";
1014  logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
1015  logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1016  << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1017  << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
1018  });
1019  }
1020  if (not tr.trigPathSummaries.empty()) {
1021  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1022  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1023  << " Name";
1024  }
1025  for (auto const& p : tr.endPathSummaries) {
1026  LogFwkVerbatim("FwkSummary") << "\n"
1027  << "TimeReport "
1028  << "------ Modules in End-Path: " << p.name << " ---[Real sec]----\n"
1029  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1030  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1031  << " Name";
1032  logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
1033  logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1034  << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1035  << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
1036  });
1037  }
1038  if (not tr.endPathSummaries.empty()) {
1039  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1040  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1041  << " Name";
1042  }
1043  LogFwkVerbatim("FwkSummary") << "\n"
1044  << "TimeReport "
1045  << "---------- Module Summary ---[Real sec]----\n"
1046  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1047  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1048  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1049  << " Name";
1050  logForEach(tr.workerSummaries, [&](auto& logger, auto const& worker) {
1051  logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1052  << worker.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1053  << worker.realTime / std::max(1, worker.timesRun) << " " << std::right << std::setw(kColumn3Size)
1054  << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel;
1055  });
1056  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1057  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1058  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1059  << " Name";
1060 
1061  LogFwkVerbatim("FwkSummary") << "\nT---Report end!\n\n";
1062  }
1063 
1065  using std::placeholders::_1;
1067  for (auto& worker : allWorkers()) {
1068  worker->respondToCloseOutputFile();
1069  }
1070  }
1071 
1073  using std::placeholders::_1;
1074  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1075  }
1076 
1078  RunPrincipal const& rp,
1079  ProcessContext const* processContext,
1080  ActivityRegistry* activityRegistry,
1081  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1084  LuminosityBlockID(rp.run(), 0),
1085  rp.index(),
1087  rp.endTime(),
1088  processContext);
1089 
1090  using namespace edm::waiting_task;
1091  chain::first([&](auto nextTask) {
1092  //services can depend on other services
1094 
1095  // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1096  CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1097  }
1098  for (auto& c : all_output_communicators_) {
1099  c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1100  }
1101  }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1102  //services can depend on other services
1104 
1105  activityRegistry->postGlobalWriteRunSignal_(globalContext);
1106  })) |
1108  }
1109 
1111  ProcessBlockPrincipal const& pbp,
1112  ProcessContext const* processContext,
1113  ActivityRegistry* activityRegistry) {
1120  processContext);
1121 
1122  using namespace edm::waiting_task;
1123  chain::first([&](auto nextTask) {
1124  // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1126  CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1127  }
1128  for (auto& c : all_output_communicators_) {
1129  c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1130  }
1131  }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1132  //services can depend on other services
1134 
1135  activityRegistry->postWriteProcessBlockSignal_(globalContext);
1136  })) |
1138  }
1139 
1141  LuminosityBlockPrincipal const& lbp,
1142  ProcessContext const* processContext,
1143  ActivityRegistry* activityRegistry) {
1146  lbp.id(),
1147  lbp.runPrincipal().index(),
1148  lbp.index(),
1149  lbp.beginTime(),
1150  processContext);
1151 
1152  using namespace edm::waiting_task;
1153  chain::first([&](auto nextTask) {
1155  CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1156  }
1157  for (auto& c : all_output_communicators_) {
1158  c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1159  }
1160  }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1161  //services can depend on other services
1163 
1164  activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1165  })) |
1167  }
1168 
1170  using std::placeholders::_1;
1171  // Return true iff at least one output module returns true.
1172  return (std::find_if(all_output_communicators_.begin(),
1176  }
1177 
1179  using std::placeholders::_1;
1180  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1181  }
1182 
1184  using std::placeholders::_1;
1185  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1186  }
1187 
1188  void Schedule::beginJob(ProductRegistry const& iRegistry,
1190  ProcessBlockHelperBase const& processBlockHelperBase,
1191  PathsAndConsumesOfModulesBase const& pathsAndConsumesOfModules,
1192  ProcessContext const& processContext) {
1193  globalSchedule_->beginJob(iRegistry, iESIndices, processBlockHelperBase, pathsAndConsumesOfModules, processContext);
1194  }
1195 
1196  void Schedule::beginStream(unsigned int streamID) {
1197  assert(streamID < streamSchedules_.size());
1198  streamSchedules_[streamID]->beginStream();
1199  }
1200 
1201  void Schedule::endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept {
1202  assert(streamID < streamSchedules_.size());
1203  streamSchedules_[streamID]->endStream(collector, collectorMutex);
1204  }
1205 
1207  unsigned int iStreamID,
1209  ServiceToken const& token) {
1210  assert(iStreamID < streamSchedules_.size());
1211  streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1212  }
1213 
1215  ParameterSet const& iPSet,
1216  const ProductRegistry& iRegistry,
1218  Worker* found = nullptr;
1219  for (auto const& worker : allWorkers()) {
1220  if (worker->description()->moduleLabel() == iLabel) {
1221  found = worker;
1222  break;
1223  }
1224  }
1225  if (nullptr == found) {
1226  return false;
1227  }
1228 
1229  auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1230 
1231  globalSchedule_->replaceModule(newMod, iLabel);
1232 
1233  for (auto& s : streamSchedules_) {
1234  s->replaceModule(newMod, iLabel);
1235  }
1236 
1237  {
1238  //Need to updateLookup in order to make getByToken work
1239  auto const processBlockLookup = iRegistry.productLookup(InProcess);
1240  auto const runLookup = iRegistry.productLookup(InRun);
1241  auto const lumiLookup = iRegistry.productLookup(InLumi);
1242  auto const eventLookup = iRegistry.productLookup(InEvent);
1243  found->updateLookup(InProcess, *runLookup);
1244  found->updateLookup(InRun, *runLookup);
1245  found->updateLookup(InLumi, *lumiLookup);
1246  found->updateLookup(InEvent, *eventLookup);
1247  found->updateLookup(iIndices);
1248 
1249  auto const& processName = newMod->moduleDescription().processName();
1250  auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1251  auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1252  auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1253  auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1254  found->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1255  found->resolvePutIndicies(InRun, runModuleToIndicies);
1256  found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1257  found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1258  }
1259 
1260  return true;
1261  }
1262 
1264  globalSchedule_->deleteModule(iLabel);
1265  for (auto& stream : streamSchedules_) {
1266  stream->deleteModule(iLabel);
1267  }
1268  moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1269  }
1270 
1271  void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1272  std::multimap<std::string, std::string> const& referencesToBranches,
1273  std::vector<std::string> const& modulesToSkip,
1274  edm::ProductRegistry const& preg) {
1275  for (auto& stream : streamSchedules_) {
1276  stream->initializeEarlyDelete(
1277  *moduleRegistry(), branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1278  }
1279  }
1280 
1281  std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
1282  std::vector<ModuleDescription const*> result;
1283  result.reserve(allWorkers().size());
1284 
1285  for (auto const& worker : allWorkers()) {
1286  ModuleDescription const* p = worker->description();
1287  result.push_back(p);
1288  }
1289  return result;
1290  }
1291 
1292  Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }
1293 
1295  for (auto const& worker : allWorkers()) {
1296  worker->convertCurrentProcessAlias(processName);
1297  }
1298  }
1299 
1300  void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1301  streamSchedules_[0]->availablePaths(oLabelsToFill);
1302  }
1303 
1304  void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }
1305 
1306  void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }
1307 
1308  void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
1309  streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1310  }
1311 
1313  std::vector<ModuleDescription const*>& descriptions,
1314  unsigned int hint) const {
1315  streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1316  }
1317 
1319  std::vector<ModuleDescription const*>& descriptions,
1320  unsigned int hint) const {
1321  streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1322  }
1323 
1325  std::vector<ModuleDescription const*>& allModuleDescriptions,
1326  std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
1327  std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>& modulesWhoseProductsAreConsumedBy,
1328  std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
1329  ProductRegistry const& preg) const {
1330  allModuleDescriptions.clear();
1331  moduleIDToIndex.clear();
1332  for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1333  modulesWhoseProductsAreConsumedBy[iBranchType].clear();
1334  }
1335  modulesInPreviousProcessesWhoseProductsAreConsumedBy.clear();
1336 
1337  allModuleDescriptions.reserve(allWorkers().size());
1338  moduleIDToIndex.reserve(allWorkers().size());
1339  for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1340  modulesWhoseProductsAreConsumedBy[iBranchType].resize(allWorkers().size());
1341  }
1342  modulesInPreviousProcessesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1343 
1344  std::map<std::string, ModuleDescription const*> labelToDesc;
1345  unsigned int i = 0;
1346  for (auto const& worker : allWorkers()) {
1347  ModuleDescription const* p = worker->description();
1348  allModuleDescriptions.push_back(p);
1349  moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1350  labelToDesc[p->moduleLabel()] = p;
1351  ++i;
1352  }
1353  sort_all(moduleIDToIndex);
1354 
1355  i = 0;
1356  for (auto const& worker : allWorkers()) {
1357  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes> modules;
1358  for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1359  modules[iBranchType] = &modulesWhoseProductsAreConsumedBy[iBranchType].at(i);
1360  }
1361 
1362  std::vector<ModuleProcessName>& modulesInPreviousProcesses =
1363  modulesInPreviousProcessesWhoseProductsAreConsumedBy.at(i);
1364  try {
1365  worker->modulesWhoseProductsAreConsumed(modules, modulesInPreviousProcesses, preg, labelToDesc);
1366  } catch (cms::Exception& ex) {
1367  ex.addContext("Calling Worker::modulesWhoseProductsAreConsumed() for module " +
1368  worker->description()->moduleLabel());
1369  throw;
1370  }
1371  ++i;
1372  }
1373  }
1374 
1376  rep.eventSummary.totalEvents = 0;
1377  rep.eventSummary.totalEventsPassed = 0;
1378  rep.eventSummary.totalEventsFailed = 0;
1379  for (auto& s : streamSchedules_) {
1380  s->getTriggerReport(rep);
1381  }
1382  sort_all(rep.workerSummaries);
1383  }
1384 
1386  rep.eventSummary.totalEvents = 0;
1387  rep.eventSummary.cpuTime = 0.;
1388  rep.eventSummary.realTime = 0.;
1389  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1390  }
1391 
1393  int returnValue = 0;
1394  for (auto& s : streamSchedules_) {
1395  returnValue += s->totalEvents();
1396  }
1397  return returnValue;
1398  }
1399 
1401  int returnValue = 0;
1402  for (auto& s : streamSchedules_) {
1403  returnValue += s->totalEventsPassed();
1404  }
1405  return returnValue;
1406  }
1407 
1409  int returnValue = 0;
1410  for (auto& s : streamSchedules_) {
1411  returnValue += s->totalEventsFailed();
1412  }
1413  return returnValue;
1414  }
1415 
1417  for (auto& s : streamSchedules_) {
1418  s->clearCounters();
1419  }
1420  }
1421 } // namespace edm
size
Write out results.
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
Definition: Schedule.cc:1306
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:60
std::vector< PathTimingSummary > endPathSummaries
dictionary aliases
Definition: autoCond.py:108
void addException(cms::Exception const &exception)
PostModuleDestruction postModuleDestructionSignal_
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
static const TGPicture * info(bool iBackgroundIsBlack)
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:768
#define CMS_SA_ALLOW
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
void stopEvent(StreamContext const &)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
virtual void openFile(FileBlock const &fb)=0
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1385
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void writeProcessBlockAsync(WaitingTaskHolder iTask, ProcessBlockPrincipal const &, ProcessContext const *, ActivityRegistry *)
Definition: Schedule.cc:1110
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:1183
int totalEvents() const
Definition: Schedule.cc:1392
vector< string > vstring
Definition: ExoticaDQM.cc:7
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::vector< Worker * > AllWorkers
Definition: Schedule.h:126
RunPrincipal const & runPrincipal() const
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
Definition: Schedule.cc:1312
void removeModuleIfExists(ModuleDescription const &module)
static std::mutex mutex
Definition: Proxy.cc:8
std::vector< std::string > const * pathNames_
Definition: Schedule.h:330
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1375
void convertCurrentProcessAlias(std::string const &processName)
Convert "@currentProcess" in InputTag process names to the actual current process name...
Definition: Schedule.cc:1294
RunNumber_t run() const
Definition: RunPrincipal.h:61
BranchIDLists const & branchIDLists() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
PostGlobalEndLumi postGlobalWriteLumiSignal_
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:61
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
Definition: Schedule.h:328
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
PreModuleDestruction preModuleDestructionSignal_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
void processOneEventAsync(WaitingTaskHolder iTask, unsigned int iStreamID, EventTransitionInfo &, ServiceToken const &token)
Definition: Schedule.cc:1206
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
void updateFromRegistry(ProductRegistry const &reg)
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:323
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
bool anyProductProduced() const
T getUntrackedParameter(std::string const &, T const &) const
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
void deleteModule(std::string const &iLabel, ActivityRegistry *areg)
Deletes module with label iLabel.
Definition: Schedule.cc:1263
ParameterSetID id() const
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:1281
void updateForNewProcess(ProductRegistry const &, std::string const &processName)
ParameterSet const & registerIt()
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:59
EventSummary eventSummary
Definition: TriggerReport.h:58
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter > > > pathStatusInserters_
Definition: Schedule.h:318
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry, eventsetup::ESRecordsToProductResolverIndices const &)
Definition: Schedule.cc:1214
void fillModuleAndConsumesInfo(std::vector< ModuleDescription const *> &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int >> &moduleIDToIndex, std::array< std::vector< std::vector< ModuleDescription const *>>, NumBranchTypes > &modulesWhoseProductsAreConsumedBy, std::vector< std::vector< ModuleProcessName >> &modulesInPreviousProcessesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
Definition: Schedule.cc:1324
bool empty() const
Definition: ParameterSet.h:202
EventTimingSummary eventSummary
Strings const & getTrigPaths() const
Schedule(ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &config, ProcessContext const *processContext, ModuleTypeResolverMaker const *resolverMaker)
Definition: Schedule.cc:486
void setUnscheduledProducts(std::set< std::string > const &unscheduledLabels)
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1416
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:180
PostGlobalWriteRun postGlobalWriteRunSignal_
static ServiceRegistry & instance()
std::vector< PathTimingSummary > trigPathSummaries
RunIndex index() const
Definition: RunPrincipal.h:56
double f[11][100]
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists, SubProcessParentageHelper const *subProcessParentageHelper)
Definition: Schedule.cc:727
void setFrozen(bool initializeLookupInfo=true)
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:1178
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:320
Timestamp const & beginTime() const
rep
Definition: cuy.py:1189
void beginStream(unsigned int streamID)
Definition: Schedule.cc:1196
Definition: logger.py:1
void writeRunAsync(WaitingTaskHolder iTask, RunPrincipal const &rp, ProcessContext const *, ActivityRegistry *, MergeableRunProductMetadata const *)
Definition: Schedule.cc:1077
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
int totalEventsPassed() const
Definition: Schedule.cc:1400
std::shared_ptr< ModuleRegistry const > moduleRegistry() const
Definition: Schedule.h:314
static LuminosityBlockIndex invalidLuminosityBlockIndex()
PreGlobalWriteRun preGlobalWriteRunSignal_
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
virtual bool shouldWeCloseFile() const =0
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:1300
Log< level::Info, false > LogInfo
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:326
Log< level::FwkInfo, true > LogFwkVerbatim
PreWriteProcessBlock preWriteProcessBlockSignal_
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321
Strings const & getEndPaths() const
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:92
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1292
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
void endStream(unsigned int streamID, ExceptionCollector &collector, std::mutex &collectorMutex) noexcept
Definition: Schedule.cc:1201
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:185
void startPath(StreamContext const &, PathContext const &)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
PostWriteProcessBlock postWriteProcessBlockSignal_
bool shouldWeCloseOutput() const
Definition: Schedule.cc:1169
ProductList & productListUpdator()
void addContext(std::string const &context)
Definition: Exception.cc:169
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
Definition: Schedule.cc:1318
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:325
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
PreGlobalEndLumi preGlobalWriteLumiSignal_
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:186
bool wantSummary_
Definition: Schedule.h:332
std::vector< std::string > const * endPathNames_
Definition: Schedule.h:331
HLT enums.
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:1308
LuminosityBlockIndex index() const
void finishSetup(ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &preg, BranchIDListHelper &branchIDListHelper, ProcessBlockHelperBase &processBlockHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, SubProcessParentageHelper const *subProcessParentageHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool hasSubprocesses, PreallocationConfiguration const &prealloc, ProcessContext const *processContext)
Definition: Schedule.cc:578
Definition: output.py:1
void sendFwkSummaryToMessageLogger() const
Definition: Schedule.cc:798
auto wrap(F iFunc) -> decltype(iFunc())
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:1072
void writeLumiAsync(WaitingTaskHolder iTask, LuminosityBlockPrincipal const &lbp, ProcessContext const *, ActivityRegistry *)
Definition: Schedule.cc:1140
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void initializeEarlyDelete(std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
Definition: Schedule.cc:1271
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:783
std::vector< std::string > vstring
Definition: Schedule.cc:482
ServiceToken presentToken() const
void setIsMergeable(BranchDescription &)
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
int totalEventsFailed() const
Definition: Schedule.cc:1408
def move(src, dest)
Definition: eostools.py:511
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &, PathsAndConsumesOfModulesBase const &, ProcessContext const &)
Definition: Schedule.cc:1188
void closeOutputFiles()
Definition: Schedule.cc:1064
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
Definition: Schedule.cc:1304
unsigned transform(const HcalDetId &id, unsigned transformCode)