CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Schedule.cc
Go to the documentation of this file.
2 
24 
25 #include "boost/bind.hpp"
26 #include "boost/ref.hpp"
27 
28 #include "boost/graph/graph_traits.hpp"
29 #include "boost/graph/adjacency_list.hpp"
30 #include "boost/graph/depth_first_search.hpp"
31 #include "boost/graph/visitors.hpp"
32 
33 
34 #include <algorithm>
35 #include <cassert>
36 #include <cstdlib>
37 #include <functional>
38 #include <iomanip>
39 #include <list>
40 #include <map>
41 #include <exception>
42 #include <sstream>
43 
44 namespace edm {
45  namespace {
// Returns true when s occurs in v. The lookup is done with std::binary_search,
// so the answer is only meaningful if v is sorted in ascending order.
46  bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
47  return std::binary_search(v.begin(), v.end(), s);
48  }
49 
50  // Here we make the trigger results inserter directly. This should
51  // probably be a utility in the WorkerRegistry or elsewhere.
//
// makeInserter builds the TriggerResultInserter from the "@trigger_paths"
// parameter set of proc_pset (which it also registers), registers the
// module's products and callbacks with preg, and returns the module in a
// std::shared_ptr. The pre- and post-module-construction signals of areg are
// emitted around the construction. An exception thrown during construction
// is rethrown to the caller after the post signal has been attempted.
52 
53  std::shared_ptr<TriggerResultInserter>
54  makeInserter(ParameterSet& proc_pset,
55  PreallocationConfiguration const& iPrealloc,
56  ProductRegistry& preg,
57  ExceptionToActionTable const& actions,
58  boost::shared_ptr<ActivityRegistry> areg,
59  boost::shared_ptr<ProcessConfiguration> processConfiguration) {
60 
61  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
62  trig_pset->registerIt();
63 
64  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
65  ModuleDescription md(trig_pset->id(),
66  "TriggerResultInserter",
67  "TriggerResults",
68  processConfiguration.get(),
// NOTE(review): this listing skips its line 69, so the remainder of the
// ModuleDescription constructor call is not visible here.
70 
71  areg->preModuleConstructionSignal_(md);
// Records whether the post-construction signal has already been emitted, so
// that the handler below does not emit it a second time.
72  bool postCalled = false;
73  std::shared_ptr<TriggerResultInserter> returnValue;
74  try {
75  maker::ModuleHolderT<TriggerResultInserter> holder(new TriggerResultInserter(*trig_pset, iPrealloc.numberOfStreams()),static_cast<Maker const*>(nullptr));
76  holder.setModuleDescription(md);
77  holder.registerProductsAndCallbacks(&preg);
// The pointer released by the holder is handed to the returned shared_ptr.
78  returnValue.reset(holder.release());
// The flag is set before emitting: if the post signal itself throws, the
// catch block must not emit it again.
79  postCalled = true;
80  // if exception then post will be called in the catch block
81  areg->postModuleConstructionSignal_(md);
82  }
83  catch (...) {
84  if(!postCalled) {
85  try {
86  areg->postModuleConstructionSignal_(md);
87  }
88  catch (...) {
89  // If post throws an exception ignore it because we are already handling another exception
90  }
91  }
// Propagate the original exception to the caller.
92  throw;
93  }
94  return returnValue;
95  }
96 
97 
// Validates a single EDAlias mapping and, if it is acceptable, records it.
//
// The aliased product is identified by the BranchKey (friendlyClassName,
// moduleLabel, productInstanceName, processName). The alias key uses `alias`
// as module label and `instanceAlias` as instance name; an instanceAlias of
// "*" means "reuse productInstanceName".
//
// aliasMap receives (product key -> alias key) and aliasKeys receives the
// reverse mapping, which is consulted to detect one alias being used for two
// different products. Both are extended only when the product is present in
// preg and the alias key has not been recorded before.
//
// Throws Exception(errors::Configuration) when
//  - the product is missing from preg although some product with the same
//    module label and process name is registered,
//  - the alias key already names a product in preg,
//  - the alias key was recorded earlier for a different product, or
//  - the product is present but produced() is false for it.
98  void
99  checkAndInsertAlias(std::string const& friendlyClassName,
100  std::string const& moduleLabel,
101  std::string const& productInstanceName,
102  std::string const& processName,
103  std::string const& alias,
104  std::string const& instanceAlias,
105  ProductRegistry const& preg,
106  std::multimap<BranchKey, BranchKey>& aliasMap,
107  std::map<BranchKey, BranchKey>& aliasKeys) {
108  std::string const star("*");
109 
110  BranchKey key(friendlyClassName, moduleLabel, productInstanceName, processName);
111  if(preg.productList().find(key) == preg.productList().end()) {
112  // No product was found matching the alias.
113  // We throw an exception only if a module with the specified module label was created in this process.
114  for(auto const& product : preg.productList()) {
115  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
116  throw Exception(errors::Configuration, "EDAlias does not match data\n")
117  << "There are no products of type '" << friendlyClassName << "'\n"
118  << "with module label '" << moduleLabel << "' and instance name '" << productInstanceName << "'.\n";
119  }
120  }
// Falling through here means the product is absent and no product with this
// module label and process name is registered; the checks below still run,
// but nothing will be inserted for this key.
121  }
122 
// "*" as instance alias keeps the product's own instance name.
123  std::string const& theInstanceAlias(instanceAlias == star ? productInstanceName : instanceAlias);
124  BranchKey aliasKey(friendlyClassName, alias, theInstanceAlias, processName);
// The alias must not collide with a product that is already registered.
125  if(preg.productList().find(aliasKey) != preg.productList().end()) {
126  throw Exception(errors::Configuration, "EDAlias conflicts with data\n")
127  << "A product of type '" << friendlyClassName << "'\n"
128  << "with module label '" << alias << "' and instance name '" << theInstanceAlias << "'\n"
129  << "already exists.\n";
130  }
131  auto iter = aliasKeys.find(aliasKey);
132  if(iter != aliasKeys.end()) {
133  // The alias matches a previous one. If the same alias is used for different product, throw.
134  if(iter->second != key) {
135  throw Exception(errors::Configuration, "EDAlias conflict\n")
136  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
137  << "are used for multiple products of type '" << friendlyClassName << "'\n"
138  << "One has module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
139  << "the other has module label '" << iter->second.moduleLabel() << "' and product instance name '" << iter->second.productInstanceName() << "'.\n";
140  }
// An identical (alias key, product key) pair that was seen before is accepted
// silently and not inserted again.
141  } else {
// First time this alias key is seen: record it, but only for a product that
// exists in the registry and is flagged as produced().
142  auto prodIter = preg.productList().find(key);
143  if(prodIter != preg.productList().end()) {
144  if (!prodIter->second.produced()) {
145  throw Exception(errors::Configuration, "EDAlias\n")
146  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
147  << "are used for a product of type '" << friendlyClassName << "'\n"
148  << "with module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
149  << "An EDAlias can only be used for products produced in the current process. This one is not.\n";
150  }
151  aliasMap.insert(std::make_pair(key, aliasKey));
152  aliasKeys.insert(std::make_pair(aliasKey, key));
153  }
154  }
155  }
156 
// Reads the EDAlias configuration from proc_pset and adds the resulting label
// aliases to preg.
//
// "@all_aliases" lists the alias names; nothing is done when it is empty.
// Each alias names a ParameterSet whose VParameterSet parameters are keyed by
// the module label being aliased. Every entry of such a VParameterSet is
// validated against desc and supplies "type", "fromProductInstance" and
// "toProductInstance". Each resulting mapping is checked and collected by
// checkAndInsertAlias, and the collected aliases are finally added to preg
// with addLabelAlias.
//
// Throws Exception(errors::Configuration) (directly or via
// checkAndInsertAlias) when an alias does not match the registered products.
157  void
158  processEDAliases(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
159  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string> >("@all_aliases");
160  if(aliases.empty()) {
161  return;
162  }
163  std::string const star("*");
164  std::string const empty("");
// NOTE(review): this listing skips its line 165; the declaration of `desc`
// used below is presumably there - confirm against the upstream file.
166  desc.add<std::string>("type");
167  desc.add<std::string>("fromProductInstance", star);
168  desc.add<std::string>("toProductInstance", star);
169 
// product key -> alias key; may hold several aliases for the same product.
170  std::multimap<BranchKey, BranchKey> aliasMap;
171 
172  std::map<BranchKey, BranchKey> aliasKeys; // Used to search for duplicates or clashes.
173 
174  // Now, loop over the alias information and store it in aliasMap.
175  for(std::string const& alias : aliases) {
176  ParameterSet const& aliasPSet = proc_pset.getParameterSet(alias);
// The names of the VParameterSet parameters are the aliased module labels.
177  std::vector<std::string> vPSetNames = aliasPSet.getParameterNamesForType<VParameterSet>();
178  for(std::string const& moduleLabel : vPSetNames) {
179  VParameterSet vPSet = aliasPSet.getParameter<VParameterSet>(moduleLabel);
180  for(ParameterSet& pset : vPSet) {
181  desc.validate(pset);
182  std::string friendlyClassName = pset.getParameter<std::string>("type");
183  std::string productInstanceName = pset.getParameter<std::string>("fromProductInstance");
184  std::string instanceAlias = pset.getParameter<std::string>("toProductInstance");
185  if(productInstanceName == star) {
// Wildcard source instance: alias every product of this type and module
// label that belongs to the current process.
186  bool match = false;
// Start the scan at the first key not ordered before
// (friendlyClassName, moduleLabel, "", "") and stop as soon as the type or
// the module label changes.
187  BranchKey lowerBound(friendlyClassName, moduleLabel, empty, empty);
188  for(ProductRegistry::ProductList::const_iterator it = preg.productList().lower_bound(lowerBound);
189  it != preg.productList().end() && it->first.friendlyClassName() == friendlyClassName && it->first.moduleLabel() == moduleLabel;
190  ++it) {
191  if(it->first.processName() != processName) {
192  continue;
193  }
194  match = true;
195 
196  checkAndInsertAlias(friendlyClassName, moduleLabel, it->first.productInstanceName(), processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
197  }
198  if(!match) {
199  // No product was found matching the alias.
200  // We throw an exception only if a module with the specified module label was created in this process.
201  for(auto const& product : preg.productList()) {
202  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
203  throw Exception(errors::Configuration, "EDAlias parameter set mismatch\n")
204  << "There are no products of type '" << friendlyClassName << "'\n"
205  << "with module label '" << moduleLabel << "'.\n";
206  }
207  }
208  }
209  } else {
// Explicit source instance name: check exactly that one product.
210  checkAndInsertAlias(friendlyClassName, moduleLabel, productInstanceName, processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
211  }
212  }
213  }
214  }
215 
216 
217  // Now add the new alias entries to the product registry.
218  for(auto const& aliasEntry : aliasMap) {
219  ProductRegistry::ProductList::const_iterator it = preg.productList().find(aliasEntry.first);
// checkAndInsertAlias only records keys it found in the registry, so the
// lookup is expected to succeed.
220  assert(it != preg.productList().end());
221  preg.addLabelAlias(it->second, aliasEntry.second.moduleLabel(), aliasEntry.second.productInstanceName());
222  }
223 
224  }
225 
226  typedef std::vector<std::string> vstring;
227 
// Removes unused modules, and the traces they leave, from the top level
// ParameterSet.
//
//  proc_pset                 top level ParameterSet; modified in place
//                            (module psets, "@all_modules", the end paths,
//                            "@paths" and "@end_paths").
//  end_path_name_list        names of the end paths.
//  modulesInConfig           labels from "@all_modules"; dropped labels are
//                            erased from this vector.
//  usedModuleLabels          labels of the modules that are actually in use.
//  outputModulePathPositions filled per output module label with
//                            (end path name, position) pairs.
228  void reduceParameterSet(ParameterSet& proc_pset,
229  vstring const& end_path_name_list,
230  vstring& modulesInConfig,
231  std::set<std::string> const& usedModuleLabels,
232  std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions) {
233  // Before calculating the ParameterSetID of the top level ParameterSet or
234  // saving it in the registry drop from the top level ParameterSet all
235  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
236  // production is not enabled also drop all the EDFilters and EDProducers
237  // that are not scheduled. Drop the ParameterSet used to configure the module
238  // itself. Also drop the other traces of these labels in the top level
239  // ParameterSet: Remove those labels from @all_modules and from all the
240  // end paths. If this makes any end paths empty, then remove the end path
241  // name from @end_paths, and @paths.
242 
243  // First make a list of labels to drop
244  vstring outputModuleLabels;
245  std::string edmType;
246  std::string const moduleEdmType("@module_edm_type");
247  std::string const outputModule("OutputModule");
248  std::string const edAnalyzer("EDAnalyzer");
// NOTE(review): edFilter and edProducer are declared but never referenced in
// the rest of this function.
249  std::string const edFilter("EDFilter");
250  std::string const edProducer("EDProducer");
251 
252  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
253 
254  //need a list of all modules on paths in order to determine
255  // if an EDAnalyzer only appears on an end path
256  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
257  std::set<std::string> modulesOnPaths;
258  {
// "@paths" minus the end paths leaves the paths whose modules are collected.
259  std::set<std::string> noEndPaths(scheduledPaths.begin(),scheduledPaths.end());
260  for(auto const& endPath: end_path_name_list) {
261  noEndPaths.erase(endPath);
262  }
263  {
264  vstring labels;
265  for(auto const& path: noEndPaths) {
266  labels = proc_pset.getParameter<vstring>(path);
267  modulesOnPaths.insert(labels.begin(),labels.end());
268  }
269  }
270  }
271  //Initially fill labelsToBeDropped with all modules mentioned in
272  // the configuration but which are not being used by the system
273  std::vector<std::string> labelsToBeDropped;
274  labelsToBeDropped.reserve(modulesInConfigSet.size());
275  std::set_difference(modulesInConfigSet.begin(),modulesInConfigSet.end(),
276  usedModuleLabels.begin(),usedModuleLabels.end(),
277  std::back_inserter(labelsToBeDropped));
278 
// Remember where the appended second run starts; it is merged in below.
279  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
// Among the used modules, drop every OutputModule and every EDAnalyzer that
// is not on one of the non-end paths collected above.
280  for (auto const& modLabel: usedModuleLabels) {
281  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
282  if (edmType == outputModule) {
283  outputModuleLabels.push_back(modLabel);
284  labelsToBeDropped.push_back(modLabel);
285  }
286  if(edmType == edAnalyzer) {
287  if(modulesOnPaths.end()==modulesOnPaths.find(modLabel)) {
288  labelsToBeDropped.push_back(modLabel);
289  }
290  }
291  }
292  //labelsToBeDropped must be sorted
// Both runs are filled in the iteration order of a std::set, so each is
// already sorted and a single merge yields a sorted vector for the binary
// searches below.
293  std::inplace_merge(labelsToBeDropped.begin(),
294  labelsToBeDropped.begin()+sizeBeforeOutputModules,
295  labelsToBeDropped.end());
296 
297  // drop the parameter sets used to configure the modules
298  for_all(labelsToBeDropped, boost::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, boost::ref(proc_pset), _1));
299 
300  // drop the labels from @all_modules
301  vstring::iterator endAfterRemove = std::remove_if(modulesInConfig.begin(), modulesInConfig.end(), boost::bind(binary_search_string, boost::ref(labelsToBeDropped), _1));
302  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
303  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
304 
305  // drop the labels from all end paths
306  vstring endPathsToBeDropped;
307  vstring labels;
308  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
309  iEndPath != endEndPath;
310  ++iEndPath) {
311  labels = proc_pset.getParameter<vstring>(*iEndPath);
// In-place compaction: iSave marks the slot for the next label that is kept,
// so iSave - iBegin is the number of labels kept so far.
312  vstring::iterator iSave = labels.begin();
313  vstring::iterator iBegin = labels.begin();
314 
315  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end();
316  iLabel != iEnd; ++iLabel) {
317  if (binary_search_string(labelsToBeDropped, *iLabel)) {
318  if (binary_search_string(outputModuleLabels, *iLabel)) {
// Record the end path and the count of kept labels that precede this
// output module on it.
319  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
320  }
321  } else {
322  if (iSave != iLabel) {
323  iSave->swap(*iLabel);
324  }
325  ++iSave;
326  }
327  }
328  labels.erase(iSave, labels.end());
329  if (labels.empty()) {
330  // remove empty end paths and save their names
331  proc_pset.eraseSimpleParameter(*iEndPath);
332  endPathsToBeDropped.push_back(*iEndPath);
333  } else {
334  proc_pset.addParameter<vstring>(*iEndPath, labels);
335  }
336  }
// Sorted so that binary_search_string can be used on it below.
337  sort_all(endPathsToBeDropped);
338 
339  // remove empty end paths from @paths
340  endAfterRemove = std::remove_if(scheduledPaths.begin(), scheduledPaths.end(), boost::bind(binary_search_string, boost::ref(endPathsToBeDropped), _1));
341  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
342  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
343 
344  // remove empty end paths from @end_paths
345  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
346  endAfterRemove = std::remove_if(scheduledEndPaths.begin(), scheduledEndPaths.end(), boost::bind(binary_search_string, boost::ref(endPathsToBeDropped), _1));
347  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
348  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
349 
350  }
351  }
352  // -----------------------------
353 
354  typedef std::vector<std::string> vstring;
355 
356  // -----------------------------
357 
// Schedule constructor (see the closing comment at the end of the body).
// NOTE(review): this listing omits its lines 357-359, 366, 371-372, 374, 404
// and 440, so the start of the signature, some member initializers and a few
// statements are not visible here.
//
// Visible steps: create one StreamSchedule per stream, create the
// GlobalSchedule, reduce proc_pset to the modules in use, process the
// EDAliases, register proc_pset and publish its ID, collect the output module
// communicators, apply the output limits, freeze the product registry and
// finally let every output communicator select its products.
360  ProductRegistry& preg,
361  BranchIDListHelper& branchIDListHelper,
362  ExceptionToActionTable const& actions,
363  boost::shared_ptr<ActivityRegistry> areg,
364  boost::shared_ptr<ProcessConfiguration> processConfiguration,
365  const ParameterSet* subProcPSet,
367  ProcessContext const* processContext) :
368  //Only create a resultsInserter if there is a trigger path
369  resultsInserter_{tns.getTrigPaths().empty()? std::shared_ptr<TriggerResultInserter>{} :makeInserter(proc_pset,prealloc,preg,actions,areg,processConfiguration)},
370  moduleRegistry_(new ModuleRegistry()),
373  wantSummary_(tns.wantSummary()),
375  {
// streamSchedules_[0] is used below, so at least one stream is required.
376  assert(0<prealloc.numberOfStreams());
377  streamSchedules_.reserve(prealloc.numberOfStreams());
// One StreamSchedule per stream, each constructed with its own StreamID.
378  for(unsigned int i=0; i<prealloc.numberOfStreams();++i) {
379  streamSchedules_.emplace_back(std::shared_ptr<StreamSchedule>{new StreamSchedule{resultsInserter_.get(),moduleRegistry_,proc_pset,tns,prealloc, preg,branchIDListHelper,actions,areg,processConfiguration,nullptr==subProcPSet,StreamID{i},processContext}});
380  }
381 
382  //TriggerResults are injected automatically by StreamSchedules and are
383  // unknown to the ModuleRegistry
384  const std::string kTriggerResults("TriggerResults");
// Module labels are taken from the workers of the first stream schedule.
385  std::vector<std::string> modulesToUse;
386  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
387  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
388  if(worker->description().moduleLabel() != kTriggerResults) {
389  modulesToUse.push_back(worker->description().moduleLabel());
390  }
391  }
392  //The unscheduled modules are at the end of the list, but we want them at the front
393  unsigned int n = streamSchedules_[0]->numberOfUnscheduledModules();
394  if(n>0) {
// Rebuild the list as [last n labels][remaining labels], keeping the relative
// order inside each part.
395  std::vector<std::string> temp;
396  temp.reserve(modulesToUse.size());
397  auto itBeginUnscheduled = modulesToUse.begin()+modulesToUse.size()-n;
398  std::copy(itBeginUnscheduled,modulesToUse.end(),
399  std::back_inserter(temp));
400  std::copy(modulesToUse.begin(),itBeginUnscheduled,std::back_inserter(temp));
401  temp.swap(modulesToUse);
402  }
403  globalSchedule_.reset( new GlobalSchedule{ resultsInserter_.get(),
// NOTE(review): the constructor argument on listing line 404 is not visible.
405  modulesToUse,
406  proc_pset, preg, prealloc,
407  actions,areg,processConfiguration,processContext });
408 
409  //TriggerResults is not in the top level ParameterSet so the call to
410  // reduceParameterSet would fail to find it. Just remove it up front.
411  std::set<std::string> usedModuleLabels;
412  for( auto const worker: allWorkers()) {
413  if(worker->description().moduleLabel() != kTriggerResults) {
414  usedModuleLabels.insert(worker->description().moduleLabel());
415  }
416  }
417  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
418  std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
419  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels,
420  outputModulePathPositions);
421  processEDAliases(proc_pset, processConfiguration->processName(), preg);
// proc_pset is registered only after it has been reduced; its ID is then
// published and stored in the process configuration.
422  proc_pset.registerIt();
423  pset::setProcessParameterSetID(proc_pset.id());
424  processConfiguration->setParameterSetID(proc_pset.id());
425  processConfiguration->setProcessConfigurationID();
426 
427  // This is used for a little sanity-check to make sure no code
428  // modifications alter the number of workers at a later date.
429  size_t all_workers_count = allWorkers().size();
430 
// Keep a communicator for every module holder that provides one.
431  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder){
432  auto comm = iHolder->createOutputModuleCommunicator();
433  if (comm) {
434  all_output_communicators_.emplace_back(boost::shared_ptr<OutputModuleCommunicator>{comm.release()});
435  }
436  });
437  // Now that the output workers are filled in, set any output limits or information.
438  limitOutput(proc_pset, branchIDListHelper.branchIDLists());
439 
441 
442  // Sanity check: make sure nobody has added a worker after we've
443  // already relied on the WorkerManager being full.
444  assert (all_workers_count == allWorkers().size());
445 
446  branchIDListHelper.updateFromRegistry(preg);
447 
448  preg.setFrozen();
449 
// Runs after the registry has been frozen. outputModulePathPositions was
// filled by reduceParameterSet above.
450  for (auto c : all_output_communicators_) {
451  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
452  c->selectProducts(preg);
453  }
454 
455  } // Schedule::Schedule
456 
457 
// Configures every output module communicator with an OutputModuleDescription
// built from branchIDLists and the event limit found in the untracked
// "maxEvents" parameter set of proc_pset.
//
// The "output" entry of "maxEvents" may be an int (one limit for all output
// modules) or a nested parameter set (an int per output module label). When
// no int form is given, the limit passed to the description is -1.
458  void
459  Schedule::limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists) {
460  std::string const output("output");
461 
// An empty ParameterSet is used when "maxEvents" is not present.
462  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents", ParameterSet());
// Counts how many forms of "output" (int and/or parameter set) are present.
463  int maxEventSpecs = 0;
464  int maxEventsOut = -1;
465  ParameterSet const* vMaxEventsOut = 0;
466  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
467  if (search_all(intNamesE, output)) {
468  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
469  ++maxEventSpecs;
470  }
471  std::vector<std::string> psetNamesE;
472  maxEventsPSet.getParameterSetNames(psetNamesE, false);
473  if (search_all(psetNamesE, output)) {
474  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
475  ++maxEventSpecs;
476  }
477 
478  if (maxEventSpecs > 1) {
// NOTE(review): listing line 479 is missing; the statement that uses the
// message below presumably throws a configuration error - confirm upstream.
480  "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
481  }
482 
483  for (auto c : all_output_communicators_) {
484  OutputModuleDescription desc(branchIDLists, maxEventsOut);
// With a non-empty per-module parameter set, the limit is looked up under
// the module label of this communicator.
485  if (vMaxEventsOut != 0 && !vMaxEventsOut->empty()) {
486  std::string const& moduleLabel = c->description().moduleLabel();
487  try {
488  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
489  } catch (Exception const&) {
// NOTE(review): listing line 490 is missing; the handler presumably throws
// with the message below - confirm upstream.
491  "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
492  }
493  }
494  c->configure(desc);
495  }
496  }
497 
// Returns true only when there is at least one output module communicator
// and every one of them reports limitReached(); in that case an informational
// message is logged first. Returns false otherwise.
//
// NOTE(review): `auto c` copies each element of all_output_communicators_;
// the constructor in this file stores
// boost::shared_ptr<OutputModuleCommunicator> there, so `auto const&` would
// avoid a reference-count update per iteration.
498  bool Schedule::terminate() const {
// Without output modules there is no limit that could end the job.
499  if (all_output_communicators_.empty()) {
500  return false;
501  }
502  for (auto c : all_output_communicators_) {
503  if (!c->limitReached()) {
504  // Found an output module that has not reached output event count.
505  return false;
506  }
507  }
508  LogInfo("SuccessfulTermination")
509  << "The job is terminating successfully because each output module\n"
510  << "has reached its configured limit.\n";
511  return true;
512  }
513 
515  globalSchedule_->endJob(collector);
516  if (collector.hasThrown()) {
517  return;
518  }
519 
520  if (wantSummary_ == false) return;
521  {
522  TriggerReport tr;
523  getTriggerReport(tr);
524 
525  // The trigger report (pass/fail etc.):
526 
527  LogVerbatim("FwkSummary") << "";
528  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Event Summary ------------";
529  if(!tr.trigPathSummaries.empty()) {
530  LogVerbatim("FwkSummary") << "TrigReport"
531  << " Events total = " << tr.eventSummary.totalEvents
532  << " passed = " << tr.eventSummary.totalEventsPassed
533  << " failed = " << tr.eventSummary.totalEventsFailed
534  << "";
535  } else {
536  LogVerbatim("FwkSummary") << "TrigReport"
537  << " Events total = " << tr.eventSummary.totalEvents
538  << " passed = " << tr.eventSummary.totalEvents
539  << " failed = 0";
540  }
541 
542  LogVerbatim("FwkSummary") << "";
543  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Path Summary ------------";
544  LogVerbatim("FwkSummary") << "TrigReport "
545  << std::right << std::setw(10) << "Trig Bit#" << " "
546  << std::right << std::setw(10) << "Run" << " "
547  << std::right << std::setw(10) << "Passed" << " "
548  << std::right << std::setw(10) << "Failed" << " "
549  << std::right << std::setw(10) << "Error" << " "
550  << "Name" << "";
551  for (auto const& p: tr.trigPathSummaries) {
552  LogVerbatim("FwkSummary") << "TrigReport "
553  << std::right << std::setw(5) << 1
554  << std::right << std::setw(5) << p.bitPosition << " "
555  << std::right << std::setw(10) << p.timesRun << " "
556  << std::right << std::setw(10) << p.timesPassed << " "
557  << std::right << std::setw(10) << p.timesFailed << " "
558  << std::right << std::setw(10) << p.timesExcept << " "
559  << p.name << "";
560  }
561 
562  /*
563  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
564  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
565  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
566  for (; epi != epe; ++epi, ++epn) {
567 
568  LogVerbatim("FwkSummary") << "TrigReport "
569  << std::right << std::setw(5) << 1
570  << std::right << std::setw(5) << *epi << " "
571  << std::right << std::setw(10) << totalEvents() << " "
572  << std::right << std::setw(10) << totalEvents() << " "
573  << std::right << std::setw(10) << 0 << " "
574  << std::right << std::setw(10) << 0 << " "
575  << *epn << "";
576  }
577  */
578 
579  LogVerbatim("FwkSummary") << "";
580  LogVerbatim("FwkSummary") << "TrigReport " << "-------End-Path Summary ------------";
581  LogVerbatim("FwkSummary") << "TrigReport "
582  << std::right << std::setw(10) << "Trig Bit#" << " "
583  << std::right << std::setw(10) << "Run" << " "
584  << std::right << std::setw(10) << "Passed" << " "
585  << std::right << std::setw(10) << "Failed" << " "
586  << std::right << std::setw(10) << "Error" << " "
587  << "Name" << "";
588  for (auto const& p: tr.endPathSummaries) {
589  LogVerbatim("FwkSummary") << "TrigReport "
590  << std::right << std::setw(5) << 0
591  << std::right << std::setw(5) << p.bitPosition << " "
592  << std::right << std::setw(10) << p.timesRun << " "
593  << std::right << std::setw(10) << p.timesPassed << " "
594  << std::right << std::setw(10) << p.timesFailed << " "
595  << std::right << std::setw(10) << p.timesExcept << " "
596  << p.name << "";
597  }
598 
599  for (auto const& p: tr.trigPathSummaries) {
600  LogVerbatim("FwkSummary") << "";
601  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Modules in Path: " << p.name << " ------------";
602  LogVerbatim("FwkSummary") << "TrigReport "
603  << std::right << std::setw(10) << "Trig Bit#" << " "
604  << std::right << std::setw(10) << "Visited" << " "
605  << std::right << std::setw(10) << "Passed" << " "
606  << std::right << std::setw(10) << "Failed" << " "
607  << std::right << std::setw(10) << "Error" << " "
608  << "Name" << "";
609 
610  unsigned int bitpos = 0;
611  for (auto const& mod: p.moduleInPathSummaries) {
612  LogVerbatim("FwkSummary") << "TrigReport "
613  << std::right << std::setw(5) << 1
614  << std::right << std::setw(5) << bitpos << " "
615  << std::right << std::setw(10) << mod.timesVisited << " "
616  << std::right << std::setw(10) << mod.timesPassed << " "
617  << std::right << std::setw(10) << mod.timesFailed << " "
618  << std::right << std::setw(10) << mod.timesExcept << " "
619  << mod.moduleLabel << "";
620  ++bitpos;
621  }
622  }
623 
624  for (auto const& p: tr.endPathSummaries) {
625  LogVerbatim("FwkSummary") << "";
626  LogVerbatim("FwkSummary") << "TrigReport " << "------ Modules in End-Path: " << p.name << " ------------";
627  LogVerbatim("FwkSummary") << "TrigReport "
628  << std::right << std::setw(10) << "Trig Bit#" << " "
629  << std::right << std::setw(10) << "Visited" << " "
630  << std::right << std::setw(10) << "Passed" << " "
631  << std::right << std::setw(10) << "Failed" << " "
632  << std::right << std::setw(10) << "Error" << " "
633  << "Name" << "";
634 
635  unsigned int bitpos=0;
636  for (auto const& mod: p.moduleInPathSummaries) {
637  LogVerbatim("FwkSummary") << "TrigReport "
638  << std::right << std::setw(5) << 0
639  << std::right << std::setw(5) << bitpos << " "
640  << std::right << std::setw(10) << mod.timesVisited << " "
641  << std::right << std::setw(10) << mod.timesPassed << " "
642  << std::right << std::setw(10) << mod.timesFailed << " "
643  << std::right << std::setw(10) << mod.timesExcept << " "
644  << mod.moduleLabel << "";
645  ++bitpos;
646  }
647  }
648 
649  LogVerbatim("FwkSummary") << "";
650  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Module Summary ------------";
651  LogVerbatim("FwkSummary") << "TrigReport "
652  << std::right << std::setw(10) << "Visited" << " "
653  << std::right << std::setw(10) << "Run" << " "
654  << std::right << std::setw(10) << "Passed" << " "
655  << std::right << std::setw(10) << "Failed" << " "
656  << std::right << std::setw(10) << "Error" << " "
657  << "Name" << "";
658  for (auto const& worker : tr.workerSummaries) {
659  LogVerbatim("FwkSummary") << "TrigReport "
660  << std::right << std::setw(10) << worker.timesVisited << " "
661  << std::right << std::setw(10) << worker.timesRun << " "
662  << std::right << std::setw(10) << worker.timesPassed << " "
663  << std::right << std::setw(10) << worker.timesFailed << " "
664  << std::right << std::setw(10) << worker.timesExcept << " "
665  << worker.moduleLabel << "";
666  }
667  LogVerbatim("FwkSummary") << "";
668  }
669  // The timing report (CPU and Real Time):
672 
673  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
674 
675  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Event Summary ---[sec]----";
676  LogVerbatim("FwkSummary") << "TimeReport"
677  << std::setprecision(6) << std::fixed
678  << " CPU/event = " << tr.eventSummary.cpuTime/totalEvents
679  << " Real/event = " << tr.eventSummary.realTime/totalEvents
680  << "";
681 
682  LogVerbatim("FwkSummary") << "";
683  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Path Summary ---[sec]----";
684  LogVerbatim("FwkSummary") << "TimeReport "
685  << std::right << std::setw(22) << "per event "
686  << std::right << std::setw(22) << "per path-run "
687  << "";
688  LogVerbatim("FwkSummary") << "TimeReport "
689  << std::right << std::setw(10) << "CPU" << " "
690  << std::right << std::setw(10) << "Real" << " "
691  << std::right << std::setw(10) << "CPU" << " "
692  << std::right << std::setw(10) << "Real" << " "
693  << "Name" << "";
694  for (auto const& p: tr.trigPathSummaries) {
695  const int timesRun = std::max(1, p.timesRun);
696  LogVerbatim("FwkSummary") << "TimeReport "
697  << std::setprecision(6) << std::fixed
698  << std::right << std::setw(10) << p.cpuTime/totalEvents << " "
699  << std::right << std::setw(10) << p.realTime/totalEvents << " "
700  << std::right << std::setw(10) << p.cpuTime/timesRun << " "
701  << std::right << std::setw(10) << p.realTime/timesRun << " "
702  << p.name << "";
703  }
704  LogVerbatim("FwkSummary") << "TimeReport "
705  << std::right << std::setw(10) << "CPU" << " "
706  << std::right << std::setw(10) << "Real" << " "
707  << std::right << std::setw(10) << "CPU" << " "
708  << std::right << std::setw(10) << "Real" << " "
709  << "Name" << "";
710  LogVerbatim("FwkSummary") << "TimeReport "
711  << std::right << std::setw(22) << "per event "
712  << std::right << std::setw(22) << "per path-run "
713  << "";
714 
715  LogVerbatim("FwkSummary") << "";
716  LogVerbatim("FwkSummary") << "TimeReport " << "-------End-Path Summary ---[sec]----";
717  LogVerbatim("FwkSummary") << "TimeReport "
718  << std::right << std::setw(22) << "per event "
719  << std::right << std::setw(22) << "per endpath-run "
720  << "";
721  LogVerbatim("FwkSummary") << "TimeReport "
722  << std::right << std::setw(10) << "CPU" << " "
723  << std::right << std::setw(10) << "Real" << " "
724  << std::right << std::setw(10) << "CPU" << " "
725  << std::right << std::setw(10) << "Real" << " "
726  << "Name" << "";
727  for (auto const& p: tr.endPathSummaries) {
728  const int timesRun = std::max(1, p.timesRun);
729 
730  LogVerbatim("FwkSummary") << "TimeReport "
731  << std::setprecision(6) << std::fixed
732  << std::right << std::setw(10) << p.cpuTime/totalEvents << " "
733  << std::right << std::setw(10) << p.realTime/totalEvents << " "
734  << std::right << std::setw(10) << p.cpuTime/timesRun << " "
735  << std::right << std::setw(10) << p.realTime/timesRun << " "
736  << p.name << "";
737  }
738  LogVerbatim("FwkSummary") << "TimeReport "
739  << std::right << std::setw(10) << "CPU" << " "
740  << std::right << std::setw(10) << "Real" << " "
741  << std::right << std::setw(10) << "CPU" << " "
742  << std::right << std::setw(10) << "Real" << " "
743  << "Name" << "";
744  LogVerbatim("FwkSummary") << "TimeReport "
745  << std::right << std::setw(22) << "per event "
746  << std::right << std::setw(22) << "per endpath-run "
747  << "";
748 
749  for (auto const& p: tr.trigPathSummaries) {
750  LogVerbatim("FwkSummary") << "";
751  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Modules in Path: " << p.name << " ---[sec]----";
752  LogVerbatim("FwkSummary") << "TimeReport "
753  << std::right << std::setw(22) << "per event "
754  << std::right << std::setw(22) << "per module-visit "
755  << "";
756  LogVerbatim("FwkSummary") << "TimeReport "
757  << std::right << std::setw(10) << "CPU" << " "
758  << std::right << std::setw(10) << "Real" << " "
759  << std::right << std::setw(10) << "CPU" << " "
760  << std::right << std::setw(10) << "Real" << " "
761  << "Name" << "";
762  for (auto const& mod: p.moduleInPathSummaries) {
763  LogVerbatim("FwkSummary") << "TimeReport "
764  << std::setprecision(6) << std::fixed
765  << std::right << std::setw(10) << mod.cpuTime/totalEvents << " "
766  << std::right << std::setw(10) << mod.realTime/totalEvents << " "
767  << std::right << std::setw(10) << mod.cpuTime/std::max(1, mod.timesVisited) << " "
768  << std::right << std::setw(10) << mod.realTime/std::max(1, mod.timesVisited) << " "
769  << mod.moduleLabel << "";
770  }
771  }
772  LogVerbatim("FwkSummary") << "TimeReport "
773  << std::right << std::setw(10) << "CPU" << " "
774  << std::right << std::setw(10) << "Real" << " "
775  << std::right << std::setw(10) << "CPU" << " "
776  << std::right << std::setw(10) << "Real" << " "
777  << "Name" << "";
778  LogVerbatim("FwkSummary") << "TimeReport "
779  << std::right << std::setw(22) << "per event "
780  << std::right << std::setw(22) << "per module-visit "
781  << "";
782 
783  for (auto const& p: tr.endPathSummaries) {
784  LogVerbatim("FwkSummary") << "";
785  LogVerbatim("FwkSummary") << "TimeReport " << "------ Modules in End-Path: " << p.name << " ---[sec]----";
786  LogVerbatim("FwkSummary") << "TimeReport "
787  << std::right << std::setw(22) << "per event "
788  << std::right << std::setw(22) << "per module-visit "
789  << "";
790  LogVerbatim("FwkSummary") << "TimeReport "
791  << std::right << std::setw(10) << "CPU" << " "
792  << std::right << std::setw(10) << "Real" << " "
793  << std::right << std::setw(10) << "CPU" << " "
794  << std::right << std::setw(10) << "Real" << " "
795  << "Name" << "";
796  for (auto const& mod: p.moduleInPathSummaries) {
797  LogVerbatim("FwkSummary") << "TimeReport "
798  << std::setprecision(6) << std::fixed
799  << std::right << std::setw(10) << mod.cpuTime/totalEvents << " "
800  << std::right << std::setw(10) << mod.realTime/totalEvents << " "
801  << std::right << std::setw(10) << mod.cpuTime/std::max(1, mod.timesVisited) << " "
802  << std::right << std::setw(10) << mod.realTime/std::max(1, mod.timesVisited) << " "
803  << mod.moduleLabel << "";
804  }
805  }
806  LogVerbatim("FwkSummary") << "TimeReport "
807  << std::right << std::setw(10) << "CPU" << " "
808  << std::right << std::setw(10) << "Real" << " "
809  << std::right << std::setw(10) << "CPU" << " "
810  << std::right << std::setw(10) << "Real" << " "
811  << "Name" << "";
812  LogVerbatim("FwkSummary") << "TimeReport "
813  << std::right << std::setw(22) << "per event "
814  << std::right << std::setw(22) << "per module-visit "
815  << "";
816 
817  LogVerbatim("FwkSummary") << "";
818  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Module Summary ---[sec]----";
819  LogVerbatim("FwkSummary") << "TimeReport "
820  << std::right << std::setw(22) << "per event "
821  << std::right << std::setw(22) << "per module-run "
822  << std::right << std::setw(22) << "per module-visit "
823  << "";
824  LogVerbatim("FwkSummary") << "TimeReport "
825  << std::right << std::setw(10) << "CPU" << " "
826  << std::right << std::setw(10) << "Real" << " "
827  << std::right << std::setw(10) << "CPU" << " "
828  << std::right << std::setw(10) << "Real" << " "
829  << std::right << std::setw(10) << "CPU" << " "
830  << std::right << std::setw(10) << "Real" << " "
831  << "Name" << "";
832  for (auto const& worker : tr.workerSummaries) {
833  LogVerbatim("FwkSummary") << "TimeReport "
834  << std::setprecision(6) << std::fixed
835  << std::right << std::setw(10) << worker.cpuTime/totalEvents << " "
836  << std::right << std::setw(10) << worker.realTime/totalEvents << " "
837  << std::right << std::setw(10) << worker.cpuTime/std::max(1, worker.timesRun) << " "
838  << std::right << std::setw(10) << worker.realTime/std::max(1, worker.timesRun) << " "
839  << std::right << std::setw(10) << worker.cpuTime/std::max(1, worker.timesVisited) << " "
840  << std::right << std::setw(10) << worker.realTime/std::max(1, worker.timesVisited) << " "
841  << worker.moduleLabel << "";
842  }
843  LogVerbatim("FwkSummary") << "TimeReport "
844  << std::right << std::setw(10) << "CPU" << " "
845  << std::right << std::setw(10) << "Real" << " "
846  << std::right << std::setw(10) << "CPU" << " "
847  << std::right << std::setw(10) << "Real" << " "
848  << std::right << std::setw(10) << "CPU" << " "
849  << std::right << std::setw(10) << "Real" << " "
850  << "Name" << "";
851  LogVerbatim("FwkSummary") << "TimeReport "
852  << std::right << std::setw(22) << "per event "
853  << std::right << std::setw(22) << "per module-run "
854  << std::right << std::setw(22) << "per module-visit "
855  << "";
856 
857  LogVerbatim("FwkSummary") << "";
858  LogVerbatim("FwkSummary") << "T---Report end!" << "";
859  LogVerbatim("FwkSummary") << "";
860  }
861 
864  }
865 
868  }
869 
871  for_all(all_output_communicators_, boost::bind(&OutputModuleCommunicator::openFile, _1, boost::cref(fb)));
872  }
873 
874  void Schedule::writeRun(RunPrincipal const& rp, ProcessContext const* processContext) {
875  for_all(all_output_communicators_, boost::bind(&OutputModuleCommunicator::writeRun, _1, boost::cref(rp), processContext));
876  }
877 
878  void Schedule::writeLumi(LuminosityBlockPrincipal const& lbp, ProcessContext const* processContext) {
879  for_all(all_output_communicators_, boost::bind(&OutputModuleCommunicator::writeLumi, _1, boost::cref(lbp), processContext));
880  }
881 
883  // Return true iff at least one output module returns true.
884  return (std::find_if (all_output_communicators_.begin(), all_output_communicators_.end(),
886  != all_output_communicators_.end());
887  }
888 
890  for_all(allWorkers(), boost::bind(&Worker::respondToOpenInputFile, _1, boost::cref(fb)));
891  }
892 
894  for_all(allWorkers(), boost::bind(&Worker::respondToCloseInputFile, _1, boost::cref(fb)));
895  }
896 
897  void Schedule::beginJob(ProductRegistry const& iRegistry) {
899 
900  globalSchedule_->beginJob(iRegistry);
901  }
902 
903  void Schedule::beginStream(unsigned int iStreamID) {
904  assert(iStreamID<streamSchedules_.size());
905  streamSchedules_[iStreamID]->beginStream();
906  }
907 
908  void Schedule::endStream(unsigned int iStreamID) {
909  assert(iStreamID<streamSchedules_.size());
910  streamSchedules_[iStreamID]->endStream();
911  }
912 
914  for_all(allWorkers(), boost::bind(&Worker::preForkReleaseResources, _1));
915  }
916  void Schedule::postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {
917  for_all(allWorkers(), boost::bind(&Worker::postForkReacquireResources, _1, iChildIndex, iNumberOfChildren));
918  }
919 
921  ParameterSet const& iPSet) {
922  Worker* found = nullptr;
923  for (auto const& worker : allWorkers()) {
924  if (worker->description().moduleLabel() == iLabel) {
925  found = worker;
926  break;
927  }
928  }
929  if (nullptr == found) {
930  return false;
931  }
932 
933  auto newMod = moduleRegistry_->replaceModule(iLabel,iPSet,preallocConfig_);
934 
935  globalSchedule_->replaceModule(newMod,iLabel);
936 
937  for(auto s: streamSchedules_) {
938  s->replaceModule(newMod,iLabel);
939  }
940  return true;
941  }
942 
943  std::vector<ModuleDescription const*>
945  std::vector<ModuleDescription const*> result;
946  result.reserve(allWorkers().size());
947 
948  for (auto const& worker : allWorkers()) {
949  ModuleDescription const* p = worker->descPtr();
950  result.push_back(p);
951  }
952  return result;
953  }
954 
955  Schedule::AllWorkers const&
957  return globalSchedule_->allWorkers();
958  }
959 
960  void
961  Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
962  streamSchedules_[0]->availablePaths(oLabelsToFill);
963  }
964 
965  void
967  std::vector<std::string>& oLabelsToFill) const {
968  streamSchedules_[0]->modulesInPath(iPathLabel,oLabelsToFill);
969  }
970 
971  void
973  endpathsAreActive_ = active;
974  for(auto const & s : streamSchedules_) {
975  s->enableEndPaths(active);
976  }
977  }
978 
979  bool
981  return endpathsAreActive_;
982  }
983 
984  void
986  rep.eventSummary.totalEvents = 0;
989  for(auto& s: streamSchedules_) {
990  s->getTriggerReport(rep);
991  }
992  }
993 
994  void
996  rep.eventSummary.totalEvents = 0;
997  rep.eventSummary.cpuTime = 0.;
998  rep.eventSummary.realTime = 0.;
999  for(auto& s: streamSchedules_) {
1000  s->getTriggerTimingReport(rep);
1001  }
1002  }
1003 
1004  int
1006  int returnValue = 0;
1007  for(auto& s: streamSchedules_) {
1008  returnValue += s->totalEvents();
1009  }
1010  return returnValue;
1011  }
1012 
1013  int
1015  int returnValue = 0;
1016  for(auto& s: streamSchedules_) {
1017  returnValue += s->totalEventsPassed();
1018  }
1019  return returnValue;
1020  }
1021 
1022  int
1024  int returnValue = 0;
1025  for(auto& s: streamSchedules_) {
1026  returnValue += s->totalEventsFailed();
1027  }
1028  return returnValue;
1029  }
1030 
1031 
1032  void
1034  for(auto const& s: streamSchedules_) {
1035  s->clearCounters();
1036  }
1037  }
1038 
1039  //====================================
1040  // Schedule::checkForCorrectness algorithm
1041  //
1042  // The code creates a 'dependency' graph between all
1043  // modules. A module depends on another module if
1044  // 1) it 'consumes' data produced by that module
1045  // 2) it appears directly after the module within a Path
1046  //
1047  // If there is a cycle in the 'dependency' graph then
1048  // the schedule may be unrunnable. The schedule is still
1049  // runnable if all cycles have at least two edges which
1050  // connect modules only by Path dependencies (i.e. not
1051  // linked by a data dependency).
1052  //
1053  // Example 1:
1054  // C consumes data from B
1055  // Path 1: A + B + C
1056  // Path 2: B + C + A
1057  //
1058  // Cycle: A after C [p2], C consumes B, B after A [p1]
1059  // Since this cycle has 2 path only edges it is OK since
1060  // A and (B+C) are independent so their run order doesn't matter
1061  //
1062  // Example 2:
1063  // B consumes A
1064  // C consumes B
1065  // Path: C + A
1066  //
1067  // Cycle: A after C [p], C consumes B, B consumes A
1068  // Since this cycle has 1 path only edge it is unrunnable.
1069  //
1070  // Example 3:
1071  // A consumes B
1072  // B consumes C
1073  // C consumes A
1074  // (no Path since unscheduled execution)
1075  //
1076  // Cycle: A consumes B, B consumes C, C consumes A
1077  // Since this cycle has 0 path only edges it is unrunnable.
1078  //====================================
1079 
1080  namespace {
1081  typedef std::pair<unsigned int, unsigned int> SimpleEdge;
1082  typedef std::map<SimpleEdge, std::vector<unsigned int>> EdgeToPathMap;
1083 
1084  typedef boost::adjacency_list<boost::vecS, boost::vecS, boost::bidirectionalS> Graph;
1085 
1086  typedef boost::graph_traits<Graph>::edge_descriptor Edge;
1087  struct cycle_detector : public boost::dfs_visitor<> {
1088 
1089  cycle_detector(EdgeToPathMap const& iEdgeToPathMap,
1090  std::vector<std::string> const& iPathNames,
1091  std::map<std::string,unsigned int> const& iModuleNamesToIndex):
1092  m_edgeToPathMap(iEdgeToPathMap),
1093  m_pathNames(iPathNames),
1094  m_namesToIndex(iModuleNamesToIndex){}
1095 
1096  void tree_edge(Edge iEdge, Graph const&) {
1097  m_stack.push_back(iEdge);
1098  }
1099 
1100  void finish_edge(Edge iEdge, Graph const& iGraph) {
1101  if(not m_stack.empty()) {
1102  if (iEdge == m_stack.back()) {
1103  m_stack.pop_back();
1104  }
1105  }
1106  }
1107 
1108  //Called if a cycle happens
1109  void back_edge(Edge iEdge, Graph const& iGraph) {
1110  //NOTE: If the path containing the cycle contains two or more
1111  // path only edges then there is no problem
1112 
1114  IndexMap const& index = get(boost::vertex_index, iGraph);
1115 
1116  unsigned int vertex = index[target(iEdge,iGraph)];
1117 
1118  //Find last edge which starts with this vertex
1119  std::list<Edge>::iterator itFirst = m_stack.begin();
1120  {
1121  bool seenVertex = false;
1122  while(itFirst != m_stack.end()) {
1123  if(not seenVertex) {
1124  if(index[source(*itFirst,iGraph)] == vertex) {
1125  seenVertex = true;
1126  }
1127  } else
1128  if (index[source(*itFirst,iGraph)] != vertex) {
1129  break;
1130  }
1131  ++itFirst;
1132  }
1133  if(itFirst != m_stack.begin()) {
1134  --itFirst;
1135  }
1136  }
1137  //This edge has not been added to the stack yet
1138  // making a copy allows us to add it in but not worry
1139  // about removing it at the end of the routine
1140  std::vector<Edge> tempStack;
1141  tempStack.reserve(m_stack.size()+1);
1142  tempStack.insert(tempStack.end(),itFirst,m_stack.end());
1143  tempStack.emplace_back(iEdge);
1144 
1145  unsigned int nPathDependencyOnly =0;
1146  for(auto const& edge: tempStack) {
1147  unsigned int in =index[source(edge,iGraph)];
1148  unsigned int out =index[target(edge,iGraph)];
1149 
1150  auto iFound = m_edgeToPathMap.find(SimpleEdge(in,out));
1151  bool pathDependencyOnly = true;
1152  for(auto dependency : iFound->second) {
1153  if (dependency == std::numeric_limits<unsigned int>::max()) {
1154  pathDependencyOnly = false;
1155  break;
1156  }
1157  }
1158  if (pathDependencyOnly) {
1159  ++nPathDependencyOnly;
1160  }
1161  }
1162  if(nPathDependencyOnly < 2) {
1163  reportError(tempStack,index,iGraph);
1164  }
1165  }
1166  private:
1167  std::string const& pathName(unsigned int iIndex) const {
1168  return m_pathNames[iIndex];
1169  }
1170 
1171  std::string const& moduleName(unsigned int iIndex) const {
1172  for(auto const& item : m_namesToIndex) {
1173  if(item.second == iIndex) {
1174  return item.first;
1175  }
1176  }
1177  assert(false);
1178  }
1179 
1180  void
1181  reportError(std::vector<Edge>const& iEdges,
1183  Graph const& iGraph) const {
1184  std::stringstream oStream;
1185  oStream <<"Module run order problem found: \n";
1186  bool first_edge = true;
1187  for(auto const& edge: iEdges) {
1188  unsigned int in =iIndex[source(edge,iGraph)];
1189  unsigned int out =iIndex[target(edge,iGraph)];
1190 
1191  if(first_edge) {
1192  first_edge = false;
1193  } else {
1194  oStream<<", ";
1195  }
1196  oStream <<moduleName(in);
1197 
1198  auto iFound = m_edgeToPathMap.find(SimpleEdge(in,out));
1199  bool pathDependencyOnly = true;
1200  for(auto dependency : iFound->second) {
1201  if (dependency == std::numeric_limits<unsigned int>::max()) {
1202  pathDependencyOnly = false;
1203  break;
1204  }
1205  }
1206  if (pathDependencyOnly) {
1207  oStream <<" after "<<moduleName(out)<<" [path "<<pathName(iFound->second[0])<<"]";
1208  } else {
1209  oStream <<" consumes "<<moduleName(out);
1210  }
1211  }
1212  oStream<<"\n Running in the threaded framework would lead to indeterminate results."
1213  "\n Please change order of modules in mentioned Path(s) to avoid inconsistent module ordering.";
1214 
1215  LogError("UnrunnableSchedule")<<oStream.str();
1216  }
1217 
1218  EdgeToPathMap const& m_edgeToPathMap;
1219  std::vector<std::string> const& m_pathNames;
1220  std::map<std::string,unsigned int> m_namesToIndex;
1221 
1222  std::list<Edge> m_stack;
1223  };
1224  }
1225 
1226  void
1228  {
1229  //Need to lookup names to ids quickly
1230  std::map<std::string,unsigned int> moduleNamesToIndex;
1231  for(auto worker: allWorkers()) {
1232  moduleNamesToIndex.insert( std::make_pair(worker->description().moduleLabel(),
1233  worker->description().id()));
1234  }
1235 
1236  //If a module to module dependency comes from a path, remember which path
1237  EdgeToPathMap edgeToPathMap;
1238 
1239  //determine the path dependencies
1240  std::vector<std::string> pathNames;
1241  {
1242  streamSchedules_[0]->availablePaths(pathNames);
1243 
1244  std::vector<std::string> moduleNames;
1245  std::vector<std::string> reducedModuleNames;
1246  unsigned int pathIndex=0;
1247  for(auto const& path: pathNames) {
1248  moduleNames.clear();
1249  reducedModuleNames.clear();
1250  std::set<std::string> alreadySeenNames;
1251 
1252  streamSchedules_[0]->modulesInPath(path,moduleNames);
1253  std::string lastModuleName;
1254  unsigned int lastModuleIndex;
1255  for(auto const& name: moduleNames) {
1256  auto found = alreadySeenNames.insert(name);
1257  if(found.second) {
1258  //first time for this path
1259  unsigned int moduleIndex = moduleNamesToIndex[name];
1260  if(not lastModuleName.empty() ) {
1261  edgeToPathMap[std::make_pair(moduleIndex,lastModuleIndex)].push_back(pathIndex);
1262  }
1263  lastModuleName = name;
1264  lastModuleIndex = moduleIndex;
1265  }
1266  }
1267  ++pathIndex;
1268  }
1269  }
1270  {
1271  std::vector<const char*> dependentModules;
1272  //determine the data dependencies
1273  for(auto const& worker: allWorkers()) {
1274  dependentModules.clear();
1275  //NOTE: what about aliases?
1276  worker->modulesDependentUpon(dependentModules);
1277  auto found = moduleNamesToIndex.find(worker->description().moduleLabel());
1278  if (found == moduleNamesToIndex.end()) {
1279  //The module was from a previous process
1280  continue;
1281  }
1282  unsigned int moduleIndex = found->second;
1283  for(auto name: dependentModules) {
1284  edgeToPathMap[std::make_pair(moduleIndex, moduleNamesToIndex[name])].push_back(std::numeric_limits<unsigned int>::max());
1285  }
1286  }
1287  }
1288  //Now use boost graph library to find cycles in the dependencies
1289  std::vector<SimpleEdge> outList;
1290  outList.reserve(edgeToPathMap.size());
1291  for(auto const& edgeInfo: edgeToPathMap) {
1292  outList.push_back(edgeInfo.first);
1293  }
1294 
1295  Graph g(outList.begin(),outList.end(), moduleNamesToIndex.size());
1296 
1297  cycle_detector detector(edgeToPathMap,pathNames,moduleNamesToIndex);
1298  boost::depth_first_search(g,boost::visitor(detector));
1299  }
1300 }
type
Definition: HCALResponse.h:21
bool empty() const
Definition: ParameterSet.h:216
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:64
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
dictionary aliases
Definition: autoCond.py:51
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
void checkForCorrectness() const
Check that the schedule is actually runable.
Definition: Schedule.cc:1227
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
virtual void openFile(FileBlock const &fb)=0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:956
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:961
static std::string const source("source")
virtual void writeRun(RunPrincipal const &rp, ProcessContext const *)=0
preallocConfig_(prealloc)
bool endPathsEnabled() const
Definition: Schedule.cc:980
all_output_communicators_()
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:893
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet)
Definition: Schedule.cc:920
std::vector< Worker * > AllWorkers
Definition: Schedule.h:113
std::vector< ParameterSet > VParameterSet
Definition: ParameterSet.h:33
void writeRun(RunPrincipal const &rp, ProcessContext const *)
Definition: Schedule.cc:874
void endStream(unsigned int)
Definition: Schedule.cc:908
void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)
Definition: Schedule.cc:878
void enableEndPaths(bool active)
Definition: Schedule.cc:972
processConfiguration
Definition: Schedule.cc:369
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
endpathsAreActive_(true)
Definition: Schedule.cc:374
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:65
actions
Definition: Schedule.cc:369
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
int totalEventsFailed() const
Definition: Schedule.cc:1023
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:192
int totalEventsPassed() const
Definition: Schedule.cc:1014
std::string moduleName(Provenance const &provenance)
Definition: Provenance.cc:27
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:63
EventSummary eventSummary
Definition: TriggerReport.h:62
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists)
Definition: Schedule.cc:459
const T & max(const T &a, const T &b)
wantSummary_(tns.wantSummary())
int totalEvents() const
Definition: Schedule.cc:1005
virtual void openNewFileIfNeeded()=0
EventTimingSummary eventSummary
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1033
tuple result
Definition: query.py:137
boost::shared_ptr< ModuleRegistry > moduleRegistry_
Definition: Schedule.h:244
std::vector< PathTimingSummary > trigPathSummaries
std::vector< std::shared_ptr< StreamSchedule > > streamSchedules_
Definition: Schedule.h:245
void setProcessParameterSetID(ParameterSetID const &id)
Associated free functions.
Definition: Registry.cc:76
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:498
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:889
virtual void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)=0
areg
Definition: Schedule.cc:369
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:90
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:985
tuple out
Definition: dbtoconf.py:99
virtual bool shouldWeCloseFile() const =0
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:250
moduleRegistry_(new ModuleRegistry())
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
volatile bool endpathsAreActive_
Definition: Schedule.h:255
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:86
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:249
void loadMissingDictionaries()
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:966
Schedule(ParameterSet &proc_pset, service::TriggerNamesService &tns, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration > processConfiguration, const ParameterSet *subProcPSet, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:358
void beginStream(unsigned int)
Definition: Schedule.cc:903
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:87
void preForkReleaseResources()
Definition: Worker.h:89
bool wantSummary_
Definition: Schedule.h:253
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Schedule.cc:916
list key
Definition: combine.py:13
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:944
Strings const & getTrigPaths() const
std::unique_ptr< GlobalSchedule > globalSchedule_
Definition: Schedule.h:247
void beginJob(ProductRegistry const &)
Definition: Schedule.cc:897
void openNewOutputFilesIfNeeded()
Definition: Schedule.cc:866
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
void preForkReleaseResources()
Definition: Schedule.cc:913
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:870
preg
Definition: Schedule.cc:369
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:514
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:995
bool shouldWeCloseOutput() const
Definition: Schedule.cc:882
std::vector< std::string > vstring
Definition: Schedule.cc:354
tuple size
Write out results.
void closeOutputFiles()
Definition: Schedule.cc:862
prealloc
Definition: Schedule.cc:369
std::string match(BranchDescription const &a, BranchDescription const &b, std::string const &fileName)