CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
23 
24 #include <cassert>
25 #include <utility>
26 
27 static constexpr unsigned int kUnsetOffset = 0;
28 static constexpr unsigned int kAmbiguousOffset = 1;
29 static constexpr unsigned int kMissingOffset = 2;
30 
31 namespace edm {
32 
35  exception << "DataManagingProductResolver::resolveProduct_: The product matching all criteria was already deleted\n"
36  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
37  << "Looking for module label: " << moduleLabel() << "\n"
38  << "Looking for productInstanceName: " << productInstanceName() << "\n"
39  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
40  << "This means there is a configuration error.\n"
41  << "The module which is asking for this data must be configured to state that it will read this data.";
42  throw exception;
43  }
44 
45  //This is a templated function in order to avoid calling another virtual function
46  template <bool callResolver, typename FUNC>
48  if (productWasDeleted()) {
50  }
51  auto presentStatus = status();
52 
53  if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
54  //if resolver fails because of exception or not setting product
55  // make sure the status goes to failed
56  auto failedStatusSetter = [this](ProductStatus* iPresentStatus) {
57  if (this->status() == ProductStatus::ResolveNotRun) {
58  this->setFailedStatus();
59  }
60  *iPresentStatus = this->status();
61  };
62  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
63  failedStatusSetter);
64 
65  //If successful, this will call setProduct
66  resolver();
67  }
68 
69  if (presentStatus == ProductStatus::ProductSet) {
70  auto pd = &getProductData();
71  if (pd->wrapper()->isPresent()) {
72  return Resolution(pd);
73  }
74  }
75 
76  return Resolution(nullptr);
77  }
78 
80  std::shared_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
81  // if its not mergeable and the previous read failed, go ahead and use this one
83  setProduct(std::move(iFrom));
84  return;
85  }
86 
88  if (not iFrom) {
89  return;
90  }
91 
92  checkType(*iFrom);
93 
95  if (original->isMergeable()) {
96  if (original->isPresent() != iFrom->isPresent()) {
98  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
99  << "Was trying to merge objects where one product had been put in the input file and the other had not "
100  "been."
101  << "\n"
102  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
103  << "that need to be merged in the first place.\n";
104  }
105  if (original->isPresent()) {
107  if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
108  original->mergeProduct(iFrom.get());
109  } else {
111  mergeableRunProductMetadata->getMergeDecision(desc.processName());
112  if (decision == MergeableRunProductMetadata::MERGE) {
113  original->mergeProduct(iFrom.get());
114  } else if (decision == MergeableRunProductMetadata::REPLACE) {
115  // Note this swaps the content of the product where the
116  // both products branches are present and the product is
117  // also present (was put) in the branch. A module might
118  // have already gotten a pointer to the product so we
119  // keep those pointers valid. This does not call swap
120  // on the Wrapper.
121  original->swapProduct(iFrom.get());
122  }
123  // If the decision is IGNORE, do nothing
124  }
125  }
126  // If both have isPresent false, do nothing
127 
128  } else if (original->hasIsProductEqual()) {
129  if (original->isPresent() && iFrom->isPresent()) {
130  if (!original->isProductEqual(iFrom.get())) {
131  auto const& bd = branchDescription();
132  edm::LogError("RunLumiMerging")
133  << "ProductResolver::mergeTheProduct\n"
134  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
135  << "Using the first, ignoring the second\n"
136  << "className = " << bd.className() << "\n"
137  << "moduleLabel = " << bd.moduleLabel() << "\n"
138  << "instance = " << bd.productInstanceName() << "\n"
139  << "process = " << bd.processName() << "\n";
140  }
141  } else if (!original->isPresent() && iFrom->isPresent()) {
142  setProduct(std::move(iFrom));
143  }
144  // if not iFrom->isPresent(), do nothing
145  } else {
146  auto const& bd = branchDescription();
147  edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
148  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
149  << "Using the first, ignoring the second in merge\n"
150  << "className = " << bd.className() << "\n"
151  << "moduleLabel = " << bd.moduleLabel() << "\n"
152  << "instance = " << bd.productInstanceName() << "\n"
153  << "process = " << bd.processName() << "\n";
154  if (!original->isPresent() && iFrom->isPresent()) {
155  setProduct(std::move(iFrom));
156  }
157  // In other cases, do nothing
158  }
159  }
160 
161  namespace {
162  void extendException(cms::Exception& e, BranchDescription const& bd, ModuleCallingContext const* mcc) {
163  e.addContext(std::string("While reading from source ") + bd.className() + " " + bd.moduleLabel() + " '" +
164  bd.productInstanceName() + "' " + bd.processName());
165  if (mcc) {
166  edm::exceptionContext(e, *mcc);
167  }
168  }
169  } // namespace
171  Principal const& principal, bool, SharedResourcesAcquirer*, ModuleCallingContext const* mcc) const {
172  return resolveProductImpl<true>([this, &principal, mcc]() {
173  auto branchType = principal.branchType();
174  if (branchType == InLumi || branchType == InRun) {
175  //delayed get has not been allowed with Run or Lumis
176  // The file may already be closed so the reader is invalid
177  return;
178  }
179  if (mcc and branchType == InEvent and aux_) {
181  }
182 
183  auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
184  if (branchType == InEvent and aux_) {
185  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
186  }
187  }));
188 
189  if (auto reader = principal.reader()) {
190  std::unique_lock<std::recursive_mutex> guard;
191  if (auto sr = reader->sharedResources().second) {
192  guard = std::unique_lock<std::recursive_mutex>(*sr);
193  }
194  if (not productResolved()) {
195  try {
196  //another thread could have beaten us here
197  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
198  } catch (cms::Exception& e) {
199  extendException(e, branchDescription(), mcc);
200  throw;
201  } catch (std::exception const& e) {
202  auto newExcept = edm::Exception(errors::StdException) << e.what();
203  extendException(newExcept, branchDescription(), mcc);
204  throw newExcept;
205  }
206  }
207  }
208  });
209  }
210 
212  Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
213  if (auto reader = principal.reader()) {
214  std::unique_lock<std::recursive_mutex> guard;
215  if (auto sr = reader->sharedResources().second) {
216  guard = std::unique_lock<std::recursive_mutex>(*sr);
217  }
218 
219  //Can't use resolveProductImpl since it first checks to see
220  // if the product was already retrieved and then returns if it is
221  auto edp(reader->getProduct(branchDescription().branchID(), &principal));
222 
223  if (edp.get() != nullptr) {
224  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
226  << "Missing definition of member function swap for branch name " << branchDescription().branchName()
227  << "\n"
228  << "Mergeable data types written to a Run must have the swap member function defined"
229  << "\n";
230  }
232  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
233  setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
234  } else { // status() == ResolveFailed && branchDescription().isMergeable()
236  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
237  << "The product branch was dropped in the first run or lumi fragment and present in a later one"
238  << "\n"
239  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
240  << "that need to be merged in the first place.\n";
241  }
242  } else if (status() == defaultStatus()) {
243  setFailedStatus();
244  } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
246  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
247  << "The product branch was present in first run or lumi fragment and dropped in a later one"
248  << "\n"
249  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
250  << "that need to be merged in the first place.\n";
251  }
252  // Do nothing in other case. status is ResolveFailed already or
253  // it is not mergeable and the status is ProductSet
254  }
255  }
256 
258  std::shared_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
259  if (status() == defaultStatus()) {
260  //resolveProduct has not been called or it failed
262  } else {
263  mergeProduct(std::move(prod), mergeableRunProductMetadata);
264  }
265  }
266 
269  }
270 
272  Principal const& principal,
273  bool skipCurrentProcess,
274  ServiceToken const& token,
276  ModuleCallingContext const* mcc) const noexcept {
277  //need to try changing m_prefetchRequested before adding to m_waitingTasks
278  bool expected = false;
279  bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
280  m_waitingTasks.add(waitTask);
281 
282  if (prefetchRequested) {
283  ServiceWeakToken weakToken = token;
284  auto workToDo = [this, mcc, &principal, weakToken]() {
285  //need to make sure Service system is activated on the reading thread
286  ServiceRegistry::Operate operate(weakToken.lock());
287  // Caught exception is propagated via WaitingTaskList
288  CMS_SA_ALLOW try {
289  resolveProductImpl<true>([this, &principal, mcc]() {
290  if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
291  return;
292  }
293  if (auto reader = principal.reader()) {
294  std::unique_lock<std::recursive_mutex> guard;
295  if (auto sr = reader->sharedResources().second) {
296  guard = std::unique_lock<std::recursive_mutex>(*sr);
297  }
298  if (not productResolved()) {
299  try {
300  //another thread could have finished this while we were waiting
301  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
302  } catch (cms::Exception& e) {
303  extendException(e, branchDescription(), mcc);
304  throw;
305  } catch (std::exception const& e) {
306  auto newExcept = edm::Exception(errors::StdException) << e.what();
307  extendException(newExcept, branchDescription(), mcc);
308  throw newExcept;
309  }
310  }
311  }
312  });
313  } catch (...) {
314  this->m_waitingTasks.doneWaiting(std::current_exception());
315  return;
316  }
317  this->m_waitingTasks.doneWaiting(nullptr);
318  };
319 
320  SerialTaskQueueChain* queue = nullptr;
321  if (auto reader = principal.reader()) {
322  if (auto shared_res = reader->sharedResources().first) {
323  queue = &(shared_res->serialQueueChain());
324  }
325  }
326  if (queue) {
327  queue->push(*waitTask.group(), workToDo);
328  } else {
329  //Have to create a new task
330  auto t = make_functor_task(workToDo);
331  waitTask.group()->run([t]() {
332  TaskSentry s{t};
333  t->execute();
334  });
335  }
336  }
337  }
338 
340  if (not deleteEarly) {
341  m_prefetchRequested = false;
343  }
345  }
346 
348  aux_ = iConfigure.auxiliary();
349  }
350 
352 
353  void PutOnReadInputProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
354  if (status() != defaultStatus()) {
356  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
357  }
358 
359  setProduct(std::move(edp)); // ProductResolver takes ownership
360  }
361 
363 
365  bool skipCurrentProcess,
367  ModuleCallingContext const*) const {
368  return resolveProductImpl<false>([]() { return; });
369  }
370 
372  Principal const& principal,
373  bool skipCurrentProcess,
374  ServiceToken const& token,
376  ModuleCallingContext const* mcc) const noexcept {}
377 
378  void PutOnReadInputProductResolver::putOrMergeProduct(std::unique_ptr<WrapperBase> edp) const {
379  setOrMergeProduct(std::move(edp), nullptr);
380  }
381 
383  bool skipCurrentProcess,
385  ModuleCallingContext const*) const {
386  if (!skipCurrentProcess) {
387  //'false' means never call the lambda function
388  return resolveProductImpl<false>([]() { return; });
389  }
390  return Resolution(nullptr);
391  }
392 
394  Principal const& principal,
395  bool skipCurrentProcess,
396  ServiceToken const& token,
398  ModuleCallingContext const* mcc) const noexcept {
399  if (not skipCurrentProcess) {
400  if (branchDescription().branchType() == InProcess &&
402  // This is an accessInputProcessBlock transition
403  // We cannot access produced products in those transitions
404  // except for in SubProcesses where they should have already run.
405  return;
406  }
407  if (branchDescription().availableOnlyAtEndTransition() and mcc) {
408  if (not mcc->parent().isAtEndTransition()) {
409  return;
410  }
411  }
412 
413  if (waitingTasks_) {
414  // using a waiting task to do a callback guarantees that the
415  // waitingTasks_ list (from the worker) will be released from
416  // waiting even if the module does not put this data product
417  // or the module has an exception while running
418  waitingTasks_->add(waitTask);
419  }
420  }
421  }
422 
424  auto worker = iConfigure.findWorker(branchDescription().moduleLabel());
425  if (worker) {
426  waitingTasks_ = &worker->waitingTaskList();
427  }
428  }
429 
431  aux_ = iConfigure.auxiliary();
433  }
434 
436  bool skipCurrentProcess,
438  ModuleCallingContext const*) const {
439  if (!skipCurrentProcess and worker_) {
440  return resolveProductImpl<false>([] {});
441  }
442  return Resolution(nullptr);
443  }
444 
446  Principal const& principal,
447  bool skipCurrentProcess,
448  ServiceToken const& token,
450  ModuleCallingContext const* mcc) const noexcept {
451  if (skipCurrentProcess) {
452  return;
453  }
454  assert(worker_);
455  //need to try changing prefetchRequested_ before adding to waitingTasks_
456  bool expected = false;
457  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
458  waitingTasks_.add(waitTask);
459  if (prefetchRequested) {
460  //Have to create a new task which will make sure the state for UnscheduledProductResolver
461  // is properly set after the module has run
462  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
463  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
464  // state for the case where an exception occurs during the call to the function.
465  // Caught exception is propagated via WaitingTaskList
466  CMS_SA_ALLOW try {
467  resolveProductImpl<true>([iPtr]() {
468  if (iPtr) {
469  std::rethrow_exception(*iPtr);
470  }
471  });
472  } catch (...) {
473  waitingTasks_.doneWaiting(std::current_exception());
474  return;
475  }
476  waitingTasks_.doneWaiting(nullptr);
477  });
478 
479  ParentContext parentContext(mcc);
480  EventTransitionInfo const& info = aux_->eventTransitionInfo();
482  WaitingTaskHolder(*waitTask.group(), t),
483  info,
484  token,
485  info.principal().streamID(),
486  parentContext,
487  mcc->getStreamContext());
488  }
489  }
490 
492  if (not deleteEarly) {
493  prefetchRequested_ = false;
495  }
497  }
498 
500  aux_ = iConfigure.auxiliary();
502  // worker can be missing if the corresponding module is
503  // unscheduled and none of its products are consumed
504  if (worker_) {
506  }
507  }
508 
510  bool skipCurrentProcess,
512  ModuleCallingContext const*) const {
513  if (!skipCurrentProcess and worker_) {
514  return resolveProductImpl<false>([] {});
515  }
516  return Resolution(nullptr);
517  }
518 
519  void TransformingProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
520  // Override putProduct() to not set the resolver status to
521  // ResolveFailed when the Event::commit_() checks which produced
522  // products were actually produced and which not, because the
523  // transforming products are never produced by time of commit_()
524  // by construction.
525  if (edp) {
527  }
528  }
529 
531  Principal const& principal,
532  bool skipCurrentProcess,
533  ServiceToken const& token,
535  ModuleCallingContext const* mcc) const noexcept {
536  if (skipCurrentProcess) {
537  return;
538  }
539  assert(worker_ != nullptr);
540  //need to try changing prefetchRequested_ before adding to waitingTasks_
541  bool expected = false;
542  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
543  waitingTasks_.add(waitTask);
544  if (prefetchRequested) {
545  //Have to create a new task which will make sure the state for TransformingProductResolver
546  // is properly set after the module has run
547  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
548  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
549  // state for the case where an exception occurs during the call to the function.
550  // Caught exception is propagated via WaitingTaskList
551  CMS_SA_ALLOW try {
552  resolveProductImpl<true>([iPtr]() {
553  if (iPtr) {
554  std::rethrow_exception(*iPtr);
555  }
556  });
557  } catch (...) {
558  waitingTasks_.doneWaiting(std::current_exception());
559  return;
560  }
561  waitingTasks_.doneWaiting(nullptr);
562  });
563 
564  //This gives a lifetime greater than this call
565  ParentContext parent(mcc);
566  mcc_ = ModuleCallingContext(
567  worker_->description(), index_ + 1, ModuleCallingContext::State::kPrefetching, parent, nullptr);
568 
569  EventTransitionInfo const& info = aux_->eventTransitionInfo();
570  worker_->doTransformAsync(WaitingTaskHolder(*waitTask.group(), t),
571  index_,
572  info.principal(),
573  token,
574  info.principal().streamID(),
575  mcc_,
576  mcc->getStreamContext());
577  }
578  }
579 
581  if (not deleteEarly) {
582  prefetchRequested_ = false;
584  }
586  }
587 
588  void ProducedProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
589  if (status() != defaultStatus()) {
591  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
592  }
593 
594  setProduct(std::move(edp)); // ProductResolver takes ownership
595  }
596 
597  bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
598 
600 
602  // Check if the types match.
603  TypeID typeID(prod.dynamicTypeInfo());
605  // Types do not match.
607  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
608  << "It is supposed to be of type " << branchDescription().className() << ".\n"
609  << "It is actually of type " << typeID.className() << ".\n";
610  }
611  }
612 
613  void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
614  if (edp) {
615  checkType(*edp);
618  } else {
619  setFailedStatus();
620  }
621  }
622  void DataManagingProductResolver::setProduct(std::shared_ptr<WrapperBase> edp) const {
623  if (edp) {
624  checkType(*edp);
627  } else {
628  setFailedStatus();
629  }
630  }
631 
632  // This routine returns true if it is known that currently there is no real product.
633  // If there is a real product, it returns false.
634  // If it is not known if there is a real product, it returns false.
636  auto presentStatus = status();
637  if (presentStatus == ProductStatus::ProductSet) {
638  return !(getProductData().wrapper()->isPresent());
639  }
640  return presentStatus != ProductStatus::ResolveNotRun;
641  }
642 
644  auto s = status();
645  return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
646  }
647 
648  // This routine returns true if the product was deleted early in order to save memory
650 
651  bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
652  if (iSkipCurrentProcess and isFromCurrentProcess()) {
653  return false;
654  }
656  if (getProductData().wrapper()->isPresent()) {
657  return true;
658  }
659  }
660  return false;
661  }
662 
664  productData_.setProvenance(provRetriever);
665  }
666 
668 
670  MergeableRunProductMetadata const* mrpm) {
672  }
673 
675  return provenance()->productProvenance();
676  }
677 
681  }
682  if (deleteEarly) {
684  } else {
685  resetStatus();
686  }
687  }
688 
689  bool DataManagingProductResolver::singleProduct_() const { return true; }
690 
693  }
694 
696 
698  return provenance()->productProvenance();
699  }
700 
702 
703  bool AliasProductResolver::singleProduct_() const { return true; }
704 
705  SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd,
707  : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false) {
708  // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
709  Parentage p;
710  p.setParents(std::vector<BranchID>{realProduct.branchDescription().originalBranchID()});
711  parentageID_ = p.id();
713  }
714 
715  void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
717  << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
718  << "Contact a Framework developer\n";
719  }
720 
723  }
724 
726  if (res.data() == nullptr)
727  return res;
728  return Resolution(&productData_);
729  }
730 
732  // SwitchProducer will never put anything in the event, and
733  // "false" will make Event::commit_() to call putProduct() with
734  // null unique_ptr<WrapperBase> to signal that the produce() was
735  // run.
736  return false;
737  }
738 
740  productData_.setProvenance(provRetriever);
741  }
742 
744  // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
746  }
747 
750  realProduct_.resetProductData_(deleteEarly);
751  if (not deleteEarly) {
752  prefetchRequested_ = false;
754  }
755  }
756 
758  // update provenance
760  // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
761  productData_.unsafe_setWrapper(realProduct().getProductData().sharedConstWrapper());
762  }
763 
764  SwitchProducerProductResolver::SwitchProducerProductResolver(std::shared_ptr<BranchDescription const> bd,
766  : SwitchBaseProductResolver(std::move(bd), realProduct), status_(defaultStatus_) {}
767 
769  bool skipCurrentProcess,
771  ModuleCallingContext const* mcc) const {
773  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
774  }
775  return Resolution(nullptr);
776  }
777 
779  Principal const& principal,
780  bool skipCurrentProcess,
781  ServiceToken const& token,
783  ModuleCallingContext const* mcc) const noexcept {
784  if (skipCurrentProcess) {
785  return;
786  }
787  if (branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
788  return;
789  }
790 
791  //need to try changing prefetchRequested before adding to waitingTasks
792  bool expected = false;
793  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
794  waitingTasks().add(waitTask);
795 
796  if (doPrefetchRequested) {
797  //using a waiting task to do a callback guarantees that
798  // the waitingTasks() list will be released from waiting even
799  // if the module does not put this data product or the
800  // module has an exception while running
801  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
802  if (nullptr != iException) {
803  waitingTasks().doneWaiting(*iException);
804  } else {
805  unsafe_setWrapperAndProvenance();
806  waitingTasks().doneWaiting(std::exception_ptr());
807  }
808  });
809  worker()->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
810  }
811  }
812 
813  void SwitchProducerProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
814  if (status_ != defaultStatus_) {
816  << "Attempt to insert more than one product for a branch " << branchDescription().branchName()
817  << "This makes no sense for SwitchProducerProductResolver.\nContact a Framework developer";
818  }
819  // Let's use ResolveFailed to signal that produce() was called, as
820  // there is no real product in this resolver
822  bool expected = false;
823  if (prefetchRequested().compare_exchange_strong(expected, true)) {
825  waitingTasks().doneWaiting(std::exception_ptr());
826  }
827  }
828 
830  // if produce() was run (ResolveFailed), ask from the real resolver
832  return realProduct().productUnavailable();
833  }
834  return true;
835  }
836 
839  if (not deleteEarly) {
841  }
842  }
843 
845  bool skipCurrentProcess,
847  ModuleCallingContext const* mcc) const {
848  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
849  }
850 
852  Principal const& principal,
853  bool skipCurrentProcess,
854  ServiceToken const& token,
856  ModuleCallingContext const* mcc) const noexcept {
857  if (skipCurrentProcess) {
858  return;
859  }
860 
861  //need to try changing prefetchRequested_ before adding to waitingTasks_
862  bool expected = false;
863  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
864  waitingTasks().add(waitTask);
865 
866  if (doPrefetchRequested) {
867  //using a waiting task to do a callback guarantees that
868  // the waitingTasks() list will be released from waiting even
869  // if the module does not put this data product or the
870  // module has an exception while running
871  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
872  if (nullptr != iException) {
873  waitingTasks().doneWaiting(*iException);
874  } else {
875  unsafe_setWrapperAndProvenance();
876  waitingTasks().doneWaiting(std::exception_ptr());
877  }
878  });
879  realProduct().prefetchAsync(
880  WaitingTaskHolder(*waitTask.group(), waiting), principal, skipCurrentProcess, token, sra, mcc);
881  }
882  }
883 
885  provRetriever_ = provRetriever;
886  }
887 
889 
891  return provRetriever_ ? provRetriever_->branchIDToProvenance(bd_->originalBranchID()) : nullptr;
892  }
893 
895 
896  bool ParentProcessProductResolver::singleProduct_() const { return true; }
897 
899  // In principle, this ought to be fixed. I noticed one hits this error
900  // when in a SubProcess and calling the Event::getProvenance function
901  // with a BranchID to a branch from an earlier SubProcess or the top
902  // level process and this branch is not kept in this SubProcess. It might
903  // be possible to hit this in other contexts. I say it ought to be
904  // fixed because one does not encounter this issue if the SubProcesses
905  // are split into genuinely different processes (in principle that
906  // ought to give identical behavior and results). No user has ever
907  // reported this issue which has been around for some time and it was only
908  // noticed when testing some rare corner cases after modifying Core code.
909  // After discussing this with Chris we decided that at least for the moment
910  // there are higher priorities than fixing this ... I converted it so it
911  // causes an exception instead of a seg fault. The issue that may need to
912  // be addressed someday is how ProductResolvers for non-kept branches are
913  // connected to earlier SubProcesses.
915  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
916  << "Contact a Framework developer\n";
917  }
918 
919  NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
920  std::vector<bool> const& ambiguous,
921  bool madeAtEnd)
922  : matchingHolders_(matchingHolders),
923  ambiguous_(ambiguous),
924  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
925  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
926  prefetchRequested_(false),
927  skippingPrefetchRequested_(false),
928  madeAtEnd_{madeAtEnd} {
929  assert(ambiguous_.size() == matchingHolders_.size());
930  }
931 
933  Principal const& principal,
934  bool skipCurrentProcess,
936  ModuleCallingContext const* mcc) const {
937  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
938  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
939  }
940 
942  bool skipCurrentProcess,
944  ModuleCallingContext const* mcc) const {
945  //See if we've already cached which Resolver we should call or if
946  // we know it is ambiguous
947  const unsigned int choiceSize = ambiguous_.size();
948 
949  //madeAtEnd_==true and not at end transition is the same as skipping the current process
950  if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
951  skipCurrentProcess = not mcc->parent().isAtEndTransition();
952  }
953 
954  unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
955  if (checkCacheIndex != choiceSize + kUnsetOffset) {
956  if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
958  } else if (checkCacheIndex == choiceSize + kMissingOffset) {
959  return Resolution(nullptr);
960  }
961  return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
962  }
963 
964  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
965 
966  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
967  for (unsigned int k : lookupProcessOrder) {
968  assert(k < ambiguous_.size());
969  if (k == 0)
970  break; // Done
971  if (ambiguous_[k]) {
972  updateCacheIndex = choiceSize + kAmbiguousOffset;
974  }
976  auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
977  if (resolution.data() != nullptr) {
978  updateCacheIndex = k;
979  return resolution;
980  }
981  }
982  }
983 
984  updateCacheIndex = choiceSize + kMissingOffset;
985  return Resolution(nullptr);
986  }
987 
989  Principal const& principal,
990  bool skipCurrentProcess,
991  ServiceToken const& token,
993  ModuleCallingContext const* mcc) const noexcept {
994  bool timeToMakeAtEnd = true;
995  if (madeAtEnd_ and mcc) {
996  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
997  }
998 
999  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
1000  if (not skipCurrentProcess and timeToMakeAtEnd) {
1001  //need to try changing prefetchRequested_ before adding to waitingTasks_
1002  bool expected = false;
1003  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
1004  waitingTasks_.add(waitTask);
1005 
1006  if (prefetchRequested) {
1007  //we are the first thread to request
1008  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token, waitTask.group());
1009  }
1010  } else {
1011  skippingWaitingTasks_.add(waitTask);
1012  bool expected = false;
1013  if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
1014  //we are the first thread to request
1015  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token, waitTask.group());
1016  }
1017  }
1018  }
1019 
1020  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
1021  ProductResolverIndex iIndex,
1022  std::exception_ptr iExceptPtr) const {
1023  if (not iSkipCurrentProcess) {
1024  lastCheckIndex_ = iIndex;
1025  waitingTasks_.doneWaiting(iExceptPtr);
1026  } else {
1027  lastSkipCurrentCheckIndex_ = iIndex;
1028  skippingWaitingTasks_.doneWaiting(iExceptPtr);
1029  }
1030  }
1031 
1032  namespace {
1033  class TryNextResolverWaitingTask : public edm::WaitingTask {
1034  public:
1035  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
1036  unsigned int iResolverIndex,
1037  Principal const* iPrincipal,
1039  ModuleCallingContext const* iMCC,
1040  bool iSkipCurrentProcess,
1041  ServiceToken iToken,
1042  oneapi::tbb::task_group* iGroup) noexcept
1043  : resolver_(iResolver),
1044  principal_(iPrincipal),
1045  sra_(iSRA),
1046  mcc_(iMCC),
1047  group_(iGroup),
1048  serviceToken_(iToken),
1049  index_(iResolverIndex),
1050  skipCurrentProcess_(iSkipCurrentProcess) {}
1051 
1052  void execute() final {
1053  auto exceptPtr = exceptionPtr();
1054  if (exceptPtr) {
1055  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, exceptPtr);
1056  } else {
1057  if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
1058  resolver_->tryPrefetchResolverAsync(
1059  index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
1060  }
1061  }
1062  }
1063 
1064  private:
1065  NoProcessProductResolver const* resolver_;
1066  Principal const* principal_;
1068  ModuleCallingContext const* mcc_;
1069  oneapi::tbb::task_group* group_;
1070  ServiceWeakToken serviceToken_;
1071  unsigned int index_;
1072  bool skipCurrentProcess_;
1073  };
1074  } // namespace
1075 
1076  void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
1077  Principal const& principal,
1078  bool iSkipCurrentProcess,
1079  std::exception_ptr iExceptPtr) const {
1080  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1081  auto k = lookupProcessOrder[iProcessingIndex];
1082 
1083  setCache(iSkipCurrentProcess, k, iExceptPtr);
1084  }
1085 
1086  bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
1087  Principal const& principal,
1088  bool iSkipCurrentProcess) const {
1089  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1090  auto k = lookupProcessOrder[iProcessingIndex];
1091  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1092 
1093  if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
1094  setCache(iSkipCurrentProcess, k, nullptr);
1095  return true;
1096  }
1097  return false;
1098  }
1099 
1100  void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
1101  Principal const& principal,
1102  bool skipCurrentProcess,
1104  ModuleCallingContext const* mcc,
1106  oneapi::tbb::task_group* group) const noexcept {
1107  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1108  auto index = iProcessingIndex;
1109 
1110  const unsigned int choiceSize = ambiguous_.size();
1111  unsigned int newCacheIndex = choiceSize + kMissingOffset;
1112  while (index < lookupProcessOrder.size()) {
1113  auto k = lookupProcessOrder[index];
1114  if (k == 0) {
1115  break;
1116  }
1117  assert(k < ambiguous_.size());
1118  if (ambiguous_[k]) {
1119  newCacheIndex = choiceSize + kAmbiguousOffset;
1120  break;
1121  }
1122  if (matchingHolders_[k] != ProductResolverIndexInvalid) {
1123  //make new task
1124 
1125  auto task = new TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token, group);
1126  WaitingTaskHolder hTask(*group, task);
1127  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1128 
1129  //Make sure the Services are available on this thread
1131 
1132  productResolver->prefetchAsync(hTask, principal, skipCurrentProcess, token, sra, mcc);
1133  return;
1134  }
1135  ++index;
1136  }
1137  //data product unavailable
1138  setCache(skipCurrentProcess, newCacheIndex, nullptr);
1139  }
1140 
1142 
1144 
1146 
1147  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1148 
1150  // This function should never receive 'true'. On the other hand,
1151  // nothing should break if a 'true' is passed, because
1152  // NoProcessProductResolver just forwards the resolve
1153  const auto resetValue = unsetIndexValue();
1154  lastCheckIndex_ = resetValue;
1155  lastSkipCurrentCheckIndex_ = resetValue;
1156  prefetchRequested_ = false;
1158  waitingTasks_.reset();
1160  }
1161 
1162  bool NoProcessProductResolver::singleProduct_() const { return false; }
1163 
1166  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1167  << "Contact a Framework developer\n";
1168  }
1169 
1172  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1173  << "Contact a Framework developer\n";
1174  }
1175 
1178  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1179  << "Contact a Framework developer\n";
1180  }
1181 
1184  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1185  << "Contact a Framework developer\n";
1186  }
1187 
1188  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1190  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1191  << "Contact a Framework developer\n";
1192  }
1193 
1196  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1197  << "Contact a Framework developer\n";
1198  }
1199 
1200  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1202  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1203  << "Contact a Framework developer\n";
1204  }
1205 
1208  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1209  << "Contact a Framework developer\n";
1210  }
1211 
1214  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1215  << "Contact a Framework developer\n";
1216  }
1217 
1218  //---- SingleChoiceNoProcessProductResolver ----------------
1220  Principal const& principal,
1221  bool skipCurrentProcess,
1223  ModuleCallingContext const* mcc) const {
1224  //NOTE: Have to lookup the other ProductResolver each time rather than cache
1225  // it's pointer since it appears the pointer can change at some later stage
1227  ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1228  }
1229 
1231  Principal const& principal,
1232  bool skipCurrentProcess,
1233  ServiceToken const& token,
1235  ModuleCallingContext const* mcc) const noexcept {
1236  principal.getProductResolverByIndex(realResolverIndex_)
1237  ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1238  }
1239 
1241 
1243 
1245 
1247 
1249 
1252  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1253  << "Contact a Framework developer\n";
1254  }
1255 
1258  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1259  << "Contact a Framework developer\n";
1260  }
1261 
1264  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1265  << "Contact a Framework developer\n";
1266  }
1267 
1270  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1271  << "Contact a Framework developer\n";
1272  }
1273 
1275  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1276  "implemented and should never be called.\n"
1277  << "Contact a Framework developer\n";
1278  }
1279 
1282  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1283  << "Contact a Framework developer\n";
1284  }
1285 
1286  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1287  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not "
1288  "implemented and should never be called.\n"
1289  << "Contact a Framework developer\n";
1290  }
1291 
1294  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1295  << "Contact a Framework developer\n";
1296  }
1297 
1300  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1301  << "Contact a Framework developer\n";
1302  }
1303 
1304 } // namespace edm
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void connectTo(ProductResolverBase const &iOther, Principal const *) final
void setProductID(ProductID const &pid)
ProductData const & getProductData() const final
ProductProvenance const * productProvenancePtr_() const override
void setOrMergeProduct(std::shared_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
BranchDescription const & branchDescription_() const override
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
void resetProductData_(bool deleteEarly) override
void putProduct(std::unique_ptr< WrapperBase > edp) const override
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
static constexpr unsigned int kAmbiguousOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
void prefetchAsync(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const noexcept
bool singleProduct_() const override
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:24
static constexpr unsigned int kMissingOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::shared_ptr< BranchDescription const > bd_
Provenance const & provenance() const
Definition: ProductData.h:33
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const noexcept override
BranchType const & branchType() const
void setProductID_(ProductID const &pid) final
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Worker * findWorker(std::string const &iLabel) const
void putProduct(std::unique_ptr< WrapperBase > edp) const final
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
void checkType(WrapperBase const &prod) const
std::atomic< bool > prefetchRequested_
void resetProductData_(bool deleteEarly) final
WaitingTaskList & waitingTasks() const
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
Provenance const * provenance() const
TypeID unwrappedTypeID() const
std::string const & processName() const
GlobalContext const * globalContext() const noexcept(false)
void mergeProduct(std::shared_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
SwitchProducerProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
reader
Definition: DQM.py:105
void resetProductData_(bool deleteEarly) override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
bool unscheduledWasNotRun_() const override
bool isAtEndTransition() const noexcept
bool singleProduct_() const override
Log< level::Error, false > LogError
bool productUnavailable_() const override
TypeWithDict const & unwrappedType() const
void setupUnscheduled(UnscheduledConfigurator const &) final
void reset()
Resets access to the resource so that added tasks will wait.
ProductProvenanceRetriever const * provRetriever_
void connectTo(ProductResolverBase const &iOther, Principal const *) final
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const noexcept
Definition: Principal.cc:566
assert(be >=bs)
std::string const & processName() const
ProductData const & getProductData() const final
WrapperBase const * wrapper() const
Definition: ProductData.h:35
void resetProductData_(bool deleteEarly) override
void connectTo(ProductResolverBase const &, Principal const *) final
void setProductID(ProductID const &pid)
Definition: ProductData.h:58
Definition: Electron.h:6
DataManagingOrAliasProductResolver & realProduct_
UnscheduledAuxiliary const * aux_
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductProvenance const * productProvenancePtr_() const override
UnscheduledAuxiliary const * aux_
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token, oneapi::tbb::task_group *) const noexcept
bool singleProduct_() const override
void emit(Args &&... args) const
Definition: Signal.h:50
ProductProvenanceLookup const * store() const
Definition: Provenance.h:60
void setProductID_(ProductID const &pid) override
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
UnscheduledAuxiliary const * aux_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
virtual bool isFromCurrentProcess() const =0
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const noexcept final
void setupUnscheduled(UnscheduledConfigurator const &iConfigure) final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const noexcept override
std::string const & moduleLabel() const
SwitchBaseProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
std::string const & className() const
ProductProvenance const * productProvenancePtr_() const final
std::string const & branchName() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:52
std::atomic< bool > prefetchRequested_
std::string const & productInstanceName() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
DataManagingOrAliasProductResolver const & realProduct() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void resetProductData_(bool deleteEarly) override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const noexcept override
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool isFromCurrentProcess() const final
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
static constexpr const ProductStatus defaultStatus_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< bool > prefetchRequested_
std::atomic< unsigned int > lastCheckIndex_
void setProductID_(ProductID const &pid) override
ProductProvenance const * branchIDToProvenance(BranchID const &bid) const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const noexcept override
void setMergeableRunProductMetadata(MergeableRunProductMetadataBase const *mrpm)
Definition: ProductData.h:60
ProductProvenance const * productProvenancePtr_() const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > prod) const override
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:36
void setProductID_(ProductID const &pid) final
ServiceToken lock() const
Definition: ServiceToken.h:101
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::vector< unsigned int > const & lookupProcessOrder() const noexcept
Definition: Principal.h:191
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const noexcept final
std::atomic< bool > skippingPrefetchRequested_
Resolution resolveProductImpl(Resolution) const
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:81
StreamContext const * getStreamContext() const noexcept(false)
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void resetProductData_(bool deleteEarly) override
static constexpr unsigned int kUnsetOffset
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Provenance const * provenance_() const override
void setupUnscheduled(UnscheduledConfigurator const &) final
ParentContext const & parent() const noexcept
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
void setProvenance(ProductProvenanceLookup const *provRetriever)
Definition: ProductData.h:56
bool isPresent() const
Definition: WrapperBase.h:30
def load(fileName)
Definition: svgfig.py:547
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
ProductStatus defaultStatus() const
BranchDescription const & branchDescription() const
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const noexcept override
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void setProductID_(ProductID const &pid) override
void setMergeableRunProductMetadata_(MergeableRunProductMetadata const *) override
DataManagingOrAliasProductResolver & realProduct_
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preModuleDelayedGetSignal_
Transition transition() const
Definition: GlobalContext.h:59
std::vector< ProductResolverIndex > matchingHolders_
void resetProductData_(bool deleteEarly) override
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
HLT enums.
unsigned int unsetIndexValue() const
void retrieveAndMerge_(Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
void setProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:27
Provenance const * provenance_() const override
bool productWasDeleted_() const override
void setupUnscheduled(UnscheduledConfigurator const &) final
Resolution resolveProductImpl(FUNC resolver) const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const noexcept override
void resetProductData_(bool deleteEarly) override=0
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::atomic< bool > prefetchRequested_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Log< level::Warning, false > LogWarning
void resetProductData_(bool deleteEarly) override
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postModuleDelayedGetSignal_
UnscheduledAuxiliary const * auxiliary() const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const noexcept final
std::atomic< bool > & prefetchRequested() const
static ParentageRegistry * instance()
BranchDescription const & branchDescription_() const override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
std::string const & moduleLabel() const
void setProductID_(ProductID const &pid) override
ProductProvenance const * productProvenancePtr_() const override
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
bool insertMapped(value_type const &v)
virtual size_t transformIndex(edm::BranchDescription const &) const noexcept=0
Definition: Worker.cc:253
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
def move(src, dest)
Definition: eostools.py:511
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
DelayedReader * reader() const
Definition: Principal.h:181
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void setProductProvenanceRetriever(ProductProvenanceRetriever const *provRetriever)
void insertIntoSet(ProductProvenance provenanceProduct) const
static HepMC::HEPEVT_Wrapper wrapper
BranchID const & originalBranchID() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final