CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Schedule.cc
Go to the documentation of this file.
2 
25 
26 #include "boost/graph/graph_traits.hpp"
27 #include "boost/graph/adjacency_list.hpp"
28 #include "boost/graph/depth_first_search.hpp"
29 #include "boost/graph/visitors.hpp"
30 
31 
32 #include <algorithm>
33 #include <cassert>
34 #include <cstdlib>
35 #include <functional>
36 #include <iomanip>
37 #include <list>
38 #include <map>
39 #include <exception>
40 #include <sstream>
41 
42 namespace edm {
43  namespace {
44  using std::placeholders::_1;
45 
46  bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
47  return std::binary_search(v.begin(), v.end(), s);
48  }
49 
50  // Here we make the trigger results inserter directly. This should
51  // probably be a utility in the WorkerRegistry or elsewhere.
52 
53  std::shared_ptr<TriggerResultInserter>
54  makeInserter(ParameterSet& proc_pset,
55  PreallocationConfiguration const& iPrealloc,
56  ProductRegistry& preg,
57  ExceptionToActionTable const& actions,
58  std::shared_ptr<ActivityRegistry> areg,
59  std::shared_ptr<ProcessConfiguration> processConfiguration) {
60 
61  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
62  trig_pset->registerIt();
63 
64  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
65  ModuleDescription md(trig_pset->id(),
66  "TriggerResultInserter",
67  "TriggerResults",
68  processConfiguration.get(),
70 
71  areg->preModuleConstructionSignal_(md);
72  bool postCalled = false;
73  std::shared_ptr<TriggerResultInserter> returnValue;
74  try {
75  maker::ModuleHolderT<TriggerResultInserter> holder(std::make_shared<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),static_cast<Maker const*>(nullptr));
76  holder.setModuleDescription(md);
77  holder.registerProductsAndCallbacks(&preg);
78  returnValue =holder.module();
79  postCalled = true;
80  // if exception then post will be called in the catch block
81  areg->postModuleConstructionSignal_(md);
82  }
83  catch (...) {
84  if(!postCalled) {
85  try {
86  areg->postModuleConstructionSignal_(md);
87  }
88  catch (...) {
89  // If post throws an exception ignore it because we are already handling another exception
90  }
91  }
92  throw;
93  }
94  return returnValue;
95  }
96 
97 
98  void
99  checkAndInsertAlias(std::string const& friendlyClassName,
100  std::string const& moduleLabel,
101  std::string const& productInstanceName,
102  std::string const& processName,
103  std::string const& alias,
104  std::string const& instanceAlias,
105  ProductRegistry const& preg,
106  std::multimap<BranchKey, BranchKey>& aliasMap,
107  std::map<BranchKey, BranchKey>& aliasKeys) {
108  std::string const star("*");
109 
110  BranchKey key(friendlyClassName, moduleLabel, productInstanceName, processName);
111  if(preg.productList().find(key) == preg.productList().end()) {
112  // No product was found matching the alias.
113  // We throw an exception only if a module with the specified module label was created in this process.
114  for(auto const& product : preg.productList()) {
115  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
116  throw Exception(errors::Configuration, "EDAlias does not match data\n")
117  << "There are no products of type '" << friendlyClassName << "'\n"
118  << "with module label '" << moduleLabel << "' and instance name '" << productInstanceName << "'.\n";
119  }
120  }
121  }
122 
123  std::string const& theInstanceAlias(instanceAlias == star ? productInstanceName : instanceAlias);
124  BranchKey aliasKey(friendlyClassName, alias, theInstanceAlias, processName);
125  if(preg.productList().find(aliasKey) != preg.productList().end()) {
126  throw Exception(errors::Configuration, "EDAlias conflicts with data\n")
127  << "A product of type '" << friendlyClassName << "'\n"
128  << "with module label '" << alias << "' and instance name '" << theInstanceAlias << "'\n"
129  << "already exists.\n";
130  }
131  auto iter = aliasKeys.find(aliasKey);
132  if(iter != aliasKeys.end()) {
133  // The alias matches a previous one. If the same alias is used for different product, throw.
134  if(iter->second != key) {
135  throw Exception(errors::Configuration, "EDAlias conflict\n")
136  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
137  << "are used for multiple products of type '" << friendlyClassName << "'\n"
138  << "One has module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
139  << "the other has module label '" << iter->second.moduleLabel() << "' and product instance name '" << iter->second.productInstanceName() << "'.\n";
140  }
141  } else {
142  auto prodIter = preg.productList().find(key);
143  if(prodIter != preg.productList().end()) {
144  if (!prodIter->second.produced()) {
145  throw Exception(errors::Configuration, "EDAlias\n")
146  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
147  << "are used for a product of type '" << friendlyClassName << "'\n"
148  << "with module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
149  << "An EDAlias can only be used for products produced in the current process. This one is not.\n";
150  }
151  aliasMap.insert(std::make_pair(key, aliasKey));
152  aliasKeys.insert(std::make_pair(aliasKey, key));
153  }
154  }
155  }
156 
157  void
158  processEDAliases(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
159  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string> >("@all_aliases");
160  if(aliases.empty()) {
161  return;
162  }
163  std::string const star("*");
164  std::string const empty("");
166  desc.add<std::string>("type");
167  desc.add<std::string>("fromProductInstance", star);
168  desc.add<std::string>("toProductInstance", star);
169 
170  std::multimap<BranchKey, BranchKey> aliasMap;
171 
172  std::map<BranchKey, BranchKey> aliasKeys; // Used to search for duplicates or clashes.
173 
174  // Now, loop over the alias information and store it in aliasMap.
175  for(std::string const& alias : aliases) {
176  ParameterSet const& aliasPSet = proc_pset.getParameterSet(alias);
177  std::vector<std::string> vPSetNames = aliasPSet.getParameterNamesForType<VParameterSet>();
178  for(std::string const& moduleLabel : vPSetNames) {
179  VParameterSet vPSet = aliasPSet.getParameter<VParameterSet>(moduleLabel);
180  for(ParameterSet& pset : vPSet) {
181  desc.validate(pset);
182  std::string friendlyClassName = pset.getParameter<std::string>("type");
183  std::string productInstanceName = pset.getParameter<std::string>("fromProductInstance");
184  std::string instanceAlias = pset.getParameter<std::string>("toProductInstance");
185  if(productInstanceName == star) {
186  bool match = false;
187  BranchKey lowerBound(friendlyClassName, moduleLabel, empty, empty);
188  for(ProductRegistry::ProductList::const_iterator it = preg.productList().lower_bound(lowerBound);
189  it != preg.productList().end() && it->first.friendlyClassName() == friendlyClassName && it->first.moduleLabel() == moduleLabel;
190  ++it) {
191  if(it->first.processName() != processName) {
192  continue;
193  }
194  match = true;
195 
196  checkAndInsertAlias(friendlyClassName, moduleLabel, it->first.productInstanceName(), processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
197  }
198  if(!match) {
199  // No product was found matching the alias.
200  // We throw an exception only if a module with the specified module label was created in this process.
201  for(auto const& product : preg.productList()) {
202  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
203  throw Exception(errors::Configuration, "EDAlias parameter set mismatch\n")
204  << "There are no products of type '" << friendlyClassName << "'\n"
205  << "with module label '" << moduleLabel << "'.\n";
206  }
207  }
208  }
209  } else {
210  checkAndInsertAlias(friendlyClassName, moduleLabel, productInstanceName, processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
211  }
212  }
213  }
214  }
215 
216 
217  // Now add the new alias entries to the product registry.
218  for(auto const& aliasEntry : aliasMap) {
219  ProductRegistry::ProductList::const_iterator it = preg.productList().find(aliasEntry.first);
220  assert(it != preg.productList().end());
221  preg.addLabelAlias(it->second, aliasEntry.second.moduleLabel(), aliasEntry.second.productInstanceName());
222  }
223 
224  }
225 
226  typedef std::vector<std::string> vstring;
227 
228  void reduceParameterSet(ParameterSet& proc_pset,
229  vstring const& end_path_name_list,
230  vstring& modulesInConfig,
231  std::set<std::string> const& usedModuleLabels,
232  std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions) {
233  // Before calculating the ParameterSetID of the top level ParameterSet or
234  // saving it in the registry drop from the top level ParameterSet all
235  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
236  // production is not enabled also drop all the EDFilters and EDProducers
237  // that are not scheduled. Drop the ParameterSet used to configure the module
238  // itself. Also drop the other traces of these labels in the top level
239  // ParameterSet: Remove that labels from @all_modules and from all the
240  // end paths. If this makes any end paths empty, then remove the end path
241  // name from @end_paths, and @paths.
242 
243  // First make a list of labels to drop
244  vstring outputModuleLabels;
245  std::string edmType;
246  std::string const moduleEdmType("@module_edm_type");
247  std::string const outputModule("OutputModule");
248  std::string const edAnalyzer("EDAnalyzer");
249  std::string const edFilter("EDFilter");
250  std::string const edProducer("EDProducer");
251 
252  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
253 
254  //need a list of all modules on paths in order to determine
255  // if an EDAnalyzer only appears on an end path
256  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
257  std::set<std::string> modulesOnPaths;
258  {
259  std::set<std::string> noEndPaths(scheduledPaths.begin(),scheduledPaths.end());
260  for(auto const& endPath: end_path_name_list) {
261  noEndPaths.erase(endPath);
262  }
263  {
264  vstring labels;
265  for(auto const& path: noEndPaths) {
266  labels = proc_pset.getParameter<vstring>(path);
267  modulesOnPaths.insert(labels.begin(),labels.end());
268  }
269  }
270  }
271  //Initially fill labelsToBeDropped with all module mentioned in
272  // the configuration but which are not being used by the system
273  std::vector<std::string> labelsToBeDropped;
274  labelsToBeDropped.reserve(modulesInConfigSet.size());
275  std::set_difference(modulesInConfigSet.begin(),modulesInConfigSet.end(),
276  usedModuleLabels.begin(),usedModuleLabels.end(),
277  std::back_inserter(labelsToBeDropped));
278 
279  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
280  for (auto const& modLabel: usedModuleLabels) {
281  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
282  if (edmType == outputModule) {
283  outputModuleLabels.push_back(modLabel);
284  labelsToBeDropped.push_back(modLabel);
285  }
286  if(edmType == edAnalyzer) {
287  if(modulesOnPaths.end()==modulesOnPaths.find(modLabel)) {
288  labelsToBeDropped.push_back(modLabel);
289  }
290  }
291  }
292  //labelsToBeDropped must be sorted
293  std::inplace_merge(labelsToBeDropped.begin(),
294  labelsToBeDropped.begin()+sizeBeforeOutputModules,
295  labelsToBeDropped.end());
296 
297  // drop the parameter sets used to configure the modules
298  for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
299 
300  // drop the labels from @all_modules
301  vstring::iterator endAfterRemove = std::remove_if(modulesInConfig.begin(), modulesInConfig.end(), std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
302  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
303  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
304 
305  // drop the labels from all end paths
306  vstring endPathsToBeDropped;
307  vstring labels;
308  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
309  iEndPath != endEndPath;
310  ++iEndPath) {
311  labels = proc_pset.getParameter<vstring>(*iEndPath);
312  vstring::iterator iSave = labels.begin();
313  vstring::iterator iBegin = labels.begin();
314 
315  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end();
316  iLabel != iEnd; ++iLabel) {
317  if (binary_search_string(labelsToBeDropped, *iLabel)) {
318  if (binary_search_string(outputModuleLabels, *iLabel)) {
319  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
320  }
321  } else {
322  if (iSave != iLabel) {
323  iSave->swap(*iLabel);
324  }
325  ++iSave;
326  }
327  }
328  labels.erase(iSave, labels.end());
329  if (labels.empty()) {
330  // remove empty end paths and save their names
331  proc_pset.eraseSimpleParameter(*iEndPath);
332  endPathsToBeDropped.push_back(*iEndPath);
333  } else {
334  proc_pset.addParameter<vstring>(*iEndPath, labels);
335  }
336  }
337  sort_all(endPathsToBeDropped);
338 
339  // remove empty end paths from @paths
340  endAfterRemove = std::remove_if(scheduledPaths.begin(), scheduledPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
341  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
342  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
343 
344  // remove empty end paths from @end_paths
345  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
346  endAfterRemove = std::remove_if(scheduledEndPaths.begin(), scheduledEndPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
347  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
348  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
349 
350  }
351  }
352  // -----------------------------
353 
354  typedef std::vector<std::string> vstring;
355 
356  // -----------------------------
357 
360  ProductRegistry& preg,
361  BranchIDListHelper& branchIDListHelper,
362  ThinnedAssociationsHelper& thinnedAssociationsHelper,
363  ExceptionToActionTable const& actions,
364  std::shared_ptr<ActivityRegistry> areg,
365  std::shared_ptr<ProcessConfiguration> processConfiguration,
366  const ParameterSet* subProcPSet,
368  ProcessContext const* processContext) :
369  //Only create a resultsInserter if there is a trigger path
370  resultsInserter_{tns.getTrigPaths().empty()? std::shared_ptr<TriggerResultInserter>{} :makeInserter(proc_pset,prealloc,preg,actions,areg,processConfiguration)},
371  moduleRegistry_(new ModuleRegistry()),
374  wantSummary_(tns.wantSummary()),
376  {
377  assert(0<prealloc.numberOfStreams());
378  streamSchedules_.reserve(prealloc.numberOfStreams());
379  for(unsigned int i=0; i<prealloc.numberOfStreams();++i) {
380  streamSchedules_.emplace_back(std::make_shared<StreamSchedule>(resultsInserter_,moduleRegistry_,proc_pset,tns,prealloc,preg,branchIDListHelper,actions,areg,processConfiguration,nullptr==subProcPSet,StreamID{i},processContext));
381  }
382 
383  //TriggerResults are injected automatically by StreamSchedules and are
384  // unknown to the ModuleRegistry
385  const std::string kTriggerResults("TriggerResults");
386  std::vector<std::string> modulesToUse;
387  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
388  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
389  if(worker->description().moduleLabel() != kTriggerResults) {
390  modulesToUse.push_back(worker->description().moduleLabel());
391  }
392  }
393  //The unscheduled modules are at the end of the list, but we want them at the front
394  unsigned int n = streamSchedules_[0]->numberOfUnscheduledModules();
395  if(n>0) {
396  std::vector<std::string> temp;
397  temp.reserve(modulesToUse.size());
398  auto itBeginUnscheduled = modulesToUse.begin()+modulesToUse.size()-n;
399  std::copy(itBeginUnscheduled,modulesToUse.end(),
400  std::back_inserter(temp));
401  std::copy(modulesToUse.begin(),itBeginUnscheduled,std::back_inserter(temp));
402  temp.swap(modulesToUse);
403  }
404  globalSchedule_.reset( new GlobalSchedule{ resultsInserter_,
406  modulesToUse,
407  proc_pset, preg, prealloc,
408  actions,areg,processConfiguration,processContext });
409 
410  //TriggerResults is not in the top level ParameterSet so the call to
411  // reduceParameterSet would fail to find it. Just remove it up front.
412  std::set<std::string> usedModuleLabels;
413  for( auto const worker: allWorkers()) {
414  if(worker->description().moduleLabel() != kTriggerResults) {
415  usedModuleLabels.insert(worker->description().moduleLabel());
416  }
417  }
418  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
419  std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
420  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels,
421  outputModulePathPositions);
422  processEDAliases(proc_pset, processConfiguration->processName(), preg);
423  proc_pset.registerIt();
424  pset::setProcessParameterSetID(proc_pset.id());
425  processConfiguration->setParameterSetID(proc_pset.id());
426  processConfiguration->setProcessConfigurationID();
427 
428  // This is used for a little sanity-check to make sure no code
429  // modifications alter the number of workers at a later date.
430  size_t all_workers_count = allWorkers().size();
431 
432  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder){
433  auto comm = iHolder->createOutputModuleCommunicator();
434  if (comm) {
435  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
436  }
437  });
438  // Now that the output workers are filled in, set any output limits or information.
439  limitOutput(proc_pset, branchIDListHelper.branchIDLists());
440 
442 
443  // Sanity check: make sure nobody has added a worker after we've
444  // already relied on the WorkerManager being full.
445  assert (all_workers_count == allWorkers().size());
446 
447  branchIDListHelper.updateFromRegistry(preg);
448 
449  preg.setFrozen();
450 
451  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
452  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
453  }
454  thinnedAssociationsHelper.sort();
455 
456  for (auto c : all_output_communicators_) {
457  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
458  c->selectProducts(preg, thinnedAssociationsHelper);
459  }
460 
461  if(wantSummary_) {
462  std::vector<const ModuleDescription*> modDesc;
463  const auto& workers = allWorkers();
464  modDesc.reserve(workers.size());
465 
466  std::transform(workers.begin(),workers.end(),
467  std::back_inserter(modDesc),
468  [](const Worker* iWorker) -> const ModuleDescription* {
469  return iWorker->descPtr();
470  });
471 
472  summaryTimeKeeper_.reset(new SystemTimeKeeper(prealloc.numberOfStreams(),
473  modDesc,
474  tns));
475  auto timeKeeperPtr = summaryTimeKeeper_.get();
476 
477  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
478  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
479  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
480  areg->watchPostModuleEventDelayedGet(timeKeeperPtr,&SystemTimeKeeper::restartModuleEvent);
481 
482  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
483  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
484 
485  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
486  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
487  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
488  //timeKeeperPtr->startModuleEvent(iContext,iMod);
489  //});
490  }
491 
492  } // Schedule::Schedule
493 
494 
495  void
496  Schedule::limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists) {
497  std::string const output("output");
498 
499  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents", ParameterSet());
500  int maxEventSpecs = 0;
501  int maxEventsOut = -1;
502  ParameterSet const* vMaxEventsOut = 0;
503  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
504  if (search_all(intNamesE, output)) {
505  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
506  ++maxEventSpecs;
507  }
508  std::vector<std::string> psetNamesE;
509  maxEventsPSet.getParameterSetNames(psetNamesE, false);
510  if (search_all(psetNamesE, output)) {
511  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
512  ++maxEventSpecs;
513  }
514 
515  if (maxEventSpecs > 1) {
517  "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
518  }
519 
520  for (auto c : all_output_communicators_) {
521  OutputModuleDescription desc(branchIDLists, maxEventsOut);
522  if (vMaxEventsOut != 0 && !vMaxEventsOut->empty()) {
523  std::string const& moduleLabel = c->description().moduleLabel();
524  try {
525  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
526  } catch (Exception const&) {
528  "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
529  }
530  }
531  c->configure(desc);
532  }
533  }
534 
535  bool Schedule::terminate() const {
536  if (all_output_communicators_.empty()) {
537  return false;
538  }
539  for (auto c : all_output_communicators_) {
540  if (!c->limitReached()) {
541  // Found an output module that has not reached output event count.
542  return false;
543  }
544  }
545  LogInfo("SuccessfulTermination")
546  << "The job is terminating successfully because each output module\n"
547  << "has reached its configured limit.\n";
548  return true;
549  }
550 
552  globalSchedule_->endJob(collector);
553  if (collector.hasThrown()) {
554  return;
555  }
556 
557  if (wantSummary_ == false) return;
558  {
559  TriggerReport tr;
560  getTriggerReport(tr);
561 
562  // The trigger report (pass/fail etc.):
563 
564  LogVerbatim("FwkSummary") << "";
565  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Event Summary ------------";
566  if(!tr.trigPathSummaries.empty()) {
567  LogVerbatim("FwkSummary") << "TrigReport"
568  << " Events total = " << tr.eventSummary.totalEvents
569  << " passed = " << tr.eventSummary.totalEventsPassed
570  << " failed = " << tr.eventSummary.totalEventsFailed
571  << "";
572  } else {
573  LogVerbatim("FwkSummary") << "TrigReport"
574  << " Events total = " << tr.eventSummary.totalEvents
575  << " passed = " << tr.eventSummary.totalEvents
576  << " failed = 0";
577  }
578 
579  LogVerbatim("FwkSummary") << "";
580  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Path Summary ------------";
581  LogVerbatim("FwkSummary") << "TrigReport "
582  << std::right << std::setw(10) << "Trig Bit#" << " "
583  << std::right << std::setw(10) << "Run" << " "
584  << std::right << std::setw(10) << "Passed" << " "
585  << std::right << std::setw(10) << "Failed" << " "
586  << std::right << std::setw(10) << "Error" << " "
587  << "Name" << "";
588  for (auto const& p: tr.trigPathSummaries) {
589  LogVerbatim("FwkSummary") << "TrigReport "
590  << std::right << std::setw(5) << 1
591  << std::right << std::setw(5) << p.bitPosition << " "
592  << std::right << std::setw(10) << p.timesRun << " "
593  << std::right << std::setw(10) << p.timesPassed << " "
594  << std::right << std::setw(10) << p.timesFailed << " "
595  << std::right << std::setw(10) << p.timesExcept << " "
596  << p.name << "";
597  }
598 
599  /*
600  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
601  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
602  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
603  for (; epi != epe; ++epi, ++epn) {
604 
605  LogVerbatim("FwkSummary") << "TrigReport "
606  << std::right << std::setw(5) << 1
607  << std::right << std::setw(5) << *epi << " "
608  << std::right << std::setw(10) << totalEvents() << " "
609  << std::right << std::setw(10) << totalEvents() << " "
610  << std::right << std::setw(10) << 0 << " "
611  << std::right << std::setw(10) << 0 << " "
612  << *epn << "";
613  }
614  */
615 
616  LogVerbatim("FwkSummary") << "";
617  LogVerbatim("FwkSummary") << "TrigReport " << "-------End-Path Summary ------------";
618  LogVerbatim("FwkSummary") << "TrigReport "
619  << std::right << std::setw(10) << "Trig Bit#" << " "
620  << std::right << std::setw(10) << "Run" << " "
621  << std::right << std::setw(10) << "Passed" << " "
622  << std::right << std::setw(10) << "Failed" << " "
623  << std::right << std::setw(10) << "Error" << " "
624  << "Name" << "";
625  for (auto const& p: tr.endPathSummaries) {
626  LogVerbatim("FwkSummary") << "TrigReport "
627  << std::right << std::setw(5) << 0
628  << std::right << std::setw(5) << p.bitPosition << " "
629  << std::right << std::setw(10) << p.timesRun << " "
630  << std::right << std::setw(10) << p.timesPassed << " "
631  << std::right << std::setw(10) << p.timesFailed << " "
632  << std::right << std::setw(10) << p.timesExcept << " "
633  << p.name << "";
634  }
635 
636  for (auto const& p: tr.trigPathSummaries) {
637  LogVerbatim("FwkSummary") << "";
638  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Modules in Path: " << p.name << " ------------";
639  LogVerbatim("FwkSummary") << "TrigReport "
640  << std::right << std::setw(10) << "Trig Bit#" << " "
641  << std::right << std::setw(10) << "Visited" << " "
642  << std::right << std::setw(10) << "Passed" << " "
643  << std::right << std::setw(10) << "Failed" << " "
644  << std::right << std::setw(10) << "Error" << " "
645  << "Name" << "";
646 
647  unsigned int bitpos = 0;
648  for (auto const& mod: p.moduleInPathSummaries) {
649  LogVerbatim("FwkSummary") << "TrigReport "
650  << std::right << std::setw(5) << 1
651  << std::right << std::setw(5) << bitpos << " "
652  << std::right << std::setw(10) << mod.timesVisited << " "
653  << std::right << std::setw(10) << mod.timesPassed << " "
654  << std::right << std::setw(10) << mod.timesFailed << " "
655  << std::right << std::setw(10) << mod.timesExcept << " "
656  << mod.moduleLabel << "";
657  ++bitpos;
658  }
659  }
660 
661  for (auto const& p: tr.endPathSummaries) {
662  LogVerbatim("FwkSummary") << "";
663  LogVerbatim("FwkSummary") << "TrigReport " << "------ Modules in End-Path: " << p.name << " ------------";
664  LogVerbatim("FwkSummary") << "TrigReport "
665  << std::right << std::setw(10) << "Trig Bit#" << " "
666  << std::right << std::setw(10) << "Visited" << " "
667  << std::right << std::setw(10) << "Passed" << " "
668  << std::right << std::setw(10) << "Failed" << " "
669  << std::right << std::setw(10) << "Error" << " "
670  << "Name" << "";
671 
672  unsigned int bitpos=0;
673  for (auto const& mod: p.moduleInPathSummaries) {
674  LogVerbatim("FwkSummary") << "TrigReport "
675  << std::right << std::setw(5) << 0
676  << std::right << std::setw(5) << bitpos << " "
677  << std::right << std::setw(10) << mod.timesVisited << " "
678  << std::right << std::setw(10) << mod.timesPassed << " "
679  << std::right << std::setw(10) << mod.timesFailed << " "
680  << std::right << std::setw(10) << mod.timesExcept << " "
681  << mod.moduleLabel << "";
682  ++bitpos;
683  }
684  }
685 
686  LogVerbatim("FwkSummary") << "";
687  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Module Summary ------------";
688  LogVerbatim("FwkSummary") << "TrigReport "
689  << std::right << std::setw(10) << "Visited" << " "
690  << std::right << std::setw(10) << "Run" << " "
691  << std::right << std::setw(10) << "Passed" << " "
692  << std::right << std::setw(10) << "Failed" << " "
693  << std::right << std::setw(10) << "Error" << " "
694  << "Name" << "";
695  for (auto const& worker : tr.workerSummaries) {
696  LogVerbatim("FwkSummary") << "TrigReport "
697  << std::right << std::setw(10) << worker.timesVisited << " "
698  << std::right << std::setw(10) << worker.timesRun << " "
699  << std::right << std::setw(10) << worker.timesPassed << " "
700  << std::right << std::setw(10) << worker.timesFailed << " "
701  << std::right << std::setw(10) << worker.timesExcept << " "
702  << worker.moduleLabel << "";
703  }
704  LogVerbatim("FwkSummary") << "";
705  }
706  // The timing report (CPU and Real Time):
709 
710  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
711 
712  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Event Summary ---[sec]----";
713  LogVerbatim("FwkSummary") << "TimeReport"
714  << std::setprecision(6) << std::fixed
715  << " CPU/event = " << tr.eventSummary.cpuTime/totalEvents
716  << " Real/event = " << tr.eventSummary.realTime/totalEvents
717  << "";
718 
719  LogVerbatim("FwkSummary") << "";
720  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Path Summary ---[sec]----";
721  LogVerbatim("FwkSummary") << "TimeReport "
722  << std::right << std::setw(22) << "per event "
723  << std::right << std::setw(22) << "per path-run "
724  << "";
725  LogVerbatim("FwkSummary") << "TimeReport "
726  << std::right << std::setw(10) << "CPU" << " "
727  << std::right << std::setw(10) << "Real" << " "
728  << std::right << std::setw(10) << "CPU" << " "
729  << std::right << std::setw(10) << "Real" << " "
730  << "Name" << "";
731  for (auto const& p: tr.trigPathSummaries) {
732  const int timesRun = std::max(1, p.timesRun);
733  LogVerbatim("FwkSummary") << "TimeReport "
734  << std::setprecision(6) << std::fixed
735  << std::right << std::setw(10) << p.cpuTime/totalEvents << " "
736  << std::right << std::setw(10) << p.realTime/totalEvents << " "
737  << std::right << std::setw(10) << p.cpuTime/timesRun << " "
738  << std::right << std::setw(10) << p.realTime/timesRun << " "
739  << p.name << "";
740  }
741  LogVerbatim("FwkSummary") << "TimeReport "
742  << std::right << std::setw(10) << "CPU" << " "
743  << std::right << std::setw(10) << "Real" << " "
744  << std::right << std::setw(10) << "CPU" << " "
745  << std::right << std::setw(10) << "Real" << " "
746  << "Name" << "";
747  LogVerbatim("FwkSummary") << "TimeReport "
748  << std::right << std::setw(22) << "per event "
749  << std::right << std::setw(22) << "per path-run "
750  << "";
751 
752  LogVerbatim("FwkSummary") << "";
753  LogVerbatim("FwkSummary") << "TimeReport " << "-------End-Path Summary ---[sec]----";
754  LogVerbatim("FwkSummary") << "TimeReport "
755  << std::right << std::setw(22) << "per event "
756  << std::right << std::setw(22) << "per endpath-run "
757  << "";
758  LogVerbatim("FwkSummary") << "TimeReport "
759  << std::right << std::setw(10) << "CPU" << " "
760  << std::right << std::setw(10) << "Real" << " "
761  << std::right << std::setw(10) << "CPU" << " "
762  << std::right << std::setw(10) << "Real" << " "
763  << "Name" << "";
764  for (auto const& p: tr.endPathSummaries) {
765  const int timesRun = std::max(1, p.timesRun);
766 
767  LogVerbatim("FwkSummary") << "TimeReport "
768  << std::setprecision(6) << std::fixed
769  << std::right << std::setw(10) << p.cpuTime/totalEvents << " "
770  << std::right << std::setw(10) << p.realTime/totalEvents << " "
771  << std::right << std::setw(10) << p.cpuTime/timesRun << " "
772  << std::right << std::setw(10) << p.realTime/timesRun << " "
773  << p.name << "";
774  }
775  LogVerbatim("FwkSummary") << "TimeReport "
776  << std::right << std::setw(10) << "CPU" << " "
777  << std::right << std::setw(10) << "Real" << " "
778  << std::right << std::setw(10) << "CPU" << " "
779  << std::right << std::setw(10) << "Real" << " "
780  << "Name" << "";
781  LogVerbatim("FwkSummary") << "TimeReport "
782  << std::right << std::setw(22) << "per event "
783  << std::right << std::setw(22) << "per endpath-run "
784  << "";
785 
786  for (auto const& p: tr.trigPathSummaries) {
787  LogVerbatim("FwkSummary") << "";
788  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Modules in Path: " << p.name << " ---[sec]----";
789  LogVerbatim("FwkSummary") << "TimeReport "
790  << std::right << std::setw(22) << "per event "
791  << std::right << std::setw(22) << "per module-visit "
792  << "";
793  LogVerbatim("FwkSummary") << "TimeReport "
794  << std::right << std::setw(10) << "CPU" << " "
795  << std::right << std::setw(10) << "Real" << " "
796  << std::right << std::setw(10) << "CPU" << " "
797  << std::right << std::setw(10) << "Real" << " "
798  << "Name" << "";
799  for (auto const& mod: p.moduleInPathSummaries) {
800  LogVerbatim("FwkSummary") << "TimeReport "
801  << std::setprecision(6) << std::fixed
802  << std::right << std::setw(10) << mod.cpuTime/totalEvents << " "
803  << std::right << std::setw(10) << mod.realTime/totalEvents << " "
804  << std::right << std::setw(10) << mod.cpuTime/std::max(1, mod.timesVisited) << " "
805  << std::right << std::setw(10) << mod.realTime/std::max(1, mod.timesVisited) << " "
806  << mod.moduleLabel << "";
807  }
808  }
809  LogVerbatim("FwkSummary") << "TimeReport "
810  << std::right << std::setw(10) << "CPU" << " "
811  << std::right << std::setw(10) << "Real" << " "
812  << std::right << std::setw(10) << "CPU" << " "
813  << std::right << std::setw(10) << "Real" << " "
814  << "Name" << "";
815  LogVerbatim("FwkSummary") << "TimeReport "
816  << std::right << std::setw(22) << "per event "
817  << std::right << std::setw(22) << "per module-visit "
818  << "";
819 
820  for (auto const& p: tr.endPathSummaries) {
821  LogVerbatim("FwkSummary") << "";
822  LogVerbatim("FwkSummary") << "TimeReport " << "------ Modules in End-Path: " << p.name << " ---[sec]----";
823  LogVerbatim("FwkSummary") << "TimeReport "
824  << std::right << std::setw(22) << "per event "
825  << std::right << std::setw(22) << "per module-visit "
826  << "";
827  LogVerbatim("FwkSummary") << "TimeReport "
828  << std::right << std::setw(10) << "CPU" << " "
829  << std::right << std::setw(10) << "Real" << " "
830  << std::right << std::setw(10) << "CPU" << " "
831  << std::right << std::setw(10) << "Real" << " "
832  << "Name" << "";
833  for (auto const& mod: p.moduleInPathSummaries) {
834  LogVerbatim("FwkSummary") << "TimeReport "
835  << std::setprecision(6) << std::fixed
836  << std::right << std::setw(10) << mod.cpuTime/totalEvents << " "
837  << std::right << std::setw(10) << mod.realTime/totalEvents << " "
838  << std::right << std::setw(10) << mod.cpuTime/std::max(1, mod.timesVisited) << " "
839  << std::right << std::setw(10) << mod.realTime/std::max(1, mod.timesVisited) << " "
840  << mod.moduleLabel << "";
841  }
842  }
843  LogVerbatim("FwkSummary") << "TimeReport "
844  << std::right << std::setw(10) << "CPU" << " "
845  << std::right << std::setw(10) << "Real" << " "
846  << std::right << std::setw(10) << "CPU" << " "
847  << std::right << std::setw(10) << "Real" << " "
848  << "Name" << "";
849  LogVerbatim("FwkSummary") << "TimeReport "
850  << std::right << std::setw(22) << "per event "
851  << std::right << std::setw(22) << "per module-visit "
852  << "";
853 
854  LogVerbatim("FwkSummary") << "";
855  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Module Summary ---[sec]----";
856  LogVerbatim("FwkSummary") << "TimeReport "
857  << std::right << std::setw(22) << "per event "
858  << std::right << std::setw(22) << "per module-run "
859  << std::right << std::setw(22) << "per module-visit "
860  << "";
861  LogVerbatim("FwkSummary") << "TimeReport "
862  << std::right << std::setw(10) << "CPU" << " "
863  << std::right << std::setw(10) << "Real" << " "
864  << std::right << std::setw(10) << "CPU" << " "
865  << std::right << std::setw(10) << "Real" << " "
866  << std::right << std::setw(10) << "CPU" << " "
867  << std::right << std::setw(10) << "Real" << " "
868  << "Name" << "";
869  for (auto const& worker : tr.workerSummaries) {
870  LogVerbatim("FwkSummary") << "TimeReport "
871  << std::setprecision(6) << std::fixed
872  << std::right << std::setw(10) << worker.cpuTime/totalEvents << " "
873  << std::right << std::setw(10) << worker.realTime/totalEvents << " "
874  << std::right << std::setw(10) << worker.cpuTime/std::max(1, worker.timesRun) << " "
875  << std::right << std::setw(10) << worker.realTime/std::max(1, worker.timesRun) << " "
876  << std::right << std::setw(10) << worker.cpuTime/std::max(1, worker.timesVisited) << " "
877  << std::right << std::setw(10) << worker.realTime/std::max(1, worker.timesVisited) << " "
878  << worker.moduleLabel << "";
879  }
880  LogVerbatim("FwkSummary") << "TimeReport "
881  << std::right << std::setw(10) << "CPU" << " "
882  << std::right << std::setw(10) << "Real" << " "
883  << std::right << std::setw(10) << "CPU" << " "
884  << std::right << std::setw(10) << "Real" << " "
885  << std::right << std::setw(10) << "CPU" << " "
886  << std::right << std::setw(10) << "Real" << " "
887  << "Name" << "";
888  LogVerbatim("FwkSummary") << "TimeReport "
889  << std::right << std::setw(22) << "per event "
890  << std::right << std::setw(22) << "per module-run "
891  << std::right << std::setw(22) << "per module-visit "
892  << "";
893 
894  LogVerbatim("FwkSummary") << "";
895  LogVerbatim("FwkSummary") << "T---Report end!" << "";
896  LogVerbatim("FwkSummary") << "";
897  }
898 
900  using std::placeholders::_1;
902  }
903 
905  using std::placeholders::_1;
907  }
908 
910  using std::placeholders::_1;
912  }
913 
914  void Schedule::writeRun(RunPrincipal const& rp, ProcessContext const* processContext) {
915  using std::placeholders::_1;
916  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::writeRun, _1, std::cref(rp), processContext));
917  }
918 
919  void Schedule::writeLumi(LuminosityBlockPrincipal const& lbp, ProcessContext const* processContext) {
920  using std::placeholders::_1;
921  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::writeLumi, _1, std::cref(lbp), processContext));
922  }
923 
925  using std::placeholders::_1;
926  // Return true iff at least one output module returns true.
927  return (std::find_if (all_output_communicators_.begin(), all_output_communicators_.end(),
929  != all_output_communicators_.end());
930  }
931 
933  using std::placeholders::_1;
934  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
935  }
936 
938  using std::placeholders::_1;
939  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
940  }
941 
942  void Schedule::beginJob(ProductRegistry const& iRegistry) {
944 
945  globalSchedule_->beginJob(iRegistry);
946  }
947 
948  void Schedule::beginStream(unsigned int iStreamID) {
949  assert(iStreamID<streamSchedules_.size());
950  streamSchedules_[iStreamID]->beginStream();
951  }
952 
953  void Schedule::endStream(unsigned int iStreamID) {
954  assert(iStreamID<streamSchedules_.size());
955  streamSchedules_[iStreamID]->endStream();
956  }
957 
959  using std::placeholders::_1;
961  }
962  void Schedule::postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {
963  using std::placeholders::_1;
964  for_all(allWorkers(), std::bind(&Worker::postForkReacquireResources, _1, iChildIndex, iNumberOfChildren));
965  }
966 
968  ParameterSet const& iPSet,
969  const ProductRegistry& iRegistry) {
970  Worker* found = nullptr;
971  for (auto const& worker : allWorkers()) {
972  if (worker->description().moduleLabel() == iLabel) {
973  found = worker;
974  break;
975  }
976  }
977  if (nullptr == found) {
978  return false;
979  }
980 
981  auto newMod = moduleRegistry_->replaceModule(iLabel,iPSet,preallocConfig_);
982 
983  globalSchedule_->replaceModule(newMod,iLabel);
984 
985  for(auto s: streamSchedules_) {
986  s->replaceModule(newMod,iLabel);
987  }
988 
989  {
990  //Need to updateLookup in order to make getByToken work
991  auto const runLookup = iRegistry.productLookup(InRun);
992  auto const lumiLookup = iRegistry.productLookup(InLumi);
993  auto const eventLookup = iRegistry.productLookup(InEvent);
994  found->updateLookup(InRun,*runLookup);
995  found->updateLookup(InLumi,*lumiLookup);
996  found->updateLookup(InEvent,*eventLookup);
997  }
998 
999  return true;
1000  }
1001 
1002  std::vector<ModuleDescription const*>
1004  std::vector<ModuleDescription const*> result;
1005  result.reserve(allWorkers().size());
1006 
1007  for (auto const& worker : allWorkers()) {
1008  ModuleDescription const* p = worker->descPtr();
1009  result.push_back(p);
1010  }
1011  return result;
1012  }
1013 
1014  Schedule::AllWorkers const&
1016  return globalSchedule_->allWorkers();
1017  }
1018 
1019  void
1020  Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
1021  streamSchedules_[0]->availablePaths(oLabelsToFill);
1022  }
1023 
1024  void
1026  std::vector<std::string>& oLabelsToFill) const {
1027  streamSchedules_[0]->modulesInPath(iPathLabel,oLabelsToFill);
1028  }
1029 
1030  void
1032  endpathsAreActive_ = active;
1033  for(auto const & s : streamSchedules_) {
1034  s->enableEndPaths(active);
1035  }
1036  }
1037 
1038  bool
1040  return endpathsAreActive_;
1041  }
1042 
1043  void
1045  rep.eventSummary.totalEvents = 0;
1048  for(auto& s: streamSchedules_) {
1049  s->getTriggerReport(rep);
1050  }
1051  }
1052 
1053  void
1055  rep.eventSummary.totalEvents = 0;
1056  rep.eventSummary.cpuTime = 0.;
1057  rep.eventSummary.realTime = 0.;
1058  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1059  }
1060 
1061  int
1063  int returnValue = 0;
1064  for(auto& s: streamSchedules_) {
1065  returnValue += s->totalEvents();
1066  }
1067  return returnValue;
1068  }
1069 
1070  int
1072  int returnValue = 0;
1073  for(auto& s: streamSchedules_) {
1074  returnValue += s->totalEventsPassed();
1075  }
1076  return returnValue;
1077  }
1078 
1079  int
1081  int returnValue = 0;
1082  for(auto& s: streamSchedules_) {
1083  returnValue += s->totalEventsFailed();
1084  }
1085  return returnValue;
1086  }
1087 
1088 
1089  void
1091  for(auto const& s: streamSchedules_) {
1092  s->clearCounters();
1093  }
1094  }
1095 
1096  //====================================
1097  // Schedule::checkForCorrectness algorithm
1098  //
1099  // The code creates a 'dependency' graph between all
1100  // modules. A module depends on another module if
1101  // 1) it 'consumes' data produced by that module
1102  // 2) it appears directly after the module within a Path
1103  //
1104  // If there is a cycle in the 'dependency' graph then
1105  // the schedule may be unrunnable. The schedule is still
1106  // runnable if all cycles have at least two edges which
1107  // connect modules only by Path dependencies (i.e. not
1108  // linked by a data dependency).
1109  //
1110  // Example 1:
1111  // C consumes data from B
1112  // Path 1: A + B + C
1113  // Path 2: B + C + A
1114  //
1115  // Cycle: A after C [p2], C consumes B, B after A [p1]
1116  // Since this cycle has 2 path only edges it is OK since
1117  // A and (B+C) are independent so their run order doesn't matter
1118  //
1119  // Example 2:
1120  // B consumes A
1121  // C consumes B
1122  // Path: C + A
1123  //
1124  // Cycle: A after C [p], C consumes B, B consumes A
1125  // Since this cycle has 1 path only edge it is unrunnable.
1126  //
1127  // Example 3:
1128  // A consumes B
1129  // B consumes C
1130  // C consumes A
1131  // (no Path since unscheduled execution)
1132  //
1133  // Cycle: A consumes B, B consumes C, C consumes A
1134  // Since this cycle has 0 path only edges it is unrunnable.
1135  //====================================
1136 
1137  namespace {
1138  typedef std::pair<unsigned int, unsigned int> SimpleEdge;
1139  typedef std::map<SimpleEdge, std::vector<unsigned int>> EdgeToPathMap;
1140 
1141  typedef boost::adjacency_list<boost::vecS, boost::vecS, boost::bidirectionalS> Graph;
1142 
1143  typedef boost::graph_traits<Graph>::edge_descriptor Edge;
1144  struct cycle_detector : public boost::dfs_visitor<> {
1145 
1146  cycle_detector(EdgeToPathMap const& iEdgeToPathMap,
1147  std::vector<std::string> const& iPathNames,
1148  std::map<std::string,unsigned int> const& iModuleNamesToIndex):
1149  m_edgeToPathMap(iEdgeToPathMap),
1150  m_pathNames(iPathNames),
1151  m_namesToIndex(iModuleNamesToIndex){}
1152 
1153  void tree_edge(Edge iEdge, Graph const&) {
1154  m_stack.push_back(iEdge);
1155  }
1156 
1157  void finish_edge(Edge iEdge, Graph const& iGraph) {
1158  if(not m_stack.empty()) {
1159  if (iEdge == m_stack.back()) {
1160  m_stack.pop_back();
1161  }
1162  }
1163  }
1164 
1165  //Called if a cycle happens
1166  void back_edge(Edge iEdge, Graph const& iGraph) {
1167  //NOTE: If the path containing the cycle contains two or more
1168  // path only edges then there is no problem
1169 
1171  IndexMap const& index = get(boost::vertex_index, iGraph);
1172 
1173  unsigned int vertex = index[target(iEdge,iGraph)];
1174 
1175  //Find last edge which starts with this vertex
1176  std::list<Edge>::iterator itFirst = m_stack.begin();
1177  {
1178  bool seenVertex = false;
1179  while(itFirst != m_stack.end()) {
1180  if(not seenVertex) {
1181  if(index[source(*itFirst,iGraph)] == vertex) {
1182  seenVertex = true;
1183  }
1184  } else
1185  if (index[source(*itFirst,iGraph)] != vertex) {
1186  break;
1187  }
1188  ++itFirst;
1189  }
1190  if(itFirst != m_stack.begin()) {
1191  --itFirst;
1192  }
1193  }
1194  //This edge has not been added to the stack yet
1195  // making a copy allows us to add it in but not worry
1196  // about removing it at the end of the routine
1197  std::vector<Edge> tempStack;
1198  tempStack.reserve(m_stack.size()+1);
1199  tempStack.insert(tempStack.end(),itFirst,m_stack.end());
1200  tempStack.emplace_back(iEdge);
1201 
1202  unsigned int nPathDependencyOnly =0;
1203  for(auto const& edge: tempStack) {
1204  unsigned int in =index[source(edge,iGraph)];
1205  unsigned int out =index[target(edge,iGraph)];
1206 
1207  auto iFound = m_edgeToPathMap.find(SimpleEdge(in,out));
1208  bool pathDependencyOnly = true;
1209  for(auto dependency : iFound->second) {
1210  if (dependency == std::numeric_limits<unsigned int>::max()) {
1211  pathDependencyOnly = false;
1212  break;
1213  }
1214  }
1215  if (pathDependencyOnly) {
1216  ++nPathDependencyOnly;
1217  }
1218  }
1219  if(nPathDependencyOnly < 2) {
1220  reportError(tempStack,index,iGraph);
1221  }
1222  }
1223  private:
1224  std::string const& pathName(unsigned int iIndex) const {
1225  return m_pathNames[iIndex];
1226  }
1227 
1228  std::string const& moduleName(unsigned int iIndex) const {
1229  for(auto const& item : m_namesToIndex) {
1230  if(item.second == iIndex) {
1231  return item.first;
1232  }
1233  }
1234  assert(false);
1235  }
1236 
1237  void
1238  reportError(std::vector<Edge>const& iEdges,
1240  Graph const& iGraph) const {
1241  std::stringstream oStream;
1242  oStream <<"Module run order problem found: \n";
1243  bool first_edge = true;
1244  for(auto const& edge: iEdges) {
1245  unsigned int in =iIndex[source(edge,iGraph)];
1246  unsigned int out =iIndex[target(edge,iGraph)];
1247 
1248  if(first_edge) {
1249  first_edge = false;
1250  } else {
1251  oStream<<", ";
1252  }
1253  oStream <<moduleName(in);
1254 
1255  auto iFound = m_edgeToPathMap.find(SimpleEdge(in,out));
1256  bool pathDependencyOnly = true;
1257  for(auto dependency : iFound->second) {
1258  if (dependency == std::numeric_limits<unsigned int>::max()) {
1259  pathDependencyOnly = false;
1260  break;
1261  }
1262  }
1263  if (pathDependencyOnly) {
1264  oStream <<" after "<<moduleName(out)<<" [path "<<pathName(iFound->second[0])<<"]";
1265  } else {
1266  oStream <<" consumes "<<moduleName(out);
1267  }
1268  }
1269  oStream<<"\n Running in the threaded framework would lead to indeterminate results."
1270  "\n Please change order of modules in mentioned Path(s) to avoid inconsistent module ordering.";
1271 
1272  LogError("UnrunnableSchedule")<<oStream.str();
1273  }
1274 
1275  EdgeToPathMap const& m_edgeToPathMap;
1276  std::vector<std::string> const& m_pathNames;
1277  std::map<std::string,unsigned int> m_namesToIndex;
1278 
1279  std::list<Edge> m_stack;
1280  };
1281  }
1282 
1283  void
1285  {
1286  //Need to lookup names to ids quickly
1287  std::map<std::string,unsigned int> moduleNamesToIndex;
1288  for(auto worker: allWorkers()) {
1289  moduleNamesToIndex.insert( std::make_pair(worker->description().moduleLabel(),
1290  worker->description().id()));
1291  }
1292 
1293  //If a module to module dependency comes from a path, remember which path
1294  EdgeToPathMap edgeToPathMap;
1295 
1296  //determine the path dependencies
1297  std::vector<std::string> pathNames;
1298  {
1299  streamSchedules_[0]->availablePaths(pathNames);
1300 
1301  std::vector<std::string> moduleNames;
1302  std::vector<std::string> reducedModuleNames;
1303  unsigned int pathIndex=0;
1304  for(auto const& path: pathNames) {
1305  moduleNames.clear();
1306  reducedModuleNames.clear();
1307  std::set<std::string> alreadySeenNames;
1308 
1309  streamSchedules_[0]->modulesInPath(path,moduleNames);
1310  std::string lastModuleName;
1311  unsigned int lastModuleIndex;
1312  for(auto const& name: moduleNames) {
1313  auto found = alreadySeenNames.insert(name);
1314  if(found.second) {
1315  //first time for this path
1316  unsigned int moduleIndex = moduleNamesToIndex[name];
1317  if(not lastModuleName.empty() ) {
1318  edgeToPathMap[std::make_pair(moduleIndex,lastModuleIndex)].push_back(pathIndex);
1319  }
1320  lastModuleName = name;
1321  lastModuleIndex = moduleIndex;
1322  }
1323  }
1324  ++pathIndex;
1325  }
1326  }
1327  {
1328  std::vector<const char*> dependentModules;
1329  //determine the data dependencies
1330  for(auto const& worker: allWorkers()) {
1331  dependentModules.clear();
1332  //NOTE: what about aliases?
1333  worker->modulesDependentUpon(dependentModules);
1334  auto found = moduleNamesToIndex.find(worker->description().moduleLabel());
1335  if (found == moduleNamesToIndex.end()) {
1336  //The module was from a previous process
1337  continue;
1338  }
1339  unsigned int moduleIndex = found->second;
1340  for(auto name: dependentModules) {
1341  edgeToPathMap[std::make_pair(moduleIndex, moduleNamesToIndex[name])].push_back(std::numeric_limits<unsigned int>::max());
1342  }
1343  }
1344  }
1345  //Now use boost graph library to find cycles in the dependencies
1346  std::vector<SimpleEdge> outList;
1347  outList.reserve(edgeToPathMap.size());
1348  for(auto const& edgeInfo: edgeToPathMap) {
1349  outList.push_back(edgeInfo.first);
1350  }
1351 
1352  Graph g(outList.begin(),outList.end(), moduleNamesToIndex.size());
1353 
1354  cycle_detector detector(edgeToPathMap,pathNames,moduleNamesToIndex);
1355  boost::depth_first_search(g,boost::visitor(detector));
1356  }
1357 }
type
Definition: HCALResponse.h:21
bool empty() const
Definition: ParameterSet.h:216
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:64
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
dictionary aliases
Definition: autoCond.py:34
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
void checkForCorrectness() const
Check that the schedule is actually runable.
Definition: Schedule.cc:1284
void stopEvent(StreamContext const &)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
virtual void openFile(FileBlock const &fb)=0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1015
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:1020
static std::string const source("source")
virtual void writeRun(RunPrincipal const &rp, ProcessContext const *)=0
preallocConfig_(prealloc)
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
bool endPathsEnabled() const
Definition: Schedule.cc:1039
all_output_communicators_()
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:937
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::vector< Worker * > AllWorkers
Definition: Schedule.h:115
std::vector< ParameterSet > VParameterSet
Definition: ParameterSet.h:33
void writeRun(RunPrincipal const &rp, ProcessContext const *)
Definition: Schedule.cc:914
void endStream(unsigned int)
Definition: Schedule.cc:953
void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)
Definition: Schedule.cc:919
void enableEndPaths(bool active)
Definition: Schedule.cc:1031
processConfiguration
Definition: Schedule.cc:370
std::shared_ptr< ProductHolderIndexHelper > const & productLookup(BranchType branchType) const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
endpathsAreActive_(true)
Definition: Schedule.cc:375
std::shared_ptr< ModuleRegistry > moduleRegistry_
Definition: Schedule.h:247
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:65
actions
Definition: Schedule.cc:370
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
int totalEventsFailed() const
Definition: Schedule.cc:1080
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry)
Definition: Schedule.cc:967
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:192
int totalEventsPassed() const
Definition: Schedule.cc:1071
std::string moduleName(Provenance const &provenance)
Definition: Provenance.cc:27
virtual void updateLookup(BranchType iBranchType, ProductHolderIndexHelper const &)=0
tuple path
else: Piece not in the list, fine.
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:63
EventSummary eventSummary
Definition: TriggerReport.h:62
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists)
Definition: Schedule.cc:496
wantSummary_(tns.wantSummary())
int totalEvents() const
Definition: Schedule.cc:1062
virtual void openNewFileIfNeeded()=0
EventTimingSummary eventSummary
Schedule(ParameterSet &proc_pset, service::TriggerNamesService &tns, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, const ParameterSet *subProcPSet, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:358
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1090
tuple result
Definition: query.py:137
std::vector< PathTimingSummary > trigPathSummaries
std::vector< std::shared_ptr< StreamSchedule > > streamSchedules_
Definition: Schedule.h:248
void setProcessParameterSetID(ParameterSetID const &id)
Associated free functions.
Definition: Registry.cc:79
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:535
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:932
virtual void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)=0
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
areg
Definition: Schedule.cc:370
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:88
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1044
tuple out
Definition: dbtoconf.py:99
virtual bool shouldWeCloseFile() const =0
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:253
moduleRegistry_(new ModuleRegistry())
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
volatile bool endpathsAreActive_
Definition: Schedule.h:259
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:84
void startPath(StreamContext const &, PathContext const &)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
std::unique_ptr< SystemTimeKeeper > summaryTimeKeeper_
Definition: Schedule.h:255
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:252
void loadMissingDictionaries()
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:1025
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
void beginStream(unsigned int)
Definition: Schedule.cc:948
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:85
void preForkReleaseResources()
Definition: Worker.h:87
bool wantSummary_
Definition: Schedule.h:257
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Schedule.cc:962
list key
Definition: combine.py:13
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:1003
Strings const & getTrigPaths() const
std::unique_ptr< GlobalSchedule > globalSchedule_
Definition: Schedule.h:250
void beginJob(ProductRegistry const &)
Definition: Schedule.cc:942
void openNewOutputFilesIfNeeded()
Definition: Schedule.cc:904
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
void preForkReleaseResources()
Definition: Schedule.cc:958
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:909
preg
Definition: Schedule.cc:370
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:551
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1054
bool shouldWeCloseOutput() const
Definition: Schedule.cc:924
std::vector< std::string > vstring
Definition: Schedule.cc:354
tuple size
Write out results.
void closeOutputFiles()
Definition: Schedule.cc:899
prealloc
Definition: Schedule.cc:370
std::string match(BranchDescription const &a, BranchDescription const &b, std::string const &fileName)