CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
26 
27 #include <algorithm>
28 #include <cassert>
29 #include <cstdlib>
30 #include <functional>
31 #include <iomanip>
32 #include <list>
33 #include <map>
34 #include <exception>
35 
36 namespace edm {
37  namespace {
38 
// Walks the input range [begin, end) in step with the output range that
// starts at 'out' and calls func(input, output) for every pair of elements.
// 'func' receives the input element and a reference to the matching output
// element, and is expected to write its result through that reference.
// The output range must provide at least as many elements as the input range.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end,
                    ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
49 
// Updates the sequence 'to' from the sequence 'from' by calling
// func(fromElement, toElement) for each pair of elements via transform_into.
// If the two sequences differ in size, the results are first computed into
// a temporary of the right size, and that temporary is swapped into 'to' only
// after every call has returned; 'to' is therefore left untouched if one of
// the calls throws. If the sizes already match, the calls operate directly on
// the existing elements of 'to'.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }
  TO fresh(from.size());
  transform_into(from.begin(), from.end(), fresh.begin(), func);
  to.swap(fresh);
}
66 
67  // -----------------------------
68 
69  // Here we make the trigger results inserter directly. This should
70  // probably be a utility in the WorkerRegistry or elsewhere.
71 
73  makeInserter(ExceptionToActionTable const& actions,
74  std::shared_ptr<ActivityRegistry> areg,
75  std::shared_ptr<TriggerResultInserter> inserter) {
76  StreamSchedule::WorkerPtr ptr(new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
77  ptr->setActivityRegistry(areg);
78  return ptr;
79  }
80 
81  void
82  initializeBranchToReadingWorker(ParameterSet const& opts,
83  ProductRegistry const& preg,
84  std::multimap<std::string,Worker*>& branchToReadingWorker)
85  {
86  // See if any data has been marked to be deleted early (removing any duplicates)
87  auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly",std::vector<std::string>());
88  if(not vBranchesToDeleteEarly.empty()) {
89  std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
90  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
91  vBranchesToDeleteEarly.end());
92 
93  // Are the requested items in the product registry?
94  auto allBranchNames = preg.allBranchNames();
95  //the branch names all end with a period, which we do not want to compare with
96  for(auto & b:allBranchNames) {
97  b.resize(b.size()-1);
98  }
99  std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
100  std::vector<std::string> temp;
101  temp.reserve(vBranchesToDeleteEarly.size());
102 
103  std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
104  allBranchNames.begin(),allBranchNames.end(),
105  std::back_inserter(temp));
106  vBranchesToDeleteEarly.swap(temp);
107  if(temp.size() != vBranchesToDeleteEarly.size()) {
108  std::vector<std::string> missingProducts;
109  std::set_difference(temp.begin(),temp.end(),
110  vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
111  std::back_inserter(missingProducts));
112  LogInfo l("MissingProductsForCanDeleteEarly");
113  l<<"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
114  for(auto const& n:missingProducts){
115  l<<"\n "<<n;
116  }
117  }
118  //set placeholder for the branch, we will remove the nullptr if a
119  // module actually wants the branch.
120  for(auto const& branch:vBranchesToDeleteEarly) {
121  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
122  }
123  }
124  }
125  }
126 
127  // -----------------------------
128 
// Shorthand for a list of strings (module labels, path names, ...).
using vstring = std::vector<std::string>;
130 
131  // -----------------------------
132 
133  StreamSchedule::StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
134  std::shared_ptr<ModuleRegistry> modReg,
135  ParameterSet& proc_pset,
137  PreallocationConfiguration const& prealloc,
138  ProductRegistry& preg,
139  BranchIDListHelper& branchIDListHelper,
140  ExceptionToActionTable const& actions,
141  std::shared_ptr<ActivityRegistry> areg,
142  std::shared_ptr<ProcessConfiguration> processConfiguration,
143  bool allowEarlyDelete,
144  StreamID streamID,
145  ProcessContext const* processContext) :
146  workerManager_(modReg,areg, actions),
147  actReg_(areg),
148  trig_name_list_(tns.getTrigPaths()),
149  end_path_name_list_(tns.getEndPaths()),
150  results_(new HLTGlobalStatus(trig_name_list_.size())),
151  results_inserter_(),
152  trig_paths_(),
153  end_paths_(),
154  total_events_(),
155  total_passed_(),
156  number_of_unscheduled_modules_(0),
157  streamID_(streamID),
158  streamContext_(streamID_, processContext),
159  endpathsAreActive_(true),
160  skippingEvent_(false){
161 
162  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
163  bool hasPath = false;
164 
165  int trig_bitpos = 0;
166  trig_paths_.reserve(trig_name_list_.size());
167  vstring labelsOnTriggerPaths;
168  for (auto const& trig_name : trig_name_list_) {
169  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), &labelsOnTriggerPaths);
170  ++trig_bitpos;
171  hasPath = true;
172  }
173 
174  if (hasPath) {
175  // the results inserter stands alone
176  inserter->setTrigResultForStream(streamID.value(), results());
177 
178  results_inserter_ = makeInserter(actions, actReg_, inserter);
180  }
181 
182  // fill normal endpaths
183  int bitpos = 0;
184  end_paths_.reserve(end_path_name_list_.size());
185  for (auto const& end_path_name : end_path_name_list_) {
186  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name);
187  ++bitpos;
188  }
189 
190  //See if all modules were used
191  std::set<std::string> usedWorkerLabels;
192  for (auto const& worker : allWorkers()) {
193  usedWorkerLabels.insert(worker->description().moduleLabel());
194  }
195  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
196  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
197  std::vector<std::string> unusedLabels;
198  set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
199  usedWorkerLabels.begin(), usedWorkerLabels.end(),
200  back_inserter(unusedLabels));
201  std::set<std::string> unscheduledLabels;
202  std::vector<std::string> shouldBeUsedLabels;
203  if (!unusedLabels.empty()) {
204  //Need to
205  // 1) create worker
206  // 2) if it is a WorkerT<EDProducer>, add it to our list
207  // 3) hand list to our delayed reader
208  for (auto const& label : unusedLabels) {
209  bool isTracked;
210  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
211  assert(isTracked);
212  assert(modulePSet != nullptr);
213  workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
214  }
215  if (!shouldBeUsedLabels.empty()) {
216  std::ostringstream unusedStream;
217  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
218  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
219  itLabelEnd = shouldBeUsedLabels.end();
220  itLabel != itLabelEnd;
221  ++itLabel) {
222  unusedStream << ",'" << *itLabel << "'";
223  }
224  LogInfo("path")
225  << "The following module labels are not assigned to any path:\n"
226  << unusedStream.str()
227  << "\n";
228  }
229  }
230  if (!unscheduledLabels.empty()) {
231  number_of_unscheduled_modules_=unscheduledLabels.size();
232  workerManager_.setOnDemandProducts(preg, unscheduledLabels);
233  }
234 
235 
236  initializeEarlyDelete(*modReg, opts,preg,allowEarlyDelete);
237 
238  } // StreamSchedule::StreamSchedule
239 
240 
242  edm::ParameterSet const& opts, edm::ProductRegistry const& preg,
243  bool allowEarlyDelete) {
244  //for now, if have a subProcess, don't allow early delete
245  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
246  if(not allowEarlyDelete) return;
247 
248  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
249  // registered for this job
250  std::multimap<std::string,Worker*> branchToReadingWorker;
251  initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
252 
253  //If no delete early items have been specified we don't have to do anything
254  if(branchToReadingWorker.size()==0) {
255  return;
256  }
257  const std::vector<std::string> kEmpty;
258  std::map<Worker*,unsigned int> reserveSizeForWorker;
259  unsigned int upperLimitOnReadingWorker =0;
260  unsigned int upperLimitOnIndicies = 0;
261  unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
262 
263  //talk with output modules first
264  modReg.forAllModuleHolders([this, &branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
265  auto comm = iHolder->createOutputModuleCommunicator();
266  if (comm) {
267  if(branchToReadingWorker.size()>0) {
268  //If an OutputModule needs a product, we can't delete it early
269  // so we should remove it from our list
270  SelectedProductsForBranchType const& kept = comm->keptProducts();
271  for(auto const& item: kept[InEvent]) {
272  BranchDescription const& desc = *item.first;
273  auto found = branchToReadingWorker.equal_range(desc.branchName());
274  if(found.first !=found.second) {
275  --nUniqueBranchesToDelete;
276  branchToReadingWorker.erase(found.first,found.second);
277  }
278  }
279  }
280  }
281  });
282 
283  if(branchToReadingWorker.size()==0) {
284  return;
285  }
286 
287  for (auto w :allWorkers()) {
288  //determine if this module could read a branch we want to delete early
289  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
290  if(0!=pset) {
291  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet",kEmpty);
292  if(not branches.empty()) {
293  ++upperLimitOnReadingWorker;
294  }
295  for(auto const& branch:branches){
296  auto found = branchToReadingWorker.equal_range(branch);
297  if(found.first != found.second) {
298  ++upperLimitOnIndicies;
299  ++reserveSizeForWorker[w];
300  if(nullptr == found.first->second) {
301  found.first->second = w;
302  } else {
303  branchToReadingWorker.insert(make_pair(found.first->first,w));
304  }
305  }
306  }
307  }
308  }
309  {
310  auto it = branchToReadingWorker.begin();
311  std::vector<std::string> unusedBranches;
312  while(it !=branchToReadingWorker.end()) {
313  if(it->second == nullptr) {
314  unusedBranches.push_back(it->first);
315  //erasing the object invalidates the iterator so must advance it first
316  auto temp = it;
317  ++it;
318  branchToReadingWorker.erase(temp);
319  } else {
320  ++it;
321  }
322  }
323  if(not unusedBranches.empty()) {
324  LogWarning l("UnusedProductsForCanDeleteEarly");
325  l<<"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
326  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
327  for(auto const& n:unusedBranches){
328  l<<"\n "<<n;
329  }
330  }
331  }
332  if(0!=branchToReadingWorker.size()) {
333  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
334  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies,0);
335  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
336  std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
337  std::string lastBranchName;
338  size_t nextOpenIndex = 0;
339  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
340  for(auto& branchAndWorker:branchToReadingWorker) {
341  if(lastBranchName != branchAndWorker.first) {
342  //have to put back the period we removed earlier in order to get the proper name
343  BranchID bid(branchAndWorker.first+".");
344  earlyDeleteBranchToCount_.emplace_back(bid,0U);
345  lastBranchName = branchAndWorker.first;
346  }
347  auto found = alreadySeenWorkers.find(branchAndWorker.second);
348  if(alreadySeenWorkers.end() == found) {
349  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
350  // all the branches that might be read by this worker. However, initially we will only tell the
351  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
352  // EarlyDeleteHelper will automatically advance its internal end pointer.
353  size_t index = nextOpenIndex;
354  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
356  earlyDeleteHelpers_.emplace_back(beginAddress+index,
357  beginAddress+index+1,
359  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
360  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(earlyDeleteHelpers_.back())));
361  nextOpenIndex +=nIndices;
362  } else {
363  found->second->appendIndex(earlyDeleteBranchToCount_.size()-1);
364  }
365  }
366 
367  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
368  // space needed for each module
369  auto itLast = earlyDeleteHelpers_.begin();
370  for(auto it = earlyDeleteHelpers_.begin()+1;it != earlyDeleteHelpers_.end();++it) {
371  if(itLast->end() != it->begin()) {
372  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
373  unsigned int delta = it->begin()- itLast->end();
374  it->shiftIndexPointers(delta);
375 
377  (itLast->end()-beginAddress),
379  (it->begin()-beginAddress));
380  }
381  itLast = it;
382  }
383  earlyDeleteHelperToBranchIndicies_.erase(earlyDeleteHelperToBranchIndicies_.begin()+(itLast->end()-beginAddress),
385 
386  //now tell the paths about the deleters
387  for(auto& p : trig_paths_) {
388  p.setEarlyDeleteHelpers(alreadySeenWorkers);
389  }
390  for(auto& p : end_paths_) {
391  p.setEarlyDeleteHelpers(alreadySeenWorkers);
392  }
394  }
395  }
396 
398  ProductRegistry& preg,
399  PreallocationConfiguration const* prealloc,
400  std::shared_ptr<ProcessConfiguration const> processConfiguration,
401  std::string const& pathName,
402  bool ignoreFilters,
403  PathWorkers& out,
404  vstring* labelsOnPaths) {
405  vstring modnames = proc_pset.getParameter<vstring>(pathName);
406  PathWorkers tmpworkers;
407 
408  unsigned int placeInPath = 0;
409  for (auto const& name : modnames) {
410 
411  if (labelsOnPaths) labelsOnPaths->push_back(name);
412 
414  if (name[0] == '!') filterAction = WorkerInPath::Veto;
415  else if (name[0] == '-') filterAction = WorkerInPath::Ignore;
416 
417  std::string moduleLabel = name;
418  if (filterAction != WorkerInPath::Normal) moduleLabel.erase(0, 1);
419 
420  bool isTracked;
421  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
422  if (modpset == 0) {
423  std::string pathType("endpath");
424  if (!search_all(end_path_name_list_, pathName)) {
425  pathType = std::string("path");
426  }
428  "The unknown module label \"" << moduleLabel <<
429  "\" appears in " << pathType << " \"" << pathName <<
430  "\"\n please check spelling or remove that label from the path.";
431  }
432  assert(isTracked);
433 
434  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
435  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) {
436  // We have a filter on an end path, and the filter is not explicitly ignored.
437  // See if the filter is allowed.
438  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
439  if (!search_all(allowed_filters, worker->description().moduleName())) {
440  // Filter is not allowed. Ignore the result, and issue a warning.
441  filterAction = WorkerInPath::Ignore;
442  LogWarning("FilterOnEndPath")
443  << "The EDFilter '" << worker->description().moduleName() << "' with module label '" << moduleLabel << "' appears on EndPath '" << pathName << "'.\n"
444  << "The return value of the filter will be ignored.\n"
445  << "To suppress this warning, either remove the filter from the endpath,\n"
446  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
447  }
448  }
449  tmpworkers.emplace_back(worker, filterAction, placeInPath);
450  ++placeInPath;
451  }
452 
453  out.swap(tmpworkers);
454  }
455 
457  ProductRegistry& preg,
458  PreallocationConfiguration const* prealloc,
459  std::shared_ptr<ProcessConfiguration const> processConfiguration,
460  int bitpos, std::string const& name, TrigResPtr trptr,
461  vstring* labelsOnTriggerPaths) {
462  using std::placeholders::_1;
463  PathWorkers tmpworkers;
464  Workers holder;
465  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, labelsOnTriggerPaths);
466 
467  for (PathWorkers::iterator wi(tmpworkers.begin()),
468  we(tmpworkers.end()); wi != we; ++wi) {
469  holder.push_back(wi->getWorker());
470  }
471 
472  // an empty path will cause an extra bit that is not used
473  if (!tmpworkers.empty()) {
474  trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, &skippingEvent_, PathContext::PathType::kPath);
475  } else {
476  empty_trig_paths_.push_back(bitpos);
477  empty_trig_path_names_.push_back(name);
478  }
479  for_all(holder, std::bind(&StreamSchedule::addToAllWorkers, this, _1));
480  }
481 
483  ProductRegistry& preg,
484  PreallocationConfiguration const* prealloc,
485  std::shared_ptr<ProcessConfiguration const> processConfiguration,
486  int bitpos, std::string const& name) {
487  using std::placeholders::_1;
488  PathWorkers tmpworkers;
489  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, 0);
490  Workers holder;
491 
492  for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) {
493  holder.push_back(wi->getWorker());
494  }
495 
496  if (!tmpworkers.empty()) {
497  //EndPaths are not supposed to stop if SkipEvent type exception happens
498  end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, nullptr, PathContext::PathType::kEndPath);
499  }
500  for_all(holder, std::bind(&StreamSchedule::addToAllWorkers, this, _1));
501  }
502 
505  }
506 
509  }
510 
512  std::string const& iLabel) {
513  Worker* found = nullptr;
514  for (auto const& worker : allWorkers()) {
515  if (worker->description().moduleLabel() == iLabel) {
516  found = worker;
517  break;
518  }
519  }
520  if (nullptr == found) {
521  return;
522  }
523 
524  iMod->replaceModuleFor(found);
526  }
527 
528  std::vector<ModuleDescription const*>
530  std::vector<ModuleDescription const*> result;
531  result.reserve(allWorkers().size());
532 
533  for (auto const& worker : allWorkers()) {
534  ModuleDescription const* p = worker->descPtr();
535  result.push_back(p);
536  }
537  return result;
538  }
539 
541  EventPrincipal& ep,
542  EventSetup const& es)
543  {
544  this->resetAll();
545  for (int empty_trig_path : empty_trig_paths_) {
546  results_->at(empty_trig_path) = HLTPathStatus(hlt::Pass, 0);
547  }
548 
550 
551  Traits::setStreamContext(streamContext_, ep);
552  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
553 
554  // This call takes care of the unscheduled processing.
556 
557  ++total_events_;
558  auto serviceToken = ServiceRegistry::instance().presentToken();
559  auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
560  [iTask,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
561  {
562  ServiceRegistry::Operate operate(serviceToken);
563 
564  std::exception_ptr ptr;
565  if(iPtr) {
566  ptr = *iPtr;
567  }
568  finishedPaths(ptr, std::move(iTask), ep, es);
569  });
570 
571  //The holder guarantees that if the paths finish before the loop ends
572  // that we do not start too soon. It also guarantees that the task will
573  // run under that condition.
574  WaitingTaskHolder taskHolder(pathsDone);
575 
576  for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
577  it != itEnd; ++ it) {
578  it->processOneOccurrenceAsync(pathsDone,ep, es, streamID_, &streamContext_);
579  }
580  }
581 
582  void
583  StreamSchedule::finishedPaths(std::exception_ptr iExcept, WaitingTaskHolder iWait, EventPrincipal& ep,
584  EventSetup const& es) {
585 
586  if(iExcept) {
587  try {
588  std::rethrow_exception(iExcept);
589  }
590  catch(cms::Exception& e) {
592  assert (action != exception_actions::IgnoreCompletely);
593  assert (action != exception_actions::FailPath);
594  if (action == exception_actions::SkipEvent) {
595  edm::printCmsExceptionWarning("SkipEvent", e);
596  iExcept = std::exception_ptr();
597  } else {
598  iExcept = std::current_exception();
599  }
600  }
601  catch(...) {
602  iExcept = std::current_exception();
603  }
604  }
605 
606 
607  if((not iExcept) and results_->accept()) {
608  ++total_passed_;
609  }
610 
611  if((not iExcept) and (nullptr != results_inserter_.get())) {
612  try {
613  ParentContext parentContext(&streamContext_);
615 
616  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
617  }
618  catch (cms::Exception & ex) {
619  if(ex.context().empty()) {
620  std::ostringstream ost;
621  ost << "Processing Event " << ep.id();
622  ex.addContext(ost.str());
623  }
624  iExcept = std::current_exception();
625  }
626  catch(...) {
627  iExcept = std::current_exception();
628  }
629  }
630  if(end_paths_.empty() or iExcept or (not endpathsAreActive_)) {
631  iExcept = finishProcessOneEvent(iExcept);
632  iWait.doneWaiting(iExcept);
633  } else {
634  auto serviceToken = ServiceRegistry::instance().presentToken();
635 
636  auto endPathsDone = make_waiting_task(tbb::task::allocate_root(),
637  [iWait,this,serviceToken](std::exception_ptr const* iPtr) mutable
638  {
639  ServiceRegistry::Operate operate(serviceToken);
640 
641  std::exception_ptr ptr;
642  if(iPtr) {
643  ptr = *iPtr;
644  }
646  });
647  //The holder guarantees that if the paths finish before the loop ends
648  // that we do not start too soon. It also guarantees that the task will
649  // run under that condition.
650  WaitingTaskHolder taskHolder(endPathsDone);
651  for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
652  it != itEnd; ++it) {
653  it->processOneOccurrenceAsync(endPathsDone,ep, es, streamID_, &streamContext_);
654  }
655  }
656  }
657 
658 
659  std::exception_ptr
660  StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
662 
663  if(iExcept) {
664  //add context information to the exception and print message
665  try {
666  convertException::wrap([&]() {
667  std::rethrow_exception(iExcept);
668  });
669  } catch(cms::Exception& ex) {
670  bool const cleaningUpAfterException = false;
671  if (ex.context().empty()) {
672  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
673  } else {
674  addContextAndPrintException("", ex, cleaningUpAfterException);
675  }
676  iExcept = std::current_exception();
677  }
678 
679  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
680  }
681 
682  try {
683  Traits::postScheduleSignal(actReg_.get(), &streamContext_);
684  } catch(...) {
685  if(not iExcept) {
686  iExcept = std::current_exception();
687  }
688  }
689  if(not iExcept ) {
691  }
692 
693  return iExcept;
694  }
695 
696 
697  void
698  StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
699  oLabelsToFill.reserve(trig_paths_.size());
700  std::transform(trig_paths_.begin(),
701  trig_paths_.end(),
702  std::back_inserter(oLabelsToFill),
703  std::bind(&Path::name, std::placeholders::_1));
704  }
705 
706  void
707  StreamSchedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const {
708  oLabelsToFill = trig_name_list_;
709  }
710 
711  void
712  StreamSchedule::endPaths(std::vector<std::string>& oLabelsToFill) const {
713  oLabelsToFill = end_path_name_list_;
714  }
715 
716  void
718  std::vector<std::string>& oLabelsToFill) const {
719  TrigPaths::const_iterator itFound =
720  std::find_if (trig_paths_.begin(),
721  trig_paths_.end(),
722  std::bind(std::equal_to<std::string>(),
723  iPathLabel,
724  std::bind(&Path::name, std::placeholders::_1)));
725  if (itFound!=trig_paths_.end()) {
726  oLabelsToFill.reserve(itFound->size());
727  for (size_t i = 0; i < itFound->size(); ++i) {
728  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
729  }
730  }
731  }
732 
733  void
735  std::vector<ModuleDescription const*>& descriptions,
736  unsigned int hint) const {
737  descriptions.clear();
738  bool found = false;
739  TrigPaths::const_iterator itFound;
740 
741  if(hint < trig_paths_.size()) {
742  itFound = trig_paths_.begin() + hint;
743  if(itFound->name() == iPathLabel) found = true;
744  }
745  if(!found) {
746  // if the hint did not work, do it the slow way
747  itFound = std::find_if (trig_paths_.begin(),
748  trig_paths_.end(),
749  std::bind(std::equal_to<std::string>(),
750  iPathLabel,
751  std::bind(&Path::name, std::placeholders::_1)));
752  if (itFound != trig_paths_.end()) found = true;
753  }
754  if (found) {
755  descriptions.reserve(itFound->size());
756  for (size_t i = 0; i < itFound->size(); ++i) {
757  descriptions.push_back(itFound->getWorker(i)->descPtr());
758  }
759  }
760  }
761 
762  void
764  std::vector<ModuleDescription const*>& descriptions,
765  unsigned int hint) const {
766  descriptions.clear();
767  bool found = false;
768  TrigPaths::const_iterator itFound;
769 
770  if(hint < end_paths_.size()) {
771  itFound = end_paths_.begin() + hint;
772  if(itFound->name() == iEndPathLabel) found = true;
773  }
774  if(!found) {
775  // if the hint did not work, do it the slow way
776  itFound = std::find_if (end_paths_.begin(),
777  end_paths_.end(),
778  std::bind(std::equal_to<std::string>(),
779  iEndPathLabel,
780  std::bind(&Path::name, std::placeholders::_1)));
781  if (itFound != end_paths_.end()) found = true;
782  }
783  if (found) {
784  descriptions.reserve(itFound->size());
785  for (size_t i = 0; i < itFound->size(); ++i) {
786  descriptions.push_back(itFound->getWorker(i)->descPtr());
787  }
788  }
789  }
790 
791  void
793  endpathsAreActive_ = active;
794  }
795 
796  bool
798  return endpathsAreActive_;
799  }
800 
801  static void
803  size_t which,
804  ModuleInPathSummary& sum) {
805  sum.timesVisited += path.timesVisited(which);
806  sum.timesPassed += path.timesPassed(which);
807  sum.timesFailed += path.timesFailed(which);
808  sum.timesExcept += path.timesExcept(which);
809  sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
810  }
811 
812  static void
814  sum.name = path.name();
815  sum.bitPosition = path.bitPosition();
816  sum.timesRun += path.timesRun();
817  sum.timesPassed += path.timesPassed();
818  sum.timesFailed += path.timesFailed();
819  sum.timesExcept += path.timesExcept();
820 
821  Path::size_type sz = path.size();
822  if(sum.moduleInPathSummaries.size()==0) {
823  std::vector<ModuleInPathSummary> temp(sz);
824  for (size_t i = 0; i != sz; ++i) {
825  fillModuleInPathSummary(path, i, temp[i]);
826  }
827  sum.moduleInPathSummaries.swap(temp);
828  } else {
829  assert(sz == sum.moduleInPathSummaries.size());
830  for (size_t i = 0; i != sz; ++i) {
832  }
833  }
834  }
835 
836  static void
838  sum.timesVisited += w.timesVisited();
839  sum.timesRun += w.timesRun();
840  sum.timesPassed += w.timesPassed();
841  sum.timesFailed += w.timesFailed();
842  sum.timesExcept += w.timesExcept();
843  sum.moduleLabel = w.description().moduleLabel();
844  }
845 
846  static void
848  fillWorkerSummaryAux(*pw, sum);
849  }
850 
851  void
856 
857  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
858  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
859  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
860  }
861 
862  void
864  using std::placeholders::_1;
866  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
867  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
868  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
869  }
870 
871  void
873  skippingEvent_ = false;
874  results_->reset();
875  }
876 
877  void
880  }
881 
882  void
884  //must be sure we have cleared the count first
885  for(auto& count:earlyDeleteBranchToCount_) {
886  count.count = 0;
887  }
888  //now reset based on how many helpers use that branch
890  ++(earlyDeleteBranchToCount_[index].count);
891  }
892  for(auto& helper: earlyDeleteHelpers_) {
893  helper.reset();
894  }
895  }
896 
897 }
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
size
Write out results.
dbl * delta
Definition: mlp_gen.cc:36
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:68
T getUntrackedParameter(std::string const &, T const &) const
std::string const & branchName() const
int timesRun() const
Definition: Path.h:79
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int bitPosition() const
Definition: Path.h:74
std::vector< Worker * > Workers
ModuleDescription const & description() const
Definition: Worker.h:132
std::vector< int > empty_trig_paths_
Definition: helper.py:1
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void endStream(StreamID iID, StreamContext &streamContext)
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
const double w
Definition: UKUtility.cc:23
virtual void replaceModuleFor(Worker *) const =0
int totalEventsFailed() const
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
Definition: Path.h:47
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
int timesPassed() const
Definition: Worker.h:168
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
void clearCounters()
Definition: Worker.h:154
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
void addToAllWorkers(Worker *w)
int timesExcept() const
Definition: Path.h:82
exception_actions::ActionCodes find(const std::string &category) const
size_type size() const
Definition: Path.h:86
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:277
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:69
ServiceToken presentToken() const
std::string const & moduleLabel() const
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es)
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::string moduleLabel
Definition: TriggerReport.h:56
std::vector< WorkerInPath > PathWorkers
int timesExcept() const
Definition: Worker.h:170
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:67
EventSummary eventSummary
Definition: TriggerReport.h:66
accept
Definition: HLTenums.h:19
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
int timesVisited() const
Definition: Worker.h:167
std::string name
Definition: TriggerReport.h:44
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:42
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
static void fillPathSummary(Path const &path, PathSummary &sum)
static ServiceRegistry & instance()
int timesPassed() const
Definition: Path.h:80
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:18
int timesRun() const
Definition: Worker.h:166
#define end
Definition: vmac.h:37
rep
Definition: cuy.py:1188
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
vstring empty_trig_path_names_
element_type const * get() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
unsigned int value() const
Definition: StreamID.h:46
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
int timesVisited(size_type i) const
Definition: Path.h:87
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
int timesFailed() const
Definition: Worker.h:169
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
std::atomic< bool > skippingEvent_
std::string const & name() const
Definition: Path.h:75
#define begin
Definition: vmac.h:30
HLT enums.
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
Worker const * getWorker(size_type i) const
Definition: Path.h:91
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
def which(cmd)
Definition: eostools.py:335
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:45
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
TrigResConstPtr results() const
std::vector< std::string > vstring
Definition: Schedule.cc:368
def move(src, dest)
Definition: eostools.py:510
static Registry * instance()
Definition: Registry.cc:12
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void clearCounters()
Definition: Path.cc:164
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
int timesFailed() const
Definition: Path.h:81
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)