CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
23 
24 #include <cassert>
25 #include <utility>
26 
// Offsets that are added to ambiguous_.size() to build sentinel values for the
// cached resolver index kept by NoProcessProductResolver. Genuine indices are
// asserted to be smaller than ambiguous_.size(), so the sentinels cannot collide
// with them.
// kUnsetOffset: the cache has not been filled yet.
27 static constexpr unsigned int kUnsetOffset = 0;
// kAmbiguousOffset: an entry flagged in ambiguous_ was encountered during the lookup.
28 static constexpr unsigned int kAmbiguousOffset = 1;
// kMissingOffset: the lookup finished without any resolver yielding a product.
29 static constexpr unsigned int kMissingOffset = 2;
30 
31 namespace edm {
32 
35  exception << "DataManagingProductResolver::resolveProduct_: The product matching all criteria was already deleted\n"
36  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
37  << "Looking for module label: " << moduleLabel() << "\n"
38  << "Looking for productInstanceName: " << productInstanceName() << "\n"
39  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
40  << "This means there is a configuration error.\n"
41  << "The module which is asking for this data must be configured to state that it will read this data.";
42  throw exception;
43  }
44 
// Shared helper; elsewhere in this file it is invoked as resolveProductImpl<true>(...)
// and resolveProductImpl<false>(...).
//   callResolver - when true and the status is still ProductStatus::ResolveNotRun,
//                  the callable passed in is invoked
//   resolver     - callable taking no arguments; per the comment further down it is
//                  expected to call setProduct when it succeeds
// Yields Resolution(pd) when the status ends up as ProductSet and the wrapper reports
// isPresent(); in every other case yields Resolution(nullptr).
// NOTE(review): the signature line and the body of the productWasDeleted() branch are
// not visible in this listing.
45  //This is a templated function in order to avoid calling another virtual function
46  template <bool callResolver, typename FUNC>
48  if (productWasDeleted()) {
50  }
51  auto presentStatus = status();
52 
53  if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
54  //if resolver fails because of exception or not setting product
55  // make sure the status goes to failed
56  auto failedStatusSetter = [this](ProductStatus* iPresentStatus) {
57  if (this->status() == ProductStatus::ResolveNotRun) {
58  this->setFailedStatus();
59  }
// Refresh the caller's local copy so the checks after this scope see the final status.
60  *iPresentStatus = this->status();
61  };
// The unique_ptr only borrows &presentStatus; its custom deleter is the lambda above,
// so the status fix-up runs when this scope is left, whether normally or by exception.
62  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
63  failedStatusSetter);
64 
65  //If successful, this will call setProduct
66  resolver();
67  }
68 
69  if (presentStatus == ProductStatus::ProductSet) {
70  auto pd = &getProductData();
// A product that was set but is not present is reported the same way as no product.
71  if (pd->wrapper()->isPresent()) {
72  return Resolution(pd);
73  }
74  }
75 
76  return Resolution(nullptr);
77  }
78 
80  std::shared_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
81  // if its not mergeable and the previous read failed, go ahead and use this one
83  setProduct(std::move(iFrom));
84  return;
85  }
86 
88  if (not iFrom) {
89  return;
90  }
91 
92  checkType(*iFrom);
93 
95  if (original->isMergeable()) {
96  if (original->isPresent() != iFrom->isPresent()) {
98  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
99  << "Was trying to merge objects where one product had been put in the input file and the other had not "
100  "been."
101  << "\n"
102  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
103  << "that need to be merged in the first place.\n";
104  }
105  if (original->isPresent()) {
107  if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
108  original->mergeProduct(iFrom.get());
109  } else {
111  mergeableRunProductMetadata->getMergeDecision(desc.processName());
112  if (decision == MergeableRunProductMetadata::MERGE) {
113  original->mergeProduct(iFrom.get());
114  } else if (decision == MergeableRunProductMetadata::REPLACE) {
115  // Note this swaps the content of the product where the
116  // both products branches are present and the product is
117  // also present (was put) in the branch. A module might
118  // have already gotten a pointer to the product so we
119  // keep those pointers valid. This does not call swap
120  // on the Wrapper.
121  original->swapProduct(iFrom.get());
122  }
123  // If the decision is IGNORE, do nothing
124  }
125  }
126  // If both have isPresent false, do nothing
127 
128  } else if (original->hasIsProductEqual()) {
129  if (original->isPresent() && iFrom->isPresent()) {
130  if (!original->isProductEqual(iFrom.get())) {
131  auto const& bd = branchDescription();
132  edm::LogError("RunLumiMerging")
133  << "ProductResolver::mergeTheProduct\n"
134  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
135  << "Using the first, ignoring the second\n"
136  << "className = " << bd.className() << "\n"
137  << "moduleLabel = " << bd.moduleLabel() << "\n"
138  << "instance = " << bd.productInstanceName() << "\n"
139  << "process = " << bd.processName() << "\n";
140  }
141  } else if (!original->isPresent() && iFrom->isPresent()) {
142  setProduct(std::move(iFrom));
143  }
144  // if not iFrom->isPresent(), do nothing
145  } else {
146  auto const& bd = branchDescription();
147  edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
148  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
149  << "Using the first, ignoring the second in merge\n"
150  << "className = " << bd.className() << "\n"
151  << "moduleLabel = " << bd.moduleLabel() << "\n"
152  << "instance = " << bd.productInstanceName() << "\n"
153  << "process = " << bd.processName() << "\n";
154  if (!original->isPresent() && iFrom->isPresent()) {
155  setProduct(std::move(iFrom));
156  }
157  // In other cases, do nothing
158  }
159  }
160 
161  namespace {
// Adds a context entry to `e` describing the branch being read: the class name,
// module label, product instance name and process name taken from `bd`.
// When `mcc` is non-null, the module calling context is additionally attached
// through edm::exceptionContext.
// Returns `e` itself, which lets callers write `throw extendException(...)`.
162  cms::Exception& extendException(cms::Exception& e, BranchDescription const& bd, ModuleCallingContext const* mcc) {
163  e.addContext(std::string("While reading from source ") + bd.className() + " " + bd.moduleLabel() + " '" +
164  bd.productInstanceName() + "' " + bd.processName());
165  if (mcc) {
166  edm::exceptionContext(e, *mcc);
167  }
168  return e;
169  }
170  } // namespace
172  Principal const& principal, bool, SharedResourcesAcquirer*, ModuleCallingContext const* mcc) const {
173  return resolveProductImpl<true>([this, &principal, mcc]() {
174  auto branchType = principal.branchType();
175  if (branchType == InLumi || branchType == InRun) {
176  //delayed get has not been allowed with Run or Lumis
177  // The file may already be closed so the reader is invalid
178  return;
179  }
180  if (mcc and branchType == InEvent and aux_) {
182  }
183 
184  auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
185  if (branchType == InEvent and aux_) {
186  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
187  }
188  }));
189 
190  if (auto reader = principal.reader()) {
191  std::unique_lock<std::recursive_mutex> guard;
192  if (auto sr = reader->sharedResources().second) {
193  guard = std::unique_lock<std::recursive_mutex>(*sr);
194  }
195  if (not productResolved()) {
196  try {
197  //another thread could have beaten us here
198  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
199  } catch (cms::Exception& e) {
200  throw extendException(e, branchDescription(), mcc);
201  } catch (std::exception const& e) {
202  auto newExcept = edm::Exception(errors::StdException) << e.what();
203  throw extendException(newExcept, branchDescription(), mcc);
204  }
205  }
206  }
207  });
208  }
209 
211  Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
212  if (auto reader = principal.reader()) {
213  std::unique_lock<std::recursive_mutex> guard;
214  if (auto sr = reader->sharedResources().second) {
215  guard = std::unique_lock<std::recursive_mutex>(*sr);
216  }
217 
218  //Can't use resolveProductImpl since it first checks to see
219  // if the product was already retrieved and then returns if it is
220  auto edp(reader->getProduct(branchDescription().branchID(), &principal));
221 
222  if (edp.get() != nullptr) {
223  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
225  << "Missing definition of member function swap for branch name " << branchDescription().branchName()
226  << "\n"
227  << "Mergeable data types written to a Run must have the swap member function defined"
228  << "\n";
229  }
231  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
232  setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
233  } else { // status() == ResolveFailed && branchDescription().isMergeable()
235  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
236  << "The product branch was dropped in the first run or lumi fragment and present in a later one"
237  << "\n"
238  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
239  << "that need to be merged in the first place.\n";
240  }
241  } else if (status() == defaultStatus()) {
242  setFailedStatus();
243  } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
245  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
246  << "The product branch was present in first run or lumi fragment and dropped in a later one"
247  << "\n"
248  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
249  << "that need to be merged in the first place.\n";
250  }
251  // Do nothing in other case. status is ResolveFailed already or
252  // it is not mergeable and the status is ProductSet
253  }
254  }
255 
257  std::shared_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
258  if (status() == defaultStatus()) {
259  //resolveProduct has not been called or it failed
261  } else {
262  mergeProduct(std::move(prod), mergeableRunProductMetadata);
263  }
264  }
265 
268  }
269 
271  Principal const& principal,
272  bool skipCurrentProcess,
273  ServiceToken const& token,
275  ModuleCallingContext const* mcc) const {
276  //need to try changing m_prefetchRequested before adding to m_waitingTasks
277  bool expected = false;
278  bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
279  m_waitingTasks.add(waitTask);
280 
281  if (prefetchRequested) {
282  ServiceWeakToken weakToken = token;
283  auto workToDo = [this, mcc, &principal, weakToken]() {
284  //need to make sure Service system is activated on the reading thread
285  ServiceRegistry::Operate operate(weakToken.lock());
286  // Caught exception is propagated via WaitingTaskList
287  CMS_SA_ALLOW try {
288  resolveProductImpl<true>([this, &principal, mcc]() {
289  if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
290  return;
291  }
292  if (auto reader = principal.reader()) {
293  std::unique_lock<std::recursive_mutex> guard;
294  if (auto sr = reader->sharedResources().second) {
295  guard = std::unique_lock<std::recursive_mutex>(*sr);
296  }
297  if (not productResolved()) {
298  try {
299  //another thread could have finished this while we were waiting
300  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
301  } catch (cms::Exception& e) {
302  throw extendException(e, branchDescription(), mcc);
303  } catch (std::exception const& e) {
304  auto newExcept = edm::Exception(errors::StdException) << e.what();
305  throw extendException(newExcept, branchDescription(), mcc);
306  }
307  }
308  }
309  });
310  } catch (...) {
311  this->m_waitingTasks.doneWaiting(std::current_exception());
312  return;
313  }
314  this->m_waitingTasks.doneWaiting(nullptr);
315  };
316 
317  SerialTaskQueueChain* queue = nullptr;
318  if (auto reader = principal.reader()) {
319  if (auto shared_res = reader->sharedResources().first) {
320  queue = &(shared_res->serialQueueChain());
321  }
322  }
323  if (queue) {
324  queue->push(*waitTask.group(), workToDo);
325  } else {
326  //Have to create a new task
327  auto t = make_functor_task(workToDo);
328  waitTask.group()->run([t]() {
329  TaskSentry s{t};
330  t->execute();
331  });
332  }
333  }
334  }
335 
337  if (not deleteEarly) {
338  m_prefetchRequested = false;
340  }
342  }
343 
345  aux_ = iConfigure.auxiliary();
346  }
347 
349 
// Hands `edp` over to setProduct as the product for this branch.
// A status different from defaultStatus() is treated as an attempt to insert a
// second product on the same branch.
// NOTE(review): listing line 352 is missing; the stream insertion below presumably
// feeds an exception that is thrown in that case - confirm against the real source.
350  void PutOnReadInputProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
351  if (status() != defaultStatus()) {
353  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
354  }
355 
356  setProduct(std::move(edp)); // ProductResolver takes ownership
357  }
358 
360 
362  bool skipCurrentProcess,
364  ModuleCallingContext const*) const {
365  return resolveProductImpl<false>([]() { return; });
366  }
367 
369  Principal const& principal,
370  bool skipCurrentProcess,
371  ServiceToken const& token,
373  ModuleCallingContext const* mcc) const {}
374 
// Forwards `edp` to setOrMergeProduct, passing nullptr for the
// MergeableRunProductMetadata argument.
375  void PutOnReadInputProductResolver::putOrMergeProduct(std::unique_ptr<WrapperBase> edp) const {
376  setOrMergeProduct(std::move(edp), nullptr);
377  }
378 
380  bool skipCurrentProcess,
382  ModuleCallingContext const*) const {
383  if (!skipCurrentProcess) {
384  //'false' means never call the lambda function
385  return resolveProductImpl<false>([]() { return; });
386  }
387  return Resolution(nullptr);
388  }
389 
391  Principal const& principal,
392  bool skipCurrentProcess,
393  ServiceToken const& token,
395  ModuleCallingContext const* mcc) const {
396  if (not skipCurrentProcess) {
397  if (branchDescription().branchType() == InProcess &&
399  // This is an accessInputProcessBlock transition
400  // We cannot access produced products in those transitions
401  // except for in SubProcesses where they should have already run.
402  return;
403  }
405  if (not mcc->parent().isAtEndTransition()) {
406  return;
407  }
408  }
409 
410  if (waitingTasks_) {
411  // using a waiting task to do a callback guarantees that the
412  // waitingTasks_ list (from the worker) will be released from
413  // waiting even if the module does not put this data product
414  // or the module has an exception while running
415  waitingTasks_->add(waitTask);
416  }
417  }
418  }
419 
421  auto worker = iConfigure.findWorker(branchDescription().moduleLabel());
422  if (worker) {
423  waitingTasks_ = &worker->waitingTaskList();
424  }
425  }
426 
428  aux_ = iConfigure.auxiliary();
430  }
431 
433  bool skipCurrentProcess,
435  ModuleCallingContext const*) const {
436  if (!skipCurrentProcess and worker_) {
437  return resolveProductImpl<false>([] {});
438  }
439  return Resolution(nullptr);
440  }
441 
443  Principal const& principal,
444  bool skipCurrentProcess,
445  ServiceToken const& token,
447  ModuleCallingContext const* mcc) const {
448  if (skipCurrentProcess) {
449  return;
450  }
451  if (worker_ == nullptr) {
452  throw cms::Exception("LogicError") << "UnscheduledProductResolver::prefetchAsync_() called with null worker_. "
453  "This should not happen, please contact framework developers.";
454  }
455  //need to try changing prefetchRequested_ before adding to waitingTasks_
456  bool expected = false;
457  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
458  waitingTasks_.add(waitTask);
459  if (prefetchRequested) {
460  //Have to create a new task which will make sure the state for UnscheduledProductResolver
461  // is properly set after the module has run
462  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
463  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
464  // state for the case where an exception occurs during the call to the function.
465  // Caught exception is propagated via WaitingTaskList
466  CMS_SA_ALLOW try {
467  resolveProductImpl<true>([iPtr]() {
468  if (iPtr) {
469  std::rethrow_exception(*iPtr);
470  }
471  });
472  } catch (...) {
473  waitingTasks_.doneWaiting(std::current_exception());
474  return;
475  }
476  waitingTasks_.doneWaiting(nullptr);
477  });
478 
479  ParentContext parentContext(mcc);
482  WaitingTaskHolder(*waitTask.group(), t),
483  info,
484  token,
485  info.principal().streamID(),
486  parentContext,
487  mcc->getStreamContext());
488  }
489  }
490 
492  if (not deleteEarly) {
493  prefetchRequested_ = false;
495  }
497  }
498 
// Hands `edp` over to setProduct as the product for this branch.
// A status different from defaultStatus() is treated as an attempt to insert a
// second product on the same branch.
// NOTE(review): listing line 501 is missing; the stream insertion below presumably
// feeds an exception that is thrown in that case - confirm against the real source.
499  void ProducedProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
500  if (status() != defaultStatus()) {
502  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
503  }
504 
505  setProduct(std::move(edp)); // ProductResolver takes ownership
506  }
507 
// Unconditionally reports true for this resolver type.
508  bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
509 
511 
513  // Check if the types match.
514  TypeID typeID(prod.dynamicTypeInfo());
516  // Types do not match.
518  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
519  << "It is supposed to be of type " << branchDescription().className() << ".\n"
520  << "It is actually of type " << typeID.className() << ".\n";
521  }
522  }
523 
// unique_ptr overload. A non-null `edp` is first validated with checkType; a null
// `edp` marks the resolver as failed through setFailedStatus().
// NOTE(review): listing lines 527-528 are missing; presumably they store the wrapper
// and update the status - confirm against the real source.
524  void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
525  if (edp) {
526  checkType(*edp);
529  } else {
530  setFailedStatus();
531  }
532  }
// shared_ptr overload. A non-null `edp` is first validated with checkType; a null
// `edp` marks the resolver as failed through setFailedStatus().
// NOTE(review): listing lines 536-537 are missing; presumably they store the wrapper
// and update the status - confirm against the real source.
533  void DataManagingProductResolver::setProduct(std::shared_ptr<WrapperBase> edp) const {
534  if (edp) {
535  checkType(*edp);
538  } else {
539  setFailedStatus();
540  }
541  }
542 
543  // This routine returns true if it is known that currently there is no real product.
544  // If there is a real product, it returns false.
545  // If it is not known if there is a real product, it returns false.
547  auto presentStatus = status();
548  if (presentStatus == ProductStatus::ProductSet) {
549  return !(getProductData().wrapper()->isPresent());
550  }
551  return presentStatus != ProductStatus::ResolveNotRun;
552  }
553 
555  auto s = status();
556  return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
557  }
558 
559  // This routine returns true if the product was deleted early in order to save memory
561 
// Reports whether a product is held and its wrapper says it is present.
// Returns false right away when the caller asked to skip the current process and
// isFromCurrentProcess() is true for this resolver.
// NOTE(review): listing line 566 is missing; it opens the block that guards the
// getProductData() access below (presumably a status check) - confirm.
562  bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
563  if (iSkipCurrentProcess and isFromCurrentProcess()) {
564  return false;
565  }
567  if (getProductData().wrapper()->isPresent()) {
568  return true;
569  }
570  }
571  return false;
572  }
573 
575  productData_.setProvenance(provRetriever);
576  }
577 
579 
581  MergeableRunProductMetadata const* mrpm) {
583  }
584 
586  return provenance()->productProvenance();
587  }
588 
592  }
593  if (deleteEarly) {
595  } else {
596  resetStatus();
597  }
598  }
599 
// Unconditionally reports true for this resolver type.
600  bool DataManagingProductResolver::singleProduct_() const { return true; }
601 
604  }
605 
607 
609  return provenance()->productProvenance();
610  }
611 
613 
// Unconditionally reports true for this resolver type.
614  bool AliasProductResolver::singleProduct_() const { return true; }
615 
// Constructor. Keeps `realProduct` in realProduct_, moves `bd` into productData_ and
// starts with prefetchRequested_ cleared. It then computes parentageID_ from a
// Parentage whose only parent is the original branch ID of the real product.
// NOTE(review): listing lines 617 (the declaration of the `realProduct` parameter)
// and 623 (a statement after parentageID_ is set) are missing from this listing.
616  SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd,
618  : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false) {
619  // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
620  Parentage p;
621  p.setParents(std::vector<BranchID>{realProduct.branchDescription().originalBranchID()});
622  parentageID_ = p.id();
624  }
625 
// Not supported for this resolver type: the visible lines only build a message
// stating that the function is not implemented and should never be called.
// Neither parameter is used in the visible lines.
// NOTE(review): listing line 627 is missing; presumably it throws the exception the
// message is streamed into - confirm against the real source.
626  void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
628  << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
629  << "Contact a Framework developer\n";
630  }
631 
634  }
635 
637  if (res.data() == nullptr)
638  return res;
639  return Resolution(&productData_);
640  }
641 
643  // SwitchProducer will never put anything in the event, and
644  // "false" will make Event::commit_() to call putProduct() with
645  // null unique_ptr<WrapperBase> to signal that the produce() was
646  // run.
647  return false;
648  }
649 
651  productData_.setProvenance(provRetriever);
652  }
653 
655  // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
657  }
658 
661  realProduct_.resetProductData_(deleteEarly);
662  if (not deleteEarly) {
663  prefetchRequested_ = false;
665  }
666  }
667 
669  // update provenance
671  // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
672  productData_.unsafe_setWrapper(realProduct().getProductData().sharedConstWrapper());
673  }
674 
// Constructor. Delegates to SwitchBaseProductResolver with the moved `bd` and
// `realProduct`, and initializes status_ to defaultStatus_.
// NOTE(review): listing line 676 (the declaration of the `realProduct` parameter) is
// missing from this listing.
675  SwitchProducerProductResolver::SwitchProducerProductResolver(std::shared_ptr<BranchDescription const> bd,
677  : SwitchBaseProductResolver(std::move(bd), realProduct), status_(defaultStatus_) {}
678 
680  bool skipCurrentProcess,
682  ModuleCallingContext const* mcc) const {
684  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
685  }
686  return Resolution(nullptr);
687  }
688 
690  Principal const& principal,
691  bool skipCurrentProcess,
692  ServiceToken const& token,
694  ModuleCallingContext const* mcc) const {
695  if (skipCurrentProcess) {
696  return;
697  }
698  if (branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
699  return;
700  }
701 
702  //need to try changing prefetchRequested before adding to waitingTasks
703  bool expected = false;
704  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
705  waitingTasks().add(waitTask);
706 
707  if (doPrefetchRequested) {
708  //using a waiting task to do a callback guarantees that
709  // the waitingTasks() list will be released from waiting even
710  // if the module does not put this data product or the
711  // module has an exception while running
712  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
713  if (nullptr != iException) {
714  waitingTasks().doneWaiting(*iException);
715  } else {
717  waitingTasks().doneWaiting(std::exception_ptr());
718  }
719  });
720  worker()->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
721  }
722  }
723 
// Called to signal that produce() ran; per the comment in the body this resolver
// holds no real product of its own. `edp` is not used in the visible lines.
// A status_ different from defaultStatus_ is treated as a second insertion attempt.
// If this call is the one that flips prefetchRequested() from false to true, the
// waiting tasks are released with an empty exception_ptr.
// NOTE(review): listing lines 726, 732 and 735 are missing (presumably the throw, the
// status_ update and a wrapper/provenance update) - confirm against the real source.
// NOTE(review): the message below has no space or newline between the branch name
// and "This makes no sense", so the two run together in the emitted text.
724  void SwitchProducerProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
725  if (status_ != defaultStatus_) {
727  << "Attempt to insert more than one product for a branch " << branchDescription().branchName()
728  << "This makes no sense for SwitchProducerProductResolver.\nContact a Framework developer";
729  }
730  // Let's use ResolveFailed to signal that produce() was called, as
731  // there is no real product in this resolver
733  bool expected = false;
734  if (prefetchRequested().compare_exchange_strong(expected, true)) {
736  waitingTasks().doneWaiting(std::exception_ptr());
737  }
738  }
739 
741  // if produce() was run (ResolveFailed), ask from the real resolver
743  return realProduct().productUnavailable();
744  }
745  return true;
746  }
747 
750  if (not deleteEarly) {
752  }
753  }
754 
756  bool skipCurrentProcess,
758  ModuleCallingContext const* mcc) const {
759  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
760  }
761 
763  Principal const& principal,
764  bool skipCurrentProcess,
765  ServiceToken const& token,
767  ModuleCallingContext const* mcc) const {
768  if (skipCurrentProcess) {
769  return;
770  }
771 
772  //need to try changing prefetchRequested_ before adding to waitingTasks_
773  bool expected = false;
774  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
775  waitingTasks().add(waitTask);
776 
777  if (doPrefetchRequested) {
778  //using a waiting task to do a callback guarantees that
779  // the waitingTasks() list will be released from waiting even
780  // if the module does not put this data product or the
781  // module has an exception while running
782  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
783  if (nullptr != iException) {
784  waitingTasks().doneWaiting(*iException);
785  } else {
787  waitingTasks().doneWaiting(std::exception_ptr());
788  }
789  });
791  WaitingTaskHolder(*waitTask.group(), waiting), principal, skipCurrentProcess, token, sra, mcc);
792  }
793  }
794 
796  provRetriever_ = provRetriever;
797  }
798 
800 
802  return provRetriever_ ? provRetriever_->branchIDToProvenance(bd_->originalBranchID()) : nullptr;
803  }
804 
806 
// Unconditionally reports true for this resolver type.
807  bool ParentProcessProductResolver::singleProduct_() const { return true; }
808 
810  // In principle, this ought to be fixed. I noticed one hits this error
811  // when in a SubProcess and calling the Event::getProvenance function
812  // with a BranchID to a branch from an earlier SubProcess or the top
813  // level process and this branch is not kept in this SubProcess. It might
814  // be possible to hit this in other contexts. I say it ought to be
815  // fixed because one does not encounter this issue if the SubProcesses
816  // are split into genuinely different processes (in principle that
817  // ought to give identical behavior and results). No user has ever
818  // reported this issue which has been around for some time and it was only
819  // noticed when testing some rare corner cases after modifying Core code.
820  // After discussing this with Chris we decided that at least for the moment
821  // there are higher priorities than fixing this ... I converted it so it
822  // causes an exception instead of a seg fault. The issue that may need to
823  // be addressed someday is how ProductResolvers for non-kept branches are
824  // connected to earlier SubProcesses.
826  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
827  << "Contact a Framework developer\n";
828  }
829 
// Constructor.
//   matchingHolders - resolver indices, copied into matchingHolders_
//   ambiguous       - per-entry flags, copied into ambiguous_; must have the same
//                     length as matchingHolders (asserted in the body)
//   madeAtEnd       - stored in madeAtEnd_
// Both cached indices start at the "unset" sentinel (ambiguous_.size() + kUnsetOffset)
// and both prefetch-requested flags start cleared.
830  NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
831  std::vector<bool> const& ambiguous,
832  bool madeAtEnd)
833  : matchingHolders_(matchingHolders),
834  ambiguous_(ambiguous),
835  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
836  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
837  prefetchRequested_(false),
838  skippingPrefetchRequested_(false),
839  madeAtEnd_{madeAtEnd} {
840  assert(ambiguous_.size() == matchingHolders_.size());
841  }
842 
844  Principal const& principal,
845  bool skipCurrentProcess,
847  ModuleCallingContext const* mcc) const {
848  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
849  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
850  }
851 
853  bool skipCurrentProcess,
855  ModuleCallingContext const* mcc) const {
856  //See if we've already cached which Resolver we should call or if
857  // we know it is ambiguous
858  const unsigned int choiceSize = ambiguous_.size();
859 
860  //madeAtEnd_==true and not at end transition is the same as skipping the current process
861  if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
862  skipCurrentProcess = not mcc->parent().isAtEndTransition();
863  }
864 
865  unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
866  if (checkCacheIndex != choiceSize + kUnsetOffset) {
867  if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
869  } else if (checkCacheIndex == choiceSize + kMissingOffset) {
870  return Resolution(nullptr);
871  }
872  return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
873  }
874 
875  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
876 
877  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
878  for (unsigned int k : lookupProcessOrder) {
879  assert(k < ambiguous_.size());
880  if (k == 0)
881  break; // Done
882  if (ambiguous_[k]) {
883  updateCacheIndex = choiceSize + kAmbiguousOffset;
885  }
887  auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
888  if (resolution.data() != nullptr) {
889  updateCacheIndex = k;
890  return resolution;
891  }
892  }
893  }
894 
895  updateCacheIndex = choiceSize + kMissingOffset;
896  return Resolution(nullptr);
897  }
898 
900  Principal const& principal,
901  bool skipCurrentProcess,
902  ServiceToken const& token,
904  ModuleCallingContext const* mcc) const {
905  bool timeToMakeAtEnd = true;
906  if (madeAtEnd_ and mcc) {
907  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
908  }
909 
910  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
911  if (not skipCurrentProcess and timeToMakeAtEnd) {
912  //need to try changing prefetchRequested_ before adding to waitingTasks_
913  bool expected = false;
914  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
915  waitingTasks_.add(waitTask);
916 
917  if (prefetchRequested) {
918  //we are the first thread to request
919  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token, waitTask.group());
920  }
921  } else {
922  skippingWaitingTasks_.add(waitTask);
923  bool expected = false;
924  if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
925  //we are the first thread to request
926  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token, waitTask.group());
927  }
928  }
929  }
930 
// Records `iIndex` as the cached lookup result and releases the tasks that were
// waiting for it, handing them `iExceptPtr`.
// When iSkipCurrentProcess is false this stores into lastCheckIndex_ and signals
// waitingTasks_.
// NOTE(review): listing lines 938-939 (the body of the else branch) are missing;
// presumably they do the same for the skip-current-process cache and task list -
// confirm against the real source.
931  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
932  ProductResolverIndex iIndex,
933  std::exception_ptr iExceptPtr) const {
934  if (not iSkipCurrentProcess) {
935  lastCheckIndex_ = iIndex;
936  waitingTasks_.doneWaiting(iExceptPtr);
937  } else {
940  }
941  }
942 
943  namespace {
// Waiting task created by NoProcessProductResolver::tryPrefetchResolverAsync for one
// position (index_) in the lookup order. When it executes it either reports the
// failure, or - if the resolver at that position did not deliver valid data - asks
// the owning resolver to try the next position.
// NOTE(review): listing lines 949 (the constructor parameter `iSRA`) and 978 (the
// declaration of the `sra_` member) are missing from this listing.
944  class TryNextResolverWaitingTask : public edm::WaitingTask {
945  public:
// Stores every argument in the matching member; no other work is done here.
946  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
947  unsigned int iResolverIndex,
948  Principal const* iPrincipal,
950  ModuleCallingContext const* iMCC,
951  bool iSkipCurrentProcess,
952  ServiceToken iToken,
953  oneapi::tbb::task_group* iGroup)
954  : resolver_(iResolver),
955  principal_(iPrincipal),
956  sra_(iSRA),
957  mcc_(iMCC),
958  group_(iGroup),
959  serviceToken_(iToken),
960  index_(iResolverIndex),
961  skipCurrentProcess_(iSkipCurrentProcess) {}
962 
// If an exception was recorded for this task, forward it via prefetchFailed.
// Otherwise check whether the resolver at index_ produced valid data; if it did not,
// continue the search at index_ + 1.
963  void execute() final {
964  auto exceptPtr = exceptionPtr();
965  if (exceptPtr) {
966  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, exceptPtr);
967  } else {
968  if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
969  resolver_->tryPrefetchResolverAsync(
970  index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
971  }
972  }
973  }
974 
975  private:
976  NoProcessProductResolver const* resolver_;
977  Principal const* principal_;
979  ModuleCallingContext const* mcc_;
980  oneapi::tbb::task_group* group_;
// Held as a weak token and converted with lock() at the point of use in execute().
981  ServiceWeakToken serviceToken_;
982  unsigned int index_;
983  bool skipCurrentProcess_;
984  };
985  } // namespace
986 
// Handles a failed prefetch for position `iProcessingIndex` of the lookup order.
// The position is translated to its entry in principal.lookupProcessOrder(), and that
// entry is stored through setCache together with `iExceptPtr`.
// `iProcessingIndex` is used to index the lookup order without a bounds check.
987  void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
988  Principal const& principal,
989  bool iSkipCurrentProcess,
990  std::exception_ptr iExceptPtr) const {
991  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
992  auto k = lookupProcessOrder[iProcessingIndex];
993 
994  setCache(iSkipCurrentProcess, k, iExceptPtr);
995  }
996 
// Checks whether the resolver at position `iProcessingIndex` of the lookup order has
// fetched a valid product.
// Returns true after caching that resolver's entry via setCache (with a null
// exception pointer) when productWasFetchedAndIsValid(iSkipCurrentProcess) holds;
// returns false, leaving the cache untouched, otherwise.
// `iProcessingIndex` is used to index the lookup order without a bounds check.
997  bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
998  Principal const& principal,
999  bool iSkipCurrentProcess) const {
1000  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1001  auto k = lookupProcessOrder[iProcessingIndex];
1002  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1003 
1004  if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
1005  setCache(iSkipCurrentProcess, k, nullptr);
1006  return true;
1007  }
1008  return false;
1009  }
1010 
// Walks principal.lookupProcessOrder() starting at position `iProcessingIndex` and
// starts an asynchronous prefetch on the first suitable resolver.
// The walk stops when an entry equal to 0 is met, or when the entry is flagged in
// ambiguous_ (in which case the ambiguous sentinel is what gets cached).
// When a prefetch is started the function returns immediately; a
// TryNextResolverWaitingTask is passed along so the search can continue from the
// following position if that resolver does not deliver.
// If the walk ends without starting a prefetch, the result (missing or ambiguous) is
// stored via setCache with a null exception pointer.
// NOTE(review): listing lines 1014 and 1016 (the `sra` and `token` parameters), 1033
// (the condition opening the block that starts the prefetch) and 1041 (a statement
// following the "Services" comment) are missing from this listing.
1011  void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
1012  Principal const& principal,
1013  bool skipCurrentProcess,
1015  ModuleCallingContext const* mcc,
1017  oneapi::tbb::task_group* group) const {
1018  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1019  auto index = iProcessingIndex;
1020 
1021  const unsigned int choiceSize = ambiguous_.size();
// Default outcome if nothing is found before the walk ends.
1022  unsigned int newCacheIndex = choiceSize + kMissingOffset;
1023  while (index < lookupProcessOrder.size()) {
1024  auto k = lookupProcessOrder[index];
1025  if (k == 0) {
1026  break;
1027  }
1028  assert(k < ambiguous_.size());
1029  if (ambiguous_[k]) {
1030  newCacheIndex = choiceSize + kAmbiguousOffset;
1031  break;
1032  }
1034  //make new task
1035 
// NOTE(review): allocated with a raw `new` and handed to WaitingTaskHolder; how the
// task is eventually released is not visible in this listing.
1036  auto task = new TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token, group);
1037  WaitingTaskHolder hTask(*group, task);
1038  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1039 
1040  //Make sure the Services are available on this thread
1042 
1043  productResolver->prefetchAsync(hTask, principal, skipCurrentProcess, token, sra, mcc);
1044  return;
1045  }
1046  ++index;
1047  }
1048  //data product unavailable
1049  setCache(skipCurrentProcess, newCacheIndex, nullptr);
1050  }
1051 
1053 
1055 
1057 
1058  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1059 
1061  // This function should never receive 'true'. On the other hand,
1062  // nothing should break if a 'true' is passed, because
1063  // NoProcessProductResolver just forwards the resolve
1064  const auto resetValue = unsetIndexValue();
1065  lastCheckIndex_ = resetValue;
1066  lastSkipCurrentCheckIndex_ = resetValue;
1067  prefetchRequested_ = false;
1069  waitingTasks_.reset();
1071  }
1072 
1073  bool NoProcessProductResolver::singleProduct_() const { return false; }
1074 
1077  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1078  << "Contact a Framework developer\n";
1079  }
1080 
1083  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1084  << "Contact a Framework developer\n";
1085  }
1086 
1089  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1090  << "Contact a Framework developer\n";
1091  }
1092 
1095  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1096  << "Contact a Framework developer\n";
1097  }
1098 
// Unsupported for NoProcessProductResolver: per the message text below, this
// override is not implemented and should never be called. The argument is unused.
// NOTE(review): original line 1100 is absent from this listing; presumably it
// starts the throw statement that the streamed strings below belong to - confirm.
1099  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1101  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1102  << "Contact a Framework developer\n";
1103  }
1104 
1107  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1108  << "Contact a Framework developer\n";
1109  }
1110 
// Unsupported for NoProcessProductResolver: per the message text below, this
// override is not implemented and should never be called. The argument is ignored.
// NOTE(review): original line 1112 is absent from this listing; presumably it
// starts the throw statement that the streamed strings below belong to - confirm.
1111  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1113  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1114  << "Contact a Framework developer\n";
1115  }
1116 
1119  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1120  << "Contact a Framework developer\n";
1121  }
1122 
1125  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1126  << "Contact a Framework developer\n";
1127  }
1128 
1129  //---- SingleChoiceNoProcessProductResolver ----------------
1131  Principal const& principal,
1132  bool skipCurrentProcess,
1134  ModuleCallingContext const* mcc) const {
1135  //NOTE: Have to lookup the other ProductResolver each time rather than cache
1136  // it's pointer since it appears the pointer can change at some later stage
1138  ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1139  }
1140 
1142  Principal const& principal,
1143  bool skipCurrentProcess,
1144  ServiceToken const& token,
1146  ModuleCallingContext const* mcc) const {
1148  ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1149  }
1150 
1152 
1154 
1156 
1158 
1160 
1163  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1164  << "Contact a Framework developer\n";
1165  }
1166 
1169  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1170  << "Contact a Framework developer\n";
1171  }
1172 
1175  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1176  << "Contact a Framework developer\n";
1177  }
1178 
1181  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1182  << "Contact a Framework developer\n";
1183  }
1184 
1186  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1187  "implemented and should never be called.\n"
1188  << "Contact a Framework developer\n";
1189  }
1190 
1193  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1194  << "Contact a Framework developer\n";
1195  }
1196 
1197  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1198  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not "
1199  "implemented and should never be called.\n"
1200  << "Contact a Framework developer\n";
1201  }
1202 
1205  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1206  << "Contact a Framework developer\n";
1207  }
1208 
1211  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1212  << "Contact a Framework developer\n";
1213  }
1214 
1215 } // namespace edm
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
size
Write out results.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
void setProductID(ProductID const &pid)
ProductData const & getProductData() const final
ProductProvenance const * productProvenancePtr_() const override
void setOrMergeProduct(std::shared_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
BranchDescription const & branchDescription_() const override
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
void resetProductData_(bool deleteEarly) override
static const TGPicture * info(bool iBackgroundIsBlack)
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
#define CMS_SA_ALLOW
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
static constexpr unsigned int kAmbiguousOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
bool singleProduct_() const override
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:24
static constexpr unsigned int kMissingOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:197
std::shared_ptr< BranchDescription const > bd_
Provenance const & provenance() const
Definition: ProductData.h:33
void resetProductData_(bool deleteEarly) override
GlobalContext const * globalContext() const
BranchType const & branchType() const
void setProductID_(ProductID const &pid) final
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Worker * findWorker(std::string const &iLabel) const
void putProduct(std::unique_ptr< WrapperBase > edp) const final
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
void checkType(WrapperBase const &prod) const
std::atomic< bool > prefetchRequested_
void resetProductData_(bool deleteEarly) final
WaitingTaskList & waitingTasks() const
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:953
Provenance const * provenance() const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
TypeID unwrappedTypeID() const
void mergeProduct(std::shared_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
SwitchProducerProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
reader
Definition: DQM.py:105
void resetProductData_(bool deleteEarly) override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
bool unscheduledWasNotRun_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool singleProduct_() const override
Log< level::Error, false > LogError
bool productUnavailable_() const override
TypeWithDict const & unwrappedType() const
bool isAtEndTransition() const
void reset()
Resets access to the resource so that added tasks will wait.
ProductProvenanceRetriever const * provRetriever_
void connectTo(ProductResolverBase const &iOther, Principal const *) final
assert(be >=bs)
std::string const & processName() const
ProductData const & getProductData() const final
WrapperBase const * wrapper() const
Definition: ProductData.h:35
void resetProductData_(bool deleteEarly) override
void connectTo(ProductResolverBase const &, Principal const *) final
void setProductID(ProductID const &pid)
Definition: ProductData.h:58
Definition: Electron.h:6
DataManagingOrAliasProductResolver & realProduct_
UnscheduledAuxiliary const * aux_
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductProvenance const * productProvenancePtr_() const override
oneapi::tbb::task_group * group() const noexcept
bool singleProduct_() const override
void emit(Args &&... args) const
Definition: Signal.h:48
ProductProvenanceLookup const * store() const
Definition: Provenance.h:60
void setProductID_(ProductID const &pid) override
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
UnscheduledAuxiliary const * aux_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token, oneapi::tbb::task_group *) const
virtual bool isFromCurrentProcess() const =0
void setupUnscheduled(UnscheduledConfigurator const &iConfigure) final
std::string const & moduleLabel() const
SwitchBaseProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
std::string const & className() const
ProductProvenance const * productProvenancePtr_() const final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::string const & branchName() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:52
std::atomic< bool > prefetchRequested_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
DataManagingOrAliasProductResolver const & realProduct() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void resetProductData_(bool deleteEarly) override
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool isFromCurrentProcess() const final
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
static constexpr const ProductStatus defaultStatus_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< bool > prefetchRequested_
std::atomic< unsigned int > lastCheckIndex_
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:168
void setProductID_(ProductID const &pid) override
ProductProvenance const * branchIDToProvenance(BranchID const &bid) const
void setMergeableRunProductMetadata(MergeableRunProductMetadataBase const *mrpm)
Definition: ProductData.h:60
ProductProvenance const * productProvenancePtr_() const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > prod) const override
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:36
void setProductID_(ProductID const &pid) final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
ServiceToken lock() const
Definition: ServiceToken.h:101
void prefetchAsync(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
Resolution resolveProductImpl(Resolution) const
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:81
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void resetProductData_(bool deleteEarly) override
static constexpr unsigned int kUnsetOffset
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Provenance const * provenance_() const override
void setupUnscheduled(UnscheduledConfigurator const &) final
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
void setProvenance(ProductProvenanceLookup const *provRetriever)
Definition: ProductData.h:56
bool isPresent() const
Definition: WrapperBase.h:30
def load(fileName)
Definition: svgfig.py:547
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
ProductStatus defaultStatus() const
BranchDescription const & branchDescription() const
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void setProductID_(ProductID const &pid) override
void setMergeableRunProductMetadata_(MergeableRunProductMetadata const *) override
DataManagingOrAliasProductResolver & realProduct_
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preModuleDelayedGetSignal_
Transition transition() const
Definition: GlobalContext.h:55
std::vector< ProductResolverIndex > matchingHolders_
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool availableOnlyAtEndTransition() const
HLT enums.
unsigned int unsetIndexValue() const
StreamContext const * getStreamContext() const
void retrieveAndMerge_(Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
void setProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:27
Provenance const * provenance_() const override
bool productWasDeleted_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
Resolution resolveProductImpl(FUNC resolver) const
void resetProductData_(bool deleteEarly) override=0
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Log< level::Warning, false > LogWarning
void resetProductData_(bool deleteEarly) override
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postModuleDelayedGetSignal_
UnscheduledAuxiliary const * auxiliary() const
std::atomic< bool > & prefetchRequested() const
static ParentageRegistry * instance()
BranchDescription const & branchDescription_() const override
EventTransitionInfo const & eventTransitionInfo() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:562
void setProductID_(ProductID const &pid) override
ProductProvenance const * productProvenancePtr_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
bool insertMapped(value_type const &v)
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
def move(src, dest)
Definition: eostools.py:511
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
DelayedReader * reader() const
Definition: Principal.h:187
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void setProductProvenanceRetriever(ProductProvenanceRetriever const *provRetriever)
void insertIntoSet(ProductProvenance provenanceProduct) const
static HepMC::HEPEVT_Wrapper wrapper
ParentContext const & parent() const
BranchID const & originalBranchID() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final