CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Schedule.cc
Go to the documentation of this file.
2 
24 
25 #include "boost/graph/graph_traits.hpp"
26 #include "boost/graph/adjacency_list.hpp"
27 #include "boost/graph/depth_first_search.hpp"
28 #include "boost/graph/visitors.hpp"
29 
30 
31 #include <algorithm>
32 #include <cassert>
33 #include <cstdlib>
34 #include <functional>
35 #include <iomanip>
36 #include <list>
37 #include <map>
38 #include <exception>
39 #include <sstream>
40 
41 namespace edm {
42  namespace {
43  using std::placeholders::_1;
44 
45  bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
46  return std::binary_search(v.begin(), v.end(), s);
47  }
48 
49  // Here we make the trigger results inserter directly. This should
50  // probably be a utility in the WorkerRegistry or elsewhere.
51 
52  std::shared_ptr<TriggerResultInserter>
53  makeInserter(ParameterSet& proc_pset,
54  PreallocationConfiguration const& iPrealloc,
55  ProductRegistry& preg,
56  ExceptionToActionTable const& actions,
57  std::shared_ptr<ActivityRegistry> areg,
58  std::shared_ptr<ProcessConfiguration> processConfiguration) {
59 
60  ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
61  trig_pset->registerIt();
62 
63  WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions);
64  ModuleDescription md(trig_pset->id(),
65  "TriggerResultInserter",
66  "TriggerResults",
67  processConfiguration.get(),
69 
70  areg->preModuleConstructionSignal_(md);
71  bool postCalled = false;
72  std::shared_ptr<TriggerResultInserter> returnValue;
73  try {
74  maker::ModuleHolderT<TriggerResultInserter> holder(std::make_shared<TriggerResultInserter>(*trig_pset, iPrealloc.numberOfStreams()),static_cast<Maker const*>(nullptr));
75  holder.setModuleDescription(md);
76  holder.registerProductsAndCallbacks(&preg);
77  returnValue =holder.module();
78  postCalled = true;
79  // if exception then post will be called in the catch block
80  areg->postModuleConstructionSignal_(md);
81  }
82  catch (...) {
83  if(!postCalled) {
84  try {
85  areg->postModuleConstructionSignal_(md);
86  }
87  catch (...) {
88  // If post throws an exception ignore it because we are already handling another exception
89  }
90  }
91  throw;
92  }
93  return returnValue;
94  }
95 
96 
97  void
98  checkAndInsertAlias(std::string const& friendlyClassName,
99  std::string const& moduleLabel,
100  std::string const& productInstanceName,
101  std::string const& processName,
102  std::string const& alias,
103  std::string const& instanceAlias,
104  ProductRegistry const& preg,
105  std::multimap<BranchKey, BranchKey>& aliasMap,
106  std::map<BranchKey, BranchKey>& aliasKeys) {
107  std::string const star("*");
108 
109  BranchKey key(friendlyClassName, moduleLabel, productInstanceName, processName);
110  if(preg.productList().find(key) == preg.productList().end()) {
111  // No product was found matching the alias.
112  // We throw an exception only if a module with the specified module label was created in this process.
113  for(auto const& product : preg.productList()) {
114  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
115  throw Exception(errors::Configuration, "EDAlias does not match data\n")
116  << "There are no products of type '" << friendlyClassName << "'\n"
117  << "with module label '" << moduleLabel << "' and instance name '" << productInstanceName << "'.\n";
118  }
119  }
120  }
121 
122  std::string const& theInstanceAlias(instanceAlias == star ? productInstanceName : instanceAlias);
123  BranchKey aliasKey(friendlyClassName, alias, theInstanceAlias, processName);
124  if(preg.productList().find(aliasKey) != preg.productList().end()) {
125  throw Exception(errors::Configuration, "EDAlias conflicts with data\n")
126  << "A product of type '" << friendlyClassName << "'\n"
127  << "with module label '" << alias << "' and instance name '" << theInstanceAlias << "'\n"
128  << "already exists.\n";
129  }
130  auto iter = aliasKeys.find(aliasKey);
131  if(iter != aliasKeys.end()) {
132  // The alias matches a previous one. If the same alias is used for different product, throw.
133  if(iter->second != key) {
134  throw Exception(errors::Configuration, "EDAlias conflict\n")
135  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
136  << "are used for multiple products of type '" << friendlyClassName << "'\n"
137  << "One has module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
138  << "the other has module label '" << iter->second.moduleLabel() << "' and product instance name '" << iter->second.productInstanceName() << "'.\n";
139  }
140  } else {
141  auto prodIter = preg.productList().find(key);
142  if(prodIter != preg.productList().end()) {
143  if (!prodIter->second.produced()) {
144  throw Exception(errors::Configuration, "EDAlias\n")
145  << "The module label alias '" << alias << "' and product instance alias '" << theInstanceAlias << "'\n"
146  << "are used for a product of type '" << friendlyClassName << "'\n"
147  << "with module label '" << moduleLabel << "' and product instance name '" << productInstanceName << "',\n"
148  << "An EDAlias can only be used for products produced in the current process. This one is not.\n";
149  }
150  aliasMap.insert(std::make_pair(key, aliasKey));
151  aliasKeys.insert(std::make_pair(aliasKey, key));
152  }
153  }
154  }
155 
156  void
157  processEDAliases(ParameterSet const& proc_pset, std::string const& processName, ProductRegistry& preg) {
158  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string> >("@all_aliases");
159  if(aliases.empty()) {
160  return;
161  }
162  std::string const star("*");
163  std::string const empty("");
165  desc.add<std::string>("type");
166  desc.add<std::string>("fromProductInstance", star);
167  desc.add<std::string>("toProductInstance", star);
168 
169  std::multimap<BranchKey, BranchKey> aliasMap;
170 
171  std::map<BranchKey, BranchKey> aliasKeys; // Used to search for duplicates or clashes.
172 
173  // Now, loop over the alias information and store it in aliasMap.
174  for(std::string const& alias : aliases) {
175  ParameterSet const& aliasPSet = proc_pset.getParameterSet(alias);
176  std::vector<std::string> vPSetNames = aliasPSet.getParameterNamesForType<VParameterSet>();
177  for(std::string const& moduleLabel : vPSetNames) {
178  VParameterSet vPSet = aliasPSet.getParameter<VParameterSet>(moduleLabel);
179  for(ParameterSet& pset : vPSet) {
180  desc.validate(pset);
181  std::string friendlyClassName = pset.getParameter<std::string>("type");
182  std::string productInstanceName = pset.getParameter<std::string>("fromProductInstance");
183  std::string instanceAlias = pset.getParameter<std::string>("toProductInstance");
184  if(productInstanceName == star) {
185  bool match = false;
186  BranchKey lowerBound(friendlyClassName, moduleLabel, empty, empty);
187  for(ProductRegistry::ProductList::const_iterator it = preg.productList().lower_bound(lowerBound);
188  it != preg.productList().end() && it->first.friendlyClassName() == friendlyClassName && it->first.moduleLabel() == moduleLabel;
189  ++it) {
190  if(it->first.processName() != processName) {
191  continue;
192  }
193  match = true;
194 
195  checkAndInsertAlias(friendlyClassName, moduleLabel, it->first.productInstanceName(), processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
196  }
197  if(!match) {
198  // No product was found matching the alias.
199  // We throw an exception only if a module with the specified module label was created in this process.
200  for(auto const& product : preg.productList()) {
201  if(moduleLabel == product.first.moduleLabel() && processName == product.first.processName()) {
202  throw Exception(errors::Configuration, "EDAlias parameter set mismatch\n")
203  << "There are no products of type '" << friendlyClassName << "'\n"
204  << "with module label '" << moduleLabel << "'.\n";
205  }
206  }
207  }
208  } else {
209  checkAndInsertAlias(friendlyClassName, moduleLabel, productInstanceName, processName, alias, instanceAlias, preg, aliasMap, aliasKeys);
210  }
211  }
212  }
213  }
214 
215 
216  // Now add the new alias entries to the product registry.
217  for(auto const& aliasEntry : aliasMap) {
218  ProductRegistry::ProductList::const_iterator it = preg.productList().find(aliasEntry.first);
219  assert(it != preg.productList().end());
220  preg.addLabelAlias(it->second, aliasEntry.second.moduleLabel(), aliasEntry.second.productInstanceName());
221  }
222 
223  }
224 
225  typedef std::vector<std::string> vstring;
226 
227  void reduceParameterSet(ParameterSet& proc_pset,
228  vstring const& end_path_name_list,
229  vstring& modulesInConfig,
230  std::set<std::string> const& usedModuleLabels,
231  std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions) {
232  // Before calculating the ParameterSetID of the top level ParameterSet or
233  // saving it in the registry drop from the top level ParameterSet all
234  // OutputModules and EDAnalyzers not on trigger paths. If unscheduled
235  // production is not enabled also drop all the EDFilters and EDProducers
236  // that are not scheduled. Drop the ParameterSet used to configure the module
237  // itself. Also drop the other traces of these labels in the top level
238  // ParameterSet: Remove that labels from @all_modules and from all the
239  // end paths. If this makes any end paths empty, then remove the end path
240  // name from @end_paths, and @paths.
241 
242  // First make a list of labels to drop
243  vstring outputModuleLabels;
244  std::string edmType;
245  std::string const moduleEdmType("@module_edm_type");
246  std::string const outputModule("OutputModule");
247  std::string const edAnalyzer("EDAnalyzer");
248  std::string const edFilter("EDFilter");
249  std::string const edProducer("EDProducer");
250 
251  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
252 
253  //need a list of all modules on paths in order to determine
254  // if an EDAnalyzer only appears on an end path
255  vstring scheduledPaths = proc_pset.getParameter<vstring>("@paths");
256  std::set<std::string> modulesOnPaths;
257  {
258  std::set<std::string> noEndPaths(scheduledPaths.begin(),scheduledPaths.end());
259  for(auto const& endPath: end_path_name_list) {
260  noEndPaths.erase(endPath);
261  }
262  {
263  vstring labels;
264  for(auto const& path: noEndPaths) {
265  labels = proc_pset.getParameter<vstring>(path);
266  modulesOnPaths.insert(labels.begin(),labels.end());
267  }
268  }
269  }
270  //Initially fill labelsToBeDropped with all module mentioned in
271  // the configuration but which are not being used by the system
272  std::vector<std::string> labelsToBeDropped;
273  labelsToBeDropped.reserve(modulesInConfigSet.size());
274  std::set_difference(modulesInConfigSet.begin(),modulesInConfigSet.end(),
275  usedModuleLabels.begin(),usedModuleLabels.end(),
276  std::back_inserter(labelsToBeDropped));
277 
278  const unsigned int sizeBeforeOutputModules = labelsToBeDropped.size();
279  for (auto const& modLabel: usedModuleLabels) {
280  edmType = proc_pset.getParameterSet(modLabel).getParameter<std::string>(moduleEdmType);
281  if (edmType == outputModule) {
282  outputModuleLabels.push_back(modLabel);
283  labelsToBeDropped.push_back(modLabel);
284  }
285  if(edmType == edAnalyzer) {
286  if(modulesOnPaths.end()==modulesOnPaths.find(modLabel)) {
287  labelsToBeDropped.push_back(modLabel);
288  }
289  }
290  }
291  //labelsToBeDropped must be sorted
292  std::inplace_merge(labelsToBeDropped.begin(),
293  labelsToBeDropped.begin()+sizeBeforeOutputModules,
294  labelsToBeDropped.end());
295 
296  // drop the parameter sets used to configure the modules
297  for_all(labelsToBeDropped, std::bind(&ParameterSet::eraseOrSetUntrackedParameterSet, std::ref(proc_pset), _1));
298 
299  // drop the labels from @all_modules
300  vstring::iterator endAfterRemove = std::remove_if(modulesInConfig.begin(), modulesInConfig.end(), std::bind(binary_search_string, std::ref(labelsToBeDropped), _1));
301  modulesInConfig.erase(endAfterRemove, modulesInConfig.end());
302  proc_pset.addParameter<vstring>(std::string("@all_modules"), modulesInConfig);
303 
304  // drop the labels from all end paths
305  vstring endPathsToBeDropped;
306  vstring labels;
307  for (vstring::const_iterator iEndPath = end_path_name_list.begin(), endEndPath = end_path_name_list.end();
308  iEndPath != endEndPath;
309  ++iEndPath) {
310  labels = proc_pset.getParameter<vstring>(*iEndPath);
311  vstring::iterator iSave = labels.begin();
312  vstring::iterator iBegin = labels.begin();
313 
314  for (vstring::iterator iLabel = labels.begin(), iEnd = labels.end();
315  iLabel != iEnd; ++iLabel) {
316  if (binary_search_string(labelsToBeDropped, *iLabel)) {
317  if (binary_search_string(outputModuleLabels, *iLabel)) {
318  outputModulePathPositions[*iLabel].emplace_back(*iEndPath, iSave - iBegin);
319  }
320  } else {
321  if (iSave != iLabel) {
322  iSave->swap(*iLabel);
323  }
324  ++iSave;
325  }
326  }
327  labels.erase(iSave, labels.end());
328  if (labels.empty()) {
329  // remove empty end paths and save their names
330  proc_pset.eraseSimpleParameter(*iEndPath);
331  endPathsToBeDropped.push_back(*iEndPath);
332  } else {
333  proc_pset.addParameter<vstring>(*iEndPath, labels);
334  }
335  }
336  sort_all(endPathsToBeDropped);
337 
338  // remove empty end paths from @paths
339  endAfterRemove = std::remove_if(scheduledPaths.begin(), scheduledPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
340  scheduledPaths.erase(endAfterRemove, scheduledPaths.end());
341  proc_pset.addParameter<vstring>(std::string("@paths"), scheduledPaths);
342 
343  // remove empty end paths from @end_paths
344  vstring scheduledEndPaths = proc_pset.getParameter<vstring>("@end_paths");
345  endAfterRemove = std::remove_if(scheduledEndPaths.begin(), scheduledEndPaths.end(), std::bind(binary_search_string, std::ref(endPathsToBeDropped), _1));
346  scheduledEndPaths.erase(endAfterRemove, scheduledEndPaths.end());
347  proc_pset.addParameter<vstring>(std::string("@end_paths"), scheduledEndPaths);
348 
349  }
350  }
351  // -----------------------------
352 
353  typedef std::vector<std::string> vstring;
354 
355  // -----------------------------
356 
359  ProductRegistry& preg,
360  BranchIDListHelper& branchIDListHelper,
361  ThinnedAssociationsHelper& thinnedAssociationsHelper,
362  ExceptionToActionTable const& actions,
363  std::shared_ptr<ActivityRegistry> areg,
364  std::shared_ptr<ProcessConfiguration> processConfiguration,
365  const ParameterSet* subProcPSet,
367  ProcessContext const* processContext) :
368  //Only create a resultsInserter if there is a trigger path
369  resultsInserter_{tns.getTrigPaths().empty()? std::shared_ptr<TriggerResultInserter>{} :makeInserter(proc_pset,prealloc,preg,actions,areg,processConfiguration)},
370  moduleRegistry_(new ModuleRegistry()),
373  wantSummary_(tns.wantSummary()),
375  {
376  assert(0<prealloc.numberOfStreams());
377  streamSchedules_.reserve(prealloc.numberOfStreams());
378  for(unsigned int i=0; i<prealloc.numberOfStreams();++i) {
379  streamSchedules_.emplace_back(std::make_shared<StreamSchedule>(resultsInserter_,moduleRegistry_,proc_pset,tns,prealloc,preg,branchIDListHelper,actions,areg,processConfiguration,nullptr==subProcPSet,StreamID{i},processContext));
380  }
381 
382  //TriggerResults are injected automatically by StreamSchedules and are
383  // unknown to the ModuleRegistry
384  const std::string kTriggerResults("TriggerResults");
385  std::vector<std::string> modulesToUse;
386  modulesToUse.reserve(streamSchedules_[0]->allWorkers().size());
387  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
388  if(worker->description().moduleLabel() != kTriggerResults) {
389  modulesToUse.push_back(worker->description().moduleLabel());
390  }
391  }
392  //The unscheduled modules are at the end of the list, but we want them at the front
393  unsigned int n = streamSchedules_[0]->numberOfUnscheduledModules();
394  if(n>0) {
395  std::vector<std::string> temp;
396  temp.reserve(modulesToUse.size());
397  auto itBeginUnscheduled = modulesToUse.begin()+modulesToUse.size()-n;
398  std::copy(itBeginUnscheduled,modulesToUse.end(),
399  std::back_inserter(temp));
400  std::copy(modulesToUse.begin(),itBeginUnscheduled,std::back_inserter(temp));
401  temp.swap(modulesToUse);
402  }
403  globalSchedule_.reset( new GlobalSchedule{ resultsInserter_,
405  modulesToUse,
406  proc_pset, preg, prealloc,
407  actions,areg,processConfiguration,processContext });
408 
409  //TriggerResults is not in the top level ParameterSet so the call to
410  // reduceParameterSet would fail to find it. Just remove it up front.
411  std::set<std::string> usedModuleLabels;
412  for( auto const worker: allWorkers()) {
413  if(worker->description().moduleLabel() != kTriggerResults) {
414  usedModuleLabels.insert(worker->description().moduleLabel());
415  }
416  }
417  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
418  std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
419  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels,
420  outputModulePathPositions);
421  processEDAliases(proc_pset, processConfiguration->processName(), preg);
422  proc_pset.registerIt();
423  processConfiguration->setParameterSetID(proc_pset.id());
424  processConfiguration->setProcessConfigurationID();
425 
426  // This is used for a little sanity-check to make sure no code
427  // modifications alter the number of workers at a later date.
428  size_t all_workers_count = allWorkers().size();
429 
430  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder){
431  auto comm = iHolder->createOutputModuleCommunicator();
432  if (comm) {
433  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
434  }
435  });
436  // Now that the output workers are filled in, set any output limits or information.
437  limitOutput(proc_pset, branchIDListHelper.branchIDLists());
438 
440 
441  // Sanity check: make sure nobody has added a worker after we've
442  // already relied on the WorkerManager being full.
443  assert (all_workers_count == allWorkers().size());
444 
445  branchIDListHelper.updateFromRegistry(preg);
446 
447  preg.setFrozen();
448 
449  for(auto const& worker : streamSchedules_[0]->allWorkers()) {
450  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
451  }
452  thinnedAssociationsHelper.sort();
453 
454  for (auto c : all_output_communicators_) {
455  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
456  c->selectProducts(preg, thinnedAssociationsHelper);
457  }
458 
459  if(wantSummary_) {
460  std::vector<const ModuleDescription*> modDesc;
461  const auto& workers = allWorkers();
462  modDesc.reserve(workers.size());
463 
464  std::transform(workers.begin(),workers.end(),
465  std::back_inserter(modDesc),
466  [](const Worker* iWorker) -> const ModuleDescription* {
467  return iWorker->descPtr();
468  });
469 
470  summaryTimeKeeper_.reset(new SystemTimeKeeper(prealloc.numberOfStreams(),
471  modDesc,
472  tns));
473  auto timeKeeperPtr = summaryTimeKeeper_.get();
474 
475  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
476  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
477  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
478  areg->watchPostModuleEventDelayedGet(timeKeeperPtr,&SystemTimeKeeper::restartModuleEvent);
479 
480  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
481  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
482 
483  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
484  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
485 
486  areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
487  areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
488  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
489  //timeKeeperPtr->startModuleEvent(iContext,iMod);
490  //});
491  }
492 
493  } // Schedule::Schedule
494 
495 
496  void
497  Schedule::limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists) {
498  std::string const output("output");
499 
500  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents", ParameterSet());
501  int maxEventSpecs = 0;
502  int maxEventsOut = -1;
503  ParameterSet const* vMaxEventsOut = 0;
504  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
505  if (search_all(intNamesE, output)) {
506  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
507  ++maxEventSpecs;
508  }
509  std::vector<std::string> psetNamesE;
510  maxEventsPSet.getParameterSetNames(psetNamesE, false);
511  if (search_all(psetNamesE, output)) {
512  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
513  ++maxEventSpecs;
514  }
515 
516  if (maxEventSpecs > 1) {
518  "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
519  }
520 
521  for (auto c : all_output_communicators_) {
522  OutputModuleDescription desc(branchIDLists, maxEventsOut);
523  if (vMaxEventsOut != 0 && !vMaxEventsOut->empty()) {
524  std::string const& moduleLabel = c->description().moduleLabel();
525  try {
526  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
527  } catch (Exception const&) {
529  "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
530  }
531  }
532  c->configure(desc);
533  }
534  }
535 
536  bool Schedule::terminate() const {
537  if (all_output_communicators_.empty()) {
538  return false;
539  }
540  for (auto c : all_output_communicators_) {
541  if (!c->limitReached()) {
542  // Found an output module that has not reached output event count.
543  return false;
544  }
545  }
546  LogInfo("SuccessfulTermination")
547  << "The job is terminating successfully because each output module\n"
548  << "has reached its configured limit.\n";
549  return true;
550  }
551 
553  globalSchedule_->endJob(collector);
554  if (collector.hasThrown()) {
555  return;
556  }
557 
558  if (wantSummary_ == false) return;
559  {
560  TriggerReport tr;
561  getTriggerReport(tr);
562 
563  // The trigger report (pass/fail etc.):
564 
565  LogVerbatim("FwkSummary") << "";
566  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Event Summary ------------";
567  if(!tr.trigPathSummaries.empty()) {
568  LogVerbatim("FwkSummary") << "TrigReport"
569  << " Events total = " << tr.eventSummary.totalEvents
570  << " passed = " << tr.eventSummary.totalEventsPassed
571  << " failed = " << tr.eventSummary.totalEventsFailed
572  << "";
573  } else {
574  LogVerbatim("FwkSummary") << "TrigReport"
575  << " Events total = " << tr.eventSummary.totalEvents
576  << " passed = " << tr.eventSummary.totalEvents
577  << " failed = 0";
578  }
579 
580  LogVerbatim("FwkSummary") << "";
581  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Path Summary ------------";
582  LogVerbatim("FwkSummary") << "TrigReport "
583  << std::right << std::setw(10) << "Trig Bit#" << " "
584  << std::right << std::setw(10) << "Executed" << " "
585  << std::right << std::setw(10) << "Passed" << " "
586  << std::right << std::setw(10) << "Failed" << " "
587  << std::right << std::setw(10) << "Error" << " "
588  << "Name" << "";
589  for (auto const& p: tr.trigPathSummaries) {
590  LogVerbatim("FwkSummary") << "TrigReport "
591  << std::right << std::setw(5) << 1
592  << std::right << std::setw(5) << p.bitPosition << " "
593  << std::right << std::setw(10) << p.timesRun << " "
594  << std::right << std::setw(10) << p.timesPassed << " "
595  << std::right << std::setw(10) << p.timesFailed << " "
596  << std::right << std::setw(10) << p.timesExcept << " "
597  << p.name << "";
598  }
599 
600  /*
601  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
602  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
603  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
604  for (; epi != epe; ++epi, ++epn) {
605 
606  LogVerbatim("FwkSummary") << "TrigReport "
607  << std::right << std::setw(5) << 1
608  << std::right << std::setw(5) << *epi << " "
609  << std::right << std::setw(10) << totalEvents() << " "
610  << std::right << std::setw(10) << totalEvents() << " "
611  << std::right << std::setw(10) << 0 << " "
612  << std::right << std::setw(10) << 0 << " "
613  << *epn << "";
614  }
615  */
616 
617  LogVerbatim("FwkSummary") << "";
618  LogVerbatim("FwkSummary") << "TrigReport " << "-------End-Path Summary ------------";
619  LogVerbatim("FwkSummary") << "TrigReport "
620  << std::right << std::setw(10) << "Trig Bit#" << " "
621  << std::right << std::setw(10) << "Executed" << " "
622  << std::right << std::setw(10) << "Passed" << " "
623  << std::right << std::setw(10) << "Failed" << " "
624  << std::right << std::setw(10) << "Error" << " "
625  << "Name" << "";
626  for (auto const& p: tr.endPathSummaries) {
627  LogVerbatim("FwkSummary") << "TrigReport "
628  << std::right << std::setw(5) << 0
629  << std::right << std::setw(5) << p.bitPosition << " "
630  << std::right << std::setw(10) << p.timesRun << " "
631  << std::right << std::setw(10) << p.timesPassed << " "
632  << std::right << std::setw(10) << p.timesFailed << " "
633  << std::right << std::setw(10) << p.timesExcept << " "
634  << p.name << "";
635  }
636 
637  for (auto const& p: tr.trigPathSummaries) {
638  LogVerbatim("FwkSummary") << "";
639  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Modules in Path: " << p.name << " ------------";
640  LogVerbatim("FwkSummary") << "TrigReport "
641  << std::right << std::setw(10) << "Trig Bit#" << " "
642  << std::right << std::setw(10) << "Visited" << " "
643  << std::right << std::setw(10) << "Passed" << " "
644  << std::right << std::setw(10) << "Failed" << " "
645  << std::right << std::setw(10) << "Error" << " "
646  << "Name" << "";
647 
648  unsigned int bitpos = 0;
649  for (auto const& mod: p.moduleInPathSummaries) {
650  LogVerbatim("FwkSummary") << "TrigReport "
651  << std::right << std::setw(5) << 1
652  << std::right << std::setw(5) << bitpos << " "
653  << std::right << std::setw(10) << mod.timesVisited << " "
654  << std::right << std::setw(10) << mod.timesPassed << " "
655  << std::right << std::setw(10) << mod.timesFailed << " "
656  << std::right << std::setw(10) << mod.timesExcept << " "
657  << mod.moduleLabel << "";
658  ++bitpos;
659  }
660  }
661 
662  for (auto const& p: tr.endPathSummaries) {
663  LogVerbatim("FwkSummary") << "";
664  LogVerbatim("FwkSummary") << "TrigReport " << "------ Modules in End-Path: " << p.name << " ------------";
665  LogVerbatim("FwkSummary") << "TrigReport "
666  << std::right << std::setw(10) << "Trig Bit#" << " "
667  << std::right << std::setw(10) << "Visited" << " "
668  << std::right << std::setw(10) << "Passed" << " "
669  << std::right << std::setw(10) << "Failed" << " "
670  << std::right << std::setw(10) << "Error" << " "
671  << "Name" << "";
672 
673  unsigned int bitpos=0;
674  for (auto const& mod: p.moduleInPathSummaries) {
675  LogVerbatim("FwkSummary") << "TrigReport "
676  << std::right << std::setw(5) << 0
677  << std::right << std::setw(5) << bitpos << " "
678  << std::right << std::setw(10) << mod.timesVisited << " "
679  << std::right << std::setw(10) << mod.timesPassed << " "
680  << std::right << std::setw(10) << mod.timesFailed << " "
681  << std::right << std::setw(10) << mod.timesExcept << " "
682  << mod.moduleLabel << "";
683  ++bitpos;
684  }
685  }
686 
687  LogVerbatim("FwkSummary") << "";
688  LogVerbatim("FwkSummary") << "TrigReport " << "---------- Module Summary ------------";
689  LogVerbatim("FwkSummary") << "TrigReport "
690  << std::right << std::setw(10) << "Visited" << " "
691  << std::right << std::setw(10) << "Executed" << " "
692  << std::right << std::setw(10) << "Passed" << " "
693  << std::right << std::setw(10) << "Failed" << " "
694  << std::right << std::setw(10) << "Error" << " "
695  << "Name" << "";
696  for (auto const& worker : tr.workerSummaries) {
697  LogVerbatim("FwkSummary") << "TrigReport "
698  << std::right << std::setw(10) << worker.timesVisited << " "
699  << std::right << std::setw(10) << worker.timesRun << " "
700  << std::right << std::setw(10) << worker.timesPassed << " "
701  << std::right << std::setw(10) << worker.timesFailed << " "
702  << std::right << std::setw(10) << worker.timesExcept << " "
703  << worker.moduleLabel << "";
704  }
705  LogVerbatim("FwkSummary") << "";
706  }
707  // The timing report (CPU and Real Time):
710 
711  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
712 
713  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Event Summary ---[sec]----";
714  LogVerbatim("FwkSummary") << "TimeReport"
715  << std::setprecision(6) << std::fixed
716  << " event loop CPU/event = " << tr.eventSummary.cpuTime/totalEvents;
717  LogVerbatim("FwkSummary") << "TimeReport"
718  << std::setprecision(6) << std::fixed
719  << " event loop Real/event = " << tr.eventSummary.realTime/totalEvents;
720  LogVerbatim("FwkSummary") << "TimeReport"
721  << std::setprecision(6) << std::fixed
722  << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime/totalEvents;
723  LogVerbatim("FwkSummary") << "TimeReport"
724  << std::setprecision(6) << std::fixed
725  << " efficiency CPU/Real/thread = " << tr.eventSummary.cpuTime/tr.eventSummary.realTime/preallocConfig_.numberOfThreads();
726 
727  constexpr int kColumn1Size = 10;
728  constexpr int kColumn2Size = 12;
729  constexpr int kColumn3Size = 12;
730  LogVerbatim("FwkSummary") << "";
731  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Path Summary ---[Real sec]----";
732  LogVerbatim("FwkSummary") << "TimeReport "
733  << std::right << std::setw(kColumn1Size) << "per event"<<" "
734  << std::right << std::setw(kColumn2Size) << "per exec"
735  << " Name";
736  for (auto const& p: tr.trigPathSummaries) {
737  const int timesRun = std::max(1, p.timesRun);
738  LogVerbatim("FwkSummary") << "TimeReport "
739  << std::setprecision(6) << std::fixed
740  << std::right << std::setw(kColumn1Size) << p.realTime/totalEvents << " "
741  << std::right << std::setw(kColumn2Size) << p.realTime/timesRun << " "
742  << p.name << "";
743  }
744  LogVerbatim("FwkSummary") << "TimeReport "
745  << std::right << std::setw(kColumn1Size) << "per event"<<" "
746  << std::right << std::setw(kColumn2Size) << "per exec"
747  << " Name" << "";
748 
749  LogVerbatim("FwkSummary") << "";
750  LogVerbatim("FwkSummary") << "TimeReport " << "-------End-Path Summary ---[Real sec]----";
751  LogVerbatim("FwkSummary") << "TimeReport "
752  << std::right << std::setw(kColumn1Size) << "per event" <<" "
753  << std::right << std::setw(kColumn2Size) << "per exec"
754  << " Name" << "";
755  for (auto const& p: tr.endPathSummaries) {
756  const int timesRun = std::max(1, p.timesRun);
757 
758  LogVerbatim("FwkSummary") << "TimeReport "
759  << std::setprecision(6) << std::fixed
760  << std::right << std::setw(kColumn1Size) << p.realTime/totalEvents << " "
761  << std::right << std::setw(kColumn2Size) << p.realTime/timesRun << " "
762  << p.name << "";
763  }
764  LogVerbatim("FwkSummary") << "TimeReport "
765  << std::right << std::setw(kColumn1Size) << "per event" <<" "
766  << std::right << std::setw(kColumn2Size) << "per exec"
767  << " Name" << "";
768 
769  for (auto const& p: tr.trigPathSummaries) {
770  LogVerbatim("FwkSummary") << "";
771  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Modules in Path: " << p.name << " ---[Real sec]----";
772  LogVerbatim("FwkSummary") << "TimeReport "
773  << std::right << std::setw(kColumn1Size) << "per event" <<" "
774  << std::right << std::setw(kColumn2Size) << "per visit"
775  << " Name" << "";
776  for (auto const& mod: p.moduleInPathSummaries) {
777  LogVerbatim("FwkSummary") << "TimeReport "
778  << std::setprecision(6) << std::fixed
779  << std::right << std::setw(kColumn1Size) << mod.realTime/totalEvents << " "
780  << std::right << std::setw(kColumn2Size) << mod.realTime/std::max(1, mod.timesVisited) << " "
781  << mod.moduleLabel << "";
782  }
783  }
784  if(not tr.trigPathSummaries.empty()) {
785  LogVerbatim("FwkSummary") << "TimeReport "
786  << std::right << std::setw(kColumn1Size) << "per event" <<" "
787  << std::right << std::setw(kColumn2Size) << "per visit"
788  << " Name" << "";
789  }
790  for (auto const& p: tr.endPathSummaries) {
791  LogVerbatim("FwkSummary") << "";
792  LogVerbatim("FwkSummary") << "TimeReport " << "------ Modules in End-Path: " << p.name << " ---[Real sec]----";
793  LogVerbatim("FwkSummary") << "TimeReport "
794  << std::right << std::setw(kColumn1Size) << "per event" <<" "
795  << std::right << std::setw(kColumn2Size) << "per visit"
796  << " Name" << "";
797  for (auto const& mod: p.moduleInPathSummaries) {
798  LogVerbatim("FwkSummary") << "TimeReport "
799  << std::setprecision(6) << std::fixed
800  << std::right << std::setw(kColumn1Size) << mod.realTime/totalEvents << " "
801  << std::right << std::setw(kColumn2Size) << mod.realTime/std::max(1, mod.timesVisited) << " "
802  << mod.moduleLabel << "";
803  }
804  }
805  if(not tr.endPathSummaries.empty()) {
806  LogVerbatim("FwkSummary") << "TimeReport "
807  << std::right << std::setw(kColumn1Size) << "per event" <<" "
808  << std::right << std::setw(kColumn2Size) << "per visit"
809  << " Name" << "";
810  }
811  LogVerbatim("FwkSummary") << "";
812  LogVerbatim("FwkSummary") << "TimeReport " << "---------- Module Summary ---[Real sec]----";
813  LogVerbatim("FwkSummary") << "TimeReport "
814  << std::right << std::setw(kColumn1Size) << "per event" <<" "
815  << std::right << std::setw(kColumn2Size) << "per exec" <<" "
816  << std::right << std::setw(kColumn3Size) << "per visit"
817  << " Name" << "";
818  for (auto const& worker : tr.workerSummaries) {
819  LogVerbatim("FwkSummary") << "TimeReport "
820  << std::setprecision(6) << std::fixed
821  << std::right << std::setw(kColumn1Size) << worker.realTime/totalEvents << " "
822  << std::right << std::setw(kColumn2Size) << worker.realTime/std::max(1, worker.timesRun) << " "
823  << std::right << std::setw(kColumn3Size) << worker.realTime/std::max(1, worker.timesVisited) << " "
824  << worker.moduleLabel << "";
825  }
826  LogVerbatim("FwkSummary") << "TimeReport "
827  << std::right << std::setw(kColumn1Size) << "per event" <<" "
828  << std::right << std::setw(kColumn2Size) << "per exec" <<" "
829  << std::right << std::setw(kColumn3Size) << "per visit"
830  << " Name" << "";
831 
832  LogVerbatim("FwkSummary") << "";
833  LogVerbatim("FwkSummary") << "T---Report end!" << "";
834  LogVerbatim("FwkSummary") << "";
835  }
836 
838  using std::placeholders::_1;
840  }
841 
843  using std::placeholders::_1;
845  }
846 
848  using std::placeholders::_1;
850  }
851 
852  void Schedule::writeRun(RunPrincipal const& rp, ProcessContext const* processContext) {
853  using std::placeholders::_1;
854  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::writeRun, _1, std::cref(rp), processContext));
855  }
856 
857  void Schedule::writeLumi(LuminosityBlockPrincipal const& lbp, ProcessContext const* processContext) {
858  using std::placeholders::_1;
859  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::writeLumi, _1, std::cref(lbp), processContext));
860  }
861 
863  using std::placeholders::_1;
864  // Return true iff at least one output module returns true.
865  return (std::find_if (all_output_communicators_.begin(), all_output_communicators_.end(),
867  != all_output_communicators_.end());
868  }
869 
871  using std::placeholders::_1;
872  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
873  }
874 
876  using std::placeholders::_1;
877  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
878  }
879 
880  void Schedule::beginJob(ProductRegistry const& iRegistry) {
882 
883  globalSchedule_->beginJob(iRegistry);
884  }
885 
886  void Schedule::beginStream(unsigned int iStreamID) {
887  assert(iStreamID<streamSchedules_.size());
888  streamSchedules_[iStreamID]->beginStream();
889  }
890 
891  void Schedule::endStream(unsigned int iStreamID) {
892  assert(iStreamID<streamSchedules_.size());
893  streamSchedules_[iStreamID]->endStream();
894  }
895 
897  using std::placeholders::_1;
899  }
900  void Schedule::postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {
901  using std::placeholders::_1;
902  for_all(allWorkers(), std::bind(&Worker::postForkReacquireResources, _1, iChildIndex, iNumberOfChildren));
903  }
904 
906  ParameterSet const& iPSet,
907  const ProductRegistry& iRegistry) {
908  Worker* found = nullptr;
909  for (auto const& worker : allWorkers()) {
910  if (worker->description().moduleLabel() == iLabel) {
911  found = worker;
912  break;
913  }
914  }
915  if (nullptr == found) {
916  return false;
917  }
918 
919  auto newMod = moduleRegistry_->replaceModule(iLabel,iPSet,preallocConfig_);
920 
921  globalSchedule_->replaceModule(newMod,iLabel);
922 
923  for(auto s: streamSchedules_) {
924  s->replaceModule(newMod,iLabel);
925  }
926 
927  {
928  //Need to updateLookup in order to make getByToken work
929  auto const runLookup = iRegistry.productLookup(InRun);
930  auto const lumiLookup = iRegistry.productLookup(InLumi);
931  auto const eventLookup = iRegistry.productLookup(InEvent);
932  found->updateLookup(InRun,*runLookup);
933  found->updateLookup(InLumi,*lumiLookup);
934  found->updateLookup(InEvent,*eventLookup);
935  }
936 
937  return true;
938  }
939 
940  std::vector<ModuleDescription const*>
942  std::vector<ModuleDescription const*> result;
943  result.reserve(allWorkers().size());
944 
945  for (auto const& worker : allWorkers()) {
946  ModuleDescription const* p = worker->descPtr();
947  result.push_back(p);
948  }
949  return result;
950  }
951 
952  Schedule::AllWorkers const&
954  return globalSchedule_->allWorkers();
955  }
956 
957  void
958  Schedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
959  streamSchedules_[0]->availablePaths(oLabelsToFill);
960  }
961 
962  void
963  Schedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const {
964  streamSchedules_[0]->triggerPaths(oLabelsToFill);
965  }
966 
967  void
968  Schedule::endPaths(std::vector<std::string>& oLabelsToFill) const {
969  streamSchedules_[0]->endPaths(oLabelsToFill);
970  }
971 
972  void
974  std::vector<std::string>& oLabelsToFill) const {
975  streamSchedules_[0]->modulesInPath(iPathLabel,oLabelsToFill);
976  }
977 
978  void
980  std::vector<ModuleDescription const*>& descriptions,
981  unsigned int hint) const {
982  streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
983  }
984 
985  void
987  std::vector<ModuleDescription const*>& descriptions,
988  unsigned int hint) const {
989  streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
990  }
991 
992  void
993  Schedule::fillModuleAndConsumesInfo(std::vector<ModuleDescription const*>& allModuleDescriptions,
994  std::vector<std::pair<unsigned int, unsigned int> >& moduleIDToIndex,
995  std::vector<std::vector<ModuleDescription const*> >& modulesWhoseProductsAreConsumedBy,
996  ProductRegistry const& preg) const {
997  allModuleDescriptions.clear();
998  moduleIDToIndex.clear();
999  modulesWhoseProductsAreConsumedBy.clear();
1000 
1001  allModuleDescriptions.reserve(allWorkers().size());
1002  moduleIDToIndex.reserve(allWorkers().size());
1003  modulesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1004 
1005  std::map<std::string, ModuleDescription const*> labelToDesc;
1006  unsigned int i = 0;
1007  for (auto const& worker : allWorkers()) {
1008  ModuleDescription const* p = worker->descPtr();
1009  allModuleDescriptions.push_back(p);
1010  moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1011  labelToDesc[p->moduleLabel()] = p;
1012  ++i;
1013  }
1014  sort_all(moduleIDToIndex);
1015 
1016  i = 0;
1017  for (auto const& worker : allWorkers()) {
1018  std::vector<ModuleDescription const*>& modules = modulesWhoseProductsAreConsumedBy.at(i);
1019  worker->modulesWhoseProductsAreConsumed(modules, preg, labelToDesc);
1020  ++i;
1021  }
1022  }
1023 
1024  void
1026  endpathsAreActive_ = active;
1027  for(auto const & s : streamSchedules_) {
1028  s->enableEndPaths(active);
1029  }
1030  }
1031 
1032  bool
1034  return endpathsAreActive_;
1035  }
1036 
1037  void
1039  rep.eventSummary.totalEvents = 0;
1042  for(auto& s: streamSchedules_) {
1043  s->getTriggerReport(rep);
1044  }
1045  }
1046 
1047  void
1049  rep.eventSummary.totalEvents = 0;
1050  rep.eventSummary.cpuTime = 0.;
1051  rep.eventSummary.realTime = 0.;
1052  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1053  }
1054 
1055  int
1057  int returnValue = 0;
1058  for(auto& s: streamSchedules_) {
1059  returnValue += s->totalEvents();
1060  }
1061  return returnValue;
1062  }
1063 
1064  int
1066  int returnValue = 0;
1067  for(auto& s: streamSchedules_) {
1068  returnValue += s->totalEventsPassed();
1069  }
1070  return returnValue;
1071  }
1072 
1073  int
1075  int returnValue = 0;
1076  for(auto& s: streamSchedules_) {
1077  returnValue += s->totalEventsFailed();
1078  }
1079  return returnValue;
1080  }
1081 
1082 
1083  void
1085  for(auto const& s: streamSchedules_) {
1086  s->clearCounters();
1087  }
1088  }
1089 
1090  //====================================
1091  // Schedule::checkForCorrectness algorithm
1092  //
1093  // The code creates a 'dependency' graph between all
1094  // modules. A module depends on another module if
1095  // 1) it 'consumes' data produced by that module
1096  // 2) it appears directly after the module within a Path
1097  //
1098  // If there is a cycle in the 'dependency' graph then
1099  // the schedule may be unrunnable. The schedule is still
1100  // runnable if all cycles have at least two edges which
1101  // connect modules only by Path dependencies (i.e. not
1102  // linked by a data dependency).
1103  //
1104  // Example 1:
1105  // C consumes data from B
1106  // Path 1: A + B + C
1107  // Path 2: B + C + A
1108  //
1109  // Cycle: A after C [p2], C consumes B, B after A [p1]
1110  // Since this cycle has 2 path only edges it is OK since
1111  // A and (B+C) are independent so their run order doesn't matter
1112  //
1113  // Example 2:
1114  // B consumes A
1115  // C consumes B
1116  // Path: C + A
1117  //
1118  // Cycle: A after C [p], C consumes B, B consumes A
1119  // Since this cycle has 1 path only edge it is unrunnable.
1120  //
1121  // Example 3:
1122  // A consumes B
1123  // B consumes C
1124  // C consumes A
1125  // (no Path since unscheduled execution)
1126  //
1127  // Cycle: A consumes B, B consumes C, C consumes A
1128  // Since this cycle has 0 path only edges it is unrunnable.
1129  //====================================
1130 
1131  namespace {
1132  typedef std::pair<unsigned int, unsigned int> SimpleEdge;
1133  typedef std::map<SimpleEdge, std::vector<unsigned int>> EdgeToPathMap;
1134 
1135  typedef boost::adjacency_list<boost::vecS, boost::vecS, boost::bidirectionalS> Graph;
1136 
1137  typedef boost::graph_traits<Graph>::edge_descriptor Edge;
1138  struct cycle_detector : public boost::dfs_visitor<> {
1139 
1140  cycle_detector(EdgeToPathMap const& iEdgeToPathMap,
1141  std::vector<std::string> const& iPathNames,
1142  std::map<std::string,unsigned int> const& iModuleNamesToIndex):
1143  m_edgeToPathMap(iEdgeToPathMap),
1144  m_pathNames(iPathNames),
1145  m_namesToIndex(iModuleNamesToIndex){}
1146 
1147  void tree_edge(Edge iEdge, Graph const&) {
1148  m_stack.push_back(iEdge);
1149  }
1150 
1151  void finish_edge(Edge iEdge, Graph const& iGraph) {
1152  if(not m_stack.empty()) {
1153  if (iEdge == m_stack.back()) {
1154  m_stack.pop_back();
1155  }
1156  }
1157  }
1158 
1159  //Called if a cycle happens
1160  void back_edge(Edge iEdge, Graph const& iGraph) {
1161  //NOTE: If the path containing the cycle contains two or more
1162  // path only edges then there is no problem
1163 
1165  IndexMap const& index = get(boost::vertex_index, iGraph);
1166 
1167  unsigned int vertex = index[target(iEdge,iGraph)];
1168 
1169  //Find last edge which starts with this vertex
1170  std::list<Edge>::iterator itFirst = m_stack.begin();
1171  {
1172  bool seenVertex = false;
1173  while(itFirst != m_stack.end()) {
1174  if(not seenVertex) {
1175  if(index[source(*itFirst,iGraph)] == vertex) {
1176  seenVertex = true;
1177  }
1178  } else
1179  if (index[source(*itFirst,iGraph)] != vertex) {
1180  break;
1181  }
1182  ++itFirst;
1183  }
1184  if(itFirst != m_stack.begin()) {
1185  --itFirst;
1186  }
1187  }
1188  //This edge has not been added to the stack yet
1189  // making a copy allows us to add it in but not worry
1190  // about removing it at the end of the routine
1191  std::vector<Edge> tempStack;
1192  tempStack.reserve(m_stack.size()+1);
1193  tempStack.insert(tempStack.end(),itFirst,m_stack.end());
1194  tempStack.emplace_back(iEdge);
1195 
1196  unsigned int nPathDependencyOnly =0;
1197  for(auto const& edge: tempStack) {
1198  unsigned int in =index[source(edge,iGraph)];
1199  unsigned int out =index[target(edge,iGraph)];
1200 
1201  auto iFound = m_edgeToPathMap.find(SimpleEdge(in,out));
1202  bool pathDependencyOnly = true;
1203  for(auto dependency : iFound->second) {
1204  if (dependency == std::numeric_limits<unsigned int>::max()) {
1205  pathDependencyOnly = false;
1206  break;
1207  }
1208  }
1209  if (pathDependencyOnly) {
1210  ++nPathDependencyOnly;
1211  }
1212  }
1213  if(nPathDependencyOnly < 2) {
1214  reportError(tempStack,index,iGraph);
1215  }
1216  }
1217  private:
1218  std::string const& pathName(unsigned int iIndex) const {
1219  return m_pathNames[iIndex];
1220  }
1221 
1222  std::string const& moduleName(unsigned int iIndex) const {
1223  for(auto const& item : m_namesToIndex) {
1224  if(item.second == iIndex) {
1225  return item.first;
1226  }
1227  }
1228  assert(false);
1229  }
1230 
1231  void
1232  reportError(std::vector<Edge>const& iEdges,
1234  Graph const& iGraph) const {
1235  std::stringstream oStream;
1236  oStream <<"Module run order problem found: \n";
1237  bool first_edge = true;
1238  for(auto const& edge: iEdges) {
1239  unsigned int in =iIndex[source(edge,iGraph)];
1240  unsigned int out =iIndex[target(edge,iGraph)];
1241 
1242  if(first_edge) {
1243  first_edge = false;
1244  } else {
1245  oStream<<", ";
1246  }
1247  oStream <<moduleName(in);
1248 
1249  auto iFound = m_edgeToPathMap.find(SimpleEdge(in,out));
1250  bool pathDependencyOnly = true;
1251  for(auto dependency : iFound->second) {
1252  if (dependency == std::numeric_limits<unsigned int>::max()) {
1253  pathDependencyOnly = false;
1254  break;
1255  }
1256  }
1257  if (pathDependencyOnly) {
1258  oStream <<" after "<<moduleName(out)<<" [path "<<pathName(iFound->second[0])<<"]";
1259  } else {
1260  oStream <<" consumes "<<moduleName(out);
1261  }
1262  }
1263  oStream<<"\n Running in the threaded framework would lead to indeterminate results."
1264  "\n Please change order of modules in mentioned Path(s) to avoid inconsistent module ordering.";
1265 
1266  LogError("UnrunnableSchedule")<<oStream.str();
1267  }
1268 
1269  EdgeToPathMap const& m_edgeToPathMap;
1270  std::vector<std::string> const& m_pathNames;
1271  std::map<std::string,unsigned int> m_namesToIndex;
1272 
1273  std::list<Edge> m_stack;
1274  };
1275  }
1276 
1277  void
1279  {
1280  //Need to lookup names to ids quickly
1281  std::map<std::string,unsigned int> moduleNamesToIndex;
1282  for(auto worker: allWorkers()) {
1283  moduleNamesToIndex.insert( std::make_pair(worker->description().moduleLabel(),
1284  worker->description().id()));
1285  }
1286 
1287  //If a module to module dependency comes from a path, remember which path
1288  EdgeToPathMap edgeToPathMap;
1289 
1290  //determine the path dependencies
1291  std::vector<std::string> pathNames;
1292  {
1293  streamSchedules_[0]->availablePaths(pathNames);
1294 
1295  std::vector<std::string> moduleNames;
1296  std::vector<std::string> reducedModuleNames;
1297  unsigned int pathIndex=0;
1298  for(auto const& path: pathNames) {
1299  moduleNames.clear();
1300  reducedModuleNames.clear();
1301  std::set<std::string> alreadySeenNames;
1302 
1303  streamSchedules_[0]->modulesInPath(path,moduleNames);
1304  std::string lastModuleName;
1305  unsigned int lastModuleIndex;
1306  for(auto const& name: moduleNames) {
1307  auto found = alreadySeenNames.insert(name);
1308  if(found.second) {
1309  //first time for this path
1310  unsigned int moduleIndex = moduleNamesToIndex[name];
1311  if(not lastModuleName.empty() ) {
1312  edgeToPathMap[std::make_pair(moduleIndex,lastModuleIndex)].push_back(pathIndex);
1313  }
1314  lastModuleName = name;
1315  lastModuleIndex = moduleIndex;
1316  }
1317  }
1318  ++pathIndex;
1319  }
1320  }
1321  {
1322  std::vector<const char*> dependentModules;
1323  //determine the data dependencies
1324  for(auto const& worker: allWorkers()) {
1325  dependentModules.clear();
1326  //NOTE: what about aliases?
1327  worker->modulesDependentUpon(dependentModules);
1328  auto found = moduleNamesToIndex.find(worker->description().moduleLabel());
1329  if (found == moduleNamesToIndex.end()) {
1330  //The module was from a previous process
1331  continue;
1332  }
1333  unsigned int moduleIndex = found->second;
1334  for(auto name: dependentModules) {
1335  edgeToPathMap[std::make_pair(moduleIndex, moduleNamesToIndex[name])].push_back(std::numeric_limits<unsigned int>::max());
1336  }
1337  }
1338  }
1339  //Now use boost graph library to find cycles in the dependencies
1340  std::vector<SimpleEdge> outList;
1341  outList.reserve(edgeToPathMap.size());
1342  for(auto const& edgeInfo: edgeToPathMap) {
1343  outList.push_back(edgeInfo.first);
1344  }
1345 
1346  Graph g(outList.begin(),outList.end(), moduleNamesToIndex.size());
1347 
1348  cycle_detector detector(edgeToPathMap,pathNames,moduleNamesToIndex);
1349  boost::depth_first_search(g,boost::visitor(detector));
1350  }
1351 }
type
Definition: HCALResponse.h:21
bool empty() const
Definition: ParameterSet.h:218
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:64
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
dictionary aliases
Definition: autoCond.py:40
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
void checkForCorrectness() const
Check that the schedule is actually runable.
Definition: Schedule.cc:1278
void stopEvent(StreamContext const &)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
void fillModuleAndConsumesInfo(std::vector< ModuleDescription const * > &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int > > &moduleIDToIndex, std::vector< std::vector< ModuleDescription const * > > &modulesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
Definition: Schedule.cc:993
virtual void openFile(FileBlock const &fb)=0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:953
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:958
static std::string const source("source")
virtual void writeRun(RunPrincipal const &rp, ProcessContext const *)=0
preallocConfig_(prealloc)
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
bool endPathsEnabled() const
Definition: Schedule.cc:1033
all_output_communicators_()
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:875
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::vector< Worker * > AllWorkers
Definition: Schedule.h:117
std::vector< ParameterSet > VParameterSet
Definition: ParameterSet.h:33
assert(m_qm.get())
void writeRun(RunPrincipal const &rp, ProcessContext const *)
Definition: Schedule.cc:852
void endStream(unsigned int)
Definition: Schedule.cc:891
void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)
Definition: Schedule.cc:857
void enableEndPaths(bool active)
Definition: Schedule.cc:1025
processConfiguration
Definition: Schedule.cc:369
std::shared_ptr< ProductHolderIndexHelper > const & productLookup(BranchType branchType) const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
endpathsAreActive_(true)
Definition: Schedule.cc:374
std::shared_ptr< ModuleRegistry > moduleRegistry_
Definition: Schedule.h:274
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:986
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:65
actions
Definition: Schedule.cc:369
std::string const & moduleLabel() const
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
#define constexpr
int totalEventsFailed() const
Definition: Schedule.cc:1074
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry)
Definition: Schedule.cc:905
void eraseOrSetUntrackedParameterSet(std::string const &name)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< std::string > getParameterNamesForType(bool trackiness=true) const
Definition: ParameterSet.h:194
int totalEventsPassed() const
Definition: Schedule.cc:1065
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
Definition: Schedule.cc:963
std::string moduleName(Provenance const &provenance)
Definition: Provenance.cc:27
virtual void updateLookup(BranchType iBranchType, ProductHolderIndexHelper const &)=0
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:63
EventSummary eventSummary
Definition: TriggerReport.h:62
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists)
Definition: Schedule.cc:497
wantSummary_(tns.wantSummary())
int totalEvents() const
Definition: Schedule.cc:1056
virtual void openNewFileIfNeeded()=0
EventTimingSummary eventSummary
Schedule(ParameterSet &proc_pset, service::TriggerNamesService &tns, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, const ParameterSet *subProcPSet, PreallocationConfiguration const &config, ProcessContext const *processContext)
Definition: Schedule.cc:357
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1084
tuple result
Definition: query.py:137
std::vector< PathTimingSummary > trigPathSummaries
std::vector< std::shared_ptr< StreamSchedule > > streamSchedules_
Definition: Schedule.h:275
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:536
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:870
virtual void writeLumi(LuminosityBlockPrincipal const &lbp, ProcessContext const *)=0
string key
FastSim: produces sample of signal events, overlayed with premixed minbias events.
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
areg
Definition: Schedule.cc:369
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:91
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1038
tuple out
Definition: dbtoconf.py:99
virtual bool shouldWeCloseFile() const =0
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:280
moduleRegistry_(new ModuleRegistry())
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
volatile bool endpathsAreActive_
Definition: Schedule.h:286
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:87
void startPath(StreamContext const &, PathContext const &)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
std::unique_ptr< SystemTimeKeeper > summaryTimeKeeper_
Definition: Schedule.h:282
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:279
void loadMissingDictionaries()
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:973
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
void beginStream(unsigned int)
Definition: Schedule.cc:886
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:88
void preForkReleaseResources()
Definition: Worker.h:90
bool wantSummary_
Definition: Schedule.h:284
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Schedule.cc:900
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:941
Strings const & getTrigPaths() const
std::unique_ptr< GlobalSchedule > globalSchedule_
Definition: Schedule.h:277
void beginJob(ProductRegistry const &)
Definition: Schedule.cc:880
void openNewOutputFilesIfNeeded()
Definition: Schedule.cc:842
size_t getParameterSetNames(std::vector< std::string > &output, bool trackiness=true) const
void preForkReleaseResources()
Definition: Schedule.cc:896
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:847
preg
Definition: Schedule.cc:369
std::vector< WorkerTimingSummary > workerSummaries
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:552
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1048
bool shouldWeCloseOutput() const
Definition: Schedule.cc:862
std::vector< std::string > vstring
Definition: Schedule.cc:353
tuple size
Write out results.
void closeOutputFiles()
Definition: Schedule.cc:837
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
Definition: Schedule.cc:968
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
Definition: Schedule.cc:979
prealloc
Definition: Schedule.cc:369
unsigned int id() const
std::string match(BranchDescription const &a, BranchDescription const &b, std::string const &fileName)