CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Schedule.cc
Go to the documentation of this file.
2 
23 
24 #include "boost/graph/graph_traits.hpp"
25 #include "boost/graph/adjacency_list.hpp"
26 #include "boost/graph/depth_first_search.hpp"
27 #include "boost/graph/visitors.hpp"
28 
29 
30 #include <algorithm>
31 #include <cassert>
32 #include <cstdlib>
33 #include <functional>
34 #include <iomanip>
35 #include <list>
36 #include <map>
37 #include <exception>
38 #include <sstream>
39 
40 namespace edm {
41  namespace {
42  using std::placeholders::_1;
43 
// Returns true when `s` occurs in `v`.
// `v` must already be sorted in ascending order: the lookup is a binary
// search, not a linear scan.
bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
  auto const candidate = std::lower_bound(v.begin(), v.end(), s);
  // lower_bound yields the first element that is not less than `s`; it is a
  // match exactly when `s` is not less than it either.
  return candidate != v.end() && !(s < *candidate);
}
47 
48  // Here we make the trigger results inserter directly. This should
49  // probably be a utility in the WorkerRegistry or elsewhere.
50 
// Builds the TriggerResultInserter module for this process.
//
// Fetches the "@trigger_paths" ParameterSet from proc_pset for update and
// registers it, describes the module with the class name
// "TriggerResultInserter" and the label "TriggerResults", then constructs the
// inserter inside a maker::ModuleHolderT and registers its products and
// callbacks with preg. areg's pre- and post-module-construction signals are
// emitted around the construction.
//
// Returns the module obtained from the holder. Any exception raised while
// constructing or registering is rethrown to the caller.
//
// NOTE(review): listing line 67 (the end of the ModuleDescription constructor
// call) is absent from this copy of the file.
51  std::shared_ptr<TriggerResultInserter>
52  makeInserter(ParameterSet& proc_pset,
53  PreallocationConfiguration const& iPrealloc,
54  ProductRegistry& preg,
55  ExceptionToActionTable const& actions,
56  std::shared_ptr<ActivityRegistry> areg,
57  std::shared_ptr<ProcessConfiguration> processConfiguration) {
58 
59  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
60  trig_pset->registerIt();
61 
// NOTE(review): work_args is not referenced again in the lines visible here.
62  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
63  ModuleDescription md(trig_pset->id(),
64  "TriggerResultInserter",
65  "TriggerResults",
66  processConfiguration.get(),
68 
69  areg->preModuleConstructionSignal_(md);
// Records whether the post-construction signal has already been attempted.
70  bool postCalled = false;
71  std::shared_ptr<TriggerResultInserter> returnValue;
72  try {
73  maker::ModuleHolderT<TriggerResultInserter> holder(std::make_shared<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),static_cast<Maker const*>(nullptr));
74  holder.setModuleDescription(md);
75  holder.registerProductsAndCallbacks(&preg);
76  returnValue =holder.module();
// The flag is set before the signal is emitted, so if the signal itself
// throws, the handler below does not emit it a second time.
77  postCalled = true;
78  // if exception then post will be called in the catch block
79  areg->postModuleConstructionSignal_(md);
80  }
81  catch (...) {
// Make sure the post signal pairs with the pre signal emitted above, then
// let the original exception continue to the caller.
82  if(!postCalled) {
83  try {
84  areg->postModuleConstructionSignal_(md);
85  }
86  catch (...) {
87  // If post throws an exception ignore it because we are already handling another exception
88  }
89  }
90  throw;
91  }
92  return returnValue;
93  }
94 
95 
96  void
97  checkAndInsertAlias(std::string const& friendlyClassName,
98  std::string const& moduleLabel,
99  std::string const& productInstanceName,
100  std::string const& processName,
101  std::string const& alias,
102  std::string const& instanceAlias,
103  ProductRegistry const& preg,
104  std::multimap<BranchKey, BranchKey>& aliasMap,
105  std::map<BranchKey, BranchKey>& aliasKeys) {
106  std::string const star("*");
107 
108  BranchKey key(friendlyClassName, moduleLabel, productInstanceName, processName);
109  if(preg.productList().find(key) == preg.productList().end()) {
110  // No product was found matching the alias.
111  // We throw an exception only if a module with the specified module label was created in this process.
112  for(auto const& product : preg.productList()) {
113  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
114  throw Exception(errors::Configuration, "EDAlias does not match data\n")
115  << "There are no products of type '" << friendlyClassName << "'\n"
116  << "with module label '" << moduleLabel << "' and instance name '" << productInstanceName << "'.\n";
117  }
118  }
119  }
120 
121  std::string const& theInstanceAlias(instanceAlias == star ? productInstanceName : instanceAlias);
122  BranchKey aliasKey(friendlyClassName, alias, theInstanceAlias, processName);
123  if(preg.productList().find(aliasKey) != preg.productList().end()) {
124  throw Exception(errors::Configuration, "EDAlias conflicts with data\n")
125  << "A product of type '" << friendlyClassName << "'\n"
126  << "with module label '" << alias << "' and instance name '" << theInstanceAlias << "'\n"
127  << "already exists.\n";
128  }
129  auto iter = aliasKeys.find(aliasKey);
130  if(iter != aliasKeys.end()) {
131  // The alias matches a previous one. If the same alias is used for different product, throw.
132  if(iter->second != key) {
133  throw Exception(errors::Configuration, "EDAlias conflict\n")
134  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
135  << "are used for multiple products of type '" << friendlyClassName << "'\n"
136  << "One has module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
137  << "the other has module label '" << iter->second.moduleLabel() << "' and product instance name '" << iter->second.productInstanceName() << "'.\n";
138  }
139  } else {
140  auto prodIter = preg.productList().find(key);
141  if(prodIter != preg.productList().end()) {
142  if (!prodIter->second.produced()) {
143  throw Exception(errors::Configuration, "EDAlias\n")
144  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
145  << "are used for a product of type '" << friendlyClassName << "'\n"
146  << "with module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
147  << "An EDAlias can only be used for products produced in the current process. This one is not.\n";
148  }
149  aliasMap.insert(std::make_pair(key, aliasKey));
150  aliasKeys.insert(std::make_pair(aliasKey, key));
151  }
152  }
153  }
154 
// Reads the EDAlias definitions listed in "@all_aliases" and adds the
// resulting label aliases to the product registry.
//
// Each alias name refers to a ParameterSet in proc_pset whose VParameterSet
// parameters are keyed by module label. Every entry is validated against
// `desc` and supplies "type", "fromProductInstance" and "toProductInstance".
// A "fromProductInstance" of "*" applies the alias to every registered product
// with that type and module label whose process name equals processName; any
// other value names one product instance. Checking and bookkeeping for each
// (product, alias) pair is delegated to checkAndInsertAlias.
//
// Throws Exception(errors::Configuration) when a "*" entry matches no product
// although some registered product has the same module label and process name.
// Returns without doing anything when "@all_aliases" is empty.
//
// NOTE(review): the declaration of `desc` (listing line 163) is absent from
// this copy of the file.
155  void
156  processEDAliases(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
157  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string> >("@all_aliases");
158  if(aliases.empty()) {
159  return;
160  }
161  std::string const star("*");
162  std::string const empty("");
// Expected shape of one alias entry; `star` is presumably the default for the
// two instance-name parameters - TODO confirm against the type of `desc`.
164  desc.add<std::string>("type");
165  desc.add<std::string>("fromProductInstance", star);
166  desc.add<std::string>("toProductInstance", star);
167 
// Product key -> alias key; one product may receive several aliases.
168  std::multimap<BranchKey, BranchKey> aliasMap;
169 
170  std::map<BranchKey, BranchKey> aliasKeys; // Used to search for duplicates or clashes.
171 
172  // Now, loop over the alias information and store it in aliasMap.
173  for(std::string const& alias : aliases) {
174  ParameterSet const& aliasPSet = proc_pset.getParameterSet(alias);
175  std::vector<std::string> vPSetNames = aliasPSet.getParameterNamesForType<VParameterSet>();
176  for(std::string const& moduleLabel : vPSetNames) {
177  VParameterSet vPSet = aliasPSet.getParameter<VParameterSet>(moduleLabel);
178  for(ParameterSet& pset : vPSet) {
179  desc.validate(pset);
180  std::string friendlyClassName = pset.getParameter<std::string>("type");
181  std::string productInstanceName = pset.getParameter<std::string>("fromProductInstance");
182  std::string instanceAlias = pset.getParameter<std::string>("toProductInstance");
183  if(productInstanceName == star) {
184  bool match = false;
// Start from the key with empty instance and process names and walk forward
// while the class name and module label still match. This presumably relies
// on BranchKey ordering by class name, then module label - TODO confirm.
185  BranchKey lowerBound(friendlyClassName, moduleLabel, empty, empty);
186  for(ProductRegistry::ProductList::const_iterator it = preg.productList().lower_bound(lowerBound);
187  it != preg.productList().end() && it->first.friendlyClassName() == friendlyClassName && it->first.moduleLabel() == moduleLabel;
188  ++it) {
// Products registered under a different process name are skipped.
189  if(it->first.processName() != processName) {
190  continue;
191  }
192  match = true;
193 
194  checkAndInsertAlias(friendlyClassName, moduleLabel, it->first.productInstanceName(), processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
195  }
196  if(!match) {
197  // No product was found matching the alias.
198  // We throw an exception only if a module with the specified module label was created in this process.
199  for(auto const& product : preg.productList()) {
200  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
201  throw Exception(errors::Configuration, "EDAlias parameter set mismatch\n")
202  << "There are no products of type '" << friendlyClassName << "'\n"
203  << "with module label '" << moduleLabel << "'.\n";
204  }
205  }
206  }
207  } else {
208  checkAndInsertAlias(friendlyClassName, moduleLabel, productInstanceName, processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
209  }
210  }
211  }
212  }
213 
214 
215  // Now add the new alias entries to the product registry.
216  for(auto const& aliasEntry : aliasMap) {
217  ProductRegistry::ProductList::const_iterator it = preg.productList().find(aliasEntry.first);
// Every key in aliasMap was found in the registry by checkAndInsertAlias
// before it was stored, so the lookup is expected to succeed.
218  assert(it != preg.productList().end());
219  preg.addLabelAlias(it->second, aliasEntry.second.moduleLabel(), aliasEntry.second.productInstanceName());
220  }
221 
222  }
223 
// Shorthand for a list of names (module labels, path names).
using vstring = std::vector<std::string>;
225 
226  void reduceParameterSet(ParameterSet& proc_pset,
227  vstring const& end_path_name_list,
228  vstring& modulesInConfig,
229  std::set<std::string> const& usedModuleLabels,
230  std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions) {
231  // Before calculating the ParameterSetID of the top level ParameterSet or
232  // saving it in the registry drop from the top level ParameterSet all
233  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
234  // production is not enabled also drop all the EDFilters and EDProducers
235  // that are not scheduled. Drop the ParameterSet used to configure the module
236  // itself. Also drop the other traces of these labels in the top level
237  // ParameterSet: Remove that labels from @all_modules and from all the
238  // end paths. If this makes any end paths empty, then remove the end path
239  // name from @end_paths, and @paths.
240 
241  // First make a list of labels to drop
242  vstring outputModuleLabels;
243  std::string edmType;
244  std::string const moduleEdmType("@module_edm_type");
245  std::string const outputModule("OutputModule");
246  std::string const edAnalyzer("EDAnalyzer");
247  std::string const edFilter("EDFilter");
248  std::string const edProducer("EDProducer");
249 
250  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
251 
252  //need a list of all modules on paths in order to determine
253  // if an EDAnalyzer only appears on an end path
254  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
255  std::set<std::string> modulesOnPaths;
256  {
257  std::set<std::string> noEndPaths(scheduledPaths.begin(),scheduledPaths.end());
258  for(auto const& endPath: end_path_name_list) {
259  noEndPaths.erase(endPath);
260  }
261  {
262  vstring labels;
263  for(auto const& path: noEndPaths) {
264  labels = proc_pset.getParameter<vstring>(path);
265  modulesOnPaths.insert(labels.begin(),labels.end());
266  }
267  }
268  }
269  //Initially fill labelsToBeDropped with all module mentioned in
270  // the configuration but which are not being used by the system
271  std::vector<std::string> labelsToBeDropped;
272  labelsToBeDropped.reserve(modulesInConfigSet.size());
273  std::set_difference(modulesInConfigSet.begin(),modulesInConfigSet.end(),
274  usedModuleLabels.begin(),usedModuleLabels.end(),
275  std::back_inserter(labelsToBeDropped));
276 
277  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
278  for (auto const& modLabel: usedModuleLabels) {
279  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
280  if (edmType == outputModule) {
281  outputModuleLabels.push_back(modLabel);
282  labelsToBeDropped.push_back(modLabel);
283  }
284  if(edmType == edAnalyzer) {
285  if(modulesOnPaths.end()==modulesOnPaths.find(modLabel)) {
286  labelsToBeDropped.push_back(modLabel);
287  }
288  }
289  }
290  //labelsToBeDropped must be sorted
291  std::inplace_merge(labelsToBeDropped.begin(),
292  labelsToBeDropped.begin()+sizeBeforeOutputModules,
293  labelsToBeDropped.end());
294 
295  // drop the parameter sets used to configure the modules
296  for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
297 
298  // drop the labels from @all_modules
299  vstring::iterator endAfterRemove = std::remove_if(modulesInConfig.begin(), modulesInConfig.end(), std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
300  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
301  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
302 
303  // drop the labels from all end paths
304  vstring endPathsToBeDropped;
305  vstring labels;
306  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
307  iEndPath != endEndPath;
308  ++iEndPath) {
309  labels = proc_pset.getParameter<vstring>(*iEndPath);
310  vstring::iterator iSave = labels.begin();
311  vstring::iterator iBegin = labels.begin();
312 
313  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end();
314  iLabel != iEnd; ++iLabel) {
315  if (binary_search_string(labelsToBeDropped, *iLabel)) {
316  if (binary_search_string(outputModuleLabels, *iLabel)) {
317  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
318  }
319  } else {
320  if (iSave != iLabel) {
321  iSave->swap(*iLabel);
322  }
323  ++iSave;
324  }
325  }
326  labels.erase(iSave, labels.end());
327  if (labels.empty()) {
328  // remove empty end paths and save their names
329  proc_pset.eraseSimpleParameter(*iEndPath);
330  endPathsToBeDropped.push_back(*iEndPath);
331  } else {
332  proc_pset.addParameter<vstring>(*iEndPath, labels);
333  }
334  }
335  sort_all(endPathsToBeDropped);
336 
337  // remove empty end paths from @paths
338  endAfterRemove = std::remove_if(scheduledPaths.begin(), scheduledPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
339  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
340  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
341 
342  // remove empty end paths from @end_paths
343  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
344  endAfterRemove = std::remove_if(scheduledEndPaths.begin(), scheduledEndPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
345  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
346  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
347 
348  }
349 
350  bool printDependencies(ParameterSet const& pset) {
351  ParameterSet defopts;
352  ParameterSet const& opts = pset.getUntrackedParameterSet("options", defopts);
353  return opts.getUntrackedParameter("printDependencies", false);
354  }
355  }
356  // -----------------------------
357 
// Shorthand for a list of names (module labels, path names).
using vstring = std::vector<std::string>;
359 
360  // -----------------------------
361 
// Tail of the parameter list, member initialisers and body of the Schedule
// constructor (identified by the closing "// Schedule::Schedule" comment).
// NOTE(review): the opening of the definition and listing lines 371, 376-377
// and 380 are absent from this copy of the file, so `proc_pset`, `tns` and
// `prealloc` are used below without a visible declaration.
364  ProductRegistry& preg,
365  BranchIDListHelper& branchIDListHelper,
366  ThinnedAssociationsHelper& thinnedAssociationsHelper,
367  ExceptionToActionTable const& actions,
368  std::shared_ptr<ActivityRegistry> areg,
369  std::shared_ptr<ProcessConfiguration> processConfiguration,
370  bool hasSubprocesses,
372  ProcessContext const* processContext) :
373  //Only create a resultsInserter if there is a trigger path
374  resultsInserter_{tns.getTrigPaths().empty()? std::shared_ptr<TriggerResultInserter>{} :makeInserter(proc_pset,prealloc,preg,actions,areg,processConfiguration)},
375  moduleRegistry_(new ModuleRegistry()),
378  wantSummary_(tns.wantSummary()),
379  printDependencies_(printDependencies(proc_pset)),
381  {
382  assert(0<prealloc.numberOfStreams());
// One StreamSchedule per preallocated stream; each is given the shared
// results inserter and module registry.
383  streamSchedules_.reserve(prealloc.numberOfStreams());
384  for(unsigned int i=0; i<prealloc.numberOfStreams();++i) {
385  streamSchedules_.emplace_back(std::make_shared<StreamSchedule>(
386  resultsInserter(),
387  moduleRegistry(),
388  proc_pset,tns,prealloc,preg,
389  branchIDListHelper,actions,
390  areg,processConfiguration,
391  !hasSubprocesses,
392  StreamID{i},
393  processContext));
394  }
395 
396  //TriggerResults are injected automatically by StreamSchedules and are
397  // unknown to the ModuleRegistry
398  const std::string kTriggerResults("TriggerResults");
// The module list handed to the GlobalSchedule is taken from the workers of
// the first stream schedule.
399  std::vector<std::string> modulesToUse;
400  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
401  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
402  if(worker->description().moduleLabel() != kTriggerResults) {
403  modulesToUse.push_back(worker->description().moduleLabel());
404  }
405  }
406  //The unscheduled modules are at the end of the list, but we want them at the front
407  unsigned int n = streamSchedules_[0]->numberOfUnscheduledModules();
408  if(n>0) {
// Rebuild the list as [last n labels][everything before them].
409  std::vector<std::string> temp;
410  temp.reserve(modulesToUse.size());
411  auto itBeginUnscheduled = modulesToUse.begin()+modulesToUse.size()-n;
412  std::copy(itBeginUnscheduled,modulesToUse.end(),
413  std::back_inserter(temp));
414  std::copy(modulesToUse.begin(),itBeginUnscheduled,std::back_inserter(temp));
415  temp.swap(modulesToUse);
416  }
417 
418  // propagate_const<T> has no reset() function
419  globalSchedule_ = std::make_unique<GlobalSchedule>(
420  resultsInserter(),
421  moduleRegistry(),
422  modulesToUse,
423  proc_pset, preg, prealloc,
424  actions,areg,processConfiguration,processContext);
425 
426  //TriggerResults is not in the top level ParameterSet so the call to
427  // reduceParameterSet would fail to find it. Just remove it up front.
428  std::set<std::string> usedModuleLabels;
429  for(auto const& worker: allWorkers()) {
430  if(worker->description().moduleLabel() != kTriggerResults) {
431  usedModuleLabels.insert(worker->description().moduleLabel());
432  }
433  }
// Prune the process ParameterSet and process the EDAliases first, then
// register the ParameterSet and store its ID in the process configuration.
434  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
435  std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
436  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels,
437  outputModulePathPositions);
438  processEDAliases(proc_pset, processConfiguration->processName(), preg);
439  proc_pset.registerIt();
440  processConfiguration->setParameterSetID(proc_pset.id());
441  processConfiguration->setProcessConfigurationID();
442 
443  // This is used for a little sanity-check to make sure no code
444  // modifications alter the number of workers at a later date.
445  size_t all_workers_count = allWorkers().size();
446 
// Collect an output communicator from every module holder that provides one;
// ownership moves from the returned pointer into a shared_ptr.
447  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder){
448  auto comm = iHolder->createOutputModuleCommunicator();
449  if (comm) {
450  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
451  }
452  });
453  // Now that the output workers are filled in, set any output limits or information.
454  limitOutput(proc_pset, branchIDListHelper.branchIDLists());
455 
456  // Sanity check: make sure nobody has added a worker after we've
457  // already relied on the WorkerManager being full.
458  assert (all_workers_count == allWorkers().size());
459 
460  branchIDListHelper.updateFromRegistry(preg);
461 
462  preg.setFrozen();
463 
// Thinned associations are registered through the workers of the first
// stream schedule only.
464  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
465  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
466  }
467  thinnedAssociationsHelper.sort();
468 
469  for (auto& c : all_output_communicators_) {
470  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
471  c->selectProducts(preg, thinnedAssociationsHelper);
472  }
473 
// The timing summary is only set up when a summary was requested.
474  if(wantSummary_) {
475  std::vector<const ModuleDescription*> modDesc;
476  const auto& workers = allWorkers();
477  modDesc.reserve(workers.size());
478 
479  std::transform(workers.begin(),workers.end(),
480  std::back_inserter(modDesc),
481  [](const Worker* iWorker) -> const ModuleDescription* {
482  return iWorker->descPtr();
483  });
484 
485  // propagate_const<T> has no reset() function
486  summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(
487  prealloc.numberOfStreams(),
488  modDesc,
489  tns);
490  auto timeKeeperPtr = summaryTimeKeeper_.get();
491 
// Register the time keeper's member functions as callbacks on the activity
// registry's module, event, path and job signals.
492  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
493  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
494  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
495  areg->watchPostModuleEventDelayedGet(timeKeeperPtr,&SystemTimeKeeper::restartModuleEvent);
496 
497  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
498  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
499 
500  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
501  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
502 
503  areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
504  areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
505  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
506  //timeKeeperPtr->startModuleEvent(iContext,iMod);
507  //});
508  }
509 
510  } // Schedule::Schedule
511 
512 
// Configures every output communicator with its event limit.
//
// The limit comes from the untracked "maxEvents" ParameterSet of proc_pset,
// whose "output" entry may be either a single int or a ParameterSet holding
// one int per output module label. Each communicator is configured with an
// OutputModuleDescription built from branchIDLists and the single-int limit
// (-1 when no int "output" entry is found); when a non-empty per-module
// ParameterSet is present, the limit is replaced by the entry stored under
// the communicator's module label.
//
// NOTE(review): listing lines 534 and 545 (the beginning of the two
// statements that emit the messages below) are absent from this copy of the
// file.
513  void
514  Schedule::limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists) {
515  std::string const output("output");
516 
// NOTE(review): the default is a temporary ParameterSet. If
// getUntrackedParameterSet returns a reference to its default argument when
// "maxEvents" is absent, maxEventsPSet would dangle after this statement -
// confirm against the declaration of ParameterSet.
517  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents", ParameterSet());
// Counts how many forms of "output" (int, ParameterSet) were found.
518  int maxEventSpecs = 0;
519  int maxEventsOut = -1;
520  ParameterSet const* vMaxEventsOut = 0;
521  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
522  if (search_all(intNamesE, output)) {
523  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
524  ++maxEventSpecs;
525  }
526  std::vector<std::string> psetNamesE;
527  maxEventsPSet.getParameterSetNames(psetNamesE, false);
528  if (search_all(psetNamesE, output)) {
529  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
530  ++maxEventSpecs;
531  }
532 
// Both forms present at once is reported as an error.
533  if (maxEventSpecs > 1) {
535  "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
536  }
537 
538  for (auto& c : all_output_communicators_) {
539  OutputModuleDescription desc(branchIDLists, maxEventsOut);
540  if (vMaxEventsOut != 0 && !vMaxEventsOut->empty()) {
541  std::string const& moduleLabel = c->description().moduleLabel();
542  try {
543  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
544  } catch (Exception const&) {
// A failed lookup for this module label is reported with a message that
// names the label.
546  "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
547  }
548  }
549  c->configure(desc);
550  }
551  }
552 
553  bool Schedule::terminate() const {
554  if (all_output_communicators_.empty()) {
555  return false;
556  }
557  for (auto& c : all_output_communicators_) {
558  if (!c->limitReached()) {
559  // Found an output module that has not reached output event count.
560  return false;
561  }
562  }
563  LogInfo("SuccessfulTermination")
564  << "The job is terminating successfully because each output module\n"
565  << "has reached its configured limit.\n";
566  return true;
567  }
568 
570  globalSchedule_->endJob(collector);
571  if (collector.hasThrown()) {
572  return;
573  }
574 
575  if (wantSummary_ == false) return;
576  {
577  TriggerReport tr;
578  getTriggerReport(tr);
579 
580  // The trigger report (pass/fail etc.):
581 
582  LogVerbatim("FwkSummary") << "";
583  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Event Summary ------------";
584  if(!tr.trigPathSummaries.empty()) {
585  LogVerbatim("FwkSummary") << "TrigReport"
586  << " Events total = " << tr.eventSummary.totalEvents
587  << " passed = " << tr.eventSummary.totalEventsPassed
588  << " failed = " << tr.eventSummary.totalEventsFailed
589  << "";
590  } else {
591  LogVerbatim("FwkSummary") << "TrigReport"
592  << " Events total = " << tr.eventSummary.totalEvents
593  << " passed = " << tr.eventSummary.totalEvents
594  << " failed = 0";
595  }
596 
597  LogVerbatim("FwkSummary") << "";
598  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Path Summary ------------";
599  LogVerbatim("FwkSummary") << "TrigReport "
600  << std::right << std::setw(10) << "Trig Bit#" << " "
601  << std::right << std::setw(10) << "Executed" << " "
602  << std::right << std::setw(10) << "Passed" << " "
603  << std::right << std::setw(10) << "Failed" << " "
604  << std::right << std::setw(10) << "Error" << " "
605  << "Name" << "";
606  for (auto const& p: tr.trigPathSummaries) {
607  LogVerbatim("FwkSummary") << "TrigReport "
608  << std::right << std::setw(5) << 1
609  << std::right << std::setw(5) << p.bitPosition << " "
610  << std::right << std::setw(10) << p.timesRun << " "
611  << std::right << std::setw(10) << p.timesPassed << " "
612  << std::right << std::setw(10) << p.timesFailed << " "
613  << std::right << std::setw(10) << p.timesExcept << " "
614  << p.name << "";
615  }
616 
617  /*
618  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
619  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
620  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
621  for (; epi != epe; ++epi, ++epn) {
622 
623  LogVerbatim("FwkSummary") << "TrigReport "
624  << std::right << std::setw(5) << 1
625  << std::right << std::setw(5) << *epi << " "
626  << std::right << std::setw(10) << totalEvents() << " "
627  << std::right << std::setw(10) << totalEvents() << " "
628  << std::right << std::setw(10) << 0 << " "
629  << std::right << std::setw(10) << 0 << " "
630  << *epn << "";
631  }
632  */
633 
634  LogVerbatim("FwkSummary") << "";
635  LogVerbatim("FwkSummary") << "TrigReport " << "-------End-Path Summary ------------";
636  LogVerbatim("FwkSummary") << "TrigReport "
637  << std::right << std::setw(10) << "Trig Bit#" << " "
638  << std::right << std::setw(10) << "Executed" << " "
639  << std::right << std::setw(10) << "Passed" << " "
640  << std::right << std::setw(10) << "Failed" << " "
641  << std::right << std::setw(10) << "Error" << " "
642  << "Name" << "";
643  for (auto const& p: tr.endPathSummaries) {
644  LogVerbatim("FwkSummary") << "TrigReport "
645  << std::right << std::setw(5) << 0
646  << std::right << std::setw(5) << p.bitPosition << " "
647  << std::right << std::setw(10) << p.timesRun << " "
648  << std::right << std::setw(10) << p.timesPassed << " "
649  << std::right << std::setw(10) << p.timesFailed << " "
650  << std::right << std::setw(10) << p.timesExcept << " "
651  << p.name << "";
652  }
653 
654  for (auto const& p: tr.trigPathSummaries) {
655  LogVerbatim("FwkSummary") << "";
656  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Modules in Path: " << p.name << " ------------";
657  LogVerbatim("FwkSummary") << "TrigReport "
658  << std::right << std::setw(10) << "Trig Bit#" << " "
659  << std::right << std::setw(10) << "Visited" << " "
660  << std::right << std::setw(10) << "Passed" << " "
661  << std::right << std::setw(10) << "Failed" << " "
662  << std::right << std::setw(10) << "Error" << " "
663  << "Name" << "";
664 
665  unsigned int bitpos = 0;
666  for (auto const& mod: p.moduleInPathSummaries) {
667  LogVerbatim("FwkSummary") << "TrigReport "
668  << std::right << std::setw(5) << 1
669  << std::right << std::setw(5) << bitpos << " "
670  << std::right << std::setw(10) << mod.timesVisited << " "
671  << std::right << std::setw(10) << mod.timesPassed << " "
672  << std::right << std::setw(10) << mod.timesFailed << " "
673  << std::right << std::setw(10) << mod.timesExcept << " "
674  << mod.moduleLabel << "";
675  ++bitpos;
676  }
677  }
678 
679  for (auto const& p: tr.endPathSummaries) {
680  LogVerbatim("FwkSummary") << "";
681  LogVerbatim("FwkSummary") << "TrigReport " << "------ Modules in End-Path: " << p.name << " ------------";
682  LogVerbatim("FwkSummary") << "TrigReport "
683  << std::right << std::setw(10) << "Trig Bit#" << " "
684  << std::right << std::setw(10) << "Visited" << " "
685  << std::right << std::setw(10) << "Passed" << " "
686  << std::right << std::setw(10) << "Failed" << " "
687  << std::right << std::setw(10) << "Error" << " "
688  << "Name" << "";
689 
690  unsigned int bitpos=0;
691  for (auto const& mod: p.moduleInPathSummaries) {
692  LogVerbatim("FwkSummary") << "TrigReport "
693  << std::right << std::setw(5) << 0
694  << std::right << std::setw(5) << bitpos << " "
695  << std::right << std::setw(10) << mod.timesVisited << " "
696  << std::right << std::setw(10) << mod.timesPassed << " "
697  << std::right << std::setw(10) << mod.timesFailed << " "
698  << std::right << std::setw(10) << mod.timesExcept << " "
699  << mod.moduleLabel << "";
700  ++bitpos;
701  }
702  }
703 
704  LogVerbatim("FwkSummary") << "";
705  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Module Summary ------------";
706  LogVerbatim("FwkSummary") << "TrigReport "
707  << std::right << std::setw(10) << "Visited" << " "
708  << std::right << std::setw(10) << "Executed" << " "
709  << std::right << std::setw(10) << "Passed" << " "
710  << std::right << std::setw(10) << "Failed" << " "
711  << std::right << std::setw(10) << "Error" << " "
712  << "Name" << "";
713  for (auto const& worker : tr.workerSummaries) {
714  LogVerbatim("FwkSummary") << "TrigReport "
715  << std::right << std::setw(10) << worker.timesVisited << " "
716  << std::right << std::setw(10) << worker.timesRun << " "
717  << std::right << std::setw(10) << worker.timesPassed << " "
718  << std::right << std::setw(10) << worker.timesFailed << " "
719  << std::right << std::setw(10) << worker.timesExcept << " "
720  << worker.moduleLabel << "";
721  }
722  LogVerbatim("FwkSummary") << "";
723  }
724  // The timing report (CPU and Real Time):
727 
728  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
729 
730  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Event Summary ---[sec]----";
731  LogVerbatim("FwkSummary") << "TimeReport"
732  << std::setprecision(6) << std::fixed
733  << " event loop CPU/event = " << tr.eventSummary.cpuTime/totalEvents;
734  LogVerbatim("FwkSummary") << "TimeReport"
735  << std::setprecision(6) << std::fixed
736  << " event loop Real/event = " << tr.eventSummary.realTime/totalEvents;
737  LogVerbatim("FwkSummary") << "TimeReport"
738  << std::setprecision(6) << std::fixed
739  << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime/totalEvents;
740  LogVerbatim("FwkSummary") << "TimeReport"
741  << std::setprecision(6) << std::fixed
742  << " efficiency CPU/Real/thread = " << tr.eventSummary.cpuTime/tr.eventSummary.realTime/preallocConfig_.numberOfThreads();
743 
744  constexpr int kColumn1Size = 10;
745  constexpr int kColumn2Size = 12;
746  constexpr int kColumn3Size = 12;
747  LogVerbatim("FwkSummary") << "";
748  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Path Summary ---[Real sec]----";
749  LogVerbatim("FwkSummary") << "TimeReport "
750  << std::right << std::setw(kColumn1Size) << "per event"<<" "
751  << std::right << std::setw(kColumn2Size) << "per exec"
752  << " Name";
753  for (auto const& p: tr.trigPathSummaries) {
754  const int timesRun = std::max(1, p.timesRun);
755  LogVerbatim("FwkSummary") << "TimeReport "
756  << std::setprecision(6) << std::fixed
757  << std::right << std::setw(kColumn1Size) << p.realTime/totalEvents << " "
758  << std::right << std::setw(kColumn2Size) << p.realTime/timesRun << " "
759  << p.name << "";
760  }
761  LogVerbatim("FwkSummary") << "TimeReport "
762  << std::right << std::setw(kColumn1Size) << "per event"<<" "
763  << std::right << std::setw(kColumn2Size) << "per exec"
764  << " Name" << "";
765 
766  LogVerbatim("FwkSummary") << "";
767  LogVerbatim("FwkSummary") << "TimeReport " << "-------End-Path Summary ---[Real sec]----";
768  LogVerbatim("FwkSummary") << "TimeReport "
769  << std::right << std::setw(kColumn1Size) << "per event" <<" "
770  << std::right << std::setw(kColumn2Size) << "per exec"
771  << " Name" << "";
772  for (auto const& p: tr.endPathSummaries) {
773  const int timesRun = std::max(1, p.timesRun);
774 
775  LogVerbatim("FwkSummary") << "TimeReport "
776  << std::setprecision(6) << std::fixed
777  << std::right << std::setw(kColumn1Size) << p.realTime/totalEvents << " "
778  << std::right << std::setw(kColumn2Size) << p.realTime/timesRun << " "
779  << p.name << "";
780  }
781  LogVerbatim("FwkSummary") << "TimeReport "
782  << std::right << std::setw(kColumn1Size) << "per event" <<" "
783  << std::right << std::setw(kColumn2Size) << "per exec"
784  << " Name" << "";
785 
786  for (auto const& p: tr.trigPathSummaries) {
787  LogVerbatim("FwkSummary") << "";
788  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
789  LogVerbatim("FwkSummary") << "TimeReport "
790  << std::right << std::setw(kColumn1Size) << "per event" <<" "
791  << std::right << std::setw(kColumn2Size) << "per visit"
792  << " Name" << "";
793  for (auto const& mod: p.moduleInPathSummaries) {
794  LogVerbatim("FwkSummary") << "TimeReport "
795  << std::setprecision(6) << std::fixed
796  << std::right << std::setw(kColumn1Size) << mod.realTime/totalEvents << " "
797  << std::right << std::setw(kColumn2Size) << mod.realTime/std::max(1, mod.timesVisited) << " "
798  << mod.moduleLabel << "";
799  }
800  }
801  if(not tr.trigPathSummaries.empty()) {
802  LogVerbatim("FwkSummary") << "TimeReport "
803  << std::right << std::setw(kColumn1Size) << "per event" <<" "
804  << std::right << std::setw(kColumn2Size) << "per visit"
805  << " Name" << "";
806  }
807  for (auto const& p: tr.endPathSummaries) {
808  LogVerbatim("FwkSummary") << "";
809  LogVerbatim("FwkSummary") << "TimeReport " << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
810  LogVerbatim("FwkSummary") << "TimeReport "
811  << std::right << std::setw(kColumn1Size) << "per event" <<" "
812  << std::right << std::setw(kColumn2Size) << "per visit"
813  << " Name" << "";
814  for (auto const& mod: p.moduleInPathSummaries) {
815  LogVerbatim("FwkSummary") << "TimeReport "
816  << std::setprecision(6) << std::fixed
817  << std::right << std::setw(kColumn1Size) << mod.realTime/totalEvents << " "
818  << std::right << std::setw(kColumn2Size) << mod.realTime/std::max(1, mod.timesVisited) << " "
819  << mod.moduleLabel << "";
820  }
821  }
822  if(not tr.endPathSummaries.empty()) {
823  LogVerbatim("FwkSummary") << "TimeReport "
824  << std::right << std::setw(kColumn1Size) << "per event" <<" "
825  << std::right << std::setw(kColumn2Size) << "per visit"
826  << " Name" << "";
827  }
828  LogVerbatim("FwkSummary") << "";
829  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Module Summary ---[Real sec]----";
830  LogVerbatim("FwkSummary") << "TimeReport "
831  << std::right << std::setw(kColumn1Size) << "per event" <<" "
832  << std::right << std::setw(kColumn2Size) << "per exec" <<" "
833  << std::right << std::setw(kColumn3Size) << "per visit"
834  << " Name" << "";
835  for (auto const& worker : tr.workerSummaries) {
836  LogVerbatim("FwkSummary") << "TimeReport "
837  << std::setprecision(6) << std::fixed
838  << std::right << std::setw(kColumn1Size) << worker.realTime/totalEvents << " "
839  << std::right << std::setw(kColumn2Size) << worker.realTime/std::max(1, worker.timesRun) << " "
840  << std::right << std::setw(kColumn3Size) << worker.realTime/std::max(1, worker.timesVisited) << " "
841  << worker.moduleLabel << "";
842  }
843  LogVerbatim("FwkSummary") << "TimeReport "
844  << std::right << std::setw(kColumn1Size) << "per event" <<" "
845  << std::right << std::setw(kColumn2Size) << "per exec" <<" "
846  << std::right << std::setw(kColumn3Size) << "per visit"
847  << " Name" << "";
848 
849  LogVerbatim("FwkSummary") << "";
850  LogVerbatim("FwkSummary") << "T---Report end!" << "";
851  LogVerbatim("FwkSummary") << "";
852  }
853 
855  using std::placeholders::_1;
857  }
858 
860  using std::placeholders::_1;
862  }
863 
865  using std::placeholders::_1;
867  }
868 
869  void Schedule::writeRun(RunPrincipal const& rp, ProcessContext const* processContext) {
870  using std::placeholders::_1;
871  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::writeRun, _1, std::cref(rp), processContext));
872  }
873 
874  void Schedule::writeLumi(LuminosityBlockPrincipal const& lbp, ProcessContext const* processContext) {
875  using std::placeholders::_1;
876  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::writeLumi, _1, std::cref(lbp), processContext));
877  }
878 
880  using std::placeholders::_1;
881  // Return true iff at least one output module returns true.
882  return (std::find_if (all_output_communicators_.begin(), all_output_communicators_.end(),
884  != all_output_communicators_.end());
885  }
886 
888  using std::placeholders::_1;
889  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
890  }
891 
893  using std::placeholders::_1;
894  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
895  }
896 
897  void Schedule::beginJob(ProductRegistry const& iRegistry) {
899 
900  globalSchedule_->beginJob(iRegistry);
901  }
902 
903  void Schedule::beginStream(unsigned int iStreamID) {
904  assert(iStreamID<streamSchedules_.size());
905  streamSchedules_[iStreamID]->beginStream();
906  }
907 
908  void Schedule::endStream(unsigned int iStreamID) {
909  assert(iStreamID<streamSchedules_.size());
910  streamSchedules_[iStreamID]->endStream();
911  }
912 
914  using std::placeholders::_1;
916  }
917  void Schedule::postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {
918  using std::placeholders::_1;
919  for_all(allWorkers(), std::bind(&Worker::postForkReacquireResources, _1, iChildIndex, iNumberOfChildren));
920  }
921 
923  ParameterSet const& iPSet,
924  const ProductRegistry& iRegistry) {
925  Worker* found = nullptr;
926  for (auto const& worker : allWorkers()) {
927  if (worker->description().moduleLabel() == iLabel) {
928  found = worker;
929  break;
930  }
931  }
932  if (nullptr == found) {
933  return false;
934  }
935 
936  auto newMod = moduleRegistry_->replaceModule(iLabel,iPSet,preallocConfig_);
937 
938  globalSchedule_->replaceModule(newMod,iLabel);
939 
940  for(auto& s: streamSchedules_) {
941  s->replaceModule(newMod,iLabel);
942  }
943 
944  {
945  //Need to updateLookup in order to make getByToken work
946  auto const runLookup = iRegistry.productLookup(InRun);
947  auto const lumiLookup = iRegistry.productLookup(InLumi);
948  auto const eventLookup = iRegistry.productLookup(InEvent);
949  found->updateLookup(InRun,*runLookup);
950  found->updateLookup(InLumi,*lumiLookup);
951  found->updateLookup(InEvent,*eventLookup);
952  }
953 
954  return true;
955  }
956 
957  std::vector<ModuleDescription const*>
959  std::vector<ModuleDescription const*> result;
960  result.reserve(allWorkers().size());
961 
962  for (auto const& worker : allWorkers()) {
963  ModuleDescription const* p = worker->descPtr();
964  result.push_back(p);
965  }
966  return result;
967  }
968 
969  Schedule::AllWorkers const&
971  return globalSchedule_->allWorkers();
972  }
973 
974  void
975  Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
976  streamSchedules_[0]->availablePaths(oLabelsToFill);
977  }
978 
979  void
980  Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const {
981  streamSchedules_[0]->triggerPaths(oLabelsToFill);
982  }
983 
984  void
985  Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const {
986  streamSchedules_[0]->endPaths(oLabelsToFill);
987  }
988 
989  void
991  std::vector<std::string>& oLabelsToFill) const {
992  streamSchedules_[0]->modulesInPath(iPathLabel,oLabelsToFill);
993  }
994 
995  void
997  std::vector<ModuleDescription const*>& descriptions,
998  unsigned int hint) const {
999  streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1000  }
1001 
1002  void
1004  std::vector<ModuleDescription const*>& descriptions,
1005  unsigned int hint) const {
1006  streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1007  }
1008 
1009  void
1010  Schedule::fillModuleAndConsumesInfo(std::vector<ModuleDescription const*>& allModuleDescriptions,
1011  std::vector<std::pair<unsigned int, unsigned int> >& moduleIDToIndex,
1012  std::vector<std::vector<ModuleDescription const*> >& modulesWhoseProductsAreConsumedBy,
1013  ProductRegistry const& preg) const {
1014  allModuleDescriptions.clear();
1015  moduleIDToIndex.clear();
1016  modulesWhoseProductsAreConsumedBy.clear();
1017 
1018  allModuleDescriptions.reserve(allWorkers().size());
1019  moduleIDToIndex.reserve(allWorkers().size());
1020  modulesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1021 
1022  std::map<std::string, ModuleDescription const*> labelToDesc;
1023  unsigned int i = 0;
1024  for (auto const& worker : allWorkers()) {
1025  ModuleDescription const* p = worker->descPtr();
1026  allModuleDescriptions.push_back(p);
1027  moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1028  labelToDesc[p->moduleLabel()] = p;
1029  ++i;
1030  }
1031  sort_all(moduleIDToIndex);
1032 
1033  i = 0;
1034  for (auto const& worker : allWorkers()) {
1035  std::vector<ModuleDescription const*>& modules = modulesWhoseProductsAreConsumedBy.at(i);
1036  worker->modulesWhoseProductsAreConsumed(modules, preg, labelToDesc);
1037  ++i;
1038  }
1039  }
1040 
1041  void
1043  endpathsAreActive_ = active;
1044  for(auto& s : streamSchedules_) {
1045  s->enableEndPaths(active);
1046  }
1047  }
1048 
1049  bool
1051  return endpathsAreActive_;
1052  }
1053 
1054  void
1056  rep.eventSummary.totalEvents = 0;
1059  for(auto& s: streamSchedules_) {
1060  s->getTriggerReport(rep);
1061  }
1062  }
1063 
1064  void
1066  rep.eventSummary.totalEvents = 0;
1067  rep.eventSummary.cpuTime = 0.;
1068  rep.eventSummary.realTime = 0.;
1069  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1070  }
1071 
1072  int
1074  int returnValue = 0;
1075  for(auto& s: streamSchedules_) {
1076  returnValue += s->totalEvents();
1077  }
1078  return returnValue;
1079  }
1080 
1081  int
1083  int returnValue = 0;
1084  for(auto& s: streamSchedules_) {
1085  returnValue += s->totalEventsPassed();
1086  }
1087  return returnValue;
1088  }
1089 
1090  int
1092  int returnValue = 0;
1093  for(auto& s: streamSchedules_) {
1094  returnValue += s->totalEventsFailed();
1095  }
1096  return returnValue;
1097  }
1098 
1099 
1100  void
1102  for(auto& s: streamSchedules_) {
1103  s->clearCounters();
1104  }
1105  }
1106 
1107  //====================================
1108  // Schedule::checkForCorrectness algorithm
1109  //
1110  // The code creates a 'dependency' graph between all
1111  // modules. A module depends on another module if
1112  // 1) it 'consumes' data produced by that module
1113  // 2) it appears directly after the module within a Path
1114  //
1115  // If there is a cycle in the 'dependency' graph then
1116  // the schedule may be unrunnable. The schedule is still
1117  // runnable if all cycles have at least two edges which
1118  // connect modules only by Path dependencies (i.e. not
1119  // linked by a data dependency).
1120  //
1121  // Example 1:
1122  // C consumes data from B
1123  // Path 1: A + B + C
1124  // Path 2: B + C + A
1125  //
1126  // Cycle: A after C [p2], C consumes B, B after A [p1]
1127  // Since this cycle has 2 path only edges it is OK since
1128  // A and (B+C) are independent so their run order doesn't matter
1129  //
1130  // Example 2:
1131  // B consumes A
1132  // C consumes B
1133  // Path: C + A
1134  //
1135  // Cycle: A after C [p], C consumes B, B consumes A
1136  // Since this cycle has 1 path only edge it is unrunnable.
1137  //
1138  // Example 3:
1139  // A consumes B
1140  // B consumes C
1141  // C consumes A
1142  // (no Path since unscheduled execution)
1143  //
1144  // Cycle: A consumes B, B consumes C, C consumes A
1145  // Since this cycle has 0 path only edges it is unrunnable.
1146  //====================================
1147 
1148  namespace {
1149  typedef std::pair<unsigned int, unsigned int> SimpleEdge;
1150  typedef std::map<SimpleEdge, std::vector<unsigned int>> EdgeToPathMap;
1151 
1152  typedef boost::adjacency_list<boost::vecS, boost::vecS, boost::bidirectionalS> Graph;
1153 
1154  typedef boost::graph_traits<Graph>::edge_descriptor Edge;
1155  struct cycle_detector : public boost::dfs_visitor<> {
1156 
1157  cycle_detector(EdgeToPathMap const& iEdgeToPathMap,
1158  std::vector<std::string> const& iPathNames,
1159  std::map<std::string,unsigned int> const& iModuleNamesToIndex):
1160  m_edgeToPathMap(iEdgeToPathMap),
1161  m_pathNames(iPathNames),
1162  m_namesToIndex(iModuleNamesToIndex){}
1163 
1164  void tree_edge(Edge iEdge, Graph const&) {
1165  m_stack.push_back(iEdge);
1166  }
1167 
1168  void finish_edge(Edge iEdge, Graph const& iGraph) {
1169  if(not m_stack.empty()) {
1170  if (iEdge == m_stack.back()) {
1171  m_stack.pop_back();
1172  }
1173  }
1174  }
1175 
1176  //Called if a cycle happens
1177  void back_edge(Edge iEdge, Graph const& iGraph) {
1178  //NOTE: If the path containing the cycle contains two or more
1179  // path only edges then there is no problem
1180 
1182  IndexMap const& index = get(boost::vertex_index, iGraph);
1183 
1184  unsigned int vertex = index[target(iEdge,iGraph)];
1185 
1186  //Find last edge which starts with this vertex
1187  std::list<Edge>::iterator itFirst = m_stack.begin();
1188  {
1189  bool seenVertex = false;
1190  while(itFirst != m_stack.end()) {
1191  if(not seenVertex) {
1192  if(index[source(*itFirst,iGraph)] == vertex) {
1193  seenVertex = true;
1194  }
1195  } else
1196  if (index[source(*itFirst,iGraph)] != vertex) {
1197  break;
1198  }
1199  ++itFirst;
1200  }
1201  if(itFirst != m_stack.begin()) {
1202  --itFirst;
1203  }
1204  }
1205  //This edge has not been added to the stack yet
1206  // making a copy allows us to add it in but not worry
1207  // about removing it at the end of the routine
1208  std::vector<Edge> tempStack;
1209  tempStack.reserve(m_stack.size()+1);
1210  tempStack.insert(tempStack.end(),itFirst,m_stack.end());
1211  tempStack.emplace_back(iEdge);
1212 
1213  unsigned int nPathDependencyOnly =0;
1214  for(auto const& edge: tempStack) {
1215  unsigned int in =index[source(edge,iGraph)];
1216  unsigned int out =index[target(edge,iGraph)];
1217 
1218  auto iFound = m_edgeToPathMap.find(SimpleEdge(in,out));
1219  bool pathDependencyOnly = true;
1220  for(auto dependency : iFound->second) {
1221  if (dependency == std::numeric_limits<unsigned int>::max()) {
1222  pathDependencyOnly = false;
1223  break;
1224  }
1225  }
1226  if (pathDependencyOnly) {
1227  ++nPathDependencyOnly;
1228  }
1229  }
1230  if(nPathDependencyOnly < 2) {
1231  throwOnError(tempStack,index,iGraph);
1232  }
1233  }
1234  private:
1235  std::string const& pathName(unsigned int iIndex) const {
1236  return m_pathNames[iIndex];
1237  }
1238 
1239  std::string const& moduleName(unsigned int iIndex) const {
1240  for(auto const& item : m_namesToIndex) {
1241  if(item.second == iIndex) {
1242  return item.first;
1243  }
1244  }
1245  assert(false);
1246  }
1247 
1248  void
1249  throwOnError(std::vector<Edge>const& iEdges,
1251  Graph const& iGraph) const {
1252  std::stringstream oStream;
1253  oStream <<"Module run order problem found: \n";
1254  bool first_edge = true;
1255  for(auto const& edge: iEdges) {
1256  unsigned int in =iIndex[source(edge,iGraph)];
1257  unsigned int out =iIndex[target(edge,iGraph)];
1258 
1259  if(first_edge) {
1260  first_edge = false;
1261  } else {
1262  oStream<<", ";
1263  }
1264  oStream <<moduleName(in);
1265 
1266  auto iFound = m_edgeToPathMap.find(SimpleEdge(in,out));
1267  bool pathDependencyOnly = true;
1268  for(auto dependency : iFound->second) {
1269  if (dependency == std::numeric_limits<unsigned int>::max()) {
1270  pathDependencyOnly = false;
1271  break;
1272  }
1273  }
1274  if (pathDependencyOnly) {
1275  oStream <<" after "<<moduleName(out)<<" [path "<<pathName(iFound->second[0])<<"]";
1276  } else {
1277  oStream <<" consumes "<<moduleName(out);
1278  }
1279  }
1280  oStream<<"\n Running in the threaded framework would lead to indeterminate results."
1281  "\n Please change order of modules in mentioned Path(s) to avoid inconsistent module ordering.";
1282 
1283  throw Exception(errors::ScheduleExecutionFailure, "Unrunnable schedule\n")
1284  << oStream.str() << "\n";
1285  }
1286 
1287  EdgeToPathMap const& m_edgeToPathMap;
1288  std::vector<std::string> const& m_pathNames;
1289  std::map<std::string,unsigned int> m_namesToIndex;
1290 
1291  std::list<Edge> m_stack;
1292  };
1293  }
1294 
1295  void
1297  {
1298  //Need to lookup names to ids quickly
1299  std::map<std::string,unsigned int> moduleNamesToIndex;
1300  for(auto const& worker: allWorkers()) {
1301  moduleNamesToIndex.insert(std::make_pair(worker->description().moduleLabel(),
1302  worker->description().id()));
1303  }
1304 
1305  //If a module to module dependency comes from a path, remember which path
1306  EdgeToPathMap edgeToPathMap;
1307 
1308  //determine the path dependencies
1309  std::vector<std::string> pathNames;
1310  {
1311  streamSchedules_[0]->availablePaths(pathNames);
1312 
1313  std::vector<std::string> moduleNames;
1314  std::vector<std::string> reducedModuleNames;
1315  unsigned int pathIndex=0;
1316  for(auto const& path: pathNames) {
1317  moduleNames.clear();
1318  reducedModuleNames.clear();
1319  std::set<std::string> alreadySeenNames;
1320 
1321  streamSchedules_[0]->modulesInPath(path,moduleNames);
1322  std::string lastModuleName;
1323  unsigned int lastModuleIndex;
1324  for(auto const& name: moduleNames) {
1325  auto found = alreadySeenNames.insert(name);
1326  if(found.second) {
1327  //first time for this path
1328  unsigned int const moduleIndex = moduleNamesToIndex[name];
1329  if(not lastModuleName.empty()) {
1330  edgeToPathMap[std::make_pair(moduleIndex,lastModuleIndex)].push_back(pathIndex);
1331  }
1332  lastModuleName = name;
1333  lastModuleIndex = moduleIndex;
1334  }
1335  }
1336  ++pathIndex;
1337  }
1338  }
1339  {
1340  std::vector<const char*> dependentModules;
1341  //determine the data dependencies
1342  for(auto const& worker: allWorkers()) {
1343  dependentModules.clear();
1344  //NOTE: what about aliases?
1345  worker->modulesDependentUpon(dependentModules, printDependencies_);
1346  auto found = moduleNamesToIndex.find(worker->description().moduleLabel());
1347  if (found == moduleNamesToIndex.end()) {
1348  //The module was from a previous process
1349  continue;
1350  }
1351  unsigned int const moduleIndex = found->second;
1352  for(auto const& name: dependentModules) {
1353  edgeToPathMap[std::make_pair(moduleIndex, moduleNamesToIndex[name])].push_back(std::numeric_limits<unsigned int>::max());
1354  }
1355  }
1356  }
1357  //Now use boost graph library to find cycles in the dependencies
1358  std::vector<SimpleEdge> outList;
1359  outList.reserve(edgeToPathMap.size());
1360  for(auto const& edgeInfo: edgeToPathMap) {
1361  outList.push_back(edgeInfo.first);
1362  }
1363 
1364  Graph g(outList.begin(),outList.end(), moduleNamesToIndex.size());
1365 
1366  cycle_detector detector(edgeToPathMap,pathNames,moduleNamesToIndex);
1367  boost::depth_first_search(g,boost::visitor(detector));
1368  }
1369 }
type
Definition: HCALResponse.h:21
bool empty() const
Definition: ParameterSet.h:218
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:68
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
dictionary aliases
Definition: autoCond.py:46
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
void checkForCorrectness() const
Check that the schedule is actually runnable.
Definition: Schedule.cc:1296
void stopEvent(StreamContext const &)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
void fillModuleAndConsumesInfo(std::vector< ModuleDescription const * > &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int > > &moduleIDToIndex, std::vector< std::vector< ModuleDescription const * > > &modulesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
Definition: Schedule.cc:1010
virtual void openFile(FileBlock const &fb)=0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:970
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:975
static std::string const source("source")
virtual void writeRun(RunPrincipal const &rp, ProcessContext const *)=0
preallocConfig_(prealloc)
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
bool endPathsEnabled() const
Definition: Schedule.cc:1050
all_output_communicators_()
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:892
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::vector< Worker * > AllWorkers
Definition: Schedule.h:115
std::vector< ParameterSet > VParameterSet
Definition: ParameterSet.h:33
assert(m_qm.get())
void writeRun(RunPrincipal const &rp, ProcessContext const *)
Definition: Schedule.cc:869
void endStream(unsigned int)
Definition: Schedule.cc:908
void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)
Definition: Schedule.cc:874
void enableEndPaths(bool active)
Definition: Schedule.cc:1042
processConfiguration
Definition: Schedule.cc:374
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
endpathsAreActive_(true)
Definition: Schedule.cc:380
std::shared_ptr< ProductHolderIndexHelper const > productLookup(BranchType branchType) const
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:1003
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:69
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
Definition: Schedule.h:285
actions
Definition: Schedule.cc:374
std::string const & moduleLabel() const
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
#define constexpr
int totalEventsFailed() const
Definition: Schedule.cc:1091
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:280
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry)
Definition: Schedule.cc:922
tuple result
Definition: mps_fire.py:83
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:194
int totalEventsPassed() const
Definition: Schedule.cc:1082
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
Definition: Schedule.cc:980
std::string moduleName(Provenance const &provenance)
Definition: Provenance.cc:27
virtual void updateLookup(BranchType iBranchType, ProductHolderIndexHelper const &)=0
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:67
EventSummary eventSummary
Definition: TriggerReport.h:66
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists)
Definition: Schedule.cc:514
wantSummary_(tns.wantSummary())
int totalEvents() const
Definition: Schedule.cc:1073
virtual void openNewFileIfNeeded()=0
EventTimingSummary eventSummary
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1101
std::vector< PathTimingSummary > trigPathSummaries
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:553
bool printDependencies_
Definition: Schedule.h:288
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:887
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:277
virtual void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)=0
string key
FastSim: produces sample of signal events, overlayed with premixed minbias events.
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
areg
Definition: Schedule.cc:374
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:92
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1055
virtual bool shouldWeCloseFile() const =0
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:283
moduleRegistry_(new ModuleRegistry())
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:278
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
volatile bool endpathsAreActive_
Definition: Schedule.h:290
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:88
void startPath(StreamContext const &, PathContext const &)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:282
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:990
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
void beginStream(unsigned int)
Definition: Schedule.cc:903
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:89
void preForkReleaseResources()
Definition: Worker.h:91
bool wantSummary_
Definition: Schedule.h:287
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Schedule.cc:917
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:958
Strings const & getTrigPaths() const
void beginJob(ProductRegistry const &)
Definition: Schedule.cc:897
printDependencies_(printDependencies(proc_pset))
void openNewOutputFilesIfNeeded()
Definition: Schedule.cc:859
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
void preForkReleaseResources()
Definition: Schedule.cc:913
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:864
preg
Definition: Schedule.cc:374
Schedule(ParameterSet &proc_pset, service::TriggerNamesService &tns, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool hasSubprocesses, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:362
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:569
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1065
bool shouldWeCloseOutput() const
Definition: Schedule.cc:879
std::vector< std::string > vstring
Definition: Schedule.cc:358
tuple size
Write out results.
void closeOutputFiles()
Definition: Schedule.cc:854
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
Definition: Schedule.cc:985
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:996
prealloc
Definition: Schedule.cc:374
unsigned int id() const
std::string match(BranchDescription const &a, BranchDescription const &b, std::string const &fileName)