CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
28 
29 #include <algorithm>
30 #include <cassert>
31 #include <cstdlib>
32 #include <functional>
33 #include <iomanip>
34 #include <list>
35 #include <map>
36 #include <exception>
37 
38 namespace edm {
39  namespace {
40 
41  // Function template to transform each element in the input range to
42  // a value placed into the output range. The supplied function
43  // should take a const_reference to the 'input', and write to a
44  // reference to the 'output'.
45  template <typename InputIterator, typename ForwardIterator, typename Func>
46  void
47  transform_into(InputIterator begin, InputIterator end,
48  ForwardIterator out, Func func) {
49  for (; begin != end; ++begin, ++out) func(*begin, *out);
50  }
51 
52  // Function template that takes a sequence 'from', a sequence
53  // 'to', and a callable object 'func'. It and applies
54  // transform_into to fill the 'to' sequence with the values
55  // calcuated by the callable object, taking care to fill the
56  // outupt only if all calls succeed.
57  template <typename FROM, typename TO, typename FUNC>
58  void
59  fill_summary(FROM const& from, TO& to, FUNC func) {
60  if(to.size()!=from.size()) {
61  TO temp(from.size());
62  transform_into(from.begin(), from.end(), temp.begin(), func);
63  to.swap(temp);
64  } else {
65  transform_into(from.begin(), from.end(), to.begin(), func);
66  }
67  }
68 
69  // -----------------------------
70 
71  // Here we make the trigger results inserter directly. This should
72  // probably be a utility in the WorkerRegistry or elsewhere.
73 
75  makeInserter(ExceptionToActionTable const& actions,
76  std::shared_ptr<ActivityRegistry> areg,
77  std::shared_ptr<TriggerResultInserter> inserter) {
78  StreamSchedule::WorkerPtr ptr(new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
79  ptr->setActivityRegistry(areg);
80  return ptr;
81  }
82 
83  void
84  initializeBranchToReadingWorker(ParameterSet const& opts,
85  ProductRegistry const& preg,
86  std::multimap<std::string,Worker*>& branchToReadingWorker)
87  {
88  // See if any data has been marked to be deleted early (removing any duplicates)
89  auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly",std::vector<std::string>());
90  if(not vBranchesToDeleteEarly.empty()) {
91  std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
92  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
93  vBranchesToDeleteEarly.end());
94 
95  // Are the requested items in the product registry?
96  auto allBranchNames = preg.allBranchNames();
97  //the branch names all end with a period, which we do not want to compare with
98  for(auto & b:allBranchNames) {
99  b.resize(b.size()-1);
100  }
101  std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
102  std::vector<std::string> temp;
103  temp.reserve(vBranchesToDeleteEarly.size());
104 
105  std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
106  allBranchNames.begin(),allBranchNames.end(),
107  std::back_inserter(temp));
108  vBranchesToDeleteEarly.swap(temp);
109  if(temp.size() != vBranchesToDeleteEarly.size()) {
110  std::vector<std::string> missingProducts;
111  std::set_difference(temp.begin(),temp.end(),
112  vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
113  std::back_inserter(missingProducts));
114  LogInfo l("MissingProductsForCanDeleteEarly");
115  l<<"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
116  for(auto const& n:missingProducts){
117  l<<"\n "<<n;
118  }
119  }
120  //set placeholder for the branch, we will remove the nullptr if a
121  // module actually wants the branch.
122  for(auto const& branch:vBranchesToDeleteEarly) {
123  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
124  }
125  }
126  }
127  }
128 
129  // -----------------------------
130 
131  typedef std::vector<std::string> vstring;
132 
133  // -----------------------------
134 
135  StreamSchedule::StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
136  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
137  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
138  std::shared_ptr<ModuleRegistry> modReg,
139  ParameterSet& proc_pset,
140  service::TriggerNamesService const& tns,
141  PreallocationConfiguration const& prealloc,
142  ProductRegistry& preg,
143  BranchIDListHelper& branchIDListHelper,
144  ExceptionToActionTable const& actions,
145  std::shared_ptr<ActivityRegistry> areg,
146  std::shared_ptr<ProcessConfiguration> processConfiguration,
147  bool allowEarlyDelete,
148  StreamID streamID,
149  ProcessContext const* processContext) :
150  workerManager_(modReg,areg, actions),
151  actReg_(areg),
152  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
153  results_inserter_(),
154  trig_paths_(),
155  end_paths_(),
156  total_events_(),
157  total_passed_(),
158  number_of_unscheduled_modules_(0),
159  streamID_(streamID),
160  streamContext_(streamID_, processContext),
161  endpathsAreActive_(true),
162  skippingEvent_(false){
163 
164  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
165  bool hasPath = false;
166  std::vector<std::string> const& pathNames = tns.getTrigPaths();
167  std::vector<std::string> const& endPathNames = tns.getEndPaths();
168 
169  int trig_bitpos = 0;
170  trig_paths_.reserve(pathNames.size());
171  for (auto const& trig_name : pathNames) {
172  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
173  ++trig_bitpos;
174  hasPath = true;
175  }
176 
177  if (hasPath) {
178  // the results inserter stands alone
179  inserter->setTrigResultForStream(streamID.value(), results());
180 
181  results_inserter_ = makeInserter(actions, actReg_, inserter);
183  }
184 
185  // fill normal endpaths
186  int bitpos = 0;
187  end_paths_.reserve(endPathNames.size());
188  for (auto const& end_path_name : endPathNames) {
189  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
190  ++bitpos;
191  }
192 
193  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
194 
195  //See if all modules were used
196  std::set<std::string> usedWorkerLabels;
197  for (auto const& worker : allWorkers()) {
198  usedWorkerLabels.insert(worker->description().moduleLabel());
199  }
200  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
201  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
202  std::vector<std::string> unusedLabels;
203  set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
204  usedWorkerLabels.begin(), usedWorkerLabels.end(),
205  back_inserter(unusedLabels));
206  std::set<std::string> unscheduledLabels;
207  std::vector<std::string> shouldBeUsedLabels;
208  if (!unusedLabels.empty()) {
209  //Need to
210  // 1) create worker
211  // 2) if it is a WorkerT<EDProducer>, add it to our list
212  // 3) hand list to our delayed reader
213  for (auto const& label : unusedLabels) {
214  bool isTracked;
215  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
216  assert(isTracked);
217  assert(modulePSet != nullptr);
218  workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
219  }
220  if (!shouldBeUsedLabels.empty()) {
221  std::ostringstream unusedStream;
222  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
223  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
224  itLabelEnd = shouldBeUsedLabels.end();
225  itLabel != itLabelEnd;
226  ++itLabel) {
227  unusedStream << ",'" << *itLabel << "'";
228  }
229  LogInfo("path")
230  << "The following module labels are not assigned to any path:\n"
231  << unusedStream.str()
232  << "\n";
233  }
234  }
235  if (!unscheduledLabels.empty()) {
236  number_of_unscheduled_modules_=unscheduledLabels.size();
237  workerManager_.setOnDemandProducts(preg, unscheduledLabels);
238  }
239 
240 
241  initializeEarlyDelete(*modReg, opts,preg,allowEarlyDelete);
242 
243  } // StreamSchedule::StreamSchedule
244 
245 
247  edm::ParameterSet const& opts, edm::ProductRegistry const& preg,
248  bool allowEarlyDelete) {
249  //for now, if have a subProcess, don't allow early delete
250  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
251  if(not allowEarlyDelete) return;
252 
253  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
254  // registered for this job
255  std::multimap<std::string,Worker*> branchToReadingWorker;
256  initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
257 
258  //If no delete early items have been specified we don't have to do anything
259  if(branchToReadingWorker.size()==0) {
260  return;
261  }
262  const std::vector<std::string> kEmpty;
263  std::map<Worker*,unsigned int> reserveSizeForWorker;
264  unsigned int upperLimitOnReadingWorker =0;
265  unsigned int upperLimitOnIndicies = 0;
266  unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
267 
268  //talk with output modules first
269  modReg.forAllModuleHolders([this, &branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
270  auto comm = iHolder->createOutputModuleCommunicator();
271  if (comm) {
272  if(branchToReadingWorker.size()>0) {
273  //If an OutputModule needs a product, we can't delete it early
274  // so we should remove it from our list
275  SelectedProductsForBranchType const& kept = comm->keptProducts();
276  for(auto const& item: kept[InEvent]) {
277  BranchDescription const& desc = *item.first;
278  auto found = branchToReadingWorker.equal_range(desc.branchName());
279  if(found.first !=found.second) {
280  --nUniqueBranchesToDelete;
281  branchToReadingWorker.erase(found.first,found.second);
282  }
283  }
284  }
285  }
286  });
287 
288  if(branchToReadingWorker.size()==0) {
289  return;
290  }
291 
292  for (auto w :allWorkers()) {
293  //determine if this module could read a branch we want to delete early
294  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
295  if(0!=pset) {
296  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet",kEmpty);
297  if(not branches.empty()) {
298  ++upperLimitOnReadingWorker;
299  }
300  for(auto const& branch:branches){
301  auto found = branchToReadingWorker.equal_range(branch);
302  if(found.first != found.second) {
303  ++upperLimitOnIndicies;
304  ++reserveSizeForWorker[w];
305  if(nullptr == found.first->second) {
306  found.first->second = w;
307  } else {
308  branchToReadingWorker.insert(make_pair(found.first->first,w));
309  }
310  }
311  }
312  }
313  }
314  {
315  auto it = branchToReadingWorker.begin();
316  std::vector<std::string> unusedBranches;
317  while(it !=branchToReadingWorker.end()) {
318  if(it->second == nullptr) {
319  unusedBranches.push_back(it->first);
320  //erasing the object invalidates the iterator so must advance it first
321  auto temp = it;
322  ++it;
323  branchToReadingWorker.erase(temp);
324  } else {
325  ++it;
326  }
327  }
328  if(not unusedBranches.empty()) {
329  LogWarning l("UnusedProductsForCanDeleteEarly");
330  l<<"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
331  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
332  for(auto const& n:unusedBranches){
333  l<<"\n "<<n;
334  }
335  }
336  }
337  if(0!=branchToReadingWorker.size()) {
338  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
339  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies,0);
340  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
341  std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
342  std::string lastBranchName;
343  size_t nextOpenIndex = 0;
344  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
345  for(auto& branchAndWorker:branchToReadingWorker) {
346  if(lastBranchName != branchAndWorker.first) {
347  //have to put back the period we removed earlier in order to get the proper name
348  BranchID bid(branchAndWorker.first+".");
349  earlyDeleteBranchToCount_.emplace_back(bid,0U);
350  lastBranchName = branchAndWorker.first;
351  }
352  auto found = alreadySeenWorkers.find(branchAndWorker.second);
353  if(alreadySeenWorkers.end() == found) {
354  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
355  // all the branches that might be read by this worker. However, initially we will only tell the
356  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
357  // EarlyDeleteHelper will automatically advance its internal end pointer.
358  size_t index = nextOpenIndex;
359  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
361  earlyDeleteHelpers_.emplace_back(beginAddress+index,
362  beginAddress+index+1,
364  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
365  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(earlyDeleteHelpers_.back())));
366  nextOpenIndex +=nIndices;
367  } else {
368  found->second->appendIndex(earlyDeleteBranchToCount_.size()-1);
369  }
370  }
371 
372  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
373  // space needed for each module
374  auto itLast = earlyDeleteHelpers_.begin();
375  for(auto it = earlyDeleteHelpers_.begin()+1;it != earlyDeleteHelpers_.end();++it) {
376  if(itLast->end() != it->begin()) {
377  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
378  unsigned int delta = it->begin()- itLast->end();
379  it->shiftIndexPointers(delta);
380 
382  (itLast->end()-beginAddress),
384  (it->begin()-beginAddress));
385  }
386  itLast = it;
387  }
388  earlyDeleteHelperToBranchIndicies_.erase(earlyDeleteHelperToBranchIndicies_.begin()+(itLast->end()-beginAddress),
390 
391  //now tell the paths about the deleters
392  for(auto& p : trig_paths_) {
393  p.setEarlyDeleteHelpers(alreadySeenWorkers);
394  }
395  for(auto& p : end_paths_) {
396  p.setEarlyDeleteHelpers(alreadySeenWorkers);
397  }
399  }
400  }
401 
403  ProductRegistry& preg,
404  PreallocationConfiguration const* prealloc,
405  std::shared_ptr<ProcessConfiguration const> processConfiguration,
406  std::string const& pathName,
407  bool ignoreFilters,
408  PathWorkers& out,
409  std::vector<std::string> const& endPathNames) {
410  vstring modnames = proc_pset.getParameter<vstring>(pathName);
411  PathWorkers tmpworkers;
412 
413  unsigned int placeInPath = 0;
414  for (auto const& name : modnames) {
415 
417  if (name[0] == '!') filterAction = WorkerInPath::Veto;
418  else if (name[0] == '-') filterAction = WorkerInPath::Ignore;
419 
420  std::string moduleLabel = name;
421  if (filterAction != WorkerInPath::Normal) moduleLabel.erase(0, 1);
422 
423  bool isTracked;
424  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
425  if (modpset == 0) {
426  std::string pathType("endpath");
427  if (!search_all(endPathNames, pathName)) {
428  pathType = std::string("path");
429  }
431  "The unknown module label \"" << moduleLabel <<
432  "\" appears in " << pathType << " \"" << pathName <<
433  "\"\n please check spelling or remove that label from the path.";
434  }
435  assert(isTracked);
436 
437  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
438  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) {
439  // We have a filter on an end path, and the filter is not explicitly ignored.
440  // See if the filter is allowed.
441  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
442  if (!search_all(allowed_filters, worker->description().moduleName())) {
443  // Filter is not allowed. Ignore the result, and issue a warning.
444  filterAction = WorkerInPath::Ignore;
445  LogWarning("FilterOnEndPath")
446  << "The EDFilter '" << worker->description().moduleName() << "' with module label '" << moduleLabel << "' appears on EndPath '" << pathName << "'.\n"
447  << "The return value of the filter will be ignored.\n"
448  << "To suppress this warning, either remove the filter from the endpath,\n"
449  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
450  }
451  }
452  tmpworkers.emplace_back(worker, filterAction, placeInPath);
453  ++placeInPath;
454  }
455 
456  out.swap(tmpworkers);
457  }
458 
460  ProductRegistry& preg,
461  PreallocationConfiguration const* prealloc,
462  std::shared_ptr<ProcessConfiguration const> processConfiguration,
463  int bitpos, std::string const& name, TrigResPtr trptr,
464  std::vector<std::string> const& endPathNames) {
465  PathWorkers tmpworkers;
466  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
467 
468  // an empty path will cause an extra bit that is not used
469  if (!tmpworkers.empty()) {
470  trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, &skippingEvent_, PathContext::PathType::kPath);
471  } else {
472  empty_trig_paths_.push_back(bitpos);
473  }
474  for (WorkerInPath const& workerInPath : tmpworkers) {
475  addToAllWorkers(workerInPath.getWorker());
476  }
477  }
478 
480  ProductRegistry& preg,
481  PreallocationConfiguration const* prealloc,
482  std::shared_ptr<ProcessConfiguration const> processConfiguration,
483  int bitpos, std::string const& name,
484  std::vector<std::string> const& endPathNames) {
485  PathWorkers tmpworkers;
486  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
487 
488  if (!tmpworkers.empty()) {
489  //EndPaths are not supposed to stop if SkipEvent type exception happens
490  end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, nullptr, PathContext::PathType::kEndPath);
491  } else {
492  empty_end_paths_.push_back(bitpos);
493  }
494  for (WorkerInPath const& workerInPath : tmpworkers) {
495  addToAllWorkers(workerInPath.getWorker());
496  }
497  }
498 
501  }
502 
505  }
506 
508  std::string const& iLabel) {
509  Worker* found = nullptr;
510  for (auto const& worker : allWorkers()) {
511  if (worker->description().moduleLabel() == iLabel) {
512  found = worker;
513  break;
514  }
515  }
516  if (nullptr == found) {
517  return;
518  }
519 
520  iMod->replaceModuleFor(found);
522  }
523 
524  std::vector<ModuleDescription const*>
526  std::vector<ModuleDescription const*> result;
527  result.reserve(allWorkers().size());
528 
529  for (auto const& worker : allWorkers()) {
530  ModuleDescription const* p = worker->descPtr();
531  result.push_back(p);
532  }
533  return result;
534  }
535 
537  EventPrincipal& ep,
538  EventSetup const& es,
539  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
540  this->resetAll();
541 
543 
544  Traits::setStreamContext(streamContext_, ep);
545  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
546 
547  HLTPathStatus hltPathStatus(hlt::Pass, 0);
548  for (int empty_trig_path : empty_trig_paths_) {
549  results_->at(empty_trig_path) = hltPathStatus;
550  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
551  std::exception_ptr iException = pathStatusInserterWorkers_[empty_trig_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
553  );
554  if (iException) {
555  iTask.doneWaiting(iException);
556  return;
557  }
558  }
559  for (int empty_end_path : empty_end_paths_) {
560  std::exception_ptr iException = endPathStatusInserterWorkers_[empty_end_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
562  );
563  if (iException) {
564  iTask.doneWaiting(iException);
565  return;
566  }
567  }
568 
569  // This call takes care of the unscheduled processing.
571 
572  ++total_events_;
573  auto serviceToken = ServiceRegistry::instance().presentToken();
574  auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
575  [iTask,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
576  {
577  ServiceRegistry::Operate operate(serviceToken);
578 
579  std::exception_ptr ptr;
580  if(iPtr) {
581  ptr = *iPtr;
582  }
583  finishedPaths(ptr, std::move(iTask), ep, es);
584  });
585 
586  //The holder guarantees that if the paths finish before the loop ends
587  // that we do not start too soon. It also guarantees that the task will
588  // run under that condition.
589  WaitingTaskHolder taskHolder(pathsDone);
590 
591  for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
592  it != itEnd; ++ it) {
593  it->processOneOccurrenceAsync(pathsDone,ep, es, streamID_, &streamContext_);
594  }
595  }
596 
597  void
598  StreamSchedule::finishedPaths(std::exception_ptr iExcept, WaitingTaskHolder iWait, EventPrincipal& ep,
599  EventSetup const& es) {
600 
601  if(iExcept) {
602  try {
603  std::rethrow_exception(iExcept);
604  }
605  catch(cms::Exception& e) {
607  assert (action != exception_actions::IgnoreCompletely);
608  assert (action != exception_actions::FailPath);
609  if (action == exception_actions::SkipEvent) {
610  edm::printCmsExceptionWarning("SkipEvent", e);
611  iExcept = std::exception_ptr();
612  } else {
613  iExcept = std::current_exception();
614  }
615  }
616  catch(...) {
617  iExcept = std::current_exception();
618  }
619  }
620 
621 
622  if((not iExcept) and results_->accept()) {
623  ++total_passed_;
624  }
625 
626  if((not iExcept) and (nullptr != results_inserter_.get())) {
627  try {
628  ParentContext parentContext(&streamContext_);
630 
631  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
632  }
633  catch (cms::Exception & ex) {
634  if(ex.context().empty()) {
635  std::ostringstream ost;
636  ost << "Processing Event " << ep.id();
637  ex.addContext(ost.str());
638  }
639  iExcept = std::current_exception();
640  }
641  catch(...) {
642  iExcept = std::current_exception();
643  }
644  }
645  if(end_paths_.empty() or iExcept or (not endpathsAreActive_)) {
646  iExcept = finishProcessOneEvent(iExcept);
647  iWait.doneWaiting(iExcept);
648  } else {
649  auto serviceToken = ServiceRegistry::instance().presentToken();
650 
651  auto endPathsDone = make_waiting_task(tbb::task::allocate_root(),
652  [iWait,this,serviceToken](std::exception_ptr const* iPtr) mutable
653  {
654  ServiceRegistry::Operate operate(serviceToken);
655 
656  std::exception_ptr ptr;
657  if(iPtr) {
658  ptr = *iPtr;
659  }
661  });
662  //The holder guarantees that if the paths finish before the loop ends
663  // that we do not start too soon. It also guarantees that the task will
664  // run under that condition.
665  WaitingTaskHolder taskHolder(endPathsDone);
666  for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
667  it != itEnd; ++it) {
668  it->processOneOccurrenceAsync(endPathsDone,ep, es, streamID_, &streamContext_);
669  }
670  }
671  }
672 
673 
674  std::exception_ptr
675  StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
677 
678  if(iExcept) {
679  //add context information to the exception and print message
680  try {
681  convertException::wrap([&]() {
682  std::rethrow_exception(iExcept);
683  });
684  } catch(cms::Exception& ex) {
685  bool const cleaningUpAfterException = false;
686  if (ex.context().empty()) {
687  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
688  } else {
689  addContextAndPrintException("", ex, cleaningUpAfterException);
690  }
691  iExcept = std::current_exception();
692  }
693 
694  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
695  }
696 
697  try {
698  Traits::postScheduleSignal(actReg_.get(), &streamContext_);
699  } catch(...) {
700  if(not iExcept) {
701  iExcept = std::current_exception();
702  }
703  }
704  if(not iExcept ) {
706  }
707 
708  return iExcept;
709  }
710 
711 
712  void
713  StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
714  oLabelsToFill.reserve(trig_paths_.size());
715  std::transform(trig_paths_.begin(),
716  trig_paths_.end(),
717  std::back_inserter(oLabelsToFill),
718  std::bind(&Path::name, std::placeholders::_1));
719  }
720 
721  void
723  std::vector<std::string>& oLabelsToFill) const {
724  TrigPaths::const_iterator itFound =
725  std::find_if (trig_paths_.begin(),
726  trig_paths_.end(),
727  std::bind(std::equal_to<std::string>(),
728  iPathLabel,
729  std::bind(&Path::name, std::placeholders::_1)));
730  if (itFound!=trig_paths_.end()) {
731  oLabelsToFill.reserve(itFound->size());
732  for (size_t i = 0; i < itFound->size(); ++i) {
733  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
734  }
735  }
736  }
737 
738  void
740  std::vector<ModuleDescription const*>& descriptions,
741  unsigned int hint) const {
742  descriptions.clear();
743  bool found = false;
744  TrigPaths::const_iterator itFound;
745 
746  if(hint < trig_paths_.size()) {
747  itFound = trig_paths_.begin() + hint;
748  if(itFound->name() == iPathLabel) found = true;
749  }
750  if(!found) {
751  // if the hint did not work, do it the slow way
752  itFound = std::find_if (trig_paths_.begin(),
753  trig_paths_.end(),
754  std::bind(std::equal_to<std::string>(),
755  iPathLabel,
756  std::bind(&Path::name, std::placeholders::_1)));
757  if (itFound != trig_paths_.end()) found = true;
758  }
759  if (found) {
760  descriptions.reserve(itFound->size());
761  for (size_t i = 0; i < itFound->size(); ++i) {
762  descriptions.push_back(itFound->getWorker(i)->descPtr());
763  }
764  }
765  }
766 
767  void
769  std::vector<ModuleDescription const*>& descriptions,
770  unsigned int hint) const {
771  descriptions.clear();
772  bool found = false;
773  TrigPaths::const_iterator itFound;
774 
775  if(hint < end_paths_.size()) {
776  itFound = end_paths_.begin() + hint;
777  if(itFound->name() == iEndPathLabel) found = true;
778  }
779  if(!found) {
780  // if the hint did not work, do it the slow way
781  itFound = std::find_if (end_paths_.begin(),
782  end_paths_.end(),
783  std::bind(std::equal_to<std::string>(),
784  iEndPathLabel,
785  std::bind(&Path::name, std::placeholders::_1)));
786  if (itFound != end_paths_.end()) found = true;
787  }
788  if (found) {
789  descriptions.reserve(itFound->size());
790  for (size_t i = 0; i < itFound->size(); ++i) {
791  descriptions.push_back(itFound->getWorker(i)->descPtr());
792  }
793  }
794  }
795 
796  void
798  endpathsAreActive_ = active;
799  }
800 
801  bool
803  return endpathsAreActive_;
804  }
805 
806  static void
808  size_t which,
809  ModuleInPathSummary& sum) {
810  sum.timesVisited += path.timesVisited(which);
811  sum.timesPassed += path.timesPassed(which);
812  sum.timesFailed += path.timesFailed(which);
813  sum.timesExcept += path.timesExcept(which);
814  sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
815  }
816 
817  static void
819  sum.name = path.name();
820  sum.bitPosition = path.bitPosition();
821  sum.timesRun += path.timesRun();
822  sum.timesPassed += path.timesPassed();
823  sum.timesFailed += path.timesFailed();
824  sum.timesExcept += path.timesExcept();
825 
826  Path::size_type sz = path.size();
827  if(sum.moduleInPathSummaries.size()==0) {
828  std::vector<ModuleInPathSummary> temp(sz);
829  for (size_t i = 0; i != sz; ++i) {
830  fillModuleInPathSummary(path, i, temp[i]);
831  }
832  sum.moduleInPathSummaries.swap(temp);
833  } else {
834  assert(sz == sum.moduleInPathSummaries.size());
835  for (size_t i = 0; i != sz; ++i) {
837  }
838  }
839  }
840 
841  static void
843  sum.timesVisited += w.timesVisited();
844  sum.timesRun += w.timesRun();
845  sum.timesPassed += w.timesPassed();
846  sum.timesFailed += w.timesFailed();
847  sum.timesExcept += w.timesExcept();
848  sum.moduleLabel = w.description().moduleLabel();
849  }
850 
851  static void
853  fillWorkerSummaryAux(*pw, sum);
854  }
855 
856  void
861 
862  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
863  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
864  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
865  }
866 
867  void
869  using std::placeholders::_1;
871  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
872  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
873  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
874  }
875 
876  void
878  skippingEvent_ = false;
879  results_->reset();
880  }
881 
882  void
885  }
886 
887  void
889  //must be sure we have cleared the count first
890  for(auto& count:earlyDeleteBranchToCount_) {
891  count.count = 0;
892  }
893  //now reset based on how many helpers use that branch
895  ++(earlyDeleteBranchToCount_[index].count);
896  }
897  for(auto& helper: earlyDeleteHelpers_) {
898  helper.reset();
899  }
900  }
901 
902  void
904  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
905  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
906  ExceptionToActionTable const& actions) {
907 
908  int bitpos = 0;
909  unsigned int indexEmpty = 0;
910  unsigned int indexOfPath = 0;
911  for(auto & pathStatusInserter : pathStatusInserters) {
912  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
914  inserterPtr->moduleDescription(),
915  &actions));
916  pathStatusInserterWorkers_.emplace_back(workerPtr);
917  workerPtr->setActivityRegistry(actReg_);
918  addToAllWorkers(workerPtr.get());
919 
920  // A little complexity here because a C++ Path object is not
921  // instantiated and put into end_paths if there are no modules
922  // on the configured path.
923  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
924  ++indexEmpty;
925  } else {
926  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(),
927  workerPtr.get());
928  ++indexOfPath;
929  }
930  ++bitpos;
931  }
932 
933  bitpos = 0;
934  indexEmpty = 0;
935  indexOfPath = 0;
936  for(auto & endPathStatusInserter : endPathStatusInserters) {
937  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
939  inserterPtr->moduleDescription(),
940  &actions));
941  endPathStatusInserterWorkers_.emplace_back(workerPtr);
942  workerPtr->setActivityRegistry(actReg_);
943  addToAllWorkers(workerPtr.get());
944 
945  // A little complexity here because a C++ Path object is not
946  // instantiated and put into end_paths if there are no modules
947  // on the configured path.
948  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
949  ++indexEmpty;
950  } else {
951  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr,
952  workerPtr.get());
953  ++indexOfPath;
954  }
955  ++bitpos;
956  }
957  }
958 }
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
size
Write out results.
dbl * delta
Definition: mlp_gen.cc:36
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:68
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
int timesRun() const
Definition: Path.h:81
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int bitPosition() const
Definition: Path.h:76
ModuleDescription const & description() const
Definition: Worker.h:138
std::vector< int > empty_trig_paths_
Definition: helper.py:1
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
virtual void replaceModuleFor(Worker *) const =0
Strings const & getEndPaths() const
int totalEventsFailed() const
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
Definition: Path.h:49
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
int timesPassed() const
Definition: Worker.h:176
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
void clearCounters()
Definition: Worker.h:162
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
void addToAllWorkers(Worker *w)
int timesExcept() const
Definition: Path.h:84
exception_actions::ActionCodes find(const std::string &category) const
size_type size() const
Definition: Path.h:88
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:277
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:69
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
ServiceToken presentToken() const
std::string const & moduleLabel() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::string moduleLabel
Definition: TriggerReport.h:56
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int timesExcept() const
Definition: Worker.h:178
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:67
EventSummary eventSummary
Definition: TriggerReport.h:66
accept
Definition: HLTenums.h:19
T & get_underlying(propagate_const< T > &)
int timesVisited() const
Definition: Worker.h:175
std::string name
Definition: TriggerReport.h:44
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:44
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
static void fillPathSummary(Path const &path, PathSummary &sum)
static ServiceRegistry & instance()
int timesPassed() const
Definition: Path.h:82
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:18
int timesRun() const
Definition: Worker.h:174
#define end
Definition: vmac.h:37
rep
Definition: cuy.py:1188
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
element_type const * get() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
unsigned int value() const
Definition: StreamID.h:46
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
int timesVisited(size_type i) const
Definition: Path.h:89
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
int timesFailed() const
Definition: Worker.h:177
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
std::atomic< bool > skippingEvent_
std::string const & name() const
Definition: Path.h:77
#define begin
Definition: vmac.h:30
HLT enums.
Strings const & getTrigPaths() const
Worker const * getWorker(size_type i) const
Definition: Path.h:93
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void enableEndPaths(bool active)
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
def which(cmd)
Definition: eostools.py:335
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:45
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
TrigResConstPtr results() const
std::vector< std::string > vstring
Definition: Schedule.cc:429
def move(src, dest)
Definition: eostools.py:510
static Registry * instance()
Definition: Registry.cc:12
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void clearCounters()
Definition: Path.cc:171
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
int timesFailed() const
Definition: Path.h:83
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)