CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
23 
24 #include <cassert>
25 #include <utility>
26 
// Sentinel offsets used by NoProcessProductResolver's cached lookup index.
// Each one is added to the number of candidate resolvers (ambiguous_.size())
// to form a value that is distinct from every real resolver slot.
namespace {
  // Nothing has been cached yet.
  constexpr unsigned int kUnsetOffset = 0U;
  // The lookup hit an ambiguous entry.
  constexpr unsigned int kAmbiguousOffset = 1U;
  // No resolver was able to supply the product.
  constexpr unsigned int kMissingOffset = 2U;
}  // namespace
30 
31 namespace edm {
32 
35  exception << "DataManagingProductResolver::resolveProduct_: The product matching all criteria was already deleted\n"
36  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
37  << "Looking for module label: " << moduleLabel() << "\n"
38  << "Looking for productInstanceName: " << productInstanceName() << "\n"
39  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
40  << "This means there is a configuration error.\n"
41  << "The module which is asking for this data must be configured to state that it will read this data.";
42  throw exception;
43  }
44 
45  //This is a templated function in order to avoid calling another virtual function
46  template <bool callResolver, typename FUNC>
48  if (productWasDeleted()) {
50  }
51  auto presentStatus = status();
52 
53  if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
54  //if resolver fails because of exception or not setting product
55  // make sure the status goes to failed
56  auto failedStatusSetter = [this](ProductStatus* iPresentStatus) {
57  if (this->status() == ProductStatus::ResolveNotRun) {
58  this->setFailedStatus();
59  }
60  *iPresentStatus = this->status();
61  };
62  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
63  failedStatusSetter);
64 
65  //If successful, this will call setProduct
66  resolver();
67  }
68 
69  if (presentStatus == ProductStatus::ProductSet) {
70  auto pd = &getProductData();
71  if (pd->wrapper()->isPresent()) {
72  return Resolution(pd);
73  }
74  }
75 
76  return Resolution(nullptr);
77  }
78 
80  std::shared_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
81  // if it's not mergeable and the previous read failed, go ahead and use this one
83  setProduct(std::move(iFrom));
84  return;
85  }
86 
88  if (not iFrom) {
89  return;
90  }
91 
92  checkType(*iFrom);
93 
95  if (original->isMergeable()) {
96  if (original->isPresent() != iFrom->isPresent()) {
98  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
99  << "Was trying to merge objects where one product had been put in the input file and the other had not "
100  "been."
101  << "\n"
102  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
103  << "that need to be merged in the first place.\n";
104  }
105  if (original->isPresent()) {
107  if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
108  original->mergeProduct(iFrom.get());
109  } else {
111  mergeableRunProductMetadata->getMergeDecision(desc.processName());
112  if (decision == MergeableRunProductMetadata::MERGE) {
113  original->mergeProduct(iFrom.get());
114  } else if (decision == MergeableRunProductMetadata::REPLACE) {
115  // Note this swaps the content of the product where the
116  // both products branches are present and the product is
117  // also present (was put) in the branch. A module might
118  // have already gotten a pointer to the product so we
119  // keep those pointers valid. This does not call swap
120  // on the Wrapper.
121  original->swapProduct(iFrom.get());
122  }
123  // If the decision is IGNORE, do nothing
124  }
125  }
126  // If both have isPresent false, do nothing
127 
128  } else if (original->hasIsProductEqual()) {
129  if (original->isPresent() && iFrom->isPresent()) {
130  if (!original->isProductEqual(iFrom.get())) {
131  auto const& bd = branchDescription();
132  edm::LogError("RunLumiMerging")
133  << "ProductResolver::mergeTheProduct\n"
134  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
135  << "Using the first, ignoring the second\n"
136  << "className = " << bd.className() << "\n"
137  << "moduleLabel = " << bd.moduleLabel() << "\n"
138  << "instance = " << bd.productInstanceName() << "\n"
139  << "process = " << bd.processName() << "\n";
140  }
141  } else if (!original->isPresent() && iFrom->isPresent()) {
142  setProduct(std::move(iFrom));
143  }
144  // if not iFrom->isPresent(), do nothing
145  } else {
146  auto const& bd = branchDescription();
147  edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
148  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
149  << "Using the first, ignoring the second in merge\n"
150  << "className = " << bd.className() << "\n"
151  << "moduleLabel = " << bd.moduleLabel() << "\n"
152  << "instance = " << bd.productInstanceName() << "\n"
153  << "process = " << bd.processName() << "\n";
154  if (!original->isPresent() && iFrom->isPresent()) {
155  setProduct(std::move(iFrom));
156  }
157  // In other cases, do nothing
158  }
159  }
160 
162  Principal const& principal, bool, SharedResourcesAcquirer*, ModuleCallingContext const* mcc) const {
163  return resolveProductImpl<true>([this, &principal, mcc]() {
164  auto branchType = principal.branchType();
165  if (branchType == InLumi || branchType == InRun) {
166  //delayed get has not been allowed with Run or Lumis
167  // The file may already be closed so the reader is invalid
168  return;
169  }
170  if (mcc and branchType == InEvent and aux_) {
171  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()), *mcc);
172  }
173 
174  auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
175  if (branchType == InEvent and aux_) {
176  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
177  }
178  }));
179 
180  if (auto reader = principal.reader()) {
181  std::unique_lock<std::recursive_mutex> guard;
182  if (auto sr = reader->sharedResources().second) {
183  guard = std::unique_lock<std::recursive_mutex>(*sr);
184  }
185  if (not productResolved()) {
186  //another thread could have beaten us here
187  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
188  }
189  }
190  });
191  }
192 
194  Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
195  if (auto reader = principal.reader()) {
196  std::unique_lock<std::recursive_mutex> guard;
197  if (auto sr = reader->sharedResources().second) {
198  guard = std::unique_lock<std::recursive_mutex>(*sr);
199  }
200 
201  //Can't use resolveProductImpl since it first checks to see
202  // if the product was already retrieved and then returns if it is
203  auto edp(reader->getProduct(branchDescription().branchID(), &principal));
204 
205  if (edp.get() != nullptr) {
206  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
208  << "Missing definition of member function swap for branch name " << branchDescription().branchName()
209  << "\n"
210  << "Mergeable data types written to a Run must have the swap member function defined"
211  << "\n";
212  }
214  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
215  setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
216  } else { // status() == ResolveFailed && branchDescription().isMergeable()
218  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
219  << "The product branch was dropped in the first run or lumi fragment and present in a later one"
220  << "\n"
221  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
222  << "that need to be merged in the first place.\n";
223  }
224  } else if (status() == defaultStatus()) {
225  setFailedStatus();
226  } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
228  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
229  << "The product branch was present in first run or lumi fragment and dropped in a later one"
230  << "\n"
231  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
232  << "that need to be merged in the first place.\n";
233  }
234  // Do nothing in other case. status is ResolveFailed already or
235  // it is not mergeable and the status is ProductSet
236  }
237  }
238 
240  std::shared_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
241  if (status() == defaultStatus()) {
242  //resolveProduct has not been called or it failed
243  setProduct(std::move(prod));
244  } else {
245  mergeProduct(std::move(prod), mergeableRunProductMetadata);
246  }
247  }
248 
251  }
252 
254  Principal const& principal,
255  bool skipCurrentProcess,
256  ServiceToken const& token,
258  ModuleCallingContext const* mcc) const {
259  //need to try changing m_prefetchRequested before adding to m_waitingTasks
260  bool expected = false;
261  bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
262  m_waitingTasks.add(waitTask);
263 
264  if (prefetchRequested) {
265  ServiceWeakToken weakToken = token;
266  auto workToDo = [this, mcc, &principal, weakToken]() {
267  //need to make sure Service system is activated on the reading thread
268  ServiceRegistry::Operate operate(weakToken.lock());
269  // Caught exception is propagated via WaitingTaskList
270  CMS_SA_ALLOW try {
271  resolveProductImpl<true>([this, &principal, mcc]() {
272  if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
273  return;
274  }
275  if (auto reader = principal.reader()) {
276  std::unique_lock<std::recursive_mutex> guard;
277  if (auto sr = reader->sharedResources().second) {
278  guard = std::unique_lock<std::recursive_mutex>(*sr);
279  }
280  if (not productResolved()) {
281  //another thread could have finished this while we were waiting
282  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
283  }
284  }
285  });
286  } catch (...) {
287  this->m_waitingTasks.doneWaiting(std::current_exception());
288  return;
289  }
290  this->m_waitingTasks.doneWaiting(nullptr);
291  };
292 
293  SerialTaskQueueChain* queue = nullptr;
294  if (auto reader = principal.reader()) {
295  if (auto shared_res = reader->sharedResources().first) {
296  queue = &(shared_res->serialQueueChain());
297  }
298  }
299  if (queue) {
300  queue->push(*waitTask.group(), workToDo);
301  } else {
302  //Have to create a new task
303  auto t = make_functor_task(workToDo);
304  waitTask.group()->run([t]() {
305  TaskSentry s{t};
306  t->execute();
307  });
308  }
309  }
310  }
311 
313  if (not deleteEarly) {
314  m_prefetchRequested = false;
316  }
318  }
319 
321  aux_ = iConfigure.auxiliary();
322  }
323 
325 
326  void PutOnReadInputProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
327  if (status() != defaultStatus()) {
329  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
330  }
331 
332  setProduct(std::move(edp)); // ProductResolver takes ownership
333  }
334 
336 
338  bool skipCurrentProcess,
340  ModuleCallingContext const*) const {
341  return resolveProductImpl<false>([]() { return; });
342  }
343 
345  Principal const& principal,
346  bool skipCurrentProcess,
347  ServiceToken const& token,
349  ModuleCallingContext const* mcc) const {}
350 
351  void PutOnReadInputProductResolver::putOrMergeProduct(std::unique_ptr<WrapperBase> edp) const {
352  setOrMergeProduct(std::move(edp), nullptr);
353  }
354 
356  bool skipCurrentProcess,
358  ModuleCallingContext const*) const {
359  if (!skipCurrentProcess) {
360  //'false' means never call the lambda function
361  return resolveProductImpl<false>([]() { return; });
362  }
363  return Resolution(nullptr);
364  }
365 
367  Principal const& principal,
368  bool skipCurrentProcess,
369  ServiceToken const& token,
371  ModuleCallingContext const* mcc) const {
372  if (not skipCurrentProcess) {
373  if (branchDescription().branchType() == InProcess &&
375  // This is an accessInputProcessBlock transition
376  // We cannot access produced products in those transitions
377  // except for in SubProcesses where they should have already run.
378  return;
379  }
381  if (not mcc->parent().isAtEndTransition()) {
382  return;
383  }
384  }
385 
386  //Need to try modifying prefetchRequested_ before adding to m_waitingTasks
387  bool expected = false;
388  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
389  m_waitingTasks.add(waitTask);
390 
391  if (worker_ and prefetchRequested) {
392  //using a waiting task to do a callback guarantees that
393  // the m_waitingTasks list will be released from waiting even
394  // if the module does not put this data product or the
395  // module has an exception while running
396 
397  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
398  if (nullptr != iException) {
399  m_waitingTasks.doneWaiting(*iException);
400  } else {
401  m_waitingTasks.doneWaiting(std::exception_ptr());
402  }
403  });
404  worker_->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
405  }
406  }
407  }
408 
409  void PuttableProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
411  bool expected = false;
412  if (prefetchRequested_.compare_exchange_strong(expected, true)) {
413  m_waitingTasks.doneWaiting(std::exception_ptr());
414  }
415  }
416 
418  if (not deleteEarly) {
419  prefetchRequested_ = false;
421  }
423  }
424 
427  }
428 
430  aux_ = iConfigure.auxiliary();
432  }
433 
435  bool skipCurrentProcess,
437  ModuleCallingContext const*) const {
438  if (!skipCurrentProcess and worker_) {
439  return resolveProductImpl<false>([] {});
440  }
441  return Resolution(nullptr);
442  }
443 
445  Principal const& principal,
446  bool skipCurrentProcess,
447  ServiceToken const& token,
449  ModuleCallingContext const* mcc) const {
450  if (skipCurrentProcess) {
451  return;
452  }
453  if (worker_ == nullptr) {
454  throw cms::Exception("LogicError") << "UnscheduledProductResolver::prefetchAsync_() called with null worker_. "
455  "This should not happen, please contact framework developers.";
456  }
457  //need to try changing prefetchRequested_ before adding to waitingTasks_
458  bool expected = false;
459  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
460  waitingTasks_.add(waitTask);
461  if (prefetchRequested) {
462  //Have to create a new task which will make sure the state for UnscheduledProductResolver
463  // is properly set after the module has run
464  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
465  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
466  // state for the case where an exception occurs during the call to the function.
467  // Caught exception is propagated via WaitingTaskList
468  CMS_SA_ALLOW try {
469  resolveProductImpl<true>([iPtr]() {
470  if (iPtr) {
471  std::rethrow_exception(*iPtr);
472  }
473  });
474  } catch (...) {
475  waitingTasks_.doneWaiting(std::current_exception());
476  return;
477  }
478  waitingTasks_.doneWaiting(nullptr);
479  });
480 
481  ParentContext parentContext(mcc);
484  WaitingTaskHolder(*waitTask.group(), t),
485  info,
486  token,
487  info.principal().streamID(),
488  parentContext,
489  mcc->getStreamContext());
490  }
491  }
492 
494  if (not deleteEarly) {
495  prefetchRequested_ = false;
497  }
499  }
500 
501  void ProducedProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
502  if (status() != defaultStatus()) {
504  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
505  }
506 
507  setProduct(std::move(edp)); // ProductResolver takes ownership
508  }
509 
510  bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
511 
513 
515  // Check if the types match.
516  TypeID typeID(prod.dynamicTypeInfo());
518  // Types do not match.
520  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
521  << "It is supposed to be of type " << branchDescription().className() << ".\n"
522  << "It is actually of type " << typeID.className() << ".\n";
523  }
524  }
525 
526  void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
527  if (edp) {
528  checkType(*edp);
531  } else {
532  setFailedStatus();
533  }
534  }
535  void DataManagingProductResolver::setProduct(std::shared_ptr<WrapperBase> edp) const {
536  if (edp) {
537  checkType(*edp);
540  } else {
541  setFailedStatus();
542  }
543  }
544 
545  // This routine returns true if it is known that currently there is no real product.
546  // If there is a real product, it returns false.
547  // If it is not known if there is a real product, it returns false.
549  auto presentStatus = status();
550  if (presentStatus == ProductStatus::ProductSet) {
551  return !(getProductData().wrapper()->isPresent());
552  }
553  return presentStatus != ProductStatus::ResolveNotRun;
554  }
555 
557  auto s = status();
558  return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
559  }
560 
561  // This routine returns true if the product was deleted early in order to save memory
563 
564  bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
565  if (iSkipCurrentProcess and isFromCurrentProcess()) {
566  return false;
567  }
569  if (getProductData().wrapper()->isPresent()) {
570  return true;
571  }
572  }
573  return false;
574  }
575 
577  productData_.setProvenance(provRetriever);
578  }
579 
581 
583  MergeableRunProductMetadata const* mrpm) {
585  }
586 
588  return provenance()->productProvenance();
589  }
590 
594  }
595  if (deleteEarly) {
597  } else {
598  resetStatus();
599  }
600  }
601 
602  bool DataManagingProductResolver::singleProduct_() const { return true; }
603 
606  }
607 
609 
611  return provenance()->productProvenance();
612  }
613 
615 
616  bool AliasProductResolver::singleProduct_() const { return true; }
617 
618  SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd,
620  : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false) {
621  // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
622  Parentage p;
623  p.setParents(std::vector<BranchID>{realProduct.branchDescription().originalBranchID()});
624  parentageID_ = p.id();
626  }
627 
628  void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
630  << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
631  << "Contact a Framework developer\n";
632  }
633 
636  }
637 
639  if (res.data() == nullptr)
640  return res;
641  return Resolution(&productData_);
642  }
643 
645  // SwitchProducer will never put anything in the event, and
646  // "false" will make Event::commit_() to call putProduct() with
647  // null unique_ptr<WrapperBase> to signal that the produce() was
648  // run.
649  return false;
650  }
651 
653  productData_.setProvenance(provRetriever);
654  }
655 
657  // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
659  }
660 
663  realProduct_.resetProductData_(deleteEarly);
664  if (not deleteEarly) {
665  prefetchRequested_ = false;
667  }
668  }
669 
671  // update provenance
673  // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
674  productData_.unsafe_setWrapper(realProduct().getProductData().sharedConstWrapper());
675  }
676 
677  SwitchProducerProductResolver::SwitchProducerProductResolver(std::shared_ptr<BranchDescription const> bd,
679  : SwitchBaseProductResolver(std::move(bd), realProduct), status_(defaultStatus_) {}
680 
682  bool skipCurrentProcess,
684  ModuleCallingContext const* mcc) const {
686  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
687  }
688  return Resolution(nullptr);
689  }
690 
692  Principal const& principal,
693  bool skipCurrentProcess,
694  ServiceToken const& token,
696  ModuleCallingContext const* mcc) const {
697  if (skipCurrentProcess) {
698  return;
699  }
700  if (branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
701  return;
702  }
703 
704  //need to try changing prefetchRequested before adding to waitingTasks
705  bool expected = false;
706  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
707  waitingTasks().add(waitTask);
708 
709  if (doPrefetchRequested) {
710  //using a waiting task to do a callback guarantees that
711  // the waitingTasks() list will be released from waiting even
712  // if the module does not put this data product or the
713  // module has an exception while running
714  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
715  if (nullptr != iException) {
716  waitingTasks().doneWaiting(*iException);
717  } else {
719  waitingTasks().doneWaiting(std::exception_ptr());
720  }
721  });
722  worker()->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
723  }
724  }
725 
726  void SwitchProducerProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
727  if (status_ != defaultStatus_) {
729  << "Attempt to insert more than one product for a branch " << branchDescription().branchName()
730  << "This makes no sense for SwitchProducerProductResolver.\nContact a Framework developer";
731  }
732  // Let's use ResolveFailed to signal that produce() was called, as
733  // there is no real product in this resolver
735  bool expected = false;
736  if (prefetchRequested().compare_exchange_strong(expected, true)) {
738  waitingTasks().doneWaiting(std::exception_ptr());
739  }
740  }
741 
743  // if produce() was run (ResolveFailed), ask from the real resolver
745  return realProduct().productUnavailable();
746  }
747  return true;
748  }
749 
752  if (not deleteEarly) {
754  }
755  }
756 
758  bool skipCurrentProcess,
760  ModuleCallingContext const* mcc) const {
761  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
762  }
763 
765  Principal const& principal,
766  bool skipCurrentProcess,
767  ServiceToken const& token,
769  ModuleCallingContext const* mcc) const {
770  if (skipCurrentProcess) {
771  return;
772  }
773 
774  //need to try changing prefetchRequested_ before adding to waitingTasks_
775  bool expected = false;
776  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
777  waitingTasks().add(waitTask);
778 
779  if (doPrefetchRequested) {
780  //using a waiting task to do a callback guarantees that
781  // the waitingTasks() list will be released from waiting even
782  // if the module does not put this data product or the
783  // module has an exception while running
784  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
785  if (nullptr != iException) {
786  waitingTasks().doneWaiting(*iException);
787  } else {
789  waitingTasks().doneWaiting(std::exception_ptr());
790  }
791  });
793  WaitingTaskHolder(*waitTask.group(), waiting), principal, skipCurrentProcess, token, sra, mcc);
794  }
795  }
796 
798  provRetriever_ = provRetriever;
799  }
800 
802 
804  return provRetriever_ ? provRetriever_->branchIDToProvenance(bd_->originalBranchID()) : nullptr;
805  }
806 
808 
809  bool ParentProcessProductResolver::singleProduct_() const { return true; }
810 
812  // In principle, this ought to be fixed. I noticed one hits this error
813  // when in a SubProcess and calling the Event::getProvenance function
814  // with a BranchID to a branch from an earlier SubProcess or the top
815  // level process and this branch is not kept in this SubProcess. It might
816  // be possible to hit this in other contexts. I say it ought to be
817  // fixed because one does not encounter this issue if the SubProcesses
818  // are split into genuinely different processes (in principle that
819  // ought to give identical behavior and results). No user has ever
820  // reported this issue which has been around for some time and it was only
821  // noticed when testing some rare corner cases after modifying Core code.
822  // After discussing this with Chris we decided that at least for the moment
823  // there are higher priorities than fixing this ... I converted it so it
824  // causes an exception instead of a seg fault. The issue that may need to
825  // be addressed someday is how ProductResolvers for non-kept branches are
826  // connected to earlier SubProcesses.
828  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
829  << "Contact a Framework developer\n";
830  }
831 
832  NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
833  std::vector<bool> const& ambiguous,
834  bool madeAtEnd)
835  : matchingHolders_(matchingHolders),
836  ambiguous_(ambiguous),
837  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
838  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
839  prefetchRequested_(false),
840  skippingPrefetchRequested_(false),
841  madeAtEnd_{madeAtEnd} {
842  assert(ambiguous_.size() == matchingHolders_.size());
843  }
844 
846  Principal const& principal,
847  bool skipCurrentProcess,
849  ModuleCallingContext const* mcc) const {
850  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
851  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
852  }
853 
855  bool skipCurrentProcess,
857  ModuleCallingContext const* mcc) const {
858  //See if we've already cached which Resolver we should call or if
859  // we know it is ambiguous
860  const unsigned int choiceSize = ambiguous_.size();
861 
862  //madeAtEnd_==true and not at end transition is the same as skipping the current process
863  if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
864  skipCurrentProcess = not mcc->parent().isAtEndTransition();
865  }
866 
867  unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
868  if (checkCacheIndex != choiceSize + kUnsetOffset) {
869  if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
871  } else if (checkCacheIndex == choiceSize + kMissingOffset) {
872  return Resolution(nullptr);
873  }
874  return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
875  }
876 
877  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
878 
879  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
880  for (unsigned int k : lookupProcessOrder) {
881  assert(k < ambiguous_.size());
882  if (k == 0)
883  break; // Done
884  if (ambiguous_[k]) {
885  updateCacheIndex = choiceSize + kAmbiguousOffset;
887  }
889  auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
890  if (resolution.data() != nullptr) {
891  updateCacheIndex = k;
892  return resolution;
893  }
894  }
895  }
896 
897  updateCacheIndex = choiceSize + kMissingOffset;
898  return Resolution(nullptr);
899  }
900 
902  Principal const& principal,
903  bool skipCurrentProcess,
904  ServiceToken const& token,
906  ModuleCallingContext const* mcc) const {
907  bool timeToMakeAtEnd = true;
908  if (madeAtEnd_ and mcc) {
909  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
910  }
911 
912  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
913  if (not skipCurrentProcess and timeToMakeAtEnd) {
914  //need to try changing prefetchRequested_ before adding to waitingTasks_
915  bool expected = false;
916  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
917  waitingTasks_.add(waitTask);
918 
919  if (prefetchRequested) {
920  //we are the first thread to request
921  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token, waitTask.group());
922  }
923  } else {
924  skippingWaitingTasks_.add(waitTask);
925  bool expected = false;
926  if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
927  //we are the first thread to request
928  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token, waitTask.group());
929  }
930  }
931  }
932 
933  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
934  ProductResolverIndex iIndex,
935  std::exception_ptr iExceptPtr) const {
936  if (not iSkipCurrentProcess) {
937  lastCheckIndex_ = iIndex;
938  waitingTasks_.doneWaiting(iExceptPtr);
939  } else {
942  }
943  }
944 
945  namespace {
946  class TryNextResolverWaitingTask : public edm::WaitingTask {
947  public:
948  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
949  unsigned int iResolverIndex,
950  Principal const* iPrincipal,
952  ModuleCallingContext const* iMCC,
953  bool iSkipCurrentProcess,
954  ServiceToken iToken,
955  tbb::task_group* iGroup)
956  : resolver_(iResolver),
957  principal_(iPrincipal),
958  sra_(iSRA),
959  mcc_(iMCC),
960  group_(iGroup),
961  serviceToken_(iToken),
962  index_(iResolverIndex),
963  skipCurrentProcess_(iSkipCurrentProcess) {}
964 
965  void execute() final {
966  auto exceptPtr = exceptionPtr();
967  if (exceptPtr) {
968  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
969  } else {
970  if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
971  resolver_->tryPrefetchResolverAsync(
972  index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
973  }
974  }
975  }
976 
977  private:
978  NoProcessProductResolver const* resolver_;
979  Principal const* principal_;
981  ModuleCallingContext const* mcc_;
982  tbb::task_group* group_;
983  ServiceWeakToken serviceToken_;
984  unsigned int index_;
985  bool skipCurrentProcess_;
986  };
987  } // namespace
988 
989  void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
990  Principal const& principal,
991  bool iSkipCurrentProcess,
992  std::exception_ptr iExceptPtr) const {
993  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
994  auto k = lookupProcessOrder[iProcessingIndex];
995 
996  setCache(iSkipCurrentProcess, k, iExceptPtr);
997  }
998 
999  bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
1000  Principal const& principal,
1001  bool iSkipCurrentProcess) const {
1002  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1003  auto k = lookupProcessOrder[iProcessingIndex];
1004  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1005 
1006  if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
1007  setCache(iSkipCurrentProcess, k, nullptr);
1008  return true;
1009  }
1010  return false;
1011  }
1012 
// Walks principal.lookupProcessOrder() starting at iProcessingIndex and starts an
// asynchronous prefetch on one candidate resolver.
// The walk ends in one of three ways:
//   - a prefetch is started on a candidate and the function returns; a
//     TryNextResolverWaitingTask carrying the current index is attached to that prefetch
//   - an entry k with ambiguous_[k] set is reached: the cache is set to
//     ambiguous_.size() + kAmbiguousOffset
//   - an entry equal to 0 is reached or the order is exhausted: the cache is set
//     to ambiguous_.size() + kMissingOffset
// NOTE(review): this listing omits the parameter declarations for `sra` and `token`
// (both are used below) and the line that opens the conditional block closed just
// before `++index`; confirm those against the original source file.
1013  void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
1014  Principal const& principal,
1015  bool skipCurrentProcess,
1017  ModuleCallingContext const* mcc,
1019  tbb::task_group* group) const {
1020  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1021  auto index = iProcessingIndex;
1022 
1023  const unsigned int choiceSize = ambiguous_.size();
// Default outcome if the loop finishes without starting a prefetch.
1024  unsigned int newCacheIndex = choiceSize + kMissingOffset;
1025  while (index < lookupProcessOrder.size()) {
1026  auto k = lookupProcessOrder[index];
// A zero entry stops the walk (presumably marks the end of the valid entries — confirm).
1027  if (k == 0) {
1028  break;
1029  }
1030  assert(k < ambiguous_.size());
1031  if (ambiguous_[k]) {
1032  newCacheIndex = choiceSize + kAmbiguousOffset;
1033  break;
1034  }
1036  //make new task
1037 
// NOTE(review): the task is allocated with a raw `new`; presumably the
// WaitingTaskHolder / task framework takes over its lifetime — verify.
1038  auto task = new TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token, group);
1039  WaitingTaskHolder hTask(*group, task);
1040  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1041 
1042  //Make sure the Services are available on this thread
1043  ServiceRegistry::Operate guard(token);
1044 
1045  productResolver->prefetchAsync(hTask, principal, skipCurrentProcess, token, sra, mcc);
1046  return;
1047  }
1048  ++index;
1049  }
1050  //data product unavailable
1051  setCache(skipCurrentProcess, newCacheIndex, nullptr);
1052  }
1053 
1055 
1057 
1059 
// Returns the cache index value that stands for "unset": ambiguous_.size() + kUnsetOffset.
1060  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1061 
1063  // This function should never receive 'true'. On the other hand,
1064  // nothing should break if a 'true' is passed, because
1065  // NoProcessProductResolver just forwards the resolve
1066  const auto resetValue = unsetIndexValue();
1067  lastCheckIndex_ = resetValue;
1068  lastSkipCurrentCheckIndex_ = resetValue;
1069  prefetchRequested_ = false;
1071  waitingTasks_.reset();
1073  }
1074 
// Always returns false for NoProcessProductResolver.
1075  bool NoProcessProductResolver::singleProduct_() const { return false; }
1076 
1079  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1080  << "Contact a Framework developer\n";
1081  }
1082 
1085  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1086  << "Contact a Framework developer\n";
1087  }
1088 
1091  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1092  << "Contact a Framework developer\n";
1093  }
1094 
1097  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1098  << "Contact a Framework developer\n";
1099  }
1100 
// Placeholder override: the visible lines only build a "not implemented and should
// never be called" message; the bool parameter is unnamed and unused.
// NOTE(review): the line that opens this statement (Doxygen line 1102) is absent
// from this listing — presumably a throw of a framework exception, as in the
// neighbouring stubs; confirm against the original source file.
1101  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1103  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1104  << "Contact a Framework developer\n";
1105  }
1106 
1109  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1110  << "Contact a Framework developer\n";
1111  }
1112 
// Placeholder override: the visible lines only build a "not implemented and should
// never be called" message; the shared_ptr parameter is unnamed and unused.
// NOTE(review): the line that opens this statement (Doxygen line 1114) is absent
// from this listing — presumably a throw of a framework exception, as in the
// neighbouring stubs; confirm against the original source file.
1113  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1115  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1116  << "Contact a Framework developer\n";
1117  }
1118 
1121  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1122  << "Contact a Framework developer\n";
1123  }
1124 
1127  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1128  << "Contact a Framework developer\n";
1129  }
1130 
1131  //---- SingleChoiceNoProcessProductResolver ----------------
1133  Principal const& principal,
1134  bool skipCurrentProcess,
1136  ModuleCallingContext const* mcc) const {
1137  //NOTE: Have to look up the other ProductResolver each time rather than cache
1138  // its pointer since it appears the pointer can change at some later stage
1140  ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1141  }
1142 
1144  Principal const& principal,
1145  bool skipCurrentProcess,
1146  ServiceToken const& token,
1148  ModuleCallingContext const* mcc) const {
1150  ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1151  }
1152 
1154 
1156 
1158 
1160 
1162 
1165  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1166  << "Contact a Framework developer\n";
1167  }
1168 
1171  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1172  << "Contact a Framework developer\n";
1173  }
1174 
1177  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1178  << "Contact a Framework developer\n";
1179  }
1180 
1183  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1184  << "Contact a Framework developer\n";
1185  }
1186 
1188  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1189  "implemented and should never be called.\n"
1190  << "Contact a Framework developer\n";
1191  }
1192 
1195  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1196  << "Contact a Framework developer\n";
1197  }
1198 
// Placeholder override that must never be reached: it unconditionally throws an
// Exception constructed with errors::LogicError, whose message names this function
// and asks the reader to contact a Framework developer.
// The shared_ptr parameter is unnamed and unused.
1199  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1200  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not "
1201  "implemented and should never be called.\n"
1202  << "Contact a Framework developer\n";
1203  }
1204 
1207  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1208  << "Contact a Framework developer\n";
1209  }
1210 
1213  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1214  << "Contact a Framework developer\n";
1215  }
1216 
1217 } // namespace edm
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void connectTo(ProductResolverBase const &iOther, Principal const *) final
ProductProvenance const * productProvenancePtr_() const override
void setProductID(ProductID const &pid)
ProductData const & getProductData() const final
Provenance const & provenance() const
Definition: ProductData.h:33
std::string const & branchName() const
ProductProvenanceLookup const * store() const
Definition: Provenance.h:60
ServiceToken lock() const
Definition: ServiceToken.h:101
std::string const & productInstanceName() const
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
bool isAtEndTransition() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
DataManagingOrAliasProductResolver const & realProduct() const
void resetProductData_(bool deleteEarly) override
static const TGPicture * info(bool iBackgroundIsBlack)
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
#define CMS_SA_ALLOW
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Provenance const * provenance() const
BranchType const & branchType() const
void prefetchAsync(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
static constexpr unsigned int kAmbiguousOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
WaitingTaskList & waitingTasks() const
bool singleProduct_() const override
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
SwitchProducerProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
static constexpr unsigned int kMissingOffset
SwitchBaseProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
void checkType(WrapperBase const &prod) const
std::type_info const & dynamicTypeInfo() const
Definition: WrapperBase.h:42
Resolution resolveProductImpl(Resolution) const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
unsigned int unsetIndexValue() const
std::shared_ptr< BranchDescription const > bd_
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
void resetProductData_(bool deleteEarly) override
ParentageID id() const
Definition: Parentage.cc:17
EventPrincipal & principal()
void setProductID_(ProductID const &pid) final
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
void putProduct(std::unique_ptr< WrapperBase > edp) const final
list original
Definition: definitions.py:57
std::atomic< bool > prefetchRequested_
ProductStatus status() const
void resetProductData_(bool deleteEarly) final
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:931
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
WrapperBase const * wrapper() const
Definition: ProductData.h:35
ProductProvenance const * branchIDToProvenance(BranchID const &bid) const
std::string const & processName() const
void resetProductData_(bool deleteEarly) override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:24
bool unscheduledWasNotRun_() const override
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool singleProduct_() const override
Log< level::Error, false > LogError
bool productUnavailable_() const override
std::atomic< bool > prefetchRequested_
void reset()
Resets access to the resource so that added tasks will wait.
ProductProvenanceRetriever const * provRetriever_
void connectTo(ProductResolverBase const &iOther, Principal const *) final
assert(be >=bs)
ProductData const & getProductData() const final
BranchDescription const & branchDescription_() const override
void resetProductData_(bool deleteEarly) override
std::string const & processName() const
Provenance const * provenance_() const override
void connectTo(ProductResolverBase const &, Principal const *) final
ParentContext const & parent() const
void setProductID(ProductID const &pid)
Definition: ProductData.h:58
DataManagingOrAliasProductResolver & realProduct_
UnscheduledAuxiliary const * aux_
bool isPresent() const
Definition: WrapperBase.h:30
void setOrMergeProduct(std::shared_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
void setupUnscheduled(UnscheduledConfigurator const &) final
bool singleProduct_() const override
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:27
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void setProductID_(ProductID const &pid) override
UnscheduledAuxiliary const * aux_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
virtual bool isFromCurrentProcess() const =0
void setupUnscheduled(UnscheduledConfigurator const &iConfigure) final
std::string const & className() const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
BranchDescription const & branchDescription_() const override
void resetProductData()
Definition: ProductData.h:52
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:197
std::atomic< bool > prefetchRequested_
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void resetProductData_(bool deleteEarly) override
def move
Definition: eostools.py:511
def load
Definition: svgfig.py:547
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
StreamID streamID() const
TypeID unwrappedTypeID() const
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool isFromCurrentProcess() const final
ProductProvenance const * productProvenancePtr_() const final
static constexpr const ProductStatus defaultStatus_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
std::atomic< bool > prefetchRequested_
tuple reader
Definition: DQM.py:105
std::atomic< unsigned int > lastCheckIndex_
TypeWithDict const & unwrappedType() const
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:167
void setProductID_(ProductID const &pid) override
EventTransitionInfo const & eventTransitionInfo() const
void setMergeableRunProductMetadata(MergeableRunProductMetadataBase const *mrpm)
Definition: ProductData.h:60
void putOrMergeProduct(std::unique_ptr< WrapperBase > prod) const override
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:81
void setProductID_(ProductID const &pid) final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void add(tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void setProduct(std::unique_ptr< WrapperBase > edp) const
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
ProductProvenance const * productProvenancePtr_() const override
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
tuple group
Definition: watchdog.py:82
DelayedReader * reader() const
Definition: Principal.h:187
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
UnscheduledAuxiliary const * auxiliary() const
void resetProductData_(bool deleteEarly) override
static constexpr unsigned int kUnsetOffset
void resetProductData_(bool deleteEarly) override
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductStatus defaultStatus() const
ProductData const * data() const
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
void setProvenance(ProductProvenanceLookup const *provRetriever)
Definition: ProductData.h:56
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
void insertIntoSet(ProductProvenance provenanceProduct) const
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void setProductID_(ProductID const &pid) override
void setMergeableRunProductMetadata_(MergeableRunProductMetadata const *) override
DataManagingOrAliasProductResolver & realProduct_
tbb::task_group * group() const noexcept
std::vector< ProductResolverIndex > matchingHolders_
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Transition transition() const
Definition: GlobalContext.h:53
void retrieveAndMerge_(Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
BranchDescription const & branchDescription() const
Worker * findWorker(std::string const &iLabel) const
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:562
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token, tbb::task_group *) const
void setParents(std::vector< BranchID > parents)
Definition: Parentage.h:46
bool productWasDeleted_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
void resetProductData_(bool deleteEarly) override=0
BranchID const & originalBranchID() const
Provenance const * provenance_() const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
bool availableOnlyAtEndTransition() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Log< level::Warning, false > LogWarning
void mergeProduct(std::shared_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
void resetProductData_(bool deleteEarly) override
ProductProvenance const * productProvenancePtr_() const override
std::atomic< bool > & prefetchRequested() const
static ParentageRegistry * instance()
GlobalContext const * globalContext() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
ProductProvenance const * productProvenancePtr_() const override
void setProductID_(ProductID const &pid) override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
tuple size
Write out results.
bool insertMapped(value_type const &v)
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
std::string const & moduleLabel() const
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void setProductProvenanceRetriever(ProductProvenanceRetriever const *provRetriever)
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:36
static HepMC::HEPEVT_Wrapper wrapper
void putProduct(std::unique_ptr< WrapperBase > edp) const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final