CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Schedule.cc
Go to the documentation of this file.
2 
25 
26 #include "boost/graph/graph_traits.hpp"
27 #include "boost/graph/adjacency_list.hpp"
28 #include "boost/graph/depth_first_search.hpp"
29 #include "boost/graph/visitors.hpp"
30 
31 
32 #include <algorithm>
33 #include <cassert>
34 #include <cstdlib>
35 #include <functional>
36 #include <iomanip>
37 #include <list>
38 #include <map>
39 #include <exception>
40 #include <sstream>
41 
42 namespace edm {
43  namespace {
44  using std::placeholders::_1;
45 
/// Returns true when `s` is present in `v`.
/// `v` must already be sorted in ascending order; the lookup is logarithmic.
bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
  auto const candidate = std::lower_bound(v.begin(), v.end(), s);
  // lower_bound yields the first element that is not less than s; it is a
  // match only if s is not less than that element either.
  return candidate != v.end() && !(s < *candidate);
}
49 
50  // Here we make the trigger results inserter directly. This should
51  // probably be a utility in the WorkerRegistry or elsewhere.
52 
// Builds the TriggerResultInserter module for this process.
//
// proc_pset            : process ParameterSet; its "@trigger_paths" PSet is
//                        fetched for update and registered.
// iPrealloc            : supplies numberOfStreams() for the inserter's constructor.
// preg                 : registry the new module registers its products and
//                        callbacks with.
// actions              : only forwarded into the WorkerParams local below.
// areg                 : used to emit the pre/post module-construction signals.
// processConfiguration : its raw pointer is passed to the ModuleDescription.
//
// Returns the module held by the ModuleHolderT. Any exception thrown during
// construction is rethrown after the post-construction signal has been given
// a chance to fire.
53  std::shared_ptr<TriggerResultInserter>
54  makeInserter(ParameterSet& proc_pset,
55  PreallocationConfiguration const& iPrealloc,
56  ProductRegistry& preg,
57  ExceptionToActionTable const& actions,
58  std::shared_ptr<ActivityRegistry> areg,
59  std::shared_ptr<ProcessConfiguration> processConfiguration) {
60 
61  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
62  trig_pset->registerIt();
63 
    // NOTE(review): work_args is constructed but never referenced again in
    // this function - confirm whether it is still needed.
64  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
    // NOTE(review): the embedded listing numbering jumps from 68 to 70, so the
    // final constructor argument and the closing ");" are not visible here -
    // restore them from the real source file.
65  ModuleDescription md(trig_pset->id(),
66  "TriggerResultInserter",
67  "TriggerResults",
68  processConfiguration.get(),
70 
71  areg->preModuleConstructionSignal_(md);
    // postCalled records that the normal path reached the post signal, so the
    // catch block does not emit it a second time.
72  bool postCalled = false;
73  std::shared_ptr<TriggerResultInserter> returnValue;
74  try {
75  maker::ModuleHolderT<TriggerResultInserter> holder(std::make_shared<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),static_cast<Maker const*>(nullptr));
76  holder.setModuleDescription(md);
77  holder.registerProductsAndCallbacks(&preg);
78  returnValue =holder.module();
    // The flag is set before the signal is emitted: if the signal itself
    // throws, the catch block must not emit it again.
79  postCalled = true;
80  // if exception then post will be called in the catch block
81  areg->postModuleConstructionSignal_(md);
82  }
83  catch (...) {
84  if(!postCalled) {
85  try {
86  areg->postModuleConstructionSignal_(md);
87  }
88  catch (...) {
89  // If post throws an exception ignore it because we are already handling another exception
90  }
91  }
92  throw;
93  }
94  return returnValue;
95  }
96 
97 
98  void
99  checkAndInsertAlias(std::string const& friendlyClassName,
100  std::string const& moduleLabel,
101  std::string const& productInstanceName,
102  std::string const& processName,
103  std::string const& alias,
104  std::string const& instanceAlias,
105  ProductRegistry const& preg,
106  std::multimap<BranchKey, BranchKey>& aliasMap,
107  std::map<BranchKey, BranchKey>& aliasKeys) {
108  std::string const star("*");
109 
110  BranchKey key(friendlyClassName, moduleLabel, productInstanceName, processName);
111  if(preg.productList().find(key) == preg.productList().end()) {
112  // No product was found matching the alias.
113  // We throw an exception only if a module with the specified module label was created in this process.
114  for(auto const& product : preg.productList()) {
115  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
116  throw Exception(errors::Configuration, "EDAlias does not match data\n")
117  << "There are no products of type '" << friendlyClassName << "'\n"
118  << "with module label '" << moduleLabel << "' and instance name '" << productInstanceName << "'.\n";
119  }
120  }
121  }
122 
123  std::string const& theInstanceAlias(instanceAlias == star ? productInstanceName : instanceAlias);
124  BranchKey aliasKey(friendlyClassName, alias, theInstanceAlias, processName);
125  if(preg.productList().find(aliasKey) != preg.productList().end()) {
126  throw Exception(errors::Configuration, "EDAlias conflicts with data\n")
127  << "A product of type '" << friendlyClassName << "'\n"
128  << "with module label '" << alias << "' and instance name '" << theInstanceAlias << "'\n"
129  << "already exists.\n";
130  }
131  auto iter = aliasKeys.find(aliasKey);
132  if(iter != aliasKeys.end()) {
133  // The alias matches a previous one. If the same alias is used for different product, throw.
134  if(iter->second != key) {
135  throw Exception(errors::Configuration, "EDAlias conflict\n")
136  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
137  << "are used for multiple products of type '" << friendlyClassName << "'\n"
138  << "One has module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
139  << "the other has module label '" << iter->second.moduleLabel() << "' and product instance name '" << iter->second.productInstanceName() << "'.\n";
140  }
141  } else {
142  auto prodIter = preg.productList().find(key);
143  if(prodIter != preg.productList().end()) {
144  if (!prodIter->second.produced()) {
145  throw Exception(errors::Configuration, "EDAlias\n")
146  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
147  << "are used for a product of type '" << friendlyClassName << "'\n"
148  << "with module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
149  << "An EDAlias can only be used for products produced in the current process. This one is not.\n";
150  }
151  aliasMap.insert(std::make_pair(key, aliasKey));
152  aliasKeys.insert(std::make_pair(aliasKey, key));
153  }
154  }
155  }
156 
// Reads the EDAlias definitions from the process ParameterSet and adds the
// resulting label aliases to the product registry.
//
// proc_pset   : process ParameterSet; "@all_aliases" lists the alias PSet names.
// processName : only products whose key carries this process name are aliased.
// preg        : registry that is searched for matching products and that
//               receives the aliases through addLabelAlias().
//
// Does nothing when "@all_aliases" is empty. Configuration errors are reported
// by exceptions thrown here or by checkAndInsertAlias().
157  void
158  processEDAliases(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
159  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string> >("@all_aliases");
160  if(aliases.empty()) {
161  return;
162  }
163  std::string const star("*");
164  std::string const empty("");
    // NOTE(review): the embedded listing numbering jumps from 164 to 166, so the
    // declaration of `desc` is not visible here - restore it from the real
    // source file. `desc` is used below to validate each alias PSet; the second
    // argument of add() is presumably the parameter's default value - confirm.
166  desc.add<std::string>("type");
167  desc.add<std::string>("fromProductInstance", star);
168  desc.add<std::string>("toProductInstance", star);
169 
    // aliasMap: product key -> alias key (a product may have several aliases).
170  std::multimap<BranchKey, BranchKey> aliasMap;
171 
172  std::map<BranchKey, BranchKey> aliasKeys; // Used to search for duplicates or clashes.
173 
174  // Now, loop over the alias information and store it in aliasMap.
175  for(std::string const& alias : aliases) {
176  ParameterSet const& aliasPSet = proc_pset.getParameterSet(alias);
    // Each VParameterSet parameter name inside the alias PSet is used as the
    // module label of the products being aliased.
177  std::vector<std::string> vPSetNames = aliasPSet.getParameterNamesForType<VParameterSet>();
178  for(std::string const& moduleLabel : vPSetNames) {
179  VParameterSet vPSet = aliasPSet.getParameter<VParameterSet>(moduleLabel);
180  for(ParameterSet& pset : vPSet) {
181  desc.validate(pset);
182  std::string friendlyClassName = pset.getParameter<std::string>("type");
183  std::string productInstanceName = pset.getParameter<std::string>("fromProductInstance");
184  std::string instanceAlias = pset.getParameter<std::string>("toProductInstance");
185  if(productInstanceName == star) {
    // Wildcard source instance: alias every product in the registry that has
    // this class name and module label and belongs to the current process.
186  bool match = false;
187  BranchKey lowerBound(friendlyClassName, moduleLabel, empty, empty);
188  for(ProductRegistry::ProductList::const_iterator it = preg.productList().lower_bound(lowerBound);
189  it != preg.productList().end() && it->first.friendlyClassName() == friendlyClassName && it->first.moduleLabel() == moduleLabel;
190  ++it) {
191  if(it->first.processName() != processName) {
192  continue;
193  }
194  match = true;
195 
196  checkAndInsertAlias(friendlyClassName, moduleLabel, it->first.productInstanceName(), processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
197  }
198  if(!match) {
199  // No product was found matching the alias.
200  // We throw an exception only if a module with the specified module label was created in this process.
201  for(auto const& product : preg.productList()) {
202  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
203  throw Exception(errors::Configuration, "EDAlias parameter set mismatch\n")
204  << "There are no products of type '" << friendlyClassName << "'\n"
205  << "with module label '" << moduleLabel << "'.\n";
206  }
207  }
208  }
209  } else {
210  checkAndInsertAlias(friendlyClassName, moduleLabel, productInstanceName, processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
211  }
212  }
213  }
214  }
215 
216 
217  // Now add the new alias entries to the product registry.
218  for(auto const& aliasEntry : aliasMap) {
219  ProductRegistry::ProductList::const_iterator it = preg.productList().find(aliasEntry.first);
    // checkAndInsertAlias only records keys it found in the registry, so the
    // lookup is expected to succeed.
220  assert(it != preg.productList().end());
221  preg.addLabelAlias(it->second, aliasEntry.second.moduleLabel(), aliasEntry.second.productInstanceName());
222  }
223 
224  }
225 
226  typedef std::vector<std::string> vstring;
227 
228  void reduceParameterSet(ParameterSet& proc_pset,
229  vstring const& end_path_name_list,
230  vstring& modulesInConfig,
231  std::set<std::string> const& usedModuleLabels,
232  std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions) {
233  // Before calculating the ParameterSetID of the top level ParameterSet or
234  // saving it in the registry drop from the top level ParameterSet all
235  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
236  // production is not enabled also drop all the EDFilters and EDProducers
237  // that are not scheduled. Drop the ParameterSet used to configure the module
238  // itself. Also drop the other traces of these labels in the top level
239  // ParameterSet: Remove that labels from @all_modules and from all the
240  // end paths. If this makes any end paths empty, then remove the end path
241  // name from @end_paths, and @paths.
242 
243  // First make a list of labels to drop
244  vstring outputModuleLabels;
245  std::string edmType;
246  std::string const moduleEdmType("@module_edm_type");
247  std::string const outputModule("OutputModule");
248  std::string const edAnalyzer("EDAnalyzer");
249  std::string const edFilter("EDFilter");
250  std::string const edProducer("EDProducer");
251 
252  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
253 
254  //need a list of all modules on paths in order to determine
255  // if an EDAnalyzer only appears on an end path
256  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
257  std::set<std::string> modulesOnPaths;
258  {
259  std::set<std::string> noEndPaths(scheduledPaths.begin(),scheduledPaths.end());
260  for(auto const& endPath: end_path_name_list) {
261  noEndPaths.erase(endPath);
262  }
263  {
264  vstring labels;
265  for(auto const& path: noEndPaths) {
266  labels = proc_pset.getParameter<vstring>(path);
267  modulesOnPaths.insert(labels.begin(),labels.end());
268  }
269  }
270  }
271  //Initially fill labelsToBeDropped with all module mentioned in
272  // the configuration but which are not being used by the system
273  std::vector<std::string> labelsToBeDropped;
274  labelsToBeDropped.reserve(modulesInConfigSet.size());
275  std::set_difference(modulesInConfigSet.begin(),modulesInConfigSet.end(),
276  usedModuleLabels.begin(),usedModuleLabels.end(),
277  std::back_inserter(labelsToBeDropped));
278 
279  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
280  for (auto const& modLabel: usedModuleLabels) {
281  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
282  if (edmType == outputModule) {
283  outputModuleLabels.push_back(modLabel);
284  labelsToBeDropped.push_back(modLabel);
285  }
286  if(edmType == edAnalyzer) {
287  if(modulesOnPaths.end()==modulesOnPaths.find(modLabel)) {
288  labelsToBeDropped.push_back(modLabel);
289  }
290  }
291  }
292  //labelsToBeDropped must be sorted
293  std::inplace_merge(labelsToBeDropped.begin(),
294  labelsToBeDropped.begin()+sizeBeforeOutputModules,
295  labelsToBeDropped.end());
296 
297  // drop the parameter sets used to configure the modules
298  for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
299 
300  // drop the labels from @all_modules
301  vstring::iterator endAfterRemove = std::remove_if(modulesInConfig.begin(), modulesInConfig.end(), std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
302  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
303  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
304 
305  // drop the labels from all end paths
306  vstring endPathsToBeDropped;
307  vstring labels;
308  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
309  iEndPath != endEndPath;
310  ++iEndPath) {
311  labels = proc_pset.getParameter<vstring>(*iEndPath);
312  vstring::iterator iSave = labels.begin();
313  vstring::iterator iBegin = labels.begin();
314 
315  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end();
316  iLabel != iEnd; ++iLabel) {
317  if (binary_search_string(labelsToBeDropped, *iLabel)) {
318  if (binary_search_string(outputModuleLabels, *iLabel)) {
319  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
320  }
321  } else {
322  if (iSave != iLabel) {
323  iSave->swap(*iLabel);
324  }
325  ++iSave;
326  }
327  }
328  labels.erase(iSave, labels.end());
329  if (labels.empty()) {
330  // remove empty end paths and save their names
331  proc_pset.eraseSimpleParameter(*iEndPath);
332  endPathsToBeDropped.push_back(*iEndPath);
333  } else {
334  proc_pset.addParameter<vstring>(*iEndPath, labels);
335  }
336  }
337  sort_all(endPathsToBeDropped);
338 
339  // remove empty end paths from @paths
340  endAfterRemove = std::remove_if(scheduledPaths.begin(), scheduledPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
341  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
342  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
343 
344  // remove empty end paths from @end_paths
345  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
346  endAfterRemove = std::remove_if(scheduledEndPaths.begin(), scheduledEndPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
347  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
348  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
349 
350  }
351  }
352  // -----------------------------
353 
354  typedef std::vector<std::string> vstring;
355 
356  // -----------------------------
357 
360  ProductRegistry& preg,
361  BranchIDListHelper& branchIDListHelper,
362  ThinnedAssociationsHelper& thinnedAssociationsHelper,
363  ExceptionToActionTable const& actions,
364  std::shared_ptr<ActivityRegistry> areg,
365  std::shared_ptr<ProcessConfiguration> processConfiguration,
366  const ParameterSet* subProcPSet,
368  ProcessContext const* processContext) :
369  //Only create a resultsInserter if there is a trigger path
370  resultsInserter_{tns.getTrigPaths().empty()? std::shared_ptr<TriggerResultInserter>{} :makeInserter(proc_pset,prealloc,preg,actions,areg,processConfiguration)},
371  moduleRegistry_(new ModuleRegistry()),
374  wantSummary_(tns.wantSummary()),
376  {
377  assert(0<prealloc.numberOfStreams());
378  streamSchedules_.reserve(prealloc.numberOfStreams());
379  for(unsigned int i=0; i<prealloc.numberOfStreams();++i) {
380  streamSchedules_.emplace_back(std::make_shared<StreamSchedule>(resultsInserter_,moduleRegistry_,proc_pset,tns,prealloc,preg,branchIDListHelper,actions,areg,processConfiguration,nullptr==subProcPSet,StreamID{i},processContext));
381  }
382 
383  //TriggerResults are injected automatically by StreamSchedules and are
384  // unknown to the ModuleRegistry
385  const std::string kTriggerResults("TriggerResults");
386  std::vector<std::string> modulesToUse;
387  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
388  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
389  if(worker->description().moduleLabel() != kTriggerResults) {
390  modulesToUse.push_back(worker->description().moduleLabel());
391  }
392  }
393  //The unscheduled modules are at the end of the list, but we want them at the front
394  unsigned int n = streamSchedules_[0]->numberOfUnscheduledModules();
395  if(n>0) {
396  std::vector<std::string> temp;
397  temp.reserve(modulesToUse.size());
398  auto itBeginUnscheduled = modulesToUse.begin()+modulesToUse.size()-n;
399  std::copy(itBeginUnscheduled,modulesToUse.end(),
400  std::back_inserter(temp));
401  std::copy(modulesToUse.begin(),itBeginUnscheduled,std::back_inserter(temp));
402  temp.swap(modulesToUse);
403  }
404  globalSchedule_.reset( new GlobalSchedule{ resultsInserter_,
406  modulesToUse,
407  proc_pset, preg, prealloc,
408  actions,areg,processConfiguration,processContext });
409 
410  //TriggerResults is not in the top level ParameterSet so the call to
411  // reduceParameterSet would fail to find it. Just remove it up front.
412  std::set<std::string> usedModuleLabels;
413  for( auto const worker: allWorkers()) {
414  if(worker->description().moduleLabel() != kTriggerResults) {
415  usedModuleLabels.insert(worker->description().moduleLabel());
416  }
417  }
418  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
419  std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
420  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels,
421  outputModulePathPositions);
422  processEDAliases(proc_pset, processConfiguration->processName(), preg);
423  proc_pset.registerIt();
424  pset::setProcessParameterSetID(proc_pset.id());
425  processConfiguration->setParameterSetID(proc_pset.id());
426  processConfiguration->setProcessConfigurationID();
427 
428  // This is used for a little sanity-check to make sure no code
429  // modifications alter the number of workers at a later date.
430  size_t all_workers_count = allWorkers().size();
431 
432  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder){
433  auto comm = iHolder->createOutputModuleCommunicator();
434  if (comm) {
435  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
436  }
437  });
438  // Now that the output workers are filled in, set any output limits or information.
439  limitOutput(proc_pset, branchIDListHelper.branchIDLists());
440 
442 
443  // Sanity check: make sure nobody has added a worker after we've
444  // already relied on the WorkerManager being full.
445  assert (all_workers_count == allWorkers().size());
446 
447  branchIDListHelper.updateFromRegistry(preg);
448 
449  preg.setFrozen();
450 
451  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
452  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
453  }
454  thinnedAssociationsHelper.sort();
455 
456  for (auto c : all_output_communicators_) {
457  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
458  c->selectProducts(preg, thinnedAssociationsHelper);
459  }
460 
461  if(wantSummary_) {
462  std::vector<const ModuleDescription*> modDesc;
463  const auto& workers = allWorkers();
464  modDesc.reserve(workers.size());
465 
466  std::transform(workers.begin(),workers.end(),
467  std::back_inserter(modDesc),
468  [](const Worker* iWorker) -> const ModuleDescription* {
469  return iWorker->descPtr();
470  });
471 
472  summaryTimeKeeper_.reset(new SystemTimeKeeper(prealloc.numberOfStreams(),
473  modDesc,
474  tns));
475  auto timeKeeperPtr = summaryTimeKeeper_.get();
476 
477  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
478  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
479  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
480  areg->watchPostModuleEventDelayedGet(timeKeeperPtr,&SystemTimeKeeper::restartModuleEvent);
481 
482  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
483  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
484 
485  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
486  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
487 
488  areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
489  areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
490  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
491  //timeKeeperPtr->startModuleEvent(iContext,iMod);
492  //});
493  }
494 
495  } // Schedule::Schedule
496 
497 
498  void
499  Schedule::limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists) {
500  std::string const output("output");
501 
502  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents", ParameterSet());
503  int maxEventSpecs = 0;
504  int maxEventsOut = -1;
505  ParameterSet const* vMaxEventsOut = 0;
506  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
507  if (search_all(intNamesE, output)) {
508  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
509  ++maxEventSpecs;
510  }
511  std::vector<std::string> psetNamesE;
512  maxEventsPSet.getParameterSetNames(psetNamesE, false);
513  if (search_all(psetNamesE, output)) {
514  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
515  ++maxEventSpecs;
516  }
517 
518  if (maxEventSpecs > 1) {
520  "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
521  }
522 
523  for (auto c : all_output_communicators_) {
524  OutputModuleDescription desc(branchIDLists, maxEventsOut);
525  if (vMaxEventsOut != 0 && !vMaxEventsOut->empty()) {
526  std::string const& moduleLabel = c->description().moduleLabel();
527  try {
528  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
529  } catch (Exception const&) {
531  "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
532  }
533  }
534  c->configure(desc);
535  }
536  }
537 
538  bool Schedule::terminate() const {
539  if (all_output_communicators_.empty()) {
540  return false;
541  }
542  for (auto c : all_output_communicators_) {
543  if (!c->limitReached()) {
544  // Found an output module that has not reached output event count.
545  return false;
546  }
547  }
548  LogInfo("SuccessfulTermination")
549  << "The job is terminating successfully because each output module\n"
550  << "has reached its configured limit.\n";
551  return true;
552  }
553 
555  globalSchedule_->endJob(collector);
556  if (collector.hasThrown()) {
557  return;
558  }
559 
560  if (wantSummary_ == false) return;
561  {
562  TriggerReport tr;
563  getTriggerReport(tr);
564 
565  // The trigger report (pass/fail etc.):
566 
567  LogVerbatim("FwkSummary") << "";
568  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Event Summary ------------";
569  if(!tr.trigPathSummaries.empty()) {
570  LogVerbatim("FwkSummary") << "TrigReport"
571  << " Events total = " << tr.eventSummary.totalEvents
572  << " passed = " << tr.eventSummary.totalEventsPassed
573  << " failed = " << tr.eventSummary.totalEventsFailed
574  << "";
575  } else {
576  LogVerbatim("FwkSummary") << "TrigReport"
577  << " Events total = " << tr.eventSummary.totalEvents
578  << " passed = " << tr.eventSummary.totalEvents
579  << " failed = 0";
580  }
581 
582  LogVerbatim("FwkSummary") << "";
583  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Path Summary ------------";
584  LogVerbatim("FwkSummary") << "TrigReport "
585  << std::right << std::setw(10) << "Trig Bit#" << " "
586  << std::right << std::setw(10) << "Executed" << " "
587  << std::right << std::setw(10) << "Passed" << " "
588  << std::right << std::setw(10) << "Failed" << " "
589  << std::right << std::setw(10) << "Error" << " "
590  << "Name" << "";
591  for (auto const& p: tr.trigPathSummaries) {
592  LogVerbatim("FwkSummary") << "TrigReport "
593  << std::right << std::setw(5) << 1
594  << std::right << std::setw(5) << p.bitPosition << " "
595  << std::right << std::setw(10) << p.timesRun << " "
596  << std::right << std::setw(10) << p.timesPassed << " "
597  << std::right << std::setw(10) << p.timesFailed << " "
598  << std::right << std::setw(10) << p.timesExcept << " "
599  << p.name << "";
600  }
601 
602  /*
603  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
604  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
605  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
606  for (; epi != epe; ++epi, ++epn) {
607 
608  LogVerbatim("FwkSummary") << "TrigReport "
609  << std::right << std::setw(5) << 1
610  << std::right << std::setw(5) << *epi << " "
611  << std::right << std::setw(10) << totalEvents() << " "
612  << std::right << std::setw(10) << totalEvents() << " "
613  << std::right << std::setw(10) << 0 << " "
614  << std::right << std::setw(10) << 0 << " "
615  << *epn << "";
616  }
617  */
618 
619  LogVerbatim("FwkSummary") << "";
620  LogVerbatim("FwkSummary") << "TrigReport " << "-------End-Path Summary ------------";
621  LogVerbatim("FwkSummary") << "TrigReport "
622  << std::right << std::setw(10) << "Trig Bit#" << " "
623  << std::right << std::setw(10) << "Executed" << " "
624  << std::right << std::setw(10) << "Passed" << " "
625  << std::right << std::setw(10) << "Failed" << " "
626  << std::right << std::setw(10) << "Error" << " "
627  << "Name" << "";
628  for (auto const& p: tr.endPathSummaries) {
629  LogVerbatim("FwkSummary") << "TrigReport "
630  << std::right << std::setw(5) << 0
631  << std::right << std::setw(5) << p.bitPosition << " "
632  << std::right << std::setw(10) << p.timesRun << " "
633  << std::right << std::setw(10) << p.timesPassed << " "
634  << std::right << std::setw(10) << p.timesFailed << " "
635  << std::right << std::setw(10) << p.timesExcept << " "
636  << p.name << "";
637  }
638 
639  for (auto const& p: tr.trigPathSummaries) {
640  LogVerbatim("FwkSummary") << "";
641  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Modules in Path: " << p.name << " ------------";
642  LogVerbatim("FwkSummary") << "TrigReport "
643  << std::right << std::setw(10) << "Trig Bit#" << " "
644  << std::right << std::setw(10) << "Visited" << " "
645  << std::right << std::setw(10) << "Passed" << " "
646  << std::right << std::setw(10) << "Failed" << " "
647  << std::right << std::setw(10) << "Error" << " "
648  << "Name" << "";
649 
650  unsigned int bitpos = 0;
651  for (auto const& mod: p.moduleInPathSummaries) {
652  LogVerbatim("FwkSummary") << "TrigReport "
653  << std::right << std::setw(5) << 1
654  << std::right << std::setw(5) << bitpos << " "
655  << std::right << std::setw(10) << mod.timesVisited << " "
656  << std::right << std::setw(10) << mod.timesPassed << " "
657  << std::right << std::setw(10) << mod.timesFailed << " "
658  << std::right << std::setw(10) << mod.timesExcept << " "
659  << mod.moduleLabel << "";
660  ++bitpos;
661  }
662  }
663 
664  for (auto const& p: tr.endPathSummaries) {
665  LogVerbatim("FwkSummary") << "";
666  LogVerbatim("FwkSummary") << "TrigReport " << "------ Modules in End-Path: " << p.name << " ------------";
667  LogVerbatim("FwkSummary") << "TrigReport "
668  << std::right << std::setw(10) << "Trig Bit#" << " "
669  << std::right << std::setw(10) << "Visited" << " "
670  << std::right << std::setw(10) << "Passed" << " "
671  << std::right << std::setw(10) << "Failed" << " "
672  << std::right << std::setw(10) << "Error" << " "
673  << "Name" << "";
674 
675  unsigned int bitpos=0;
676  for (auto const& mod: p.moduleInPathSummaries) {
677  LogVerbatim("FwkSummary") << "TrigReport "
678  << std::right << std::setw(5) << 0
679  << std::right << std::setw(5) << bitpos << " "
680  << std::right << std::setw(10) << mod.timesVisited << " "
681  << std::right << std::setw(10) << mod.timesPassed << " "
682  << std::right << std::setw(10) << mod.timesFailed << " "
683  << std::right << std::setw(10) << mod.timesExcept << " "
684  << mod.moduleLabel << "";
685  ++bitpos;
686  }
687  }
688 
689  LogVerbatim("FwkSummary") << "";
690  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Module Summary ------------";
691  LogVerbatim("FwkSummary") << "TrigReport "
692  << std::right << std::setw(10) << "Visited" << " "
693  << std::right << std::setw(10) << "Executed" << " "
694  << std::right << std::setw(10) << "Passed" << " "
695  << std::right << std::setw(10) << "Failed" << " "
696  << std::right << std::setw(10) << "Error" << " "
697  << "Name" << "";
698  for (auto const& worker : tr.workerSummaries) {
699  LogVerbatim("FwkSummary") << "TrigReport "
700  << std::right << std::setw(10) << worker.timesVisited << " "
701  << std::right << std::setw(10) << worker.timesRun << " "
702  << std::right << std::setw(10) << worker.timesPassed << " "
703  << std::right << std::setw(10) << worker.timesFailed << " "
704  << std::right << std::setw(10) << worker.timesExcept << " "
705  << worker.moduleLabel << "";
706  }
707  LogVerbatim("FwkSummary") << "";
708  }
709  // The timing report (CPU and Real Time):
712 
713  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
714 
715  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Event Summary ---[sec]----";
716  LogVerbatim("FwkSummary") << "TimeReport"
717  << std::setprecision(6) << std::fixed
718  << " event loop CPU/event = " << tr.eventSummary.cpuTime/totalEvents;
719  LogVerbatim("FwkSummary") << "TimeReport"
720  << std::setprecision(6) << std::fixed
721  << " event loop Real/event = " << tr.eventSummary.realTime/totalEvents;
722  LogVerbatim("FwkSummary") << "TimeReport"
723  << std::setprecision(6) << std::fixed
724  << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime/totalEvents;
725  LogVerbatim("FwkSummary") << "TimeReport"
726  << std::setprecision(6) << std::fixed
727  << " efficiency CPU/Real/thread = " << tr.eventSummary.cpuTime/tr.eventSummary.realTime/preallocConfig_.numberOfThreads();
728 
729  constexpr int kColumn1Size = 10;
730  constexpr int kColumn2Size = 12;
731  constexpr int kColumn3Size = 12;
732  LogVerbatim("FwkSummary") << "";
733  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Path Summary ---[Real sec]----";
734  LogVerbatim("FwkSummary") << "TimeReport "
735  << std::right << std::setw(kColumn1Size) << "per event"<<" "
736  << std::right << std::setw(kColumn2Size) << "per exec"
737  << " Name";
738  for (auto const& p: tr.trigPathSummaries) {
739  const int timesRun = std::max(1, p.timesRun);
740  LogVerbatim("FwkSummary") << "TimeReport "
741  << std::setprecision(6) << std::fixed
742  << std::right << std::setw(kColumn1Size) << p.realTime/totalEvents << " "
743  << std::right << std::setw(kColumn2Size) << p.realTime/timesRun << " "
744  << p.name << "";
745  }
746  LogVerbatim("FwkSummary") << "TimeReport "
747  << std::right << std::setw(kColumn1Size) << "per event"<<" "
748  << std::right << std::setw(kColumn2Size) << "per exec"
749  << " Name" << "";
750 
751  LogVerbatim("FwkSummary") << "";
752  LogVerbatim("FwkSummary") << "TimeReport " << "-------End-Path Summary ---[Real sec]----";
753  LogVerbatim("FwkSummary") << "TimeReport "
754  << std::right << std::setw(kColumn1Size) << "per event" <<" "
755  << std::right << std::setw(kColumn2Size) << "per exec"
756  << " Name" << "";
757  for (auto const& p: tr.endPathSummaries) {
758  const int timesRun = std::max(1, p.timesRun);
759 
760  LogVerbatim("FwkSummary") << "TimeReport "
761  << std::setprecision(6) << std::fixed
762  << std::right << std::setw(kColumn1Size) << p.realTime/totalEvents << " "
763  << std::right << std::setw(kColumn2Size) << p.realTime/timesRun << " "
764  << p.name << "";
765  }
766  LogVerbatim("FwkSummary") << "TimeReport "
767  << std::right << std::setw(kColumn1Size) << "per event" <<" "
768  << std::right << std::setw(kColumn2Size) << "per exec"
769  << " Name" << "";
770 
771  for (auto const& p: tr.trigPathSummaries) {
772  LogVerbatim("FwkSummary") << "";
773  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
774  LogVerbatim("FwkSummary") << "TimeReport "
775  << std::right << std::setw(kColumn1Size) << "per event" <<" "
776  << std::right << std::setw(kColumn2Size) << "per visit"
777  << " Name" << "";
778  for (auto const& mod: p.moduleInPathSummaries) {
779  LogVerbatim("FwkSummary") << "TimeReport "
780  << std::setprecision(6) << std::fixed
781  << std::right << std::setw(kColumn1Size) << mod.realTime/totalEvents << " "
782  << std::right << std::setw(kColumn2Size) << mod.realTime/std::max(1, mod.timesVisited) << " "
783  << mod.moduleLabel << "";
784  }
785  }
786  if(not tr.trigPathSummaries.empty()) {
787  LogVerbatim("FwkSummary") << "TimeReport "
788  << std::right << std::setw(kColumn1Size) << "per event" <<" "
789  << std::right << std::setw(kColumn2Size) << "per visit"
790  << " Name" << "";
791  }
792  for (auto const& p: tr.endPathSummaries) {
793  LogVerbatim("FwkSummary") << "";
794  LogVerbatim("FwkSummary") << "TimeReport " << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
795  LogVerbatim("FwkSummary") << "TimeReport "
796  << std::right << std::setw(kColumn1Size) << "per event" <<" "
797  << std::right << std::setw(kColumn2Size) << "per visit"
798  << " Name" << "";
799  for (auto const& mod: p.moduleInPathSummaries) {
800  LogVerbatim("FwkSummary") << "TimeReport "
801  << std::setprecision(6) << std::fixed
802  << std::right << std::setw(kColumn1Size) << mod.realTime/totalEvents << " "
803  << std::right << std::setw(kColumn2Size) << mod.realTime/std::max(1, mod.timesVisited) << " "
804  << mod.moduleLabel << "";
805  }
806  }
807  if(not tr.endPathSummaries.empty()) {
808  LogVerbatim("FwkSummary") << "TimeReport "
809  << std::right << std::setw(kColumn1Size) << "per event" <<" "
810  << std::right << std::setw(kColumn2Size) << "per visit"
811  << " Name" << "";
812  }
813  LogVerbatim("FwkSummary") << "";
814  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Module Summary ---[Real sec]----";
815  LogVerbatim("FwkSummary") << "TimeReport "
816  << std::right << std::setw(kColumn1Size) << "per event" <<" "
817  << std::right << std::setw(kColumn2Size) << "per exec" <<" "
818  << std::right << std::setw(kColumn3Size) << "per visit"
819  << " Name" << "";
820  for (auto const& worker : tr.workerSummaries) {
821  LogVerbatim("FwkSummary") << "TimeReport "
822  << std::setprecision(6) << std::fixed
823  << std::right << std::setw(kColumn1Size) << worker.realTime/totalEvents << " "
824  << std::right << std::setw(kColumn2Size) << worker.realTime/std::max(1, worker.timesRun) << " "
825  << std::right << std::setw(kColumn3Size) << worker.realTime/std::max(1, worker.timesVisited) << " "
826  << worker.moduleLabel << "";
827  }
828  LogVerbatim("FwkSummary") << "TimeReport "
829  << std::right << std::setw(kColumn1Size) << "per event" <<" "
830  << std::right << std::setw(kColumn2Size) << "per exec" <<" "
831  << std::right << std::setw(kColumn3Size) << "per visit"
832  << " Name" << "";
833 
834  LogVerbatim("FwkSummary") << "";
835  LogVerbatim("FwkSummary") << "T---Report end!" << "";
836  LogVerbatim("FwkSummary") << "";
837  }
838 
840  using std::placeholders::_1;
842  }
843 
845  using std::placeholders::_1;
847  }
848 
850  using std::placeholders::_1;
852  }
853 
854  void Schedule::writeRun(RunPrincipal const& rp, ProcessContext const* processContext) {
855  using std::placeholders::_1;
856  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::writeRun, _1, std::cref(rp), processContext));
857  }
858 
859  void Schedule::writeLumi(LuminosityBlockPrincipal const& lbp, ProcessContext const* processContext) {
860  using std::placeholders::_1;
861  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::writeLumi, _1, std::cref(lbp), processContext));
862  }
863 
865  using std::placeholders::_1;
866  // Return true iff at least one output module returns true.
867  return (std::find_if (all_output_communicators_.begin(), all_output_communicators_.end(),
869  != all_output_communicators_.end());
870  }
871 
873  using std::placeholders::_1;
874  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
875  }
876 
878  using std::placeholders::_1;
879  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
880  }
881 
882  void Schedule::beginJob(ProductRegistry const& iRegistry) {
884 
885  globalSchedule_->beginJob(iRegistry);
886  }
887 
888  void Schedule::beginStream(unsigned int iStreamID) {
889  assert(iStreamID<streamSchedules_.size());
890  streamSchedules_[iStreamID]->beginStream();
891  }
892 
893  void Schedule::endStream(unsigned int iStreamID) {
894  assert(iStreamID<streamSchedules_.size());
895  streamSchedules_[iStreamID]->endStream();
896  }
897 
899  using std::placeholders::_1;
901  }
902  void Schedule::postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {
903  using std::placeholders::_1;
904  for_all(allWorkers(), std::bind(&Worker::postForkReacquireResources, _1, iChildIndex, iNumberOfChildren));
905  }
906 
908  ParameterSet const& iPSet,
909  const ProductRegistry& iRegistry) {
910  Worker* found = nullptr;
911  for (auto const& worker : allWorkers()) {
912  if (worker->description().moduleLabel() == iLabel) {
913  found = worker;
914  break;
915  }
916  }
917  if (nullptr == found) {
918  return false;
919  }
920 
921  auto newMod = moduleRegistry_->replaceModule(iLabel,iPSet,preallocConfig_);
922 
923  globalSchedule_->replaceModule(newMod,iLabel);
924 
925  for(auto s: streamSchedules_) {
926  s->replaceModule(newMod,iLabel);
927  }
928 
929  {
930  //Need to updateLookup in order to make getByToken work
931  auto const runLookup = iRegistry.productLookup(InRun);
932  auto const lumiLookup = iRegistry.productLookup(InLumi);
933  auto const eventLookup = iRegistry.productLookup(InEvent);
934  found->updateLookup(InRun,*runLookup);
935  found->updateLookup(InLumi,*lumiLookup);
936  found->updateLookup(InEvent,*eventLookup);
937  }
938 
939  return true;
940  }
941 
942  std::vector<ModuleDescription const*>
944  std::vector<ModuleDescription const*> result;
945  result.reserve(allWorkers().size());
946 
947  for (auto const& worker : allWorkers()) {
948  ModuleDescription const* p = worker->descPtr();
949  result.push_back(p);
950  }
951  return result;
952  }
953 
954  Schedule::AllWorkers const&
956  return globalSchedule_->allWorkers();
957  }
958 
959  void
960  Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
961  streamSchedules_[0]->availablePaths(oLabelsToFill);
962  }
963 
964  void
965  Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const {
966  streamSchedules_[0]->triggerPaths(oLabelsToFill);
967  }
968 
969  void
970  Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const {
971  streamSchedules_[0]->endPaths(oLabelsToFill);
972  }
973 
974  void
976  std::vector<std::string>& oLabelsToFill) const {
977  streamSchedules_[0]->modulesInPath(iPathLabel,oLabelsToFill);
978  }
979 
980  void
982  std::vector<ModuleDescription const*>& descriptions,
983  unsigned int hint) const {
984  streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
985  }
986 
987  void
989  std::vector<ModuleDescription const*>& descriptions,
990  unsigned int hint) const {
991  streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
992  }
993 
994  void
995  Schedule::fillModuleAndConsumesInfo(std::vector<ModuleDescription const*>& allModuleDescriptions,
996  std::vector<std::pair<unsigned int, unsigned int> >& moduleIDToIndex,
997  std::vector<std::vector<ModuleDescription const*> >& modulesWhoseProductsAreConsumedBy,
998  ProductRegistry const& preg) const {
999  allModuleDescriptions.clear();
1000  moduleIDToIndex.clear();
1001  modulesWhoseProductsAreConsumedBy.clear();
1002 
1003  allModuleDescriptions.reserve(allWorkers().size());
1004  moduleIDToIndex.reserve(allWorkers().size());
1005  modulesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1006 
1007  std::map<std::string, ModuleDescription const*> labelToDesc;
1008  unsigned int i = 0;
1009  for (auto const& worker : allWorkers()) {
1010  ModuleDescription const* p = worker->descPtr();
1011  allModuleDescriptions.push_back(p);
1012  moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1013  labelToDesc[p->moduleLabel()] = p;
1014  ++i;
1015  }
1016  sort_all(moduleIDToIndex);
1017 
1018  i = 0;
1019  for (auto const& worker : allWorkers()) {
1020  std::vector<ModuleDescription const*>& modules = modulesWhoseProductsAreConsumedBy.at(i);
1021  worker->modulesWhoseProductsAreConsumed(modules, preg, labelToDesc);
1022  ++i;
1023  }
1024  }
1025 
1026  void
1028  endpathsAreActive_ = active;
1029  for(auto const & s : streamSchedules_) {
1030  s->enableEndPaths(active);
1031  }
1032  }
1033 
1034  bool
1036  return endpathsAreActive_;
1037  }
1038 
1039  void
1041  rep.eventSummary.totalEvents = 0;
1044  for(auto& s: streamSchedules_) {
1045  s->getTriggerReport(rep);
1046  }
1047  }
1048 
1049  void
1051  rep.eventSummary.totalEvents = 0;
1052  rep.eventSummary.cpuTime = 0.;
1053  rep.eventSummary.realTime = 0.;
1054  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1055  }
1056 
1057  int
1059  int returnValue = 0;
1060  for(auto& s: streamSchedules_) {
1061  returnValue += s->totalEvents();
1062  }
1063  return returnValue;
1064  }
1065 
1066  int
1068  int returnValue = 0;
1069  for(auto& s: streamSchedules_) {
1070  returnValue += s->totalEventsPassed();
1071  }
1072  return returnValue;
1073  }
1074 
1075  int
1077  int returnValue = 0;
1078  for(auto& s: streamSchedules_) {
1079  returnValue += s->totalEventsFailed();
1080  }
1081  return returnValue;
1082  }
1083 
1084 
1085  void
1087  for(auto const& s: streamSchedules_) {
1088  s->clearCounters();
1089  }
1090  }
1091 
1092  //====================================
1093  // Schedule::checkForCorrectness algorithm
1094  //
1095  // The code creates a 'dependency' graph between all
1096  // modules. A module depends on another module if
1097  // 1) it 'consumes' data produced by that module
1098  // 2) it appears directly after the module within a Path
1099  //
1100  // If there is a cycle in the 'dependency' graph then
1101  // the schedule may be unrunnable. The schedule is still
1102  // runnable if all cycles have at least two edges which
1103  // connect modules only by Path dependencies (i.e. not
1104  // linked by a data dependency).
1105  //
1106  // Example 1:
1107  // C consumes data from B
1108  // Path 1: A + B + C
1109  // Path 2: B + C + A
1110  //
1111  // Cycle: A after C [p2], C consumes B, B after A [p1]
1112  // Since this cycle has 2 path only edges it is OK since
1113  // A and (B+C) are independent so their run order doesn't matter
1114  //
1115  // Example 2:
1116  // B consumes A
1117  // C consumes B
1118  // Path: C + A
1119  //
1120  // Cycle: A after C [p], C consumes B, B consumes A
1121  // Since this cycle has 1 path only edge it is unrunnable.
1122  //
1123  // Example 3:
1124  // A consumes B
1125  // B consumes C
1126  // C consumes A
1127  // (no Path since unscheduled execution)
1128  //
1129  // Cycle: A consumes B, B consumes C, C consumes A
1130  // Since this cycle has 0 path only edges it is unrunnable.
1131  //====================================
1132 
1133  namespace {
1134  typedef std::pair<unsigned int, unsigned int> SimpleEdge;
1135  typedef std::map<SimpleEdge, std::vector<unsigned int>> EdgeToPathMap;
1136 
1137  typedef boost::adjacency_list<boost::vecS, boost::vecS, boost::bidirectionalS> Graph;
1138 
1139  typedef boost::graph_traits<Graph>::edge_descriptor Edge;
1140  struct cycle_detector : public boost::dfs_visitor<> {
1141 
1142  cycle_detector(EdgeToPathMap const& iEdgeToPathMap,
1143  std::vector<std::string> const& iPathNames,
1144  std::map<std::string,unsigned int> const& iModuleNamesToIndex):
1145  m_edgeToPathMap(iEdgeToPathMap),
1146  m_pathNames(iPathNames),
1147  m_namesToIndex(iModuleNamesToIndex){}
1148 
1149  void tree_edge(Edge iEdge, Graph const&) {
1150  m_stack.push_back(iEdge);
1151  }
1152 
1153  void finish_edge(Edge iEdge, Graph const& iGraph) {
1154  if(not m_stack.empty()) {
1155  if (iEdge == m_stack.back()) {
1156  m_stack.pop_back();
1157  }
1158  }
1159  }
1160 
1161  //Called if a cycle happens
1162  void back_edge(Edge iEdge, Graph const& iGraph) {
1163  //NOTE: If the path containing the cycle contains two or more
1164  // path only edges then there is no problem
1165 
1167  IndexMap const& index = get(boost::vertex_index, iGraph);
1168 
1169  unsigned int vertex = index[target(iEdge,iGraph)];
1170 
1171  //Find last edge which starts with this vertex
1172  std::list<Edge>::iterator itFirst = m_stack.begin();
1173  {
1174  bool seenVertex = false;
1175  while(itFirst != m_stack.end()) {
1176  if(not seenVertex) {
1177  if(index[source(*itFirst,iGraph)] == vertex) {
1178  seenVertex = true;
1179  }
1180  } else
1181  if (index[source(*itFirst,iGraph)] != vertex) {
1182  break;
1183  }
1184  ++itFirst;
1185  }
1186  if(itFirst != m_stack.begin()) {
1187  --itFirst;
1188  }
1189  }
1190  //This edge has not been added to the stack yet
1191  // making a copy allows us to add it in but not worry
1192  // about removing it at the end of the routine
1193  std::vector<Edge> tempStack;
1194  tempStack.reserve(m_stack.size()+1);
1195  tempStack.insert(tempStack.end(),itFirst,m_stack.end());
1196  tempStack.emplace_back(iEdge);
1197 
1198  unsigned int nPathDependencyOnly =0;
1199  for(auto const& edge: tempStack) {
1200  unsigned int in =index[source(edge,iGraph)];
1201  unsigned int out =index[target(edge,iGraph)];
1202 
1203  auto iFound = m_edgeToPathMap.find(SimpleEdge(in,out));
1204  bool pathDependencyOnly = true;
1205  for(auto dependency : iFound->second) {
1206  if (dependency == std::numeric_limits<unsigned int>::max()) {
1207  pathDependencyOnly = false;
1208  break;
1209  }
1210  }
1211  if (pathDependencyOnly) {
1212  ++nPathDependencyOnly;
1213  }
1214  }
1215  if(nPathDependencyOnly < 2) {
1216  reportError(tempStack,index,iGraph);
1217  }
1218  }
1219  private:
1220  std::string const& pathName(unsigned int iIndex) const {
1221  return m_pathNames[iIndex];
1222  }
1223 
1224  std::string const& moduleName(unsigned int iIndex) const {
1225  for(auto const& item : m_namesToIndex) {
1226  if(item.second == iIndex) {
1227  return item.first;
1228  }
1229  }
1230  assert(false);
1231  }
1232 
1233  void
1234  reportError(std::vector<Edge>const& iEdges,
1236  Graph const& iGraph) const {
1237  std::stringstream oStream;
1238  oStream <<"Module run order problem found: \n";
1239  bool first_edge = true;
1240  for(auto const& edge: iEdges) {
1241  unsigned int in =iIndex[source(edge,iGraph)];
1242  unsigned int out =iIndex[target(edge,iGraph)];
1243 
1244  if(first_edge) {
1245  first_edge = false;
1246  } else {
1247  oStream<<", ";
1248  }
1249  oStream <<moduleName(in);
1250 
1251  auto iFound = m_edgeToPathMap.find(SimpleEdge(in,out));
1252  bool pathDependencyOnly = true;
1253  for(auto dependency : iFound->second) {
1254  if (dependency == std::numeric_limits<unsigned int>::max()) {
1255  pathDependencyOnly = false;
1256  break;
1257  }
1258  }
1259  if (pathDependencyOnly) {
1260  oStream <<" after "<<moduleName(out)<<" [path "<<pathName(iFound->second[0])<<"]";
1261  } else {
1262  oStream <<" consumes "<<moduleName(out);
1263  }
1264  }
1265  oStream<<"\n Running in the threaded framework would lead to indeterminate results."
1266  "\n Please change order of modules in mentioned Path(s) to avoid inconsistent module ordering.";
1267 
1268  LogError("UnrunnableSchedule")<<oStream.str();
1269  }
1270 
1271  EdgeToPathMap const& m_edgeToPathMap;
1272  std::vector<std::string> const& m_pathNames;
1273  std::map<std::string,unsigned int> m_namesToIndex;
1274 
1275  std::list<Edge> m_stack;
1276  };
1277  }
1278 
1279  void
1281  {
1282  //Need to lookup names to ids quickly
1283  std::map<std::string,unsigned int> moduleNamesToIndex;
1284  for(auto worker: allWorkers()) {
1285  moduleNamesToIndex.insert( std::make_pair(worker->description().moduleLabel(),
1286  worker->description().id()));
1287  }
1288 
1289  //If a module to module dependency comes from a path, remember which path
1290  EdgeToPathMap edgeToPathMap;
1291 
1292  //determine the path dependencies
1293  std::vector<std::string> pathNames;
1294  {
1295  streamSchedules_[0]->availablePaths(pathNames);
1296 
1297  std::vector<std::string> moduleNames;
1298  std::vector<std::string> reducedModuleNames;
1299  unsigned int pathIndex=0;
1300  for(auto const& path: pathNames) {
1301  moduleNames.clear();
1302  reducedModuleNames.clear();
1303  std::set<std::string> alreadySeenNames;
1304 
1305  streamSchedules_[0]->modulesInPath(path,moduleNames);
1306  std::string lastModuleName;
1307  unsigned int lastModuleIndex;
1308  for(auto const& name: moduleNames) {
1309  auto found = alreadySeenNames.insert(name);
1310  if(found.second) {
1311  //first time for this path
1312  unsigned int moduleIndex = moduleNamesToIndex[name];
1313  if(not lastModuleName.empty() ) {
1314  edgeToPathMap[std::make_pair(moduleIndex,lastModuleIndex)].push_back(pathIndex);
1315  }
1316  lastModuleName = name;
1317  lastModuleIndex = moduleIndex;
1318  }
1319  }
1320  ++pathIndex;
1321  }
1322  }
1323  {
1324  std::vector<const char*> dependentModules;
1325  //determine the data dependencies
1326  for(auto const& worker: allWorkers()) {
1327  dependentModules.clear();
1328  //NOTE: what about aliases?
1329  worker->modulesDependentUpon(dependentModules);
1330  auto found = moduleNamesToIndex.find(worker->description().moduleLabel());
1331  if (found == moduleNamesToIndex.end()) {
1332  //The module was from a previous process
1333  continue;
1334  }
1335  unsigned int moduleIndex = found->second;
1336  for(auto name: dependentModules) {
1337  edgeToPathMap[std::make_pair(moduleIndex, moduleNamesToIndex[name])].push_back(std::numeric_limits<unsigned int>::max());
1338  }
1339  }
1340  }
1341  //Now use boost graph library to find cycles in the dependencies
1342  std::vector<SimpleEdge> outList;
1343  outList.reserve(edgeToPathMap.size());
1344  for(auto const& edgeInfo: edgeToPathMap) {
1345  outList.push_back(edgeInfo.first);
1346  }
1347 
1348  Graph g(outList.begin(),outList.end(), moduleNamesToIndex.size());
1349 
1350  cycle_detector detector(edgeToPathMap,pathNames,moduleNamesToIndex);
1351  boost::depth_first_search(g,boost::visitor(detector));
1352  }
1353 }
type
Definition: HCALResponse.h:21
bool empty() const
Definition: ParameterSet.h:217
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:64
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
dictionary aliases
Definition: autoCond.py:36
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
void checkForCorrectness() const
Check that the schedule is actually runable.
Definition: Schedule.cc:1280
void stopEvent(StreamContext const &)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
void fillModuleAndConsumesInfo(std::vector< ModuleDescription const * > &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int > > &moduleIDToIndex, std::vector< std::vector< ModuleDescription const * > > &modulesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
Definition: Schedule.cc:995
virtual void openFile(FileBlock const &fb)=0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:955
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:960
static std::string const source("source")
virtual void writeRun(RunPrincipal const &rp, ProcessContext const *)=0
preallocConfig_(prealloc)
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
bool endPathsEnabled() const
Definition: Schedule.cc:1035
all_output_communicators_()
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:877
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::vector< Worker * > AllWorkers
Definition: Schedule.h:117
std::vector< ParameterSet > VParameterSet
Definition: ParameterSet.h:33
assert(m_qm.get())
void writeRun(RunPrincipal const &rp, ProcessContext const *)
Definition: Schedule.cc:854
void endStream(unsigned int)
Definition: Schedule.cc:893
void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)
Definition: Schedule.cc:859
void enableEndPaths(bool active)
Definition: Schedule.cc:1027
processConfiguration
Definition: Schedule.cc:370
std::shared_ptr< ProductHolderIndexHelper > const & productLookup(BranchType branchType) const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
endpathsAreActive_(true)
Definition: Schedule.cc:375
std::shared_ptr< ModuleRegistry > moduleRegistry_
Definition: Schedule.h:274
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:988
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:65
actions
Definition: Schedule.cc:370
std::string const & moduleLabel() const
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
#define constexpr
int totalEventsFailed() const
Definition: Schedule.cc:1076
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry)
Definition: Schedule.cc:907
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:193
int totalEventsPassed() const
Definition: Schedule.cc:1067
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
Definition: Schedule.cc:965
std::string moduleName(Provenance const &provenance)
Definition: Provenance.cc:27
virtual void updateLookup(BranchType iBranchType, ProductHolderIndexHelper const &)=0
tuple path
else: Piece not in the list, fine.
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:63
EventSummary eventSummary
Definition: TriggerReport.h:62
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists)
Definition: Schedule.cc:499
wantSummary_(tns.wantSummary())
int totalEvents() const
Definition: Schedule.cc:1058
virtual void openNewFileIfNeeded()=0
EventTimingSummary eventSummary
Schedule(ParameterSet &proc_pset, service::TriggerNamesService &tns, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, const ParameterSet *subProcPSet, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:358
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1086
tuple result
Definition: query.py:137
std::vector< PathTimingSummary > trigPathSummaries
std::vector< std::shared_ptr< StreamSchedule > > streamSchedules_
Definition: Schedule.h:275
void setProcessParameterSetID(ParameterSetID const &id)
Associated free functions.
Definition: Registry.cc:79
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:538
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:872
virtual void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)=0
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
areg
Definition: Schedule.cc:370
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:91
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1040
tuple out
Definition: dbtoconf.py:99
virtual bool shouldWeCloseFile() const =0
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:280
moduleRegistry_(new ModuleRegistry())
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
volatile bool endpathsAreActive_
Definition: Schedule.h:286
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:87
void startPath(StreamContext const &, PathContext const &)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
std::unique_ptr< SystemTimeKeeper > summaryTimeKeeper_
Definition: Schedule.h:282
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:279
void loadMissingDictionaries()
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:975
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
void beginStream(unsigned int)
Definition: Schedule.cc:888
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:88
void preForkReleaseResources()
Definition: Worker.h:90
bool wantSummary_
Definition: Schedule.h:284
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Schedule.cc:902
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:943
Strings const & getTrigPaths() const
std::unique_ptr< GlobalSchedule > globalSchedule_
Definition: Schedule.h:277
void beginJob(ProductRegistry const &)
Definition: Schedule.cc:882
void openNewOutputFilesIfNeeded()
Definition: Schedule.cc:844
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
void preForkReleaseResources()
Definition: Schedule.cc:898
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:849
preg
Definition: Schedule.cc:370
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:554
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1050
bool shouldWeCloseOutput() const
Definition: Schedule.cc:864
std::vector< std::string > vstring
Definition: Schedule.cc:354
tuple size
Write out results.
void closeOutputFiles()
Definition: Schedule.cc:839
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
Definition: Schedule.cc:970
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:981
prealloc
Definition: Schedule.cc:370
unsigned int id() const
std::string match(BranchDescription const &a, BranchDescription const &b, std::string const &fileName)