CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSchedule.cc
Go to the documentation of this file.
2 
26 
27 #include "boost/bind.hpp"
28 #include "boost/ref.hpp"
29 
30 #include <algorithm>
31 #include <cassert>
32 #include <cstdlib>
33 #include <functional>
34 #include <iomanip>
35 #include <list>
36 #include <map>
37 #include <exception>
38 
39 namespace edm {
40  namespace {
41 
// Walks the input range [begin, end) and an output range in lock step,
// invoking func(input, output) once per position. The callable is
// expected to read the input element and write its result through the
// output element it is handed. The output range must provide at least
// as many elements as the input range; no bounds check is made.
template <typename InputIterator, typename ForwardIterator, typename Func>
void
transform_into(InputIterator begin, InputIterator end,
               ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
52 
// Fills the sequence 'to' from the sequence 'from' by calling
// func(input, output) on each pair of corresponding elements (via
// transform_into).
//  - If the two sequences already have the same size, 'func' is applied
//    directly to the existing elements of 'to'.
//  - Otherwise the results are computed into a freshly constructed
//    sequence of from.size() elements, which is swapped into 'to' only
//    after every call has returned, so 'to' is left untouched if a call
//    throws.
template <typename FROM, typename TO, typename FUNC>
void
fill_summary(FROM const& from, TO& to, FUNC func) {
  if(to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }
  TO replacement(from.size());
  transform_into(from.begin(), from.end(), replacement.begin(), func);
  to.swap(replacement);
}
69 
70  // -----------------------------
71 
72  // Here we make the trigger results inserter directly. This should
73  // probably be a utility in the WorkerRegistry or elsewhere.
74 
76  makeInserter(ExceptionToActionTable const& actions,
77  boost::shared_ptr<ActivityRegistry> areg,
78  TriggerResultInserter* inserter) {
79  StreamSchedule::WorkerPtr ptr(new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
80  ptr->setActivityRegistry(areg);
81  return ptr;
82  }
83 
84  void
85  initializeBranchToReadingWorker(ParameterSet const& opts,
86  ProductRegistry const& preg,
87  std::multimap<std::string,Worker*>& branchToReadingWorker)
88  {
89  // See if any data has been marked to be deleted early (removing any duplicates)
90  auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly",std::vector<std::string>());
91  if(not vBranchesToDeleteEarly.empty()) {
92  std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
93  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
94  vBranchesToDeleteEarly.end());
95 
96  // Are the requested items in the product registry?
97  auto allBranchNames = preg.allBranchNames();
98  //the branch names all end with a period, which we do not want to compare with
99  for(auto & b:allBranchNames) {
100  b.resize(b.size()-1);
101  }
102  std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
103  std::vector<std::string> temp;
104  temp.reserve(vBranchesToDeleteEarly.size());
105 
106  std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
107  allBranchNames.begin(),allBranchNames.end(),
108  std::back_inserter(temp));
109  vBranchesToDeleteEarly.swap(temp);
110  if(temp.size() != vBranchesToDeleteEarly.size()) {
111  std::vector<std::string> missingProducts;
112  std::set_difference(temp.begin(),temp.end(),
113  vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
114  std::back_inserter(missingProducts));
115  LogInfo l("MissingProductsForCanDeleteEarly");
116  l<<"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
117  for(auto const& n:missingProducts){
118  l<<"\n "<<n;
119  }
120  }
121  //set placeholder for the branch, we will remove the nullptr if a
122  // module actually wants the branch.
123  for(auto const& branch:vBranchesToDeleteEarly) {
124  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
125  }
126  }
127  }
128  }
129 
130  // -----------------------------
131 
132  typedef std::vector<std::string> vstring;
133 
134  // -----------------------------
135 
137  boost::shared_ptr<ModuleRegistry> modReg,
138  ParameterSet& proc_pset,
142  BranchIDListHelper& branchIDListHelper,
143  ExceptionToActionTable const& actions,
144  boost::shared_ptr<ActivityRegistry> areg,
145  boost::shared_ptr<ProcessConfiguration> processConfiguration,
146  bool allowEarlyDelete,
147  StreamID streamID,
148  ProcessContext const* processContext) :
149  workerManager_(modReg,areg, actions),
150  actReg_(areg),
151  trig_name_list_(tns.getTrigPaths()),
152  end_path_name_list_(tns.getEndPaths()),
153  results_(new HLTGlobalStatus(trig_name_list_.size())),
154  results_inserter_(),
155  trig_paths_(),
156  end_paths_(),
157  stopwatch_(tns.wantSummary() ? new RunStopwatch::StopwatchPointer::element_type : static_cast<RunStopwatch::StopwatchPointer::element_type*> (nullptr)),
158  total_events_(),
159  total_passed_(),
160  number_of_unscheduled_modules_(0),
161  streamID_(streamID),
162  streamContext_(streamID_, processContext),
163  wantSummary_(tns.wantSummary()),
165 
166  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
167  bool hasPath = false;
168 
169  int trig_bitpos = 0;
170  trig_paths_.reserve(trig_name_list_.size());
171  vstring labelsOnTriggerPaths;
172  for (auto const& trig_name : trig_name_list_) {
173  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results_, &labelsOnTriggerPaths);
174  ++trig_bitpos;
175  hasPath = true;
176  }
177 
178  if (hasPath) {
179  // the results inserter stands alone
180  inserter->setTrigResultForStream(streamID.value(),results_);
181 
182  results_inserter_ = makeInserter(actions, actReg_, inserter);
184  }
185 
186  // fill normal endpaths
187  int bitpos = 0;
188  end_paths_.reserve(end_path_name_list_.size());
189  for (auto const& end_path_name : end_path_name_list_) {
190  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name);
191  ++bitpos;
192  }
193 
194  //See if all modules were used
195  std::set<std::string> usedWorkerLabels;
196  for (auto const& worker : allWorkers()) {
197  usedWorkerLabels.insert(worker->description().moduleLabel());
198  }
199  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
200  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
201  std::vector<std::string> unusedLabels;
202  set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
203  usedWorkerLabels.begin(), usedWorkerLabels.end(),
204  back_inserter(unusedLabels));
205  //does the configuration say we should allow on demand?
206  bool allowUnscheduled = opts.getUntrackedParameter<bool>("allowUnscheduled", false);
207  std::set<std::string> unscheduledLabels;
208  std::vector<std::string> shouldBeUsedLabels;
209  if (!unusedLabels.empty()) {
210  //Need to
211  // 1) create worker
212  // 2) if it is a WorkerT<EDProducer>, add it to our list
213  // 3) hand list to our delayed reader
214  for (auto const& label : unusedLabels) {
215  if (allowUnscheduled) {
216  bool isTracked;
217  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
218  assert(isTracked);
219  assert(modulePSet != nullptr);
220  workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, wantSummary_, unscheduledLabels, shouldBeUsedLabels);
221  } else {
222  //everthing is marked are unused so no 'on demand' allowed
223  shouldBeUsedLabels.push_back(label);
224  }
225  }
226  if (!shouldBeUsedLabels.empty()) {
227  std::ostringstream unusedStream;
228  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
229  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
230  itLabelEnd = shouldBeUsedLabels.end();
231  itLabel != itLabelEnd;
232  ++itLabel) {
233  unusedStream << ",'" << *itLabel << "'";
234  }
235  LogInfo("path")
236  << "The following module labels are not assigned to any path:\n"
237  << unusedStream.str()
238  << "\n";
239  }
240  }
241  if (!unscheduledLabels.empty()) {
242  number_of_unscheduled_modules_=unscheduledLabels.size();
243  workerManager_.setOnDemandProducts(preg, unscheduledLabels);
244  }
245 
246 
247  initializeEarlyDelete(*modReg, opts,preg,allowEarlyDelete);
248 
249  } // StreamSchedule::StreamSchedule
250 
251 
254  bool allowEarlyDelete) {
255  //for now, if have a subProcess, don't allow early delete
256  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
257  if(not allowEarlyDelete) return;
258 
259  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
260  // registered for this job
261  std::multimap<std::string,Worker*> branchToReadingWorker;
262  initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
263 
264  //If no delete early items have been specified we don't have to do anything
265  if(branchToReadingWorker.size()==0) {
266  return;
267  }
268  const std::vector<std::string> kEmpty;
269  std::map<Worker*,unsigned int> reserveSizeForWorker;
270  unsigned int upperLimitOnReadingWorker =0;
271  unsigned int upperLimitOnIndicies = 0;
272  unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
273 
274  //talk with output modules first
275  modReg.forAllModuleHolders([this, &branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
276  auto comm = iHolder->createOutputModuleCommunicator();
277  if (comm) {
278  if(branchToReadingWorker.size()>0) {
279  //If an OutputModule needs a product, we can't delete it early
280  // so we should remove it from our list
281  SelectedProductsForBranchType const&kept = comm->keptProducts();
282  for( auto const& item: kept[InEvent]) {
283  auto found = branchToReadingWorker.equal_range(item->branchName());
284  if(found.first !=found.second) {
285  --nUniqueBranchesToDelete;
286  branchToReadingWorker.erase(found.first,found.second);
287  }
288  }
289  }
290  }
291  });
292 
293  if(branchToReadingWorker.size()==0) {
294  return;
295  }
296 
297  for (auto w :allWorkers()) {
298  //determine if this module could read a branch we want to delete early
299  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
300  if(0!=pset) {
301  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet",kEmpty);
302  if(not branches.empty()) {
303  ++upperLimitOnReadingWorker;
304  }
305  for(auto const& branch:branches){
306  auto found = branchToReadingWorker.equal_range(branch);
307  if(found.first != found.second) {
308  ++upperLimitOnIndicies;
309  ++reserveSizeForWorker[w];
310  if(nullptr == found.first->second) {
311  found.first->second = w;
312  } else {
313  branchToReadingWorker.insert(make_pair(found.first->first,w));
314  }
315  }
316  }
317  }
318  }
319  {
320  auto it = branchToReadingWorker.begin();
321  std::vector<std::string> unusedBranches;
322  while(it !=branchToReadingWorker.end()) {
323  if(it->second == nullptr) {
324  unusedBranches.push_back(it->first);
325  //erasing the object invalidates the iterator so must advance it first
326  auto temp = it;
327  ++it;
328  branchToReadingWorker.erase(temp);
329  } else {
330  ++it;
331  }
332  }
333  if(not unusedBranches.empty()) {
334  LogWarning l("UnusedProductsForCanDeleteEarly");
335  l<<"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
336  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
337  for(auto const& n:unusedBranches){
338  l<<"\n "<<n;
339  }
340  }
341  }
342  if(0!=branchToReadingWorker.size()) {
343  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
344  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies,0);
345  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
346  std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
347  std::string lastBranchName;
348  size_t nextOpenIndex = 0;
349  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
350  for(auto& branchAndWorker:branchToReadingWorker) {
351  if(lastBranchName != branchAndWorker.first) {
352  //have to put back the period we removed earlier in order to get the proper name
353  BranchID bid(branchAndWorker.first+".");
354  earlyDeleteBranchToCount_.emplace_back(std::make_pair(bid,0U));
355  lastBranchName = branchAndWorker.first;
356  }
357  auto found = alreadySeenWorkers.find(branchAndWorker.second);
358  if(alreadySeenWorkers.end() == found) {
359  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
360  // all the branches that might be read by this worker. However, initially we will only tell the
361  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
362  // EarlyDeleteHelper will automatically advance its internal end pointer.
363  size_t index = nextOpenIndex;
364  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
366  earlyDeleteHelpers_.emplace_back(EarlyDeleteHelper(beginAddress+index,
367  beginAddress+index+1,
369  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
370  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(earlyDeleteHelpers_.back())));
371  nextOpenIndex +=nIndices;
372  } else {
373  found->second->appendIndex(earlyDeleteBranchToCount_.size()-1);
374  }
375  }
376 
377  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
378  // space needed for each module
379  auto itLast = earlyDeleteHelpers_.begin();
380  for(auto it = earlyDeleteHelpers_.begin()+1;it != earlyDeleteHelpers_.end();++it) {
381  if(itLast->end() != it->begin()) {
382  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
383  unsigned int delta = it->begin()- itLast->end();
384  it->shiftIndexPointers(delta);
385 
387  (itLast->end()-beginAddress),
389  (it->begin()-beginAddress));
390  }
391  itLast = it;
392  }
393  earlyDeleteHelperToBranchIndicies_.erase(earlyDeleteHelperToBranchIndicies_.begin()+(itLast->end()-beginAddress),
395 
396  //now tell the paths about the deleters
397  for(auto& p : trig_paths_) {
398  p.setEarlyDeleteHelpers(alreadySeenWorkers);
399  }
400  for(auto& p : end_paths_) {
401  p.setEarlyDeleteHelpers(alreadySeenWorkers);
402  }
404  }
405  }
406 
410  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
411  std::string const& name,
412  bool ignoreFilters,
413  PathWorkers& out,
414  vstring* labelsOnPaths) {
415  vstring modnames = proc_pset.getParameter<vstring>(name);
416  PathWorkers tmpworkers;
417 
418  unsigned int placeInPath = 0;
419  for (auto const& name : modnames) {
420 
421  if (labelsOnPaths) labelsOnPaths->push_back(name);
422 
424  if (name[0] == '!') filterAction = WorkerInPath::Veto;
425  else if (name[0] == '-') filterAction = WorkerInPath::Ignore;
426 
427  std::string moduleLabel = name;
428  if (filterAction != WorkerInPath::Normal) moduleLabel.erase(0, 1);
429 
430  bool isTracked;
431  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
432  if (modpset == 0) {
433  std::string pathType("endpath");
434  if (!search_all(end_path_name_list_, name)) {
435  pathType = std::string("path");
436  }
438  "The unknown module label \"" << moduleLabel <<
439  "\" appears in " << pathType << " \"" << name <<
440  "\"\n please check spelling or remove that label from the path.";
441  }
442  assert(isTracked);
443 
444  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
445  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) {
446  // We have a filter on an end path, and the filter is not explicitly ignored.
447  // See if the filter is allowed.
448  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
449  if (!search_all(allowed_filters, worker->description().moduleName())) {
450  // Filter is not allowed. Ignore the result, and issue a warning.
451  filterAction = WorkerInPath::Ignore;
452  LogWarning("FilterOnEndPath")
453  << "The EDFilter '" << worker->description().moduleName() << "' with module label '" << moduleLabel << "' appears on EndPath '" << name << "'.\n"
454  << "The return value of the filter will be ignored.\n"
455  << "To suppress this warning, either remove the filter from the endpath,\n"
456  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
457  }
458  }
459  tmpworkers.emplace_back(worker, filterAction, placeInPath);
460  ++placeInPath;
461  }
462 
463  out.swap(tmpworkers);
464  }
465 
469  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
470  int bitpos, std::string const& name, TrigResPtr trptr,
471  vstring* labelsOnTriggerPaths) {
472  PathWorkers tmpworkers;
473  Workers holder;
474  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, labelsOnTriggerPaths);
475 
476  for (PathWorkers::iterator wi(tmpworkers.begin()),
477  we(tmpworkers.end()); wi != we; ++wi) {
478  holder.push_back(wi->getWorker());
479  }
480 
481  // an empty path will cause an extra bit that is not used
482  if (!tmpworkers.empty()) {
483  trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
484  if (wantSummary_) {
485  trig_paths_.back().useStopwatch();
486  }
487  } else {
488  empty_trig_paths_.push_back(bitpos);
489  empty_trig_path_names_.push_back(name);
490  }
491  for_all(holder, boost::bind(&StreamSchedule::addToAllWorkers, this, _1));
492  }
493 
497  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
498  int bitpos, std::string const& name) {
499  PathWorkers tmpworkers;
500  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, 0);
501  Workers holder;
502 
503  for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) {
504  holder.push_back(wi->getWorker());
505  }
506 
507  if (!tmpworkers.empty()) {
508  end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, PathContext::PathType::kEndPath);
509  if (wantSummary_) {
510  end_paths_.back().useStopwatch();
511  }
512  }
513  for_all(holder, boost::bind(&StreamSchedule::addToAllWorkers, this, _1));
514  }
515 
518  }
519 
522  }
523 
525  std::string const& iLabel) {
526  Worker* found = nullptr;
527  for (auto const& worker : allWorkers()) {
528  if (worker->description().moduleLabel() == iLabel) {
529  found = worker;
530  break;
531  }
532  }
533  if (nullptr == found) {
534  return;
535  }
536 
537  iMod->replaceModuleFor(found);
539  }
540 
541  std::vector<ModuleDescription const*>
543  std::vector<ModuleDescription const*> result;
544  result.reserve(allWorkers().size());
545 
546  for (auto const& worker : allWorkers()) {
547  ModuleDescription const* p = worker->descPtr();
548  result.push_back(p);
549  }
550  return result;
551  }
552 
553  void
554  StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
555  oLabelsToFill.reserve(trig_paths_.size());
556  std::transform(trig_paths_.begin(),
557  trig_paths_.end(),
558  std::back_inserter(oLabelsToFill),
559  boost::bind(&Path::name, _1));
560  }
561 
562  void
564  std::vector<std::string>& oLabelsToFill) const {
565  TrigPaths::const_iterator itFound =
566  std::find_if (trig_paths_.begin(),
567  trig_paths_.end(),
568  boost::bind(std::equal_to<std::string>(),
569  iPathLabel,
570  boost::bind(&Path::name, _1)));
571  if (itFound!=trig_paths_.end()) {
572  oLabelsToFill.reserve(itFound->size());
573  for (size_t i = 0; i < itFound->size(); ++i) {
574  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
575  }
576  }
577  }
578 
579  void
581  endpathsAreActive_ = active;
582  }
583 
584  bool
586  return endpathsAreActive_;
587  }
588 
589  static void
591  size_t which,
592  ModuleInPathSummary& sum) {
593  sum.timesVisited += path.timesVisited(which);
594  sum.timesPassed += path.timesPassed(which);
595  sum.timesFailed += path.timesFailed(which);
596  sum.timesExcept += path.timesExcept(which);
597  sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
598  }
599 
600  static void
602  sum.name = path.name();
603  sum.bitPosition = path.bitPosition();
604  sum.timesRun += path.timesRun();
605  sum.timesPassed += path.timesPassed();
606  sum.timesFailed += path.timesFailed();
607  sum.timesExcept += path.timesExcept();
608 
609  Path::size_type sz = path.size();
610  if(sum.moduleInPathSummaries.size()==0) {
611  std::vector<ModuleInPathSummary> temp(sz);
612  for (size_t i = 0; i != sz; ++i) {
613  fillModuleInPathSummary(path, i, temp[i]);
614  }
615  sum.moduleInPathSummaries.swap(temp);
616  } else {
617  assert(sz == sum.moduleInPathSummaries.size());
618  for (size_t i = 0; i != sz; ++i) {
620  }
621  }
622  }
623 
624  static void
626  sum.timesVisited += w.timesVisited();
627  sum.timesRun += w.timesRun();
628  sum.timesPassed += w.timesPassed();
629  sum.timesFailed += w.timesFailed();
630  sum.timesExcept += w.timesExcept();
631  sum.moduleLabel = w.description().moduleLabel();
632  }
633 
634  static void
636  fillWorkerSummaryAux(*pw, sum);
637  }
638 
639  void
644 
645  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
646  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
647  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
648  }
649 
650  static void
652  size_t which,
654  sum.timesVisited = +path.timesVisited(which);
655  auto times = path.timeCpuReal(which);
656  sum.cpuTime += times.first;
657  sum.realTime += path.timesFailed(which);
658  sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
659  }
660 
661  static void
663  sum.name = path.name();
664  sum.bitPosition = path.bitPosition();
665  sum.timesRun += path.timesRun();
666  auto times = path.timeCpuReal();
667  sum.cpuTime += times.first;
668  sum.realTime += times.second;
669 
670  Path::size_type sz = path.size();
671  if(sum.moduleInPathSummaries.size()==0) {
672  std::vector<ModuleInPathTimingSummary> temp(sz);
673  for (size_t i = 0; i != sz; ++i) {
674  fillModuleInPathTimingSummary(path, i, temp[i]);
675  }
676  sum.moduleInPathSummaries.swap(temp);
677  } else {
678  assert(sz == sum.moduleInPathSummaries.size());
679  for (size_t i = 0; i != sz; ++i) {
681  }
682  }
683  }
684 
685  static void
687  sum.timesVisited += w.timesVisited();
688  sum.timesRun += w.timesRun();
689  auto times = w.timeCpuReal();
690  sum.cpuTime += times.first;
691  sum.realTime += times.second;
692  sum.moduleLabel = w.description().moduleLabel();
693  }
694 
695  static void
697  fillWorkerTimingSummaryAux(*pw, sum);
698  }
699 
700  void
703 
706  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerTimingSummary);
707  }
708 
709  void
712  for_all(trig_paths_, boost::bind(&Path::clearCounters, _1));
713  for_all(end_paths_, boost::bind(&Path::clearCounters, _1));
714  for_all(allWorkers(), boost::bind(&Worker::clearCounters, _1));
715  }
716 
717  void
719  results_->reset();
720  }
721 
722  void
725  }
726 
727  void
729  //must be sure we have cleared the count first
730  for(auto& count:earlyDeleteBranchToCount_) {
731  count.second = 0;
732  }
733  //now reset based on how many helpers use that branch
735  ++(earlyDeleteBranchToCount_[index].second);
736  }
737  for(auto& helper: earlyDeleteHelpers_) {
738  helper.reset();
739  }
740  }
741 
742 }
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
dbl * delta
Definition: mlp_gen.cc:36
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:64
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
int timesRun() const
Definition: Path.h:80
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int bitPosition() const
Definition: Path.h:64
std::vector< Worker * > Workers
ModuleDescription const & description() const
Definition: Worker.h:97
std::vector< int > empty_trig_paths_
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
virtual void replaceModuleFor(Worker *) const =0
int totalEventsFailed() const
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
Definition: Path.h:47
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::vector< std::pair< BranchID, unsigned int > > earlyDeleteBranchToCount_
int timesPassed() const
Definition: Worker.h:125
volatile bool endpathsAreActive_
int totalEvents() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, bool useStopwatch, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
void clearCounters()
Definition: Worker.h:117
std::string const & moduleName() const
processConfiguration
Definition: Schedule.cc:369
void addToAllWorkers(Worker *w)
int timesExcept() const
Definition: Path.h:83
size_type size() const
Definition: Path.h:87
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
endpathsAreActive_(true)
Definition: Schedule.cc:374
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:137
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:65
#define nullptr
actions
Definition: Schedule.cc:369
std::string const & moduleLabel() const
std::pair< double, double > timeCpuReal() const
Definition: Worker.h:113
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::string moduleLabel
Definition: TriggerReport.h:56
std::vector< ModuleInPathTimingSummary > moduleInPathSummaries
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
static void fillPathTimingSummary(Path const &path, PathTimingSummary &sum)
std::vector< WorkerInPath > PathWorkers
boost::shared_ptr< ActivityRegistry > actReg_
int timesExcept() const
Definition: Worker.h:127
boost::shared_ptr< Worker > WorkerPtr
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
tuple path
else: Piece not in the list, fine.
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:63
EventSummary eventSummary
Definition: TriggerReport.h:62
int timesVisited() const
Definition: Worker.h:124
wantSummary_(tns.wantSummary())
std::string name
Definition: TriggerReport.h:44
EventTimingSummary eventSummary
static void fillWorkerTimingSummary(Worker const *pw, WorkerTimingSummary &sum)
static void fillModuleInPathTimingSummary(Path const &path, size_t which, ModuleInPathTimingSummary &sum)
Definition: Path.h:42
tuple result
Definition: query.py:137
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
std::vector< std::string > vstring
static void fillPathSummary(Path const &path, PathSummary &sum)
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
int timesPassed() const
Definition: Path.h:81
std::vector< PathTimingSummary > trigPathSummaries
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:20
int timesRun() const
Definition: Worker.h:123
#define end
Definition: vmac.h:37
vstring empty_trig_path_names_
areg
Definition: Schedule.cc:369
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
tuple out
Definition: dbtoconf.py:99
virtual Types moduleType() const =0
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
WorkerPtr results_inserter_
unsigned int value() const
Definition: StreamID.h:46
boost::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
boost::shared_ptr< HLTGlobalStatus > TrigResPtr
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
int timesVisited(size_type i) const
Definition: Path.h:88
double b
Definition: hdecay.h:120
std::pair< double, double > timeCpuReal() const
Definition: Path.h:67
StreamSchedule(TriggerResultInserter *inserter, boost::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
void setTrigResultForStream(unsigned int iStreamIndex, const TrigResPtr &trptr)
void getTriggerTimingReport(TriggerTimingReport &rep) const
static void fillWorkerTimingSummaryAux(Worker const &w, WorkerTimingSummary &sum)
int timesFailed() const
Definition: Worker.h:126
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
std::string const & name() const
Definition: Path.h:65
#define begin
Definition: vmac.h:30
Worker const * getWorker(size_type i) const
Definition: Path.h:92
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:45
void addToAllWorkers(Worker *w, bool useStopwatch)
preg
Definition: Schedule.cc:369
std::vector< WorkerTimingSummary > workerSummaries
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
std::vector< std::string > vstring
Definition: Schedule.cc:354
tuple size
Write out results.
static Registry * instance()
Definition: Registry.cc:14
void clearCounters()
Definition: Path.cc:161
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
prealloc
Definition: Schedule.cc:369
int timesFailed() const
Definition: Path.h:82
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)