CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Schedule.cc
Go to the documentation of this file.
2 
24 
25 #include "boost/bind.hpp"
26 #include "boost/ref.hpp"
27 
28 #include "boost/graph/graph_traits.hpp"
29 #include "boost/graph/adjacency_list.hpp"
30 #include "boost/graph/depth_first_search.hpp"
31 #include "boost/graph/visitors.hpp"
32 
33 
34 #include <algorithm>
35 #include <cassert>
36 #include <cstdlib>
37 #include <functional>
38 #include <iomanip>
39 #include <list>
40 #include <map>
41 #include <exception>
42 #include <sstream>
43 
44 namespace edm {
45  namespace {
46  bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
47  return std::binary_search(v.begin(), v.end(), s);
48  }
49 
50  // Here we make the trigger results inserter directly. This should
51  // probably be a utility in the WorkerRegistry or elsewhere.
52 
53  std::shared_ptr<TriggerResultInserter>
54  makeInserter(ParameterSet& proc_pset,
55  PreallocationConfiguration const& iPrealloc,
56  ProductRegistry& preg,
57  ExceptionToActionTable const& actions,
58  boost::shared_ptr<ActivityRegistry> areg,
59  boost::shared_ptr<ProcessConfiguration> processConfiguration) {
60 
61  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
62  trig_pset->registerIt();
63 
64  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
65  ModuleDescription md(trig_pset->id(),
66  "TriggerResultInserter",
67  "TriggerResults",
68  processConfiguration.get(),
70 
71  areg->preModuleConstructionSignal_(md);
72  bool postCalled = false;
73  std::shared_ptr<TriggerResultInserter> returnValue;
74  try {
75  maker::ModuleHolderT<TriggerResultInserter> holder(new TriggerResultInserter(*trig_pset, iPrealloc.numberOfStreams()),static_cast<Maker const*>(nullptr));
76  holder.setModuleDescription(md);
77  holder.registerProductsAndCallbacks(&preg);
78  returnValue.reset(holder.release());
79  postCalled = true;
80  // if exception then post will be called in the catch block
81  areg->postModuleConstructionSignal_(md);
82  }
83  catch (...) {
84  if(!postCalled) {
85  try {
86  areg->postModuleConstructionSignal_(md);
87  }
88  catch (...) {
89  // If post throws an exception ignore it because we are already handling another exception
90  }
91  }
92  throw;
93  }
94  return returnValue;
95  }
96 
97 
98  void
99  checkAndInsertAlias(std::string const& friendlyClassName,
100  std::string const& moduleLabel,
101  std::string const& productInstanceName,
102  std::string const& processName,
103  std::string const& alias,
104  std::string const& instanceAlias,
105  ProductRegistry const& preg,
106  std::multimap<BranchKey, BranchKey>& aliasMap,
107  std::map<BranchKey, BranchKey>& aliasKeys) {
108  std::string const star("*");
109 
110  BranchKey key(friendlyClassName, moduleLabel, productInstanceName, processName);
111  if(preg.productList().find(key) == preg.productList().end()) {
112  // No product was found matching the alias.
113  // We throw an exception only if a module with the specified module label was created in this process.
114  for(auto const& product : preg.productList()) {
115  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
116  throw Exception(errors::Configuration, "EDAlias does not match data\n")
117  << "There are no products of type '" << friendlyClassName << "'\n"
118  << "with module label '" << moduleLabel << "' and instance name '" << productInstanceName << "'.\n";
119  }
120  }
121  }
122 
123  std::string const& theInstanceAlias(instanceAlias == star ? productInstanceName : instanceAlias);
124  BranchKey aliasKey(friendlyClassName, alias, theInstanceAlias, processName);
125  if(preg.productList().find(aliasKey) != preg.productList().end()) {
126  throw Exception(errors::Configuration, "EDAlias conflicts with data\n")
127  << "A product of type '" << friendlyClassName << "'\n"
128  << "with module label '" << alias << "' and instance name '" << theInstanceAlias << "'\n"
129  << "already exists.\n";
130  }
131  auto iter = aliasKeys.find(aliasKey);
132  if(iter != aliasKeys.end()) {
133  // The alias matches a previous one. If the same alias is used for different product, throw.
134  if(iter->second != key) {
135  throw Exception(errors::Configuration, "EDAlias conflict\n")
136  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
137  << "are used for multiple products of type '" << friendlyClassName << "'\n"
138  << "One has module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
139  << "the other has module label '" << iter->second.moduleLabel() << "' and product instance name '" << iter->second.productInstanceName() << "'.\n";
140  }
141  } else {
142  auto prodIter = preg.productList().find(key);
143  if(prodIter != preg.productList().end()) {
144  if (!prodIter->second.produced()) {
145  throw Exception(errors::Configuration, "EDAlias\n")
146  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
147  << "are used for a product of type '" << friendlyClassName << "'\n"
148  << "with module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
149  << "An EDAlias can only be used for products produced in the current process. This one is not.\n";
150  }
151  aliasMap.insert(std::make_pair(key, aliasKey));
152  aliasKeys.insert(std::make_pair(aliasKey, key));
153  }
154  }
155  }
156 
157  void
158  processEDAliases(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
159  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string> >("@all_aliases");
160  if(aliases.empty()) {
161  return;
162  }
163  std::string const star("*");
164  std::string const empty("");
166  desc.add<std::string>("type");
167  desc.add<std::string>("fromProductInstance", star);
168  desc.add<std::string>("toProductInstance", star);
169 
170  std::multimap<BranchKey, BranchKey> aliasMap;
171 
172  std::map<BranchKey, BranchKey> aliasKeys; // Used to search for duplicates or clashes.
173 
174  // Now, loop over the alias information and store it in aliasMap.
175  for(std::string const& alias : aliases) {
176  ParameterSet const& aliasPSet = proc_pset.getParameterSet(alias);
177  std::vector<std::string> vPSetNames = aliasPSet.getParameterNamesForType<VParameterSet>();
178  for(std::string const& moduleLabel : vPSetNames) {
179  VParameterSet vPSet = aliasPSet.getParameter<VParameterSet>(moduleLabel);
180  for(ParameterSet& pset : vPSet) {
181  desc.validate(pset);
182  std::string friendlyClassName = pset.getParameter<std::string>("type");
183  std::string productInstanceName = pset.getParameter<std::string>("fromProductInstance");
184  std::string instanceAlias = pset.getParameter<std::string>("toProductInstance");
185  if(productInstanceName == star) {
186  bool match = false;
187  BranchKey lowerBound(friendlyClassName, moduleLabel, empty, empty);
188  for(ProductRegistry::ProductList::const_iterator it = preg.productList().lower_bound(lowerBound);
189  it != preg.productList().end() && it->first.friendlyClassName() == friendlyClassName && it->first.moduleLabel() == moduleLabel;
190  ++it) {
191  if(it->first.processName() != processName) {
192  continue;
193  }
194  match = true;
195 
196  checkAndInsertAlias(friendlyClassName, moduleLabel, it->first.productInstanceName(), processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
197  }
198  if(!match) {
199  // No product was found matching the alias.
200  // We throw an exception only if a module with the specified module label was created in this process.
201  for(auto const& product : preg.productList()) {
202  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
203  throw Exception(errors::Configuration, "EDAlias parameter set mismatch\n")
204  << "There are no products of type '" << friendlyClassName << "'\n"
205  << "with module label '" << moduleLabel << "'.\n";
206  }
207  }
208  }
209  } else {
210  checkAndInsertAlias(friendlyClassName, moduleLabel, productInstanceName, processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
211  }
212  }
213  }
214  }
215 
216 
217  // Now add the new alias entries to the product registry.
218  for(auto const& aliasEntry : aliasMap) {
219  ProductRegistry::ProductList::const_iterator it = preg.productList().find(aliasEntry.first);
220  assert(it != preg.productList().end());
221  preg.addLabelAlias(it->second, aliasEntry.second.moduleLabel(), aliasEntry.second.productInstanceName());
222  }
223 
224  }
225 
226  typedef std::vector<std::string> vstring;
227 
228  void reduceParameterSet(ParameterSet& proc_pset,
229  vstring const& end_path_name_list,
230  vstring& modulesInConfig,
231  std::set<std::string> const& usedModuleLabels,
232  std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions) {
233  // Before calculating the ParameterSetID of the top level ParameterSet or
234  // saving it in the registry drop from the top level ParameterSet all
235  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
236  // production is not enabled also drop all the EDFilters and EDProducers
237  // that are not scheduled. Drop the ParameterSet used to configure the module
238  // itself. Also drop the other traces of these labels in the top level
239  // ParameterSet: Remove that labels from @all_modules and from all the
240  // end paths. If this makes any end paths empty, then remove the end path
241  // name from @end_paths, and @paths.
242 
243  // First make a list of labels to drop
244  vstring outputModuleLabels;
245  std::string edmType;
246  std::string const moduleEdmType("@module_edm_type");
247  std::string const outputModule("OutputModule");
248  std::string const edAnalyzer("EDAnalyzer");
249  std::string const edFilter("EDFilter");
250  std::string const edProducer("EDProducer");
251 
252  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
253 
254  //need a list of all modules on paths in order to determine
255  // if an EDAnalyzer only appears on an end path
256  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
257  std::set<std::string> modulesOnPaths;
258  {
259  std::set<std::string> noEndPaths(scheduledPaths.begin(),scheduledPaths.end());
260  for(auto const& endPath: end_path_name_list) {
261  noEndPaths.erase(endPath);
262  }
263  {
264  vstring labels;
265  for(auto const& path: noEndPaths) {
266  labels = proc_pset.getParameter<vstring>(path);
267  modulesOnPaths.insert(labels.begin(),labels.end());
268  }
269  }
270  }
271  //Initially fill labelsToBeDropped with all module mentioned in
272  // the configuration but which are not being used by the system
273  std::vector<std::string> labelsToBeDropped;
274  labelsToBeDropped.reserve(modulesInConfigSet.size());
275  std::set_difference(modulesInConfigSet.begin(),modulesInConfigSet.end(),
276  usedModuleLabels.begin(),usedModuleLabels.end(),
277  std::back_inserter(labelsToBeDropped));
278 
279  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
280  for (auto const& modLabel: usedModuleLabels) {
281  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
282  if (edmType == outputModule) {
283  outputModuleLabels.push_back(modLabel);
284  labelsToBeDropped.push_back(modLabel);
285  }
286  if(edmType == edAnalyzer) {
287  if(modulesOnPaths.end()==modulesOnPaths.find(modLabel)) {
288  labelsToBeDropped.push_back(modLabel);
289  }
290  }
291  }
292  //labelsToBeDropped must be sorted
293  std::inplace_merge(labelsToBeDropped.begin(),
294  labelsToBeDropped.begin()+sizeBeforeOutputModules,
295  labelsToBeDropped.end());
296 
297  // drop the parameter sets used to configure the modules
298  for_all(labelsToBeDropped, boost::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, boost::ref(proc_pset), _1));
299 
300  // drop the labels from @all_modules
301  vstring::iterator endAfterRemove = std::remove_if(modulesInConfig.begin(), modulesInConfig.end(), boost::bind(binary_search_string, boost::ref(labelsToBeDropped), _1));
302  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
303  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
304 
305  // drop the labels from all end paths
306  vstring endPathsToBeDropped;
307  vstring labels;
308  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
309  iEndPath != endEndPath;
310  ++iEndPath) {
311  labels = proc_pset.getParameter<vstring>(*iEndPath);
312  vstring::iterator iSave = labels.begin();
313  vstring::iterator iBegin = labels.begin();
314 
315  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end();
316  iLabel != iEnd; ++iLabel) {
317  if (binary_search_string(labelsToBeDropped, *iLabel)) {
318  if (binary_search_string(outputModuleLabels, *iLabel)) {
319  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
320  }
321  } else {
322  if (iSave != iLabel) {
323  iSave->swap(*iLabel);
324  }
325  ++iSave;
326  }
327  }
328  labels.erase(iSave, labels.end());
329  if (labels.empty()) {
330  // remove empty end paths and save their names
331  proc_pset.eraseSimpleParameter(*iEndPath);
332  endPathsToBeDropped.push_back(*iEndPath);
333  } else {
334  proc_pset.addParameter<vstring>(*iEndPath, labels);
335  }
336  }
337  sort_all(endPathsToBeDropped);
338 
339  // remove empty end paths from @paths
340  endAfterRemove = std::remove_if(scheduledPaths.begin(), scheduledPaths.end(), boost::bind(binary_search_string, boost::ref(endPathsToBeDropped), _1));
341  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
342  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
343 
344  // remove empty end paths from @end_paths
345  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
346  endAfterRemove = std::remove_if(scheduledEndPaths.begin(), scheduledEndPaths.end(), boost::bind(binary_search_string, boost::ref(endPathsToBeDropped), _1));
347  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
348  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
349 
350  }
351  }
352  // -----------------------------
353 
354  typedef std::vector<std::string> vstring;
355 
356  // -----------------------------
357 
360  ProductRegistry& preg,
361  BranchIDListHelper& branchIDListHelper,
362  ExceptionToActionTable const& actions,
363  boost::shared_ptr<ActivityRegistry> areg,
364  boost::shared_ptr<ProcessConfiguration> processConfiguration,
365  const ParameterSet* subProcPSet,
367  ProcessContext const* processContext) :
368  //Only create a resultsInserter if there is a trigger path
369  resultsInserter_{tns.getTrigPaths().empty()? std::shared_ptr<TriggerResultInserter>{} :makeInserter(proc_pset,prealloc,preg,actions,areg,processConfiguration)},
370  moduleRegistry_(new ModuleRegistry()),
373  wantSummary_(tns.wantSummary()),
375  {
376  assert(0<prealloc.numberOfStreams());
377  streamSchedules_.reserve(prealloc.numberOfStreams());
378  for(unsigned int i=0; i<prealloc.numberOfStreams();++i) {
379  streamSchedules_.emplace_back(std::shared_ptr<StreamSchedule>{new StreamSchedule{resultsInserter_.get(),moduleRegistry_,proc_pset,tns,prealloc, preg,branchIDListHelper,actions,areg,processConfiguration,nullptr==subProcPSet,StreamID{i},processContext}});
380  }
381 
382  //TriggerResults are injected automatically by StreamSchedules and are
383  // unknown to the ModuleRegistry
384  const std::string kTriggerResults("TriggerResults");
385  std::vector<std::string> modulesToUse;
386  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
387  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
388  if(worker->description().moduleLabel() != kTriggerResults) {
389  modulesToUse.push_back(worker->description().moduleLabel());
390  }
391  }
392  //The unscheduled modules are at the end of the list, but we want them at the front
393  unsigned int n = streamSchedules_[0]->numberOfUnscheduledModules();
394  if(n>0) {
395  std::vector<std::string> temp;
396  temp.reserve(modulesToUse.size());
397  auto itBeginUnscheduled = modulesToUse.begin()+modulesToUse.size()-n;
398  std::copy(itBeginUnscheduled,modulesToUse.end(),
399  std::back_inserter(temp));
400  std::copy(modulesToUse.begin(),itBeginUnscheduled,std::back_inserter(temp));
401  temp.swap(modulesToUse);
402  }
403  globalSchedule_.reset( new GlobalSchedule{ resultsInserter_.get(),
405  modulesToUse,
406  proc_pset, preg, prealloc,
407  actions,areg,processConfiguration,processContext });
408 
409  //TriggerResults is not in the top level ParameterSet so the call to
410  // reduceParameterSet would fail to find it. Just remove it up front.
411  std::set<std::string> usedModuleLabels;
412  for( auto const worker: allWorkers()) {
413  if(worker->description().moduleLabel() != kTriggerResults) {
414  usedModuleLabels.insert(worker->description().moduleLabel());
415  }
416  }
417  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
418  std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
419  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels,
420  outputModulePathPositions);
421  processEDAliases(proc_pset, processConfiguration->processName(), preg);
422  proc_pset.registerIt();
423  pset::setProcessParameterSetID(proc_pset.id());
424  processConfiguration->setParameterSetID(proc_pset.id());
425  processConfiguration->setProcessConfigurationID();
426 
427  // This is used for a little sanity-check to make sure no code
428  // modifications alter the number of workers at a later date.
429  size_t all_workers_count = allWorkers().size();
430 
431  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder){
432  auto comm = iHolder->createOutputModuleCommunicator();
433  if (comm) {
434  all_output_communicators_.emplace_back(boost::shared_ptr<OutputModuleCommunicator>{comm.release()});
435  }
436  });
437  // Now that the output workers are filled in, set any output limits or information.
438  limitOutput(proc_pset, branchIDListHelper.branchIDLists());
439 
441 
442  // Sanity check: make sure nobody has added a worker after we've
443  // already relied on the WorkerManager being full.
444  assert (all_workers_count == allWorkers().size());
445 
446  branchIDListHelper.updateFromRegistry(preg);
447 
448  preg.setFrozen();
449 
450  for (auto c : all_output_communicators_) {
451  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
452  c->selectProducts(preg);
453  }
454 
455  } // Schedule::Schedule
456 
457 
458  void
459  Schedule::limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists) {
460  std::string const output("output");
461 
462  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents", ParameterSet());
463  int maxEventSpecs = 0;
464  int maxEventsOut = -1;
465  ParameterSet const* vMaxEventsOut = 0;
466  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
467  if (search_all(intNamesE, output)) {
468  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
469  ++maxEventSpecs;
470  }
471  std::vector<std::string> psetNamesE;
472  maxEventsPSet.getParameterSetNames(psetNamesE, false);
473  if (search_all(psetNamesE, output)) {
474  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
475  ++maxEventSpecs;
476  }
477 
478  if (maxEventSpecs > 1) {
480  "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
481  }
482 
483  for (auto c : all_output_communicators_) {
484  OutputModuleDescription desc(branchIDLists, maxEventsOut);
485  if (vMaxEventsOut != 0 && !vMaxEventsOut->empty()) {
486  std::string const& moduleLabel = c->description().moduleLabel();
487  try {
488  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
489  } catch (Exception const&) {
491  "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
492  }
493  }
494  c->configure(desc);
495  }
496  }
497 
498  bool Schedule::terminate() const {
499  if (all_output_communicators_.empty()) {
500  return false;
501  }
502  for (auto c : all_output_communicators_) {
503  if (!c->limitReached()) {
504  // Found an output module that has not reached output event count.
505  return false;
506  }
507  }
508  LogInfo("SuccessfulTermination")
509  << "The job is terminating successfully because each output module\n"
510  << "has reached its configured limit.\n";
511  return true;
512  }
513 
515  globalSchedule_->endJob(collector);
516  if (collector.hasThrown()) {
517  return;
518  }
519 
520  if (wantSummary_ == false) return;
521  {
522  TriggerReport tr;
523  getTriggerReport(tr);
524 
525  // The trigger report (pass/fail etc.):
526 
527  LogVerbatim("FwkSummary") << "";
528  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Event Summary ------------";
529  if(!tr.trigPathSummaries.empty()) {
530  LogVerbatim("FwkSummary") << "TrigReport"
531  << " Events total = " << tr.eventSummary.totalEvents
532  << " passed = " << tr.eventSummary.totalEventsPassed
533  << " failed = " << tr.eventSummary.totalEventsFailed
534  << "";
535  } else {
536  LogVerbatim("FwkSummary") << "TrigReport"
537  << " Events total = " << tr.eventSummary.totalEvents
538  << " passed = " << tr.eventSummary.totalEvents
539  << " failed = 0";
540  }
541 
542  LogVerbatim("FwkSummary") << "";
543  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Path Summary ------------";
544  LogVerbatim("FwkSummary") << "TrigReport "
545  << std::right << std::setw(10) << "Trig Bit#" << " "
546  << std::right << std::setw(10) << "Run" << " "
547  << std::right << std::setw(10) << "Passed" << " "
548  << std::right << std::setw(10) << "Failed" << " "
549  << std::right << std::setw(10) << "Error" << " "
550  << "Name" << "";
551  for (auto const& p: tr.trigPathSummaries) {
552  LogVerbatim("FwkSummary") << "TrigReport "
553  << std::right << std::setw(5) << 1
554  << std::right << std::setw(5) << p.bitPosition << " "
555  << std::right << std::setw(10) << p.timesRun << " "
556  << std::right << std::setw(10) << p.timesPassed << " "
557  << std::right << std::setw(10) << p.timesFailed << " "
558  << std::right << std::setw(10) << p.timesExcept << " "
559  << p.name << "";
560  }
561 
562  /*
563  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
564  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
565  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
566  for (; epi != epe; ++epi, ++epn) {
567 
568  LogVerbatim("FwkSummary") << "TrigReport "
569  << std::right << std::setw(5) << 1
570  << std::right << std::setw(5) << *epi << " "
571  << std::right << std::setw(10) << totalEvents() << " "
572  << std::right << std::setw(10) << totalEvents() << " "
573  << std::right << std::setw(10) << 0 << " "
574  << std::right << std::setw(10) << 0 << " "
575  << *epn << "";
576  }
577  */
578 
579  LogVerbatim("FwkSummary") << "";
580  LogVerbatim("FwkSummary") << "TrigReport " << "-------End-Path Summary ------------";
581  LogVerbatim("FwkSummary") << "TrigReport "
582  << std::right << std::setw(10) << "Trig Bit#" << " "
583  << std::right << std::setw(10) << "Run" << " "
584  << std::right << std::setw(10) << "Passed" << " "
585  << std::right << std::setw(10) << "Failed" << " "
586  << std::right << std::setw(10) << "Error" << " "
587  << "Name" << "";
588  for (auto const& p: tr.endPathSummaries) {
589  LogVerbatim("FwkSummary") << "TrigReport "
590  << std::right << std::setw(5) << 0
591  << std::right << std::setw(5) << p.bitPosition << " "
592  << std::right << std::setw(10) << p.timesRun << " "
593  << std::right << std::setw(10) << p.timesPassed << " "
594  << std::right << std::setw(10) << p.timesFailed << " "
595  << std::right << std::setw(10) << p.timesExcept << " "
596  << p.name << "";
597  }
598 
599  for (auto const& p: tr.trigPathSummaries) {
600  LogVerbatim("FwkSummary") << "";
601  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Modules in Path: " << p.name << " ------------";
602  LogVerbatim("FwkSummary") << "TrigReport "
603  << std::right << std::setw(10) << "Trig Bit#" << " "
604  << std::right << std::setw(10) << "Visited" << " "
605  << std::right << std::setw(10) << "Passed" << " "
606  << std::right << std::setw(10) << "Failed" << " "
607  << std::right << std::setw(10) << "Error" << " "
608  << "Name" << "";
609 
610  unsigned int bitpos = 0;
611  for (auto const& mod: p.moduleInPathSummaries) {
612  LogVerbatim("FwkSummary") << "TrigReport "
613  << std::right << std::setw(5) << 1
614  << std::right << std::setw(5) << bitpos << " "
615  << std::right << std::setw(10) << mod.timesVisited << " "
616  << std::right << std::setw(10) << mod.timesPassed << " "
617  << std::right << std::setw(10) << mod.timesFailed << " "
618  << std::right << std::setw(10) << mod.timesExcept << " "
619  << mod.moduleLabel << "";
620  ++bitpos;
621  }
622  }
623 
624  for (auto const& p: tr.endPathSummaries) {
625  LogVerbatim("FwkSummary") << "";
626  LogVerbatim("FwkSummary") << "TrigReport " << "------ Modules in End-Path: " << p.name << " ------------";
627  LogVerbatim("FwkSummary") << "TrigReport "
628  << std::right << std::setw(10) << "Trig Bit#" << " "
629  << std::right << std::setw(10) << "Visited" << " "
630  << std::right << std::setw(10) << "Passed" << " "
631  << std::right << std::setw(10) << "Failed" << " "
632  << std::right << std::setw(10) << "Error" << " "
633  << "Name" << "";
634 
635  unsigned int bitpos=0;
636  for (auto const& mod: p.moduleInPathSummaries) {
637  LogVerbatim("FwkSummary") << "TrigReport "
638  << std::right << std::setw(5) << 0
639  << std::right << std::setw(5) << bitpos << " "
640  << std::right << std::setw(10) << mod.timesVisited << " "
641  << std::right << std::setw(10) << mod.timesPassed << " "
642  << std::right << std::setw(10) << mod.timesFailed << " "
643  << std::right << std::setw(10) << mod.timesExcept << " "
644  << mod.moduleLabel << "";
645  ++bitpos;
646  }
647  }
648 
649  LogVerbatim("FwkSummary") << "";
650  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Module Summary ------------";
651  LogVerbatim("FwkSummary") << "TrigReport "
652  << std::right << std::setw(10) << "Visited" << " "
653  << std::right << std::setw(10) << "Run" << " "
654  << std::right << std::setw(10) << "Passed" << " "
655  << std::right << std::setw(10) << "Failed" << " "
656  << std::right << std::setw(10) << "Error" << " "
657  << "Name" << "";
658  for (auto const& worker : tr.workerSummaries) {
659  LogVerbatim("FwkSummary") << "TrigReport "
660  << std::right << std::setw(10) << worker.timesVisited << " "
661  << std::right << std::setw(10) << worker.timesRun << " "
662  << std::right << std::setw(10) << worker.timesPassed << " "
663  << std::right << std::setw(10) << worker.timesFailed << " "
664  << std::right << std::setw(10) << worker.timesExcept << " "
665  << worker.moduleLabel << "";
666  }
667  LogVerbatim("FwkSummary") << "";
668  }
669  // The timing report (CPU and Real Time):
672 
673  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
674 
675  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Event Summary ---[sec]----";
676  LogVerbatim("FwkSummary") << "TimeReport"
677  << std::setprecision(6) << std::fixed
678  << " CPU/event = " << tr.eventSummary.cpuTime/totalEvents
679  << " Real/event = " << tr.eventSummary.realTime/totalEvents
680  << "";
681 
682  LogVerbatim("FwkSummary") << "";
683  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Path Summary ---[sec]----";
684  LogVerbatim("FwkSummary") << "TimeReport "
685  << std::right << std::setw(22) << "per event "
686  << std::right << std::setw(22) << "per path-run "
687  << "";
688  LogVerbatim("FwkSummary") << "TimeReport "
689  << std::right << std::setw(10) << "CPU" << " "
690  << std::right << std::setw(10) << "Real" << " "
691  << std::right << std::setw(10) << "CPU" << " "
692  << std::right << std::setw(10) << "Real" << " "
693  << "Name" << "";
694  for (auto const& p: tr.trigPathSummaries) {
695  const int timesRun = std::max(1, p.timesRun);
696  LogVerbatim("FwkSummary") << "TimeReport "
697  << std::setprecision(6) << std::fixed
698  << std::right << std::setw(10) << p.cpuTime/totalEvents << " "
699  << std::right << std::setw(10) << p.realTime/totalEvents << " "
700  << std::right << std::setw(10) << p.cpuTime/timesRun << " "
701  << std::right << std::setw(10) << p.realTime/timesRun << " "
702  << p.name << "";
703  }
704  LogVerbatim("FwkSummary") << "TimeReport "
705  << std::right << std::setw(10) << "CPU" << " "
706  << std::right << std::setw(10) << "Real" << " "
707  << std::right << std::setw(10) << "CPU" << " "
708  << std::right << std::setw(10) << "Real" << " "
709  << "Name" << "";
710  LogVerbatim("FwkSummary") << "TimeReport "
711  << std::right << std::setw(22) << "per event "
712  << std::right << std::setw(22) << "per path-run "
713  << "";
714 
715  LogVerbatim("FwkSummary") << "";
716  LogVerbatim("FwkSummary") << "TimeReport " << "-------End-Path Summary ---[sec]----";
717  LogVerbatim("FwkSummary") << "TimeReport "
718  << std::right << std::setw(22) << "per event "
719  << std::right << std::setw(22) << "per endpath-run "
720  << "";
721  LogVerbatim("FwkSummary") << "TimeReport "
722  << std::right << std::setw(10) << "CPU" << " "
723  << std::right << std::setw(10) << "Real" << " "
724  << std::right << std::setw(10) << "CPU" << " "
725  << std::right << std::setw(10) << "Real" << " "
726  << "Name" << "";
727  for (auto const& p: tr.endPathSummaries) {
728  const int timesRun = std::max(1, p.timesRun);
729 
730  LogVerbatim("FwkSummary") << "TimeReport "
731  << std::setprecision(6) << std::fixed
732  << std::right << std::setw(10) << p.cpuTime/totalEvents << " "
733  << std::right << std::setw(10) << p.realTime/totalEvents << " "
734  << std::right << std::setw(10) << p.cpuTime/timesRun << " "
735  << std::right << std::setw(10) << p.realTime/timesRun << " "
736  << p.name << "";
737  }
738  LogVerbatim("FwkSummary") << "TimeReport "
739  << std::right << std::setw(10) << "CPU" << " "
740  << std::right << std::setw(10) << "Real" << " "
741  << std::right << std::setw(10) << "CPU" << " "
742  << std::right << std::setw(10) << "Real" << " "
743  << "Name" << "";
744  LogVerbatim("FwkSummary") << "TimeReport "
745  << std::right << std::setw(22) << "per event "
746  << std::right << std::setw(22) << "per endpath-run "
747  << "";
748 
749  for (auto const& p: tr.trigPathSummaries) {
750  LogVerbatim("FwkSummary") << "";
751  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Modules in Path: " << p.name << " ---[sec]----";
752  LogVerbatim("FwkSummary") << "TimeReport "
753  << std::right << std::setw(22) << "per event "
754  << std::right << std::setw(22) << "per module-visit "
755  << "";
756  LogVerbatim("FwkSummary") << "TimeReport "
757  << std::right << std::setw(10) << "CPU" << " "
758  << std::right << std::setw(10) << "Real" << " "
759  << std::right << std::setw(10) << "CPU" << " "
760  << std::right << std::setw(10) << "Real" << " "
761  << "Name" << "";
762  for (auto const& mod: p.moduleInPathSummaries) {
763  LogVerbatim("FwkSummary") << "TimeReport "
764  << std::setprecision(6) << std::fixed
765  << std::right << std::setw(10) << mod.cpuTime/totalEvents << " "
766  << std::right << std::setw(10) << mod.realTime/totalEvents << " "
767  << std::right << std::setw(10) << mod.cpuTime/std::max(1, mod.timesVisited) << " "
768  << std::right << std::setw(10) << mod.realTime/std::max(1, mod.timesVisited) << " "
769  << mod.moduleLabel << "";
770  }
771  }
772  LogVerbatim("FwkSummary") << "TimeReport "
773  << std::right << std::setw(10) << "CPU" << " "
774  << std::right << std::setw(10) << "Real" << " "
775  << std::right << std::setw(10) << "CPU" << " "
776  << std::right << std::setw(10) << "Real" << " "
777  << "Name" << "";
778  LogVerbatim("FwkSummary") << "TimeReport "
779  << std::right << std::setw(22) << "per event "
780  << std::right << std::setw(22) << "per module-visit "
781  << "";
782 
783  for (auto const& p: tr.endPathSummaries) {
784  LogVerbatim("FwkSummary") << "";
785  LogVerbatim("FwkSummary") << "TimeReport " << "------ Modules in End-Path: " << p.name << " ---[sec]----";
786  LogVerbatim("FwkSummary") << "TimeReport "
787  << std::right << std::setw(22) << "per event "
788  << std::right << std::setw(22) << "per module-visit "
789  << "";
790  LogVerbatim("FwkSummary") << "TimeReport "
791  << std::right << std::setw(10) << "CPU" << " "
792  << std::right << std::setw(10) << "Real" << " "
793  << std::right << std::setw(10) << "CPU" << " "
794  << std::right << std::setw(10) << "Real" << " "
795  << "Name" << "";
796  for (auto const& mod: p.moduleInPathSummaries) {
797  LogVerbatim("FwkSummary") << "TimeReport "
798  << std::setprecision(6) << std::fixed
799  << std::right << std::setw(10) << mod.cpuTime/totalEvents << " "
800  << std::right << std::setw(10) << mod.realTime/totalEvents << " "
801  << std::right << std::setw(10) << mod.cpuTime/std::max(1, mod.timesVisited) << " "
802  << std::right << std::setw(10) << mod.realTime/std::max(1, mod.timesVisited) << " "
803  << mod.moduleLabel << "";
804  }
805  }
806  LogVerbatim("FwkSummary") << "TimeReport "
807  << std::right << std::setw(10) << "CPU" << " "
808  << std::right << std::setw(10) << "Real" << " "
809  << std::right << std::setw(10) << "CPU" << " "
810  << std::right << std::setw(10) << "Real" << " "
811  << "Name" << "";
812  LogVerbatim("FwkSummary") << "TimeReport "
813  << std::right << std::setw(22) << "per event "
814  << std::right << std::setw(22) << "per module-visit "
815  << "";
816 
817  LogVerbatim("FwkSummary") << "";
818  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Module Summary ---[sec]----";
819  LogVerbatim("FwkSummary") << "TimeReport "
820  << std::right << std::setw(22) << "per event "
821  << std::right << std::setw(22) << "per module-run "
822  << std::right << std::setw(22) << "per module-visit "
823  << "";
824  LogVerbatim("FwkSummary") << "TimeReport "
825  << std::right << std::setw(10) << "CPU" << " "
826  << std::right << std::setw(10) << "Real" << " "
827  << std::right << std::setw(10) << "CPU" << " "
828  << std::right << std::setw(10) << "Real" << " "
829  << std::right << std::setw(10) << "CPU" << " "
830  << std::right << std::setw(10) << "Real" << " "
831  << "Name" << "";
832  for (auto const& worker : tr.workerSummaries) {
833  LogVerbatim("FwkSummary") << "TimeReport "
834  << std::setprecision(6) << std::fixed
835  << std::right << std::setw(10) << worker.cpuTime/totalEvents << " "
836  << std::right << std::setw(10) << worker.realTime/totalEvents << " "
837  << std::right << std::setw(10) << worker.cpuTime/std::max(1, worker.timesRun) << " "
838  << std::right << std::setw(10) << worker.realTime/std::max(1, worker.timesRun) << " "
839  << std::right << std::setw(10) << worker.cpuTime/std::max(1, worker.timesVisited) << " "
840  << std::right << std::setw(10) << worker.realTime/std::max(1, worker.timesVisited) << " "
841  << worker.moduleLabel << "";
842  }
843  LogVerbatim("FwkSummary") << "TimeReport "
844  << std::right << std::setw(10) << "CPU" << " "
845  << std::right << std::setw(10) << "Real" << " "
846  << std::right << std::setw(10) << "CPU" << " "
847  << std::right << std::setw(10) << "Real" << " "
848  << std::right << std::setw(10) << "CPU" << " "
849  << std::right << std::setw(10) << "Real" << " "
850  << "Name" << "";
851  LogVerbatim("FwkSummary") << "TimeReport "
852  << std::right << std::setw(22) << "per event "
853  << std::right << std::setw(22) << "per module-run "
854  << std::right << std::setw(22) << "per module-visit "
855  << "";
856 
857  LogVerbatim("FwkSummary") << "";
858  LogVerbatim("FwkSummary") << "T---Report end!" << "";
859  LogVerbatim("FwkSummary") << "";
860  }
861 
864  }
865 
868  }
869 
871  for_all(all_output_communicators_, boost::bind(&OutputModuleCommunicator::openFile, _1, boost::cref(fb)));
872  }
873 
874  void Schedule::writeRun(RunPrincipal const& rp, ProcessContext const* processContext) {
875  for_all(all_output_communicators_, boost::bind(&OutputModuleCommunicator::writeRun, _1, boost::cref(rp), processContext));
876  }
877 
878  void Schedule::writeLumi(LuminosityBlockPrincipal const& lbp, ProcessContext const* processContext) {
879  for_all(all_output_communicators_, boost::bind(&OutputModuleCommunicator::writeLumi, _1, boost::cref(lbp), processContext));
880  }
881 
883  // Return true iff at least one output module returns true.
884  return (std::find_if (all_output_communicators_.begin(), all_output_communicators_.end(),
886  != all_output_communicators_.end());
887  }
888 
890  for_all(allWorkers(), boost::bind(&Worker::respondToOpenInputFile, _1, boost::cref(fb)));
891  }
892 
894  for_all(allWorkers(), boost::bind(&Worker::respondToCloseInputFile, _1, boost::cref(fb)));
895  }
896 
897  void Schedule::beginJob(ProductRegistry const& iRegistry) {
899 
900  globalSchedule_->beginJob(iRegistry);
901  }
902 
903  void Schedule::beginStream(unsigned int iStreamID) {
904  assert(iStreamID<streamSchedules_.size());
905  streamSchedules_[iStreamID]->beginStream();
906  }
907 
908  void Schedule::endStream(unsigned int iStreamID) {
909  assert(iStreamID<streamSchedules_.size());
910  streamSchedules_[iStreamID]->endStream();
911  }
912 
914  for_all(allWorkers(), boost::bind(&Worker::preForkReleaseResources, _1));
915  }
916  void Schedule::postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {
917  for_all(allWorkers(), boost::bind(&Worker::postForkReacquireResources, _1, iChildIndex, iNumberOfChildren));
918  }
919 
921  ParameterSet const& iPSet,
922  const ProductRegistry& iRegistry) {
923  Worker* found = nullptr;
924  for (auto const& worker : allWorkers()) {
925  if (worker->description().moduleLabel() == iLabel) {
926  found = worker;
927  break;
928  }
929  }
930  if (nullptr == found) {
931  return false;
932  }
933 
934  auto newMod = moduleRegistry_->replaceModule(iLabel,iPSet,preallocConfig_);
935 
936  globalSchedule_->replaceModule(newMod,iLabel);
937 
938  for(auto s: streamSchedules_) {
939  s->replaceModule(newMod,iLabel);
940  }
941 
942  {
943  //Need to updateLookup in order to make getByToken work
944  auto const runLookup = iRegistry.productLookup(InRun);
945  auto const lumiLookup = iRegistry.productLookup(InLumi);
946  auto const eventLookup = iRegistry.productLookup(InEvent);
947  found->updateLookup(InRun,*runLookup);
948  found->updateLookup(InLumi,*lumiLookup);
949  found->updateLookup(InEvent,*eventLookup);
950  }
951 
952  return true;
953  }
954 
955  std::vector<ModuleDescription const*>
957  std::vector<ModuleDescription const*> result;
958  result.reserve(allWorkers().size());
959 
960  for (auto const& worker : allWorkers()) {
961  ModuleDescription const* p = worker->descPtr();
962  result.push_back(p);
963  }
964  return result;
965  }
966 
967  Schedule::AllWorkers const&
969  return globalSchedule_->allWorkers();
970  }
971 
972  void
973  Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
974  streamSchedules_[0]->availablePaths(oLabelsToFill);
975  }
976 
977  void
979  std::vector<std::string>& oLabelsToFill) const {
980  streamSchedules_[0]->modulesInPath(iPathLabel,oLabelsToFill);
981  }
982 
983  void
985  endpathsAreActive_ = active;
986  for(auto const & s : streamSchedules_) {
987  s->enableEndPaths(active);
988  }
989  }
990 
991  bool
993  return endpathsAreActive_;
994  }
995 
996  void
998  rep.eventSummary.totalEvents = 0;
1001  for(auto& s: streamSchedules_) {
1002  s->getTriggerReport(rep);
1003  }
1004  }
1005 
1006  void
1008  rep.eventSummary.totalEvents = 0;
1009  rep.eventSummary.cpuTime = 0.;
1010  rep.eventSummary.realTime = 0.;
1011  for(auto& s: streamSchedules_) {
1012  s->getTriggerTimingReport(rep);
1013  }
1014  }
1015 
1016  int
1018  int returnValue = 0;
1019  for(auto& s: streamSchedules_) {
1020  returnValue += s->totalEvents();
1021  }
1022  return returnValue;
1023  }
1024 
1025  int
1027  int returnValue = 0;
1028  for(auto& s: streamSchedules_) {
1029  returnValue += s->totalEventsPassed();
1030  }
1031  return returnValue;
1032  }
1033 
1034  int
1036  int returnValue = 0;
1037  for(auto& s: streamSchedules_) {
1038  returnValue += s->totalEventsFailed();
1039  }
1040  return returnValue;
1041  }
1042 
1043 
1044  void
1046  for(auto const& s: streamSchedules_) {
1047  s->clearCounters();
1048  }
1049  }
1050 
1051  //====================================
1052  // Schedule::checkForCorrectness algorithm
1053  //
1054  // The code creates a 'dependency' graph between all
1055  // modules. A module depends on another module if
1056  // 1) it 'consumes' data produced by that module
1057  // 2) it appears directly after the module within a Path
1058  //
1059  // If there is a cycle in the 'dependency' graph then
1060  // the schedule may be unrunnable. The schedule is still
1061  // runnable if all cycles have at least two edges which
1062  // connect modules only by Path dependencies (i.e. not
1063  // linked by a data dependency).
1064  //
1065  // Example 1:
1066  // C consumes data from B
1067  // Path 1: A + B + C
1068  // Path 2: B + C + A
1069  //
1070  // Cycle: A after C [p2], C consumes B, B after A [p1]
1071  // Since this cycle has 2 path only edges it is OK since
1072  // A and (B+C) are independent so their run order doesn't matter
1073  //
1074  // Example 2:
1075  // B consumes A
1076  // C consumes B
1077  // Path: C + A
1078  //
1079  // Cycle: A after C [p], C consumes B, B consumes A
1080  // Since this cycle has 1 path only edge it is unrunnable.
1081  //
1082  // Example 3:
1083  // A consumes B
1084  // B consumes C
1085  // C consumes A
1086  // (no Path since unscheduled execution)
1087  //
1088  // Cycle: A consumes B, B consumes C, C consumes A
1089  // Since this cycle has 0 path only edges it is unrunnable.
1090  //====================================
1091 
1092  namespace {
1093  typedef std::pair<unsigned int, unsigned int> SimpleEdge;
1094  typedef std::map<SimpleEdge, std::vector<unsigned int>> EdgeToPathMap;
1095 
1096  typedef boost::adjacency_list<boost::vecS, boost::vecS, boost::bidirectionalS> Graph;
1097 
1098  typedef boost::graph_traits<Graph>::edge_descriptor Edge;
1099  struct cycle_detector : public boost::dfs_visitor<> {
1100 
1101  cycle_detector(EdgeToPathMap const& iEdgeToPathMap,
1102  std::vector<std::string> const& iPathNames,
1103  std::map<std::string,unsigned int> const& iModuleNamesToIndex):
1104  m_edgeToPathMap(iEdgeToPathMap),
1105  m_pathNames(iPathNames),
1106  m_namesToIndex(iModuleNamesToIndex){}
1107 
1108  void tree_edge(Edge iEdge, Graph const&) {
1109  m_stack.push_back(iEdge);
1110  }
1111 
1112  void finish_edge(Edge iEdge, Graph const& iGraph) {
1113  if(not m_stack.empty()) {
1114  if (iEdge == m_stack.back()) {
1115  m_stack.pop_back();
1116  }
1117  }
1118  }
1119 
1120  //Called if a cycle happens
1121  void back_edge(Edge iEdge, Graph const& iGraph) {
1122  //NOTE: If the path containing the cycle contains two or more
1123  // path only edges then there is no problem
1124 
1126  IndexMap const& index = get(boost::vertex_index, iGraph);
1127 
1128  unsigned int vertex = index[target(iEdge,iGraph)];
1129 
1130  //Find last edge which starts with this vertex
1131  std::list<Edge>::iterator itFirst = m_stack.begin();
1132  {
1133  bool seenVertex = false;
1134  while(itFirst != m_stack.end()) {
1135  if(not seenVertex) {
1136  if(index[source(*itFirst,iGraph)] == vertex) {
1137  seenVertex = true;
1138  }
1139  } else
1140  if (index[source(*itFirst,iGraph)] != vertex) {
1141  break;
1142  }
1143  ++itFirst;
1144  }
1145  if(itFirst != m_stack.begin()) {
1146  --itFirst;
1147  }
1148  }
1149  //This edge has not been added to the stack yet
1150  // making a copy allows us to add it in but not worry
1151  // about removing it at the end of the routine
1152  std::vector<Edge> tempStack;
1153  tempStack.reserve(m_stack.size()+1);
1154  tempStack.insert(tempStack.end(),itFirst,m_stack.end());
1155  tempStack.emplace_back(iEdge);
1156 
1157  unsigned int nPathDependencyOnly =0;
1158  for(auto const& edge: tempStack) {
1159  unsigned int in =index[source(edge,iGraph)];
1160  unsigned int out =index[target(edge,iGraph)];
1161 
1162  auto iFound = m_edgeToPathMap.find(SimpleEdge(in,out));
1163  bool pathDependencyOnly = true;
1164  for(auto dependency : iFound->second) {
1165  if (dependency == std::numeric_limits<unsigned int>::max()) {
1166  pathDependencyOnly = false;
1167  break;
1168  }
1169  }
1170  if (pathDependencyOnly) {
1171  ++nPathDependencyOnly;
1172  }
1173  }
1174  if(nPathDependencyOnly < 2) {
1175  reportError(tempStack,index,iGraph);
1176  }
1177  }
1178  private:
1179  std::string const& pathName(unsigned int iIndex) const {
1180  return m_pathNames[iIndex];
1181  }
1182 
1183  std::string const& moduleName(unsigned int iIndex) const {
1184  for(auto const& item : m_namesToIndex) {
1185  if(item.second == iIndex) {
1186  return item.first;
1187  }
1188  }
1189  assert(false);
1190  }
1191 
1192  void
1193  reportError(std::vector<Edge>const& iEdges,
1195  Graph const& iGraph) const {
1196  std::stringstream oStream;
1197  oStream <<"Module run order problem found: \n";
1198  bool first_edge = true;
1199  for(auto const& edge: iEdges) {
1200  unsigned int in =iIndex[source(edge,iGraph)];
1201  unsigned int out =iIndex[target(edge,iGraph)];
1202 
1203  if(first_edge) {
1204  first_edge = false;
1205  } else {
1206  oStream<<", ";
1207  }
1208  oStream <<moduleName(in);
1209 
1210  auto iFound = m_edgeToPathMap.find(SimpleEdge(in,out));
1211  bool pathDependencyOnly = true;
1212  for(auto dependency : iFound->second) {
1213  if (dependency == std::numeric_limits<unsigned int>::max()) {
1214  pathDependencyOnly = false;
1215  break;
1216  }
1217  }
1218  if (pathDependencyOnly) {
1219  oStream <<" after "<<moduleName(out)<<" [path "<<pathName(iFound->second[0])<<"]";
1220  } else {
1221  oStream <<" consumes "<<moduleName(out);
1222  }
1223  }
1224  oStream<<"\n Running in the threaded framework would lead to indeterminate results."
1225  "\n Please change order of modules in mentioned Path(s) to avoid inconsistent module ordering.";
1226 
1227  LogError("UnrunnableSchedule")<<oStream.str();
1228  }
1229 
1230  EdgeToPathMap const& m_edgeToPathMap;
1231  std::vector<std::string> const& m_pathNames;
1232  std::map<std::string,unsigned int> m_namesToIndex;
1233 
1234  std::list<Edge> m_stack;
1235  };
1236  }
1237 
1238  void
1240  {
1241  //Need to lookup names to ids quickly
1242  std::map<std::string,unsigned int> moduleNamesToIndex;
1243  for(auto worker: allWorkers()) {
1244  moduleNamesToIndex.insert( std::make_pair(worker->description().moduleLabel(),
1245  worker->description().id()));
1246  }
1247 
1248  //If a module to module dependency comes from a path, remember which path
1249  EdgeToPathMap edgeToPathMap;
1250 
1251  //determine the path dependencies
1252  std::vector<std::string> pathNames;
1253  {
1254  streamSchedules_[0]->availablePaths(pathNames);
1255 
1256  std::vector<std::string> moduleNames;
1257  std::vector<std::string> reducedModuleNames;
1258  unsigned int pathIndex=0;
1259  for(auto const& path: pathNames) {
1260  moduleNames.clear();
1261  reducedModuleNames.clear();
1262  std::set<std::string> alreadySeenNames;
1263 
1264  streamSchedules_[0]->modulesInPath(path,moduleNames);
1265  std::string lastModuleName;
1266  unsigned int lastModuleIndex;
1267  for(auto const& name: moduleNames) {
1268  auto found = alreadySeenNames.insert(name);
1269  if(found.second) {
1270  //first time for this path
1271  unsigned int moduleIndex = moduleNamesToIndex[name];
1272  if(not lastModuleName.empty() ) {
1273  edgeToPathMap[std::make_pair(moduleIndex,lastModuleIndex)].push_back(pathIndex);
1274  }
1275  lastModuleName = name;
1276  lastModuleIndex = moduleIndex;
1277  }
1278  }
1279  ++pathIndex;
1280  }
1281  }
1282  {
1283  std::vector<const char*> dependentModules;
1284  //determine the data dependencies
1285  for(auto const& worker: allWorkers()) {
1286  dependentModules.clear();
1287  //NOTE: what about aliases?
1288  worker->modulesDependentUpon(dependentModules);
1289  auto found = moduleNamesToIndex.find(worker->description().moduleLabel());
1290  if (found == moduleNamesToIndex.end()) {
1291  //The module was from a previous process
1292  continue;
1293  }
1294  unsigned int moduleIndex = found->second;
1295  for(auto name: dependentModules) {
1296  edgeToPathMap[std::make_pair(moduleIndex, moduleNamesToIndex[name])].push_back(std::numeric_limits<unsigned int>::max());
1297  }
1298  }
1299  }
1300  //Now use boost graph library to find cycles in the dependencies
1301  std::vector<SimpleEdge> outList;
1302  outList.reserve(edgeToPathMap.size());
1303  for(auto const& edgeInfo: edgeToPathMap) {
1304  outList.push_back(edgeInfo.first);
1305  }
1306 
1307  Graph g(outList.begin(),outList.end(), moduleNamesToIndex.size());
1308 
1309  cycle_detector detector(edgeToPathMap,pathNames,moduleNamesToIndex);
1310  boost::depth_first_search(g,boost::visitor(detector));
1311  }
1312 }
type
Definition: HCALResponse.h:21
bool empty() const
Definition: ParameterSet.h:216
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:64
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
dictionary aliases
Definition: autoCond.py:51
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
void checkForCorrectness() const
Check that the schedule is actually runable.
Definition: Schedule.cc:1239
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
virtual void openFile(FileBlock const &fb)=0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:968
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:973
static std::string const source("source")
virtual void writeRun(RunPrincipal const &rp, ProcessContext const *)=0
preallocConfig_(prealloc)
bool endPathsEnabled() const
Definition: Schedule.cc:992
all_output_communicators_()
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:893
std::vector< Worker * > AllWorkers
Definition: Schedule.h:113
std::vector< ParameterSet > VParameterSet
Definition: ParameterSet.h:33
void writeRun(RunPrincipal const &rp, ProcessContext const *)
Definition: Schedule.cc:874
void endStream(unsigned int)
Definition: Schedule.cc:908
void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)
Definition: Schedule.cc:878
void enableEndPaths(bool active)
Definition: Schedule.cc:984
processConfiguration
Definition: Schedule.cc:369
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
endpathsAreActive_(true)
Definition: Schedule.cc:374
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:65
actions
Definition: Schedule.cc:369
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
int totalEventsFailed() const
Definition: Schedule.cc:1035
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry)
Definition: Schedule.cc:920
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:192
boost::shared_ptr< ProductHolderIndexHelper > const & productLookup(BranchType branchType) const
int totalEventsPassed() const
Definition: Schedule.cc:1026
std::string moduleName(Provenance const &provenance)
Definition: Provenance.cc:27
virtual void updateLookup(BranchType iBranchType, ProductHolderIndexHelper const &)=0
tuple path
else: Piece not in the list, fine.
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:63
EventSummary eventSummary
Definition: TriggerReport.h:62
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists)
Definition: Schedule.cc:459
wantSummary_(tns.wantSummary())
int totalEvents() const
Definition: Schedule.cc:1017
virtual void openNewFileIfNeeded()=0
EventTimingSummary eventSummary
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1045
tuple result
Definition: query.py:137
boost::shared_ptr< ModuleRegistry > moduleRegistry_
Definition: Schedule.h:244
std::vector< PathTimingSummary > trigPathSummaries
std::vector< std::shared_ptr< StreamSchedule > > streamSchedules_
Definition: Schedule.h:245
void setProcessParameterSetID(ParameterSetID const &id)
Associated free functions.
Definition: Registry.cc:76
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:498
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:889
virtual void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)=0
areg
Definition: Schedule.cc:369
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:90
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:997
tuple out
Definition: dbtoconf.py:99
virtual bool shouldWeCloseFile() const =0
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:250
moduleRegistry_(new ModuleRegistry())
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
volatile bool endpathsAreActive_
Definition: Schedule.h:255
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:86
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:249
void loadMissingDictionaries()
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:978
Schedule(ParameterSet &proc_pset, service::TriggerNamesService &tns, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration > processConfiguration, const ParameterSet *subProcPSet, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:358
void beginStream(unsigned int)
Definition: Schedule.cc:903
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:87
void preForkReleaseResources()
Definition: Worker.h:89
bool wantSummary_
Definition: Schedule.h:253
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Schedule.cc:916
list key
Definition: combine.py:13
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:956
Strings const & getTrigPaths() const
std::unique_ptr< GlobalSchedule > globalSchedule_
Definition: Schedule.h:247
void beginJob(ProductRegistry const &)
Definition: Schedule.cc:897
void openNewOutputFilesIfNeeded()
Definition: Schedule.cc:866
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
void preForkReleaseResources()
Definition: Schedule.cc:913
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:870
preg
Definition: Schedule.cc:369
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:514
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1007
bool shouldWeCloseOutput() const
Definition: Schedule.cc:882
std::vector< std::string > vstring
Definition: Schedule.cc:354
tuple size
Write out results.
void closeOutputFiles()
Definition: Schedule.cc:862
prealloc
Definition: Schedule.cc:369
std::string match(BranchDescription const &a, BranchDescription const &b, std::string const &fileName)