CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSchedule.cc
Go to the documentation of this file.
2 
26 
27 #include "boost/bind.hpp"
28 #include "boost/ref.hpp"
29 
30 #include <algorithm>
31 #include <cassert>
32 #include <cstdlib>
33 #include <functional>
34 #include <iomanip>
35 #include <list>
36 #include <map>
37 #include <exception>
38 
39 namespace edm {
40  namespace {
41 
// Walks the input range [begin, end) and the output range starting at 'out'
// in lock step, calling func(input, output) once per pair of elements.
// 'func' is handed the input element and a reference to the matching
// output element, which it is expected to write to.
template <typename InputIterator, typename ForwardIterator, typename Func>
void
transform_into(InputIterator begin, InputIterator end,
               ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
52 
// Fills the 'to' sequence from the 'from' sequence by calling
// func(fromElement, toElement) on each pair of elements via transform_into.
// When the two sequences differ in size, the values are first computed into
// a temporary of the right size, which is swapped into 'to' only after every
// call has returned; 'to' is therefore left untouched if a call throws.
// When the sizes already match, the existing elements of 'to' are handed to
// 'func' directly.
template <typename FROM, typename TO, typename FUNC>
void
fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }
  TO fresh(from.size());
  transform_into(from.begin(), from.end(), fresh.begin(), func);
  to.swap(fresh);
}
69 
70  // -----------------------------
71 
72  // Here we make the trigger results inserter directly. This should
73  // probably be a utility in the WorkerRegistry or elsewhere.
74 
76  makeInserter(ExceptionToActionTable const& actions,
77  boost::shared_ptr<ActivityRegistry> areg,
78  TriggerResultInserter* inserter) {
79  StreamSchedule::WorkerPtr ptr(new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
80  ptr->setActivityRegistry(areg);
81  return ptr;
82  }
83 
// Reports whether 's' occurs in 'v'. The lookup is a binary search, so 'v'
// must already be sorted in ascending order.
bool binary_search_string(std::vector<std::string> const& v, std::string const& s) {
  auto const pos = std::lower_bound(v.begin(), v.end(), s);
  if (pos == v.end()) {
    return false;
  }
  // lower_bound gives the first element not less than 's'; it is a match
  // exactly when 's' is not less than it either.
  return !(s < *pos);
}
87 
88  void
89  initializeBranchToReadingWorker(ParameterSet const& opts,
90  ProductRegistry const& preg,
91  std::multimap<std::string,Worker*>& branchToReadingWorker)
92  {
93  // See if any data has been marked to be deleted early (removing any duplicates)
94  auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly",std::vector<std::string>());
95  if(not vBranchesToDeleteEarly.empty()) {
96  std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
97  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
98  vBranchesToDeleteEarly.end());
99 
100  // Are the requested items in the product registry?
101  auto allBranchNames = preg.allBranchNames();
102  //the branch names all end with a period, which we do not want to compare with
103  for(auto & b:allBranchNames) {
104  b.resize(b.size()-1);
105  }
106  std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
107  std::vector<std::string> temp;
108  temp.reserve(vBranchesToDeleteEarly.size());
109 
110  std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
111  allBranchNames.begin(),allBranchNames.end(),
112  std::back_inserter(temp));
113  vBranchesToDeleteEarly.swap(temp);
114  if(temp.size() != vBranchesToDeleteEarly.size()) {
115  std::vector<std::string> missingProducts;
116  std::set_difference(temp.begin(),temp.end(),
117  vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
118  std::back_inserter(missingProducts));
119  LogInfo l("MissingProductsForCanDeleteEarly");
120  l<<"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
121  for(auto const& n:missingProducts){
122  l<<"\n "<<n;
123  }
124  }
125  //set placeholder for the branch, we will remove the nullptr if a
126  // module actually wants the branch.
127  for(auto const& branch:vBranchesToDeleteEarly) {
128  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
129  }
130  }
131  }
132  }
133 
134  // -----------------------------
135 
136  typedef std::vector<std::string> vstring;
137 
138  // -----------------------------
139 
141  boost::shared_ptr<ModuleRegistry> modReg,
142  ParameterSet& proc_pset,
146  BranchIDListHelper& branchIDListHelper,
147  ExceptionToActionTable const& actions,
148  boost::shared_ptr<ActivityRegistry> areg,
149  boost::shared_ptr<ProcessConfiguration> processConfiguration,
150  bool allowEarlyDelete,
151  StreamID streamID,
152  ProcessContext const* processContext) :
153  workerManager_(modReg,areg, actions),
154  actReg_(areg),
155  trig_name_list_(tns.getTrigPaths()),
156  end_path_name_list_(tns.getEndPaths()),
157  results_(new HLTGlobalStatus(trig_name_list_.size())),
158  results_inserter_(),
159  trig_paths_(),
160  end_paths_(),
161  stopwatch_(tns.wantSummary() ? new RunStopwatch::StopwatchPointer::element_type : static_cast<RunStopwatch::StopwatchPointer::element_type*> (nullptr)),
162  total_events_(),
163  total_passed_(),
164  number_of_unscheduled_modules_(0),
165  streamID_(streamID),
166  streamContext_(streamID_, processContext),
167  wantSummary_(tns.wantSummary()),
169 
170  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
171  bool hasPath = false;
172 
173  int trig_bitpos = 0;
174  trig_paths_.reserve(trig_name_list_.size());
175  vstring labelsOnTriggerPaths;
176  for (auto const& trig_name : trig_name_list_) {
177  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results_, &labelsOnTriggerPaths);
178  ++trig_bitpos;
179  hasPath = true;
180  }
181 
182  if (hasPath) {
183  // the results inserter stands alone
184  inserter->setTrigResultForStream(streamID.value(),results_);
185 
186  results_inserter_ = makeInserter(actions, actReg_, inserter);
188  }
189 
190  // fill normal endpaths
191  int bitpos = 0;
192  end_paths_.reserve(end_path_name_list_.size());
193  for (auto const& end_path_name : end_path_name_list_) {
194  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name);
195  ++bitpos;
196  }
197 
198  //See if all modules were used
199  std::set<std::string> usedWorkerLabels;
200  for (auto const& worker : allWorkers()) {
201  usedWorkerLabels.insert(worker->description().moduleLabel());
202  }
203  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
204  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
205  std::vector<std::string> unusedLabels;
206  set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
207  usedWorkerLabels.begin(), usedWorkerLabels.end(),
208  back_inserter(unusedLabels));
209  //does the configuration say we should allow on demand?
210  bool allowUnscheduled = opts.getUntrackedParameter<bool>("allowUnscheduled", false);
211  std::set<std::string> unscheduledLabels;
212  std::vector<std::string> shouldBeUsedLabels;
213  if (!unusedLabels.empty()) {
214  //Need to
215  // 1) create worker
216  // 2) if it is a WorkerT<EDProducer>, add it to our list
217  // 3) hand list to our delayed reader
218  for (auto const& label : unusedLabels) {
219  if (allowUnscheduled) {
220  bool isTracked;
221  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
222  assert(isTracked);
223  assert(modulePSet != nullptr);
224  workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, wantSummary_, unscheduledLabels, shouldBeUsedLabels);
225  } else {
226  //everything is marked as unused so no 'on demand' allowed
227  shouldBeUsedLabels.push_back(label);
228  }
229  }
230  if (!shouldBeUsedLabels.empty()) {
231  std::ostringstream unusedStream;
232  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
233  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
234  itLabelEnd = shouldBeUsedLabels.end();
235  itLabel != itLabelEnd;
236  ++itLabel) {
237  unusedStream << ",'" << *itLabel << "'";
238  }
239  LogInfo("path")
240  << "The following module labels are not assigned to any path:\n"
241  << unusedStream.str()
242  << "\n";
243  }
244  }
245  if (!unscheduledLabels.empty()) {
246  number_of_unscheduled_modules_=unscheduledLabels.size();
247  workerManager_.setOnDemandProducts(preg, unscheduledLabels);
248  }
249 
250 
251  initializeEarlyDelete(*modReg, opts,preg,allowEarlyDelete);
252 
253  } // StreamSchedule::StreamSchedule
254 
255 
258  bool allowEarlyDelete) {
259  //for now, if have a subProcess, don't allow early delete
260  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
261  if(not allowEarlyDelete) return;
262 
263  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
264  // registered for this job
265  std::multimap<std::string,Worker*> branchToReadingWorker;
266  initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
267 
268  //If no delete early items have been specified we don't have to do anything
269  if(branchToReadingWorker.size()==0) {
270  return;
271  }
272  const std::vector<std::string> kEmpty;
273  std::map<Worker*,unsigned int> reserveSizeForWorker;
274  unsigned int upperLimitOnReadingWorker =0;
275  unsigned int upperLimitOnIndicies = 0;
276  unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
277 
278  //talk with output modules first
279  modReg.forAllModuleHolders([this, &branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
280  auto comm = iHolder->createOutputModuleCommunicator();
281  if (comm) {
282  if(branchToReadingWorker.size()>0) {
283  //If an OutputModule needs a product, we can't delete it early
284  // so we should remove it from our list
285  SelectedProductsForBranchType const&kept = comm->keptProducts();
286  for( auto const& item: kept[InEvent]) {
287  auto found = branchToReadingWorker.equal_range(item->branchName());
288  if(found.first !=found.second) {
289  --nUniqueBranchesToDelete;
290  branchToReadingWorker.erase(found.first,found.second);
291  }
292  }
293  }
294  }
295  });
296 
297  if(branchToReadingWorker.size()==0) {
298  return;
299  }
300 
301  for (auto w :allWorkers()) {
302  //determine if this module could read a branch we want to delete early
303  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
304  if(0!=pset) {
305  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet",kEmpty);
306  if(not branches.empty()) {
307  ++upperLimitOnReadingWorker;
308  }
309  for(auto const& branch:branches){
310  auto found = branchToReadingWorker.equal_range(branch);
311  if(found.first != found.second) {
312  ++upperLimitOnIndicies;
313  ++reserveSizeForWorker[w];
314  if(nullptr == found.first->second) {
315  found.first->second = w;
316  } else {
317  branchToReadingWorker.insert(make_pair(found.first->first,w));
318  }
319  }
320  }
321  }
322  }
323  {
324  auto it = branchToReadingWorker.begin();
325  std::vector<std::string> unusedBranches;
326  while(it !=branchToReadingWorker.end()) {
327  if(it->second == nullptr) {
328  unusedBranches.push_back(it->first);
329  //erasing the object invalidates the iterator so must advance it first
330  auto temp = it;
331  ++it;
332  branchToReadingWorker.erase(temp);
333  } else {
334  ++it;
335  }
336  }
337  if(not unusedBranches.empty()) {
338  LogWarning l("UnusedProductsForCanDeleteEarly");
339  l<<"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
340  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
341  for(auto const& n:unusedBranches){
342  l<<"\n "<<n;
343  }
344  }
345  }
346  if(0!=branchToReadingWorker.size()) {
347  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
348  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies,0);
349  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
350  std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
351  std::string lastBranchName;
352  size_t nextOpenIndex = 0;
353  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
354  for(auto& branchAndWorker:branchToReadingWorker) {
355  if(lastBranchName != branchAndWorker.first) {
356  //have to put back the period we removed earlier in order to get the proper name
357  BranchID bid(branchAndWorker.first+".");
358  earlyDeleteBranchToCount_.emplace_back(std::make_pair(bid,0U));
359  lastBranchName = branchAndWorker.first;
360  }
361  auto found = alreadySeenWorkers.find(branchAndWorker.second);
362  if(alreadySeenWorkers.end() == found) {
363  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
364  // all the branches that might be read by this worker. However, initially we will only tell the
365  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
366  // EarlyDeleteHelper will automatically advance its internal end pointer.
367  size_t index = nextOpenIndex;
368  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
370  earlyDeleteHelpers_.emplace_back(EarlyDeleteHelper(beginAddress+index,
371  beginAddress+index+1,
373  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
374  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(earlyDeleteHelpers_.back())));
375  nextOpenIndex +=nIndices;
376  } else {
377  found->second->appendIndex(earlyDeleteBranchToCount_.size()-1);
378  }
379  }
380 
381  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
382  // space needed for each module
383  auto itLast = earlyDeleteHelpers_.begin();
384  for(auto it = earlyDeleteHelpers_.begin()+1;it != earlyDeleteHelpers_.end();++it) {
385  if(itLast->end() != it->begin()) {
386  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
387  unsigned int delta = it->begin()- itLast->end();
388  it->shiftIndexPointers(delta);
389 
391  (itLast->end()-beginAddress),
393  (it->begin()-beginAddress));
394  }
395  itLast = it;
396  }
397  earlyDeleteHelperToBranchIndicies_.erase(earlyDeleteHelperToBranchIndicies_.begin()+(itLast->end()-beginAddress),
399 
400  //now tell the paths about the deleters
401  for(auto& p : trig_paths_) {
402  p.setEarlyDeleteHelpers(alreadySeenWorkers);
403  }
404  for(auto& p : end_paths_) {
405  p.setEarlyDeleteHelpers(alreadySeenWorkers);
406  }
408  }
409  }
410 
414  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
415  std::string const& name,
416  bool ignoreFilters,
417  PathWorkers& out,
418  vstring* labelsOnPaths) {
419  vstring modnames = proc_pset.getParameter<vstring>(name);
420  PathWorkers tmpworkers;
421 
422  unsigned int placeInPath = 0;
423  for (auto const& name : modnames) {
424 
425  if (labelsOnPaths) labelsOnPaths->push_back(name);
426 
428  if (name[0] == '!') filterAction = WorkerInPath::Veto;
429  else if (name[0] == '-') filterAction = WorkerInPath::Ignore;
430 
431  std::string moduleLabel = name;
432  if (filterAction != WorkerInPath::Normal) moduleLabel.erase(0, 1);
433 
434  bool isTracked;
435  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
436  if (modpset == 0) {
437  std::string pathType("endpath");
438  if (!search_all(end_path_name_list_, name)) {
439  pathType = std::string("path");
440  }
442  "The unknown module label \"" << moduleLabel <<
443  "\" appears in " << pathType << " \"" << name <<
444  "\"\n please check spelling or remove that label from the path.";
445  }
446  assert(isTracked);
447 
448  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
449  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) {
450  // We have a filter on an end path, and the filter is not explicitly ignored.
451  // See if the filter is allowed.
452  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
453  if (!search_all(allowed_filters, worker->description().moduleName())) {
454  // Filter is not allowed. Ignore the result, and issue a warning.
455  filterAction = WorkerInPath::Ignore;
456  LogWarning("FilterOnEndPath")
457  << "The EDFilter '" << worker->description().moduleName() << "' with module label '" << moduleLabel << "' appears on EndPath '" << name << "'.\n"
458  << "The return value of the filter will be ignored.\n"
459  << "To suppress this warning, either remove the filter from the endpath,\n"
460  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
461  }
462  }
463  tmpworkers.emplace_back(worker, filterAction, placeInPath);
464  ++placeInPath;
465  }
466 
467  out.swap(tmpworkers);
468  }
469 
473  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
474  int bitpos, std::string const& name, TrigResPtr trptr,
475  vstring* labelsOnTriggerPaths) {
476  PathWorkers tmpworkers;
477  Workers holder;
478  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, labelsOnTriggerPaths);
479 
480  for (PathWorkers::iterator wi(tmpworkers.begin()),
481  we(tmpworkers.end()); wi != we; ++wi) {
482  holder.push_back(wi->getWorker());
483  }
484 
485  // an empty path will cause an extra bit that is not used
486  if (!tmpworkers.empty()) {
487  trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
488  if (wantSummary_) {
489  trig_paths_.back().useStopwatch();
490  }
491  } else {
492  empty_trig_paths_.push_back(bitpos);
493  empty_trig_path_names_.push_back(name);
494  }
495  for_all(holder, boost::bind(&StreamSchedule::addToAllWorkers, this, _1));
496  }
497 
501  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
502  int bitpos, std::string const& name) {
503  PathWorkers tmpworkers;
504  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, 0);
505  Workers holder;
506 
507  for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) {
508  holder.push_back(wi->getWorker());
509  }
510 
511  if (!tmpworkers.empty()) {
512  end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, PathContext::PathType::kEndPath);
513  if (wantSummary_) {
514  end_paths_.back().useStopwatch();
515  }
516  }
517  for_all(holder, boost::bind(&StreamSchedule::addToAllWorkers, this, _1));
518  }
519 
522  }
523 
526  }
527 
528  void StreamSchedule::replaceModule(maker::ModuleHolder* iMod,
529  std::string const& iLabel) {
530  Worker* found = nullptr;
531  for (auto const& worker : allWorkers()) {
532  if (worker->description().moduleLabel() == iLabel) {
533  found = worker;
534  break;
535  }
536  }
537  if (nullptr == found) {
538  return;
539  }
540 
541  iMod->replaceModuleFor(found);
543  }
544 
545  std::vector<ModuleDescription const*>
547  std::vector<ModuleDescription const*> result;
548  result.reserve(allWorkers().size());
549 
550  for (auto const& worker : allWorkers()) {
551  ModuleDescription const* p = worker->descPtr();
552  result.push_back(p);
553  }
554  return result;
555  }
556 
557  void
558  StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
559  oLabelsToFill.reserve(trig_paths_.size());
560  std::transform(trig_paths_.begin(),
561  trig_paths_.end(),
562  std::back_inserter(oLabelsToFill),
563  boost::bind(&Path::name, _1));
564  }
565 
566  void
568  std::vector<std::string>& oLabelsToFill) const {
569  TrigPaths::const_iterator itFound =
570  std::find_if (trig_paths_.begin(),
571  trig_paths_.end(),
572  boost::bind(std::equal_to<std::string>(),
573  iPathLabel,
574  boost::bind(&Path::name, _1)));
575  if (itFound!=trig_paths_.end()) {
576  oLabelsToFill.reserve(itFound->size());
577  for (size_t i = 0; i < itFound->size(); ++i) {
578  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
579  }
580  }
581  }
582 
583  void
585  endpathsAreActive_ = active;
586  }
587 
588  bool
590  return endpathsAreActive_;
591  }
592 
593  static void
595  size_t which,
596  ModuleInPathSummary& sum) {
597  sum.timesVisited = +path.timesVisited(which);
598  sum.timesPassed = +path.timesPassed(which);
599  sum.timesFailed = +path.timesFailed(which);
600  sum.timesExcept = +path.timesExcept(which);
601  sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
602  }
603 
604  static void
606  sum.name = path.name();
607  sum.bitPosition = path.bitPosition();
608  sum.timesRun += path.timesRun();
609  sum.timesPassed += path.timesPassed();
610  sum.timesFailed += path.timesFailed();
611  sum.timesExcept += path.timesExcept();
612 
613  Path::size_type sz = path.size();
614  if(sum.moduleInPathSummaries.size()==0) {
615  std::vector<ModuleInPathSummary> temp(sz);
616  for (size_t i = 0; i != sz; ++i) {
617  fillModuleInPathSummary(path, i, temp[i]);
618  }
619  sum.moduleInPathSummaries.swap(temp);
620  } else {
621  assert(sz == sum.moduleInPathSummaries.size());
622  for (size_t i = 0; i != sz; ++i) {
624  }
625  }
626  }
627 
628  static void
630  sum.timesVisited += w.timesVisited();
631  sum.timesRun += w.timesRun();
632  sum.timesPassed += w.timesPassed();
633  sum.timesFailed += w.timesFailed();
634  sum.timesExcept += w.timesExcept();
635  sum.moduleLabel = w.description().moduleLabel();
636  }
637 
638  static void
640  fillWorkerSummaryAux(*pw, sum);
641  }
642 
643  void
648 
649  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
650  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
651  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
652  }
653 
654  static void
656  size_t which,
658  sum.timesVisited = +path.timesVisited(which);
659  auto times = path.timeCpuReal(which);
660  sum.cpuTime += times.first;
661  sum.realTime += path.timesFailed(which);
662  sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
663  }
664 
665  static void
667  sum.name = path.name();
668  sum.bitPosition = path.bitPosition();
669  sum.timesRun += path.timesRun();
670  auto times = path.timeCpuReal();
671  sum.cpuTime += times.first;
672  sum.realTime += times.second;
673 
674  Path::size_type sz = path.size();
675  if(sum.moduleInPathSummaries.size()==0) {
676  std::vector<ModuleInPathTimingSummary> temp(sz);
677  for (size_t i = 0; i != sz; ++i) {
678  fillModuleInPathTimingSummary(path, i, temp[i]);
679  }
680  sum.moduleInPathSummaries.swap(temp);
681  } else {
682  assert(sz == sum.moduleInPathSummaries.size());
683  for (size_t i = 0; i != sz; ++i) {
685  }
686  }
687  }
688 
689  static void
691  sum.timesVisited += w.timesVisited();
692  sum.timesRun += w.timesRun();
693  auto times = w.timeCpuReal();
694  sum.cpuTime += times.first;
695  sum.realTime += times.second;
696  sum.moduleLabel = w.description().moduleLabel();
697  }
698 
699  static void
701  fillWorkerTimingSummaryAux(*pw, sum);
702  }
703 
704  void
707 
710  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerTimingSummary);
711  }
712 
713  void
716  for_all(trig_paths_, boost::bind(&Path::clearCounters, _1));
717  for_all(end_paths_, boost::bind(&Path::clearCounters, _1));
718  for_all(allWorkers(), boost::bind(&Worker::clearCounters, _1));
719  }
720 
721  void
723  results_->reset();
724  }
725 
726  void
729  }
730 
731  void
733  //must be sure we have cleared the count first
734  for(auto& count:earlyDeleteBranchToCount_) {
735  count.second = 0;
736  }
737  //now reset based on how many helpers use that branch
739  ++(earlyDeleteBranchToCount_[index].second);
740  }
741  for(auto& helper: earlyDeleteHelpers_) {
742  helper.reset();
743  }
744  }
745 
746 }
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
dbl * delta
Definition: mlp_gen.cc:36
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:64
T getUntrackedParameter(std::string const &, T const &) const
std::vector< PathTimingSummary > endPathSummaries
int i
Definition: DBlmapReader.cc:9
string rep
Definition: cuy.py:1188
int timesRun() const
Definition: Path.h:80
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int bitPosition() const
Definition: Path.h:64
std::vector< Worker * > Workers
ModuleDescription const & description() const
Definition: Worker.h:97
std::vector< int > empty_trig_paths_
void endStream(StreamID iID, StreamContext &streamContext)
int totalEventsFailed() const
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
Definition: Path.h:47
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::vector< std::pair< BranchID, unsigned int > > earlyDeleteBranchToCount_
int timesPassed() const
Definition: Worker.h:124
volatile bool endpathsAreActive_
#define nullptr
int totalEvents() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, bool useStopwatch, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
void clearCounters()
Definition: Worker.h:116
std::string const & moduleName() const
processConfiguration
Definition: Schedule.cc:362
void addToAllWorkers(Worker *w)
int timesExcept() const
Definition: Path.h:83
size_type size() const
Definition: Path.h:87
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
endpathsAreActive_(true)
Definition: Schedule.cc:367
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:155
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:65
actions
Definition: Schedule.cc:362
std::string const & moduleLabel() const
std::pair< double, double > timeCpuReal() const
Definition: Worker.h:112
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::string moduleLabel
Definition: TriggerReport.h:56
std::vector< ModuleInPathTimingSummary > moduleInPathSummaries
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
static void fillPathTimingSummary(Path const &path, PathTimingSummary &sum)
std::vector< WorkerInPath > PathWorkers
boost::shared_ptr< ActivityRegistry > actReg_
int timesExcept() const
Definition: Worker.h:126
boost::shared_ptr< Worker > WorkerPtr
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:63
EventSummary eventSummary
Definition: TriggerReport.h:62
int timesVisited() const
Definition: Worker.h:123
wantSummary_(tns.wantSummary())
std::string name
Definition: TriggerReport.h:44
EventTimingSummary eventSummary
static void fillWorkerTimingSummary(Worker const *pw, WorkerTimingSummary &sum)
static void fillModuleInPathTimingSummary(Path const &path, size_t which, ModuleInPathTimingSummary &sum)
Definition: Path.h:42
tuple result
Definition: query.py:137
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
std::vector< std::string > vstring
static void fillPathSummary(Path const &path, PathSummary &sum)
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
int timesPassed() const
Definition: Path.h:81
std::vector< PathTimingSummary > trigPathSummaries
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:20
int timesRun() const
Definition: Worker.h:122
#define end
Definition: vmac.h:37
vstring empty_trig_path_names_
areg
Definition: Schedule.cc:362
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
tuple out
Definition: dbtoconf.py:99
virtual Types moduleType() const =0
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
WorkerPtr results_inserter_
unsigned int value() const
Definition: StreamID.h:46
boost::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
boost::shared_ptr< HLTGlobalStatus > TrigResPtr
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
int timesVisited(size_type i) const
Definition: Path.h:88
double b
Definition: hdecay.h:120
std::pair< double, double > timeCpuReal() const
Definition: Path.h:67
StreamSchedule(TriggerResultInserter *inserter, boost::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
void setTrigResultForStream(unsigned int iStreamIndex, const TrigResPtr &trptr)
void getTriggerTimingReport(TriggerTimingReport &rep) const
static void fillWorkerTimingSummaryAux(Worker const &w, WorkerTimingSummary &sum)
int timesFailed() const
Definition: Worker.h:125
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
std::string const & name() const
Definition: Path.h:65
#define begin
Definition: vmac.h:30
Worker const * getWorker(size_type i) const
Definition: Path.h:92
string func
Definition: statics.py:48
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:45
T w() const
void addToAllWorkers(Worker *w, bool useStopwatch)
preg
Definition: Schedule.cc:362
std::vector< WorkerTimingSummary > workerSummaries
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
std::vector< std::string > vstring
Definition: Schedule.cc:347
tuple size
Write out results.
static Registry * instance()
Definition: Registry.cc:14
void clearCounters()
Definition: Path.cc:163
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
prealloc
Definition: Schedule.cc:362
int timesFailed() const
Definition: Path.h:82
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)