CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
23 
24 #include <cassert>
25 #include <utility>
26 
// Offsets added to ambiguous_.size() to build sentinel values for the cached
// lookup indices of NoProcessProductResolver (lastCheckIndex_ and
// lastSkipCurrentCheckIndex_). Values below ambiguous_.size() are real indices.
// Sentinel: no choice has been cached yet (the initial value of both caches).
27 static constexpr unsigned int kUnsetOffset = 0;
// Sentinel: the lookup reached an entry flagged in ambiguous_.
28 static constexpr unsigned int kAmbiguousOffset = 1;
// Sentinel: the lookup finished without any resolver providing data.
29 static constexpr unsigned int kMissingOffset = 2;
30 
31 namespace edm {
32 
35  exception << "DataManagingProductResolver::resolveProduct_: The product matching all criteria was already deleted\n"
36  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
37  << "Looking for module label: " << moduleLabel() << "\n"
38  << "Looking for productInstanceName: " << productInstanceName() << "\n"
39  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
40  << "This means there is a configuration error.\n"
41  << "The module which is asking for this data must be configured to state that it will read this data.";
42  throw exception;
43  }
44 
45  //This is a templated function in order to avoid calling another virtual function
46  template <bool callResolver, typename FUNC>
48  if (productWasDeleted()) {
50  }
51  auto presentStatus = status();
52 
53  if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
54  //if resolver fails because of exception or not setting product
55  // make sure the status goes to failed
56  auto failedStatusSetter = [this](ProductStatus* iPresentStatus) {
57  if (this->status() == ProductStatus::ResolveNotRun) {
58  this->setFailedStatus();
59  }
60  *iPresentStatus = this->status();
61  };
62  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
63  failedStatusSetter);
64 
65  //If successful, this will call setProduct
66  resolver();
67  }
68 
69  if (presentStatus == ProductStatus::ProductSet) {
70  auto pd = &getProductData();
71  if (pd->wrapper()->isPresent()) {
72  return Resolution(pd);
73  }
74  }
75 
76  return Resolution(nullptr);
77  }
78 
80  std::shared_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
81  // if its not mergeable and the previous read failed, go ahead and use this one
83  setProduct(std::move(iFrom));
84  return;
85  }
86 
88  if (not iFrom) {
89  return;
90  }
91 
92  checkType(*iFrom);
93 
95  if (original->isMergeable()) {
96  if (original->isPresent() != iFrom->isPresent()) {
98  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
99  << "Was trying to merge objects where one product had been put in the input file and the other had not "
100  "been."
101  << "\n"
102  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
103  << "that need to be merged in the first place.\n";
104  }
105  if (original->isPresent()) {
107  if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
108  original->mergeProduct(iFrom.get());
109  } else {
111  mergeableRunProductMetadata->getMergeDecision(desc.processName());
112  if (decision == MergeableRunProductMetadata::MERGE) {
113  original->mergeProduct(iFrom.get());
114  } else if (decision == MergeableRunProductMetadata::REPLACE) {
115  // Note this swaps the content of the product where the
116  // both products branches are present and the product is
117  // also present (was put) in the branch. A module might
118  // have already gotten a pointer to the product so we
119  // keep those pointers valid. This does not call swap
120  // on the Wrapper.
121  original->swapProduct(iFrom.get());
122  }
123  // If the decision is IGNORE, do nothing
124  }
125  }
126  // If both have isPresent false, do nothing
127 
128  } else if (original->hasIsProductEqual()) {
129  if (original->isPresent() && iFrom->isPresent()) {
130  if (!original->isProductEqual(iFrom.get())) {
131  auto const& bd = branchDescription();
132  edm::LogError("RunLumiMerging")
133  << "ProductResolver::mergeTheProduct\n"
134  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
135  << "Using the first, ignoring the second\n"
136  << "className = " << bd.className() << "\n"
137  << "moduleLabel = " << bd.moduleLabel() << "\n"
138  << "instance = " << bd.productInstanceName() << "\n"
139  << "process = " << bd.processName() << "\n";
140  }
141  } else if (!original->isPresent() && iFrom->isPresent()) {
142  setProduct(std::move(iFrom));
143  }
144  // if not iFrom->isPresent(), do nothing
145  } else {
146  auto const& bd = branchDescription();
147  edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
148  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
149  << "Using the first, ignoring the second in merge\n"
150  << "className = " << bd.className() << "\n"
151  << "moduleLabel = " << bd.moduleLabel() << "\n"
152  << "instance = " << bd.productInstanceName() << "\n"
153  << "process = " << bd.processName() << "\n";
154  if (!original->isPresent() && iFrom->isPresent()) {
155  setProduct(std::move(iFrom));
156  }
157  // In other cases, do nothing
158  }
159  }
160 
161  namespace {
162  cms::Exception& extendException(cms::Exception& e, BranchDescription const& bd, ModuleCallingContext const* mcc) {
163  e.addContext(std::string("While reading from source ") + bd.className() + " " + bd.moduleLabel() + " '" +
164  bd.productInstanceName() + "' " + bd.processName());
165  if (mcc) {
166  edm::exceptionContext(e, *mcc);
167  }
168  return e;
169  }
170  } // namespace
172  Principal const& principal, bool, SharedResourcesAcquirer*, ModuleCallingContext const* mcc) const {
173  return resolveProductImpl<true>([this, &principal, mcc]() {
174  auto branchType = principal.branchType();
175  if (branchType == InLumi || branchType == InRun) {
176  //delayed get has not been allowed with Run or Lumis
177  // The file may already be closed so the reader is invalid
178  return;
179  }
180  if (mcc and branchType == InEvent and aux_) {
181  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()), *mcc);
182  }
183 
184  auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
185  if (branchType == InEvent and aux_) {
186  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
187  }
188  }));
189 
190  if (auto reader = principal.reader()) {
191  std::unique_lock<std::recursive_mutex> guard;
192  if (auto sr = reader->sharedResources().second) {
193  guard = std::unique_lock<std::recursive_mutex>(*sr);
194  }
195  if (not productResolved()) {
196  try {
197  //another thread could have beaten us here
198  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
199  } catch (cms::Exception& e) {
200  throw extendException(e, branchDescription(), mcc);
201  } catch (std::exception const& e) {
202  auto newExcept = edm::Exception(errors::StdException) << e.what();
203  throw extendException(newExcept, branchDescription(), mcc);
204  }
205  }
206  }
207  });
208  }
209 
211  Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
212  if (auto reader = principal.reader()) {
213  std::unique_lock<std::recursive_mutex> guard;
214  if (auto sr = reader->sharedResources().second) {
215  guard = std::unique_lock<std::recursive_mutex>(*sr);
216  }
217 
218  //Can't use resolveProductImpl since it first checks to see
219  // if the product was already retrieved and then returns if it is
220  auto edp(reader->getProduct(branchDescription().branchID(), &principal));
221 
222  if (edp.get() != nullptr) {
223  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
225  << "Missing definition of member function swap for branch name " << branchDescription().branchName()
226  << "\n"
227  << "Mergeable data types written to a Run must have the swap member function defined"
228  << "\n";
229  }
231  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
232  setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
233  } else { // status() == ResolveFailed && branchDescription().isMergeable()
235  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
236  << "The product branch was dropped in the first run or lumi fragment and present in a later one"
237  << "\n"
238  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
239  << "that need to be merged in the first place.\n";
240  }
241  } else if (status() == defaultStatus()) {
242  setFailedStatus();
243  } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
245  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
246  << "The product branch was present in first run or lumi fragment and dropped in a later one"
247  << "\n"
248  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
249  << "that need to be merged in the first place.\n";
250  }
251  // Do nothing in other case. status is ResolveFailed already or
252  // it is not mergeable and the status is ProductSet
253  }
254  }
255 
257  std::shared_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
258  if (status() == defaultStatus()) {
259  //resolveProduct has not been called or it failed
260  setProduct(std::move(prod));
261  } else {
262  mergeProduct(std::move(prod), mergeableRunProductMetadata);
263  }
264  }
265 
268  }
269 
271  Principal const& principal,
272  bool skipCurrentProcess,
273  ServiceToken const& token,
275  ModuleCallingContext const* mcc) const {
276  //need to try changing m_prefetchRequested before adding to m_waitingTasks
277  bool expected = false;
278  bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
279  m_waitingTasks.add(waitTask);
280 
281  if (prefetchRequested) {
282  ServiceWeakToken weakToken = token;
283  auto workToDo = [this, mcc, &principal, weakToken]() {
284  //need to make sure Service system is activated on the reading thread
285  ServiceRegistry::Operate operate(weakToken.lock());
286  // Caught exception is propagated via WaitingTaskList
287  CMS_SA_ALLOW try {
288  resolveProductImpl<true>([this, &principal, mcc]() {
289  if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
290  return;
291  }
292  if (auto reader = principal.reader()) {
293  std::unique_lock<std::recursive_mutex> guard;
294  if (auto sr = reader->sharedResources().second) {
295  guard = std::unique_lock<std::recursive_mutex>(*sr);
296  }
297  if (not productResolved()) {
298  try {
299  //another thread could have finished this while we were waiting
300  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
301  } catch (cms::Exception& e) {
302  throw extendException(e, branchDescription(), mcc);
303  } catch (std::exception const& e) {
304  auto newExcept = edm::Exception(errors::StdException) << e.what();
305  throw extendException(newExcept, branchDescription(), mcc);
306  }
307  }
308  }
309  });
310  } catch (...) {
311  this->m_waitingTasks.doneWaiting(std::current_exception());
312  return;
313  }
314  this->m_waitingTasks.doneWaiting(nullptr);
315  };
316 
317  SerialTaskQueueChain* queue = nullptr;
318  if (auto reader = principal.reader()) {
319  if (auto shared_res = reader->sharedResources().first) {
320  queue = &(shared_res->serialQueueChain());
321  }
322  }
323  if (queue) {
324  queue->push(*waitTask.group(), workToDo);
325  } else {
326  //Have to create a new task
327  auto t = make_functor_task(workToDo);
328  waitTask.group()->run([t]() {
329  TaskSentry s{t};
330  t->execute();
331  });
332  }
333  }
334  }
335 
337  if (not deleteEarly) {
338  m_prefetchRequested = false;
340  }
342  }
343 
345  aux_ = iConfigure.auxiliary();
346  }
347 
349 
// Stores a freshly read product in this resolver.
// The guarded branch handles the case where the status is no longer the
// default one; its message reports an attempt to insert more than one product
// on the branch.
// NOTE(review): the statement that starts the message stream (listing line 352)
// is not visible in this excerpt; presumably it throws - confirm against the full file.
350 void PutOnReadInputProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
351 if (status() != defaultStatus()) {
353 << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
354 }
355 
356 setProduct(std::move(edp)); // ProductResolver takes ownership
357 }
358 
360 
362  bool skipCurrentProcess,
364  ModuleCallingContext const*) const {
365  return resolveProductImpl<false>([]() { return; });
366  }
367 
369  Principal const& principal,
370  bool skipCurrentProcess,
371  ServiceToken const& token,
373  ModuleCallingContext const* mcc) const {}
374 
375  void PutOnReadInputProductResolver::putOrMergeProduct(std::unique_ptr<WrapperBase> edp) const {
376  setOrMergeProduct(std::move(edp), nullptr);
377  }
378 
380  bool skipCurrentProcess,
382  ModuleCallingContext const*) const {
383  if (!skipCurrentProcess) {
384  //'false' means never call the lambda function
385  return resolveProductImpl<false>([]() { return; });
386  }
387  return Resolution(nullptr);
388  }
389 
391  Principal const& principal,
392  bool skipCurrentProcess,
393  ServiceToken const& token,
395  ModuleCallingContext const* mcc) const {
396  if (not skipCurrentProcess) {
397  if (branchDescription().branchType() == InProcess &&
399  // This is an accessInputProcessBlock transition
400  // We cannot access produced products in those transitions
401  // except for in SubProcesses where they should have already run.
402  return;
403  }
405  if (not mcc->parent().isAtEndTransition()) {
406  return;
407  }
408  }
409 
410  //Need to try modifying prefetchRequested_ before adding to m_waitingTasks
411  bool expected = false;
412  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
413  m_waitingTasks.add(waitTask);
414 
415  if (worker_ and prefetchRequested) {
416  //using a waiting task to do a callback guarantees that
417  // the m_waitingTasks list will be released from waiting even
418  // if the module does not put this data product or the
419  // module has an exception while running
420 
421  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
422  if (nullptr != iException) {
423  m_waitingTasks.doneWaiting(*iException);
424  } else {
425  m_waitingTasks.doneWaiting(std::exception_ptr());
426  }
427  });
428  worker_->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
429  }
430  }
431  }
432 
// Puts a product and releases any tasks waiting on this resolver.
// NOTE(review): listing line 434 is elided from this excerpt, so the statement
// that actually consumes `edp` is not visible here.
433 void PuttableProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
    // Atomically flip prefetchRequested_ from false to true; only the caller that
    // wins the exchange signals the waiting tasks, with an empty exception_ptr.
435 bool expected = false;
436 if (prefetchRequested_.compare_exchange_strong(expected, true)) {
437 m_waitingTasks.doneWaiting(std::exception_ptr());
438 }
439 }
440 
442  if (not deleteEarly) {
443  prefetchRequested_ = false;
445  }
447  }
448 
451  }
452 
454  aux_ = iConfigure.auxiliary();
456  }
457 
459  bool skipCurrentProcess,
461  ModuleCallingContext const*) const {
462  if (!skipCurrentProcess and worker_) {
463  return resolveProductImpl<false>([] {});
464  }
465  return Resolution(nullptr);
466  }
467 
469  Principal const& principal,
470  bool skipCurrentProcess,
471  ServiceToken const& token,
473  ModuleCallingContext const* mcc) const {
474  if (skipCurrentProcess) {
475  return;
476  }
477  if (worker_ == nullptr) {
478  throw cms::Exception("LogicError") << "UnscheduledProductResolver::prefetchAsync_() called with null worker_. "
479  "This should not happen, please contact framework developers.";
480  }
481  //need to try changing prefetchRequested_ before adding to waitingTasks_
482  bool expected = false;
483  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
484  waitingTasks_.add(waitTask);
485  if (prefetchRequested) {
486  //Have to create a new task which will make sure the state for UnscheduledProductResolver
487  // is properly set after the module has run
488  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
489  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
490  // state for the case where an exception occurs during the call to the function.
491  // Caught exception is propagated via WaitingTaskList
492  CMS_SA_ALLOW try {
493  resolveProductImpl<true>([iPtr]() {
494  if (iPtr) {
495  std::rethrow_exception(*iPtr);
496  }
497  });
498  } catch (...) {
499  waitingTasks_.doneWaiting(std::current_exception());
500  return;
501  }
502  waitingTasks_.doneWaiting(nullptr);
503  });
504 
505  ParentContext parentContext(mcc);
508  WaitingTaskHolder(*waitTask.group(), t),
509  info,
510  token,
511  info.principal().streamID(),
512  parentContext,
513  mcc->getStreamContext());
514  }
515  }
516 
518  if (not deleteEarly) {
519  prefetchRequested_ = false;
521  }
523  }
524 
// Stores a product produced in this process.
// The guarded branch handles the case where the status is no longer the
// default one; its message reports an attempt to insert more than one product
// on the branch.
// NOTE(review): the statement that starts the message stream (listing line 527)
// is not visible in this excerpt; presumably it throws - confirm against the full file.
525 void ProducedProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
526 if (status() != defaultStatus()) {
528 << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
529 }
530 
531 setProduct(std::move(edp)); // ProductResolver takes ownership
532 }
533 
534  bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
535 
537 
539  // Check if the types match.
540  TypeID typeID(prod.dynamicTypeInfo());
542  // Types do not match.
544  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
545  << "It is supposed to be of type " << branchDescription().className() << ".\n"
546  << "It is actually of type " << typeID.className() << ".\n";
547  }
548  }
549 
// Accepts a uniquely owned product.
// A non-null product is first validated with checkType; a null product marks
// this resolver as failed via setFailedStatus.
// NOTE(review): listing lines 553-554 (the statements following checkType in
// the non-null branch) are elided from this excerpt.
550 void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
551 if (edp) {
552 checkType(*edp);
555 } else {
556 setFailedStatus();
557 }
558 }
// Accepts a shared product; mirrors the unique_ptr overload.
// A non-null product is first validated with checkType; a null product marks
// this resolver as failed via setFailedStatus.
// NOTE(review): listing lines 562-563 (the statements following checkType in
// the non-null branch) are elided from this excerpt.
559 void DataManagingProductResolver::setProduct(std::shared_ptr<WrapperBase> edp) const {
560 if (edp) {
561 checkType(*edp);
564 } else {
565 setFailedStatus();
566 }
567 }
568 
569  // This routine returns true if it is known that currently there is no real product.
570  // If there is a real product, it returns false.
571  // If it is not known if there is a real product, it returns false.
573  auto presentStatus = status();
574  if (presentStatus == ProductStatus::ProductSet) {
575  return !(getProductData().wrapper()->isPresent());
576  }
577  return presentStatus != ProductStatus::ResolveNotRun;
578  }
579 
581  auto s = status();
582  return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
583  }
584 
585  // This routine returns true if the product was deleted early in order to save memory
587 
// Returns true only when a product is held and its wrapper reports isPresent().
// Returns false right away when the caller asked to skip the current process
// and this resolver belongs to the current process.
// NOTE(review): listing line 592, which opens the block closed at line 596, is
// elided from this excerpt; the condition guarding the wrapper check is not visible.
588 bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
589 if (iSkipCurrentProcess and isFromCurrentProcess()) {
590 return false;
591 }
593 if (getProductData().wrapper()->isPresent()) {
594 return true;
595 }
596 }
597 return false;
598 }
599 
601  productData_.setProvenance(provRetriever);
602  }
603 
605 
607  MergeableRunProductMetadata const* mrpm) {
609  }
610 
612  return provenance()->productProvenance();
613  }
614 
618  }
619  if (deleteEarly) {
621  } else {
622  resetStatus();
623  }
624  }
625 
626  bool DataManagingProductResolver::singleProduct_() const { return true; }
627 
630  }
631 
633 
635  return provenance()->productProvenance();
636  }
637 
639 
640  bool AliasProductResolver::singleProduct_() const { return true; }
641 
// Constructs the resolver around `realProduct`, taking over the branch
// description for its own product data and starting with no prefetch requested.
// NOTE(review): listing line 643 (the remaining constructor parameter) and
// line 649 (the last statement of the body) are elided from this excerpt.
642 SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd,
644 : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false) {
645 // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
    // The only parent is the original branch ID of the real product's branch.
646 Parentage p;
647 p.setParents(std::vector<BranchID>{realProduct.branchDescription().originalBranchID()});
648 parentageID_ = p.id();
650 }
651 
// Not supported for this resolver type: the body only builds a message saying
// that connectTo() is not implemented and should never be called.
// NOTE(review): the statement that starts the message stream (listing line 653)
// is not visible in this excerpt; presumably it throws - confirm against the full file.
652 void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
654 << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
655 << "Contact a Framework developer\n";
656 }
657 
660  }
661 
663  if (res.data() == nullptr)
664  return res;
665  return Resolution(&productData_);
666  }
667 
669  // SwitchProducer will never put anything in the event, and
670  // "false" will make Event::commit_() to call putProduct() with
671  // null unique_ptr<WrapperBase> to signal that the produce() was
672  // run.
673  return false;
674  }
675 
677  productData_.setProvenance(provRetriever);
678  }
679 
681  // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
683  }
684 
687  realProduct_.resetProductData_(deleteEarly);
688  if (not deleteEarly) {
689  prefetchRequested_ = false;
691  }
692  }
693 
695  // update provenance
697  // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
698  productData_.unsafe_setWrapper(realProduct().getProductData().sharedConstWrapper());
699  }
700 
// Forwards both arguments to SwitchBaseProductResolver and starts with
// status_ equal to defaultStatus_.
// NOTE(review): listing line 702 (the declaration of the `realProduct`
// parameter) is elided from this excerpt.
701 SwitchProducerProductResolver::SwitchProducerProductResolver(std::shared_ptr<BranchDescription const> bd,
703 : SwitchBaseProductResolver(std::move(bd), realProduct), status_(defaultStatus_) {}
704 
706  bool skipCurrentProcess,
708  ModuleCallingContext const* mcc) const {
710  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
711  }
712  return Resolution(nullptr);
713  }
714 
716  Principal const& principal,
717  bool skipCurrentProcess,
718  ServiceToken const& token,
720  ModuleCallingContext const* mcc) const {
721  if (skipCurrentProcess) {
722  return;
723  }
724  if (branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
725  return;
726  }
727 
728  //need to try changing prefetchRequested before adding to waitingTasks
729  bool expected = false;
730  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
731  waitingTasks().add(waitTask);
732 
733  if (doPrefetchRequested) {
734  //using a waiting task to do a callback guarantees that
735  // the waitingTasks() list will be released from waiting even
736  // if the module does not put this data product or the
737  // module has an exception while running
738  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
739  if (nullptr != iException) {
740  waitingTasks().doneWaiting(*iException);
741  } else {
743  waitingTasks().doneWaiting(std::exception_ptr());
744  }
745  });
746  worker()->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
747  }
748  }
749 
// Called to record that produce() ran; this resolver holds no real product.
// The guarded branch handles a second call (status_ no longer the default).
// NOTE(review): listing lines 752, 758 and 761 are elided from this excerpt
// (the start of the message stream, the statement after the ResolveFailed
// comment, and the first statement inside the final if).
// NOTE(review): no separator is streamed between the branch name and
// "This makes no sense...", so the two run together in the message text.
750 void SwitchProducerProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
751 if (status_ != defaultStatus_) {
753 << "Attempt to insert more than one product for a branch " << branchDescription().branchName()
754 << "This makes no sense for SwitchProducerProductResolver.\nContact a Framework developer";
755 }
756 // Let's use ResolveFailed to signal that produce() was called, as
757 // there is no real product in this resolver
    // Atomically flip the prefetch flag from false to true; only the caller that
    // wins the exchange signals the waiting tasks, with an empty exception_ptr.
759 bool expected = false;
760 if (prefetchRequested().compare_exchange_strong(expected, true)) {
762 waitingTasks().doneWaiting(std::exception_ptr());
763 }
764 }
765 
767  // if produce() was run (ResolveFailed), ask from the real resolver
769  return realProduct().productUnavailable();
770  }
771  return true;
772  }
773 
776  if (not deleteEarly) {
778  }
779  }
780 
782  bool skipCurrentProcess,
784  ModuleCallingContext const* mcc) const {
785  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
786  }
787 
789  Principal const& principal,
790  bool skipCurrentProcess,
791  ServiceToken const& token,
793  ModuleCallingContext const* mcc) const {
794  if (skipCurrentProcess) {
795  return;
796  }
797 
798  //need to try changing prefetchRequested_ before adding to waitingTasks_
799  bool expected = false;
800  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
801  waitingTasks().add(waitTask);
802 
803  if (doPrefetchRequested) {
804  //using a waiting task to do a callback guarantees that
805  // the waitingTasks() list will be released from waiting even
806  // if the module does not put this data product or the
807  // module has an exception while running
808  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
809  if (nullptr != iException) {
810  waitingTasks().doneWaiting(*iException);
811  } else {
813  waitingTasks().doneWaiting(std::exception_ptr());
814  }
815  });
817  WaitingTaskHolder(*waitTask.group(), waiting), principal, skipCurrentProcess, token, sra, mcc);
818  }
819  }
820 
822  provRetriever_ = provRetriever;
823  }
824 
826 
828  return provRetriever_ ? provRetriever_->branchIDToProvenance(bd_->originalBranchID()) : nullptr;
829  }
830 
832 
833  bool ParentProcessProductResolver::singleProduct_() const { return true; }
834 
836  // In principle, this ought to be fixed. I noticed one hits this error
837  // when in a SubProcess and calling the Event::getProvenance function
838  // with a BranchID to a branch from an earlier SubProcess or the top
839  // level process and this branch is not kept in this SubProcess. It might
840  // be possible to hit this in other contexts. I say it ought to be
841  // fixed because one does not encounter this issue if the SubProcesses
842  // are split into genuinely different processes (in principle that
843  // ought to give identical behavior and results). No user has ever
844  // reported this issue which has been around for some time and it was only
845  // noticed when testing some rare corner cases after modifying Core code.
846  // After discussing this with Chris we decided that at least for the moment
847  // there are higher priorities than fixing this ... I converted it so it
848  // causes an exception instead of a seg fault. The issue that may need to
849  // be addressed someday is how ProductResolvers for non-kept branches are
850  // connected to earlier SubProcesses.
852  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
853  << "Contact a Framework developer\n";
854  }
855 
856  NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
857  std::vector<bool> const& ambiguous,
858  bool madeAtEnd)
859  : matchingHolders_(matchingHolders),
860  ambiguous_(ambiguous),
861  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
862  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
863  prefetchRequested_(false),
864  skippingPrefetchRequested_(false),
865  madeAtEnd_{madeAtEnd} {
866  assert(ambiguous_.size() == matchingHolders_.size());
867  }
868 
870  Principal const& principal,
871  bool skipCurrentProcess,
873  ModuleCallingContext const* mcc) const {
874  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
875  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
876  }
877 
879  bool skipCurrentProcess,
881  ModuleCallingContext const* mcc) const {
882  //See if we've already cached which Resolver we should call or if
883  // we know it is ambiguous
884  const unsigned int choiceSize = ambiguous_.size();
885 
886  //madeAtEnd_==true and not at end transition is the same as skipping the current process
887  if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
888  skipCurrentProcess = not mcc->parent().isAtEndTransition();
889  }
890 
891  unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
892  if (checkCacheIndex != choiceSize + kUnsetOffset) {
893  if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
895  } else if (checkCacheIndex == choiceSize + kMissingOffset) {
896  return Resolution(nullptr);
897  }
898  return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
899  }
900 
901  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
902 
903  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
904  for (unsigned int k : lookupProcessOrder) {
905  assert(k < ambiguous_.size());
906  if (k == 0)
907  break; // Done
908  if (ambiguous_[k]) {
909  updateCacheIndex = choiceSize + kAmbiguousOffset;
911  }
913  auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
914  if (resolution.data() != nullptr) {
915  updateCacheIndex = k;
916  return resolution;
917  }
918  }
919  }
920 
921  updateCacheIndex = choiceSize + kMissingOffset;
922  return Resolution(nullptr);
923  }
924 
926  Principal const& principal,
927  bool skipCurrentProcess,
928  ServiceToken const& token,
930  ModuleCallingContext const* mcc) const {
931  bool timeToMakeAtEnd = true;
932  if (madeAtEnd_ and mcc) {
933  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
934  }
935 
936  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
937  if (not skipCurrentProcess and timeToMakeAtEnd) {
938  //need to try changing prefetchRequested_ before adding to waitingTasks_
939  bool expected = false;
940  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
941  waitingTasks_.add(waitTask);
942 
943  if (prefetchRequested) {
944  //we are the first thread to request
945  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token, waitTask.group());
946  }
947  } else {
948  skippingWaitingTasks_.add(waitTask);
949  bool expected = false;
950  if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
951  //we are the first thread to request
952  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token, waitTask.group());
953  }
954  }
955  }
956 
// Records the outcome of a lookup and wakes the tasks waiting on it.
// When the current process is not being skipped, iIndex is stored in
// lastCheckIndex_ and waitingTasks_ is signalled with iExceptPtr.
// NOTE(review): the body of the else branch (listing lines 964-965) is elided
// from this excerpt; presumably it updates the skip-current counterparts - confirm.
957 void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
958 ProductResolverIndex iIndex,
959 std::exception_ptr iExceptPtr) const {
960 if (not iSkipCurrentProcess) {
961 lastCheckIndex_ = iIndex;
962 waitingTasks_.doneWaiting(iExceptPtr);
963 } else {
966 }
967 }
968 
969 namespace {
    // Waiting task created for one position in the lookup order. When it runs it
    // either reports the failure, accepts the data found at that position, or
    // asks the resolver to try the next position.
    // NOTE(review): listing lines 975 and 1004 (presumably the declaration of the
    // iSRA parameter and of the sra_ member) are elided from this excerpt.
970 class TryNextResolverWaitingTask : public edm::WaitingTask {
971 public:
972 TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
973 unsigned int iResolverIndex,
974 Principal const* iPrincipal,
976 ModuleCallingContext const* iMCC,
977 bool iSkipCurrentProcess,
978 ServiceToken iToken,
979 oneapi::tbb::task_group* iGroup)
980 : resolver_(iResolver),
981 principal_(iPrincipal),
982 sra_(iSRA),
983 mcc_(iMCC),
984 group_(iGroup),
985 serviceToken_(iToken),
986 index_(iResolverIndex),
987 skipCurrentProcess_(iSkipCurrentProcess) {}
988 
    // An exception from the awaited work is forwarded to prefetchFailed.
    // Otherwise, if the resolver at index_ did not yield valid data, the
    // prefetch is retried at index_ + 1 with the same arguments.
989 void execute() final {
990 auto exceptPtr = exceptionPtr();
991 if (exceptPtr) {
992 resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
993 } else {
994 if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
995 resolver_->tryPrefetchResolverAsync(
996 index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
997 }
998 }
999 }
1000 
1001 private:
1002 NoProcessProductResolver const* resolver_;
1003 Principal const* principal_;
1005 ModuleCallingContext const* mcc_;
1006 oneapi::tbb::task_group* group_;
     // Stored as a weak token; execute() calls lock() on it before passing it on.
1007 ServiceWeakToken serviceToken_;
     // Position in the lookup order that this task was created for.
1008 unsigned int index_;
1009 bool skipCurrentProcess_;
1010 };
1011 } // namespace
1012 
1013  void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
1014  Principal const& principal,
1015  bool iSkipCurrentProcess,
1016  std::exception_ptr iExceptPtr) const {
1017  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1018  auto k = lookupProcessOrder[iProcessingIndex];
1019 
1020  setCache(iSkipCurrentProcess, k, iExceptPtr);
1021  }
1022 
1023  bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
1024  Principal const& principal,
1025  bool iSkipCurrentProcess) const {
1026  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1027  auto k = lookupProcessOrder[iProcessingIndex];
1028  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1029 
1030  if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
1031  setCache(iSkipCurrentProcess, k, nullptr);
1032  return true;
1033  }
1034  return false;
1035  }
1036 
// Walks principal.lookupProcessOrder() starting at iProcessingIndex and starts an
// asynchronous prefetch on a matching resolver, attaching a
// TryNextResolverWaitingTask so the walk can resume when that prefetch finishes.
// If the walk ends without starting a prefetch, the result (missing or ambiguous)
// is recorded through setCache().
// NOTE(review): listing lines 1040, 1042 and 1059 are absent from this rendering.
// The declaration summary at the end of this listing shows the two missing
// parameters as `SharedResourcesAcquirer *sra` and `ServiceToken token`; line 1059
// presumably opens the block that is closed at listing line 1071 -- confirm
// against the real source file.
1037  void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
1038  Principal const& principal,
1039  bool skipCurrentProcess,
1041  ModuleCallingContext const* mcc,
1043  oneapi::tbb::task_group* group) const {
1044  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1045  auto index = iProcessingIndex;
1046 
1047  const unsigned int choiceSize = ambiguous_.size();
// Default outcome if nothing usable is found: the "missing" sentinel.
1048  unsigned int newCacheIndex = choiceSize + kMissingOffset;
1049  while (index < lookupProcessOrder.size()) {
1050  auto k = lookupProcessOrder[index];
// An entry of 0 stops the walk (presumably it marks the end of the used entries -- TODO confirm).
1051  if (k == 0) {
1052  break;
1053  }
1054  assert(k < ambiguous_.size());
// An ambiguous entry also stops the walk, but records the "ambiguous" sentinel instead.
1055  if (ambiguous_[k]) {
1056  newCacheIndex = choiceSize + kAmbiguousOffset;
1057  break;
1058  }
1060  //make new task
1061 
// The raw pointer is handed to the WaitingTaskHolder on the next line
// (NOTE(review): ownership/lifetime of the task is not visible here -- confirm).
1062  auto task = new TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token, group);
1063  WaitingTaskHolder hTask(*group, task);
1064  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1065 
1066  //Make sure the Services are available on this thread
1067  ServiceRegistry::Operate guard(token);
1068 
// Start the prefetch and return; the walk continues from TryNextResolverWaitingTask::execute().
1069  productResolver->prefetchAsync(hTask, principal, skipCurrentProcess, token, sra, mcc);
1070  return;
1071  }
1072  ++index;
1073  }
1074  //data product unavailable
1075  setCache(skipCurrentProcess, newCacheIndex, nullptr);
1076  }
1077 
1079 
1081 
1083 
1084  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1085 
1087  // This function should never receive 'true'. On the other hand,
1088  // nothing should break if a 'true' is passed, because
1089  // NoProcessProductResolver just forwards the resolve
1090  const auto resetValue = unsetIndexValue();
1091  lastCheckIndex_ = resetValue;
1092  lastSkipCurrentCheckIndex_ = resetValue;
1093  prefetchRequested_ = false;
1095  waitingTasks_.reset();
1097  }
1098 
1099  bool NoProcessProductResolver::singleProduct_() const { return false; }
1100 
1103  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1104  << "Contact a Framework developer\n";
1105  }
1106 
1109  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1110  << "Contact a Framework developer\n";
1111  }
1112 
1115  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1116  << "Contact a Framework developer\n";
1117  }
1118 
1121  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1122  << "Contact a Framework developer\n";
1123  }
1124 
// Intentionally unimplemented for this resolver type: the visible lines only build a
// "not implemented and should never be called" message. The parameter is unnamed
// because it is not used.
// NOTE(review): listing line 1126 (the start of the statement these `<<` lines
// continue) is absent from this rendering. The SingleChoiceNoProcessProductResolver
// counterpart in this file starts with `throw Exception(errors::LogicError)`, so
// presumably the same applies here -- confirm against the real source file.
1125  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1127  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1128  << "Contact a Framework developer\n";
1129  }
1130 
1133  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1134  << "Contact a Framework developer\n";
1135  }
1136 
// Intentionally unimplemented for this resolver type: the visible lines only build a
// "not implemented and should never be called" message. The shared_ptr argument is
// unnamed because it is not used.
// NOTE(review): listing line 1138 (the start of the statement these `<<` lines
// continue) is absent from this rendering. The SingleChoiceNoProcessProductResolver
// counterpart in this file starts with `throw Exception(errors::LogicError)`, so
// presumably the same applies here -- confirm against the real source file.
1137  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1139  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1140  << "Contact a Framework developer\n";
1141  }
1142 
1145  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1146  << "Contact a Framework developer\n";
1147  }
1148 
1151  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1152  << "Contact a Framework developer\n";
1153  }
1154 
1155  //---- SingleChoiceNoProcessProductResolver ----------------
1157  Principal const& principal,
1158  bool skipCurrentProcess,
1160  ModuleCallingContext const* mcc) const {
1161  //NOTE: Have to lookup the other ProductResolver each time rather than cache
1162  // it's pointer since it appears the pointer can change at some later stage
1164  ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1165  }
1166 
1168  Principal const& principal,
1169  bool skipCurrentProcess,
1170  ServiceToken const& token,
1172  ModuleCallingContext const* mcc) const {
1174  ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1175  }
1176 
1178 
1180 
1182 
1184 
1186 
1189  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1190  << "Contact a Framework developer\n";
1191  }
1192 
1195  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1196  << "Contact a Framework developer\n";
1197  }
1198 
1201  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1202  << "Contact a Framework developer\n";
1203  }
1204 
1207  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1208  << "Contact a Framework developer\n";
1209  }
1210 
1212  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1213  "implemented and should never be called.\n"
1214  << "Contact a Framework developer\n";
1215  }
1216 
1219  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1220  << "Contact a Framework developer\n";
1221  }
1222 
1223  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1224  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not "
1225  "implemented and should never be called.\n"
1226  << "Contact a Framework developer\n";
1227  }
1228 
1231  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1232  << "Contact a Framework developer\n";
1233  }
1234 
1237  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1238  << "Contact a Framework developer\n";
1239  }
1240 
1241 } // namespace edm
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void connectTo(ProductResolverBase const &iOther, Principal const *) final
ProductProvenance const * productProvenancePtr_() const override
void setProductID(ProductID const &pid)
ProductData const & getProductData() const final
Provenance const & provenance() const
Definition: ProductData.h:33
std::string const & branchName() const
ProductProvenanceLookup const * store() const
Definition: Provenance.h:60
ServiceToken lock() const
Definition: ServiceToken.h:101
std::string const & productInstanceName() const
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
bool isAtEndTransition() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
DataManagingOrAliasProductResolver const & realProduct() const
void resetProductData_(bool deleteEarly) override
static const TGPicture * info(bool iBackgroundIsBlack)
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
#define CMS_SA_ALLOW
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Provenance const * provenance() const
BranchType const & branchType() const
void prefetchAsync(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
static constexpr unsigned int kAmbiguousOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
WaitingTaskList & waitingTasks() const
bool singleProduct_() const override
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
SwitchProducerProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
static constexpr unsigned int kMissingOffset
SwitchBaseProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
void checkType(WrapperBase const &prod) const
std::type_info const & dynamicTypeInfo() const
Definition: WrapperBase.h:42
Resolution resolveProductImpl(Resolution) const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
unsigned int unsetIndexValue() const
std::shared_ptr< BranchDescription const > bd_
void resetProductData_(bool deleteEarly) override
ParentageID id() const
Definition: Parentage.cc:17
EventPrincipal & principal()
void setProductID_(ProductID const &pid) final
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
void putProduct(std::unique_ptr< WrapperBase > edp) const final
list original
Definition: definitions.py:57
std::atomic< bool > prefetchRequested_
ProductStatus status() const
void resetProductData_(bool deleteEarly) final
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:931
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
WrapperBase const * wrapper() const
Definition: ProductData.h:35
ProductProvenance const * branchIDToProvenance(BranchID const &bid) const
std::string const & processName() const
void resetProductData_(bool deleteEarly) override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:24
bool unscheduledWasNotRun_() const override
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool singleProduct_() const override
Log< level::Error, false > LogError
bool productUnavailable_() const override
std::atomic< bool > prefetchRequested_
void reset()
Resets access to the resource so that added tasks will wait.
ProductProvenanceRetriever const * provRetriever_
void connectTo(ProductResolverBase const &iOther, Principal const *) final
assert(be >=bs)
ProductData const & getProductData() const final
BranchDescription const & branchDescription_() const override
void resetProductData_(bool deleteEarly) override
std::string const & processName() const
Provenance const * provenance_() const override
void connectTo(ProductResolverBase const &, Principal const *) final
ParentContext const & parent() const
void setProductID(ProductID const &pid)
Definition: ProductData.h:58
DataManagingOrAliasProductResolver & realProduct_
UnscheduledAuxiliary const * aux_
bool isPresent() const
Definition: WrapperBase.h:30
void setOrMergeProduct(std::shared_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
void setupUnscheduled(UnscheduledConfigurator const &) final
oneapi::tbb::task_group * group() const noexcept
bool singleProduct_() const override
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:27
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void setProductID_(ProductID const &pid) override
UnscheduledAuxiliary const * aux_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
virtual bool isFromCurrentProcess() const =0
void setupUnscheduled(UnscheduledConfigurator const &iConfigure) final
std::string const & className() const
std::string const & moduleLabel() const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::string const & productInstanceName() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
BranchDescription const & branchDescription_() const override
void resetProductData()
Definition: ProductData.h:52
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:197
std::atomic< bool > prefetchRequested_
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void resetProductData_(bool deleteEarly) override
def move
Definition: eostools.py:511
def load
Definition: svgfig.py:547
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
StreamID streamID() const
TypeID unwrappedTypeID() const
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool isFromCurrentProcess() const final
ProductProvenance const * productProvenancePtr_() const final
static constexpr const ProductStatus defaultStatus_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
std::atomic< bool > prefetchRequested_
tuple reader
Definition: DQM.py:105
std::atomic< unsigned int > lastCheckIndex_
TypeWithDict const & unwrappedType() const
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:167
void setProductID_(ProductID const &pid) override
EventTransitionInfo const & eventTransitionInfo() const
void setMergeableRunProductMetadata(MergeableRunProductMetadataBase const *mrpm)
Definition: ProductData.h:60
void putOrMergeProduct(std::unique_ptr< WrapperBase > prod) const override
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:81
void setProductID_(ProductID const &pid) final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setProduct(std::unique_ptr< WrapperBase > edp) const
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
ProductProvenance const * productProvenancePtr_() const override
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
tuple group
Definition: watchdog.py:82
DelayedReader * reader() const
Definition: Principal.h:187
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
UnscheduledAuxiliary const * auxiliary() const
void resetProductData_(bool deleteEarly) override
static constexpr unsigned int kUnsetOffset
void resetProductData_(bool deleteEarly) override
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductStatus defaultStatus() const
ProductData const * data() const
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
void setProvenance(ProductProvenanceLookup const *provRetriever)
Definition: ProductData.h:56
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token, oneapi::tbb::task_group *) const
void insertIntoSet(ProductProvenance provenanceProduct) const
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void setProductID_(ProductID const &pid) override
void setMergeableRunProductMetadata_(MergeableRunProductMetadata const *) override
void addContext(std::string const &context)
Definition: Exception.cc:165
DataManagingOrAliasProductResolver & realProduct_
std::vector< ProductResolverIndex > matchingHolders_
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Transition transition() const
Definition: GlobalContext.h:53
void retrieveAndMerge_(Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
BranchDescription const & branchDescription() const
Worker * findWorker(std::string const &iLabel) const
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:562
void setParents(std::vector< BranchID > parents)
Definition: Parentage.h:46
bool productWasDeleted_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
void resetProductData_(bool deleteEarly) override=0
BranchID const & originalBranchID() const
Provenance const * provenance_() const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
bool availableOnlyAtEndTransition() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Log< level::Warning, false > LogWarning
void mergeProduct(std::shared_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
void resetProductData_(bool deleteEarly) override
ProductProvenance const * productProvenancePtr_() const override
std::atomic< bool > & prefetchRequested() const
static ParentageRegistry * instance()
GlobalContext const * globalContext() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
ProductProvenance const * productProvenancePtr_() const override
void setProductID_(ProductID const &pid) override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
tuple size
Write out results.
bool insertMapped(value_type const &v)
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
std::string const & moduleLabel() const
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void setProductProvenanceRetriever(ProductProvenanceRetriever const *provRetriever)
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:36
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
static HepMC::HEPEVT_Wrapper wrapper
void putProduct(std::unique_ptr< WrapperBase > edp) const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final