CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
23 
24 #include <cassert>
25 #include <utility>
26 
// Sentinel offsets for the resolver-choice caches of NoProcessProductResolver.
// Each one is added to ambiguous_.size(), so the resulting value lies past
// every valid index into that vector:
//   size + kUnsetOffset     -> nothing has been cached yet
//   size + kAmbiguousOffset -> the lookup reached an entry flagged ambiguous
//   size + kMissingOffset   -> the lookup finished without finding a product
namespace {
  constexpr unsigned int kUnsetOffset{0};
  constexpr unsigned int kAmbiguousOffset{1};
  constexpr unsigned int kMissingOffset{2};
}  // namespace
30 
31 namespace edm {
32 
35  exception << "DataManagingProductResolver::resolveProduct_: The product matching all criteria was already deleted\n"
36  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
37  << "Looking for module label: " << moduleLabel() << "\n"
38  << "Looking for productInstanceName: " << productInstanceName() << "\n"
39  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
40  << "This means there is a configuration error.\n"
41  << "The module which is asking for this data must be configured to state that it will read this data.";
42  throw exception;
43  }
44 
45  //This is a templated function in order to avoid calling another virtual function
46  template <bool callResolver, typename FUNC>
48  if (productWasDeleted()) {
50  }
51  auto presentStatus = status();
52 
53  if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
54  //if resolver fails because of exception or not setting product
55  // make sure the status goes to failed
56  auto failedStatusSetter = [this](ProductStatus* iPresentStatus) {
57  if (this->status() == ProductStatus::ResolveNotRun) {
58  this->setFailedStatus();
59  }
60  *iPresentStatus = this->status();
61  };
62  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
63  failedStatusSetter);
64 
65  //If successful, this will call setProduct
66  resolver();
67  }
68 
69  if (presentStatus == ProductStatus::ProductSet) {
70  auto pd = &getProductData();
71  if (pd->wrapper()->isPresent()) {
72  return Resolution(pd);
73  }
74  }
75 
76  return Resolution(nullptr);
77  }
78 
80  std::shared_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
81  // if it's not mergeable and the previous read failed, go ahead and use this one
83  setProduct(std::move(iFrom));
84  return;
85  }
86 
88  if (not iFrom) {
89  return;
90  }
91 
92  checkType(*iFrom);
93 
95  if (original->isMergeable()) {
96  if (original->isPresent() != iFrom->isPresent()) {
98  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
99  << "Was trying to merge objects where one product had been put in the input file and the other had not "
100  "been."
101  << "\n"
102  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
103  << "that need to be merged in the first place.\n";
104  }
105  if (original->isPresent()) {
107  if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
108  original->mergeProduct(iFrom.get());
109  } else {
111  mergeableRunProductMetadata->getMergeDecision(desc.processName());
112  if (decision == MergeableRunProductMetadata::MERGE) {
113  original->mergeProduct(iFrom.get());
114  } else if (decision == MergeableRunProductMetadata::REPLACE) {
115  // Note this swaps the content of the product where the
116  // both products branches are present and the product is
117  // also present (was put) in the branch. A module might
118  // have already gotten a pointer to the product so we
119  // keep those pointers valid. This does not call swap
120  // on the Wrapper.
121  original->swapProduct(iFrom.get());
122  }
123  // If the decision is IGNORE, do nothing
124  }
125  }
126  // If both have isPresent false, do nothing
127 
128  } else if (original->hasIsProductEqual()) {
129  if (original->isPresent() && iFrom->isPresent()) {
130  if (!original->isProductEqual(iFrom.get())) {
131  auto const& bd = branchDescription();
132  edm::LogError("RunLumiMerging")
133  << "ProductResolver::mergeTheProduct\n"
134  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
135  << "Using the first, ignoring the second\n"
136  << "className = " << bd.className() << "\n"
137  << "moduleLabel = " << bd.moduleLabel() << "\n"
138  << "instance = " << bd.productInstanceName() << "\n"
139  << "process = " << bd.processName() << "\n";
140  }
141  } else if (!original->isPresent() && iFrom->isPresent()) {
142  setProduct(std::move(iFrom));
143  }
144  // if not iFrom->isPresent(), do nothing
145  } else {
146  auto const& bd = branchDescription();
147  edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
148  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
149  << "Using the first, ignoring the second in merge\n"
150  << "className = " << bd.className() << "\n"
151  << "moduleLabel = " << bd.moduleLabel() << "\n"
152  << "instance = " << bd.productInstanceName() << "\n"
153  << "process = " << bd.processName() << "\n";
154  if (!original->isPresent() && iFrom->isPresent()) {
155  setProduct(std::move(iFrom));
156  }
157  // In other cases, do nothing
158  }
159  }
160 
161  namespace {
162  void extendException(cms::Exception& e, BranchDescription const& bd, ModuleCallingContext const* mcc) {
163  e.addContext(std::string("While reading from source ") + bd.className() + " " + bd.moduleLabel() + " '" +
164  bd.productInstanceName() + "' " + bd.processName());
165  if (mcc) {
166  edm::exceptionContext(e, *mcc);
167  }
168  }
169  } // namespace
171  Principal const& principal, bool, SharedResourcesAcquirer*, ModuleCallingContext const* mcc) const {
172  return resolveProductImpl<true>([this, &principal, mcc]() {
173  auto branchType = principal.branchType();
174  if (branchType == InLumi || branchType == InRun) {
175  //delayed get has not been allowed with Run or Lumis
176  // The file may already be closed so the reader is invalid
177  return;
178  }
179  if (mcc and branchType == InEvent and aux_) {
181  }
182 
183  auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
184  if (branchType == InEvent and aux_) {
185  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
186  }
187  }));
188 
189  if (auto reader = principal.reader()) {
190  std::unique_lock<std::recursive_mutex> guard;
191  if (auto sr = reader->sharedResources().second) {
192  guard = std::unique_lock<std::recursive_mutex>(*sr);
193  }
194  if (not productResolved()) {
195  try {
196  //another thread could have beaten us here
197  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
198  } catch (cms::Exception& e) {
199  extendException(e, branchDescription(), mcc);
200  throw;
201  } catch (std::exception const& e) {
202  auto newExcept = edm::Exception(errors::StdException) << e.what();
203  extendException(newExcept, branchDescription(), mcc);
204  throw newExcept;
205  }
206  }
207  }
208  });
209  }
210 
212  Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
213  if (auto reader = principal.reader()) {
214  std::unique_lock<std::recursive_mutex> guard;
215  if (auto sr = reader->sharedResources().second) {
216  guard = std::unique_lock<std::recursive_mutex>(*sr);
217  }
218 
219  //Can't use resolveProductImpl since it first checks to see
220  // if the product was already retrieved and then returns if it is
221  auto edp(reader->getProduct(branchDescription().branchID(), &principal));
222 
223  if (edp.get() != nullptr) {
224  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
226  << "Missing definition of member function swap for branch name " << branchDescription().branchName()
227  << "\n"
228  << "Mergeable data types written to a Run must have the swap member function defined"
229  << "\n";
230  }
232  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
233  setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
234  } else { // status() == ResolveFailed && branchDescription().isMergeable()
236  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
237  << "The product branch was dropped in the first run or lumi fragment and present in a later one"
238  << "\n"
239  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
240  << "that need to be merged in the first place.\n";
241  }
242  } else if (status() == defaultStatus()) {
243  setFailedStatus();
244  } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
246  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
247  << "The product branch was present in first run or lumi fragment and dropped in a later one"
248  << "\n"
249  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
250  << "that need to be merged in the first place.\n";
251  }
252  // Do nothing in other case. status is ResolveFailed already or
253  // it is not mergeable and the status is ProductSet
254  }
255  }
256 
258  std::shared_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
259  if (status() == defaultStatus()) {
260  //resolveProduct has not been called or it failed
262  } else {
263  mergeProduct(std::move(prod), mergeableRunProductMetadata);
264  }
265  }
266 
269  }
270 
272  Principal const& principal,
273  bool skipCurrentProcess,
274  ServiceToken const& token,
276  ModuleCallingContext const* mcc) const {
277  //need to try changing m_prefetchRequested before adding to m_waitingTasks
278  bool expected = false;
279  bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
280  m_waitingTasks.add(waitTask);
281 
282  if (prefetchRequested) {
283  ServiceWeakToken weakToken = token;
284  auto workToDo = [this, mcc, &principal, weakToken]() {
285  //need to make sure Service system is activated on the reading thread
286  ServiceRegistry::Operate operate(weakToken.lock());
287  // Caught exception is propagated via WaitingTaskList
288  CMS_SA_ALLOW try {
289  resolveProductImpl<true>([this, &principal, mcc]() {
290  if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
291  return;
292  }
293  if (auto reader = principal.reader()) {
294  std::unique_lock<std::recursive_mutex> guard;
295  if (auto sr = reader->sharedResources().second) {
296  guard = std::unique_lock<std::recursive_mutex>(*sr);
297  }
298  if (not productResolved()) {
299  try {
300  //another thread could have finished this while we were waiting
301  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
302  } catch (cms::Exception& e) {
303  extendException(e, branchDescription(), mcc);
304  throw;
305  } catch (std::exception const& e) {
306  auto newExcept = edm::Exception(errors::StdException) << e.what();
307  extendException(newExcept, branchDescription(), mcc);
308  throw newExcept;
309  }
310  }
311  }
312  });
313  } catch (...) {
314  this->m_waitingTasks.doneWaiting(std::current_exception());
315  return;
316  }
317  this->m_waitingTasks.doneWaiting(nullptr);
318  };
319 
320  SerialTaskQueueChain* queue = nullptr;
321  if (auto reader = principal.reader()) {
322  if (auto shared_res = reader->sharedResources().first) {
323  queue = &(shared_res->serialQueueChain());
324  }
325  }
326  if (queue) {
327  queue->push(*waitTask.group(), workToDo);
328  } else {
329  //Have to create a new task
330  auto t = make_functor_task(workToDo);
331  waitTask.group()->run([t]() {
332  TaskSentry s{t};
333  t->execute();
334  });
335  }
336  }
337  }
338 
340  if (not deleteEarly) {
341  m_prefetchRequested = false;
343  }
345  }
346 
348  aux_ = iConfigure.auxiliary();
349  }
350 
352 
353  void PutOnReadInputProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
354  if (status() != defaultStatus()) {
356  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
357  }
358 
359  setProduct(std::move(edp)); // ProductResolver takes ownership
360  }
361 
363 
365  bool skipCurrentProcess,
367  ModuleCallingContext const*) const {
368  return resolveProductImpl<false>([]() { return; });
369  }
370 
372  Principal const& principal,
373  bool skipCurrentProcess,
374  ServiceToken const& token,
376  ModuleCallingContext const* mcc) const {}
377 
378  void PutOnReadInputProductResolver::putOrMergeProduct(std::unique_ptr<WrapperBase> edp) const {
379  setOrMergeProduct(std::move(edp), nullptr);
380  }
381 
383  bool skipCurrentProcess,
385  ModuleCallingContext const*) const {
386  if (!skipCurrentProcess) {
387  //'false' means never call the lambda function
388  return resolveProductImpl<false>([]() { return; });
389  }
390  return Resolution(nullptr);
391  }
392 
394  Principal const& principal,
395  bool skipCurrentProcess,
396  ServiceToken const& token,
398  ModuleCallingContext const* mcc) const {
399  if (not skipCurrentProcess) {
400  if (branchDescription().branchType() == InProcess &&
402  // This is an accessInputProcessBlock transition
403  // We cannot access produced products in those transitions
404  // except for in SubProcesses where they should have already run.
405  return;
406  }
408  if (not mcc->parent().isAtEndTransition()) {
409  return;
410  }
411  }
412 
413  if (waitingTasks_) {
414  // using a waiting task to do a callback guarantees that the
415  // waitingTasks_ list (from the worker) will be released from
416  // waiting even if the module does not put this data product
417  // or the module has an exception while running
418  waitingTasks_->add(waitTask);
419  }
420  }
421  }
422 
424  auto worker = iConfigure.findWorker(branchDescription().moduleLabel());
425  if (worker) {
426  waitingTasks_ = &worker->waitingTaskList();
427  }
428  }
429 
431  aux_ = iConfigure.auxiliary();
433  }
434 
436  bool skipCurrentProcess,
438  ModuleCallingContext const*) const {
439  if (!skipCurrentProcess and worker_) {
440  return resolveProductImpl<false>([] {});
441  }
442  return Resolution(nullptr);
443  }
444 
446  Principal const& principal,
447  bool skipCurrentProcess,
448  ServiceToken const& token,
450  ModuleCallingContext const* mcc) const {
451  if (skipCurrentProcess) {
452  return;
453  }
454  if (worker_ == nullptr) {
455  throw cms::Exception("LogicError") << "UnscheduledProductResolver::prefetchAsync_() called with null worker_. "
456  "This should not happen, please contact framework developers.";
457  }
458  //need to try changing prefetchRequested_ before adding to waitingTasks_
459  bool expected = false;
460  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
461  waitingTasks_.add(waitTask);
462  if (prefetchRequested) {
463  //Have to create a new task which will make sure the state for UnscheduledProductResolver
464  // is properly set after the module has run
465  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
466  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
467  // state for the case where an exception occurs during the call to the function.
468  // Caught exception is propagated via WaitingTaskList
469  CMS_SA_ALLOW try {
470  resolveProductImpl<true>([iPtr]() {
471  if (iPtr) {
472  std::rethrow_exception(*iPtr);
473  }
474  });
475  } catch (...) {
476  waitingTasks_.doneWaiting(std::current_exception());
477  return;
478  }
479  waitingTasks_.doneWaiting(nullptr);
480  });
481 
482  ParentContext parentContext(mcc);
485  WaitingTaskHolder(*waitTask.group(), t),
486  info,
487  token,
488  info.principal().streamID(),
489  parentContext,
490  mcc->getStreamContext());
491  }
492  }
493 
495  if (not deleteEarly) {
496  prefetchRequested_ = false;
498  }
500  }
501 
502  void ProducedProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
503  if (status() != defaultStatus()) {
505  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
506  }
507 
508  setProduct(std::move(edp)); // ProductResolver takes ownership
509  }
510 
511  bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
512 
514 
516  // Check if the types match.
517  TypeID typeID(prod.dynamicTypeInfo());
519  // Types do not match.
521  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
522  << "It is supposed to be of type " << branchDescription().className() << ".\n"
523  << "It is actually of type " << typeID.className() << ".\n";
524  }
525  }
526 
527  void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
528  if (edp) {
529  checkType(*edp);
532  } else {
533  setFailedStatus();
534  }
535  }
536  void DataManagingProductResolver::setProduct(std::shared_ptr<WrapperBase> edp) const {
537  if (edp) {
538  checkType(*edp);
541  } else {
542  setFailedStatus();
543  }
544  }
545 
546  // This routine returns true if it is known that currently there is no real product.
547  // If there is a real product, it returns false.
548  // If it is not known if there is a real product, it returns false.
550  auto presentStatus = status();
551  if (presentStatus == ProductStatus::ProductSet) {
552  return !(getProductData().wrapper()->isPresent());
553  }
554  return presentStatus != ProductStatus::ResolveNotRun;
555  }
556 
558  auto s = status();
559  return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
560  }
561 
562  // This routine returns true if the product was deleted early in order to save memory
564 
565  bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
566  if (iSkipCurrentProcess and isFromCurrentProcess()) {
567  return false;
568  }
570  if (getProductData().wrapper()->isPresent()) {
571  return true;
572  }
573  }
574  return false;
575  }
576 
578  productData_.setProvenance(provRetriever);
579  }
580 
582 
584  MergeableRunProductMetadata const* mrpm) {
586  }
587 
589  return provenance()->productProvenance();
590  }
591 
595  }
596  if (deleteEarly) {
598  } else {
599  resetStatus();
600  }
601  }
602 
603  bool DataManagingProductResolver::singleProduct_() const { return true; }
604 
607  }
608 
610 
612  return provenance()->productProvenance();
613  }
614 
616 
617  bool AliasProductResolver::singleProduct_() const { return true; }
618 
619  SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd,
621  : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false) {
622  // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
623  Parentage p;
624  p.setParents(std::vector<BranchID>{realProduct.branchDescription().originalBranchID()});
625  parentageID_ = p.id();
627  }
628 
629  void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
631  << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
632  << "Contact a Framework developer\n";
633  }
634 
637  }
638 
640  if (res.data() == nullptr)
641  return res;
642  return Resolution(&productData_);
643  }
644 
646  // SwitchProducer will never put anything in the event, and
647  // "false" will make Event::commit_() to call putProduct() with
648  // null unique_ptr<WrapperBase> to signal that the produce() was
649  // run.
650  return false;
651  }
652 
654  productData_.setProvenance(provRetriever);
655  }
656 
658  // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
660  }
661 
664  realProduct_.resetProductData_(deleteEarly);
665  if (not deleteEarly) {
666  prefetchRequested_ = false;
668  }
669  }
670 
672  // update provenance
674  // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
675  productData_.unsafe_setWrapper(realProduct().getProductData().sharedConstWrapper());
676  }
677 
678  SwitchProducerProductResolver::SwitchProducerProductResolver(std::shared_ptr<BranchDescription const> bd,
680  : SwitchBaseProductResolver(std::move(bd), realProduct), status_(defaultStatus_) {}
681 
683  bool skipCurrentProcess,
685  ModuleCallingContext const* mcc) const {
687  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
688  }
689  return Resolution(nullptr);
690  }
691 
693  Principal const& principal,
694  bool skipCurrentProcess,
695  ServiceToken const& token,
697  ModuleCallingContext const* mcc) const {
698  if (skipCurrentProcess) {
699  return;
700  }
701  if (branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
702  return;
703  }
704 
705  //need to try changing prefetchRequested before adding to waitingTasks
706  bool expected = false;
707  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
708  waitingTasks().add(waitTask);
709 
710  if (doPrefetchRequested) {
711  //using a waiting task to do a callback guarantees that
712  // the waitingTasks() list will be released from waiting even
713  // if the module does not put this data product or the
714  // module has an exception while running
715  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
716  if (nullptr != iException) {
717  waitingTasks().doneWaiting(*iException);
718  } else {
720  waitingTasks().doneWaiting(std::exception_ptr());
721  }
722  });
723  worker()->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
724  }
725  }
726 
727  void SwitchProducerProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
728  if (status_ != defaultStatus_) {
730  << "Attempt to insert more than one product for a branch " << branchDescription().branchName()
731  << "This makes no sense for SwitchProducerProductResolver.\nContact a Framework developer";
732  }
733  // Let's use ResolveFailed to signal that produce() was called, as
734  // there is no real product in this resolver
736  bool expected = false;
737  if (prefetchRequested().compare_exchange_strong(expected, true)) {
739  waitingTasks().doneWaiting(std::exception_ptr());
740  }
741  }
742 
744  // if produce() was run (ResolveFailed), ask from the real resolver
746  return realProduct().productUnavailable();
747  }
748  return true;
749  }
750 
753  if (not deleteEarly) {
755  }
756  }
757 
759  bool skipCurrentProcess,
761  ModuleCallingContext const* mcc) const {
762  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
763  }
764 
766  Principal const& principal,
767  bool skipCurrentProcess,
768  ServiceToken const& token,
770  ModuleCallingContext const* mcc) const {
771  if (skipCurrentProcess) {
772  return;
773  }
774 
775  //need to try changing prefetchRequested_ before adding to waitingTasks_
776  bool expected = false;
777  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
778  waitingTasks().add(waitTask);
779 
780  if (doPrefetchRequested) {
781  //using a waiting task to do a callback guarantees that
782  // the waitingTasks() list will be released from waiting even
783  // if the module does not put this data product or the
784  // module has an exception while running
785  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
786  if (nullptr != iException) {
787  waitingTasks().doneWaiting(*iException);
788  } else {
790  waitingTasks().doneWaiting(std::exception_ptr());
791  }
792  });
794  WaitingTaskHolder(*waitTask.group(), waiting), principal, skipCurrentProcess, token, sra, mcc);
795  }
796  }
797 
799  provRetriever_ = provRetriever;
800  }
801 
803 
805  return provRetriever_ ? provRetriever_->branchIDToProvenance(bd_->originalBranchID()) : nullptr;
806  }
807 
809 
810  bool ParentProcessProductResolver::singleProduct_() const { return true; }
811 
813  // In principle, this ought to be fixed. I noticed one hits this error
814  // when in a SubProcess and calling the Event::getProvenance function
815  // with a BranchID to a branch from an earlier SubProcess or the top
816  // level process and this branch is not kept in this SubProcess. It might
817  // be possible to hit this in other contexts. I say it ought to be
818  // fixed because one does not encounter this issue if the SubProcesses
819  // are split into genuinely different processes (in principle that
820  // ought to give identical behavior and results). No user has ever
821  // reported this issue which has been around for some time and it was only
822  // noticed when testing some rare corner cases after modifying Core code.
823  // After discussing this with Chris we decided that at least for the moment
824  // there are higher priorities than fixing this ... I converted it so it
825  // causes an exception instead of a seg fault. The issue that may need to
826  // be addressed someday is how ProductResolvers for non-kept branches are
827  // connected to earlier SubProcesses.
829  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
830  << "Contact a Framework developer\n";
831  }
832 
833  NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
834  std::vector<bool> const& ambiguous,
835  bool madeAtEnd)
836  : matchingHolders_(matchingHolders),
837  ambiguous_(ambiguous),
838  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
839  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
840  prefetchRequested_(false),
841  skippingPrefetchRequested_(false),
842  madeAtEnd_{madeAtEnd} {
843  assert(ambiguous_.size() == matchingHolders_.size());
844  }
845 
847  Principal const& principal,
848  bool skipCurrentProcess,
850  ModuleCallingContext const* mcc) const {
851  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
852  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
853  }
854 
856  bool skipCurrentProcess,
858  ModuleCallingContext const* mcc) const {
859  //See if we've already cached which Resolver we should call or if
860  // we know it is ambiguous
861  const unsigned int choiceSize = ambiguous_.size();
862 
863  //madeAtEnd_==true and not at end transition is the same as skipping the current process
864  if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
865  skipCurrentProcess = not mcc->parent().isAtEndTransition();
866  }
867 
868  unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
869  if (checkCacheIndex != choiceSize + kUnsetOffset) {
870  if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
872  } else if (checkCacheIndex == choiceSize + kMissingOffset) {
873  return Resolution(nullptr);
874  }
875  return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
876  }
877 
878  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
879 
880  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
881  for (unsigned int k : lookupProcessOrder) {
882  assert(k < ambiguous_.size());
883  if (k == 0)
884  break; // Done
885  if (ambiguous_[k]) {
886  updateCacheIndex = choiceSize + kAmbiguousOffset;
888  }
890  auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
891  if (resolution.data() != nullptr) {
892  updateCacheIndex = k;
893  return resolution;
894  }
895  }
896  }
897 
898  updateCacheIndex = choiceSize + kMissingOffset;
899  return Resolution(nullptr);
900  }
901 
903  Principal const& principal,
904  bool skipCurrentProcess,
905  ServiceToken const& token,
907  ModuleCallingContext const* mcc) const {
908  bool timeToMakeAtEnd = true;
909  if (madeAtEnd_ and mcc) {
910  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
911  }
912 
913  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
914  if (not skipCurrentProcess and timeToMakeAtEnd) {
915  //need to try changing prefetchRequested_ before adding to waitingTasks_
916  bool expected = false;
917  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
918  waitingTasks_.add(waitTask);
919 
920  if (prefetchRequested) {
921  //we are the first thread to request
922  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token, waitTask.group());
923  }
924  } else {
925  skippingWaitingTasks_.add(waitTask);
926  bool expected = false;
927  if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
928  //we are the first thread to request
929  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token, waitTask.group());
930  }
931  }
932  }
933 
934  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
935  ProductResolverIndex iIndex,
936  std::exception_ptr iExceptPtr) const {
937  if (not iSkipCurrentProcess) {
938  lastCheckIndex_ = iIndex;
939  waitingTasks_.doneWaiting(iExceptPtr);
940  } else {
943  }
944  }
945 
946  namespace {
947  class TryNextResolverWaitingTask : public edm::WaitingTask {
948  public:
949  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
950  unsigned int iResolverIndex,
951  Principal const* iPrincipal,
953  ModuleCallingContext const* iMCC,
954  bool iSkipCurrentProcess,
955  ServiceToken iToken,
956  oneapi::tbb::task_group* iGroup)
957  : resolver_(iResolver),
958  principal_(iPrincipal),
959  sra_(iSRA),
960  mcc_(iMCC),
961  group_(iGroup),
962  serviceToken_(iToken),
963  index_(iResolverIndex),
964  skipCurrentProcess_(iSkipCurrentProcess) {}
965 
966  void execute() final {
967  auto exceptPtr = exceptionPtr();
968  if (exceptPtr) {
969  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
970  } else {
971  if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
972  resolver_->tryPrefetchResolverAsync(
973  index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
974  }
975  }
976  }
977 
978  private:
979  NoProcessProductResolver const* resolver_;
980  Principal const* principal_;
982  ModuleCallingContext const* mcc_;
983  oneapi::tbb::task_group* group_;
984  ServiceWeakToken serviceToken_;
985  unsigned int index_;
986  bool skipCurrentProcess_;
987  };
988  } // namespace
989 
990  void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
991  Principal const& principal,
992  bool iSkipCurrentProcess,
993  std::exception_ptr iExceptPtr) const {
994  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
995  auto k = lookupProcessOrder[iProcessingIndex];
996 
997  setCache(iSkipCurrentProcess, k, iExceptPtr);
998  }
999 
1000  bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
1001  Principal const& principal,
1002  bool iSkipCurrentProcess) const {
1003  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1004  auto k = lookupProcessOrder[iProcessingIndex];
1005  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1006 
1007  if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
1008  setCache(iSkipCurrentProcess, k, nullptr);
1009  return true;
1010  }
1011  return false;
1012  }
1013 
 // Walks principal.lookupProcessOrder() starting at position iProcessingIndex.
 // On reaching the block that ends in `return;`, it creates a
 // TryNextResolverWaitingTask for the current position and calls prefetchAsync()
 // on the matching resolver. If the walk ends without doing so, it records either
 // the "missing" or the "ambiguous" slot through setCache().
 //
 // NOTE(review): this listing has gaps in its embedded line numbering. The
 // declarations of the `sra` and `token` parameters, the condition opening the
 // block that ends in `return;`, and the statement after the "Services" comment
 // are not visible here. Check the real source before changing this function.
1014  void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
1015  Principal const& principal,
1016  bool skipCurrentProcess,
1018  ModuleCallingContext const* mcc,
1020  oneapi::tbb::task_group* group) const {
1021  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1022  auto index = iProcessingIndex;
1023 
 // Result recorded if the loop finishes without starting a prefetch; it starts
 // as the kMissingOffset slot and is switched to the kAmbiguousOffset slot below.
1024  const unsigned int choiceSize = ambiguous_.size();
1025  unsigned int newCacheIndex = choiceSize + kMissingOffset;
1026  while (index < lookupProcessOrder.size()) {
1027  auto k = lookupProcessOrder[index];
 // An entry equal to 0 ends the search.
1028  if (k == 0) {
1029  break;
1030  }
1031  assert(k < ambiguous_.size());
 // An entry flagged in ambiguous_ also ends the search, with the ambiguous slot as result.
1032  if (ambiguous_[k]) {
1033  newCacheIndex = choiceSize + kAmbiguousOffset;
1034  break;
1035  }
1037  //make new task
1038 
 // The task receives `index`, the current lookup position.
1039  auto task = new TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token, group);
1040  WaitingTaskHolder hTask(*group, task);
1041  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1042 
1043  //Make sure the Services are available on this thread
1045 
 // Start the prefetch and return; setCache() below is not reached on this path.
1046  productResolver->prefetchAsync(hTask, principal, skipCurrentProcess, token, sra, mcc);
1047  return;
1048  }
1049  ++index;
1050  }
1051  //data product unavailable
 // No prefetch was started: record newCacheIndex with no exception.
1052  setCache(skipCurrentProcess, newCacheIndex, nullptr);
1053  }
1054 
1056 
1058 
1060 
1061  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1062 
1064  // This function should never receive 'true'. On the other hand,
1065  // nothing should break if a 'true' is passed, because
1066  // NoProcessProductResolver just forwards the resolve
1067  const auto resetValue = unsetIndexValue();
1068  lastCheckIndex_ = resetValue;
1069  lastSkipCurrentCheckIndex_ = resetValue;
1070  prefetchRequested_ = false;
1072  waitingTasks_.reset();
1074  }
1075 
1076  bool NoProcessProductResolver::singleProduct_() const { return false; }
1077 
1080  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1081  << "Contact a Framework developer\n";
1082  }
1083 
1086  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1087  << "Contact a Framework developer\n";
1088  }
1089 
1092  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1093  << "Contact a Framework developer\n";
1094  }
1095 
1098  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1099  << "Contact a Framework developer\n";
1100  }
1101 
1102  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1104  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1105  << "Contact a Framework developer\n";
1106  }
1107 
1110  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1111  << "Contact a Framework developer\n";
1112  }
1113 
1114  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1116  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1117  << "Contact a Framework developer\n";
1118  }
1119 
1122  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1123  << "Contact a Framework developer\n";
1124  }
1125 
1128  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1129  << "Contact a Framework developer\n";
1130  }
1131 
1132  //---- SingleChoiceNoProcessProductResolver ----------------
1134  Principal const& principal,
1135  bool skipCurrentProcess,
1137  ModuleCallingContext const* mcc) const {
1138  //NOTE: Have to lookup the other ProductResolver each time rather than cache
1139  // it's pointer since it appears the pointer can change at some later stage
1141  ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1142  }
1143 
1145  Principal const& principal,
1146  bool skipCurrentProcess,
1147  ServiceToken const& token,
1149  ModuleCallingContext const* mcc) const {
1151  ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1152  }
1153 
1155 
1157 
1159 
1161 
1163 
1166  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1167  << "Contact a Framework developer\n";
1168  }
1169 
1172  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1173  << "Contact a Framework developer\n";
1174  }
1175 
1178  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1179  << "Contact a Framework developer\n";
1180  }
1181 
1184  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1185  << "Contact a Framework developer\n";
1186  }
1187 
1189  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1190  "implemented and should never be called.\n"
1191  << "Contact a Framework developer\n";
1192  }
1193 
1196  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1197  << "Contact a Framework developer\n";
1198  }
1199 
1200  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1201  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not "
1202  "implemented and should never be called.\n"
1203  << "Contact a Framework developer\n";
1204  }
1205 
1208  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1209  << "Contact a Framework developer\n";
1210  }
1211 
1214  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1215  << "Contact a Framework developer\n";
1216  }
1217 
1218 } // namespace edm
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
size
Write out results.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
void setProductID(ProductID const &pid)
ProductData const & getProductData() const final
ProductProvenance const * productProvenancePtr_() const override
void setOrMergeProduct(std::shared_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
BranchDescription const & branchDescription_() const override
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
void resetProductData_(bool deleteEarly) override
static const TGPicture * info(bool iBackgroundIsBlack)
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
#define CMS_SA_ALLOW
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
static constexpr unsigned int kAmbiguousOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
bool singleProduct_() const override
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:24
static constexpr unsigned int kMissingOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:197
std::shared_ptr< BranchDescription const > bd_
Provenance const & provenance() const
Definition: ProductData.h:33
void resetProductData_(bool deleteEarly) override
GlobalContext const * globalContext() const
BranchType const & branchType() const
void setProductID_(ProductID const &pid) final
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Worker * findWorker(std::string const &iLabel) const
void putProduct(std::unique_ptr< WrapperBase > edp) const final
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
void checkType(WrapperBase const &prod) const
std::atomic< bool > prefetchRequested_
void resetProductData_(bool deleteEarly) final
WaitingTaskList & waitingTasks() const
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:934
Provenance const * provenance() const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
TypeID unwrappedTypeID() const
void mergeProduct(std::shared_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
SwitchProducerProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
reader
Definition: DQM.py:105
void resetProductData_(bool deleteEarly) override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
bool unscheduledWasNotRun_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool singleProduct_() const override
Log< level::Error, false > LogError
bool productUnavailable_() const override
TypeWithDict const & unwrappedType() const
bool isAtEndTransition() const
void reset()
Resets access to the resource so that added tasks will wait.
ProductProvenanceRetriever const * provRetriever_
void connectTo(ProductResolverBase const &iOther, Principal const *) final
assert(be >=bs)
std::string const & processName() const
ProductData const & getProductData() const final
WrapperBase const * wrapper() const
Definition: ProductData.h:35
void resetProductData_(bool deleteEarly) override
void connectTo(ProductResolverBase const &, Principal const *) final
void setProductID(ProductID const &pid)
Definition: ProductData.h:58
Definition: Electron.h:6
DataManagingOrAliasProductResolver & realProduct_
UnscheduledAuxiliary const * aux_
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductProvenance const * productProvenancePtr_() const override
oneapi::tbb::task_group * group() const noexcept
bool singleProduct_() const override
void emit(Args &&... args) const
Definition: Signal.h:48
ProductProvenanceLookup const * store() const
Definition: Provenance.h:60
void setProductID_(ProductID const &pid) override
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
UnscheduledAuxiliary const * aux_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token, oneapi::tbb::task_group *) const
virtual bool isFromCurrentProcess() const =0
void setupUnscheduled(UnscheduledConfigurator const &iConfigure) final
std::string const & moduleLabel() const
SwitchBaseProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
std::string const & className() const
ProductProvenance const * productProvenancePtr_() const final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::string const & branchName() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:52
std::atomic< bool > prefetchRequested_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
DataManagingOrAliasProductResolver const & realProduct() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void resetProductData_(bool deleteEarly) override
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool isFromCurrentProcess() const final
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
static constexpr const ProductStatus defaultStatus_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
std::atomic< bool > prefetchRequested_
std::atomic< unsigned int > lastCheckIndex_
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:167
void setProductID_(ProductID const &pid) override
ProductProvenance const * branchIDToProvenance(BranchID const &bid) const
void setMergeableRunProductMetadata(MergeableRunProductMetadataBase const *mrpm)
Definition: ProductData.h:60
ProductProvenance const * productProvenancePtr_() const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > prod) const override
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:36
void setProductID_(ProductID const &pid) final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
ServiceToken lock() const
Definition: ServiceToken.h:101
void prefetchAsync(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
Resolution resolveProductImpl(Resolution) const
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:81
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void resetProductData_(bool deleteEarly) override
static constexpr unsigned int kUnsetOffset
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Provenance const * provenance_() const override
void setupUnscheduled(UnscheduledConfigurator const &) final
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
void setProvenance(ProductProvenanceLookup const *provRetriever)
Definition: ProductData.h:56
bool isPresent() const
Definition: WrapperBase.h:30
def load(fileName)
Definition: svgfig.py:547
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
ProductStatus defaultStatus() const
BranchDescription const & branchDescription() const
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void setProductID_(ProductID const &pid) override
void setMergeableRunProductMetadata_(MergeableRunProductMetadata const *) override
DataManagingOrAliasProductResolver & realProduct_
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preModuleDelayedGetSignal_
Transition transition() const
Definition: GlobalContext.h:53
std::vector< ProductResolverIndex > matchingHolders_
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool availableOnlyAtEndTransition() const
HLT enums.
unsigned int unsetIndexValue() const
StreamContext const * getStreamContext() const
void retrieveAndMerge_(Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
void setProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:27
Provenance const * provenance_() const override
bool productWasDeleted_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
Resolution resolveProductImpl(FUNC resolver) const
void resetProductData_(bool deleteEarly) override=0
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Log< level::Warning, false > LogWarning
void resetProductData_(bool deleteEarly) override
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postModuleDelayedGetSignal_
UnscheduledAuxiliary const * auxiliary() const
std::atomic< bool > & prefetchRequested() const
static ParentageRegistry * instance()
BranchDescription const & branchDescription_() const override
EventTransitionInfo const & eventTransitionInfo() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:562
void setProductID_(ProductID const &pid) override
ProductProvenance const * productProvenancePtr_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
bool insertMapped(value_type const &v)
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
def move(src, dest)
Definition: eostools.py:511
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
DelayedReader * reader() const
Definition: Principal.h:187
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void setProductProvenanceRetriever(ProductProvenanceRetriever const *provRetriever)
void insertIntoSet(ProductProvenance provenanceProduct) const
static HepMC::HEPEVT_Wrapper wrapper
ParentContext const & parent() const
BranchID const & originalBranchID() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final