CMS 3D CMS Logo

Schedule.cc
Go to the documentation of this file.
2 
33 
34 
35 #include <algorithm>
36 #include <cassert>
37 #include <cstdlib>
38 #include <functional>
39 #include <iomanip>
40 #include <list>
41 #include <map>
42 #include <set>
43 #include <exception>
44 #include <sstream>
45 
46 namespace edm {
47 
48  class Maker;
49 
50  namespace {
51  using std::placeholders::_1;
52 
53  bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
54  return std::binary_search(v.begin(), v.end(), s);
55  }
56 
57  // Here we make the trigger results inserter directly. This should
58  // probably be a utility in the WorkerRegistry or elsewhere.
59 
60  std::shared_ptr<TriggerResultInserter>
61  makeInserter(ParameterSet& proc_pset,
62  PreallocationConfiguration const& iPrealloc,
63  ProductRegistry& preg,
64  ExceptionToActionTable const& actions,
65  std::shared_ptr<ActivityRegistry> areg,
66  std::shared_ptr<ProcessConfiguration> processConfiguration) {
67 
68  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
69  trig_pset->registerIt();
70 
71  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
72  ModuleDescription md(trig_pset->id(),
73  "TriggerResultInserter",
74  "TriggerResults",
75  processConfiguration.get(),
77 
78  areg->preModuleConstructionSignal_(md);
79  bool postCalled = false;
80  std::shared_ptr<TriggerResultInserter> returnValue;
81  try {
82  maker::ModuleHolderT<TriggerResultInserter> holder(std::make_shared<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),static_cast<Maker const*>(nullptr));
83  holder.setModuleDescription(md);
84  holder.registerProductsAndCallbacks(&preg);
85  returnValue =holder.module();
86  postCalled = true;
87  // if exception then post will be called in the catch block
88  areg->postModuleConstructionSignal_(md);
89  }
90  catch (...) {
91  if(!postCalled) {
92  try {
93  areg->postModuleConstructionSignal_(md);
94  }
95  catch (...) {
96  // If post throws an exception ignore it because we are already handling another exception
97  }
98  }
99  throw;
100  }
101  return returnValue;
102  }
103 
104  template <typename T>
105  void
106  makePathStatusInserters(std::vector<edm::propagate_const<std::shared_ptr<T>>>& pathStatusInserters,
107  std::vector<std::string> const& pathNames,
108  PreallocationConfiguration const& iPrealloc,
109  ProductRegistry& preg,
110  std::shared_ptr<ActivityRegistry> areg,
111  std::shared_ptr<ProcessConfiguration> processConfiguration,
112  std::string const& moduleTypeName) {
113 
115  pset.addParameter<std::string>("@module_type", moduleTypeName);
116  pset.addParameter<std::string>("@module_edm_type", "EDProducer");
117  pset.registerIt();
118 
119  pathStatusInserters.reserve(pathNames.size());
120 
121  for (auto const& pathName : pathNames) {
122 
123  ModuleDescription md(pset.id(),
124  moduleTypeName,
125  pathName,
126  processConfiguration.get(),
128 
129  areg->preModuleConstructionSignal_(md);
130  bool postCalled = false;
131 
132  try {
133  maker::ModuleHolderT<T> holder(std::make_shared<T>(iPrealloc.numberOfStreams()),
134  static_cast<Maker const*>(nullptr));
135  holder.setModuleDescription(md);
136  holder.registerProductsAndCallbacks(&preg);
137  pathStatusInserters.emplace_back(holder.module());
138  postCalled = true;
139  // if exception then post will be called in the catch block
140  areg->postModuleConstructionSignal_(md);
141  }
142  catch (...) {
143  if(!postCalled) {
144  try {
145  areg->postModuleConstructionSignal_(md);
146  }
147  catch (...) {
148  // If post throws an exception ignore it because we are already handling another exception
149  }
150  }
151  throw;
152  }
153  }
154  }
155 
156  void
157  checkAndInsertAlias(std::string const& friendlyClassName,
158  std::string const& moduleLabel,
159  std::string const& productInstanceName,
160  std::string const& processName,
161  std::string const& alias,
162  std::string const& instanceAlias,
163  ProductRegistry const& preg,
164  std::multimap<BranchKey, BranchKey>& aliasMap,
165  std::map<BranchKey, BranchKey>& aliasKeys) {
166  std::string const star("*");
167 
168  BranchKey key(friendlyClassName, moduleLabel, productInstanceName, processName);
169  if(preg.productList().find(key) == preg.productList().end()) {
170  // No product was found matching the alias.
171  // We throw an exception only if a module with the specified module label was created in this process.
172  for(auto const& product : preg.productList()) {
173  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
174  throw Exception(errors::Configuration, "EDAlias does not match data\n")
175  << "There are no products of type '" << friendlyClassName << "'\n"
176  << "with module label '" << moduleLabel << "' and instance name '" << productInstanceName << "'.\n";
177  }
178  }
179  }
180 
181  std::string const& theInstanceAlias(instanceAlias == star ? productInstanceName : instanceAlias);
182  BranchKey aliasKey(friendlyClassName, alias, theInstanceAlias, processName);
183  if(preg.productList().find(aliasKey) != preg.productList().end()) {
184  throw Exception(errors::Configuration, "EDAlias conflicts with data\n")
185  << "A product of type '" << friendlyClassName << "'\n"
186  << "with module label '" << alias << "' and instance name '" << theInstanceAlias << "'\n"
187  << "already exists.\n";
188  }
189  auto iter = aliasKeys.find(aliasKey);
190  if(iter != aliasKeys.end()) {
191  // The alias matches a previous one. If the same alias is used for different product, throw.
192  if(iter->second != key) {
193  throw Exception(errors::Configuration, "EDAlias conflict\n")
194  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
195  << "are used for multiple products of type '" << friendlyClassName << "'\n"
196  << "One has module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
197  << "the other has module label '" << iter->second.moduleLabel() << "' and product instance name '" << iter->second.productInstanceName() << "'.\n";
198  }
199  } else {
200  auto prodIter = preg.productList().find(key);
201  if(prodIter != preg.productList().end()) {
202  if (!prodIter->second.produced()) {
203  throw Exception(errors::Configuration, "EDAlias\n")
204  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
205  << "are used for a product of type '" << friendlyClassName << "'\n"
206  << "with module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
207  << "An EDAlias can only be used for products produced in the current process. This one is not.\n";
208  }
209  aliasMap.insert(std::make_pair(key, aliasKey));
210  aliasKeys.insert(std::make_pair(aliasKey, key));
211  }
212  }
213  }
214 
215  void
216  processEDAliases(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
217  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string> >("@all_aliases");
218  if(aliases.empty()) {
219  return;
220  }
221  std::string const star("*");
222  std::string const empty("");
224  desc.add<std::string>("type");
225  desc.add<std::string>("fromProductInstance", star);
226  desc.add<std::string>("toProductInstance", star);
227 
228  std::multimap<BranchKey, BranchKey> aliasMap;
229 
230  std::map<BranchKey, BranchKey> aliasKeys; // Used to search for duplicates or clashes.
231 
232  // Now, loop over the alias information and store it in aliasMap.
233  for(std::string const& alias : aliases) {
234  ParameterSet const& aliasPSet = proc_pset.getParameterSet(alias);
235  std::vector<std::string> vPSetNames = aliasPSet.getParameterNamesForType<VParameterSet>();
236  for(std::string const& moduleLabel : vPSetNames) {
237  VParameterSet vPSet = aliasPSet.getParameter<VParameterSet>(moduleLabel);
238  for(ParameterSet& pset : vPSet) {
239  desc.validate(pset);
240  std::string friendlyClassName = pset.getParameter<std::string>("type");
241  std::string productInstanceName = pset.getParameter<std::string>("fromProductInstance");
242  std::string instanceAlias = pset.getParameter<std::string>("toProductInstance");
243  if(productInstanceName == star) {
244  bool match = false;
245  BranchKey lowerBound(friendlyClassName, moduleLabel, empty, empty);
246  for(ProductRegistry::ProductList::const_iterator it = preg.productList().lower_bound(lowerBound);
247  it != preg.productList().end() && it->first.friendlyClassName() == friendlyClassName && it->first.moduleLabel() == moduleLabel;
248  ++it) {
249  if(it->first.processName() != processName) {
250  continue;
251  }
252  match = true;
253 
254  checkAndInsertAlias(friendlyClassName, moduleLabel, it->first.productInstanceName(), processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
255  }
256  if(!match) {
257  // No product was found matching the alias.
258  // We throw an exception only if a module with the specified module label was created in this process.
259  for(auto const& product : preg.productList()) {
260  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
261  throw Exception(errors::Configuration, "EDAlias parameter set mismatch\n")
262  << "There are no products of type '" << friendlyClassName << "'\n"
263  << "with module label '" << moduleLabel << "'.\n";
264  }
265  }
266  }
267  } else {
268  checkAndInsertAlias(friendlyClassName, moduleLabel, productInstanceName, processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
269  }
270  }
271  }
272  }
273 
274 
275  // Now add the new alias entries to the product registry.
276  for(auto const& aliasEntry : aliasMap) {
277  ProductRegistry::ProductList::const_iterator it = preg.productList().find(aliasEntry.first);
278  assert(it != preg.productList().end());
279  preg.addLabelAlias(it->second, aliasEntry.second.moduleLabel(), aliasEntry.second.productInstanceName());
280  }
281 
282  }
283 
284  typedef std::vector<std::string> vstring;
285 
286  void reduceParameterSet(ParameterSet& proc_pset,
287  vstring const& end_path_name_list,
288  vstring& modulesInConfig,
289  std::set<std::string> const& usedModuleLabels,
290  std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions) {
291  // Before calculating the ParameterSetID of the top level ParameterSet or
292  // saving it in the registry drop from the top level ParameterSet all
293  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
294  // production is not enabled also drop all the EDFilters and EDProducers
295  // that are not scheduled. Drop the ParameterSet used to configure the module
296  // itself. Also drop the other traces of these labels in the top level
297  // ParameterSet: Remove that labels from @all_modules and from all the
298  // end paths. If this makes any end paths empty, then remove the end path
299  // name from @end_paths, and @paths.
300 
301  // First make a list of labels to drop
302  vstring outputModuleLabels;
303  std::string edmType;
304  std::string const moduleEdmType("@module_edm_type");
305  std::string const outputModule("OutputModule");
306  std::string const edAnalyzer("EDAnalyzer");
307  std::string const edFilter("EDFilter");
308  std::string const edProducer("EDProducer");
309 
310  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
311 
312  //need a list of all modules on paths in order to determine
313  // if an EDAnalyzer only appears on an end path
314  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
315  std::set<std::string> modulesOnPaths;
316  {
317  std::set<std::string> noEndPaths(scheduledPaths.begin(),scheduledPaths.end());
318  for(auto const& endPath: end_path_name_list) {
319  noEndPaths.erase(endPath);
320  }
321  {
322  vstring labels;
323  for(auto const& path: noEndPaths) {
324  labels = proc_pset.getParameter<vstring>(path);
325  modulesOnPaths.insert(labels.begin(),labels.end());
326  }
327  }
328  }
329  //Initially fill labelsToBeDropped with all module mentioned in
330  // the configuration but which are not being used by the system
331  std::vector<std::string> labelsToBeDropped;
332  labelsToBeDropped.reserve(modulesInConfigSet.size());
333  std::set_difference(modulesInConfigSet.begin(),modulesInConfigSet.end(),
334  usedModuleLabels.begin(),usedModuleLabels.end(),
335  std::back_inserter(labelsToBeDropped));
336 
337  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
338  for (auto const& modLabel: usedModuleLabels) {
339  // Do nothing for modules that do not have a ParameterSet. Modules of type
340  // PathStatusInserter and EndPathStatusInserter will not have a ParameterSet.
341  if (proc_pset.existsAs<ParameterSet>(modLabel)) {
342  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
343  if (edmType == outputModule) {
344  outputModuleLabels.push_back(modLabel);
345  labelsToBeDropped.push_back(modLabel);
346  }
347  if(edmType == edAnalyzer) {
348  if(modulesOnPaths.end()==modulesOnPaths.find(modLabel)) {
349  labelsToBeDropped.push_back(modLabel);
350  }
351  }
352  }
353  }
354  //labelsToBeDropped must be sorted
355  std::inplace_merge(labelsToBeDropped.begin(),
356  labelsToBeDropped.begin()+sizeBeforeOutputModules,
357  labelsToBeDropped.end());
358 
359  // drop the parameter sets used to configure the modules
360  for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
361 
362  // drop the labels from @all_modules
363  vstring::iterator endAfterRemove = std::remove_if(modulesInConfig.begin(), modulesInConfig.end(), std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
364  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
365  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
366 
367  // drop the labels from all end paths
368  vstring endPathsToBeDropped;
369  vstring labels;
370  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
371  iEndPath != endEndPath;
372  ++iEndPath) {
373  labels = proc_pset.getParameter<vstring>(*iEndPath);
374  vstring::iterator iSave = labels.begin();
375  vstring::iterator iBegin = labels.begin();
376 
377  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end();
378  iLabel != iEnd; ++iLabel) {
379  if (binary_search_string(labelsToBeDropped, *iLabel)) {
380  if (binary_search_string(outputModuleLabels, *iLabel)) {
381  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
382  }
383  } else {
384  if (iSave != iLabel) {
385  iSave->swap(*iLabel);
386  }
387  ++iSave;
388  }
389  }
390  labels.erase(iSave, labels.end());
391  if (labels.empty()) {
392  // remove empty end paths and save their names
393  proc_pset.eraseSimpleParameter(*iEndPath);
394  endPathsToBeDropped.push_back(*iEndPath);
395  } else {
396  proc_pset.addParameter<vstring>(*iEndPath, labels);
397  }
398  }
399  sort_all(endPathsToBeDropped);
400 
401  // remove empty end paths from @paths
402  endAfterRemove = std::remove_if(scheduledPaths.begin(), scheduledPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
403  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
404  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
405 
406  // remove empty end paths from @end_paths
407  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
408  endAfterRemove = std::remove_if(scheduledEndPaths.begin(), scheduledEndPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
409  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
410  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
411 
412  }
413 
414  class RngEDConsumer : public EDConsumerBase {
415  public:
416  explicit RngEDConsumer(std::set<TypeID>& typesConsumed) {
418  if(rng.isAvailable()) {
419  rng->consumes(consumesCollector());
420  for (auto const& consumesInfo : this->consumesInfo()) {
421  typesConsumed.emplace(consumesInfo.type());
422  }
423  }
424  }
425  };
426  }
427  // -----------------------------
428 
429  typedef std::vector<std::string> vstring;
430 
431  // -----------------------------
432 
434  service::TriggerNamesService const& tns,
435  ProductRegistry& preg,
436  BranchIDListHelper& branchIDListHelper,
437  ThinnedAssociationsHelper& thinnedAssociationsHelper,
438  SubProcessParentageHelper const* subProcessParentageHelper,
439  ExceptionToActionTable const& actions,
440  std::shared_ptr<ActivityRegistry> areg,
441  std::shared_ptr<ProcessConfiguration> processConfiguration,
442  bool hasSubprocesses,
443  PreallocationConfiguration const& prealloc,
444  ProcessContext const* processContext) :
445  //Only create a resultsInserter if there is a trigger path
446  resultsInserter_{tns.getTrigPaths().empty()? std::shared_ptr<TriggerResultInserter>{} :makeInserter(proc_pset,prealloc,preg,actions,areg,processConfiguration)},
449  preallocConfig_(prealloc),
450  pathNames_(&tns.getTrigPaths()),
451  endPathNames_(&tns.getEndPaths()),
452  wantSummary_(tns.wantSummary()),
453  endpathsAreActive_(true)
454  {
455  makePathStatusInserters(pathStatusInserters_,
456  *pathNames_,
457  prealloc,
458  preg,
459  areg,
460  processConfiguration,
461  std::string("PathStatusInserter"));
462 
463  makePathStatusInserters(endPathStatusInserters_,
464  *endPathNames_,
465  prealloc,
466  preg,
467  areg,
468  processConfiguration,
469  std::string("EndPathStatusInserter"));
470 
471  assert(0<prealloc.numberOfStreams());
472  streamSchedules_.reserve(prealloc.numberOfStreams());
473  for(unsigned int i=0; i<prealloc.numberOfStreams();++i) {
474  streamSchedules_.emplace_back(std::make_shared<StreamSchedule>(
475  resultsInserter(),
478  moduleRegistry(),
479  proc_pset,tns,prealloc,preg,
480  branchIDListHelper,actions,
481  areg,processConfiguration,
482  !hasSubprocesses,
483  StreamID{i},
484  processContext));
485  }
486 
487  //TriggerResults are injected automatically by StreamSchedules and are
488  // unknown to the ModuleRegistry
489  const std::string kTriggerResults("TriggerResults");
490  std::vector<std::string> modulesToUse;
491  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
492  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
493  if(worker->description().moduleLabel() != kTriggerResults) {
494  modulesToUse.push_back(worker->description().moduleLabel());
495  }
496  }
497  //The unscheduled modules are at the end of the list, but we want them at the front
498  unsigned int n = streamSchedules_[0]->numberOfUnscheduledModules();
499  if(n>0) {
500  std::vector<std::string> temp;
501  temp.reserve(modulesToUse.size());
502  auto itBeginUnscheduled = modulesToUse.begin()+modulesToUse.size()-n;
503  std::copy(itBeginUnscheduled,modulesToUse.end(),
504  std::back_inserter(temp));
505  std::copy(modulesToUse.begin(),itBeginUnscheduled,std::back_inserter(temp));
506  temp.swap(modulesToUse);
507  }
508 
509  // propagate_const<T> has no reset() function
510  globalSchedule_ = std::make_unique<GlobalSchedule>(
511  resultsInserter(),
514  moduleRegistry(),
515  modulesToUse,
516  proc_pset, preg, prealloc,
517  actions,areg,processConfiguration,processContext);
518 
519  //TriggerResults is not in the top level ParameterSet so the call to
520  // reduceParameterSet would fail to find it. Just remove it up front.
521  std::set<std::string> usedModuleLabels;
522  for(auto const& worker: allWorkers()) {
523  if(worker->description().moduleLabel() != kTriggerResults) {
524  usedModuleLabels.insert(worker->description().moduleLabel());
525  }
526  }
527  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
528  std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
529  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels,
530  outputModulePathPositions);
531  processEDAliases(proc_pset, processConfiguration->processName(), preg);
532  proc_pset.registerIt();
533  processConfiguration->setParameterSetID(proc_pset.id());
534  processConfiguration->setProcessConfigurationID();
535 
536  // This is used for a little sanity-check to make sure no code
537  // modifications alter the number of workers at a later date.
538  size_t all_workers_count = allWorkers().size();
539 
540  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder){
541  auto comm = iHolder->createOutputModuleCommunicator();
542  if (comm) {
543  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
544  }
545  });
546  // Now that the output workers are filled in, set any output limits or information.
547  limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
548 
549  // Sanity check: make sure nobody has added a worker after we've
550  // already relied on the WorkerManager being full.
551  assert (all_workers_count == allWorkers().size());
552 
553  branchIDListHelper.updateFromRegistry(preg);
554 
555  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
556  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
557  }
558  thinnedAssociationsHelper.sort();
559 
560  // The output modules consume products in kept branches.
561  // So we must set this up before freezing.
562  for (auto& c : all_output_communicators_) {
563  c->selectProducts(preg, thinnedAssociationsHelper);
564  }
565 
566  {
567  // We now get a collection of types that may be consumed.
568  std::set<TypeID> productTypesConsumed;
569  std::set<TypeID> elementTypesConsumed;
570  // Loop over all modules
571  for (auto const& worker : allWorkers()) {
572  for (auto const& consumesInfo : worker->consumesInfo()) {
573  if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
574  productTypesConsumed.emplace(consumesInfo.type());
575  } else {
576  elementTypesConsumed.emplace(consumesInfo.type());
577  }
578  }
579  }
580  // The SubProcess class is not a module, yet it may consume.
581  if(hasSubprocesses) {
582  productTypesConsumed.emplace(typeid(TriggerResults));
583  }
584  // The RandomNumberGeneratorService is not a module, yet it consumes.
585  {
586  RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed);
587  }
588  preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
589  }
590 
591  for (auto& c : all_output_communicators_) {
592  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
593  }
594 
595  if(wantSummary_) {
596  std::vector<const ModuleDescription*> modDesc;
597  const auto& workers = allWorkers();
598  modDesc.reserve(workers.size());
599 
600  std::transform(workers.begin(),workers.end(),
601  std::back_inserter(modDesc),
602  [](const Worker* iWorker) -> const ModuleDescription* {
603  return iWorker->descPtr();
604  });
605 
606  // propagate_const<T> has no reset() function
607  summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(
608  prealloc.numberOfStreams(),
609  modDesc,
610  tns,
611  processContext);
612  auto timeKeeperPtr = summaryTimeKeeper_.get();
613 
614  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
615  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
616  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
617  areg->watchPostModuleEventDelayedGet(timeKeeperPtr,&SystemTimeKeeper::restartModuleEvent);
618 
619  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
620  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
621 
622  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
623  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
624 
625  areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
626  areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
627  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
628  //timeKeeperPtr->startModuleEvent(iContext,iMod);
629  //});
630  }
631 
632  } // Schedule::Schedule
633 
634 
635  void
637  BranchIDLists const& branchIDLists,
638  SubProcessParentageHelper const* subProcessParentageHelper) {
639  std::string const output("output");
640 
641  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents", ParameterSet());
642  int maxEventSpecs = 0;
643  int maxEventsOut = -1;
644  ParameterSet const* vMaxEventsOut = 0;
645  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
646  if (search_all(intNamesE, output)) {
647  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
648  ++maxEventSpecs;
649  }
650  std::vector<std::string> psetNamesE;
651  maxEventsPSet.getParameterSetNames(psetNamesE, false);
652  if (search_all(psetNamesE, output)) {
653  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
654  ++maxEventSpecs;
655  }
656 
657  if (maxEventSpecs > 1) {
659  "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
660  }
661 
662  for (auto& c : all_output_communicators_) {
663  OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
664  if (vMaxEventsOut != 0 && !vMaxEventsOut->empty()) {
665  std::string const& moduleLabel = c->description().moduleLabel();
666  try {
667  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
668  } catch (Exception const&) {
670  "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
671  }
672  }
673  c->configure(desc);
674  }
675  }
676 
677  bool Schedule::terminate() const {
678  if (all_output_communicators_.empty()) {
679  return false;
680  }
681  for (auto& c : all_output_communicators_) {
682  if (!c->limitReached()) {
683  // Found an output module that has not reached output event count.
684  return false;
685  }
686  }
687  LogInfo("SuccessfulTermination")
688  << "The job is terminating successfully because each output module\n"
689  << "has reached its configured limit.\n";
690  return true;
691  }
692 
694  globalSchedule_->endJob(collector);
695  if (collector.hasThrown()) {
696  return;
697  }
698 
699  if (wantSummary_ == false) return;
700  {
701  TriggerReport tr;
702  getTriggerReport(tr);
703 
704  // The trigger report (pass/fail etc.):
705 
706  LogVerbatim("FwkSummary") << "";
707  if(streamSchedules_[0]->context().processContext()->isSubProcess()) {
708  LogVerbatim("FwkSummary") << "TrigReport Process: "<<streamSchedules_[0]->context().processContext()->processName();
709  }
710  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Event Summary ------------";
711  if(!tr.trigPathSummaries.empty()) {
712  LogVerbatim("FwkSummary") << "TrigReport"
713  << " Events total = " << tr.eventSummary.totalEvents
714  << " passed = " << tr.eventSummary.totalEventsPassed
715  << " failed = " << tr.eventSummary.totalEventsFailed
716  << "";
717  } else {
718  LogVerbatim("FwkSummary") << "TrigReport"
719  << " Events total = " << tr.eventSummary.totalEvents
720  << " passed = " << tr.eventSummary.totalEvents
721  << " failed = 0";
722  }
723 
724  LogVerbatim("FwkSummary") << "";
725  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Path Summary ------------";
726  LogVerbatim("FwkSummary") << "TrigReport "
727  << std::right << std::setw(10) << "Trig Bit#" << " "
728  << std::right << std::setw(10) << "Executed" << " "
729  << std::right << std::setw(10) << "Passed" << " "
730  << std::right << std::setw(10) << "Failed" << " "
731  << std::right << std::setw(10) << "Error" << " "
732  << "Name" << "";
733  for (auto const& p: tr.trigPathSummaries) {
734  LogVerbatim("FwkSummary") << "TrigReport "
735  << std::right << std::setw(5) << 1
736  << std::right << std::setw(5) << p.bitPosition << " "
737  << std::right << std::setw(10) << p.timesRun << " "
738  << std::right << std::setw(10) << p.timesPassed << " "
739  << std::right << std::setw(10) << p.timesFailed << " "
740  << std::right << std::setw(10) << p.timesExcept << " "
741  << p.name << "";
742  }
743 
744  /*
745  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
746  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
747  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
748  for (; epi != epe; ++epi, ++epn) {
749 
750  LogVerbatim("FwkSummary") << "TrigReport "
751  << std::right << std::setw(5) << 1
752  << std::right << std::setw(5) << *epi << " "
753  << std::right << std::setw(10) << totalEvents() << " "
754  << std::right << std::setw(10) << totalEvents() << " "
755  << std::right << std::setw(10) << 0 << " "
756  << std::right << std::setw(10) << 0 << " "
757  << *epn << "";
758  }
759  */
760 
761  LogVerbatim("FwkSummary") << "";
762  LogVerbatim("FwkSummary") << "TrigReport " << "-------End-Path Summary ------------";
763  LogVerbatim("FwkSummary") << "TrigReport "
764  << std::right << std::setw(10) << "Trig Bit#" << " "
765  << std::right << std::setw(10) << "Executed" << " "
766  << std::right << std::setw(10) << "Passed" << " "
767  << std::right << std::setw(10) << "Failed" << " "
768  << std::right << std::setw(10) << "Error" << " "
769  << "Name" << "";
770  for (auto const& p: tr.endPathSummaries) {
771  LogVerbatim("FwkSummary") << "TrigReport "
772  << std::right << std::setw(5) << 0
773  << std::right << std::setw(5) << p.bitPosition << " "
774  << std::right << std::setw(10) << p.timesRun << " "
775  << std::right << std::setw(10) << p.timesPassed << " "
776  << std::right << std::setw(10) << p.timesFailed << " "
777  << std::right << std::setw(10) << p.timesExcept << " "
778  << p.name << "";
779  }
780 
781  for (auto const& p: tr.trigPathSummaries) {
782  LogVerbatim("FwkSummary") << "";
783  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Modules in Path: " << p.name << " ------------";
784  LogVerbatim("FwkSummary") << "TrigReport "
785  << std::right << std::setw(10) << "Trig Bit#" << " "
786  << std::right << std::setw(10) << "Visited" << " "
787  << std::right << std::setw(10) << "Passed" << " "
788  << std::right << std::setw(10) << "Failed" << " "
789  << std::right << std::setw(10) << "Error" << " "
790  << "Name" << "";
791 
792  unsigned int bitpos = 0;
793  for (auto const& mod: p.moduleInPathSummaries) {
794  LogVerbatim("FwkSummary") << "TrigReport "
795  << std::right << std::setw(5) << 1
796  << std::right << std::setw(5) << bitpos << " "
797  << std::right << std::setw(10) << mod.timesVisited << " "
798  << std::right << std::setw(10) << mod.timesPassed << " "
799  << std::right << std::setw(10) << mod.timesFailed << " "
800  << std::right << std::setw(10) << mod.timesExcept << " "
801  << mod.moduleLabel << "";
802  ++bitpos;
803  }
804  }
805 
806  for (auto const& p: tr.endPathSummaries) {
807  LogVerbatim("FwkSummary") << "";
808  LogVerbatim("FwkSummary") << "TrigReport " << "------ Modules in End-Path: " << p.name << " ------------";
809  LogVerbatim("FwkSummary") << "TrigReport "
810  << std::right << std::setw(10) << "Trig Bit#" << " "
811  << std::right << std::setw(10) << "Visited" << " "
812  << std::right << std::setw(10) << "Passed" << " "
813  << std::right << std::setw(10) << "Failed" << " "
814  << std::right << std::setw(10) << "Error" << " "
815  << "Name" << "";
816 
817  unsigned int bitpos=0;
818  for (auto const& mod: p.moduleInPathSummaries) {
819  LogVerbatim("FwkSummary") << "TrigReport "
820  << std::right << std::setw(5) << 0
821  << std::right << std::setw(5) << bitpos << " "
822  << std::right << std::setw(10) << mod.timesVisited << " "
823  << std::right << std::setw(10) << mod.timesPassed << " "
824  << std::right << std::setw(10) << mod.timesFailed << " "
825  << std::right << std::setw(10) << mod.timesExcept << " "
826  << mod.moduleLabel << "";
827  ++bitpos;
828  }
829  }
830 
831  LogVerbatim("FwkSummary") << "";
832  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Module Summary ------------";
833  LogVerbatim("FwkSummary") << "TrigReport "
834  << std::right << std::setw(10) << "Visited" << " "
835  << std::right << std::setw(10) << "Executed" << " "
836  << std::right << std::setw(10) << "Passed" << " "
837  << std::right << std::setw(10) << "Failed" << " "
838  << std::right << std::setw(10) << "Error" << " "
839  << "Name" << "";
840  for (auto const& worker : tr.workerSummaries) {
841  LogVerbatim("FwkSummary") << "TrigReport "
842  << std::right << std::setw(10) << worker.timesVisited << " "
843  << std::right << std::setw(10) << worker.timesRun << " "
844  << std::right << std::setw(10) << worker.timesPassed << " "
845  << std::right << std::setw(10) << worker.timesFailed << " "
846  << std::right << std::setw(10) << worker.timesExcept << " "
847  << worker.moduleLabel << "";
848  }
849  LogVerbatim("FwkSummary") << "";
850  }
851  // The timing report (CPU and Real Time):
854 
855  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
856 
857  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Event Summary ---[sec]----";
858  LogVerbatim("FwkSummary") << "TimeReport"
859  << std::setprecision(6) << std::fixed
860  << " event loop CPU/event = " << tr.eventSummary.cpuTime/totalEvents;
861  LogVerbatim("FwkSummary") << "TimeReport"
862  << std::setprecision(6) << std::fixed
863  << " event loop Real/event = " << tr.eventSummary.realTime/totalEvents;
864  LogVerbatim("FwkSummary") << "TimeReport"
865  << std::setprecision(6) << std::fixed
866  << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime/totalEvents;
867  LogVerbatim("FwkSummary") << "TimeReport"
868  << std::setprecision(6) << std::fixed
869  << " efficiency CPU/Real/thread = " << tr.eventSummary.cpuTime/tr.eventSummary.realTime/preallocConfig_.numberOfThreads();
870 
871  constexpr int kColumn1Size = 10;
872  constexpr int kColumn2Size = 12;
873  constexpr int kColumn3Size = 12;
874  LogVerbatim("FwkSummary") << "";
875  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Path Summary ---[Real sec]----";
876  LogVerbatim("FwkSummary") << "TimeReport "
877  << std::right << std::setw(kColumn1Size) << "per event"<<" "
878  << std::right << std::setw(kColumn2Size) << "per exec"
879  << " Name";
880  for (auto const& p: tr.trigPathSummaries) {
881  const int timesRun = std::max(1, p.timesRun);
882  LogVerbatim("FwkSummary") << "TimeReport "
883  << std::setprecision(6) << std::fixed
884  << std::right << std::setw(kColumn1Size) << p.realTime/totalEvents << " "
885  << std::right << std::setw(kColumn2Size) << p.realTime/timesRun << " "
886  << p.name << "";
887  }
888  LogVerbatim("FwkSummary") << "TimeReport "
889  << std::right << std::setw(kColumn1Size) << "per event"<<" "
890  << std::right << std::setw(kColumn2Size) << "per exec"
891  << " Name" << "";
892 
893  LogVerbatim("FwkSummary") << "";
894  LogVerbatim("FwkSummary") << "TimeReport " << "-------End-Path Summary ---[Real sec]----";
895  LogVerbatim("FwkSummary") << "TimeReport "
896  << std::right << std::setw(kColumn1Size) << "per event" <<" "
897  << std::right << std::setw(kColumn2Size) << "per exec"
898  << " Name" << "";
899  for (auto const& p: tr.endPathSummaries) {
900  const int timesRun = std::max(1, p.timesRun);
901 
902  LogVerbatim("FwkSummary") << "TimeReport "
903  << std::setprecision(6) << std::fixed
904  << std::right << std::setw(kColumn1Size) << p.realTime/totalEvents << " "
905  << std::right << std::setw(kColumn2Size) << p.realTime/timesRun << " "
906  << p.name << "";
907  }
908  LogVerbatim("FwkSummary") << "TimeReport "
909  << std::right << std::setw(kColumn1Size) << "per event" <<" "
910  << std::right << std::setw(kColumn2Size) << "per exec"
911  << " Name" << "";
912 
913  for (auto const& p: tr.trigPathSummaries) {
914  LogVerbatim("FwkSummary") << "";
915  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
916  LogVerbatim("FwkSummary") << "TimeReport "
917  << std::right << std::setw(kColumn1Size) << "per event" <<" "
918  << std::right << std::setw(kColumn2Size) << "per visit"
919  << " Name" << "";
920  for (auto const& mod: p.moduleInPathSummaries) {
921  LogVerbatim("FwkSummary") << "TimeReport "
922  << std::setprecision(6) << std::fixed
923  << std::right << std::setw(kColumn1Size) << mod.realTime/totalEvents << " "
924  << std::right << std::setw(kColumn2Size) << mod.realTime/std::max(1, mod.timesVisited) << " "
925  << mod.moduleLabel << "";
926  }
927  }
928  if(not tr.trigPathSummaries.empty()) {
929  LogVerbatim("FwkSummary") << "TimeReport "
930  << std::right << std::setw(kColumn1Size) << "per event" <<" "
931  << std::right << std::setw(kColumn2Size) << "per visit"
932  << " Name" << "";
933  }
934  for (auto const& p: tr.endPathSummaries) {
935  LogVerbatim("FwkSummary") << "";
936  LogVerbatim("FwkSummary") << "TimeReport " << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
937  LogVerbatim("FwkSummary") << "TimeReport "
938  << std::right << std::setw(kColumn1Size) << "per event" <<" "
939  << std::right << std::setw(kColumn2Size) << "per visit"
940  << " Name" << "";
941  for (auto const& mod: p.moduleInPathSummaries) {
942  LogVerbatim("FwkSummary") << "TimeReport "
943  << std::setprecision(6) << std::fixed
944  << std::right << std::setw(kColumn1Size) << mod.realTime/totalEvents << " "
945  << std::right << std::setw(kColumn2Size) << mod.realTime/std::max(1, mod.timesVisited) << " "
946  << mod.moduleLabel << "";
947  }
948  }
949  if(not tr.endPathSummaries.empty()) {
950  LogVerbatim("FwkSummary") << "TimeReport "
951  << std::right << std::setw(kColumn1Size) << "per event" <<" "
952  << std::right << std::setw(kColumn2Size) << "per visit"
953  << " Name" << "";
954  }
955  LogVerbatim("FwkSummary") << "";
956  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Module Summary ---[Real sec]----";
957  LogVerbatim("FwkSummary") << "TimeReport "
958  << std::right << std::setw(kColumn1Size) << "per event" <<" "
959  << std::right << std::setw(kColumn2Size) << "per exec" <<" "
960  << std::right << std::setw(kColumn3Size) << "per visit"
961  << " Name" << "";
962  for (auto const& worker : tr.workerSummaries) {
963  LogVerbatim("FwkSummary") << "TimeReport "
964  << std::setprecision(6) << std::fixed
965  << std::right << std::setw(kColumn1Size) << worker.realTime/totalEvents << " "
966  << std::right << std::setw(kColumn2Size) << worker.realTime/std::max(1, worker.timesRun) << " "
967  << std::right << std::setw(kColumn3Size) << worker.realTime/std::max(1, worker.timesVisited) << " "
968  << worker.moduleLabel << "";
969  }
970  LogVerbatim("FwkSummary") << "TimeReport "
971  << std::right << std::setw(kColumn1Size) << "per event" <<" "
972  << std::right << std::setw(kColumn2Size) << "per exec" <<" "
973  << std::right << std::setw(kColumn3Size) << "per visit"
974  << " Name" << "";
975 
976  LogVerbatim("FwkSummary") << "";
977  LogVerbatim("FwkSummary") << "T---Report end!" << "";
978  LogVerbatim("FwkSummary") << "";
979  }
980 
982  using std::placeholders::_1;
984  }
985 
987  using std::placeholders::_1;
989  }
990 
991  void Schedule::writeRun(RunPrincipal const& rp, ProcessContext const* processContext) {
992  using std::placeholders::_1;
993  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::writeRun, _1, std::cref(rp), processContext));
994  }
995 
996  void Schedule::writeLumi(LuminosityBlockPrincipal const& lbp, ProcessContext const* processContext) {
997  using std::placeholders::_1;
998  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::writeLumi, _1, std::cref(lbp), processContext));
999  }
1000 
1002  using std::placeholders::_1;
1003  // Return true iff at least one output module returns true.
1004  return (std::find_if (all_output_communicators_.begin(), all_output_communicators_.end(),
1006  != all_output_communicators_.end());
1007  }
1008 
1010  using std::placeholders::_1;
1011  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1012  }
1013 
1015  using std::placeholders::_1;
1016  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1017  }
1018 
1019  void Schedule::beginJob(ProductRegistry const& iRegistry) {
1020  globalSchedule_->beginJob(iRegistry);
1021  }
1022 
1023  void Schedule::beginStream(unsigned int iStreamID) {
1024  assert(iStreamID<streamSchedules_.size());
1025  streamSchedules_[iStreamID]->beginStream();
1026  }
1027 
1028  void Schedule::endStream(unsigned int iStreamID) {
1029  assert(iStreamID<streamSchedules_.size());
1030  streamSchedules_[iStreamID]->endStream();
1031  }
1032 
1034  unsigned int iStreamID,
1035  EventPrincipal& ep,
1036  EventSetup const& es) {
1037  assert(iStreamID<streamSchedules_.size());
1038  streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask),ep,es,pathStatusInserters_);
1039  }
1040 
1042  ParameterSet const& iPSet,
1043  const ProductRegistry& iRegistry) {
1044  Worker* found = nullptr;
1045  for (auto const& worker : allWorkers()) {
1046  if (worker->description().moduleLabel() == iLabel) {
1047  found = worker;
1048  break;
1049  }
1050  }
1051  if (nullptr == found) {
1052  return false;
1053  }
1054 
1055  auto newMod = moduleRegistry_->replaceModule(iLabel,iPSet,preallocConfig_);
1056 
1057  globalSchedule_->replaceModule(newMod,iLabel);
1058 
1059  for(auto& s: streamSchedules_) {
1060  s->replaceModule(newMod,iLabel);
1061  }
1062 
1063  {
1064  //Need to updateLookup in order to make getByToken work
1065  auto const runLookup = iRegistry.productLookup(InRun);
1066  auto const lumiLookup = iRegistry.productLookup(InLumi);
1067  auto const eventLookup = iRegistry.productLookup(InEvent);
1068  found->updateLookup(InRun,*runLookup);
1069  found->updateLookup(InLumi,*lumiLookup);
1070  found->updateLookup(InEvent,*eventLookup);
1071  }
1072 
1073  return true;
1074  }
1075 
1076  std::vector<ModuleDescription const*>
1078  std::vector<ModuleDescription const*> result;
1079  result.reserve(allWorkers().size());
1080 
1081  for (auto const& worker : allWorkers()) {
1082  ModuleDescription const* p = worker->descPtr();
1083  result.push_back(p);
1084  }
1085  return result;
1086  }
1087 
1088  Schedule::AllWorkers const&
1090  return globalSchedule_->allWorkers();
1091  }
1092 
1094  for (auto const& worker : allWorkers()) {
1095  worker->convertCurrentProcessAlias(processName);
1096  }
1097  }
1098 
1099  void
1100  Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1101  streamSchedules_[0]->availablePaths(oLabelsToFill);
1102  }
1103 
1104  void
1105  Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const {
1106  oLabelsToFill = *pathNames_;
1107 
1108  }
1109 
1110  void
1111  Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const {
1112  oLabelsToFill = *endPathNames_;
1113  }
1114 
1115  void
1117  std::vector<std::string>& oLabelsToFill) const {
1118  streamSchedules_[0]->modulesInPath(iPathLabel,oLabelsToFill);
1119  }
1120 
1121  void
1123  std::vector<ModuleDescription const*>& descriptions,
1124  unsigned int hint) const {
1125  streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1126  }
1127 
1128  void
1130  std::vector<ModuleDescription const*>& descriptions,
1131  unsigned int hint) const {
1132  streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1133  }
1134 
1135  void
1136  Schedule::fillModuleAndConsumesInfo(std::vector<ModuleDescription const*>& allModuleDescriptions,
1137  std::vector<std::pair<unsigned int, unsigned int> >& moduleIDToIndex,
1138  std::vector<std::vector<ModuleDescription const*> >& modulesWhoseProductsAreConsumedBy,
1139  ProductRegistry const& preg) const {
1140  allModuleDescriptions.clear();
1141  moduleIDToIndex.clear();
1142  modulesWhoseProductsAreConsumedBy.clear();
1143 
1144  allModuleDescriptions.reserve(allWorkers().size());
1145  moduleIDToIndex.reserve(allWorkers().size());
1146  modulesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1147 
1148  std::map<std::string, ModuleDescription const*> labelToDesc;
1149  unsigned int i = 0;
1150  for (auto const& worker : allWorkers()) {
1151  ModuleDescription const* p = worker->descPtr();
1152  allModuleDescriptions.push_back(p);
1153  moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1154  labelToDesc[p->moduleLabel()] = p;
1155  ++i;
1156  }
1157  sort_all(moduleIDToIndex);
1158 
1159  i = 0;
1160  for (auto const& worker : allWorkers()) {
1161  std::vector<ModuleDescription const*>& modules = modulesWhoseProductsAreConsumedBy.at(i);
1162  worker->modulesWhoseProductsAreConsumed(modules, preg, labelToDesc);
1163  ++i;
1164  }
1165  }
1166 
1167  void
1169  endpathsAreActive_ = active;
1170  for(auto& s : streamSchedules_) {
1171  s->enableEndPaths(active);
1172  }
1173  }
1174 
1175  bool
1177  return endpathsAreActive_;
1178  }
1179 
1180  void
1182  rep.eventSummary.totalEvents = 0;
1185  for(auto& s: streamSchedules_) {
1186  s->getTriggerReport(rep);
1187  }
1189  }
1190 
1191  void
1193  rep.eventSummary.totalEvents = 0;
1194  rep.eventSummary.cpuTime = 0.;
1195  rep.eventSummary.realTime = 0.;
1196  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1197  }
1198 
1199  int
1201  int returnValue = 0;
1202  for(auto& s: streamSchedules_) {
1203  returnValue += s->totalEvents();
1204  }
1205  return returnValue;
1206  }
1207 
1208  int
1210  int returnValue = 0;
1211  for(auto& s: streamSchedules_) {
1212  returnValue += s->totalEventsPassed();
1213  }
1214  return returnValue;
1215  }
1216 
1217  int
1219  int returnValue = 0;
1220  for(auto& s: streamSchedules_) {
1221  returnValue += s->totalEventsFailed();
1222  }
1223  return returnValue;
1224  }
1225 
1226 
1227  void
1229  for(auto& s: streamSchedules_) {
1230  s->clearCounters();
1231  }
1232  }
1233 }
size
Write out results.
bool empty() const
Definition: ParameterSet.h:218
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:68
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
void stopEvent(StreamContext const &)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
void fillModuleAndConsumesInfo(std::vector< ModuleDescription const * > &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int > > &moduleIDToIndex, std::vector< std::vector< ModuleDescription const * > > &modulesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
Definition: Schedule.cc:1136
virtual void openFile(FileBlock const &fb)=0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1089
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:1100
virtual void writeRun(RunPrincipal const &rp, ProcessContext const *)=0
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
bool endPathsEnabled() const
Definition: Schedule.cc:1176
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:1014
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::shared_ptr< ModuleRegistry const > moduleRegistry() const
Definition: Schedule.h:288
std::vector< Worker * > AllWorkers
Definition: Schedule.h:121
std::vector< ParameterSet > VParameterSet
Definition: ParameterSet.h:33
std::vector< std::string > const * pathNames_
Definition: Schedule.h:304
void convertCurrentProcessAlias(std::string const &processName)
Convert "@currentProcess" in InputTag process names to the actual current process name...
Definition: Schedule.cc:1093
virtual bool shouldWeCloseFile() const =0
void writeRun(RunPrincipal const &rp, ProcessContext const *)
Definition: Schedule.cc:991
void endStream(unsigned int)
Definition: Schedule.cc:1028
void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)
Definition: Schedule.cc:996
void enableEndPaths(bool active)
Definition: Schedule.cc:1168
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:1129
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:69
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
Definition: Schedule.h:302
std::string const & moduleLabel() const
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
#define constexpr
int totalEventsFailed() const
Definition: Schedule.cc:1218
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:297
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry)
Definition: Schedule.cc:1041
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:194
int totalEventsPassed() const
Definition: Schedule.cc:1209
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
Definition: Schedule.cc:1105
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:67
EventSummary eventSummary
Definition: TriggerReport.h:66
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter > > > pathStatusInserters_
Definition: Schedule.h:292
int totalEvents() const
Definition: Schedule.cc:1200
EventTimingSummary eventSummary
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1228
Schedule(ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, SubProcessParentageHelper const *subProcessParentageHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool hasSubprocesses, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:433
std::vector< PathTimingSummary > trigPathSummaries
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists, SubProcessParentageHelper const *subProcessParentageHelper)
Definition: Schedule.cc:636
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:677
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:1009
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:294
rep
Definition: cuy.py:1188
virtual void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)=0
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1181
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:300
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:295
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
volatile bool endpathsAreActive_
Definition: Schedule.h:308
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:123
std::shared_ptr< TriggerResultInserter const > resultsInserter() const
Definition: Schedule.h:286
void startPath(StreamContext const &, PathContext const &)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter > > > endPathStatusInserters_
Definition: Schedule.h:293
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:299
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:1116
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
void beginStream(unsigned int)
Definition: Schedule.cc:1023
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:124
bool wantSummary_
Definition: Schedule.h:306
std::vector< std::string > const * endPathNames_
Definition: Schedule.h:305
HLT enums.
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:1077
Strings const & getTrigPaths() const
void processOneEventAsync(WaitingTaskHolder iTask, unsigned int iStreamID, EventPrincipal &principal, EventSetup const &eventSetup)
Definition: Schedule.cc:1033
void beginJob(ProductRegistry const &)
Definition: Schedule.cc:1019
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:986
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:693
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1192
bool shouldWeCloseOutput() const
Definition: Schedule.cc:1001
std::vector< std::string > vstring
Definition: Schedule.cc:429
def move(src, dest)
Definition: eostools.py:510
void closeOutputFiles()
Definition: Schedule.cc:981
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
Definition: Schedule.cc:1111
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:1122
unsigned int id() const
std::string match(BranchDescription const &a, BranchDescription const &b, std::string const &fileName)