CMS 3D CMS Logo

Schedule.cc
Go to the documentation of this file.
2 
34 
35 
36 #include <algorithm>
37 #include <cassert>
38 #include <cstdlib>
39 #include <functional>
40 #include <iomanip>
41 #include <list>
42 #include <map>
43 #include <set>
44 #include <exception>
45 #include <sstream>
46 
47 namespace edm {
48 
49  class Maker;
50 
51  namespace {
52  using std::placeholders::_1;
53 
54  bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
55  return std::binary_search(v.begin(), v.end(), s);
56  }
57 
58  // Here we make the trigger results inserter directly. This should
59  // probably be a utility in the WorkerRegistry or elsewhere.
60 
61  std::shared_ptr<TriggerResultInserter>
62  makeInserter(ParameterSet& proc_pset,
63  PreallocationConfiguration const& iPrealloc,
64  ProductRegistry& preg,
65  ExceptionToActionTable const& actions,
66  std::shared_ptr<ActivityRegistry> areg,
67  std::shared_ptr<ProcessConfiguration> processConfiguration) {
68 
69  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
70  trig_pset->registerIt();
71 
72  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
73  ModuleDescription md(trig_pset->id(),
74  "TriggerResultInserter",
75  "TriggerResults",
76  processConfiguration.get(),
78 
79  areg->preModuleConstructionSignal_(md);
80  bool postCalled = false;
81  std::shared_ptr<TriggerResultInserter> returnValue;
82  try {
83  maker::ModuleHolderT<TriggerResultInserter> holder(std::make_shared<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),static_cast<Maker const*>(nullptr));
84  holder.setModuleDescription(md);
85  holder.registerProductsAndCallbacks(&preg);
86  returnValue =holder.module();
87  postCalled = true;
88  // if exception then post will be called in the catch block
89  areg->postModuleConstructionSignal_(md);
90  }
91  catch (...) {
92  if(!postCalled) {
93  try {
94  areg->postModuleConstructionSignal_(md);
95  }
96  catch (...) {
97  // If post throws an exception ignore it because we are already handling another exception
98  }
99  }
100  throw;
101  }
102  return returnValue;
103  }
104 
105  template <typename T>
106  void
107  makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
108  std::vector<std::string> const& pathNames,
109  PreallocationConfiguration const& iPrealloc,
110  ProductRegistry& preg,
111  std::shared_ptr<ActivityRegistry> areg,
112  std::shared_ptr<ProcessConfiguration> processConfiguration,
113  std::string const& moduleTypeName) {
114 
116  pset.addParameter<std::string>("@module_type", moduleTypeName);
117  pset.addParameter<std::string>("@module_edm_type", "EDProducer");
118  pset.registerIt();
119 
120  pathStatusInserters.reserve(pathNames.size());
121 
122  for (auto const& pathName : pathNames) {
123 
124  ModuleDescription md(pset.id(),
125  moduleTypeName,
126  pathName,
127  processConfiguration.get(),
129 
130  areg->preModuleConstructionSignal_(md);
131  bool postCalled = false;
132 
133  try {
134  maker::ModuleHolderT<T> holder(std::make_shared<T>(iPrealloc.numberOfStreams()),
135  static_cast<Maker const*>(nullptr));
136  holder.setModuleDescription(md);
137  holder.registerProductsAndCallbacks(&preg);
138  pathStatusInserters.emplace_back(holder.module());
139  postCalled = true;
140  // if exception then post will be called in the catch block
141  areg->postModuleConstructionSignal_(md);
142  }
143  catch (...) {
144  if(!postCalled) {
145  try {
146  areg->postModuleConstructionSignal_(md);
147  }
148  catch (...) {
149  // If post throws an exception ignore it because we are already handling another exception
150  }
151  }
152  throw;
153  }
154  }
155  }
156 
157  void
158  checkAndInsertAlias(std::string const& friendlyClassName,
159  std::string const& moduleLabel,
160  std::string const& productInstanceName,
161  std::string const& processName,
162  std::string const& alias,
163  std::string const& instanceAlias,
164  ProductRegistry const& preg,
165  std::multimap<BranchKey, BranchKey>& aliasMap,
166  std::map<BranchKey, BranchKey>& aliasKeys) {
167  std::string const star("*");
168 
169  BranchKey key(friendlyClassName, moduleLabel, productInstanceName, processName);
170  if(preg.productList().find(key) == preg.productList().end()) {
171  // No product was found matching the alias.
172  // We throw an exception only if a module with the specified module label was created in this process.
173  for(auto const& product : preg.productList()) {
174  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
175  throw Exception(errors::Configuration, "EDAlias does not match data\n")
176  << "There are no products of type '" << friendlyClassName << "'\n"
177  << "with module label '" << moduleLabel << "' and instance name '" << productInstanceName << "'.\n";
178  }
179  }
180  }
181 
182  std::string const& theInstanceAlias(instanceAlias == star ? productInstanceName : instanceAlias);
183  BranchKey aliasKey(friendlyClassName, alias, theInstanceAlias, processName);
184  if(preg.productList().find(aliasKey) != preg.productList().end()) {
185  throw Exception(errors::Configuration, "EDAlias conflicts with data\n")
186  << "A product of type '" << friendlyClassName << "'\n"
187  << "with module label '" << alias << "' and instance name '" << theInstanceAlias << "'\n"
188  << "already exists.\n";
189  }
190  auto iter = aliasKeys.find(aliasKey);
191  if(iter != aliasKeys.end()) {
192  // The alias matches a previous one. If the same alias is used for different product, throw.
193  if(iter->second != key) {
194  throw Exception(errors::Configuration, "EDAlias conflict\n")
195  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
196  << "are used for multiple products of type '" << friendlyClassName << "'\n"
197  << "One has module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
198  << "the other has module label '" << iter->second.moduleLabel() << "' and product instance name '" << iter->second.productInstanceName() << "'.\n";
199  }
200  } else {
201  auto prodIter = preg.productList().find(key);
202  if(prodIter != preg.productList().end()) {
203  if (!prodIter->second.produced()) {
204  throw Exception(errors::Configuration, "EDAlias\n")
205  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
206  << "are used for a product of type '" << friendlyClassName << "'\n"
207  << "with module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
208  << "An EDAlias can only be used for products produced in the current process. This one is not.\n";
209  }
210  aliasMap.insert(std::make_pair(key, aliasKey));
211  aliasKeys.insert(std::make_pair(aliasKey, key));
212  }
213  }
214  }
215 
216  void
217  processEDAliases(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
218  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string> >("@all_aliases");
219  if(aliases.empty()) {
220  return;
221  }
222  std::string const star("*");
223  std::string const empty("");
225  desc.add<std::string>("type");
226  desc.add<std::string>("fromProductInstance", star);
227  desc.add<std::string>("toProductInstance", star);
228 
229  std::multimap<BranchKey, BranchKey> aliasMap;
230 
231  std::map<BranchKey, BranchKey> aliasKeys; // Used to search for duplicates or clashes.
232 
233  // Now, loop over the alias information and store it in aliasMap.
234  for(std::string const& alias : aliases) {
235  ParameterSet const& aliasPSet = proc_pset.getParameterSet(alias);
236  std::vector<std::string> vPSetNames = aliasPSet.getParameterNamesForType<VParameterSet>();
237  for(std::string const& moduleLabel : vPSetNames) {
238  VParameterSet vPSet = aliasPSet.getParameter<VParameterSet>(moduleLabel);
239  for(ParameterSet& pset : vPSet) {
240  desc.validate(pset);
241  std::string friendlyClassName = pset.getParameter<std::string>("type");
242  std::string productInstanceName = pset.getParameter<std::string>("fromProductInstance");
243  std::string instanceAlias = pset.getParameter<std::string>("toProductInstance");
244  if(productInstanceName == star) {
245  bool match = false;
246  BranchKey lowerBound(friendlyClassName, moduleLabel, empty, empty);
247  for(ProductRegistry::ProductList::const_iterator it = preg.productList().lower_bound(lowerBound);
248  it != preg.productList().end() && it->first.friendlyClassName() == friendlyClassName && it->first.moduleLabel() == moduleLabel;
249  ++it) {
250  if(it->first.processName() != processName) {
251  continue;
252  }
253  match = true;
254 
255  checkAndInsertAlias(friendlyClassName, moduleLabel, it->first.productInstanceName(), processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
256  }
257  if(!match) {
258  // No product was found matching the alias.
259  // We throw an exception only if a module with the specified module label was created in this process.
260  for(auto const& product : preg.productList()) {
261  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
262  throw Exception(errors::Configuration, "EDAlias parameter set mismatch\n")
263  << "There are no products of type '" << friendlyClassName << "'\n"
264  << "with module label '" << moduleLabel << "'.\n";
265  }
266  }
267  }
268  } else {
269  checkAndInsertAlias(friendlyClassName, moduleLabel, productInstanceName, processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
270  }
271  }
272  }
273  }
274 
275 
276  // Now add the new alias entries to the product registry.
277  for(auto const& aliasEntry : aliasMap) {
278  ProductRegistry::ProductList::const_iterator it = preg.productList().find(aliasEntry.first);
279  assert(it != preg.productList().end());
280  preg.addLabelAlias(it->second, aliasEntry.second.moduleLabel(), aliasEntry.second.productInstanceName());
281  }
282 
283  }
284 
285  typedef std::vector<std::string> vstring;
286 
287  void reduceParameterSet(ParameterSet& proc_pset,
288  vstring const& end_path_name_list,
289  vstring& modulesInConfig,
290  std::set<std::string> const& usedModuleLabels,
291  std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions) {
292  // Before calculating the ParameterSetID of the top level ParameterSet or
293  // saving it in the registry drop from the top level ParameterSet all
294  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
295  // production is not enabled also drop all the EDFilters and EDProducers
296  // that are not scheduled. Drop the ParameterSet used to configure the module
297  // itself. Also drop the other traces of these labels in the top level
298  // ParameterSet: Remove that labels from @all_modules and from all the
299  // end paths. If this makes any end paths empty, then remove the end path
300  // name from @end_paths, and @paths.
301 
302  // First make a list of labels to drop
303  vstring outputModuleLabels;
304  std::string edmType;
305  std::string const moduleEdmType("@module_edm_type");
306  std::string const outputModule("OutputModule");
307  std::string const edAnalyzer("EDAnalyzer");
308  std::string const edFilter("EDFilter");
309  std::string const edProducer("EDProducer");
310 
311  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
312 
313  //need a list of all modules on paths in order to determine
314  // if an EDAnalyzer only appears on an end path
315  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
316  std::set<std::string> modulesOnPaths;
317  {
318  std::set<std::string> noEndPaths(scheduledPaths.begin(),scheduledPaths.end());
319  for(auto const& endPath: end_path_name_list) {
320  noEndPaths.erase(endPath);
321  }
322  {
323  vstring labels;
324  for(auto const& path: noEndPaths) {
325  labels = proc_pset.getParameter<vstring>(path);
326  modulesOnPaths.insert(labels.begin(),labels.end());
327  }
328  }
329  }
330  //Initially fill labelsToBeDropped with all module mentioned in
331  // the configuration but which are not being used by the system
332  std::vector<std::string> labelsToBeDropped;
333  labelsToBeDropped.reserve(modulesInConfigSet.size());
334  std::set_difference(modulesInConfigSet.begin(),modulesInConfigSet.end(),
335  usedModuleLabels.begin(),usedModuleLabels.end(),
336  std::back_inserter(labelsToBeDropped));
337 
338  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
339  for (auto const& modLabel: usedModuleLabels) {
340  // Do nothing for modules that do not have a ParameterSet. Modules of type
341  // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
342  if (proc_pset.existsAs<ParameterSet>(modLabel)) {
343  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
344  if (edmType == outputModule) {
345  outputModuleLabels.push_back(modLabel);
346  labelsToBeDropped.push_back(modLabel);
347  }
348  if(edmType == edAnalyzer) {
349  if(modulesOnPaths.end()==modulesOnPaths.find(modLabel)) {
350  labelsToBeDropped.push_back(modLabel);
351  }
352  }
353  }
354  }
355  //labelsToBeDropped must be sorted
356  std::inplace_merge(labelsToBeDropped.begin(),
357  labelsToBeDropped.begin()+sizeBeforeOutputModules,
358  labelsToBeDropped.end());
359 
360  // drop the parameter sets used to configure the modules
361  for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
362 
363  // drop the labels from @all_modules
364  vstring::iterator endAfterRemove = std::remove_if(modulesInConfig.begin(), modulesInConfig.end(), std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
365  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
366  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
367 
368  // drop the labels from all end paths
369  vstring endPathsToBeDropped;
370  vstring labels;
371  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
372  iEndPath != endEndPath;
373  ++iEndPath) {
374  labels = proc_pset.getParameter<vstring>(*iEndPath);
375  vstring::iterator iSave = labels.begin();
376  vstring::iterator iBegin = labels.begin();
377 
378  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end();
379  iLabel != iEnd; ++iLabel) {
380  if (binary_search_string(labelsToBeDropped, *iLabel)) {
381  if (binary_search_string(outputModuleLabels, *iLabel)) {
382  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
383  }
384  } else {
385  if (iSave != iLabel) {
386  iSave->swap(*iLabel);
387  }
388  ++iSave;
389  }
390  }
391  labels.erase(iSave, labels.end());
392  if (labels.empty()) {
393  // remove empty end paths and save their names
394  proc_pset.eraseSimpleParameter(*iEndPath);
395  endPathsToBeDropped.push_back(*iEndPath);
396  } else {
397  proc_pset.addParameter<vstring>(*iEndPath, labels);
398  }
399  }
400  sort_all(endPathsToBeDropped);
401 
402  // remove empty end paths from @paths
403  endAfterRemove = std::remove_if(scheduledPaths.begin(), scheduledPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
404  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
405  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
406 
407  // remove empty end paths from @end_paths
408  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
409  endAfterRemove = std::remove_if(scheduledEndPaths.begin(), scheduledEndPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
410  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
411  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
412 
413  }
414 
415  class RngEDConsumer : public EDConsumerBase {
416  public:
417  explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
419  if(rng.isAvailable()) {
420  rng->consumes(consumesCollector());
421  for (auto const& consumesInfo : this->consumesInfo()) {
422  typesConsumed.emplace(consumesInfo.type());
423  }
424  }
425  }
426  };
427  }
428  // -----------------------------
429 
430  typedef std::vector<std::string> vstring;
431 
432  // -----------------------------
433 
435  service::TriggerNamesService const& tns,
436  ProductRegistry& preg,
437  BranchIDListHelper& branchIDListHelper,
438  ThinnedAssociationsHelper& thinnedAssociationsHelper,
439  SubProcessParentageHelper const* subProcessParentageHelper,
440  ExceptionToActionTable const& actions,
441  std::shared_ptr<ActivityRegistry> areg,
442  std::shared_ptr<ProcessConfiguration> processConfiguration,
443  bool hasSubprocesses,
444  PreallocationConfiguration const& prealloc,
445  ProcessContext const* processContext) :
446  //Only create a resultsInserter if there is a trigger path
447  resultsInserter_{tns.getTrigPaths().empty()? std::shared_ptr<TriggerResultInserter>{} :makeInserter(proc_pset,prealloc,preg,actions,areg,processConfiguration)},
450  preallocConfig_(prealloc),
451  pathNames_(&tns.getTrigPaths()),
452  endPathNames_(&tns.getEndPaths()),
453  wantSummary_(tns.wantSummary()),
454  endpathsAreActive_(true)
455  {
456  makePathStatusInserters(pathStatusInserters_,
457  *pathNames_,
458  prealloc,
459  preg,
460  areg,
461  processConfiguration,
462  std::string("PathStatusInserter"));
463 
464  makePathStatusInserters(endPathStatusInserters_,
465  *endPathNames_,
466  prealloc,
467  preg,
468  areg,
469  processConfiguration,
470  std::string("EndPathStatusInserter"));
471 
472  assert(0<prealloc.numberOfStreams());
473  streamSchedules_.reserve(prealloc.numberOfStreams());
474  for(unsigned int i=0; i<prealloc.numberOfStreams();++i) {
475  streamSchedules_.emplace_back(std::make_shared<StreamSchedule>(
476  resultsInserter(),
479  moduleRegistry(),
480  proc_pset,tns,prealloc,preg,
481  branchIDListHelper,actions,
482  areg,processConfiguration,
483  !hasSubprocesses,
484  StreamID{i},
485  processContext));
486  }
487 
488  //TriggerResults are injected automatically by StreamSchedules and are
489  // unknown to the ModuleRegistry
490  const std::string kTriggerResults("TriggerResults");
491  std::vector<std::string> modulesToUse;
492  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
493  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
494  if(worker->description().moduleLabel() != kTriggerResults) {
495  modulesToUse.push_back(worker->description().moduleLabel());
496  }
497  }
498  //The unscheduled modules are at the end of the list, but we want them at the front
499  unsigned int n = streamSchedules_[0]->numberOfUnscheduledModules();
500  if(n>0) {
501  std::vector<std::string> temp;
502  temp.reserve(modulesToUse.size());
503  auto itBeginUnscheduled = modulesToUse.begin()+modulesToUse.size()-n;
504  std::copy(itBeginUnscheduled,modulesToUse.end(),
505  std::back_inserter(temp));
506  std::copy(modulesToUse.begin(),itBeginUnscheduled,std::back_inserter(temp));
507  temp.swap(modulesToUse);
508  }
509 
510  // propagate_const<T> has no reset() function
511  globalSchedule_ = std::make_unique<GlobalSchedule>(
512  resultsInserter(),
515  moduleRegistry(),
516  modulesToUse,
517  proc_pset, preg, prealloc,
518  actions,areg,processConfiguration,processContext);
519 
520  //TriggerResults is not in the top level ParameterSet so the call to
521  // reduceParameterSet would fail to find it. Just remove it up front.
522  std::set<std::string> usedModuleLabels;
523  for(auto const& worker: allWorkers()) {
524  if(worker->description().moduleLabel() != kTriggerResults) {
525  usedModuleLabels.insert(worker->description().moduleLabel());
526  }
527  }
528  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
529  std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
530  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels,
531  outputModulePathPositions);
532  processEDAliases(proc_pset, processConfiguration->processName(), preg);
533  proc_pset.registerIt();
534  processConfiguration->setParameterSetID(proc_pset.id());
535  processConfiguration->setProcessConfigurationID();
536 
537  // This is used for a little sanity-check to make sure no code
538  // modifications alter the number of workers at a later date.
539  size_t all_workers_count = allWorkers().size();
540 
541  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder){
542  auto comm = iHolder->createOutputModuleCommunicator();
543  if (comm) {
544  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
545  }
546  });
547  // Now that the output workers are filled in, set any output limits or information.
548  limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
549 
550  // Sanity check: make sure nobody has added a worker after we've
551  // already relied on the WorkerManager being full.
552  assert (all_workers_count == allWorkers().size());
553 
554  branchIDListHelper.updateFromRegistry(preg);
555 
556  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
557  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
558  }
559  thinnedAssociationsHelper.sort();
560 
561  // The output modules consume products in kept branches.
562  // So we must set this up before freezing.
563  for (auto& c : all_output_communicators_) {
564  c->selectProducts(preg, thinnedAssociationsHelper);
565  }
566 
567  {
568  // We now get a collection of types that may be consumed.
569  std::set<TypeID> productTypesConsumed;
570  std::set<TypeID> elementTypesConsumed;
571  // Loop over all modules
572  for (auto const& worker : allWorkers()) {
573  for (auto const& consumesInfo : worker->consumesInfo()) {
574  if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
575  productTypesConsumed.emplace(consumesInfo.type());
576  } else {
577  elementTypesConsumed.emplace(consumesInfo.type());
578  }
579  }
580  }
581  // The SubProcess class is not a module, yet it may consume.
582  if(hasSubprocesses) {
583  productTypesConsumed.emplace(typeid(TriggerResults));
584  }
585  // The RandomNumberGeneratorService is not a module, yet it consumes.
586  {
587  RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed);
588  }
589  preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
590  }
591 
592  for (auto& c : all_output_communicators_) {
593  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
594  }
595 
596  if(wantSummary_) {
597  std::vector<const ModuleDescription*> modDesc;
598  const auto& workers = allWorkers();
599  modDesc.reserve(workers.size());
600 
601  std::transform(workers.begin(),workers.end(),
602  std::back_inserter(modDesc),
603  [](const Worker* iWorker) -> const ModuleDescription* {
604  return iWorker->descPtr();
605  });
606 
607  // propagate_const<T> has no reset() function
608  summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(
609  prealloc.numberOfStreams(),
610  modDesc,
611  tns,
612  processContext);
613  auto timeKeeperPtr = summaryTimeKeeper_.get();
614 
615  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
616  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
617  areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
618  areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
619  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
620  areg->watchPostModuleEventDelayedGet(timeKeeperPtr,&SystemTimeKeeper::restartModuleEvent);
621 
622  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
623  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
624 
625  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
626  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
627 
628  areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
629  areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
630  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
631  //timeKeeperPtr->startModuleEvent(iContext,iMod);
632  //});
633  }
634 
635  } // Schedule::Schedule
636 
637 
638  void
640  BranchIDLists const& branchIDLists,
641  SubProcessParentageHelper const* subProcessParentageHelper) {
642  std::string const output("output");
643 
644  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents", ParameterSet());
645  int maxEventSpecs = 0;
646  int maxEventsOut = -1;
647  ParameterSet const* vMaxEventsOut = nullptr;
648  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
649  if (search_all(intNamesE, output)) {
650  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
651  ++maxEventSpecs;
652  }
653  std::vector<std::string> psetNamesE;
654  maxEventsPSet.getParameterSetNames(psetNamesE, false);
655  if (search_all(psetNamesE, output)) {
656  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
657  ++maxEventSpecs;
658  }
659 
660  if (maxEventSpecs > 1) {
662  "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
663  }
664 
665  for (auto& c : all_output_communicators_) {
666  OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
667  if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
668  std::string const& moduleLabel = c->description().moduleLabel();
669  try {
670  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
671  } catch (Exception const&) {
673  "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
674  }
675  }
676  c->configure(desc);
677  }
678  }
679 
680  bool Schedule::terminate() const {
681  if (all_output_communicators_.empty()) {
682  return false;
683  }
684  for (auto& c : all_output_communicators_) {
685  if (!c->limitReached()) {
686  // Found an output module that has not reached output event count.
687  return false;
688  }
689  }
690  LogInfo("SuccessfulTermination")
691  << "The job is terminating successfully because each output module\n"
692  << "has reached its configured limit.\n";
693  return true;
694  }
695 
697  globalSchedule_->endJob(collector);
698  if (collector.hasThrown()) {
699  return;
700  }
701 
702  if (wantSummary_ == false) return;
703  {
704  TriggerReport tr;
705  getTriggerReport(tr);
706 
707  // The trigger report (pass/fail etc.):
708 
709  LogVerbatim("FwkSummary") << "";
710  if(streamSchedules_[0]->context().processContext()->isSubProcess()) {
711  LogVerbatim("FwkSummary") << "TrigReport Process: "<<streamSchedules_[0]->context().processContext()->processName();
712  }
713  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Event Summary ------------";
714  if(!tr.trigPathSummaries.empty()) {
715  LogVerbatim("FwkSummary") << "TrigReport"
716  << " Events total = " << tr.eventSummary.totalEvents
717  << " passed = " << tr.eventSummary.totalEventsPassed
718  << " failed = " << tr.eventSummary.totalEventsFailed
719  << "";
720  } else {
721  LogVerbatim("FwkSummary") << "TrigReport"
722  << " Events total = " << tr.eventSummary.totalEvents
723  << " passed = " << tr.eventSummary.totalEvents
724  << " failed = 0";
725  }
726 
727  LogVerbatim("FwkSummary") << "";
728  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Path Summary ------------";
729  LogVerbatim("FwkSummary") << "TrigReport "
730  << std::right << std::setw(10) << "Trig Bit#" << " "
731  << std::right << std::setw(10) << "Executed" << " "
732  << std::right << std::setw(10) << "Passed" << " "
733  << std::right << std::setw(10) << "Failed" << " "
734  << std::right << std::setw(10) << "Error" << " "
735  << "Name" << "";
736  for (auto const& p: tr.trigPathSummaries) {
737  LogVerbatim("FwkSummary") << "TrigReport "
738  << std::right << std::setw(5) << 1
739  << std::right << std::setw(5) << p.bitPosition << " "
740  << std::right << std::setw(10) << p.timesRun << " "
741  << std::right << std::setw(10) << p.timesPassed << " "
742  << std::right << std::setw(10) << p.timesFailed << " "
743  << std::right << std::setw(10) << p.timesExcept << " "
744  << p.name << "";
745  }
746 
747  /*
748  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
749  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
750  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
751  for (; epi != epe; ++epi, ++epn) {
752 
753  LogVerbatim("FwkSummary") << "TrigReport "
754  << std::right << std::setw(5) << 1
755  << std::right << std::setw(5) << *epi << " "
756  << std::right << std::setw(10) << totalEvents() << " "
757  << std::right << std::setw(10) << totalEvents() << " "
758  << std::right << std::setw(10) << 0 << " "
759  << std::right << std::setw(10) << 0 << " "
760  << *epn << "";
761  }
762  */
763 
764  LogVerbatim("FwkSummary") << "";
765  LogVerbatim("FwkSummary") << "TrigReport " << "-------End-Path Summary ------------";
766  LogVerbatim("FwkSummary") << "TrigReport "
767  << std::right << std::setw(10) << "Trig Bit#" << " "
768  << std::right << std::setw(10) << "Executed" << " "
769  << std::right << std::setw(10) << "Passed" << " "
770  << std::right << std::setw(10) << "Failed" << " "
771  << std::right << std::setw(10) << "Error" << " "
772  << "Name" << "";
773  for (auto const& p: tr.endPathSummaries) {
774  LogVerbatim("FwkSummary") << "TrigReport "
775  << std::right << std::setw(5) << 0
776  << std::right << std::setw(5) << p.bitPosition << " "
777  << std::right << std::setw(10) << p.timesRun << " "
778  << std::right << std::setw(10) << p.timesPassed << " "
779  << std::right << std::setw(10) << p.timesFailed << " "
780  << std::right << std::setw(10) << p.timesExcept << " "
781  << p.name << "";
782  }
783 
784  for (auto const& p: tr.trigPathSummaries) {
785  LogVerbatim("FwkSummary") << "";
786  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Modules in Path: " << p.name << " ------------";
787  LogVerbatim("FwkSummary") << "TrigReport "
788  << std::right << std::setw(10) << "Trig Bit#" << " "
789  << std::right << std::setw(10) << "Visited" << " "
790  << std::right << std::setw(10) << "Passed" << " "
791  << std::right << std::setw(10) << "Failed" << " "
792  << std::right << std::setw(10) << "Error" << " "
793  << "Name" << "";
794 
795  unsigned int bitpos = 0;
796  for (auto const& mod: p.moduleInPathSummaries) {
797  LogVerbatim("FwkSummary") << "TrigReport "
798  << std::right << std::setw(5) << 1
799  << std::right << std::setw(5) << bitpos << " "
800  << std::right << std::setw(10) << mod.timesVisited << " "
801  << std::right << std::setw(10) << mod.timesPassed << " "
802  << std::right << std::setw(10) << mod.timesFailed << " "
803  << std::right << std::setw(10) << mod.timesExcept << " "
804  << mod.moduleLabel << "";
805  ++bitpos;
806  }
807  }
808 
809  for (auto const& p: tr.endPathSummaries) {
810  LogVerbatim("FwkSummary") << "";
811  LogVerbatim("FwkSummary") << "TrigReport " << "------ Modules in End-Path: " << p.name << " ------------";
812  LogVerbatim("FwkSummary") << "TrigReport "
813  << std::right << std::setw(10) << "Trig Bit#" << " "
814  << std::right << std::setw(10) << "Visited" << " "
815  << std::right << std::setw(10) << "Passed" << " "
816  << std::right << std::setw(10) << "Failed" << " "
817  << std::right << std::setw(10) << "Error" << " "
818  << "Name" << "";
819 
820  unsigned int bitpos=0;
821  for (auto const& mod: p.moduleInPathSummaries) {
822  LogVerbatim("FwkSummary") << "TrigReport "
823  << std::right << std::setw(5) << 0
824  << std::right << std::setw(5) << bitpos << " "
825  << std::right << std::setw(10) << mod.timesVisited << " "
826  << std::right << std::setw(10) << mod.timesPassed << " "
827  << std::right << std::setw(10) << mod.timesFailed << " "
828  << std::right << std::setw(10) << mod.timesExcept << " "
829  << mod.moduleLabel << "";
830  ++bitpos;
831  }
832  }
833 
834  LogVerbatim("FwkSummary") << "";
835  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Module Summary ------------";
836  LogVerbatim("FwkSummary") << "TrigReport "
837  << std::right << std::setw(10) << "Visited" << " "
838  << std::right << std::setw(10) << "Executed" << " "
839  << std::right << std::setw(10) << "Passed" << " "
840  << std::right << std::setw(10) << "Failed" << " "
841  << std::right << std::setw(10) << "Error" << " "
842  << "Name" << "";
843  for (auto const& worker : tr.workerSummaries) {
844  LogVerbatim("FwkSummary") << "TrigReport "
845  << std::right << std::setw(10) << worker.timesVisited << " "
846  << std::right << std::setw(10) << worker.timesRun << " "
847  << std::right << std::setw(10) << worker.timesPassed << " "
848  << std::right << std::setw(10) << worker.timesFailed << " "
849  << std::right << std::setw(10) << worker.timesExcept << " "
850  << worker.moduleLabel << "";
851  }
852  LogVerbatim("FwkSummary") << "";
853  }
854  // The timing report (CPU and Real Time):
857 
858  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
859 
860  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Event Summary ---[sec]----";
861  LogVerbatim("FwkSummary") << "TimeReport"
862  << std::setprecision(6) << std::fixed
863  << " event loop CPU/event = " << tr.eventSummary.cpuTime/totalEvents;
864  LogVerbatim("FwkSummary") << "TimeReport"
865  << std::setprecision(6) << std::fixed
866  << " event loop Real/event = " << tr.eventSummary.realTime/totalEvents;
867  LogVerbatim("FwkSummary") << "TimeReport"
868  << std::setprecision(6) << std::fixed
869  << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime/totalEvents;
870  LogVerbatim("FwkSummary") << "TimeReport"
871  << std::setprecision(6) << std::fixed
872  << " efficiency CPU/Real/thread = " << tr.eventSummary.cpuTime/tr.eventSummary.realTime/preallocConfig_.numberOfThreads();
873 
874  constexpr int kColumn1Size = 10;
875  constexpr int kColumn2Size = 12;
876  constexpr int kColumn3Size = 12;
877  LogVerbatim("FwkSummary") << "";
878  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Path Summary ---[Real sec]----";
879  LogVerbatim("FwkSummary") << "TimeReport "
880  << std::right << std::setw(kColumn1Size) << "per event"<<" "
881  << std::right << std::setw(kColumn2Size) << "per exec"
882  << " Name";
883  for (auto const& p: tr.trigPathSummaries) {
884  const int timesRun = std::max(1, p.timesRun);
885  LogVerbatim("FwkSummary") << "TimeReport "
886  << std::setprecision(6) << std::fixed
887  << std::right << std::setw(kColumn1Size) << p.realTime/totalEvents << " "
888  << std::right << std::setw(kColumn2Size) << p.realTime/timesRun << " "
889  << p.name << "";
890  }
891  LogVerbatim("FwkSummary") << "TimeReport "
892  << std::right << std::setw(kColumn1Size) << "per event"<<" "
893  << std::right << std::setw(kColumn2Size) << "per exec"
894  << " Name" << "";
895 
896  LogVerbatim("FwkSummary") << "";
897  LogVerbatim("FwkSummary") << "TimeReport " << "-------End-Path Summary ---[Real sec]----";
898  LogVerbatim("FwkSummary") << "TimeReport "
899  << std::right << std::setw(kColumn1Size) << "per event" <<" "
900  << std::right << std::setw(kColumn2Size) << "per exec"
901  << " Name" << "";
902  for (auto const& p: tr.endPathSummaries) {
903  const int timesRun = std::max(1, p.timesRun);
904 
905  LogVerbatim("FwkSummary") << "TimeReport "
906  << std::setprecision(6) << std::fixed
907  << std::right << std::setw(kColumn1Size) << p.realTime/totalEvents << " "
908  << std::right << std::setw(kColumn2Size) << p.realTime/timesRun << " "
909  << p.name << "";
910  }
911  LogVerbatim("FwkSummary") << "TimeReport "
912  << std::right << std::setw(kColumn1Size) << "per event" <<" "
913  << std::right << std::setw(kColumn2Size) << "per exec"
914  << " Name" << "";
915 
916  for (auto const& p: tr.trigPathSummaries) {
917  LogVerbatim("FwkSummary") << "";
918  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
919  LogVerbatim("FwkSummary") << "TimeReport "
920  << std::right << std::setw(kColumn1Size) << "per event" <<" "
921  << std::right << std::setw(kColumn2Size) << "per visit"
922  << " Name" << "";
923  for (auto const& mod: p.moduleInPathSummaries) {
924  LogVerbatim("FwkSummary") << "TimeReport "
925  << std::setprecision(6) << std::fixed
926  << std::right << std::setw(kColumn1Size) << mod.realTime/totalEvents << " "
927  << std::right << std::setw(kColumn2Size) << mod.realTime/std::max(1, mod.timesVisited) << " "
928  << mod.moduleLabel << "";
929  }
930  }
931  if(not tr.trigPathSummaries.empty()) {
932  LogVerbatim("FwkSummary") << "TimeReport "
933  << std::right << std::setw(kColumn1Size) << "per event" <<" "
934  << std::right << std::setw(kColumn2Size) << "per visit"
935  << " Name" << "";
936  }
937  for (auto const& p: tr.endPathSummaries) {
938  LogVerbatim("FwkSummary") << "";
939  LogVerbatim("FwkSummary") << "TimeReport " << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
940  LogVerbatim("FwkSummary") << "TimeReport "
941  << std::right << std::setw(kColumn1Size) << "per event" <<" "
942  << std::right << std::setw(kColumn2Size) << "per visit"
943  << " Name" << "";
944  for (auto const& mod: p.moduleInPathSummaries) {
945  LogVerbatim("FwkSummary") << "TimeReport "
946  << std::setprecision(6) << std::fixed
947  << std::right << std::setw(kColumn1Size) << mod.realTime/totalEvents << " "
948  << std::right << std::setw(kColumn2Size) << mod.realTime/std::max(1, mod.timesVisited) << " "
949  << mod.moduleLabel << "";
950  }
951  }
952  if(not tr.endPathSummaries.empty()) {
953  LogVerbatim("FwkSummary") << "TimeReport "
954  << std::right << std::setw(kColumn1Size) << "per event" <<" "
955  << std::right << std::setw(kColumn2Size) << "per visit"
956  << " Name" << "";
957  }
958  LogVerbatim("FwkSummary") << "";
959  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Module Summary ---[Real sec]----";
960  LogVerbatim("FwkSummary") << "TimeReport "
961  << std::right << std::setw(kColumn1Size) << "per event" <<" "
962  << std::right << std::setw(kColumn2Size) << "per exec" <<" "
963  << std::right << std::setw(kColumn3Size) << "per visit"
964  << " Name" << "";
965  for (auto const& worker : tr.workerSummaries) {
966  LogVerbatim("FwkSummary") << "TimeReport "
967  << std::setprecision(6) << std::fixed
968  << std::right << std::setw(kColumn1Size) << worker.realTime/totalEvents << " "
969  << std::right << std::setw(kColumn2Size) << worker.realTime/std::max(1, worker.timesRun) << " "
970  << std::right << std::setw(kColumn3Size) << worker.realTime/std::max(1, worker.timesVisited) << " "
971  << worker.moduleLabel << "";
972  }
973  LogVerbatim("FwkSummary") << "TimeReport "
974  << std::right << std::setw(kColumn1Size) << "per event" <<" "
975  << std::right << std::setw(kColumn2Size) << "per exec" <<" "
976  << std::right << std::setw(kColumn3Size) << "per visit"
977  << " Name" << "";
978 
979  LogVerbatim("FwkSummary") << "";
980  LogVerbatim("FwkSummary") << "T---Report end!" << "";
981  LogVerbatim("FwkSummary") << "";
982  }
983 
985  using std::placeholders::_1;
987  }
988 
990  using std::placeholders::_1;
992  }
993 
994  void Schedule::writeRun(RunPrincipal const& rp, ProcessContext const* processContext) {
995  using std::placeholders::_1;
996  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::writeRun, _1, std::cref(rp), processContext));
997  }
998 
999  void Schedule::writeLumi(LuminosityBlockPrincipal const& lbp, ProcessContext const* processContext) {
1000  using std::placeholders::_1;
1001  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::writeLumi, _1, std::cref(lbp), processContext));
1002  }
1003 
1005  using std::placeholders::_1;
1006  // Return true iff at least one output module returns true.
1007  return (std::find_if (all_output_communicators_.begin(), all_output_communicators_.end(),
1009  != all_output_communicators_.end());
1010  }
1011 
1013  using std::placeholders::_1;
1014  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1015  }
1016 
1018  using std::placeholders::_1;
1019  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1020  }
1021 
1022  void Schedule::beginJob(ProductRegistry const& iRegistry) {
1023  globalSchedule_->beginJob(iRegistry);
1024  }
1025 
1026  void Schedule::beginStream(unsigned int iStreamID) {
1027  assert(iStreamID<streamSchedules_.size());
1028  streamSchedules_[iStreamID]->beginStream();
1029  }
1030 
1031  void Schedule::endStream(unsigned int iStreamID) {
1032  assert(iStreamID<streamSchedules_.size());
1033  streamSchedules_[iStreamID]->endStream();
1034  }
1035 
1037  unsigned int iStreamID,
1038  EventPrincipal& ep,
1039  EventSetup const& es) {
1040  assert(iStreamID<streamSchedules_.size());
1041  streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask),ep,es,pathStatusInserters_);
1042  }
1043 
1045  ParameterSet const& iPSet,
1046  const ProductRegistry& iRegistry) {
1047  Worker* found = nullptr;
1048  for (auto const& worker : allWorkers()) {
1049  if (worker->description().moduleLabel() == iLabel) {
1050  found = worker;
1051  break;
1052  }
1053  }
1054  if (nullptr == found) {
1055  return false;
1056  }
1057 
1058  auto newMod = moduleRegistry_->replaceModule(iLabel,iPSet,preallocConfig_);
1059 
1060  globalSchedule_->replaceModule(newMod,iLabel);
1061 
1062  for(auto& s: streamSchedules_) {
1063  s->replaceModule(newMod,iLabel);
1064  }
1065 
1066  {
1067  //Need to updateLookup in order to make getByToken work
1068  auto const runLookup = iRegistry.productLookup(InRun);
1069  auto const lumiLookup = iRegistry.productLookup(InLumi);
1070  auto const eventLookup = iRegistry.productLookup(InEvent);
1071  found->updateLookup(InRun,*runLookup);
1072  found->updateLookup(InLumi,*lumiLookup);
1073  found->updateLookup(InEvent,*eventLookup);
1074 
1075  auto const& processName = newMod->moduleDescription().processName();
1076  auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1077  auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1078  auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1079  found->resolvePutIndicies(InRun,runModuleToIndicies);
1080  found->resolvePutIndicies(InLumi,lumiModuleToIndicies);
1081  found->resolvePutIndicies(InEvent,eventModuleToIndicies);
1082 
1083 
1084  }
1085 
1086  return true;
1087  }
1088 
1089  std::vector<ModuleDescription const*>
1091  std::vector<ModuleDescription const*> result;
1092  result.reserve(allWorkers().size());
1093 
1094  for (auto const& worker : allWorkers()) {
1095  ModuleDescription const* p = worker->descPtr();
1096  result.push_back(p);
1097  }
1098  return result;
1099  }
1100 
1101  Schedule::AllWorkers const&
1103  return globalSchedule_->allWorkers();
1104  }
1105 
1107  for (auto const& worker : allWorkers()) {
1108  worker->convertCurrentProcessAlias(processName);
1109  }
1110  }
1111 
1112  void
1113  Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1114  streamSchedules_[0]->availablePaths(oLabelsToFill);
1115  }
1116 
1117  void
1118  Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const {
1119  oLabelsToFill = *pathNames_;
1120 
1121  }
1122 
1123  void
1124  Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const {
1125  oLabelsToFill = *endPathNames_;
1126  }
1127 
1128  void
1130  std::vector<std::string>& oLabelsToFill) const {
1131  streamSchedules_[0]->modulesInPath(iPathLabel,oLabelsToFill);
1132  }
1133 
1134  void
1136  std::vector<ModuleDescription const*>& descriptions,
1137  unsigned int hint) const {
1138  streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1139  }
1140 
1141  void
1143  std::vector<ModuleDescription const*>& descriptions,
1144  unsigned int hint) const {
1145  streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1146  }
1147 
1148  void
1149  Schedule::fillModuleAndConsumesInfo(std::vector<ModuleDescription const*>& allModuleDescriptions,
1150  std::vector<std::pair<unsigned int, unsigned int> >& moduleIDToIndex,
1151  std::vector<std::vector<ModuleDescription const*> >& modulesWhoseProductsAreConsumedBy,
1152  ProductRegistry const& preg) const {
1153  allModuleDescriptions.clear();
1154  moduleIDToIndex.clear();
1155  modulesWhoseProductsAreConsumedBy.clear();
1156 
1157  allModuleDescriptions.reserve(allWorkers().size());
1158  moduleIDToIndex.reserve(allWorkers().size());
1159  modulesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1160 
1161  std::map<std::string, ModuleDescription const*> labelToDesc;
1162  unsigned int i = 0;
1163  for (auto const& worker : allWorkers()) {
1164  ModuleDescription const* p = worker->descPtr();
1165  allModuleDescriptions.push_back(p);
1166  moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1167  labelToDesc[p->moduleLabel()] = p;
1168  ++i;
1169  }
1170  sort_all(moduleIDToIndex);
1171 
1172  i = 0;
1173  for (auto const& worker : allWorkers()) {
1174  std::vector<ModuleDescription const*>& modules = modulesWhoseProductsAreConsumedBy.at(i);
1175  worker->modulesWhoseProductsAreConsumed(modules, preg, labelToDesc);
1176  ++i;
1177  }
1178  }
1179 
1180  void
1182  endpathsAreActive_ = active;
1183  for(auto& s : streamSchedules_) {
1184  s->enableEndPaths(active);
1185  }
1186  }
1187 
1188  bool
1190  return endpathsAreActive_;
1191  }
1192 
1193  void
1195  rep.eventSummary.totalEvents = 0;
1198  for(auto& s: streamSchedules_) {
1199  s->getTriggerReport(rep);
1200  }
1202  }
1203 
1204  void
1206  rep.eventSummary.totalEvents = 0;
1207  rep.eventSummary.cpuTime = 0.;
1208  rep.eventSummary.realTime = 0.;
1209  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1210  }
1211 
1212  int
1214  int returnValue = 0;
1215  for(auto& s: streamSchedules_) {
1216  returnValue += s->totalEvents();
1217  }
1218  return returnValue;
1219  }
1220 
1221  int
1223  int returnValue = 0;
1224  for(auto& s: streamSchedules_) {
1225  returnValue += s->totalEventsPassed();
1226  }
1227  return returnValue;
1228  }
1229 
1230  int
1232  int returnValue = 0;
1233  for(auto& s: streamSchedules_) {
1234  returnValue += s->totalEventsFailed();
1235  }
1236  return returnValue;
1237  }
1238 
1239 
1240  void
1242  for(auto& s: streamSchedules_) {
1243  s->clearCounters();
1244  }
1245  }
1246 }
size
Write out results.
bool empty() const
Definition: ParameterSet.h:218
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:68
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
void stopEvent(StreamContext const &)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
void fillModuleAndConsumesInfo(std::vector< ModuleDescription const * > &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int > > &moduleIDToIndex, std::vector< std::vector< ModuleDescription const * > > &modulesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
Definition: Schedule.cc:1149
virtual void openFile(FileBlock const &fb)=0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1102
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:1113
virtual void writeRun(RunPrincipal const &rp, ProcessContext const *)=0
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
bool endPathsEnabled() const
Definition: Schedule.cc:1189
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:1017
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::shared_ptr< ModuleRegistry const > moduleRegistry() const
Definition: Schedule.h:277
std::vector< Worker * > AllWorkers
Definition: Schedule.h:121
std::vector< ParameterSet > VParameterSet
Definition: ParameterSet.h:33
std::vector< std::string > const * pathNames_
Definition: Schedule.h:293
void convertCurrentProcessAlias(std::string const &processName)
Convert "@currentProcess" in InputTag process names to the actual current process name...
Definition: Schedule.cc:1106
virtual bool shouldWeCloseFile() const =0
void writeRun(RunPrincipal const &rp, ProcessContext const *)
Definition: Schedule.cc:994
void endStream(unsigned int)
Definition: Schedule.cc:1031
void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)
Definition: Schedule.cc:999
void enableEndPaths(bool active)
Definition: Schedule.cc:1181
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:1142
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:69
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
Definition: Schedule.h:291
std::string const & moduleLabel() const
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
#define constexpr
int totalEventsFailed() const
Definition: Schedule.cc:1231
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:286
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry)
Definition: Schedule.cc:1044
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:194
int totalEventsPassed() const
Definition: Schedule.cc:1222
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
Definition: Schedule.cc:1118
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:67
EventSummary eventSummary
Definition: TriggerReport.h:66
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter > > > pathStatusInserters_
Definition: Schedule.h:281
int totalEvents() const
Definition: Schedule.cc:1213
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
EventTimingSummary eventSummary
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1241
Schedule(ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, SubProcessParentageHelper const *subProcessParentageHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool hasSubprocesses, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:434
std::vector< PathTimingSummary > trigPathSummaries
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists, SubProcessParentageHelper const *subProcessParentageHelper)
Definition: Schedule.cc:639
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:680
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:1012
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:283
rep
Definition: cuy.py:1188
virtual void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)=0
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1194
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:289
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:284
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
volatile bool endpathsAreActive_
Definition: Schedule.h:297
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:167
std::shared_ptr< TriggerResultInserter const > resultsInserter() const
Definition: Schedule.h:275
void startPath(StreamContext const &, PathContext const &)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter > > > endPathStatusInserters_
Definition: Schedule.h:282
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:288
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:1129
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
void beginStream(unsigned int)
Definition: Schedule.cc:1026
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:168
bool wantSummary_
Definition: Schedule.h:295
std::vector< std::string > const * endPathNames_
Definition: Schedule.h:294
HLT enums.
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:1090
Strings const & getTrigPaths() const
void processOneEventAsync(WaitingTaskHolder iTask, unsigned int iStreamID, EventPrincipal &principal, EventSetup const &eventSetup)
Definition: Schedule.cc:1036
void beginJob(ProductRegistry const &)
Definition: Schedule.cc:1022
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:989
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:696
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1205
bool shouldWeCloseOutput() const
Definition: Schedule.cc:1004
std::vector< std::string > vstring
Definition: Schedule.cc:430
def move(src, dest)
Definition: eostools.py:510
void closeOutputFiles()
Definition: Schedule.cc:984
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
Definition: Schedule.cc:1124
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:1135
unsigned int id() const
std::string match(BranchDescription const &a, BranchDescription const &b, std::string const &fileName)