CMS 3D CMS Logo

Schedule.cc
Go to the documentation of this file.
2 
35 
36 
37 
38 #include <algorithm>
39 #include <cassert>
40 #include <cstdlib>
41 #include <functional>
42 #include <iomanip>
43 #include <list>
44 #include <map>
45 #include <set>
46 #include <exception>
47 #include <sstream>
48 
50 
51 
52 namespace edm {
53 
54  class Maker;
55 
56  namespace {
57  using std::placeholders::_1;
58 
// Returns true when `s` is an element of `v`.
//
// This is a thin wrapper around std::binary_search so that it can be used as a
// predicate (for example bound with std::bind or called from a lambda).
//
// Precondition: `v` must already be sorted in ascending order with respect to
// operator< on std::string; std::binary_search gives unspecified results on an
// unsorted range. An empty `v` always yields false.
bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
  return std::binary_search(v.begin(), v.end(), s);
}
62 
63  // Here we make the trigger results inserter directly. This should
64  // probably be a utility in the WorkerRegistry or elsewhere.
65 
// Constructs the TriggerResultInserter module and returns it as a shared_ptr.
//
// Steps visible in this listing:
//  - fetch the "@trigger_paths" parameter set from proc_pset for update and register it;
//  - describe the module with type name "TriggerResultInserter" and label "TriggerResults";
//  - emit the pre-construction signal, build the module through a ModuleHolderT,
//    attach the description, register its products/callbacks with preg;
//  - emit the post-construction signal exactly once, on both the normal and the
//    exceptional path.
//
// Any exception raised during construction is rethrown to the caller.
//
// NOTE(review): the embedded listing numbers jump from 81 to 83, so the end of the
// ModuleDescription constructor call is not visible here - confirm against the real file.
66  std::shared_ptr<TriggerResultInserter>
67  makeInserter(ParameterSet& proc_pset,
68  PreallocationConfiguration const& iPrealloc,
69  ProductRegistry& preg,
70  ExceptionToActionTable const& actions,
71  std::shared_ptr<ActivityRegistry> areg,
72  std::shared_ptr<ProcessConfiguration> processConfiguration) {
73 
74  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
75  trig_pset->registerIt();
76 
// NOTE(review): work_args is not referenced again in the visible part of this function.
77  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
78  ModuleDescription md(trig_pset->id(),
79  "TriggerResultInserter",
80  "TriggerResults",
81  processConfiguration.get(),
// NOTE(review): listing line 82 (remaining argument(s) and the closing of this call) is missing.
83 
84  areg->preModuleConstructionSignal_(md);
// Tracks whether the post-construction signal has been attempted, so the
// catch block below does not emit it a second time.
85  bool postCalled = false;
86  std::shared_ptr<TriggerResultInserter> returnValue;
87  try {
88  maker::ModuleHolderT<TriggerResultInserter> holder(make_shared_noexcept_false<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),static_cast<Maker const*>(nullptr));
89  holder.setModuleDescription(md);
90  holder.registerProductsAndCallbacks(&preg);
91  returnValue =holder.module();
// The flag is set before the signal is emitted: if the signal itself throws,
// the handler below must not call it again.
92  postCalled = true;
93  // if exception then post will be called in the catch block
94  areg->postModuleConstructionSignal_(md);
95  }
96  catch (...) {
97  if(!postCalled) {
98  try {
99  areg->postModuleConstructionSignal_(md);
100  }
101  catch (...) {
102  // If post throws an exception ignore it because we are already handling another exception
103  }
104  }
// Propagate the original exception to the caller.
105  throw;
106  }
107  return returnValue;
108  }
109 
// Creates one module of type T per entry of pathNames and appends each to
// pathStatusInserters (in the order of pathNames).
//
// Every module gets a ModuleDescription whose type name is moduleTypeName and
// whose label is the path name; all of them share the id of a single parameter
// set that carries "@module_type" = moduleTypeName and
// "@module_edm_type" = "EDProducer". Each module is constructed with the number
// of streams from iPrealloc and registers its products/callbacks with preg.
// Pre/post module-construction signals are emitted around each construction;
// a construction failure is rethrown after the post signal has been attempted.
//
// NOTE(review): embedded listing numbers 120 and 133 are absent, so the declaration
// of `pset` and the end of the ModuleDescription constructor call are not visible
// here - confirm against the real file.
110  template <typename T>
111  void
112  makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
113  std::vector<std::string> const& pathNames,
114  PreallocationConfiguration const& iPrealloc,
115  ProductRegistry& preg,
116  std::shared_ptr<ActivityRegistry> areg,
117  std::shared_ptr<ProcessConfiguration> processConfiguration,
118  std::string const& moduleTypeName) {
119 
// NOTE(review): listing line 120 (presumably the declaration of `pset`) is missing.
121  pset.addParameter<std::string>("@module_type", moduleTypeName);
122  pset.addParameter<std::string>("@module_edm_type", "EDProducer");
123  pset.registerIt();
124 
125  pathStatusInserters.reserve(pathNames.size());
126 
127  for (auto const& pathName : pathNames) {
128 
129  ModuleDescription md(pset.id(),
130  moduleTypeName,
131  pathName,
132  processConfiguration.get(),
// NOTE(review): listing line 133 (remaining argument(s) and the closing of this call) is missing.
134 
135  areg->preModuleConstructionSignal_(md);
// Tracks whether the post-construction signal has been attempted for this module.
136  bool postCalled = false;
137 
138  try {
139  maker::ModuleHolderT<T> holder(make_shared_noexcept_false<T>(iPrealloc.numberOfStreams()),
140  static_cast<Maker const*>(nullptr));
141  holder.setModuleDescription(md);
142  holder.registerProductsAndCallbacks(&preg);
143  pathStatusInserters.emplace_back(holder.module());
// Set before emitting the signal so a throwing signal is not emitted twice.
144  postCalled = true;
145  // if exception then post will be called in the catch block
146  areg->postModuleConstructionSignal_(md);
147  }
148  catch (...) {
149  if(!postCalled) {
150  try {
151  areg->postModuleConstructionSignal_(md);
152  }
153  catch (...) {
154  // If post throws an exception ignore it because we are already handling another exception
155  }
156  }
// Propagate the original exception; modules already appended stay in the vector.
157  throw;
158  }
159  }
160  }
161 
162  void
163  checkAndInsertAlias(std::string const& friendlyClassName,
164  std::string const& moduleLabel,
165  std::string const& productInstanceName,
166  std::string const& processName,
167  std::string const& alias,
168  std::string const& instanceAlias,
169  ProductRegistry const& preg,
170  std::multimap<BranchKey, BranchKey>& aliasMap,
171  std::map<BranchKey, BranchKey>& aliasKeys) {
172  std::string const star("*");
173 
174  BranchKey key(friendlyClassName, moduleLabel, productInstanceName, processName);
175  if(preg.productList().find(key) == preg.productList().end()) {
176  // No product was found matching the alias.
177  // We throw an exception only if a module with the specified module label was created in this process.
178  for(auto const& product : preg.productList()) {
179  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
180  throw Exception(errors::Configuration, "EDAlias does not match data\n")
181  << "There are no products of type '" << friendlyClassName << "'\n"
182  << "with module label '" << moduleLabel << "' and instance name '" << productInstanceName << "'.\n";
183  }
184  }
185  }
186 
187  std::string const& theInstanceAlias(instanceAlias == star ? productInstanceName : instanceAlias);
188  BranchKey aliasKey(friendlyClassName, alias, theInstanceAlias, processName);
189  if(preg.productList().find(aliasKey) != preg.productList().end()) {
190  throw Exception(errors::Configuration, "EDAlias conflicts with data\n")
191  << "A product of type '" << friendlyClassName << "'\n"
192  << "with module label '" << alias << "' and instance name '" << theInstanceAlias << "'\n"
193  << "already exists.\n";
194  }
195  auto iter = aliasKeys.find(aliasKey);
196  if(iter != aliasKeys.end()) {
197  // The alias matches a previous one. If the same alias is used for different product, throw.
198  if(iter->second != key) {
199  throw Exception(errors::Configuration, "EDAlias conflict\n")
200  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
201  << "are used for multiple products of type '" << friendlyClassName << "'\n"
202  << "One has module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
203  << "the other has module label '" << iter->second.moduleLabel() << "' and product instance name '" << iter->second.productInstanceName() << "'.\n";
204  }
205  } else {
206  auto prodIter = preg.productList().find(key);
207  if(prodIter != preg.productList().end()) {
208  if (!prodIter->second.produced()) {
209  throw Exception(errors::Configuration, "EDAlias\n")
210  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
211  << "are used for a product of type '" << friendlyClassName << "'\n"
212  << "with module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
213  << "An EDAlias can only be used for products produced in the current process. This one is not.\n";
214  }
215  aliasMap.insert(std::make_pair(key, aliasKey));
216  aliasKeys.insert(std::make_pair(aliasKey, key));
217  }
218  }
219  }
220 
// Reads the EDAlias definitions listed under "@all_aliases" in proc_pset,
// validates them against the product registry, and adds the resulting label
// aliases to preg.
//
// For every alias name, each VParameterSet-typed parameter of the alias
// parameter set is treated as a module label; each element of that vector is
// validated with `desc` and supplies "type", "fromProductInstance" and
// "toProductInstance". A "fromProductInstance" of "*" expands to every product
// in preg with the given type and module label that belongs to processName.
// The per-product checks and bookkeeping are delegated to checkAndInsertAlias.
//
// Returns immediately when "@all_aliases" is empty.
// Throws Exception(errors::Configuration) from the wildcard branch below, in
// addition to whatever checkAndInsertAlias throws.
//
// NOTE(review): embedded listing number 229 is absent, so the declaration of
// `desc` is not visible here - confirm its type against the real file.
221  void
222  processEDAliases(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
223  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string> >("@all_aliases");
224  if(aliases.empty()) {
225  return;
226  }
227  std::string const star("*");
228  std::string const empty("");
// NOTE(review): listing line 229 (presumably the declaration of `desc`) is missing.
// The second argument of add() below is presumably the default value - TODO confirm.
230  desc.add<std::string>("type");
231  desc.add<std::string>("fromProductInstance", star);
232  desc.add<std::string>("toProductInstance", star);
233 
// Maps the key of an existing product to the key of its alias.
234  std::multimap<BranchKey, BranchKey> aliasMap;
235 
236  std::map<BranchKey, BranchKey> aliasKeys; // Used to search for duplicates or clashes.
237 
238  // Now, loop over the alias information and store it in aliasMap.
239  for(std::string const& alias : aliases) {
240  ParameterSet const& aliasPSet = proc_pset.getParameterSet(alias);
241  std::vector<std::string> vPSetNames = aliasPSet.getParameterNamesForType<VParameterSet>();
242  for(std::string const& moduleLabel : vPSetNames) {
// Taken by value because desc.validate() below receives a non-const ParameterSet&.
243  VParameterSet vPSet = aliasPSet.getParameter<VParameterSet>(moduleLabel);
244  for(ParameterSet& pset : vPSet) {
245  desc.validate(pset);
246  std::string friendlyClassName = pset.getParameter<std::string>("type");
247  std::string productInstanceName = pset.getParameter<std::string>("fromProductInstance");
248  std::string instanceAlias = pset.getParameter<std::string>("toProductInstance");
249  if(productInstanceName == star) {
// Wildcard: visit every product with this type and module label, starting at
// the first key not ordered before (type, label, "", "").
250  bool match = false;
251  BranchKey lowerBound(friendlyClassName, moduleLabel, empty, empty);
252  for(ProductRegistry::ProductList::const_iterator it = preg.productList().lower_bound(lowerBound);
253  it != preg.productList().end() && it->first.friendlyClassName() == friendlyClassName && it->first.moduleLabel() == moduleLabel;
254  ++it) {
// Only products of the current process can be aliased.
255  if(it->first.processName() != processName) {
256  continue;
257  }
258  match = true;
259 
260  checkAndInsertAlias(friendlyClassName, moduleLabel, it->first.productInstanceName(), processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
261  }
262  if(!match) {
263  // No product was found matching the alias.
264  // We throw an exception only if a module with the specified module label was created in this process.
265  for(auto const& product : preg.productList()) {
266  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
267  throw Exception(errors::Configuration, "EDAlias parameter set mismatch\n")
268  << "There are no products of type '" << friendlyClassName << "'\n"
269  << "with module label '" << moduleLabel << "'.\n";
270  }
271  }
272  }
273  } else {
// Explicit instance name: a single product is checked.
274  checkAndInsertAlias(friendlyClassName, moduleLabel, productInstanceName, processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
275  }
276  }
277  }
278  }
279 
280 
281  // Now add the new alias entries to the product registry.
282  for(auto const& aliasEntry : aliasMap) {
283  ProductRegistry::ProductList::const_iterator it = preg.productList().find(aliasEntry.first);
// checkAndInsertAlias only records keys that were found in the product list.
284  assert(it != preg.productList().end());
285  preg.addLabelAlias(it->second, aliasEntry.second.moduleLabel(), aliasEntry.second.productInstanceName());
286  }
287 
288  }
289 
290  typedef std::vector<std::string> vstring;
291 
292  void reduceParameterSet(ParameterSet& proc_pset,
293  vstring const& end_path_name_list,
294  vstring& modulesInConfig,
295  std::set<std::string> const& usedModuleLabels,
296  std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions) {
297  // Before calculating the ParameterSetID of the top level ParameterSet or
298  // saving it in the registry drop from the top level ParameterSet all
299  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
300  // production is not enabled also drop all the EDFilters and EDProducers
301  // that are not scheduled. Drop the ParameterSet used to configure the module
302  // itself. Also drop the other traces of these labels in the top level
303  // ParameterSet: Remove that labels from @all_modules and from all the
304  // end paths. If this makes any end paths empty, then remove the end path
305  // name from @end_paths, and @paths.
306 
307  // First make a list of labels to drop
308  vstring outputModuleLabels;
309  std::string edmType;
310  std::string const moduleEdmType("@module_edm_type");
311  std::string const outputModule("OutputModule");
312  std::string const edAnalyzer("EDAnalyzer");
313  std::string const edFilter("EDFilter");
314  std::string const edProducer("EDProducer");
315 
316  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
317 
318  //need a list of all modules on paths in order to determine
319  // if an EDAnalyzer only appears on an end path
320  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
321  std::set<std::string> modulesOnPaths;
322  {
323  std::set<std::string> noEndPaths(scheduledPaths.begin(),scheduledPaths.end());
324  for(auto const& endPath: end_path_name_list) {
325  noEndPaths.erase(endPath);
326  }
327  {
328  vstring labels;
329  for(auto const& path: noEndPaths) {
330  labels = proc_pset.getParameter<vstring>(path);
331  modulesOnPaths.insert(labels.begin(),labels.end());
332  }
333  }
334  }
335  //Initially fill labelsToBeDropped with all module mentioned in
336  // the configuration but which are not being used by the system
337  std::vector<std::string> labelsToBeDropped;
338  labelsToBeDropped.reserve(modulesInConfigSet.size());
339  std::set_difference(modulesInConfigSet.begin(),modulesInConfigSet.end(),
340  usedModuleLabels.begin(),usedModuleLabels.end(),
341  std::back_inserter(labelsToBeDropped));
342 
343  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
344  for (auto const& modLabel: usedModuleLabels) {
345  // Do nothing for modules that do not have a ParameterSet. Modules of type
346  // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
347  if (proc_pset.existsAs<ParameterSet>(modLabel)) {
348  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
349  if (edmType == outputModule) {
350  outputModuleLabels.push_back(modLabel);
351  labelsToBeDropped.push_back(modLabel);
352  }
353  if(edmType == edAnalyzer) {
354  if(modulesOnPaths.end()==modulesOnPaths.find(modLabel)) {
355  labelsToBeDropped.push_back(modLabel);
356  }
357  }
358  }
359  }
360  //labelsToBeDropped must be sorted
361  std::inplace_merge(labelsToBeDropped.begin(),
362  labelsToBeDropped.begin()+sizeBeforeOutputModules,
363  labelsToBeDropped.end());
364 
365  // drop the parameter sets used to configure the modules
366  for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
367 
368  // drop the labels from @all_modules
369  vstring::iterator endAfterRemove = std::remove_if(modulesInConfig.begin(), modulesInConfig.end(), std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
370  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
371  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
372 
373  // drop the labels from all end paths
374  vstring endPathsToBeDropped;
375  vstring labels;
376  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
377  iEndPath != endEndPath;
378  ++iEndPath) {
379  labels = proc_pset.getParameter<vstring>(*iEndPath);
380  vstring::iterator iSave = labels.begin();
381  vstring::iterator iBegin = labels.begin();
382 
383  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end();
384  iLabel != iEnd; ++iLabel) {
385  if (binary_search_string(labelsToBeDropped, *iLabel)) {
386  if (binary_search_string(outputModuleLabels, *iLabel)) {
387  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
388  }
389  } else {
390  if (iSave != iLabel) {
391  iSave->swap(*iLabel);
392  }
393  ++iSave;
394  }
395  }
396  labels.erase(iSave, labels.end());
397  if (labels.empty()) {
398  // remove empty end paths and save their names
399  proc_pset.eraseSimpleParameter(*iEndPath);
400  endPathsToBeDropped.push_back(*iEndPath);
401  } else {
402  proc_pset.addParameter<vstring>(*iEndPath, labels);
403  }
404  }
405  sort_all(endPathsToBeDropped);
406 
407  // remove empty end paths from @paths
408  endAfterRemove = std::remove_if(scheduledPaths.begin(), scheduledPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
409  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
410  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
411 
412  // remove empty end paths from @end_paths
413  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
414  endAfterRemove = std::remove_if(scheduledEndPaths.begin(), scheduledEndPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
415  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
416  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
417 
418  }
419 
// Helper EDConsumerBase used only for its constructor: when `rng` is available
// it lets `rng` declare what it consumes through this object's
// consumesCollector(), then adds the type of every resulting consumesInfo()
// entry to typesConsumed. When `rng` is not available, typesConsumed is left
// untouched.
//
// NOTE(review): embedded listing number 423 is absent, so the declaration of
// `rng` is not visible here - confirm its type against the real file.
420  class RngEDConsumer : public EDConsumerBase {
421  public:
// typesConsumed is an in/out parameter; entries are only ever added.
422  explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
// NOTE(review): listing line 423 (presumably the declaration of `rng`) is missing.
424  if(rng.isAvailable()) {
425  rng->consumes(consumesCollector());
426  for (auto const& consumesInfo : this->consumesInfo()) {
427  typesConsumed.emplace(consumesInfo.type());
428  }
429  }
430  }
431  };
432  }
433  // -----------------------------
434 
435  typedef std::vector<std::string> vstring;
436 
437  // -----------------------------
438 
// Constructor body of Schedule (named by the trailing comment at the end of
// this definition).
//
// NOTE(review): the opening line of this definition (embedded listing number 439)
// is absent, as are numbers 453-454, 482-483 and 518-519. Part of the
// member-initializer list and of two argument lists therefore cannot be seen
// here - confirm against the real file.
//
// Work visible in this listing, in order:
//  1. create the TriggerResultInserter only if there is at least one trigger path;
//  2. create the path and end-path status inserters;
//  3. build one StreamSchedule per stream, then the GlobalSchedule;
//  4. prune unused modules from proc_pset, process EDAliases, register proc_pset
//     and store its id in the process configuration;
//  5. collect the output module communicators and apply output limits;
//  6. register thinned associations, let output modules select products, and
//     call setIsMergeable on every product;
//  7. freeze the product registry with the sets of consumed types;
//  8. pass event-selection info to the output modules;
//  9. if wantSummary_ is set, create a SystemTimeKeeper and hook it into the
//     activity registry.
440  service::TriggerNamesService const& tns,
441  ProductRegistry& preg,
442  BranchIDListHelper& branchIDListHelper,
443  ThinnedAssociationsHelper& thinnedAssociationsHelper,
444  SubProcessParentageHelper const* subProcessParentageHelper,
445  ExceptionToActionTable const& actions,
446  std::shared_ptr<ActivityRegistry> areg,
447  std::shared_ptr<ProcessConfiguration> processConfiguration,
448  bool hasSubprocesses,
449  PreallocationConfiguration const& prealloc,
450  ProcessContext const* processContext) :
451  //Only create a resultsInserter if there is a trigger path
452  resultsInserter_{tns.getTrigPaths().empty()? std::shared_ptr<TriggerResultInserter>{} :makeInserter(proc_pset,prealloc,preg,actions,areg,processConfiguration)},
// NOTE(review): listing lines 453-454 (further member initializers) are missing.
455  preallocConfig_(prealloc),
456  pathNames_(&tns.getTrigPaths()),
457  endPathNames_(&tns.getEndPaths()),
458  wantSummary_(tns.wantSummary()),
459  endpathsAreActive_(true)
460  {
// One status inserter per trigger path ...
461  makePathStatusInserters(pathStatusInserters_,
462  *pathNames_,
463  prealloc,
464  preg,
465  areg,
466  processConfiguration,
467  std::string("PathStatusInserter"));
468 
// ... and one per end path.
469  makePathStatusInserters(endPathStatusInserters_,
470  *endPathNames_,
471  prealloc,
472  preg,
473  areg,
474  processConfiguration,
475  std::string("EndPathStatusInserter"));
476 
// streamSchedules_[0] is dereferenced below, so at least one stream is required.
477  assert(0<prealloc.numberOfStreams());
478  streamSchedules_.reserve(prealloc.numberOfStreams());
479  for(unsigned int i=0; i<prealloc.numberOfStreams();++i) {
480  streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(
481  resultsInserter(),
// NOTE(review): listing lines 482-483 (two further arguments) are missing.
484  moduleRegistry(),
485  proc_pset,tns,prealloc,preg,
486  branchIDListHelper,actions,
487  areg,processConfiguration,
488  !hasSubprocesses,
489  StreamID{i},
490  processContext));
491  }
492 
493  //TriggerResults are injected automatically by StreamSchedules and are
494  // unknown to the ModuleRegistry
495  const std::string kTriggerResults("TriggerResults");
// Labels of all workers of the first stream schedule, except "TriggerResults".
496  std::vector<std::string> modulesToUse;
497  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
498  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
499  if(worker->description().moduleLabel() != kTriggerResults) {
500  modulesToUse.push_back(worker->description().moduleLabel());
501  }
502  }
503  //The unscheduled modules are at the end of the list, but we want them at the front
504  unsigned int n = streamSchedules_[0]->numberOfUnscheduledModules();
505  if(n>0) {
// Rebuild the list as [last n labels][remaining labels], preserving relative order.
506  std::vector<std::string> temp;
507  temp.reserve(modulesToUse.size());
508  auto itBeginUnscheduled = modulesToUse.begin()+modulesToUse.size()-n;
509  std::copy(itBeginUnscheduled,modulesToUse.end(),
510  std::back_inserter(temp));
511  std::copy(modulesToUse.begin(),itBeginUnscheduled,std::back_inserter(temp));
512  temp.swap(modulesToUse);
513  }
514 
515  // propagate_const<T> has no reset() function
516  globalSchedule_ = std::make_unique<GlobalSchedule>(
517  resultsInserter(),
// NOTE(review): listing lines 518-519 (two further arguments) are missing.
520  moduleRegistry(),
521  modulesToUse,
522  proc_pset, preg, prealloc,
523  actions,areg,processConfiguration,processContext);
524 
525  //TriggerResults is not in the top level ParameterSet so the call to
526  // reduceParameterSet would fail to find it. Just remove it up front.
527  std::set<std::string> usedModuleLabels;
528  for(auto const& worker: allWorkers()) {
529  if(worker->description().moduleLabel() != kTriggerResults) {
530  usedModuleLabels.insert(worker->description().moduleLabel());
531  }
532  }
533  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
// Filled by reduceParameterSet; handed to the output modules further down.
534  std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
535  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels,
536  outputModulePathPositions);
537  processEDAliases(proc_pset, processConfiguration->processName(), preg);
// proc_pset is registered only after it has been reduced, so its id reflects the reduced content.
538  proc_pset.registerIt();
539  processConfiguration->setParameterSetID(proc_pset.id());
540  processConfiguration->setProcessConfigurationID();
541 
542  // This is used for a little sanity-check to make sure no code
543  // modifications alter the number of workers at a later date.
544  size_t all_workers_count = allWorkers().size();
545 
// Keep a communicator for every module holder that provides one.
546  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder){
547  auto comm = iHolder->createOutputModuleCommunicator();
548  if (comm) {
549  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
550  }
551  });
552  // Now that the output workers are filled in, set any output limits or information.
553  limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
554 
555  // Sanity check: make sure nobody has added a worker after we've
556  // already relied on the WorkerManager being full.
557  assert (all_workers_count == allWorkers().size());
558 
559  branchIDListHelper.updateFromRegistry(preg);
560 
561  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
562  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
563  }
564  thinnedAssociationsHelper.sort();
565 
566  // The output modules consume products in kept branches.
567  // So we must set this up before freezing.
568  for (auto& c : all_output_communicators_) {
569  c->selectProducts(preg, thinnedAssociationsHelper);
570  }
571 
572  for(auto & product : preg.productListUpdator()) {
573  setIsMergeable(product.second);
574  }
575 
576  {
577  // We now get a collection of types that may be consumed.
578  std::set<TypeID> productTypesConsumed;
579  std::set<TypeID> elementTypesConsumed;
580  // Loop over all modules
581  for (auto const& worker : allWorkers()) {
582  for (auto const& consumesInfo : worker->consumesInfo()) {
583  if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
584  productTypesConsumed.emplace(consumesInfo.type());
585  } else {
586  elementTypesConsumed.emplace(consumesInfo.type());
587  }
588  }
589  }
590  // The SubProcess class is not a module, yet it may consume.
591  if(hasSubprocesses) {
592  productTypesConsumed.emplace(typeid(TriggerResults));
593  }
594  // The RandomNumberGeneratorService is not a module, yet it consumes.
595  {
// Constructed only for its side effect on productTypesConsumed; destroyed at the end of this scope.
596  RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed);
597  }
598  preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
599  }
600 
601  for (auto& c : all_output_communicators_) {
602  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
603  }
604 
605  if(wantSummary_) {
// Descriptions of all workers, handed to the SystemTimeKeeper.
606  std::vector<const ModuleDescription*> modDesc;
607  const auto& workers = allWorkers();
608  modDesc.reserve(workers.size());
609 
610  std::transform(workers.begin(),workers.end(),
611  std::back_inserter(modDesc),
612  [](const Worker* iWorker) -> const ModuleDescription* {
613  return iWorker->descPtr();
614  });
615 
616  // propagate_const<T> has no reset() function
617  summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(
618  prealloc.numberOfStreams(),
619  modDesc,
620  tns,
621  processContext);
622  auto timeKeeperPtr = summaryTimeKeeper_.get();
623 
// Module-level callbacks.
624  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
625  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
626  areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
627  areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
628  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
629  areg->watchPostModuleEventDelayedGet(timeKeeperPtr,&SystemTimeKeeper::restartModuleEvent);
630 
// Event-level callbacks.
631  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
632  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
633 
// Path-level callbacks.
634  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
635  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
636 
// Job-level callbacks.
637  areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
638  areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
639  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
640  //timeKeeperPtr->startModuleEvent(iContext,iMod);
641  //});
642  }
643 
644  } // Schedule::Schedule
645 
646 
// Configures every output module communicator with an OutputModuleDescription.
//
// The limit comes from the untracked "maxEvents" parameter set of proc_pset,
// whose "output" entry may be either
//  - an int: one limit shared by all output modules (default -1 if absent), or
//  - a parameter set: one untracked int per output module label.
// Supplying both forms is rejected. When the parameter-set form is used and is
// not empty, every output module label must have an entry in it.
//
// NOTE(review): embedded listing numbers 648, 670 and 681 are absent, so the
// line carrying the function name / first parameter and the beginnings of the
// two throw statements are not visible here. The call site in the constructor
// is limitOutput(proc_pset, branchIDLists, subProcessParentageHelper) - confirm
// the rest against the real file.
647  void
// NOTE(review): listing line 648 (function name and the proc_pset parameter) is missing.
649  BranchIDLists const& branchIDLists,
650  SubProcessParentageHelper const* subProcessParentageHelper) {
651  std::string const output("output");
652 
653  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
// Counts in how many forms "output" was supplied.
654  int maxEventSpecs = 0;
655  int maxEventsOut = -1;
656  ParameterSet const* vMaxEventsOut = nullptr;
// Form 1: "output" as an int.
657  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
658  if (search_all(intNamesE, output)) {
659  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
660  ++maxEventSpecs;
661  }
// Form 2: "output" as a parameter set.
662  std::vector<std::string> psetNamesE;
663  maxEventsPSet.getParameterSetNames(psetNamesE, false);
664  if (search_all(psetNamesE, output)) {
665  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
666  ++maxEventSpecs;
667  }
668 
669  if (maxEventSpecs > 1) {
// NOTE(review): listing line 670 (start of the statement that uses this message) is missing.
671  "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
672  }
673 
674  for (auto& c : all_output_communicators_) {
// Starts from the shared limit; overridden per module label below when the parameter-set form is in use.
675  OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
676  if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
677  std::string const& moduleLabel = c->description().moduleLabel();
678  try {
679  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
680  } catch (Exception const&) {
// NOTE(review): listing line 681 (start of the statement that uses this message) is missing.
682  "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
683  }
684  }
685  c->configure(desc);
686  }
687  }
688 
689  bool Schedule::terminate() const {
690  if (all_output_communicators_.empty()) {
691  return false;
692  }
693  for (auto& c : all_output_communicators_) {
694  if (!c->limitReached()) {
695  // Found an output module that has not reached output event count.
696  return false;
697  }
698  }
699  LogInfo("SuccessfulTermination")
700  << "The job is terminating successfully because each output module\n"
701  << "has reached its configured limit.\n";
702  return true;
703  }
704 
706  globalSchedule_->endJob(collector);
707  if (collector.hasThrown()) {
708  return;
709  }
710 
711  if (wantSummary_ == false) return;
712  {
713  TriggerReport tr;
714  getTriggerReport(tr);
715 
716  // The trigger report (pass/fail etc.):
717 
718  LogVerbatim("FwkSummary") << "";
719  if(streamSchedules_[0]->context().processContext()->isSubProcess()) {
720  LogVerbatim("FwkSummary") << "TrigReport Process: "<<streamSchedules_[0]->context().processContext()->processName();
721  }
722  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Event Summary ------------";
723  if(!tr.trigPathSummaries.empty()) {
724  LogVerbatim("FwkSummary") << "TrigReport"
725  << " Events total = " << tr.eventSummary.totalEvents
726  << " passed = " << tr.eventSummary.totalEventsPassed
727  << " failed = " << tr.eventSummary.totalEventsFailed
728  << "";
729  } else {
730  LogVerbatim("FwkSummary") << "TrigReport"
731  << " Events total = " << tr.eventSummary.totalEvents
732  << " passed = " << tr.eventSummary.totalEvents
733  << " failed = 0";
734  }
735 
736  LogVerbatim("FwkSummary") << "";
737  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Path Summary ------------";
738  LogVerbatim("FwkSummary") << "TrigReport "
739  << std::right << std::setw(10) << "Trig Bit#" << " "
740  << std::right << std::setw(10) << "Executed" << " "
741  << std::right << std::setw(10) << "Passed" << " "
742  << std::right << std::setw(10) << "Failed" << " "
743  << std::right << std::setw(10) << "Error" << " "
744  << "Name" << "";
745  for (auto const& p: tr.trigPathSummaries) {
746  LogVerbatim("FwkSummary") << "TrigReport "
747  << std::right << std::setw(5) << 1
748  << std::right << std::setw(5) << p.bitPosition << " "
749  << std::right << std::setw(10) << p.timesRun << " "
750  << std::right << std::setw(10) << p.timesPassed << " "
751  << std::right << std::setw(10) << p.timesFailed << " "
752  << std::right << std::setw(10) << p.timesExcept << " "
753  << p.name << "";
754  }
755 
756  /*
757  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
758  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
759  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
760  for (; epi != epe; ++epi, ++epn) {
761 
762  LogVerbatim("FwkSummary") << "TrigReport "
763  << std::right << std::setw(5) << 1
764  << std::right << std::setw(5) << *epi << " "
765  << std::right << std::setw(10) << totalEvents() << " "
766  << std::right << std::setw(10) << totalEvents() << " "
767  << std::right << std::setw(10) << 0 << " "
768  << std::right << std::setw(10) << 0 << " "
769  << *epn << "";
770  }
771  */
772 
773  LogVerbatim("FwkSummary") << "";
774  LogVerbatim("FwkSummary") << "TrigReport " << "-------End-Path Summary ------------";
775  LogVerbatim("FwkSummary") << "TrigReport "
776  << std::right << std::setw(10) << "Trig Bit#" << " "
777  << std::right << std::setw(10) << "Executed" << " "
778  << std::right << std::setw(10) << "Passed" << " "
779  << std::right << std::setw(10) << "Failed" << " "
780  << std::right << std::setw(10) << "Error" << " "
781  << "Name" << "";
782  for (auto const& p: tr.endPathSummaries) {
783  LogVerbatim("FwkSummary") << "TrigReport "
784  << std::right << std::setw(5) << 0
785  << std::right << std::setw(5) << p.bitPosition << " "
786  << std::right << std::setw(10) << p.timesRun << " "
787  << std::right << std::setw(10) << p.timesPassed << " "
788  << std::right << std::setw(10) << p.timesFailed << " "
789  << std::right << std::setw(10) << p.timesExcept << " "
790  << p.name << "";
791  }
792 
793  for (auto const& p: tr.trigPathSummaries) {
794  LogVerbatim("FwkSummary") << "";
795  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Modules in Path: " << p.name << " ------------";
796  LogVerbatim("FwkSummary") << "TrigReport "
797  << std::right << std::setw(10) << "Trig Bit#" << " "
798  << std::right << std::setw(10) << "Visited" << " "
799  << std::right << std::setw(10) << "Passed" << " "
800  << std::right << std::setw(10) << "Failed" << " "
801  << std::right << std::setw(10) << "Error" << " "
802  << "Name" << "";
803 
804  unsigned int bitpos = 0;
805  for (auto const& mod: p.moduleInPathSummaries) {
806  LogVerbatim("FwkSummary") << "TrigReport "
807  << std::right << std::setw(5) << 1
808  << std::right << std::setw(5) << bitpos << " "
809  << std::right << std::setw(10) << mod.timesVisited << " "
810  << std::right << std::setw(10) << mod.timesPassed << " "
811  << std::right << std::setw(10) << mod.timesFailed << " "
812  << std::right << std::setw(10) << mod.timesExcept << " "
813  << mod.moduleLabel << "";
814  ++bitpos;
815  }
816  }
817 
818  for (auto const& p: tr.endPathSummaries) {
819  LogVerbatim("FwkSummary") << "";
820  LogVerbatim("FwkSummary") << "TrigReport " << "------ Modules in End-Path: " << p.name << " ------------";
821  LogVerbatim("FwkSummary") << "TrigReport "
822  << std::right << std::setw(10) << "Trig Bit#" << " "
823  << std::right << std::setw(10) << "Visited" << " "
824  << std::right << std::setw(10) << "Passed" << " "
825  << std::right << std::setw(10) << "Failed" << " "
826  << std::right << std::setw(10) << "Error" << " "
827  << "Name" << "";
828 
829  unsigned int bitpos=0;
830  for (auto const& mod: p.moduleInPathSummaries) {
831  LogVerbatim("FwkSummary") << "TrigReport "
832  << std::right << std::setw(5) << 0
833  << std::right << std::setw(5) << bitpos << " "
834  << std::right << std::setw(10) << mod.timesVisited << " "
835  << std::right << std::setw(10) << mod.timesPassed << " "
836  << std::right << std::setw(10) << mod.timesFailed << " "
837  << std::right << std::setw(10) << mod.timesExcept << " "
838  << mod.moduleLabel << "";
839  ++bitpos;
840  }
841  }
842 
843  LogVerbatim("FwkSummary") << "";
844  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Module Summary ------------";
845  LogVerbatim("FwkSummary") << "TrigReport "
846  << std::right << std::setw(10) << "Visited" << " "
847  << std::right << std::setw(10) << "Executed" << " "
848  << std::right << std::setw(10) << "Passed" << " "
849  << std::right << std::setw(10) << "Failed" << " "
850  << std::right << std::setw(10) << "Error" << " "
851  << "Name" << "";
852  for (auto const& worker : tr.workerSummaries) {
853  LogVerbatim("FwkSummary") << "TrigReport "
854  << std::right << std::setw(10) << worker.timesVisited << " "
855  << std::right << std::setw(10) << worker.timesRun << " "
856  << std::right << std::setw(10) << worker.timesPassed << " "
857  << std::right << std::setw(10) << worker.timesFailed << " "
858  << std::right << std::setw(10) << worker.timesExcept << " "
859  << worker.moduleLabel << "";
860  }
861  LogVerbatim("FwkSummary") << "";
862  }
863  // The timing report (CPU and Real Time):
866 
867  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
868 
869  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Event Summary ---[sec]----";
870  LogVerbatim("FwkSummary") << "TimeReport"
871  << std::setprecision(6) << std::fixed
872  << " event loop CPU/event = " << tr.eventSummary.cpuTime/totalEvents;
873  LogVerbatim("FwkSummary") << "TimeReport"
874  << std::setprecision(6) << std::fixed
875  << " event loop Real/event = " << tr.eventSummary.realTime/totalEvents;
876  LogVerbatim("FwkSummary") << "TimeReport"
877  << std::setprecision(6) << std::fixed
878  << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime/totalEvents;
879  LogVerbatim("FwkSummary") << "TimeReport"
880  << std::setprecision(6) << std::fixed
881  << " efficiency CPU/Real/thread = " << tr.eventSummary.cpuTime/tr.eventSummary.realTime/preallocConfig_.numberOfThreads();
882 
883  constexpr int kColumn1Size = 10;
884  constexpr int kColumn2Size = 12;
885  constexpr int kColumn3Size = 12;
886  LogVerbatim("FwkSummary") << "";
887  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Path Summary ---[Real sec]----";
888  LogVerbatim("FwkSummary") << "TimeReport "
889  << std::right << std::setw(kColumn1Size) << "per event"<<" "
890  << std::right << std::setw(kColumn2Size) << "per exec"
891  << " Name";
892  for (auto const& p: tr.trigPathSummaries) {
893  const int timesRun = std::max(1, p.timesRun);
894  LogVerbatim("FwkSummary") << "TimeReport "
895  << std::setprecision(6) << std::fixed
896  << std::right << std::setw(kColumn1Size) << p.realTime/totalEvents << " "
897  << std::right << std::setw(kColumn2Size) << p.realTime/timesRun << " "
898  << p.name << "";
899  }
900  LogVerbatim("FwkSummary") << "TimeReport "
901  << std::right << std::setw(kColumn1Size) << "per event"<<" "
902  << std::right << std::setw(kColumn2Size) << "per exec"
903  << " Name" << "";
904 
905  LogVerbatim("FwkSummary") << "";
906  LogVerbatim("FwkSummary") << "TimeReport " << "-------End-Path Summary ---[Real sec]----";
907  LogVerbatim("FwkSummary") << "TimeReport "
908  << std::right << std::setw(kColumn1Size) << "per event" <<" "
909  << std::right << std::setw(kColumn2Size) << "per exec"
910  << " Name" << "";
911  for (auto const& p: tr.endPathSummaries) {
912  const int timesRun = std::max(1, p.timesRun);
913 
914  LogVerbatim("FwkSummary") << "TimeReport "
915  << std::setprecision(6) << std::fixed
916  << std::right << std::setw(kColumn1Size) << p.realTime/totalEvents << " "
917  << std::right << std::setw(kColumn2Size) << p.realTime/timesRun << " "
918  << p.name << "";
919  }
920  LogVerbatim("FwkSummary") << "TimeReport "
921  << std::right << std::setw(kColumn1Size) << "per event" <<" "
922  << std::right << std::setw(kColumn2Size) << "per exec"
923  << " Name" << "";
924 
925  for (auto const& p: tr.trigPathSummaries) {
926  LogVerbatim("FwkSummary") << "";
927  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
928  LogVerbatim("FwkSummary") << "TimeReport "
929  << std::right << std::setw(kColumn1Size) << "per event" <<" "
930  << std::right << std::setw(kColumn2Size) << "per visit"
931  << " Name" << "";
932  for (auto const& mod: p.moduleInPathSummaries) {
933  LogVerbatim("FwkSummary") << "TimeReport "
934  << std::setprecision(6) << std::fixed
935  << std::right << std::setw(kColumn1Size) << mod.realTime/totalEvents << " "
936  << std::right << std::setw(kColumn2Size) << mod.realTime/std::max(1, mod.timesVisited) << " "
937  << mod.moduleLabel << "";
938  }
939  }
940  if(not tr.trigPathSummaries.empty()) {
941  LogVerbatim("FwkSummary") << "TimeReport "
942  << std::right << std::setw(kColumn1Size) << "per event" <<" "
943  << std::right << std::setw(kColumn2Size) << "per visit"
944  << " Name" << "";
945  }
946  for (auto const& p: tr.endPathSummaries) {
947  LogVerbatim("FwkSummary") << "";
948  LogVerbatim("FwkSummary") << "TimeReport " << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
949  LogVerbatim("FwkSummary") << "TimeReport "
950  << std::right << std::setw(kColumn1Size) << "per event" <<" "
951  << std::right << std::setw(kColumn2Size) << "per visit"
952  << " Name" << "";
953  for (auto const& mod: p.moduleInPathSummaries) {
954  LogVerbatim("FwkSummary") << "TimeReport "
955  << std::setprecision(6) << std::fixed
956  << std::right << std::setw(kColumn1Size) << mod.realTime/totalEvents << " "
957  << std::right << std::setw(kColumn2Size) << mod.realTime/std::max(1, mod.timesVisited) << " "
958  << mod.moduleLabel << "";
959  }
960  }
961  if(not tr.endPathSummaries.empty()) {
962  LogVerbatim("FwkSummary") << "TimeReport "
963  << std::right << std::setw(kColumn1Size) << "per event" <<" "
964  << std::right << std::setw(kColumn2Size) << "per visit"
965  << " Name" << "";
966  }
967  LogVerbatim("FwkSummary") << "";
968  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Module Summary ---[Real sec]----";
969  LogVerbatim("FwkSummary") << "TimeReport "
970  << std::right << std::setw(kColumn1Size) << "per event" <<" "
971  << std::right << std::setw(kColumn2Size) << "per exec" <<" "
972  << std::right << std::setw(kColumn3Size) << "per visit"
973  << " Name" << "";
974  for (auto const& worker : tr.workerSummaries) {
975  LogVerbatim("FwkSummary") << "TimeReport "
976  << std::setprecision(6) << std::fixed
977  << std::right << std::setw(kColumn1Size) << worker.realTime/totalEvents << " "
978  << std::right << std::setw(kColumn2Size) << worker.realTime/std::max(1, worker.timesRun) << " "
979  << std::right << std::setw(kColumn3Size) << worker.realTime/std::max(1, worker.timesVisited) << " "
980  << worker.moduleLabel << "";
981  }
982  LogVerbatim("FwkSummary") << "TimeReport "
983  << std::right << std::setw(kColumn1Size) << "per event" <<" "
984  << std::right << std::setw(kColumn2Size) << "per exec" <<" "
985  << std::right << std::setw(kColumn3Size) << "per visit"
986  << " Name" << "";
987 
988  LogVerbatim("FwkSummary") << "";
989  LogVerbatim("FwkSummary") << "T---Report end!" << "";
990  LogVerbatim("FwkSummary") << "";
991  }
992 
// NOTE(review): fragment only. According to the declaration index at the end
// of this page ("void closeOutputFiles()  Definition: Schedule.cc:993") this is
// the body of Schedule::closeOutputFiles(), but the definition header (line 993)
// and the single statement that used the `_1` placeholder (line 995) were lost
// when the page was extracted. Restore both from the upstream file before
// attempting to compile this section.
994  using std::placeholders::_1;
996  }
997 
999  using std::placeholders::_1;
1000  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1001  }
1002 
1004  RunPrincipal const& rp,
1005  ProcessContext const* processContext,
1006  ActivityRegistry* activityRegistry,
1007  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1008  for(auto& c: all_output_communicators_) {
1009  c->writeRunAsync(task, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1010  }
1011  }
1012 
1014  LuminosityBlockPrincipal const& lbp,
1015  ProcessContext const* processContext,
1016  ActivityRegistry* activityRegistry) {
1017  for(auto& c: all_output_communicators_) {
1018  c->writeLumiAsync(task, lbp, processContext, activityRegistry);
1019  }
1020  }
1021 
1023  using std::placeholders::_1;
1024  // Return true iff at least one output module returns true.
1025  return (std::find_if (all_output_communicators_.begin(), all_output_communicators_.end(),
1027  != all_output_communicators_.end());
1028  }
1029 
1031  using std::placeholders::_1;
1032  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1033  }
1034 
1036  using std::placeholders::_1;
1037  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1038  }
1039 
1040  void Schedule::beginJob(ProductRegistry const& iRegistry) {
1041  globalSchedule_->beginJob(iRegistry);
1042  }
1043 
1044  void Schedule::beginStream(unsigned int iStreamID) {
1045  assert(iStreamID<streamSchedules_.size());
1046  streamSchedules_[iStreamID]->beginStream();
1047  }
1048 
1049  void Schedule::endStream(unsigned int iStreamID) {
1050  assert(iStreamID<streamSchedules_.size());
1051  streamSchedules_[iStreamID]->endStream();
1052  }
1053 
1055  unsigned int iStreamID,
1056  EventPrincipal& ep,
1057  EventSetup const& es,
1058  ServiceToken const& token) {
1059  assert(iStreamID<streamSchedules_.size());
1060  streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask),ep,es,token,pathStatusInserters_);
1061  }
1062 
1064  ParameterSet const& iPSet,
1065  const ProductRegistry& iRegistry) {
1066  Worker* found = nullptr;
1067  for (auto const& worker : allWorkers()) {
1068  if (worker->description().moduleLabel() == iLabel) {
1069  found = worker;
1070  break;
1071  }
1072  }
1073  if (nullptr == found) {
1074  return false;
1075  }
1076 
1077  auto newMod = moduleRegistry_->replaceModule(iLabel,iPSet,preallocConfig_);
1078 
1079  globalSchedule_->replaceModule(newMod,iLabel);
1080 
1081  for(auto& s: streamSchedules_) {
1082  s->replaceModule(newMod,iLabel);
1083  }
1084 
1085  {
1086  //Need to updateLookup in order to make getByToken work
1087  auto const runLookup = iRegistry.productLookup(InRun);
1088  auto const lumiLookup = iRegistry.productLookup(InLumi);
1089  auto const eventLookup = iRegistry.productLookup(InEvent);
1090  found->updateLookup(InRun,*runLookup);
1091  found->updateLookup(InLumi,*lumiLookup);
1092  found->updateLookup(InEvent,*eventLookup);
1093 
1094  auto const& processName = newMod->moduleDescription().processName();
1095  auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1096  auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1097  auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1098  found->resolvePutIndicies(InRun,runModuleToIndicies);
1099  found->resolvePutIndicies(InLumi,lumiModuleToIndicies);
1100  found->resolvePutIndicies(InEvent,eventModuleToIndicies);
1101 
1102 
1103  }
1104 
1105  return true;
1106  }
1107 
1108  std::vector<ModuleDescription const*>
1110  std::vector<ModuleDescription const*> result;
1111  result.reserve(allWorkers().size());
1112 
1113  for (auto const& worker : allWorkers()) {
1114  ModuleDescription const* p = worker->descPtr();
1115  result.push_back(p);
1116  }
1117  return result;
1118  }
1119 
1120  Schedule::AllWorkers const&
1122  return globalSchedule_->allWorkers();
1123  }
1124 
1126  for (auto const& worker : allWorkers()) {
1127  worker->convertCurrentProcessAlias(processName);
1128  }
1129  }
1130 
1131  void
1132  Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1133  streamSchedules_[0]->availablePaths(oLabelsToFill);
1134  }
1135 
1136  void
1137  Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const {
1138  oLabelsToFill = *pathNames_;
1139 
1140  }
1141 
1142  void
1143  Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const {
1144  oLabelsToFill = *endPathNames_;
1145  }
1146 
1147  void
1149  std::vector<std::string>& oLabelsToFill) const {
1150  streamSchedules_[0]->modulesInPath(iPathLabel,oLabelsToFill);
1151  }
1152 
1153  void
1155  std::vector<ModuleDescription const*>& descriptions,
1156  unsigned int hint) const {
1157  streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1158  }
1159 
1160  void
1162  std::vector<ModuleDescription const*>& descriptions,
1163  unsigned int hint) const {
1164  streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1165  }
1166 
1167  void
1168  Schedule::fillModuleAndConsumesInfo(std::vector<ModuleDescription const*>& allModuleDescriptions,
1169  std::vector<std::pair<unsigned int, unsigned int> >& moduleIDToIndex,
1170  std::vector<std::vector<ModuleDescription const*> >& modulesWhoseProductsAreConsumedBy,
1171  ProductRegistry const& preg) const {
1172  allModuleDescriptions.clear();
1173  moduleIDToIndex.clear();
1174  modulesWhoseProductsAreConsumedBy.clear();
1175 
1176  allModuleDescriptions.reserve(allWorkers().size());
1177  moduleIDToIndex.reserve(allWorkers().size());
1178  modulesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1179 
1180  std::map<std::string, ModuleDescription const*> labelToDesc;
1181  unsigned int i = 0;
1182  for (auto const& worker : allWorkers()) {
1183  ModuleDescription const* p = worker->descPtr();
1184  allModuleDescriptions.push_back(p);
1185  moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1186  labelToDesc[p->moduleLabel()] = p;
1187  ++i;
1188  }
1189  sort_all(moduleIDToIndex);
1190 
1191  i = 0;
1192  for (auto const& worker : allWorkers()) {
1193  std::vector<ModuleDescription const*>& modules = modulesWhoseProductsAreConsumedBy.at(i);
1194  worker->modulesWhoseProductsAreConsumed(modules, preg, labelToDesc);
1195  ++i;
1196  }
1197  }
1198 
1199  void
1201  endpathsAreActive_ = active;
1202  for(auto& s : streamSchedules_) {
1203  s->enableEndPaths(active);
1204  }
1205  }
1206 
1207  bool
1209  return endpathsAreActive_;
1210  }
1211 
// NOTE(review): fragment only. According to the declaration index at the end
// of this page ("void getTriggerReport(TriggerReport &rep) const  Definition:
// Schedule.cc:1213") this is Schedule::getTriggerReport, but the line carrying
// the function name (1213) and three body lines (1215, 1216 and 1220) were lost
// when the page was extracted. What remains zeroes rep.eventSummary.totalEvents
// and then lets every stream schedule add its contribution to `rep`; restore
// the missing lines from the upstream file before relying on this block.
1212  void
1214  rep.eventSummary.totalEvents = 0;
1217  for(auto& s: streamSchedules_) {
1218  s->getTriggerReport(rep);
1219  }
1221  }
1222 
1223  void
1225  rep.eventSummary.totalEvents = 0;
1226  rep.eventSummary.cpuTime = 0.;
1227  rep.eventSummary.realTime = 0.;
1228  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1229  }
1230 
1231  int
1233  int returnValue = 0;
1234  for(auto& s: streamSchedules_) {
1235  returnValue += s->totalEvents();
1236  }
1237  return returnValue;
1238  }
1239 
1240  int
1242  int returnValue = 0;
1243  for(auto& s: streamSchedules_) {
1244  returnValue += s->totalEventsPassed();
1245  }
1246  return returnValue;
1247  }
1248 
1249  int
1251  int returnValue = 0;
1252  for(auto& s: streamSchedules_) {
1253  returnValue += s->totalEventsFailed();
1254  }
1255  return returnValue;
1256  }
1257 
1258 
1259  void
1261  for(auto& s: streamSchedules_) {
1262  s->clearCounters();
1263  }
1264  }
1265 }
size
Write out results.
bool empty() const
Definition: ParameterSet.h:217
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:68
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
void stopEvent(StreamContext const &)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
void fillModuleAndConsumesInfo(std::vector< ModuleDescription const * > &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int > > &moduleIDToIndex, std::vector< std::vector< ModuleDescription const * > > &modulesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
Definition: Schedule.cc:1168
virtual void openFile(FileBlock const &fb)=0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1121
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:1132
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
bool endPathsEnabled() const
Definition: Schedule.cc:1208
def copy(args, dbName)
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:1035
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::shared_ptr< ModuleRegistry const > moduleRegistry() const
Definition: Schedule.h:288
std::vector< Worker * > AllWorkers
Definition: Schedule.h:122
std::vector< ParameterSet > VParameterSet
Definition: ParameterSet.h:33
std::vector< std::string > const * pathNames_
Definition: Schedule.h:304
void convertCurrentProcessAlias(std::string const &processName)
Convert "@currentProcess" in InputTag process names to the actual current process name...
Definition: Schedule.cc:1125
virtual bool shouldWeCloseFile() const =0
void endStream(unsigned int)
Definition: Schedule.cc:1049
void enableEndPaths(bool active)
Definition: Schedule.cc:1200
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
void processOneEventAsync(WaitingTaskHolder iTask, unsigned int iStreamID, EventPrincipal &principal, EventSetup const &eventSetup, ServiceToken const &token)
Definition: Schedule.cc:1054
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:1161
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:69
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
Definition: Schedule.h:302
std::string const & moduleLabel() const
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
#define constexpr
int totalEventsFailed() const
Definition: Schedule.cc:1250
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:297
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry)
Definition: Schedule.cc:1063
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:193
int totalEventsPassed() const
Definition: Schedule.cc:1241
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
Definition: Schedule.cc:1137
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:67
EventSummary eventSummary
Definition: TriggerReport.h:66
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter > > > pathStatusInserters_
Definition: Schedule.h:292
int totalEvents() const
Definition: Schedule.cc:1232
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
EventTimingSummary eventSummary
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1260
Schedule(ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, SubProcessParentageHelper const *subProcessParentageHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool hasSubprocesses, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:439
std::vector< PathTimingSummary > trigPathSummaries
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists, SubProcessParentageHelper const *subProcessParentageHelper)
Definition: Schedule.cc:648
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:689
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:1030
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:294
rep
Definition: cuy.py:1190
void writeRunAsync(WaitingTaskHolder iTask, RunPrincipal const &rp, ProcessContext const *, ActivityRegistry *, MergeableRunProductMetadata const *)
Definition: Schedule.cc:1003
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1213
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:300
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:295
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
volatile bool endpathsAreActive_
Definition: Schedule.h:308
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:173
std::shared_ptr< TriggerResultInserter const > resultsInserter() const
Definition: Schedule.h:286
void startPath(StreamContext const &, PathContext const &)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter > > > endPathStatusInserters_
Definition: Schedule.h:293
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:299
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:1148
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
void beginStream(unsigned int)
Definition: Schedule.cc:1044
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:174
bool wantSummary_
Definition: Schedule.h:306
std::vector< std::string > const * endPathNames_
Definition: Schedule.h:305
HLT enums.
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:1109
Strings const & getTrigPaths() const
void beginJob(ProductRegistry const &)
Definition: Schedule.cc:1040
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:998
void writeLumiAsync(WaitingTaskHolder iTask, LuminosityBlockPrincipal const &lbp, ProcessContext const *, ActivityRegistry *)
Definition: Schedule.cc:1013
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:705
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1224
bool shouldWeCloseOutput() const
Definition: Schedule.cc:1022
std::vector< std::string > vstring
Definition: Schedule.cc:435
void setIsMergeable(BranchDescription &)
def move(src, dest)
Definition: eostools.py:511
void closeOutputFiles()
Definition: Schedule.cc:993
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
Definition: Schedule.cc:1143
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:1154
unsigned int id() const
std::string match(BranchDescription const &a, BranchDescription const &b, std::string const &fileName)