CMS 3D CMS Logo

Schedule.cc
Go to the documentation of this file.
2 
38 
39 #include <algorithm>
40 #include <cassert>
41 #include <cstdlib>
42 #include <functional>
43 #include <iomanip>
44 #include <list>
45 #include <map>
46 #include <set>
47 #include <exception>
48 #include <sstream>
49 
51 #include "processEDAliases.h"
52 
53 namespace edm {
54 
55  class Maker;
56 
57  namespace {
58  using std::placeholders::_1;
59 
/// Reports whether `s` is present in `v`.
///
/// The lookup is a binary search, so `v` must already be sorted in ascending
/// order; on an unsorted vector the answer is unspecified.
bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
  // First element that is not less than `s`; it is a match only when `s` is
  // not less than it either.
  auto const candidate = std::lower_bound(v.begin(), v.end(), s);
  return candidate != v.end() && !(s < *candidate);
}
63 
64  // Here we make the trigger results inserter directly. This should
65  // probably be a utility in the WorkerRegistry or elsewhere.
66 
// Builds the TriggerResultInserter from the "@trigger_paths" parameter set of
// proc_pset, registers its products and callbacks with preg, and returns it.
// The pre/post module-construction signals of areg are emitted around the
// construction. If construction throws, the post signal is still attempted
// (any exception it raises is ignored) and the original exception is rethrown.
67  std::shared_ptr<TriggerResultInserter> makeInserter(ParameterSet& proc_pset,
68  PreallocationConfiguration const& iPrealloc,
69  ProductRegistry& preg,
70  ExceptionToActionTable const& actions,
71  std::shared_ptr<ActivityRegistry> areg,
72  std::shared_ptr<ProcessConfiguration> processConfiguration) {
73  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
74  trig_pset->registerIt();
75 
76  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
77  ModuleDescription md(trig_pset->id(),
78  "TriggerResultInserter",
79  "TriggerResults",
80  processConfiguration.get(),
// NOTE(review): this listing jumps from line 80 to line 82, so the last
// constructor argument and the closing of the ModuleDescription call are not
// visible here -- confirm against the real source file.
82 
83  areg->preModuleConstructionSignal_(md);
// Tracks whether the post signal has already been issued so that the catch
// block below does not emit it a second time.
84  bool postCalled = false;
85  std::shared_ptr<TriggerResultInserter> returnValue;
86  // Caught exception is rethrown
87  CMS_SA_ALLOW try {
88  maker::ModuleHolderT<TriggerResultInserter> holder(
89  make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),
90  static_cast<Maker const*>(nullptr));
91  holder.setModuleDescription(md);
92  holder.registerProductsAndCallbacks(&preg);
93  returnValue = holder.module();
94  postCalled = true;
95  // if exception then post will be called in the catch block
96  areg->postModuleConstructionSignal_(md);
97  } catch (...) {
98  if (!postCalled) {
99  CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
100  // If post throws an exception ignore it because we are already handling another exception
101  }
102  }
103  throw;
104  }
105  return returnValue;
106  }
107 
// Creates one module of type T for every entry of pathNames and appends it to
// pathStatusInserters. Each module gets its own ModuleDescription whose label
// is the path name and whose type name is moduleTypeName; all of them share
// the id of one parameter set that carries "@module_type" and
// "@module_edm_type". The pre/post module-construction signals of areg are
// emitted around each construction, and the post signal is still attempted
// when construction throws before the exception is rethrown.
108  template <typename T>
109  void makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
110  std::vector<std::string> const& pathNames,
111  PreallocationConfiguration const& iPrealloc,
112  ProductRegistry& preg,
113  std::shared_ptr<ActivityRegistry> areg,
114  std::shared_ptr<ProcessConfiguration> processConfiguration,
115  std::string const& moduleTypeName) {
// NOTE(review): this listing jumps from line 115 to line 117; the declaration
// of `pset` used below is not visible here -- confirm against the real source.
117  pset.addParameter<std::string>("@module_type", moduleTypeName);
118  pset.addParameter<std::string>("@module_edm_type", "EDProducer");
119  pset.registerIt();
120 
121  pathStatusInserters.reserve(pathNames.size());
122 
123  for (auto const& pathName : pathNames) {
124  ModuleDescription md(
125  pset.id(), moduleTypeName, pathName, processConfiguration.get(), ModuleDescription::getUniqueID());
126 
127  areg->preModuleConstructionSignal_(md);
// Tracks whether the post signal was already issued for this module so the
// catch block does not emit it twice.
128  bool postCalled = false;
129  // Caught exception is rethrown
130  CMS_SA_ALLOW try {
131  maker::ModuleHolderT<T> holder(make_shared_noexcept_false<T>(iPrealloc.numberOfStreams()),
132  static_cast<Maker const*>(nullptr));
133  holder.setModuleDescription(md);
134  holder.registerProductsAndCallbacks(&preg);
135  pathStatusInserters.emplace_back(holder.module());
136  postCalled = true;
137  // if exception then post will be called in the catch block
138  areg->postModuleConstructionSignal_(md);
139  } catch (...) {
140  if (!postCalled) {
141  CMS_SA_ALLOW try { areg->postModuleConstructionSignal_(md); } catch (...) {
142  // If post throws an exception ignore it because we are already handling another exception
143  }
144  }
145  throw;
146  }
147  }
148  }
149 
150  typedef std::vector<std::string> vstring;
151 
// Resolves every SwitchProducer alias product in preg and validates the cases.
//
// Pass 1: for each product flagged isSwitchAlias(), find the product of this
// process that it aliases (same branch type, same unwrapped type, module label
// equal to the switch-alias module label, same instance name), attach it with
// setSwitchAliasForBranch(), and remember the alias key per switch label.
// An exception is thrown when no such product exists.
//
// Pass 2: for each switch and each of its "@all_cases" labels, mark the case's
// products of this process transient and check that the case declares exactly
// the branches recorded for the switch; a missing or extra product, or any
// ROOT branch alias on a case product, raises an exception.
152  void processSwitchProducers(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
153  // Update Switch BranchDescriptions for the chosen case
154  struct BranchesCases {
155  BranchesCases(std::vector<std::string> cases) : caseLabels{std::move(cases)} {}
156  std::vector<BranchKey> chosenBranches;
157  std::vector<std::string> caseLabels;
158  };
// Keyed by the SwitchProducer's module label.
159  std::map<std::string, BranchesCases> switchMap;
160  for (auto& prod : preg.productListUpdator()) {
161  if (prod.second.isSwitchAlias()) {
162  auto it = switchMap.find(prod.second.moduleLabel());
163  if (it == switchMap.end()) {
// First product seen for this switch: record its full list of case labels.
164  auto const& switchPSet = proc_pset.getParameter<edm::ParameterSet>(prod.second.moduleLabel());
165  auto inserted = switchMap.emplace(prod.second.moduleLabel(),
166  switchPSet.getParameter<std::vector<std::string>>("@all_cases"));
167  assert(inserted.second);
168  it = inserted.first;
169  }
170 
171  bool found = false;
172  for (auto const& productIter : preg.productList()) {
173  BranchKey const& branchKey = productIter.first;
174  // The alias-for product must be in the same process as
175  // the SwitchProducer (earlier processes or SubProcesses
176  // may contain products with same type, module label, and
177  // instance name)
178  if (branchKey.processName() != processName) {
179  continue;
180  }
181 
182  BranchDescription const& desc = productIter.second;
183  if (desc.branchType() == prod.second.branchType() and
184  desc.unwrappedTypeID().typeInfo() == prod.second.unwrappedTypeID().typeInfo() and
185  branchKey.moduleLabel() == prod.second.switchAliasModuleLabel() and
186  branchKey.productInstanceName() == prod.second.productInstanceName()) {
187  prod.second.setSwitchAliasForBranch(desc);
188  it->second.chosenBranches.push_back(prod.first); // with moduleLabel of the Switch
189  found = true;
190  }
191  }
192  if (not found) {
// NOTE(review): this listing jumps from line 192 to line 194; the declaration
// of `ex` is not visible here -- confirm against the real source.
194  ex << "Trying to find a BranchDescription to be aliased-for by SwitchProducer with\n"
195  << " friendly class name = " << prod.second.friendlyClassName() << "\n"
196  << " module label = " << prod.second.moduleLabel() << "\n"
197  << " product instance name = " << prod.second.productInstanceName() << "\n"
198  << " process name = " << processName
199  << "\n\nbut did not find any. Please contact a framework developer.";
200  ex.addContext("Calling Schedule.cc:processSwitchProducers()");
201  throw ex;
202  }
203  }
204  }
// Nothing to validate when the registry holds no switch alias products.
205  if (switchMap.empty())
206  return;
207 
// Sorted so that std::equal_range can be used on chosenBranches below.
208  for (auto& elem : switchMap) {
209  std::sort(elem.second.chosenBranches.begin(), elem.second.chosenBranches.end());
210  }
211 
// Appends to `ex`, per case label, the sorted list of that case's products in
// this process (friendly class name and product instance name).
212  auto addProductsToException = [&preg, &processName](auto const& caseLabels, edm::Exception& ex) {
213  std::map<std::string, std::vector<BranchKey>> caseBranches;
214  for (auto const& item : preg.productList()) {
215  if (item.first.processName() != processName)
216  continue;
217 
218  if (auto found = std::find(caseLabels.begin(), caseLabels.end(), item.first.moduleLabel());
219  found != caseLabels.end()) {
220  caseBranches[*found].push_back(item.first);
221  }
222  }
223 
224  for (auto const& caseLabel : caseLabels) {
225  ex << "Products for case " << caseLabel << " (friendly class name, product instance name):\n";
226  auto& branches = caseBranches[caseLabel];
227  std::sort(branches.begin(), branches.end());
228  for (auto const& branch : branches) {
229  ex << " " << branch.friendlyClassName() << " " << branch.productInstanceName() << "\n";
230  }
231  }
232  };
233 
234  // Check that non-chosen cases declare exactly the same branches
235  // Also set the alias-for branches to transient
// foundBranches[i] is set when the current case produces chosenBranches[i];
// the vector is reused across switches and reset for every case.
236  std::vector<bool> foundBranches;
237  for (auto const& switchItem : switchMap) {
238  auto const& switchLabel = switchItem.first;
239  auto const& chosenBranches = switchItem.second.chosenBranches;
240  auto const& caseLabels = switchItem.second.caseLabels;
241  foundBranches.resize(chosenBranches.size());
242  for (auto const& caseLabel : caseLabels) {
243  std::fill(foundBranches.begin(), foundBranches.end(), false);
244  for (auto& nonConstItem : preg.productListUpdator()) {
245  auto const& item = nonConstItem;
246  if (item.first.moduleLabel() == caseLabel and item.first.processName() == processName) {
247  // Set the alias-for branch as transient so it gets fully ignored in output.
248  // I tried first to implicitly drop all branches with
249  // '@' in ProductSelector, but that gave problems on
250  // input (those branches would be implicitly dropped on
251  // input as well, leading to the SwitchProducer branches
252  // to be dropped as dependent ones, as the alias
253  // detection logic in RootFile says that the
254  // SwitchProducer branches are not alias branches)
255  nonConstItem.second.setTransient(true);
256 
// Look up the switch's own branch that corresponds to this case product: same
// key, but with the switch label in place of the case label.
257  auto range = std::equal_range(chosenBranches.begin(),
258  chosenBranches.end(),
259  BranchKey(item.first.friendlyClassName(),
260  switchLabel,
261  item.first.productInstanceName(),
262  item.first.processName()));
263  if (range.first == range.second) {
// NOTE(review): this listing jumps from line 263 to line 265; the declaration
// of `ex` is not visible here -- confirm against the real source.
265  ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel << " with a product "
266  << item.first << " that is not produced by the chosen case "
267  << proc_pset.getParameter<edm::ParameterSet>(switchLabel)
268  .getUntrackedParameter<std::string>("@chosen_case")
269  << ". If the intention is to produce only a subset of the products listed below, each case with "
270  "more products needs to be replaced with an EDAlias to only the necessary products, and the "
271  "EDProducer itself needs to be moved to a Task.\n\n";
272  addProductsToException(caseLabels, ex);
273  throw ex;
274  }
275  assert(std::distance(range.first, range.second) == 1);
276  foundBranches[std::distance(chosenBranches.begin(), range.first)] = true;
277 
278  // Check that there are no BranchAliases for any of the cases
279  auto const& bd = item.second;
280  if (not bd.branchAliases().empty()) {
// NOTE(review): this listing jumps from line 280 to line 282; the start of the
// statement that creates `ex` is not visible here -- confirm against the real
// source.
282  << "SwitchProducer does not support ROOT branch aliases. Got the following ROOT branch "
283  "aliases for SwitchProducer with label "
284  << switchLabel << " for case " << caseLabel << ":";
285  for (auto const& branchAlias : bd.branchAliases()) {
286  ex << " " << branchAlias;
287  }
288  throw ex;
289  }
290  }
291  }
292 
// Every branch recorded for the switch must also be produced by this case.
293  for (size_t i = 0; i < chosenBranches.size(); i++) {
294  if (not foundBranches[i]) {
295  auto chosenLabel = proc_pset.getParameter<edm::ParameterSet>(switchLabel)
296  .getUntrackedParameter<std::string>("@chosen_case");
// NOTE(review): this listing jumps from line 296 to line 298; the declaration
// of `ex` is not visible here -- confirm against the real source.
298  ex << "SwitchProducer " << switchLabel << " has a case " << caseLabel
299  << " that does not produce a product " << chosenBranches[i] << " that is produced by the chosen case "
300  << chosenLabel
301  << ". If the intention is to produce only a subset of the products listed below, each case with more "
302  "products needs to be replaced with an EDAlias to only the necessary products, and the "
303  "EDProducer itself needs to be moved to a Task.\n\n";
304  addProductsToException(caseLabels, ex);
305  throw ex;
306  }
307  }
308  }
309  }
310  }
311 
312  void reduceParameterSet(ParameterSet& proc_pset,
313  vstring const& end_path_name_list,
314  vstring& modulesInConfig,
315  std::set<std::string> const& usedModuleLabels,
316  std::map<std::string, std::vector<std::pair<std::string, int>>>& outputModulePathPositions) {
317  // Before calculating the ParameterSetID of the top level ParameterSet or
318  // saving it in the registry drop from the top level ParameterSet all
319  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
320  // production is not enabled also drop all the EDFilters and EDProducers
321  // that are not scheduled. Drop the ParameterSet used to configure the module
322  // itself. Also drop the other traces of these labels in the top level
323  // ParameterSet: Remove that labels from @all_modules and from all the
324  // end paths. If this makes any end paths empty, then remove the end path
325  // name from @end_paths, and @paths.
326 
327  // First make a list of labels to drop
328  vstring outputModuleLabels;
329  std::string edmType;
330  std::string const moduleEdmType("@module_edm_type");
331  std::string const outputModule("OutputModule");
332  std::string const edAnalyzer("EDAnalyzer");
333  std::string const edFilter("EDFilter");
334  std::string const edProducer("EDProducer");
335 
336  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
337 
338  //need a list of all modules on paths in order to determine
339  // if an EDAnalyzer only appears on an end path
340  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
341  std::set<std::string> modulesOnPaths;
342  {
343  std::set<std::string> noEndPaths(scheduledPaths.begin(), scheduledPaths.end());
344  for (auto const& endPath : end_path_name_list) {
345  noEndPaths.erase(endPath);
346  }
347  {
348  vstring labels;
349  for (auto const& path : noEndPaths) {
350  labels = proc_pset.getParameter<vstring>(path);
351  modulesOnPaths.insert(labels.begin(), labels.end());
352  }
353  }
354  }
355  //Initially fill labelsToBeDropped with all module mentioned in
356  // the configuration but which are not being used by the system
357  std::vector<std::string> labelsToBeDropped;
358  labelsToBeDropped.reserve(modulesInConfigSet.size());
359  std::set_difference(modulesInConfigSet.begin(),
360  modulesInConfigSet.end(),
361  usedModuleLabels.begin(),
362  usedModuleLabels.end(),
363  std::back_inserter(labelsToBeDropped));
364 
365  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
366  for (auto const& modLabel : usedModuleLabels) {
367  // Do nothing for modules that do not have a ParameterSet. Modules of type
368  // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
369  if (proc_pset.existsAs<ParameterSet>(modLabel)) {
370  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
371  if (edmType == outputModule) {
372  outputModuleLabels.push_back(modLabel);
373  labelsToBeDropped.push_back(modLabel);
374  }
375  if (edmType == edAnalyzer) {
376  if (modulesOnPaths.end() == modulesOnPaths.find(modLabel)) {
377  labelsToBeDropped.push_back(modLabel);
378  }
379  }
380  }
381  }
382  //labelsToBeDropped must be sorted
383  std::inplace_merge(
384  labelsToBeDropped.begin(), labelsToBeDropped.begin() + sizeBeforeOutputModules, labelsToBeDropped.end());
385 
386  // drop the parameter sets used to configure the modules
387  for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
388 
389  // drop the labels from @all_modules
390  vstring::iterator endAfterRemove =
391  std::remove_if(modulesInConfig.begin(),
392  modulesInConfig.end(),
393  std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
394  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
395  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
396 
397  // drop the labels from all end paths
398  vstring endPathsToBeDropped;
399  vstring labels;
400  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
401  iEndPath != endEndPath;
402  ++iEndPath) {
403  labels = proc_pset.getParameter<vstring>(*iEndPath);
404  vstring::iterator iSave = labels.begin();
405  vstring::iterator iBegin = labels.begin();
406 
407  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end(); iLabel != iEnd; ++iLabel) {
408  if (binary_search_string(labelsToBeDropped, *iLabel)) {
409  if (binary_search_string(outputModuleLabels, *iLabel)) {
410  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
411  }
412  } else {
413  if (iSave != iLabel) {
414  iSave->swap(*iLabel);
415  }
416  ++iSave;
417  }
418  }
419  labels.erase(iSave, labels.end());
420  if (labels.empty()) {
421  // remove empty end paths and save their names
422  proc_pset.eraseSimpleParameter(*iEndPath);
423  endPathsToBeDropped.push_back(*iEndPath);
424  } else {
425  proc_pset.addParameter<vstring>(*iEndPath, labels);
426  }
427  }
428  sort_all(endPathsToBeDropped);
429 
430  // remove empty end paths from @paths
431  endAfterRemove = std::remove_if(scheduledPaths.begin(),
432  scheduledPaths.end(),
433  std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
434  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
435  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
436 
437  // remove empty end paths from @end_paths
438  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
439  endAfterRemove = std::remove_if(scheduledEndPaths.begin(),
440  scheduledEndPaths.end(),
441  std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
442  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
443  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
444  }
445 
// Helper consumer used only to discover which product types `rng` consumes:
// when `rng` is available, its consumes() call is given this object's
// consumesCollector(), and the type of every resulting consumesInfo() entry
// is added to typesConsumed.
446  class RngEDConsumer : public EDConsumerBase {
447  public:
448  explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
// NOTE(review): this listing jumps from line 448 to line 450; the declaration
// of `rng` is not visible here -- confirm against the real source.
450  if (rng.isAvailable()) {
451  rng->consumes(consumesCollector());
452  for (auto const& consumesInfo : this->consumesInfo()) {
453  typesConsumed.emplace(consumesInfo.type());
454  }
455  }
456  }
457  };
458 
459  template <typename F>
460  auto doCleanup(F&& iF) {
461  auto wrapped = [f = std::move(iF)](std::exception_ptr const* iPtr, edm::WaitingTaskHolder iTask) {
462  CMS_SA_ALLOW try { f(); } catch (...) {
463  }
464  if (iPtr) {
465  iTask.doneWaiting(*iPtr);
466  }
467  };
468  return wrapped;
469  }
470  } // namespace
471  // -----------------------------
472 
473  typedef std::vector<std::string> vstring;
474 
475  // -----------------------------
476 
// Schedule constructor (closing brace is tagged "Schedule::Schedule" below).
// NOTE(review): this listing jumps from line 476 to 478 and from 483 to 485,
// so the first line of the signature and one parameter line are not visible
// here; `proc_pset` and `actions` used below are presumably declared on those
// lines -- confirm against the real source.
//
// Visible steps, in order: create the trigger-results and path-status
// inserters, one StreamSchedule per stream and the GlobalSchedule; reduce and
// register the process ParameterSet; collect and configure the output module
// communicators; freeze the product registry with the consumed types; and,
// when a summary is wanted, hook a SystemTimeKeeper into the activity signals.
478  service::TriggerNamesService const& tns,
479  ProductRegistry& preg,
480  BranchIDListHelper& branchIDListHelper,
481  ProcessBlockHelperBase& processBlockHelper,
482  ThinnedAssociationsHelper& thinnedAssociationsHelper,
483  SubProcessParentageHelper const* subProcessParentageHelper,
485  std::shared_ptr<ActivityRegistry> areg,
486  std::shared_ptr<ProcessConfiguration> processConfiguration,
487  bool hasSubprocesses,
488  PreallocationConfiguration const& prealloc,
489  ProcessContext const* processContext)
490  : //Only create a resultsInserter if there is a trigger path
491  resultsInserter_{tns.getTrigPaths().empty()
492  ? std::shared_ptr<TriggerResultInserter>{}
493  : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
494  moduleRegistry_(new ModuleRegistry()),
495  all_output_communicators_(),
496  preallocConfig_(prealloc),
497  pathNames_(&tns.getTrigPaths()),
498  endPathNames_(&tns.getEndPaths()),
499  wantSummary_(tns.wantSummary()) {
500  makePathStatusInserters(pathStatusInserters_,
501  *pathNames_,
502  prealloc,
503  preg,
504  areg,
505  processConfiguration,
506  std::string("PathStatusInserter"));
507 
508  makePathStatusInserters(endPathStatusInserters_,
509  *endPathNames_,
510  prealloc,
511  preg,
512  areg,
513  processConfiguration,
514  std::string("EndPathStatusInserter"));
515 
// One StreamSchedule per preallocated stream; at least one stream is required
// because streamSchedules_[0] is used as the reference below.
516  assert(0 < prealloc.numberOfStreams());
517  streamSchedules_.reserve(prealloc.numberOfStreams());
518  for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
519  streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
520  pathStatusInserters_,
521  endPathStatusInserters_,
522  moduleRegistry(),
523  proc_pset,
524  tns,
525  prealloc,
526  preg,
527  branchIDListHelper,
528  actions,
529  areg,
530  processConfiguration,
531  StreamID{i},
532  processContext));
533  }
534 
535  //TriggerResults are injected automatically by StreamSchedules and are
536  // unknown to the ModuleRegistry
537  const std::string kTriggerResults("TriggerResults");
538  std::vector<std::string> modulesToUse;
539  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
540  for (auto const& worker : streamSchedules_[0]->allWorkers()) {
541  if (worker->description()->moduleLabel() != kTriggerResults) {
542  modulesToUse.push_back(worker->description()->moduleLabel());
543  }
544  }
545  //The unscheduled modules are at the end of the list, but we want them at the front
546  unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
547  if (nUnscheduledModules > 0) {
// Rotate the last nUnscheduledModules labels to the front via a temporary.
548  std::vector<std::string> temp;
549  temp.reserve(modulesToUse.size());
550  auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
551  std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
552  std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
553  temp.swap(modulesToUse);
554  }
555 
556  // propagate_const<T> has no reset() function
557  globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
558  pathStatusInserters_,
559  endPathStatusInserters_,
560  moduleRegistry(),
561  modulesToUse,
562  proc_pset,
563  preg,
564  prealloc,
565  actions,
566  areg,
567  processConfiguration,
568  processContext);
569 
570  //TriggerResults is not in the top level ParameterSet so the call to
571  // reduceParameterSet would fail to find it. Just remove it up front.
572  std::set<std::string> usedModuleLabels;
573  for (auto const& worker : allWorkers()) {
574  if (worker->description()->moduleLabel() != kTriggerResults) {
575  usedModuleLabels.insert(worker->description()->moduleLabel());
576  }
577  }
578  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
// Filled by reduceParameterSet and handed to the output communicators below.
579  std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
580  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
581  {
582  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
583  detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
584  }
585 
586  // At this point all BranchDescriptions are created. Mark now the
587  // ones of unscheduled workers to be on-demand.
588  if (nUnscheduledModules > 0) {
// The unscheduled labels were moved to the front of modulesToUse above.
589  std::set<std::string> unscheduledModules(modulesToUse.begin(), modulesToUse.begin() + nUnscheduledModules);
590  preg.setUnscheduledProducts(unscheduledModules);
591  }
592 
593  processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
// Register the reduced ParameterSet and record its id in the process
// configuration.
594  proc_pset.registerIt();
595  processConfiguration->setParameterSetID(proc_pset.id());
596  processConfiguration->setProcessConfigurationID();
597 
598  // This is used for a little sanity-check to make sure no code
599  // modifications alter the number of workers at a later date.
600  size_t all_workers_count = allWorkers().size();
601 
// Keep a communicator for every module holder that provides one.
602  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
603  auto comm = iHolder->createOutputModuleCommunicator();
604  if (comm) {
605  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
606  }
607  });
608  // Now that the output workers are filled in, set any output limits or information.
609  limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
610 
611  // Sanity check: make sure nobody has added a worker after we've
612  // already relied on the WorkerManager being full.
613  assert(all_workers_count == allWorkers().size());
614 
615  branchIDListHelper.updateFromRegistry(preg);
616 
617  for (auto const& worker : streamSchedules_[0]->allWorkers()) {
618  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
619  }
620 
621  processBlockHelper.updateForNewProcess(preg, processConfiguration->processName());
622 
623  // The output modules consume products in kept branches.
624  // So we must set this up before freezing.
625  for (auto& c : all_output_communicators_) {
626  c->selectProducts(preg, thinnedAssociationsHelper, processBlockHelper);
627  }
628 
629  for (auto& product : preg.productListUpdator()) {
630  setIsMergeable(product.second);
631  }
632 
633  {
634  // We now get a collection of types that may be consumed.
635  std::set<TypeID> productTypesConsumed;
636  std::set<TypeID> elementTypesConsumed;
637  // Loop over all modules
638  for (auto const& worker : allWorkers()) {
639  for (auto const& consumesInfo : worker->consumesInfo()) {
640  if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
641  productTypesConsumed.emplace(consumesInfo.type());
642  } else {
643  elementTypesConsumed.emplace(consumesInfo.type());
644  }
645  }
646  }
647  // The SubProcess class is not a module, yet it may consume.
648  if (hasSubprocesses) {
649  productTypesConsumed.emplace(typeid(TriggerResults));
650  }
651  // The RandomNumberGeneratorService is not a module, yet it consumes.
// The temporary exists only for its constructor, which adds the consumed
// types to productTypesConsumed.
652  { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
653  preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
654  }
655 
656  for (auto& c : all_output_communicators_) {
657  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
658  }
659 
660  if (wantSummary_) {
// Gather the description of every worker for the time keeper.
661  std::vector<const ModuleDescription*> modDesc;
662  const auto& workers = allWorkers();
663  modDesc.reserve(workers.size());
664 
665  std::transform(workers.begin(),
666  workers.end(),
667  std::back_inserter(modDesc),
668  [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
669 
670  // propagate_const<T> has no reset() function
671  summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
672  auto timeKeeperPtr = summaryTimeKeeper_.get();
673 
// Connect the time keeper's member functions to the activity registry signals.
674  areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
675 
676  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
677  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
678  areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
679  areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
680  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
681  areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
682 
683  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
684  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
685 
686  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
687  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
688 
689  areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
690  areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
691  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
692  //timeKeeperPtr->startModuleEvent(iContext,iMod);
693  //});
694  }
695 
696  } // Schedule::Schedule
697 
// Reads the untracked "maxEvents" parameter set of proc_pset and configures
// every output module communicator with an OutputModuleDescription.
// "output" may be given as an int, which is passed to every description, or as
// a parameter set that is looked up by output module label. The error text
// below indicates that specifying both forms is rejected.
698  void Schedule::limitOutput(ParameterSet const& proc_pset,
699  BranchIDLists const& branchIDLists,
700  SubProcessParentageHelper const* subProcessParentageHelper) {
701  std::string const output("output");
702 
703  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
// Counts how many forms of 'output' (int and/or parameter set) are present.
704  int maxEventSpecs = 0;
705  int maxEventsOut = -1;
706  ParameterSet const* vMaxEventsOut = nullptr;
707  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
708  if (search_all(intNamesE, output)) {
709  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
710  ++maxEventSpecs;
711  }
712  std::vector<std::string> psetNamesE;
713  maxEventsPSet.getParameterSetNames(psetNamesE, false);
714  if (search_all(psetNamesE, output)) {
715  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
716  ++maxEventSpecs;
717  }
718 
719  if (maxEventSpecs > 1) {
// NOTE(review): this listing jumps from line 719 to line 721; the start of
// this statement (presumably a throw of a configuration exception) is not
// visible here -- confirm against the real source.
721  << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
722  }
723 
724  for (auto& c : all_output_communicators_) {
725  OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
// A non-empty per-module parameter set overrides the limit with the entry
// stored under this module's label.
726  if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
727  std::string const& moduleLabel = c->description().moduleLabel();
728  try {
729  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
730  } catch (Exception const&) {
// NOTE(review): this listing jumps from line 730 to line 732; the start of
// this statement is not visible here -- confirm against the real source.
732  << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
733  }
734  }
735  c->configure(desc);
736  }
737  }
738 
739  bool Schedule::terminate() const {
740  if (all_output_communicators_.empty()) {
741  return false;
742  }
743  for (auto& c : all_output_communicators_) {
744  if (!c->limitReached()) {
745  // Found an output module that has not reached output event count.
746  return false;
747  }
748  }
749  LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
750  << "has reached its configured limit.\n";
751  return true;
752  }
753 
755  globalSchedule_->endJob(collector);
756  if (collector.hasThrown()) {
757  return;
758  }
759 
760  if (wantSummary_ == false)
761  return;
762  {
763  TriggerReport tr;
764  getTriggerReport(tr);
765 
766  // The trigger report (pass/fail etc.):
767 
768  LogFwkVerbatim("FwkSummary") << "";
769  if (streamSchedules_[0]->context().processContext()->isSubProcess()) {
770  LogFwkVerbatim("FwkSummary") << "TrigReport Process: "
771  << streamSchedules_[0]->context().processContext()->processName();
772  }
773  LogFwkVerbatim("FwkSummary") << "TrigReport "
774  << "---------- Event Summary ------------";
775  if (!tr.trigPathSummaries.empty()) {
776  LogFwkVerbatim("FwkSummary") << "TrigReport"
777  << " Events total = " << tr.eventSummary.totalEvents
778  << " passed = " << tr.eventSummary.totalEventsPassed
779  << " failed = " << tr.eventSummary.totalEventsFailed << "";
780  } else {
781  LogFwkVerbatim("FwkSummary") << "TrigReport"
782  << " Events total = " << tr.eventSummary.totalEvents
783  << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
784  }
785 
786  LogFwkVerbatim("FwkSummary") << "";
787  LogFwkVerbatim("FwkSummary") << "TrigReport "
788  << "---------- Path Summary ------------";
789  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
790  << " " << std::right << std::setw(10) << "Executed"
791  << " " << std::right << std::setw(10) << "Passed"
792  << " " << std::right << std::setw(10) << "Failed"
793  << " " << std::right << std::setw(10) << "Error"
794  << " "
795  << "Name"
796  << "";
797  for (auto const& p : tr.trigPathSummaries) {
798  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
799  << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
800  << std::right << std::setw(10) << p.timesPassed << " " << std::right
801  << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
802  << p.timesExcept << " " << p.name << "";
803  }
804 
805  /*
806  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
807  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
808  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
809  for (; epi != epe; ++epi, ++epn) {
810 
811  LogFwkVerbatim("FwkSummary") << "TrigReport "
812  << std::right << std::setw(5) << 1
813  << std::right << std::setw(5) << *epi << " "
814  << std::right << std::setw(10) << totalEvents() << " "
815  << std::right << std::setw(10) << totalEvents() << " "
816  << std::right << std::setw(10) << 0 << " "
817  << std::right << std::setw(10) << 0 << " "
818  << *epn << "";
819  }
820  */
821 
822  LogFwkVerbatim("FwkSummary") << "";
823  LogFwkVerbatim("FwkSummary") << "TrigReport "
824  << "-------End-Path Summary ------------";
825  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
826  << " " << std::right << std::setw(10) << "Executed"
827  << " " << std::right << std::setw(10) << "Passed"
828  << " " << std::right << std::setw(10) << "Failed"
829  << " " << std::right << std::setw(10) << "Error"
830  << " "
831  << "Name"
832  << "";
833  for (auto const& p : tr.endPathSummaries) {
834  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
835  << p.bitPosition << " " << std::right << std::setw(10) << p.timesRun << " "
836  << std::right << std::setw(10) << p.timesPassed << " " << std::right
837  << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
838  << p.timesExcept << " " << p.name << "";
839  }
840 
841  for (auto const& p : tr.trigPathSummaries) {
842  LogFwkVerbatim("FwkSummary") << "";
843  LogFwkVerbatim("FwkSummary") << "TrigReport "
844  << "---------- Modules in Path: " << p.name << " ------------";
845  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
846  << " " << std::right << std::setw(10) << "Visited"
847  << " " << std::right << std::setw(10) << "Passed"
848  << " " << std::right << std::setw(10) << "Failed"
849  << " " << std::right << std::setw(10) << "Error"
850  << " "
851  << "Name"
852  << "";
853 
854  for (auto const& mod : p.moduleInPathSummaries) {
855  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5)
856  << mod.bitPosition << " " << std::right << std::setw(10) << mod.timesVisited
857  << " " << std::right << std::setw(10) << mod.timesPassed << " " << std::right
858  << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
859  << mod.timesExcept << " " << mod.moduleLabel << "";
860  }
861  }
862 
863  for (auto const& p : tr.endPathSummaries) {
864  LogFwkVerbatim("FwkSummary") << "";
865  LogFwkVerbatim("FwkSummary") << "TrigReport "
866  << "------ Modules in End-Path: " << p.name << " ------------";
867  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
868  << " " << std::right << std::setw(10) << "Visited"
869  << " " << std::right << std::setw(10) << "Passed"
870  << " " << std::right << std::setw(10) << "Failed"
871  << " " << std::right << std::setw(10) << "Error"
872  << " "
873  << "Name"
874  << "";
875 
876  unsigned int bitpos = 0;
877  for (auto const& mod : p.moduleInPathSummaries) {
878  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5)
879  << bitpos << " " << std::right << std::setw(10) << mod.timesVisited << " "
880  << std::right << std::setw(10) << mod.timesPassed << " " << std::right
881  << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10)
882  << mod.timesExcept << " " << mod.moduleLabel << "";
883  ++bitpos;
884  }
885  }
886 
887  LogFwkVerbatim("FwkSummary") << "";
888  LogFwkVerbatim("FwkSummary") << "TrigReport "
889  << "---------- Module Summary ------------";
890  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Visited"
891  << " " << std::right << std::setw(10) << "Executed"
892  << " " << std::right << std::setw(10) << "Passed"
893  << " " << std::right << std::setw(10) << "Failed"
894  << " " << std::right << std::setw(10) << "Error"
895  << " "
896  << "Name"
897  << "";
898  for (auto const& worker : tr.workerSummaries) {
899  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " "
900  << std::right << std::setw(10) << worker.timesRun << " " << std::right
901  << std::setw(10) << worker.timesPassed << " " << std::right << std::setw(10)
902  << worker.timesFailed << " " << std::right << std::setw(10) << worker.timesExcept
903  << " " << worker.moduleLabel << "";
904  }
905  LogFwkVerbatim("FwkSummary") << "";
906  }
907  // The timing report (CPU and Real Time):
910 
911  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
912 
913  LogFwkVerbatim("FwkSummary") << "TimeReport "
914  << "---------- Event Summary ---[sec]----";
915  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
916  << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
917  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
918  << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
919  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
920  << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
921  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
922  << " efficiency CPU/Real/thread = "
925 
926  constexpr int kColumn1Size = 10;
927  constexpr int kColumn2Size = 12;
928  constexpr int kColumn3Size = 12;
929  LogFwkVerbatim("FwkSummary") << "";
930  LogFwkVerbatim("FwkSummary") << "TimeReport "
931  << "---------- Path Summary ---[Real sec]----";
932  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
933  << " " << std::right << std::setw(kColumn2Size) << "per exec"
934  << " Name";
935  for (auto const& p : tr.trigPathSummaries) {
936  const int timesRun = std::max(1, p.timesRun);
937  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
938  << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
939  << std::setw(kColumn2Size) << p.realTime / timesRun << " " << p.name << "";
940  }
941  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
942  << " " << std::right << std::setw(kColumn2Size) << "per exec"
943  << " Name"
944  << "";
945 
946  LogFwkVerbatim("FwkSummary") << "";
947  LogFwkVerbatim("FwkSummary") << "TimeReport "
948  << "-------End-Path Summary ---[Real sec]----";
949  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
950  << " " << std::right << std::setw(kColumn2Size) << "per exec"
951  << " Name"
952  << "";
953  for (auto const& p : tr.endPathSummaries) {
954  const int timesRun = std::max(1, p.timesRun);
955 
956  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
957  << std::setw(kColumn1Size) << p.realTime / totalEvents << " " << std::right
958  << std::setw(kColumn2Size) << p.realTime / timesRun << " " << p.name << "";
959  }
960  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
961  << " " << std::right << std::setw(kColumn2Size) << "per exec"
962  << " Name"
963  << "";
964 
965  for (auto const& p : tr.trigPathSummaries) {
966  LogFwkVerbatim("FwkSummary") << "";
967  LogFwkVerbatim("FwkSummary") << "TimeReport "
968  << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
969  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
970  << " " << std::right << std::setw(kColumn2Size) << "per visit"
971  << " Name"
972  << "";
973  for (auto const& mod : p.moduleInPathSummaries) {
974  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
975  << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
976  << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << " "
977  << mod.moduleLabel << "";
978  }
979  }
980  if (not tr.trigPathSummaries.empty()) {
981  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
982  << " " << std::right << std::setw(kColumn2Size) << "per visit"
983  << " Name"
984  << "";
985  }
986  for (auto const& p : tr.endPathSummaries) {
987  LogFwkVerbatim("FwkSummary") << "";
988  LogFwkVerbatim("FwkSummary") << "TimeReport "
989  << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
990  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
991  << " " << std::right << std::setw(kColumn2Size) << "per visit"
992  << " Name"
993  << "";
994  for (auto const& mod : p.moduleInPathSummaries) {
995  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
996  << std::setw(kColumn1Size) << mod.realTime / totalEvents << " " << std::right
997  << std::setw(kColumn2Size) << mod.realTime / std::max(1, mod.timesVisited) << " "
998  << mod.moduleLabel << "";
999  }
1000  }
1001  if (not tr.endPathSummaries.empty()) {
1002  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1003  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1004  << " Name"
1005  << "";
1006  }
1007  LogFwkVerbatim("FwkSummary") << "";
1008  LogFwkVerbatim("FwkSummary") << "TimeReport "
1009  << "---------- Module Summary ---[Real sec]----";
1010  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1011  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1012  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1013  << " Name"
1014  << "";
1015  for (auto const& worker : tr.workerSummaries) {
1016  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::setprecision(6) << std::fixed << std::right
1017  << std::setw(kColumn1Size) << worker.realTime / totalEvents << " " << std::right
1018  << std::setw(kColumn2Size) << worker.realTime / std::max(1, worker.timesRun) << " "
1019  << std::right << std::setw(kColumn3Size)
1020  << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel
1021  << "";
1022  }
1023  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1024  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1025  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1026  << " Name"
1027  << "";
1028 
1029  LogFwkVerbatim("FwkSummary") << "";
1030  LogFwkVerbatim("FwkSummary") << "T---Report end!"
1031  << "";
1032  LogFwkVerbatim("FwkSummary") << "";
1033  }
1034 
1036  using std::placeholders::_1;
1038  for (auto& worker : allWorkers()) {
1039  worker->respondToCloseOutputFile();
1040  }
1041  }
1042 
1044  using std::placeholders::_1;
1045  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1046  }
1047 
1049  RunPrincipal const& rp,
1050  ProcessContext const* processContext,
1051  ActivityRegistry* activityRegistry,
1052  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1055  LuminosityBlockID(rp.run(), 0),
1056  rp.index(),
1058  rp.endTime(),
1059  processContext);
1060 
1061  using namespace edm::waiting_task;
1062  chain::first([&](auto nextTask) {
1063  //services can depend on other services
1065 
1066  // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1067  CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1068  }
1069  for (auto& c : all_output_communicators_) {
1070  c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1071  }
1072  }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1073  //services can depend on other services
1075 
1076  activityRegistry->postGlobalWriteRunSignal_(globalContext);
1077  })) |
1079  }
1080 
1082  ProcessBlockPrincipal const& pbp,
1083  ProcessContext const* processContext,
1084  ActivityRegistry* activityRegistry) {
1091  processContext);
1092 
1093  using namespace edm::waiting_task;
1094  chain::first([&](auto nextTask) {
1095  // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1097  CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1098  }
1099  for (auto& c : all_output_communicators_) {
1100  c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1101  }
1102  }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1103  //services can depend on other services
1105 
1106  activityRegistry->postWriteProcessBlockSignal_(globalContext);
1107  })) |
1109  }
1110 
1112  LuminosityBlockPrincipal const& lbp,
1113  ProcessContext const* processContext,
1114  ActivityRegistry* activityRegistry) {
1117  lbp.id(),
1118  lbp.runPrincipal().index(),
1119  lbp.index(),
1120  lbp.beginTime(),
1121  processContext);
1122 
1123  using namespace edm::waiting_task;
1124  chain::first([&](auto nextTask) {
1126  CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1127  }
1128  for (auto& c : all_output_communicators_) {
1129  c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1130  }
1131  }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1132  //services can depend on other services
1134 
1135  activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1136  })) |
1138  }
1139 
1141  using std::placeholders::_1;
1142  // Return true iff at least one output module returns true.
1143  return (std::find_if(all_output_communicators_.begin(),
1147  }
1148 
1150  using std::placeholders::_1;
1151  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1152  }
1153 
1155  using std::placeholders::_1;
1156  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1157  }
1158 
1159  void Schedule::beginJob(ProductRegistry const& iRegistry,
1160  eventsetup::ESRecordsToProxyIndices const& iESIndices,
1161  ProcessBlockHelperBase const& processBlockHelperBase) {
      // Begin-of-job is handled entirely by the global schedule: all three
      // arguments are passed through unchanged and nothing else is done here.
1162  globalSchedule_->beginJob(iRegistry, iESIndices, processBlockHelperBase);
1163  }
1164 
1165  void Schedule::beginStream(unsigned int iStreamID) {
      // Forwards begin-of-stream to the stream schedule stored at index iStreamID.
      // The index is only checked by assert, so an out-of-range ID is not
      // detected when assertions are compiled out.
1166  assert(iStreamID < streamSchedules_.size());
1167  streamSchedules_[iStreamID]->beginStream();
1168  }
1169 
1170  void Schedule::endStream(unsigned int iStreamID) {
      // Forwards end-of-stream to the stream schedule stored at index iStreamID.
      // As in beginStream, the index is validated by assert only.
1171  assert(iStreamID < streamSchedules_.size());
1172  streamSchedules_[iStreamID]->endStream();
1173  }
1174 
1176  unsigned int iStreamID,
1178  ServiceToken const& token) {
1179  assert(iStreamID < streamSchedules_.size());
1180  streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1181  }
1182 
1184  ParameterSet const& iPSet,
1185  const ProductRegistry& iRegistry,
1186  eventsetup::ESRecordsToProxyIndices const& iIndices) {
1187  Worker* found = nullptr;
1188  for (auto const& worker : allWorkers()) {
1189  if (worker->description()->moduleLabel() == iLabel) {
1190  found = worker;
1191  break;
1192  }
1193  }
1194  if (nullptr == found) {
1195  return false;
1196  }
1197 
1198  auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1199 
1200  globalSchedule_->replaceModule(newMod, iLabel);
1201 
1202  for (auto& s : streamSchedules_) {
1203  s->replaceModule(newMod, iLabel);
1204  }
1205 
1206  {
1207  //Need to updateLookup in order to make getByToken work
      // One lookup helper is fetched from the registry per branch type
      // (process block, run, lumi, event); each is then handed to the worker
      // paired with the branch type it was fetched for.
1208  auto const processBlockLookup = iRegistry.productLookup(InProcess);
1209  auto const runLookup = iRegistry.productLookup(InRun);
1210  auto const lumiLookup = iRegistry.productLookup(InLumi);
1211  auto const eventLookup = iRegistry.productLookup(InEvent);
      // InProcess must be resolved against the process-block helper; this call
      // previously passed *runLookup, which paired the InProcess branch type
      // with the helper fetched for InRun.
1212  found->updateLookup(InProcess, *processBlockLookup);
1213  found->updateLookup(InRun, *runLookup);
1214  found->updateLookup(InLumi, *lumiLookup);
1215  found->updateLookup(InEvent, *eventLookup);
      // Separate overload taking the EventSetup record-to-index mapping.
1216  found->updateLookup(iIndices);
1217 
      // Resolve the worker's put indices per branch type, restricted to the
      // modules of the replacement module's own process.
1218  auto const& processName = newMod->moduleDescription().processName();
1219  auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1220  auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1221  auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1222  auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1223  found->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1224  found->resolvePutIndicies(InRun, runModuleToIndicies);
1225  found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1226  found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1227  }
1228 
1229  return true;
1230  }
1231 
1233  globalSchedule_->deleteModule(iLabel);
1234  for (auto& stream : streamSchedules_) {
1235  stream->deleteModule(iLabel);
1236  }
1237  moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1238  }
1239 
1240  void Schedule::initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
1241  edm::ProductRegistry const& preg) {
      // Early-delete setup is done per stream: every stream schedule receives
      // the shared module registry, the list of branches and the product registry.
1242  for (auto& stream : streamSchedules_) {
1243  stream->initializeEarlyDelete(*moduleRegistry(), branchesToDeleteEarly, preg);
1244  }
1245  }
1246 
1247  std::vector<ModuleDescription const*> Schedule::getAllModuleDescriptions() const {
      // Returns one ModuleDescription pointer per worker, in the iteration
      // order of allWorkers(). The pointers are whatever Worker::description()
      // returns; nothing is copied or owned by the result.
1248  std::vector<ModuleDescription const*> result;
1249  result.reserve(allWorkers().size());
1250 
1251  for (auto const& worker : allWorkers()) {
1252  ModuleDescription const* p = worker->description();
1253  result.push_back(p);
1254  }
1255  return result;
1256  }
1257 
1258  Schedule::AllWorkers const& Schedule::allWorkers() const { return globalSchedule_->allWorkers(); }  // returns the worker collection held by the global schedule, by const reference
1259 
1261  for (auto const& worker : allWorkers()) {
1262  worker->convertCurrentProcessAlias(processName);
1263  }
1264  }
1265 
1266  void Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
      // Delegates to the first stream schedule only.
      // NOTE(review): indexes streamSchedules_[0] without checking that the
      // container is non-empty; presumably at least one stream always exists - confirm.
1267  streamSchedules_[0]->availablePaths(oLabelsToFill);
1268  }
1269 
1270  void Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *pathNames_; }  // overwrites (does not append to) oLabelsToFill with a copy of *pathNames_
1271 
1272  void Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const { oLabelsToFill = *endPathNames_; }  // overwrites (does not append to) oLabelsToFill with a copy of *endPathNames_
1273 
1274  void Schedule::modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const {
      // Delegates to the first stream schedule, passing the path label and the
      // output vector through unchanged.
      // NOTE(review): streamSchedules_[0] is accessed without an emptiness check - confirm
      // that a stream schedule is guaranteed to exist.
1275  streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1276  }
1277 
1279  std::vector<ModuleDescription const*>& descriptions,
1280  unsigned int hint) const {
1281  streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1282  }
1283 
1285  std::vector<ModuleDescription const*>& descriptions,
1286  unsigned int hint) const {
1287  streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1288  }
1289 
1291  std::vector<ModuleDescription const*>& allModuleDescriptions,
1292  std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
1293  std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>& modulesWhoseProductsAreConsumedBy,
1294  std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
1295  ProductRegistry const& preg) const {
1296  allModuleDescriptions.clear();
1297  moduleIDToIndex.clear();
1298  for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1299  modulesWhoseProductsAreConsumedBy[iBranchType].clear();
1300  }
1301  modulesInPreviousProcessesWhoseProductsAreConsumedBy.clear();
1302 
1303  allModuleDescriptions.reserve(allWorkers().size());
1304  moduleIDToIndex.reserve(allWorkers().size());
1305  for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1306  modulesWhoseProductsAreConsumedBy[iBranchType].resize(allWorkers().size());
1307  }
1308  modulesInPreviousProcessesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1309 
1310  std::map<std::string, ModuleDescription const*> labelToDesc;
1311  unsigned int i = 0;
1312  for (auto const& worker : allWorkers()) {
1313  ModuleDescription const* p = worker->description();
1314  allModuleDescriptions.push_back(p);
1315  moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1316  labelToDesc[p->moduleLabel()] = p;
1317  ++i;
1318  }
1319  sort_all(moduleIDToIndex);
1320 
1321  i = 0;
1322  for (auto const& worker : allWorkers()) {
1323  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes> modules;
1324  for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1325  modules[iBranchType] = &modulesWhoseProductsAreConsumedBy[iBranchType].at(i);
1326  }
1327 
1328  std::vector<ModuleProcessName>& modulesInPreviousProcesses =
1329  modulesInPreviousProcessesWhoseProductsAreConsumedBy.at(i);
1330  try {
1331  worker->modulesWhoseProductsAreConsumed(modules, modulesInPreviousProcesses, preg, labelToDesc);
1332  } catch (cms::Exception& ex) {
1333  ex.addContext("Calling Worker::modulesWhoseProductsAreConsumed() for module " +
1334  worker->description()->moduleLabel());
1335  throw;
1336  }
1337  ++i;
1338  }
1339  }
1340 
1342  rep.eventSummary.totalEvents = 0;
1343  rep.eventSummary.totalEventsPassed = 0;
1344  rep.eventSummary.totalEventsFailed = 0;
1345  for (auto& s : streamSchedules_) {
1346  s->getTriggerReport(rep);
1347  }
1348  sort_all(rep.workerSummaries);
1349  }
1350 
1352  rep.eventSummary.totalEvents = 0;
1353  rep.eventSummary.cpuTime = 0.;
1354  rep.eventSummary.realTime = 0.;
1355  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1356  }
1357 
1359  int returnValue = 0;
1360  for (auto& s : streamSchedules_) {
1361  returnValue += s->totalEvents();
1362  }
1363  return returnValue;
1364  }
1365 
1367  int returnValue = 0;
1368  for (auto& s : streamSchedules_) {
1369  returnValue += s->totalEventsPassed();
1370  }
1371  return returnValue;
1372  }
1373 
1375  int returnValue = 0;
1376  for (auto& s : streamSchedules_) {
1377  returnValue += s->totalEventsFailed();
1378  }
1379  return returnValue;
1380  }
1381 
1383  for (auto& s : streamSchedules_) {
1384  s->clearCounters();
1385  }
1386  }
1387 } // namespace edm
size
Write out results.
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
Definition: Schedule.cc:1272
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:60
std::vector< PathTimingSummary > endPathSummaries
PostModuleDestruction postModuleDestructionSignal_
static const TGPicture * info(bool iBackgroundIsBlack)
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:739
#define CMS_SA_ALLOW
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
void stopEvent(StreamContext const &)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
virtual void openFile(FileBlock const &fb)=0
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1351
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void writeProcessBlockAsync(WaitingTaskHolder iTask, ProcessBlockPrincipal const &, ProcessContext const *, ActivityRegistry *)
Definition: Schedule.cc:1081
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:1154
int totalEvents() const
Definition: Schedule.cc:1358
vector< string > vstring
Definition: ExoticaDQM.cc:8
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::vector< Worker * > AllWorkers
Definition: Schedule.h:125
RunPrincipal const & runPrincipal() const
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
Definition: Schedule.cc:1278
void removeModuleIfExists(ModuleDescription const &module)
std::vector< std::string > const * pathNames_
Definition: Schedule.h:313
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1341
void convertCurrentProcessAlias(std::string const &processName)
Convert "@currentProcess" in InputTag process names to the actual current process name...
Definition: Schedule.cc:1260
RunNumber_t run() const
Definition: RunPrincipal.h:61
void endStream(unsigned int)
Definition: Schedule.cc:1170
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
PostGlobalEndLumi postGlobalWriteLumiSignal_
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:61
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
Definition: Schedule.h:311
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
PreModuleDestruction preModuleDestructionSignal_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
void processOneEventAsync(WaitingTaskHolder iTask, unsigned int iStreamID, EventTransitionInfo &, ServiceToken const &token)
Definition: Schedule.cc:1175
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:306
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
T getUntrackedParameter(std::string const &, T const &) const
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
void deleteModule(std::string const &iLabel, ActivityRegistry *areg)
Deletes module with label iLabel.
Definition: Schedule.cc:1232
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:1247
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:59
EventSummary eventSummary
Definition: TriggerReport.h:58
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProxyIndices const &, ProcessBlockHelperBase const &)
Definition: Schedule.cc:1159
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter > > > pathStatusInserters_
Definition: Schedule.h:301
void fillModuleAndConsumesInfo(std::vector< ModuleDescription const *> &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int >> &moduleIDToIndex, std::array< std::vector< std::vector< ModuleDescription const *>>, NumBranchTypes > &modulesWhoseProductsAreConsumedBy, std::vector< std::vector< ModuleProcessName >> &modulesInPreviousProcessesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
Definition: Schedule.cc:1290
bool empty() const
Definition: ParameterSet.h:201
EventTimingSummary eventSummary
Strings const & getTrigPaths() const
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1382
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:179
PostGlobalWriteRun postGlobalWriteRunSignal_
static ServiceRegistry & instance()
std::vector< PathTimingSummary > trigPathSummaries
RunIndex index() const
Definition: RunPrincipal.h:57
double f[11][100]
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists, SubProcessParentageHelper const *subProcessParentageHelper)
Definition: Schedule.cc:698
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:1149
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:303
Timestamp const & beginTime() const
rep
Definition: cuy.py:1189
void writeRunAsync(WaitingTaskHolder iTask, RunPrincipal const &rp, ProcessContext const *, ActivityRegistry *, MergeableRunProductMetadata const *)
Definition: Schedule.cc:1048
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
int totalEventsPassed() const
Definition: Schedule.cc:1366
std::shared_ptr< ModuleRegistry const > moduleRegistry() const
Definition: Schedule.h:297
static LuminosityBlockIndex invalidLuminosityBlockIndex()
PreGlobalWriteRun preGlobalWriteRunSignal_
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
virtual bool shouldWeCloseFile() const =0
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:1266
Log< level::Info, false > LogInfo
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:309
Log< level::FwkInfo, true > LogFwkVerbatim
PreWriteProcessBlock preWriteProcessBlockSignal_
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:304
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:92
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1258
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:173
void startPath(StreamContext const &, PathContext const &)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
PostWriteProcessBlock postWriteProcessBlockSignal_
bool shouldWeCloseOutput() const
Definition: Schedule.cc:1140
void addContext(std::string const &context)
Definition: Exception.cc:165
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
Definition: Schedule.cc:1284
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:308
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
PreGlobalEndLumi preGlobalWriteLumiSignal_
void beginStream(unsigned int)
Definition: Schedule.cc:1165
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:174
bool wantSummary_
Definition: Schedule.h:315
std::vector< std::string > const * endPathNames_
Definition: Schedule.h:314
HLT enums.
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:1274
LuminosityBlockIndex index() const
void initializeEarlyDelete(std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
Definition: Schedule.cc:1240
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:1043
void writeLumiAsync(WaitingTaskHolder iTask, LuminosityBlockPrincipal const &lbp, ProcessContext const *, ActivityRegistry *)
Definition: Schedule.cc:1111
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:754
Schedule(ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ProcessBlockHelperBase &, ThinnedAssociationsHelper &thinnedAssociationsHelper, SubProcessParentageHelper const *subProcessParentageHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool hasSubprocesses, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:477
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry, eventsetup::ESRecordsToProxyIndices const &)
Definition: Schedule.cc:1183
std::vector< std::string > vstring
Definition: Schedule.cc:473
ServiceToken presentToken() const
void setIsMergeable(BranchDescription &)
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
int totalEventsFailed() const
Definition: Schedule.cc:1374
def move(src, dest)
Definition: eostools.py:511
void closeOutputFiles()
Definition: Schedule.cc:1035
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
Definition: Schedule.cc:1270
unsigned transform(const HcalDetId &id, unsigned transformCode)