CMS 3D CMS Logo

StreamSchedule.cc
Go to the documentation of this file.
2 
26 
27 #include <algorithm>
28 #include <cassert>
29 #include <cstdlib>
30 #include <functional>
31 #include <iomanip>
32 #include <list>
33 #include <map>
34 #include <exception>
35 
36 namespace edm {
37  namespace {
38 
// Pairwise transform helper. Walks the input range [begin, end) and the
// output range that starts at 'out' in lock step and invokes
// func(input_element, output_element) for every pair. 'func' receives the
// input element (intended to be taken by const reference) and a reference to
// the output element it should write to. The output range must already hold
// at least as many elements as the input range; nothing is appended to it.
template <typename InputIterator, typename ForwardIterator, typename Func>
void transform_into(InputIterator begin, InputIterator end, ForwardIterator out, Func func) {
  while (begin != end) {
    func(*begin, *out);
    ++begin;
    ++out;
  }
}
49 
// Fills the sequence 'to' from the sequence 'from' by applying the callable
// 'func' pairwise (func(from_element, to_element)) through transform_into.
//
//  - When the two sequences differ in size, the values are computed into a
//    freshly constructed TO holding from.size() elements, and that temporary
//    is swapped into 'to' only after every call has returned. If a call
//    throws, 'to' keeps its previous contents.
//  - When the sizes already match, 'func' is applied directly to the existing
//    elements of 'to', so a throwing call can leave 'to' partially updated.
template <typename FROM, typename TO, typename FUNC>
void fill_summary(FROM const& from, TO& to, FUNC func) {
  if (to.size() == from.size()) {
    // Same size: update the existing elements in place.
    transform_into(from.begin(), from.end(), to.begin(), func);
    return;
  }

  // Different size: build the replacement off to the side, then publish it.
  TO replacement(from.size());
  transform_into(from.begin(), from.end(), replacement.begin(), func);
  to.swap(replacement);
}
66 
67  // -----------------------------
68 
69  // Here we make the trigger results inserter directly. This should
70  // probably be a utility in the WorkerRegistry or elsewhere.
71 
73  makeInserter(ExceptionToActionTable const& actions,
74  std::shared_ptr<ActivityRegistry> areg,
75  std::shared_ptr<TriggerResultInserter> inserter) {
76  StreamSchedule::WorkerPtr ptr(new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions));
77  ptr->setActivityRegistry(areg);
78  return ptr;
79  }
80 
81  void
82  initializeBranchToReadingWorker(ParameterSet const& opts,
83  ProductRegistry const& preg,
84  std::multimap<std::string,Worker*>& branchToReadingWorker)
85  {
86  // See if any data has been marked to be deleted early (removing any duplicates)
87  auto vBranchesToDeleteEarly = opts.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly",std::vector<std::string>());
88  if(not vBranchesToDeleteEarly.empty()) {
89  std::sort(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),std::less<std::string>());
90  vBranchesToDeleteEarly.erase(std::unique(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end()),
91  vBranchesToDeleteEarly.end());
92 
93  // Are the requested items in the product registry?
94  auto allBranchNames = preg.allBranchNames();
95  //the branch names all end with a period, which we do not want to compare with
96  for(auto & b:allBranchNames) {
97  b.resize(b.size()-1);
98  }
99  std::sort(allBranchNames.begin(),allBranchNames.end(),std::less<std::string>());
100  std::vector<std::string> temp;
101  temp.reserve(vBranchesToDeleteEarly.size());
102 
103  std::set_intersection(vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
104  allBranchNames.begin(),allBranchNames.end(),
105  std::back_inserter(temp));
106  vBranchesToDeleteEarly.swap(temp);
107  if(temp.size() != vBranchesToDeleteEarly.size()) {
108  std::vector<std::string> missingProducts;
109  std::set_difference(temp.begin(),temp.end(),
110  vBranchesToDeleteEarly.begin(),vBranchesToDeleteEarly.end(),
111  std::back_inserter(missingProducts));
112  LogInfo l("MissingProductsForCanDeleteEarly");
113  l<<"The following products in the 'canDeleteEarly' list are not available in this job and will be ignored.";
114  for(auto const& n:missingProducts){
115  l<<"\n "<<n;
116  }
117  }
118  //set placeholder for the branch, we will remove the nullptr if a
119  // module actually wants the branch.
120  for(auto const& branch:vBranchesToDeleteEarly) {
121  branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
122  }
123  }
124  }
125  }
126 
127  // -----------------------------
128 
// Shorthand for a list of strings (module labels, path names, ...).
using vstring = std::vector<std::string>;
130 
131  // -----------------------------
132 
133  StreamSchedule::StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
134  std::shared_ptr<ModuleRegistry> modReg,
135  ParameterSet& proc_pset,
137  PreallocationConfiguration const& prealloc,
138  ProductRegistry& preg,
139  BranchIDListHelper& branchIDListHelper,
140  ExceptionToActionTable const& actions,
141  std::shared_ptr<ActivityRegistry> areg,
142  std::shared_ptr<ProcessConfiguration> processConfiguration,
143  bool allowEarlyDelete,
144  StreamID streamID,
145  ProcessContext const* processContext) :
146  workerManager_(modReg,areg, actions),
147  actReg_(areg),
148  trig_name_list_(tns.getTrigPaths()),
149  end_path_name_list_(tns.getEndPaths()),
150  results_(new HLTGlobalStatus(trig_name_list_.size())),
151  results_inserter_(),
152  trig_paths_(),
153  end_paths_(),
154  total_events_(),
155  total_passed_(),
156  number_of_unscheduled_modules_(0),
157  streamID_(streamID),
158  streamContext_(streamID_, processContext),
159  endpathsAreActive_(true),
160  skippingEvent_(false){
161 
162  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
163  bool hasPath = false;
164 
165  int trig_bitpos = 0;
166  trig_paths_.reserve(trig_name_list_.size());
167  vstring labelsOnTriggerPaths;
168  for (auto const& trig_name : trig_name_list_) {
169  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), &labelsOnTriggerPaths);
170  ++trig_bitpos;
171  hasPath = true;
172  }
173 
174  if (hasPath) {
175  // the results inserter stands alone
176  inserter->setTrigResultForStream(streamID.value(), results());
177 
178  results_inserter_ = makeInserter(actions, actReg_, inserter);
180  }
181 
182  // fill normal endpaths
183  int bitpos = 0;
184  end_paths_.reserve(end_path_name_list_.size());
185  for (auto const& end_path_name : end_path_name_list_) {
186  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name);
187  ++bitpos;
188  }
189 
190  //See if all modules were used
191  std::set<std::string> usedWorkerLabels;
192  for (auto const& worker : allWorkers()) {
193  usedWorkerLabels.insert(worker->description().moduleLabel());
194  }
195  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
196  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
197  std::vector<std::string> unusedLabels;
198  set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
199  usedWorkerLabels.begin(), usedWorkerLabels.end(),
200  back_inserter(unusedLabels));
201  //does the configuration say we should allow on demand?
202  bool allowUnscheduled = opts.getUntrackedParameter<bool>("allowUnscheduled", false);
203  std::set<std::string> unscheduledLabels;
204  std::vector<std::string> shouldBeUsedLabels;
205  if (!unusedLabels.empty()) {
206  //Need to
207  // 1) create worker
208  // 2) if it is a WorkerT<EDProducer>, add it to our list
209  // 3) hand list to our delayed reader
210  for (auto const& label : unusedLabels) {
211  if (allowUnscheduled) {
212  bool isTracked;
213  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
214  assert(isTracked);
215  assert(modulePSet != nullptr);
216  workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
217  } else {
218  //everthing is marked are unused so no 'on demand' allowed
219  shouldBeUsedLabels.push_back(label);
220  }
221  }
222  if (!shouldBeUsedLabels.empty()) {
223  std::ostringstream unusedStream;
224  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
225  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
226  itLabelEnd = shouldBeUsedLabels.end();
227  itLabel != itLabelEnd;
228  ++itLabel) {
229  unusedStream << ",'" << *itLabel << "'";
230  }
231  LogInfo("path")
232  << "The following module labels are not assigned to any path:\n"
233  << unusedStream.str()
234  << "\n";
235  }
236  }
237  if (!unscheduledLabels.empty()) {
238  number_of_unscheduled_modules_=unscheduledLabels.size();
239  workerManager_.setOnDemandProducts(preg, unscheduledLabels);
240  }
241 
242 
243  initializeEarlyDelete(*modReg, opts,preg,allowEarlyDelete);
244 
245  } // StreamSchedule::StreamSchedule
246 
247 
249  edm::ParameterSet const& opts, edm::ProductRegistry const& preg,
250  bool allowEarlyDelete) {
251  //for now, if have a subProcess, don't allow early delete
252  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
253  if(not allowEarlyDelete) return;
254 
255  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
256  // registered for this job
257  std::multimap<std::string,Worker*> branchToReadingWorker;
258  initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
259 
260  //If no delete early items have been specified we don't have to do anything
261  if(branchToReadingWorker.size()==0) {
262  return;
263  }
264  const std::vector<std::string> kEmpty;
265  std::map<Worker*,unsigned int> reserveSizeForWorker;
266  unsigned int upperLimitOnReadingWorker =0;
267  unsigned int upperLimitOnIndicies = 0;
268  unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
269 
270  //talk with output modules first
271  modReg.forAllModuleHolders([this, &branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
272  auto comm = iHolder->createOutputModuleCommunicator();
273  if (comm) {
274  if(branchToReadingWorker.size()>0) {
275  //If an OutputModule needs a product, we can't delete it early
276  // so we should remove it from our list
277  SelectedProductsForBranchType const& kept = comm->keptProducts();
278  for(auto const& item: kept[InEvent]) {
279  BranchDescription const& desc = *item.first;
280  auto found = branchToReadingWorker.equal_range(desc.branchName());
281  if(found.first !=found.second) {
282  --nUniqueBranchesToDelete;
283  branchToReadingWorker.erase(found.first,found.second);
284  }
285  }
286  }
287  }
288  });
289 
290  if(branchToReadingWorker.size()==0) {
291  return;
292  }
293 
294  for (auto w :allWorkers()) {
295  //determine if this module could read a branch we want to delete early
296  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
297  if(0!=pset) {
298  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet",kEmpty);
299  if(not branches.empty()) {
300  ++upperLimitOnReadingWorker;
301  }
302  for(auto const& branch:branches){
303  auto found = branchToReadingWorker.equal_range(branch);
304  if(found.first != found.second) {
305  ++upperLimitOnIndicies;
306  ++reserveSizeForWorker[w];
307  if(nullptr == found.first->second) {
308  found.first->second = w;
309  } else {
310  branchToReadingWorker.insert(make_pair(found.first->first,w));
311  }
312  }
313  }
314  }
315  }
316  {
317  auto it = branchToReadingWorker.begin();
318  std::vector<std::string> unusedBranches;
319  while(it !=branchToReadingWorker.end()) {
320  if(it->second == nullptr) {
321  unusedBranches.push_back(it->first);
322  //erasing the object invalidates the iterator so must advance it first
323  auto temp = it;
324  ++it;
325  branchToReadingWorker.erase(temp);
326  } else {
327  ++it;
328  }
329  }
330  if(not unusedBranches.empty()) {
331  LogWarning l("UnusedProductsForCanDeleteEarly");
332  l<<"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
333  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
334  for(auto const& n:unusedBranches){
335  l<<"\n "<<n;
336  }
337  }
338  }
339  if(0!=branchToReadingWorker.size()) {
340  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
341  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies,0);
342  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
343  std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
344  std::string lastBranchName;
345  size_t nextOpenIndex = 0;
346  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
347  for(auto& branchAndWorker:branchToReadingWorker) {
348  if(lastBranchName != branchAndWorker.first) {
349  //have to put back the period we removed earlier in order to get the proper name
350  BranchID bid(branchAndWorker.first+".");
351  earlyDeleteBranchToCount_.emplace_back(bid,0U);
352  lastBranchName = branchAndWorker.first;
353  }
354  auto found = alreadySeenWorkers.find(branchAndWorker.second);
355  if(alreadySeenWorkers.end() == found) {
356  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
357  // all the branches that might be read by this worker. However, initially we will only tell the
358  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
359  // EarlyDeleteHelper will automatically advance its internal end pointer.
360  size_t index = nextOpenIndex;
361  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
363  earlyDeleteHelpers_.emplace_back(beginAddress+index,
364  beginAddress+index+1,
366  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
367  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(earlyDeleteHelpers_.back())));
368  nextOpenIndex +=nIndices;
369  } else {
370  found->second->appendIndex(earlyDeleteBranchToCount_.size()-1);
371  }
372  }
373 
374  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
375  // space needed for each module
376  auto itLast = earlyDeleteHelpers_.begin();
377  for(auto it = earlyDeleteHelpers_.begin()+1;it != earlyDeleteHelpers_.end();++it) {
378  if(itLast->end() != it->begin()) {
379  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
380  unsigned int delta = it->begin()- itLast->end();
381  it->shiftIndexPointers(delta);
382 
384  (itLast->end()-beginAddress),
386  (it->begin()-beginAddress));
387  }
388  itLast = it;
389  }
390  earlyDeleteHelperToBranchIndicies_.erase(earlyDeleteHelperToBranchIndicies_.begin()+(itLast->end()-beginAddress),
392 
393  //now tell the paths about the deleters
394  for(auto& p : trig_paths_) {
395  p.setEarlyDeleteHelpers(alreadySeenWorkers);
396  }
397  for(auto& p : end_paths_) {
398  p.setEarlyDeleteHelpers(alreadySeenWorkers);
399  }
401  }
402  }
403 
405  ProductRegistry& preg,
406  PreallocationConfiguration const* prealloc,
407  std::shared_ptr<ProcessConfiguration const> processConfiguration,
408  std::string const& pathName,
409  bool ignoreFilters,
410  PathWorkers& out,
411  vstring* labelsOnPaths) {
412  vstring modnames = proc_pset.getParameter<vstring>(pathName);
413  PathWorkers tmpworkers;
414 
415  unsigned int placeInPath = 0;
416  for (auto const& name : modnames) {
417 
418  if (labelsOnPaths) labelsOnPaths->push_back(name);
419 
421  if (name[0] == '!') filterAction = WorkerInPath::Veto;
422  else if (name[0] == '-') filterAction = WorkerInPath::Ignore;
423 
424  std::string moduleLabel = name;
425  if (filterAction != WorkerInPath::Normal) moduleLabel.erase(0, 1);
426 
427  bool isTracked;
428  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
429  if (modpset == 0) {
430  std::string pathType("endpath");
431  if (!search_all(end_path_name_list_, pathName)) {
432  pathType = std::string("path");
433  }
435  "The unknown module label \"" << moduleLabel <<
436  "\" appears in " << pathType << " \"" << pathName <<
437  "\"\n please check spelling or remove that label from the path.";
438  }
439  assert(isTracked);
440 
441  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
442  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) {
443  // We have a filter on an end path, and the filter is not explicitly ignored.
444  // See if the filter is allowed.
445  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
446  if (!search_all(allowed_filters, worker->description().moduleName())) {
447  // Filter is not allowed. Ignore the result, and issue a warning.
448  filterAction = WorkerInPath::Ignore;
449  LogWarning("FilterOnEndPath")
450  << "The EDFilter '" << worker->description().moduleName() << "' with module label '" << moduleLabel << "' appears on EndPath '" << pathName << "'.\n"
451  << "The return value of the filter will be ignored.\n"
452  << "To suppress this warning, either remove the filter from the endpath,\n"
453  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
454  }
455  }
456  tmpworkers.emplace_back(worker, filterAction, placeInPath);
457  ++placeInPath;
458  }
459 
460  out.swap(tmpworkers);
461  }
462 
464  ProductRegistry& preg,
465  PreallocationConfiguration const* prealloc,
466  std::shared_ptr<ProcessConfiguration const> processConfiguration,
467  int bitpos, std::string const& name, TrigResPtr trptr,
468  vstring* labelsOnTriggerPaths) {
469  using std::placeholders::_1;
470  PathWorkers tmpworkers;
471  Workers holder;
472  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, labelsOnTriggerPaths);
473 
474  for (PathWorkers::iterator wi(tmpworkers.begin()),
475  we(tmpworkers.end()); wi != we; ++wi) {
476  holder.push_back(wi->getWorker());
477  }
478 
479  // an empty path will cause an extra bit that is not used
480  if (!tmpworkers.empty()) {
481  trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, &skippingEvent_, PathContext::PathType::kPath);
482  } else {
483  empty_trig_paths_.push_back(bitpos);
484  empty_trig_path_names_.push_back(name);
485  }
486  for_all(holder, std::bind(&StreamSchedule::addToAllWorkers, this, _1));
487  }
488 
490  ProductRegistry& preg,
491  PreallocationConfiguration const* prealloc,
492  std::shared_ptr<ProcessConfiguration const> processConfiguration,
493  int bitpos, std::string const& name) {
494  using std::placeholders::_1;
495  PathWorkers tmpworkers;
496  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, 0);
497  Workers holder;
498 
499  for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) {
500  holder.push_back(wi->getWorker());
501  }
502 
503  if (!tmpworkers.empty()) {
504  //EndPaths are not supposed to stop if SkipEvent type exception happens
505  end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, nullptr, PathContext::PathType::kEndPath);
506  }
507  for_all(holder, std::bind(&StreamSchedule::addToAllWorkers, this, _1));
508  }
509 
512  }
513 
516  }
517 
519  std::string const& iLabel) {
520  Worker* found = nullptr;
521  for (auto const& worker : allWorkers()) {
522  if (worker->description().moduleLabel() == iLabel) {
523  found = worker;
524  break;
525  }
526  }
527  if (nullptr == found) {
528  return;
529  }
530 
531  iMod->replaceModuleFor(found);
533  }
534 
535  std::vector<ModuleDescription const*>
537  std::vector<ModuleDescription const*> result;
538  result.reserve(allWorkers().size());
539 
540  for (auto const& worker : allWorkers()) {
541  ModuleDescription const* p = worker->descPtr();
542  result.push_back(p);
543  }
544  return result;
545  }
546 
548  EventPrincipal& ep,
549  EventSetup const& es)
550  {
551  this->resetAll();
552  for (int empty_trig_path : empty_trig_paths_) {
553  results_->at(empty_trig_path) = HLTPathStatus(hlt::Pass, 0);
554  }
555 
557 
558  Traits::setStreamContext(streamContext_, ep);
559  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
560 
561  // This call takes care of the unscheduled processing.
563 
564  ++total_events_;
565  auto serviceToken = ServiceRegistry::instance().presentToken();
566  auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
567  [iTask,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
568  {
569  ServiceRegistry::Operate operate(serviceToken);
570 
571  std::exception_ptr ptr;
572  if(iPtr) {
573  ptr = *iPtr;
574  }
575  finishedPaths(ptr, std::move(iTask), ep, es);
576  });
577 
578  //The holder guarantees that if the paths finish before the loop ends
579  // that we do not start too soon. It also guarantees that the task will
580  // run under that condition.
581  WaitingTaskHolder taskHolder(pathsDone);
582 
583  for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
584  it != itEnd; ++ it) {
585  it->processOneOccurrenceAsync(pathsDone,ep, es, streamID_, &streamContext_);
586  }
587  }
588 
// Continuation for one event after the trigger paths have been processed.
//
// iExcept : exception collected so far (empty if none).
// iWait   : holder that is notified through doneWaiting() when no end paths
//           are run; otherwise it is captured by the end-path completion task.
// ep, es  : event principal and event setup, forwarded to the results
//           inserter and to the end paths.
//
// Steps, in order: filter the incoming exception, count the event as passed
// when results_->accept() is true, run the results inserter if there is one,
// then either finish the event right away or launch the end paths.
589  void
590  StreamSchedule::finishedPaths(std::exception_ptr iExcept, WaitingTaskHolder iWait, EventPrincipal& ep,
591  EventSetup const& es) {
592 
593  if(iExcept) {
// Rethrow so the exception can be inspected by type.
594  try {
595  std::rethrow_exception(iExcept);
596  }
597  catch(cms::Exception& e) {
// NOTE(review): listing line 598 is absent from this text; 'action' is used
// below but its declaration is not visible here - confirm against the
// original file.
599  assert (action != exception_actions::IgnoreCompletely);
600  assert (action != exception_actions::FailPath);
601  if (action == exception_actions::SkipEvent) {
// SkipEvent: print a warning and drop the exception so the rest of this
// function treats the event as exception-free.
602  edm::printCmsExceptionWarning("SkipEvent", e);
603  iExcept = std::exception_ptr();
604  } else {
605  iExcept = std::current_exception();
606  }
607  }
608  catch(...) {
// Any other exception type is kept as is.
609  iExcept = std::current_exception();
610  }
611  }
612 
613 
// Count the event as passed only when no exception remains.
614  if((not iExcept) and results_->accept()) {
615  ++total_passed_;
616  }
617 
// Run the results inserter, if one was created, as long as no exception is pending.
618  if((not iExcept) and (nullptr != results_inserter_.get())) {
619  try {
620  ParentContext parentContext(&streamContext_);
// NOTE(review): listing line 621 is absent from this text.
622 
623  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
624  }
625  catch (cms::Exception & ex) {
// Add context naming the module and the event id (ep.id()) before storing the exception.
626  ex.addContext("Calling produce method for module TriggerResultInserter");
627  std::ostringstream ost;
628  ost << "Processing " << ep.id();
629  ex.addContext(ost.str());
630  iExcept = std::current_exception();
631  }
632  catch(...) {
633  iExcept = std::current_exception();
634  }
635  }
// End paths are not run when there are none, when an exception is pending,
// or when they have been deactivated; in that case the event is finished here
// and the waiting task is notified with the final exception state.
636  if(end_paths_.empty() or iExcept or (not endpathsAreActive_)) {
637  iExcept = finishProcessOneEvent(iExcept);
638  iWait.doneWaiting(iExcept);
639  } else {
// Capture the current service token so the completion task can install it
// (via ServiceRegistry::Operate) when it runs.
640  auto serviceToken = ServiceRegistry::instance().presentToken();
641 
642  auto endPathsDone = make_waiting_task(tbb::task::allocate_root(),
643  [iWait,this,serviceToken](std::exception_ptr const* iPtr) mutable
644  {
645  ServiceRegistry::Operate operate(serviceToken);
646 
647  std::exception_ptr ptr;
648  if(iPtr) {
649  ptr = *iPtr;
650  }
// NOTE(review): listing line 651 is absent from this text, so what the task
// does with 'ptr' and 'iWait' is not visible here.
652  });
653  //The holder guarantees that if the paths finish before the loop ends
654  // that we do not start too soon. It also guarantees that the task will
655  // run under that condition.
656  WaitingTaskHolder taskHolder(endPathsDone);
// The end paths are started in reverse order of end_paths_.
657  for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
658  it != itEnd; ++it) {
659  it->processOneOccurrenceAsync(endPathsDone,ep, es, streamID_, &streamContext_);
660  }
661  }
662  }
663 
664 
// Final bookkeeping for one event.
//
// iExcept : exception collected while processing the event (empty if none).
// Returns : the exception to report to the caller. This is the incoming one
//           (re-captured after context was added), or an exception thrown by
//           the post-schedule signal when none was pending, or empty.
665  std::exception_ptr
666  StreamSchedule::finishProcessOneEvent(std::exception_ptr iExcept) {
// NOTE(review): listing line 667 is absent from this text.
668 
669  if(iExcept) {
670  //add context information to the exception and print message
671  try {
// Rethrow inside convertException::wrap so that it can be caught below as a
// cms::Exception (presumably wrap converts other exception types - confirm).
672  convertException::wrap([&]() {
673  std::rethrow_exception(iExcept);
674  });
675  } catch(cms::Exception& ex) {
676  bool const cleaningUpAfterException = false;
// Only add the generic "Calling function ..." context when the exception
// carries no context yet.
677  if (ex.context().empty()) {
678  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
679  } else {
680  addContextAndPrintException("", ex, cleaningUpAfterException);
681  }
682  iExcept = std::current_exception();
683  }
684 
// Emit the early-termination signal for this stream.
685  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
686  }
687 
// The post-schedule signal is always emitted; an exception thrown by it is
// kept only if no earlier exception is pending.
688  try {
689  Traits::postScheduleSignal(actReg_.get(), &streamContext_);
690  } catch(...) {
691  if(not iExcept) {
692  iExcept = std::current_exception();
693  }
694  }
695  if(not iExcept ) {
// NOTE(review): listing line 696 is absent from this text, so the action
// taken on success is not visible here.
697  }
698 
699  return iExcept;
700  }
701 
702 
703  void
704  StreamSchedule::availablePaths(std::vector<std::string>& oLabelsToFill) const {
705  oLabelsToFill.reserve(trig_paths_.size());
706  std::transform(trig_paths_.begin(),
707  trig_paths_.end(),
708  std::back_inserter(oLabelsToFill),
709  std::bind(&Path::name, std::placeholders::_1));
710  }
711 
712  void
713  StreamSchedule::triggerPaths(std::vector<std::string>& oLabelsToFill) const {
714  oLabelsToFill = trig_name_list_;
715  }
716 
717  void
718  StreamSchedule::endPaths(std::vector<std::string>& oLabelsToFill) const {
719  oLabelsToFill = end_path_name_list_;
720  }
721 
722  void
724  std::vector<std::string>& oLabelsToFill) const {
725  TrigPaths::const_iterator itFound =
726  std::find_if (trig_paths_.begin(),
727  trig_paths_.end(),
728  std::bind(std::equal_to<std::string>(),
729  iPathLabel,
730  std::bind(&Path::name, std::placeholders::_1)));
731  if (itFound!=trig_paths_.end()) {
732  oLabelsToFill.reserve(itFound->size());
733  for (size_t i = 0; i < itFound->size(); ++i) {
734  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
735  }
736  }
737  }
738 
739  void
741  std::vector<ModuleDescription const*>& descriptions,
742  unsigned int hint) const {
743  descriptions.clear();
744  bool found = false;
745  TrigPaths::const_iterator itFound;
746 
747  if(hint < trig_paths_.size()) {
748  itFound = trig_paths_.begin() + hint;
749  if(itFound->name() == iPathLabel) found = true;
750  }
751  if(!found) {
752  // if the hint did not work, do it the slow way
753  itFound = std::find_if (trig_paths_.begin(),
754  trig_paths_.end(),
755  std::bind(std::equal_to<std::string>(),
756  iPathLabel,
757  std::bind(&Path::name, std::placeholders::_1)));
758  if (itFound != trig_paths_.end()) found = true;
759  }
760  if (found) {
761  descriptions.reserve(itFound->size());
762  for (size_t i = 0; i < itFound->size(); ++i) {
763  descriptions.push_back(itFound->getWorker(i)->descPtr());
764  }
765  }
766  }
767 
768  void
770  std::vector<ModuleDescription const*>& descriptions,
771  unsigned int hint) const {
772  descriptions.clear();
773  bool found = false;
774  TrigPaths::const_iterator itFound;
775 
776  if(hint < end_paths_.size()) {
777  itFound = end_paths_.begin() + hint;
778  if(itFound->name() == iEndPathLabel) found = true;
779  }
780  if(!found) {
781  // if the hint did not work, do it the slow way
782  itFound = std::find_if (end_paths_.begin(),
783  end_paths_.end(),
784  std::bind(std::equal_to<std::string>(),
785  iEndPathLabel,
786  std::bind(&Path::name, std::placeholders::_1)));
787  if (itFound != end_paths_.end()) found = true;
788  }
789  if (found) {
790  descriptions.reserve(itFound->size());
791  for (size_t i = 0; i < itFound->size(); ++i) {
792  descriptions.push_back(itFound->getWorker(i)->descPtr());
793  }
794  }
795  }
796 
797  void
799  endpathsAreActive_ = active;
800  }
801 
802  bool
804  return endpathsAreActive_;
805  }
806 
807  static void
809  size_t which,
810  ModuleInPathSummary& sum) {
811  sum.timesVisited += path.timesVisited(which);
812  sum.timesPassed += path.timesPassed(which);
813  sum.timesFailed += path.timesFailed(which);
814  sum.timesExcept += path.timesExcept(which);
815  sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
816  }
817 
818  static void
820  sum.name = path.name();
821  sum.bitPosition = path.bitPosition();
822  sum.timesRun += path.timesRun();
823  sum.timesPassed += path.timesPassed();
824  sum.timesFailed += path.timesFailed();
825  sum.timesExcept += path.timesExcept();
826 
827  Path::size_type sz = path.size();
828  if(sum.moduleInPathSummaries.size()==0) {
829  std::vector<ModuleInPathSummary> temp(sz);
830  for (size_t i = 0; i != sz; ++i) {
831  fillModuleInPathSummary(path, i, temp[i]);
832  }
833  sum.moduleInPathSummaries.swap(temp);
834  } else {
835  assert(sz == sum.moduleInPathSummaries.size());
836  for (size_t i = 0; i != sz; ++i) {
838  }
839  }
840  }
841 
842  static void
844  sum.timesVisited += w.timesVisited();
845  sum.timesRun += w.timesRun();
846  sum.timesPassed += w.timesPassed();
847  sum.timesFailed += w.timesFailed();
848  sum.timesExcept += w.timesExcept();
849  sum.moduleLabel = w.description().moduleLabel();
850  }
851 
852  static void
854  fillWorkerSummaryAux(*pw, sum);
855  }
856 
857  void
862 
863  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
864  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
865  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
867  }
868 
869  void
871  using std::placeholders::_1;
873  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
874  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
875  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
876  }
877 
878  void
880  skippingEvent_ = false;
881  results_->reset();
882  }
883 
884  void
887  }
888 
889  void
891  //must be sure we have cleared the count first
892  for(auto& count:earlyDeleteBranchToCount_) {
893  count.count = 0;
894  }
895  //now reset based on how many helpers use that branch
897  ++(earlyDeleteBranchToCount_[index].count);
898  }
899  for(auto& helper: earlyDeleteHelpers_) {
900  helper.reset();
901  }
902  }
903 
904 }
static void fillModuleInPathSummary(Path const &path, size_t which, ModuleInPathSummary &sum)
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
size
Write out results.
dbl * delta
Definition: mlp_gen.cc:36
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
T getParameter(std::string const &) const
std::vector< PathSummary > endPathSummaries
Definition: TriggerReport.h:68
T getUntrackedParameter(std::string const &, T const &) const
int i
Definition: DBlmapReader.cc:9
std::string const & branchName() const
int timesRun() const
Definition: Path.h:72
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int bitPosition() const
Definition: Path.h:67
std::vector< Worker * > Workers
ModuleDescription const & description() const
Definition: Worker.h:123
std::vector< int > empty_trig_paths_
Definition: helper.py:1
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void endStream(StreamID iID, StreamContext &streamContext)
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
const double w
Definition: UKUtility.cc:23
virtual void replaceModuleFor(Worker *) const =0
int totalEventsFailed() const
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
WorkersInPath::size_type size_type
Definition: Path.h:47
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
int timesPassed() const
Definition: Worker.h:159
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
void clearCounters()
Definition: Worker.h:145
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
void addToAllWorkers(Worker *w)
int timesExcept() const
Definition: Path.h:75
exception_actions::ActionCodes find(const std::string &category) const
size_type size() const
Definition: Path.h:79
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:266
std::shared_ptr< Worker > WorkerPtr
std::vector< WorkerSummary > workerSummaries
Definition: TriggerReport.h:69
ServiceToken presentToken() const
std::string const & moduleLabel() const
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es)
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::string moduleLabel
Definition: TriggerReport.h:56
std::vector< WorkerInPath > PathWorkers
int timesExcept() const
Definition: Worker.h:161
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
std::vector< PathSummary > trigPathSummaries
Definition: TriggerReport.h:67
EventSummary eventSummary
Definition: TriggerReport.h:66
accept
Definition: HLTenums.h:19
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
int timesVisited() const
Definition: Worker.h:158
std::string name
Definition: TriggerReport.h:44
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
Definition: Path.h:42
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
static void fillPathSummary(Path const &path, PathSummary &sum)
static ServiceRegistry & instance()
int timesPassed() const
Definition: Path.h:73
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:18
int timesRun() const
Definition: Worker.h:157
#define end
Definition: vmac.h:37
rep
Definition: cuy.py:1188
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
vstring empty_trig_path_names_
element_type const * get() const
static void fillWorkerSummaryAux(Worker const &w, WorkerSummary &sum)
void beginStream(StreamID iID, StreamContext &streamContext)
void clearCounters()
Clear all the counters in the trigger report.
unsigned int value() const
Definition: StreamID.h:46
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void forAllModuleHolders(F iFunc)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
int timesVisited(size_type i) const
Definition: Path.h:80
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual std::unique_ptr< OutputModuleCommunicator > createOutputModuleCommunicator()=0
int timesFailed() const
Definition: Worker.h:160
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
bool endPathsEnabled() const
std::atomic< bool > skippingEvent_
std::string const & name() const
Definition: Path.h:68
#define begin
Definition: vmac.h:30
HLT enums.
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
Worker const * getWorker(size_type i) const
Definition: Path.h:84
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
def which(cmd)
Definition: eostools.py:335
std::vector< ModuleInPathSummary > moduleInPathSummaries
Definition: TriggerReport.h:45
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
TrigResConstPtr results() const
std::vector< std::string > vstring
Definition: Schedule.cc:368
def move(src, dest)
Definition: eostools.py:510
static Registry * instance()
Definition: Registry.cc:12
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void clearCounters()
Definition: Path.cc:168
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
void setupOnDemandSystem(EventPrincipal &principal, EventSetup const &es)
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void addToAllWorkers(Worker *w)
int timesFailed() const
Definition: Path.h:74
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)