CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
28 
30 
31 #include <algorithm>
32 #include <cassert>
33 #include <cstdlib>
34 #include <functional>
35 #include <iomanip>
36 #include <list>
37 #include <map>
38 #include <exception>
39 
40 namespace edm {
41  namespace {
42 
43  // Function template to transform each element in the input range to
44  // a value placed into the output range. The supplied function
45  // should take a const_reference to the 'input', and write to a
46  // reference to the 'output'.
47  template <typename InputIterator, typename ForwardIterator, typename Func>
48  void
49  transform_into(InputIterator begin, InputIterator end,
50  ForwardIterator out, Func func) {
51  for (; begin != end; ++begin, ++out) func(*begin, *out);
52  }
53 
54  // Function template that takes a sequence 'from', a sequence
55  // 'to', and a callable object 'func'. It and applies
56  // transform_into to fill the 'to' sequence with the values
57  // calcuated by the callable object, taking care to fill the
58  // outupt only if all calls succeed.
59  template <typename FROM, typename TO, typename FUNC>
60  void
61  fill_summary(FROM const& from, TO& to, FUNC func) {
62  if(to.size()!=from.size()) {
63  TO temp(from.size());
64  transform_into(from.begin(), from.end(), temp.begin(), func);
65  to.swap(temp);
66  } else {
67  transform_into(from.begin(), from.end(), to.begin(), func);
68  }
69  }
70 
71  // -----------------------------
72 
73  // Here we make the trigger results inserter directly. This should
74  // probably be a utility in the WorkerRegistry or elsewhere.
75 
77  makeInserter(ExceptionToActionTable const& actions,
78  std::shared_ptr<ActivityRegistry> areg,
79  std::shared_ptr<TriggerResultInserter> inserter) {
80  StreamSchedule::WorkerPtr ptr(new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
81  ptr->setActivityRegistry(areg);
82  return ptr;
83  }
84 
85  void
86  initializeBranchToReadingWorker(ParameterSet const& opts,
87  ProductRegistry const& preg,
88  std::multimap<std::string,Worker*>& branchToReadingWorker)
89  {
90  // See if any data has been marked to be deleted early (removing any duplicates)
91  auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
92  if(not vBranchesToDeleteEarly.empty()) {
93  std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
94  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
95  vBranchesToDeleteEarly.end());
96 
97  // Are the requested items in the product registry?
98  auto allBranchNames = preg.allBranchNames();
99  //the branch names all end with a period, which we do not want to compare with
100  for(auto & b:allBranchNames) {
101  b.resize(b.size()-1);
102  }
103  std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
104  std::vector<std::string> temp;
105  temp.reserve(vBranchesToDeleteEarly.size());
106 
107  std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
108  allBranchNames.begin(),allBranchNames.end(),
109  std::back_inserter(temp));
110  vBranchesToDeleteEarly.swap(temp);
111  if(temp.size() != vBranchesToDeleteEarly.size()) {
112  std::vector<std::string> missingProducts;
113  std::set_difference(temp.begin(),temp.end(),
114  vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
115  std::back_inserter(missingProducts));
116  LogInfo l("MissingProductsForCanDeleteEarly");
117  l<<"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
118  for(auto const& n:missingProducts){
119  l<<"\n "<<n;
120  }
121  }
122  //set placeholder for the branch, we will remove the nullptr if a
123  // module actually wants the branch.
124  for(auto const& branch:vBranchesToDeleteEarly) {
125  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
126  }
127  }
128  }
129  }
130 
131  // -----------------------------
132 
133  typedef std::vector<std::string> vstring;
134 
135  // -----------------------------
136 
137  StreamSchedule::StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
138  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
139  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
140  std::shared_ptr<ModuleRegistry> modReg,
141  ParameterSet& proc_pset,
142  service::TriggerNamesService const& tns,
143  PreallocationConfiguration const& prealloc,
144  ProductRegistry& preg,
145  BranchIDListHelper& branchIDListHelper,
146  ExceptionToActionTable const& actions,
147  std::shared_ptr<ActivityRegistry> areg,
148  std::shared_ptr<ProcessConfiguration> processConfiguration,
149  bool allowEarlyDelete,
150  StreamID streamID,
151  ProcessContext const* processContext) :
152  workerManager_(modReg,areg, actions),
153  actReg_(areg),
154  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
155  results_inserter_(),
156  trig_paths_(),
157  end_paths_(),
158  total_events_(),
159  total_passed_(),
160  number_of_unscheduled_modules_(0),
161  streamID_(streamID),
162  streamContext_(streamID_, processContext),
163  endpathsAreActive_(true),
164  skippingEvent_(false){
165 
166  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
167  bool hasPath = false;
168  std::vector<std::string> const& pathNames = tns.getTrigPaths();
169  std::vector<std::string> const& endPathNames = tns.getEndPaths();
170 
171  int trig_bitpos = 0;
172  trig_paths_.reserve(pathNames.size());
173  for (auto const& trig_name : pathNames) {
174  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
175  ++trig_bitpos;
176  hasPath = true;
177  }
178 
179  if (hasPath) {
180  // the results inserter stands alone
181  inserter->setTrigResultForStream(streamID.value(), results());
182 
183  results_inserter_ = makeInserter(actions, actReg_, inserter);
185  }
186 
187  // fill normal endpaths
188  int bitpos = 0;
189  end_paths_.reserve(endPathNames.size());
190  for (auto const& end_path_name : endPathNames) {
191  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
192  ++bitpos;
193  }
194 
195  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
196 
197  //See if all modules were used
198  std::set<std::string> usedWorkerLabels;
199  for (auto const& worker : allWorkers()) {
200  usedWorkerLabels.insert(worker->description().moduleLabel());
201  }
202  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
203  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
204  std::vector<std::string> unusedLabels;
205  set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
206  usedWorkerLabels.begin(), usedWorkerLabels.end(),
207  back_inserter(unusedLabels));
208  std::set<std::string> unscheduledLabels;
209  std::vector<std::string> shouldBeUsedLabels;
210  if (!unusedLabels.empty()) {
211  //Need to
212  // 1) create worker
213  // 2) if it is a WorkerT<EDProducer>, add it to our list
214  // 3) hand list to our delayed reader
215  for (auto const& label : unusedLabels) {
216  bool isTracked;
217  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
218  assert(isTracked);
219  assert(modulePSet != nullptr);
220  workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
221  }
222  if (!shouldBeUsedLabels.empty()) {
223  std::ostringstream unusedStream;
224  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
225  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
226  itLabelEnd = shouldBeUsedLabels.end();
227  itLabel != itLabelEnd;
228  ++itLabel) {
229  unusedStream << ",'" << *itLabel << "'";
230  }
231  LogInfo("path")
232  << "The following module labels are not assigned to any path:\n"
233  << unusedStream.str()
234  << "\n";
235  }
236  }
237  if (!unscheduledLabels.empty()) {
238  number_of_unscheduled_modules_=unscheduledLabels.size();
239  workerManager_.setOnDemandProducts(preg, unscheduledLabels);
240  }
241 
242 
243  initializeEarlyDelete(*modReg, opts,preg,allowEarlyDelete);
244 
245  } // StreamSchedule::StreamSchedule
246 
247 
249  edm::ParameterSet const& opts, edm::ProductRegistry const& preg,
250  bool allowEarlyDelete) {
251  //for now, if have a subProcess, don't allow early delete
252  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
253  if(not allowEarlyDelete) return;
254 
255  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
256  // registered for this job
257  std::multimap<std::string,Worker*> branchToReadingWorker;
258  initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
259 
260  //If no delete early items have been specified we don't have to do anything
261  if(branchToReadingWorker.empty()) {
262  return;
263  }
264  const std::vector<std::string> kEmpty;
265  std::map<Worker*,unsigned int> reserveSizeForWorker;
266  unsigned int upperLimitOnReadingWorker =0;
267  unsigned int upperLimitOnIndicies = 0;
268  unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
269 
270  //talk with output modules first
271  modReg.forAllModuleHolders([&branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
272  auto comm = iHolder->createOutputModuleCommunicator();
273  if (comm) {
274  if(!branchToReadingWorker.empty()) {
275  //If an OutputModule needs a product, we can't delete it early
276  // so we should remove it from our list
277  SelectedProductsForBranchType const& kept = comm->keptProducts();
278  for(auto const& item: kept[InEvent]) {
279  BranchDescription const& desc = *item.first;
280  auto found = branchToReadingWorker.equal_range(desc.branchName());
281  if(found.first !=found.second) {
282  --nUniqueBranchesToDelete;
283  branchToReadingWorker.erase(found.first,found.second);
284  }
285  }
286  }
287  }
288  });
289 
290  if(branchToReadingWorker.empty()) {
291  return;
292  }
293 
294  for (auto w :allWorkers()) {
295  //determine if this module could read a branch we want to delete early
296  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
297  if(nullptr!=pset) {
298  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet",kEmpty);
299  if(not branches.empty()) {
300  ++upperLimitOnReadingWorker;
301  }
302  for(auto const& branch:branches){
303  auto found = branchToReadingWorker.equal_range(branch);
304  if(found.first != found.second) {
305  ++upperLimitOnIndicies;
306  ++reserveSizeForWorker[w];
307  if(nullptr == found.first->second) {
308  found.first->second = w;
309  } else {
310  branchToReadingWorker.insert(make_pair(found.first->first,w));
311  }
312  }
313  }
314  }
315  }
316  {
317  auto it = branchToReadingWorker.begin();
318  std::vector<std::string> unusedBranches;
319  while(it !=branchToReadingWorker.end()) {
320  if(it->second == nullptr) {
321  unusedBranches.push_back(it->first);
322  //erasing the object invalidates the iterator so must advance it first
323  auto temp = it;
324  ++it;
325  branchToReadingWorker.erase(temp);
326  } else {
327  ++it;
328  }
329  }
330  if(not unusedBranches.empty()) {
331  LogWarning l("UnusedProductsForCanDeleteEarly");
332  l<<"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
333  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
334  for(auto const& n:unusedBranches){
335  l<<"\n "<<n;
336  }
337  }
338  }
339  if(!branchToReadingWorker.empty()) {
340  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
341  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies,0);
342  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
343  std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
344  std::string lastBranchName;
345  size_t nextOpenIndex = 0;
346  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
347  for(auto& branchAndWorker:branchToReadingWorker) {
348  if(lastBranchName != branchAndWorker.first) {
349  //have to put back the period we removed earlier in order to get the proper name
350  BranchID bid(branchAndWorker.first+".");
351  earlyDeleteBranchToCount_.emplace_back(bid,0U);
352  lastBranchName = branchAndWorker.first;
353  }
354  auto found = alreadySeenWorkers.find(branchAndWorker.second);
355  if(alreadySeenWorkers.end() == found) {
356  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
357  // all the branches that might be read by this worker. However, initially we will only tell the
358  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
359  // EarlyDeleteHelper will automatically advance its internal end pointer.
360  size_t index = nextOpenIndex;
361  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
363  earlyDeleteHelpers_.emplace_back(beginAddress+index,
364  beginAddress+index+1,
366  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
367  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(earlyDeleteHelpers_.back())));
368  nextOpenIndex +=nIndices;
369  } else {
370  found->second->appendIndex(earlyDeleteBranchToCount_.size()-1);
371  }
372  }
373 
374  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
375  // space needed for each module
376  auto itLast = earlyDeleteHelpers_.begin();
377  for(auto it = earlyDeleteHelpers_.begin()+1;it != earlyDeleteHelpers_.end();++it) {
378  if(itLast->end() != it->begin()) {
379  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
380  unsigned int delta = it->begin()- itLast->end();
381  it->shiftIndexPointers(delta);
382 
384  (itLast->end()-beginAddress),
386  (it->begin()-beginAddress));
387  }
388  itLast = it;
389  }
390  earlyDeleteHelperToBranchIndicies_.erase(earlyDeleteHelperToBranchIndicies_.begin()+(itLast->end()-beginAddress),
392 
393  //now tell the paths about the deleters
394  for(auto& p : trig_paths_) {
395  p.setEarlyDeleteHelpers(alreadySeenWorkers);
396  }
397  for(auto& p : end_paths_) {
398  p.setEarlyDeleteHelpers(alreadySeenWorkers);
399  }
401  }
402  }
403 
405  ProductRegistry& preg,
406  PreallocationConfiguration const* prealloc,
407  std::shared_ptr<ProcessConfiguration const> processConfiguration,
408  std::string const& pathName,
409  bool ignoreFilters,
410  PathWorkers& out,
411  std::vector<std::string> const& endPathNames) {
412  vstring modnames = proc_pset.getParameter<vstring>(pathName);
413  PathWorkers tmpworkers;
414 
415  unsigned int placeInPath = 0;
416  for (auto const& name : modnames) {
417 
419  if (name[0] == '!') filterAction = WorkerInPath::Veto;
420  else if (name[0] == '-') filterAction = WorkerInPath::Ignore;
421 
422  std::string moduleLabel = name;
423  if (filterAction != WorkerInPath::Normal) moduleLabel.erase(0, 1);
424 
425  bool isTracked;
426  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
427  if (modpset == nullptr) {
428  std::string pathType("endpath");
429  if (!search_all(endPathNames, pathName)) {
430  pathType = std::string("path");
431  }
433  "The unknown module label \"" << moduleLabel <<
434  "\" appears in " << pathType << " \"" << pathName <<
435  "\"\n please check spelling or remove that label from the path.";
436  }
437  assert(isTracked);
438 
439  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
440  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) {
441  // We have a filter on an end path, and the filter is not explicitly ignored.
442  // See if the filter is allowed.
443  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
444  if (!search_all(allowed_filters, worker->description().moduleName())) {
445  // Filter is not allowed. Ignore the result, and issue a warning.
446  filterAction = WorkerInPath::Ignore;
447  LogWarning("FilterOnEndPath")
448  << "The EDFilter '" << worker->description().moduleName() << "' with module label '" << moduleLabel << "' appears on EndPath '" << pathName << "'.\n"
449  << "The return value of the filter will be ignored.\n"
450  << "To suppress this warning, either remove the filter from the endpath,\n"
451  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
452  }
453  }
454  tmpworkers.emplace_back(worker, filterAction, placeInPath);
455  ++placeInPath;
456  }
457 
458  out.swap(tmpworkers);
459  }
460 
462  ProductRegistry& preg,
463  PreallocationConfiguration const* prealloc,
464  std::shared_ptr<ProcessConfiguration const> processConfiguration,
465  int bitpos, std::string const& name, TrigResPtr trptr,
466  std::vector<std::string> const& endPathNames) {
467  PathWorkers tmpworkers;
468  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
469 
470  // an empty path will cause an extra bit that is not used
471  if (!tmpworkers.empty()) {
472  trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, &skippingEvent_, PathContext::PathType::kPath);
473  } else {
474  empty_trig_paths_.push_back(bitpos);
475  }
476  for (WorkerInPath const& workerInPath : tmpworkers) {
477  addToAllWorkers(workerInPath.getWorker());
478  }
479  }
480 
482  ProductRegistry& preg,
483  PreallocationConfiguration const* prealloc,
484  std::shared_ptr<ProcessConfiguration const> processConfiguration,
485  int bitpos, std::string const& name,
486  std::vector<std::string> const& endPathNames) {
487  PathWorkers tmpworkers;
488  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
489 
490  if (!tmpworkers.empty()) {
491  //EndPaths are not supposed to stop if SkipEvent type exception happens
492  end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, nullptr, PathContext::PathType::kEndPath);
493  } else {
494  empty_end_paths_.push_back(bitpos);
495  }
496  for (WorkerInPath const& workerInPath : tmpworkers) {
497  addToAllWorkers(workerInPath.getWorker());
498  }
499  }
500 
503  }
504 
507  }
508 
510  std::string const& iLabel) {
511  Worker* found = nullptr;
512  for (auto const& worker : allWorkers()) {
513  if (worker->description().moduleLabel() == iLabel) {
514  found = worker;
515  break;
516  }
517  }
518  if (nullptr == found) {
519  return;
520  }
521 
522  iMod->replaceModuleFor(found);
524  }
525 
526  std::vector<ModuleDescription const*>
528  std::vector<ModuleDescription const*> result;
529  result.reserve(allWorkers().size());
530 
531  for (auto const& worker : allWorkers()) {
532  ModuleDescription const* p = worker->descPtr();
533  result.push_back(p);
534  }
535  return result;
536  }
537 
539  EventPrincipal& ep,
540  EventSetup const& es,
541  ServiceToken const& serviceToken,
542  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
543  try {
544  this->resetAll();
545 
547 
548  Traits::setStreamContext(streamContext_, ep);
549  //a service may want to communicate with another service
550  ServiceRegistry::Operate guard(serviceToken);
551  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
552 
553  HLTPathStatus hltPathStatus(hlt::Pass, 0);
554  for (int empty_trig_path : empty_trig_paths_) {
555  results_->at(empty_trig_path) = hltPathStatus;
556  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
557  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
559  );
560  if (except) {
561  iTask.doneWaiting(except);
562  return;
563  }
564  }
565  for (int empty_end_path : empty_end_paths_) {
566  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
568  );
569  if (except) {
570  iTask.doneWaiting(except);
571  return;
572  }
573  }
574 
575  // This call takes care of the unscheduled processing.
577 
578  ++total_events_;
579  auto allPathsDone = make_waiting_task(tbb::task::allocate_root(),
580  [iTask,this,serviceToken](std::exception_ptr const* iPtr) mutable
581  {
582  ServiceRegistry::Operate operate(serviceToken);
583 
584  std::exception_ptr ptr;
585  if(iPtr) {
586  ptr = *iPtr;
587  }
589  });
590  //The holder guarantees that if the paths finish before the loop ends
591  // that we do not start too soon. It also guarantees that the task will
592  // run under that condition.
593  WaitingTaskHolder allPathsHolder(allPathsDone);
594 
595  auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
596  [allPathsHolder,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
597  {
598  ServiceRegistry::Operate operate(serviceToken);
599 
600  std::exception_ptr ptr;
601  if(iPtr) {
602  ptr = *iPtr;
603  }
604  finishedPaths(ptr, std::move(allPathsHolder), ep, es);
605  });
606 
607  //The holder guarantees that if the paths finish before the loop ends
608  // that we do not start too soon. It also guarantees that the task will
609  // run under that condition.
610  WaitingTaskHolder taskHolder(pathsDone);
611 
612  //start end paths first so on single threaded the paths will run first
613  for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
614  it != itEnd; ++it) {
615  it->processOneOccurrenceAsync(allPathsDone,ep, es, serviceToken, streamID_, &streamContext_);
616  }
617 
618  for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
619  it != itEnd; ++ it) {
620  it->processOneOccurrenceAsync(pathsDone,ep, es, serviceToken, streamID_, &streamContext_);
621  }
622 
623  ParentContext parentContext(&streamContext_);
625  allPathsDone, ep, es, serviceToken, streamID_, parentContext, &streamContext_);
626  }catch (...) {
627  iTask.doneWaiting(std::current_exception());
628  }
629  }
630 
631  void
632  StreamSchedule::finishedPaths(std::exception_ptr iExcept, WaitingTaskHolder iWait, EventPrincipal& ep,
633  EventSetup const& es) {
634 
635  if(iExcept) {
636  try {
637  std::rethrow_exception(iExcept);
638  }
639  catch(cms::Exception& e) {
641  assert (action != exception_actions::IgnoreCompletely);
642  assert (action != exception_actions::FailPath);
643  if (action == exception_actions::SkipEvent) {
644  edm::printCmsExceptionWarning("SkipEvent", e);
645  iExcept = std::exception_ptr();
646  } else {
647  iExcept = std::current_exception();
648  }
649  }
650  catch(...) {
651  iExcept = std::current_exception();
652  }
653  }
654 
655 
656  if((not iExcept) and results_->accept()) {
657  ++total_passed_;
658  }
659 
660  if(nullptr != results_inserter_.get()) {
661  try {
662  //Even if there was an exception, we need to allow results inserter
663  // to run since some module may be waiting on its results.
664  ParentContext parentContext(&streamContext_);
666 
667  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
668  }
669  catch (cms::Exception & ex) {
670  if (not iExcept) {
671  if(ex.context().empty()) {
672  std::ostringstream ost;
673  ost << "Processing Event " << ep.id();
674  ex.addContext(ost.str());
675  }
676  iExcept = std::current_exception();
677  }
678  }
679  catch(...) {
680  if (not iExcept) {
681  iExcept = std::current_exception();
682  }
683  }
684  }
685  iWait.doneWaiting(iExcept);
686  }
687 
688 
689  std::exception_ptr
690  StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
692 
693  if(iExcept) {
694  //add context information to the exception and print message
695  try {
696  convertException::wrap([&]() {
697  std::rethrow_exception(iExcept);
698  });
699  } catch(cms::Exception& ex) {
700  bool const cleaningUpAfterException = false;
701  if (ex.context().empty()) {
702  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
703  } else {
704  addContextAndPrintException("", ex, cleaningUpAfterException);
705  }
706  iExcept = std::current_exception();
707  }
708 
709  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
710  }
711 
712  try {
713  Traits::postScheduleSignal(actReg_.get(), &streamContext_);
714  } catch(...) {
715  if(not iExcept) {
716  iExcept = std::current_exception();
717  }
718  }
719  if(not iExcept ) {
721  }
722 
723  return iExcept;
724  }
725 
726  void
727  StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
728  oLabelsToFill.reserve(trig_paths_.size());
729  std::transform(trig_paths_.begin(),
730  trig_paths_.end(),
731  std::back_inserter(oLabelsToFill),
732  std::bind(&Path::name, std::placeholders::_1));
733  }
734 
735  void
737  std::vector<std::string>& oLabelsToFill) const {
738  TrigPaths::const_iterator itFound =
739  std::find_if (trig_paths_.begin(),
740  trig_paths_.end(),
741  std::bind(std::equal_to<std::string>(),
742  iPathLabel,
743  std::bind(&Path::name, std::placeholders::_1)));
744  if (itFound!=trig_paths_.end()) {
745  oLabelsToFill.reserve(itFound->size());
746  for (size_t i = 0; i < itFound->size(); ++i) {
747  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
748  }
749  }
750  }
751 
752  void
754  std::vector<ModuleDescription const*>& descriptions,
755  unsigned int hint) const {
756  descriptions.clear();
757  bool found = false;
758  TrigPaths::const_iterator itFound;
759 
760  if(hint < trig_paths_.size()) {
761  itFound = trig_paths_.begin() + hint;
762  if(itFound->name() == iPathLabel) found = true;
763  }
764  if(!found) {
765  // if the hint did not work, do it the slow way
766  itFound = std::find_if (trig_paths_.begin(),
767  trig_paths_.end(),
768  std::bind(std::equal_to<std::string>(),
769  iPathLabel,
770  std::bind(&Path::name, std::placeholders::_1)));
771  if (itFound != trig_paths_.end()) found = true;
772  }
773  if (found) {
774  descriptions.reserve(itFound->size());
775  for (size_t i = 0; i < itFound->size(); ++i) {
776  descriptions.push_back(itFound->getWorker(i)->descPtr());
777  }
778  }
779  }
780 
781  void
783  std::vector<ModuleDescription const*>& descriptions,
784  unsigned int hint) const {
785  descriptions.clear();
786  bool found = false;
787  TrigPaths::const_iterator itFound;
788 
789  if(hint < end_paths_.size()) {
790  itFound = end_paths_.begin() + hint;
791  if(itFound->name() == iEndPathLabel) found = true;
792  }
793  if(!found) {
794  // if the hint did not work, do it the slow way
795  itFound = std::find_if (end_paths_.begin(),
796  end_paths_.end(),
797  std::bind(std::equal_to<std::string>(),
798  iEndPathLabel,
799  std::bind(&Path::name, std::placeholders::_1)));
800  if (itFound != end_paths_.end()) found = true;
801  }
802  if (found) {
803  descriptions.reserve(itFound->size());
804  for (size_t i = 0; i < itFound->size(); ++i) {
805  descriptions.push_back(itFound->getWorker(i)->descPtr());
806  }
807  }
808  }
809 
810  void
812  endpathsAreActive_ = active;
813  }
814 
815  bool
817  return endpathsAreActive_;
818  }
819 
820  static void
822  size_t which,
823  ModuleInPathSummary& sum) {
824  sum.timesVisited += path.timesVisited(which);
825  sum.timesPassed += path.timesPassed(which);
826  sum.timesFailed += path.timesFailed(which);
827  sum.timesExcept += path.timesExcept(which);
828  sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
829  }
830 
831  static void
833  sum.name = path.name();
834  sum.bitPosition = path.bitPosition();
835  sum.timesRun += path.timesRun();
836  sum.timesPassed += path.timesPassed();
837  sum.timesFailed += path.timesFailed();
838  sum.timesExcept += path.timesExcept();
839 
840  Path::size_type sz = path.size();
841  if(sum.moduleInPathSummaries.empty()) {
842  std::vector<ModuleInPathSummary> temp(sz);
843  for (size_t i = 0; i != sz; ++i) {
844  fillModuleInPathSummary(path, i, temp[i]);
845  }
846  sum.moduleInPathSummaries.swap(temp);
847  } else {
848  assert(sz == sum.moduleInPathSummaries.size());
849  for (size_t i = 0; i != sz; ++i) {
851  }
852  }
853  }
854 
855  static void
857  sum.timesVisited += w.timesVisited();
858  sum.timesRun += w.timesRun();
859  sum.timesPassed += w.timesPassed();
860  sum.timesFailed += w.timesFailed();
861  sum.timesExcept += w.timesExcept();
862  sum.moduleLabel = w.description().moduleLabel();
863  }
864 
865  static void
867  fillWorkerSummaryAux(*pw, sum);
868  }
869 
870  void
875 
876  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
877  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
878  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
879  }
880 
881  void
883  using std::placeholders::_1;
885  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
886  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
887  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
888  }
889 
890  void
892  skippingEvent_ = false;
893  results_->reset();
894  }
895 
896  void
899  }
900 
901  void
903  //must be sure we have cleared the count first
904  for(auto& count:earlyDeleteBranchToCount_) {
905  count.count = 0;
906  }
907  //now reset based on how many helpers use that branch
909  ++(earlyDeleteBranchToCount_[index].count);
910  }
911  for(auto& helper: earlyDeleteHelpers_) {
912  helper.reset();
913  }
914  }
915 
916  void
918  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
919  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
920  ExceptionToActionTable const& actions) {
921 
922  int bitpos = 0;
923  unsigned int indexEmpty = 0;
924  unsigned int indexOfPath = 0;
925  for(auto & pathStatusInserter : pathStatusInserters) {
926  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
928  inserterPtr->moduleDescription(),
929  &actions));
930  pathStatusInserterWorkers_.emplace_back(workerPtr);
931  workerPtr->setActivityRegistry(actReg_);
932  addToAllWorkers(workerPtr.get());
933 
934  // A little complexity here because a C++ Path object is not
935  // instantiated and put into end_paths if there are no modules
936  // on the configured path.
937  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
938  ++indexEmpty;
939  } else {
940  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(),
941  workerPtr.get());
942  ++indexOfPath;
943  }
944  ++bitpos;
945  }
946 
947  bitpos = 0;
948  indexEmpty = 0;
949  indexOfPath = 0;
950  for(auto & endPathStatusInserter : endPathStatusInserters) {
951  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
953  inserterPtr->moduleDescription(),
954  &actions));
955  endPathStatusInserterWorkers_.emplace_back(workerPtr);
956  workerPtr->setActivityRegistry(actReg_);
957  addToAllWorkers(workerPtr.get());
958 
959  // A little complexity here because a C++ Path object is not
960  // instantiated and put into end_paths if there are no modules
961  // on the configured path.
962  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
963  ++indexEmpty;
964  } else {
965  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr,
966  workerPtr.get());
967  ++indexOfPath;
968  }
969  ++bitpos;
970  }
971  }
972 }
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
size
Write out results.
dbl * delta
Definition: mlp_gen.cc:36
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:68
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
int timesRun() const
Definition: Path.h:79
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int bitPosition() const
Definition: Path.h:74
ModuleDescription const & description() const
Definition: Worker.h:188
std::vector< int > empty_trig_paths_
Definition: helper.py:1
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
virtual void replaceModuleFor(Worker *) const =0
Strings const & getEndPaths() const
int totalEventsFailed() const
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
Definition: Path.h:49
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
int timesPassed() const
Definition: Worker.h:226
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
void clearCounters()
Definition: Worker.h:212
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
void addToAllWorkers(Worker *w)
int timesExcept() const
Definition: Path.h:82
exception_actions::ActionCodes find(const std::string &category) const
size_type size() const
Definition: Path.h:86
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:321
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:69
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::string const & moduleLabel() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::string moduleLabel
Definition: TriggerReport.h:56
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int timesExcept() const
Definition: Worker.h:228
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:67
EventSummary eventSummary
Definition: TriggerReport.h:66
accept
Definition: HLTenums.h:19
T & get_underlying(propagate_const< T > &)
int timesVisited() const
Definition: Worker.h:225
std::string name
Definition: TriggerReport.h:44
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:44
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
static void fillPathSummary(Path const &path, PathSummary &sum)
int timesPassed() const
Definition: Path.h:80
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:19
int timesRun() const
Definition: Worker.h:224
#define end
Definition: vmac.h:39
rep
Definition: cuy.py:1188
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
element_type const * get() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
unsigned int value() const
Definition: StreamID.h:46
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
int timesVisited(size_type i) const
Definition: Path.h:87
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
int timesFailed() const
Definition: Worker.h:227
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
std::atomic< bool > skippingEvent_
std::string const & name() const
Definition: Path.h:75
#define begin
Definition: vmac.h:32
HLT enums.
Strings const & getTrigPaths() const
Worker const * getWorker(size_type i) const
Definition: Path.h:91
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void enableEndPaths(bool active)
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:335
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:45
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
void processAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
TrigResConstPtr results() const
std::vector< std::string > vstring
Definition: Schedule.cc:430
def move(src, dest)
Definition: eostools.py:510
static Registry * instance()
Definition: Registry.cc:13
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void clearCounters()
Definition: Path.cc:171
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
int timesFailed() const
Definition: Path.h:81
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)