CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
23 
24 #include <cassert>
25 #include <utility>
26 
// Sentinel offsets for the cached resolver choice. Each one is added to the
// number of candidate resolvers, producing a value that lies past every real
// candidate position.
static constexpr unsigned int kUnsetOffset{0U};      // no choice has been cached yet
static constexpr unsigned int kAmbiguousOffset{1U};  // the lookup was found to be ambiguous
static constexpr unsigned int kMissingOffset{2U};    // no candidate supplied the product
30 
31 namespace edm {
32 
35  exception << "DataManagingProductResolver::resolveProduct_: The product matching all criteria was already deleted\n"
36  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
37  << "Looking for module label: " << moduleLabel() << "\n"
38  << "Looking for productInstanceName: " << productInstanceName() << "\n"
39  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
40  << "This means there is a configuration error.\n"
41  << "The module which is asking for this data must be configured to state that it will read this data.";
42  throw exception;
43  }
44 
45  //This is a templated function in order to avoid calling another virtual function
  // callResolver: compile-time switch; when false the callable is never invoked.
  // NOTE(review): the declarator line of this function is absent from this listing;
  // `resolver` below is presumably its FUNC parameter - confirm against the real source.
46  template <bool callResolver, typename FUNC>
  // Deleted-product guard. The statement it protects is not visible in this listing.
48  if (productWasDeleted()) {
50  }
51  auto presentStatus = status();
52 
53  if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
54  //if resolver fails because of exception or not setting product
55  // make sure the status goes to failed
56  auto failedStatusSetter = [this](ProductStatus* iPresentStatus) {
57  if (this->status() == ProductStatus::ResolveNotRun) {
58  this->setFailedStatus();
59  }
  // Refresh the caller's local copy so the check after this block sees the final status.
60  *iPresentStatus = this->status();
61  };
  // Scope guard: the unique_ptr only points at the local presentStatus and frees nothing.
  // Its deleter (the lambda above) runs when this block exits, whether normally or by exception.
62  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
63  failedStatusSetter);
64 
65  //If successful, this will call setProduct
66  resolver();
67  }
68 
  // Hand back the product data only when the status is ProductSet and the wrapper reports
  // the product as present; every other case yields an empty Resolution.
69  if (presentStatus == ProductStatus::ProductSet) {
70  auto pd = &getProductData();
71  if (pd->wrapper()->isPresent()) {
72  return Resolution(pd);
73  }
74  }
75 
76  return Resolution(nullptr);
77  }
78 
80  std::shared_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
81  // if it's not mergeable and the previous read failed, go ahead and use this one
83  setProduct(std::move(iFrom));
84  return;
85  }
86 
88  if (not iFrom) {
89  return;
90  }
91 
92  checkType(*iFrom);
93 
95  if (original->isMergeable()) {
96  if (original->isPresent() != iFrom->isPresent()) {
98  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
99  << "Was trying to merge objects where one product had been put in the input file and the other had not "
100  "been."
101  << "\n"
102  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
103  << "that need to be merged in the first place.\n";
104  }
105  if (original->isPresent()) {
107  if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
108  original->mergeProduct(iFrom.get());
109  } else {
111  mergeableRunProductMetadata->getMergeDecision(desc.processName());
112  if (decision == MergeableRunProductMetadata::MERGE) {
113  original->mergeProduct(iFrom.get());
114  } else if (decision == MergeableRunProductMetadata::REPLACE) {
115  // Note this swaps the content of the product where the
116  // both products branches are present and the product is
117  // also present (was put) in the branch. A module might
118  // have already gotten a pointer to the product so we
119  // keep those pointers valid. This does not call swap
120  // on the Wrapper.
121  original->swapProduct(iFrom.get());
122  }
123  // If the decision is IGNORE, do nothing
124  }
125  }
126  // If both have isPresent false, do nothing
127 
128  } else if (original->hasIsProductEqual()) {
129  if (original->isPresent() && iFrom->isPresent()) {
130  if (!original->isProductEqual(iFrom.get())) {
131  auto const& bd = branchDescription();
132  edm::LogError("RunLumiMerging")
133  << "ProductResolver::mergeTheProduct\n"
134  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
135  << "Using the first, ignoring the second\n"
136  << "className = " << bd.className() << "\n"
137  << "moduleLabel = " << bd.moduleLabel() << "\n"
138  << "instance = " << bd.productInstanceName() << "\n"
139  << "process = " << bd.processName() << "\n";
140  }
141  } else if (!original->isPresent() && iFrom->isPresent()) {
142  setProduct(std::move(iFrom));
143  }
144  // if not iFrom->isPresent(), do nothing
145  } else {
146  auto const& bd = branchDescription();
147  edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
148  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
149  << "Using the first, ignoring the second in merge\n"
150  << "className = " << bd.className() << "\n"
151  << "moduleLabel = " << bd.moduleLabel() << "\n"
152  << "instance = " << bd.productInstanceName() << "\n"
153  << "process = " << bd.processName() << "\n";
154  if (!original->isPresent() && iFrom->isPresent()) {
155  setProduct(std::move(iFrom));
156  }
157  // In other cases, do nothing
158  }
159  }
160 
161  namespace {
162  cms::Exception& extendException(cms::Exception& e, BranchDescription const& bd, ModuleCallingContext const* mcc) {
163  e.addContext(std::string("While reading from source ") + bd.className() + " " + bd.moduleLabel() + " '" +
164  bd.productInstanceName() + "' " + bd.processName());
165  if (mcc) {
166  edm::exceptionContext(e, *mcc);
167  }
168  return e;
169  }
170  } // namespace
172  Principal const& principal, bool, SharedResourcesAcquirer*, ModuleCallingContext const* mcc) const {
173  return resolveProductImpl<true>([this, &principal, mcc]() {
174  auto branchType = principal.branchType();
175  if (branchType == InLumi || branchType == InRun) {
176  //delayed get has not been allowed with Run or Lumis
177  // The file may already be closed so the reader is invalid
178  return;
179  }
180  if (mcc and branchType == InEvent and aux_) {
182  }
183 
184  auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
185  if (branchType == InEvent and aux_) {
186  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
187  }
188  }));
189 
190  if (auto reader = principal.reader()) {
191  std::unique_lock<std::recursive_mutex> guard;
192  if (auto sr = reader->sharedResources().second) {
193  guard = std::unique_lock<std::recursive_mutex>(*sr);
194  }
195  if (not productResolved()) {
196  try {
197  //another thread could have beaten us here
198  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
199  } catch (cms::Exception& e) {
200  throw extendException(e, branchDescription(), mcc);
201  } catch (std::exception const& e) {
202  auto newExcept = edm::Exception(errors::StdException) << e.what();
203  throw extendException(newExcept, branchDescription(), mcc);
204  }
205  }
206  }
207  });
208  }
209 
211  Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
212  if (auto reader = principal.reader()) {
213  std::unique_lock<std::recursive_mutex> guard;
214  if (auto sr = reader->sharedResources().second) {
215  guard = std::unique_lock<std::recursive_mutex>(*sr);
216  }
217 
218  //Can't use resolveProductImpl since it first checks to see
219  // if the product was already retrieved and then returns if it is
220  auto edp(reader->getProduct(branchDescription().branchID(), &principal));
221 
222  if (edp.get() != nullptr) {
223  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
225  << "Missing definition of member function swap for branch name " << branchDescription().branchName()
226  << "\n"
227  << "Mergeable data types written to a Run must have the swap member function defined"
228  << "\n";
229  }
231  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
232  setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
233  } else { // status() == ResolveFailed && branchDescription().isMergeable()
235  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
236  << "The product branch was dropped in the first run or lumi fragment and present in a later one"
237  << "\n"
238  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
239  << "that need to be merged in the first place.\n";
240  }
241  } else if (status() == defaultStatus()) {
242  setFailedStatus();
243  } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
245  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
246  << "The product branch was present in first run or lumi fragment and dropped in a later one"
247  << "\n"
248  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
249  << "that need to be merged in the first place.\n";
250  }
251  // Do nothing in other case. status is ResolveFailed already or
252  // it is not mergeable and the status is ProductSet
253  }
254  }
255 
257  std::shared_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
258  if (status() == defaultStatus()) {
259  //resolveProduct has not been called or it failed
261  } else {
262  mergeProduct(std::move(prod), mergeableRunProductMetadata);
263  }
264  }
265 
268  }
269 
271  Principal const& principal,
272  bool skipCurrentProcess,
273  ServiceToken const& token,
275  ModuleCallingContext const* mcc) const {
276  //need to try changing m_prefetchRequested before adding to m_waitingTasks
277  bool expected = false;
278  bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
279  m_waitingTasks.add(waitTask);
280 
281  if (prefetchRequested) {
282  ServiceWeakToken weakToken = token;
283  auto workToDo = [this, mcc, &principal, weakToken]() {
284  //need to make sure Service system is activated on the reading thread
285  ServiceRegistry::Operate operate(weakToken.lock());
286  // Caught exception is propagated via WaitingTaskList
287  CMS_SA_ALLOW try {
288  resolveProductImpl<true>([this, &principal, mcc]() {
289  if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
290  return;
291  }
292  if (auto reader = principal.reader()) {
293  std::unique_lock<std::recursive_mutex> guard;
294  if (auto sr = reader->sharedResources().second) {
295  guard = std::unique_lock<std::recursive_mutex>(*sr);
296  }
297  if (not productResolved()) {
298  try {
299  //another thread could have finished this while we were waiting
300  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
301  } catch (cms::Exception& e) {
302  throw extendException(e, branchDescription(), mcc);
303  } catch (std::exception const& e) {
304  auto newExcept = edm::Exception(errors::StdException) << e.what();
305  throw extendException(newExcept, branchDescription(), mcc);
306  }
307  }
308  }
309  });
310  } catch (...) {
311  this->m_waitingTasks.doneWaiting(std::current_exception());
312  return;
313  }
314  this->m_waitingTasks.doneWaiting(nullptr);
315  };
316 
317  SerialTaskQueueChain* queue = nullptr;
318  if (auto reader = principal.reader()) {
319  if (auto shared_res = reader->sharedResources().first) {
320  queue = &(shared_res->serialQueueChain());
321  }
322  }
323  if (queue) {
324  queue->push(*waitTask.group(), workToDo);
325  } else {
326  //Have to create a new task
327  auto t = make_functor_task(workToDo);
328  waitTask.group()->run([t]() {
329  TaskSentry s{t};
330  t->execute();
331  });
332  }
333  }
334  }
335 
337  if (not deleteEarly) {
338  m_prefetchRequested = false;
340  }
342  }
343 
345  aux_ = iConfigure.auxiliary();
346  }
347 
349 
350  void PutOnReadInputProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
351  if (status() != defaultStatus()) {
353  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
354  }
355 
356  setProduct(std::move(edp)); // ProductResolver takes ownership
357  }
358 
360 
362  bool skipCurrentProcess,
364  ModuleCallingContext const*) const {
365  return resolveProductImpl<false>([]() { return; });
366  }
367 
369  Principal const& principal,
370  bool skipCurrentProcess,
371  ServiceToken const& token,
373  ModuleCallingContext const* mcc) const {}
374 
  // Transfers ownership of edp to setOrMergeProduct. The second argument is nullptr;
  // judging by the other call in this file that passes mergeableRunProductMetadata there,
  // this presumably means "no mergeable-run metadata" - confirm against the declaration.
375  void PutOnReadInputProductResolver::putOrMergeProduct(std::unique_ptr<WrapperBase> edp) const {
376  setOrMergeProduct(std::move(edp), nullptr);
377  }
378 
380  bool skipCurrentProcess,
382  ModuleCallingContext const*) const {
383  if (!skipCurrentProcess) {
384  //'false' means never call the lambda function
385  return resolveProductImpl<false>([]() { return; });
386  }
387  return Resolution(nullptr);
388  }
389 
391  Principal const& principal,
392  bool skipCurrentProcess,
393  ServiceToken const& token,
395  ModuleCallingContext const* mcc) const {
396  if (not skipCurrentProcess) {
397  if (branchDescription().branchType() == InProcess &&
399  // This is an accessInputProcessBlock transition
400  // We cannot access produced products in those transitions
401  // except for in SubProcesses where they should have already run.
402  return;
403  }
405  if (not mcc->parent().isAtEndTransition()) {
406  return;
407  }
408  }
409 
410  if (waitingTasks_) {
411  // using a waiting task to do a callback guarantees that the
412  // waitingTasks_ list (from the worker) will be released from
413  // waiting even if the module does not put this data product
414  // or the module has an exception while running
415  waitingTasks_->add(waitTask);
416  }
417  }
418  }
419 
421  auto worker = iConfigure.findWorker(branchDescription().moduleLabel());
422  if (worker) {
423  waitingTasks_ = &worker->waitingTaskList();
424  }
425  }
426 
428  aux_ = iConfigure.auxiliary();
430  }
431 
433  bool skipCurrentProcess,
435  ModuleCallingContext const*) const {
436  if (!skipCurrentProcess and worker_) {
437  return resolveProductImpl<false>([] {});
438  }
439  return Resolution(nullptr);
440  }
441 
443  Principal const& principal,
444  bool skipCurrentProcess,
445  ServiceToken const& token,
447  ModuleCallingContext const* mcc) const {
448  if (skipCurrentProcess) {
449  return;
450  }
451  if (worker_ == nullptr) {
452  throw cms::Exception("LogicError") << "UnscheduledProductResolver::prefetchAsync_() called with null worker_. "
453  "This should not happen, please contact framework developers.";
454  }
455  //need to try changing prefetchRequested_ before adding to waitingTasks_
456  bool expected = false;
457  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
458  waitingTasks_.add(waitTask);
459  if (prefetchRequested) {
460  //Have to create a new task which will make sure the state for UnscheduledProductResolver
461  // is properly set after the module has run
462  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
463  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
464  // state for the case where an exception occurs during the call to the function.
465  // Caught exception is propagated via WaitingTaskList
466  CMS_SA_ALLOW try {
467  resolveProductImpl<true>([iPtr]() {
468  if (iPtr) {
469  std::rethrow_exception(*iPtr);
470  }
471  });
472  } catch (...) {
473  waitingTasks_.doneWaiting(std::current_exception());
474  return;
475  }
476  waitingTasks_.doneWaiting(nullptr);
477  });
478 
479  ParentContext parentContext(mcc);
482  WaitingTaskHolder(*waitTask.group(), t),
483  info,
484  token,
485  info.principal().streamID(),
486  parentContext,
487  mcc->getStreamContext());
488  }
489  }
490 
492  if (not deleteEarly) {
493  prefetchRequested_ = false;
495  }
497  }
498 
500  aux_ = iConfigure.auxiliary();
502  // worker can be missing if the corresponding module is
503  // unscheduled and none of its products are consumed
504  if (worker_) {
506  }
507  }
508 
510  bool skipCurrentProcess,
512  ModuleCallingContext const*) const {
513  if (!skipCurrentProcess and worker_) {
514  return resolveProductImpl<false>([] {});
515  }
516  return Resolution(nullptr);
517  }
518 
520  Principal const& principal,
521  bool skipCurrentProcess,
522  ServiceToken const& token,
524  ModuleCallingContext const* mcc) const {
525  if (skipCurrentProcess) {
526  return;
527  }
528  if (worker_ == nullptr) {
529  throw cms::Exception("LogicError") << "TransformingProductResolver::prefetchAsync_() called with null worker_. "
530  "This should not happen, please contact framework developers.";
531  }
532  //need to try changing prefetchRequested_ before adding to waitingTasks_
533  bool expected = false;
534  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
535  waitingTasks_.add(waitTask);
536  if (prefetchRequested) {
537  //Have to create a new task which will make sure the state for TransformingProductResolver
538  // is properly set after the module has run
539  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
540  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
541  // state for the case where an exception occurs during the call to the function.
542  // Caught exception is propagated via WaitingTaskList
543  CMS_SA_ALLOW try {
544  resolveProductImpl<true>([iPtr]() {
545  if (iPtr) {
546  std::rethrow_exception(*iPtr);
547  }
548  });
549  } catch (...) {
550  waitingTasks_.doneWaiting(std::current_exception());
551  return;
552  }
553  waitingTasks_.doneWaiting(nullptr);
554  });
555 
556  //This gives a lifetime greater than this call
557  ParentContext parent(mcc);
559 
562  index_,
563  info.principal(),
564  token,
565  info.principal().streamID(),
566  mcc_,
567  mcc->getStreamContext());
568  }
569  }
570 
572  if (not deleteEarly) {
573  prefetchRequested_ = false;
575  }
577  }
578 
579  void ProducedProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
580  if (status() != defaultStatus()) {
582  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
583  }
584 
585  setProduct(std::move(edp)); // ProductResolver takes ownership
586  }
587 
588  bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
589 
591 
593  // Check if the types match.
594  TypeID typeID(prod.dynamicTypeInfo());
596  // Types do not match.
598  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
599  << "It is supposed to be of type " << branchDescription().className() << ".\n"
600  << "It is actually of type " << typeID.className() << ".\n";
601  }
602  }
603 
604  void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
605  if (edp) {
606  checkType(*edp);
609  } else {
610  setFailedStatus();
611  }
612  }
613  void DataManagingProductResolver::setProduct(std::shared_ptr<WrapperBase> edp) const {
614  if (edp) {
615  checkType(*edp);
618  } else {
619  setFailedStatus();
620  }
621  }
622 
623  // This routine returns true if it is known that currently there is no real product.
624  // If there is a real product, it returns false.
625  // If it is not known if there is a real product, it returns false.
627  auto presentStatus = status();
628  if (presentStatus == ProductStatus::ProductSet) {
629  return !(getProductData().wrapper()->isPresent());
630  }
631  return presentStatus != ProductStatus::ResolveNotRun;
632  }
633 
635  auto s = status();
636  return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
637  }
638 
639  // This routine returns true if the product was deleted early in order to save memory
641 
642  bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
643  if (iSkipCurrentProcess and isFromCurrentProcess()) {
644  return false;
645  }
647  if (getProductData().wrapper()->isPresent()) {
648  return true;
649  }
650  }
651  return false;
652  }
653 
655  productData_.setProvenance(provRetriever);
656  }
657 
659 
661  MergeableRunProductMetadata const* mrpm) {
663  }
664 
666  return provenance()->productProvenance();
667  }
668 
672  }
673  if (deleteEarly) {
675  } else {
676  resetStatus();
677  }
678  }
679 
680  bool DataManagingProductResolver::singleProduct_() const { return true; }
681 
684  }
685 
687 
689  return provenance()->productProvenance();
690  }
691 
693 
694  bool AliasProductResolver::singleProduct_() const { return true; }
695 
696  SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd,
698  : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false) {
699  // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
700  Parentage p;
701  p.setParents(std::vector<BranchID>{realProduct.branchDescription().originalBranchID()});
702  parentageID_ = p.id();
704  }
705 
706  void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
708  << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
709  << "Contact a Framework developer\n";
710  }
711 
714  }
715 
717  if (res.data() == nullptr)
718  return res;
719  return Resolution(&productData_);
720  }
721 
723  // SwitchProducer will never put anything in the event, and
724  // "false" will make Event::commit_() to call putProduct() with
725  // null unique_ptr<WrapperBase> to signal that the produce() was
726  // run.
727  return false;
728  }
729 
731  productData_.setProvenance(provRetriever);
732  }
733 
735  // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
737  }
738 
741  realProduct_.resetProductData_(deleteEarly);
742  if (not deleteEarly) {
743  prefetchRequested_ = false;
745  }
746  }
747 
749  // update provenance
751  // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
752  productData_.unsafe_setWrapper(realProduct().getProductData().sharedConstWrapper());
753  }
754 
755  SwitchProducerProductResolver::SwitchProducerProductResolver(std::shared_ptr<BranchDescription const> bd,
757  : SwitchBaseProductResolver(std::move(bd), realProduct), status_(defaultStatus_) {}
758 
760  bool skipCurrentProcess,
762  ModuleCallingContext const* mcc) const {
764  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
765  }
766  return Resolution(nullptr);
767  }
768 
770  Principal const& principal,
771  bool skipCurrentProcess,
772  ServiceToken const& token,
774  ModuleCallingContext const* mcc) const {
775  if (skipCurrentProcess) {
776  return;
777  }
778  if (branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
779  return;
780  }
781 
782  //need to try changing prefetchRequested before adding to waitingTasks
783  bool expected = false;
784  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
785  waitingTasks().add(waitTask);
786 
787  if (doPrefetchRequested) {
788  //using a waiting task to do a callback guarantees that
789  // the waitingTasks() list will be released from waiting even
790  // if the module does not put this data product or the
791  // module has an exception while running
792  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
793  if (nullptr != iException) {
794  waitingTasks().doneWaiting(*iException);
795  } else {
797  waitingTasks().doneWaiting(std::exception_ptr());
798  }
799  });
800  worker()->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
801  }
802  }
803 
  // Called with the (unused here) wrapper once produce() has run for the SwitchProducer.
  // Several lines of this function are absent from this listing (the statement that opens
  // the stream expression below, and the status update the later comment refers to).
804  void SwitchProducerProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
  // A status other than the default means putProduct already ran for this branch.
805  if (status_ != defaultStatus_) {
  // NOTE(review): the branch name is streamed immediately before "This makes no sense..."
  // with no space or newline between them, so the two run together in the message.
807  << "Attempt to insert more than one product for a branch " << branchDescription().branchName()
808  << "This makes no sense for SwitchProducerProductResolver.\nContact a Framework developer";
809  }
810  // Let's use ResolveFailed to signal that produce() was called, as
811  // there is no real product in this resolver
  // If no prefetch had been requested so far, this call flips the flag from false to true
  // and releases the waiting-task list with an empty exception_ptr.
813  bool expected = false;
814  if (prefetchRequested().compare_exchange_strong(expected, true)) {
816  waitingTasks().doneWaiting(std::exception_ptr());
817  }
818  }
819 
821  // if produce() was run (ResolveFailed), ask from the real resolver
823  return realProduct().productUnavailable();
824  }
825  return true;
826  }
827 
830  if (not deleteEarly) {
832  }
833  }
834 
836  bool skipCurrentProcess,
838  ModuleCallingContext const* mcc) const {
839  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
840  }
841 
843  Principal const& principal,
844  bool skipCurrentProcess,
845  ServiceToken const& token,
847  ModuleCallingContext const* mcc) const {
848  if (skipCurrentProcess) {
849  return;
850  }
851 
852  //need to try changing prefetchRequested_ before adding to waitingTasks_
853  bool expected = false;
854  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
855  waitingTasks().add(waitTask);
856 
857  if (doPrefetchRequested) {
858  //using a waiting task to do a callback guarantees that
859  // the waitingTasks() list will be released from waiting even
860  // if the module does not put this data product or the
861  // module has an exception while running
862  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
863  if (nullptr != iException) {
864  waitingTasks().doneWaiting(*iException);
865  } else {
867  waitingTasks().doneWaiting(std::exception_ptr());
868  }
869  });
871  WaitingTaskHolder(*waitTask.group(), waiting), principal, skipCurrentProcess, token, sra, mcc);
872  }
873  }
874 
876  provRetriever_ = provRetriever;
877  }
878 
880 
882  return provRetriever_ ? provRetriever_->branchIDToProvenance(bd_->originalBranchID()) : nullptr;
883  }
884 
886 
887  bool ParentProcessProductResolver::singleProduct_() const { return true; }
888 
890  // In principle, this ought to be fixed. I noticed one hits this error
891  // when in a SubProcess and calling the Event::getProvenance function
892  // with a BranchID to a branch from an earlier SubProcess or the top
893  // level process and this branch is not kept in this SubProcess. It might
894  // be possible to hit this in other contexts. I say it ought to be
895  // fixed because one does not encounter this issue if the SubProcesses
896  // are split into genuinely different processes (in principle that
897  // ought to give identical behavior and results). No user has ever
898  // reported this issue which has been around for some time and it was only
899  // noticed when testing some rare corner cases after modifying Core code.
900  // After discussing this with Chris we decided that at least for the moment
901  // there are higher priorities than fixing this ... I converted it so it
902  // causes an exception instead of a seg fault. The issue that may need to
903  // be addressed someday is how ProductResolvers for non-kept branches are
904  // connected to earlier SubProcesses.
906  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
907  << "Contact a Framework developer\n";
908  }
909 
910  NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
911  std::vector<bool> const& ambiguous,
912  bool madeAtEnd)
913  : matchingHolders_(matchingHolders),
914  ambiguous_(ambiguous),
915  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
916  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
917  prefetchRequested_(false),
918  skippingPrefetchRequested_(false),
919  madeAtEnd_{madeAtEnd} {
920  assert(ambiguous_.size() == matchingHolders_.size());
921  }
922 
924  Principal const& principal,
925  bool skipCurrentProcess,
927  ModuleCallingContext const* mcc) const {
928  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
929  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
930  }
931 
933  bool skipCurrentProcess,
935  ModuleCallingContext const* mcc) const {
936  //See if we've already cached which Resolver we should call or if
937  // we know it is ambiguous
938  const unsigned int choiceSize = ambiguous_.size();
939 
940  //madeAtEnd_==true and not at end transition is the same as skipping the current process
941  if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
942  skipCurrentProcess = not mcc->parent().isAtEndTransition();
943  }
944 
945  unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
946  if (checkCacheIndex != choiceSize + kUnsetOffset) {
947  if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
949  } else if (checkCacheIndex == choiceSize + kMissingOffset) {
950  return Resolution(nullptr);
951  }
952  return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
953  }
954 
955  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
956 
957  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
958  for (unsigned int k : lookupProcessOrder) {
959  assert(k < ambiguous_.size());
960  if (k == 0)
961  break; // Done
962  if (ambiguous_[k]) {
963  updateCacheIndex = choiceSize + kAmbiguousOffset;
965  }
967  auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
968  if (resolution.data() != nullptr) {
969  updateCacheIndex = k;
970  return resolution;
971  }
972  }
973  }
974 
975  updateCacheIndex = choiceSize + kMissingOffset;
976  return Resolution(nullptr);
977  }
978 
980  Principal const& principal,
981  bool skipCurrentProcess,
982  ServiceToken const& token,
984  ModuleCallingContext const* mcc) const {
985  bool timeToMakeAtEnd = true;
986  if (madeAtEnd_ and mcc) {
987  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
988  }
989 
990  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
991  if (not skipCurrentProcess and timeToMakeAtEnd) {
992  //need to try changing prefetchRequested_ before adding to waitingTasks_
993  bool expected = false;
994  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
995  waitingTasks_.add(waitTask);
996 
997  if (prefetchRequested) {
998  //we are the first thread to request
999  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token, waitTask.group());
1000  }
1001  } else {
1002  skippingWaitingTasks_.add(waitTask);
1003  bool expected = false;
1004  if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
1005  //we are the first thread to request
1006  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token, waitTask.group());
1007  }
1008  }
1009  }
1010 
1011  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
1012  ProductResolverIndex iIndex,
1013  std::exception_ptr iExceptPtr) const {
1014  if (not iSkipCurrentProcess) {
1015  lastCheckIndex_ = iIndex;
1016  waitingTasks_.doneWaiting(iExceptPtr);
1017  } else {
1018  lastSkipCurrentCheckIndex_ = iIndex;
1019  skippingWaitingTasks_.doneWaiting(iExceptPtr);
1020  }
1021  }
1022 
1023  namespace {
  // Waiting task that reacts to the outcome of a prefetch for candidate position index_
  // (presumably scheduled as the continuation of that prefetch - confirm at the call site).
  // NOTE(review): the constructor parameter iSRA and the member sra_ are used below, but
  // their declarations are absent from this listing.
1024  class TryNextResolverWaitingTask : public edm::WaitingTask {
1025  public:
1026  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
1027  unsigned int iResolverIndex,
1028  Principal const* iPrincipal,
1030  ModuleCallingContext const* iMCC,
1031  bool iSkipCurrentProcess,
1032  ServiceToken iToken,
1033  oneapi::tbb::task_group* iGroup)
1034  : resolver_(iResolver),
1035  principal_(iPrincipal),
1036  sra_(iSRA),
1037  mcc_(iMCC),
1038  group_(iGroup),
  // The ServiceToken is kept in a ServiceWeakToken member and locked again in execute().
1039  serviceToken_(iToken),
1040  index_(iResolverIndex),
1041  skipCurrentProcess_(iSkipCurrentProcess) {}
1042 
  // If an exception was recorded, report it to the resolver through prefetchFailed.
  // Otherwise, when the resolver at this position did not yield valid data, ask the
  // NoProcessProductResolver to try the next position (index_ + 1). When the data is
  // valid, nothing further is done here.
1043  void execute() final {
1044  auto exceptPtr = exceptionPtr();
1045  if (exceptPtr) {
1046  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, exceptPtr);
1047  } else {
1048  if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
1049  resolver_->tryPrefetchResolverAsync(
1050  index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
1051  }
1052  }
1053  }
1054 
1055  private:
  // All pointers are stored as given; nothing in this class deletes them.
1056  NoProcessProductResolver const* resolver_;
1057  Principal const* principal_;
1059  ModuleCallingContext const* mcc_;
1060  oneapi::tbb::task_group* group_;
1061  ServiceWeakToken serviceToken_;
1062  unsigned int index_;
1063  bool skipCurrentProcess_;
1064  };
1065  } // namespace
1066 
// Reports a failed prefetch: translates iProcessingIndex (a position in
// principal.lookupProcessOrder()) into the stored value k and passes k together
// with iExceptPtr to setCache(), which hands the exception to the waiting tasks.
// Called from TryNextResolverWaitingTask::execute() when that task holds an exception.
// iProcessingIndex is used to index lookupProcessOrder without a bounds check here.
1067  void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
1068  Principal const& principal,
1069  bool iSkipCurrentProcess,
1070  std::exception_ptr iExceptPtr) const {
1071  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1072  auto k = lookupProcessOrder[iProcessingIndex];
1073 
1074  setCache(iSkipCurrentProcess, k, iExceptPtr);
1075  }
1076 
// Checks whether the resolver at position iProcessingIndex of
// principal.lookupProcessOrder() holds a fetched, valid product.
//
// Returns true if productWasFetchedAndIsValid(iSkipCurrentProcess) is true for
// that resolver; in that case the cache has also been updated through
// setCache(iSkipCurrentProcess, k, nullptr). Returns false otherwise, leaving
// the cache untouched.
1077  bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
1078  Principal const& principal,
1079  bool iSkipCurrentProcess) const {
1080  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
      // k indexes matchingHolders_, which supplies the index of the resolver to query
1081  auto k = lookupProcessOrder[iProcessingIndex];
1082  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1083 
1084  if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
1085  setCache(iSkipCurrentProcess, k, nullptr);
1086  return true;
1087  }
1088  return false;
1089  }
1090 
// Walks principal.lookupProcessOrder() starting at iProcessingIndex and starts an
// asynchronous prefetch on a candidate resolver. If the walk ends without
// starting a prefetch, the cache is set to a sentinel index via setCache():
//   ambiguous_.size() + kAmbiguousOffset  when an ambiguous entry stopped the walk
//   ambiguous_.size() + kMissingOffset    otherwise
// NOTE(review): original lines 1094, 1096, 1113 and 1121 are absent from this
// listing. The `sra` and `token` parameters used below, the condition opening
// the block that is closed at line 1125, and the statement announced by the
// comment at line 1120 are therefore not visible here -- confirm against the
// real source.
1091  void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
1092  Principal const& principal,
1093  bool skipCurrentProcess,
1095  ModuleCallingContext const* mcc,
1097  oneapi::tbb::task_group* group) const {
1098  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1099  auto index = iProcessingIndex;
1100 
1101  const unsigned int choiceSize = ambiguous_.size();
      // default result if nothing usable is found
1102  unsigned int newCacheIndex = choiceSize + kMissingOffset;
1103  while (index < lookupProcessOrder.size()) {
1104  auto k = lookupProcessOrder[index];
      // a zero entry stops the walk; presumably 0 marks the end of the
      // meaningful entries in lookupProcessOrder -- TODO confirm
1105  if (k == 0) {
1106  break;
1107  }
1108  assert(k < ambiguous_.size());
1109  if (ambiguous_[k]) {
1110  newCacheIndex = choiceSize + kAmbiguousOffset;
1111  break;
1112  }
1114  //make new task
1115 
      // The task is given the current index; its execute() continues at
      // index + 1 when the product is not valid. The raw pointer is handed to
      // the WaitingTaskHolder (presumably that takes over its lifetime -- confirm).
1116  auto task = new TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token, group);
1117  WaitingTaskHolder hTask(*group, task);
1118  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1119 
1120  //Make sure the Services are available on this thread
1122 
      // one prefetch is started per call; the function returns without touching the cache
1123  productResolver->prefetchAsync(hTask, principal, skipCurrentProcess, token, sra, mcc);
1124  return;
1125  }
1126  ++index;
1127  }
1128  //data product unavailable
1129  setCache(skipCurrentProcess, newCacheIndex, nullptr);
1130  }
1131 
1133 
1135 
1137 
// Returns ambiguous_.size() + kUnsetOffset. The reset code at lines 1144-1146
// writes this value into both lastCheckIndex_ and lastSkipCurrentCheckIndex_.
1138  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1139 
1141  // This function should never receive 'true'. On the other hand,
1142  // nothing should break if a 'true' is passed, because
1143  // NoProcessProductResolver just forwards the resolve
1144  const auto resetValue = unsetIndexValue();
1145  lastCheckIndex_ = resetValue;
1146  lastSkipCurrentCheckIndex_ = resetValue;
1147  prefetchRequested_ = false;
1149  waitingTasks_.reset();
1151  }
1152 
// Always returns false for NoProcessProductResolver.
1153  bool NoProcessProductResolver::singleProduct_() const { return false; }
1154 
1157  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1158  << "Contact a Framework developer\n";
1159  }
1160 
1163  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1164  << "Contact a Framework developer\n";
1165  }
1166 
1169  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1170  << "Contact a Framework developer\n";
1171  }
1172 
1175  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1176  << "Contact a Framework developer\n";
1177  }
1178 
// Not supported on NoProcessProductResolver: the message text below states the
// function is not implemented and should never be called. The parameter is unnamed
// because it is not used.
// NOTE(review): original line 1180, which starts the statement these stream
// insertions belong to, is absent from this listing (presumably a throw of an
// Exception, as in the other stubs of this file -- confirm).
1179  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1181  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1182  << "Contact a Framework developer\n";
1183  }
1184 
1187  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1188  << "Contact a Framework developer\n";
1189  }
1190 
// Not supported on NoProcessProductResolver: the message text below states the
// function is not implemented and should never be called. The shared_ptr argument
// is unnamed and unused.
// NOTE(review): original line 1192, which starts the statement these stream
// insertions belong to, is absent from this listing (presumably a throw of an
// Exception, as in the other stubs of this file -- confirm).
1191  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1193  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1194  << "Contact a Framework developer\n";
1195  }
1196 
1199  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1200  << "Contact a Framework developer\n";
1201  }
1202 
1205  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1206  << "Contact a Framework developer\n";
1207  }
1208 
1209  //---- SingleChoiceNoProcessProductResolver ----------------
1211  Principal const& principal,
1212  bool skipCurrentProcess,
1214  ModuleCallingContext const* mcc) const {
1215  //NOTE: Have to look up the other ProductResolver each time rather than cache
1216  // its pointer since it appears the pointer can change at some later stage
1218  ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1219  }
1220 
1222  Principal const& principal,
1223  bool skipCurrentProcess,
1224  ServiceToken const& token,
1226  ModuleCallingContext const* mcc) const {
1228  ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1229  }
1230 
1232 
1234 
1236 
1238 
1240 
1243  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1244  << "Contact a Framework developer\n";
1245  }
1246 
1249  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1250  << "Contact a Framework developer\n";
1251  }
1252 
1255  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1256  << "Contact a Framework developer\n";
1257  }
1258 
1261  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1262  << "Contact a Framework developer\n";
1263  }
1264 
1266  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1267  "implemented and should never be called.\n"
1268  << "Contact a Framework developer\n";
1269  }
1270 
1273  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1274  << "Contact a Framework developer\n";
1275  }
1276 
// Not supported on SingleChoiceNoProcessProductResolver: unconditionally throws an
// Exception with category errors::LogicError. The shared_ptr argument is unnamed
// and unused.
1277  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1278  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not "
1279  "implemented and should never be called.\n"
1280  << "Contact a Framework developer\n";
1281  }
1282 
1285  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1286  << "Contact a Framework developer\n";
1287  }
1288 
1291  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1292  << "Contact a Framework developer\n";
1293  }
1294 
1295 } // namespace edm
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
size
Write out results.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
void setProductID(ProductID const &pid)
ProductData const & getProductData() const final
ProductProvenance const * productProvenancePtr_() const override
void setOrMergeProduct(std::shared_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
BranchDescription const & branchDescription_() const override
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
void resetProductData_(bool deleteEarly) override
static const TGPicture * info(bool iBackgroundIsBlack)
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
#define CMS_SA_ALLOW
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
static constexpr unsigned int kAmbiguousOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
bool singleProduct_() const override
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:24
static constexpr unsigned int kMissingOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:197
std::shared_ptr< BranchDescription const > bd_
Provenance const & provenance() const
Definition: ProductData.h:33
void resetProductData_(bool deleteEarly) override
GlobalContext const * globalContext() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
BranchType const & branchType() const
void setProductID_(ProductID const &pid) final
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Worker * findWorker(std::string const &iLabel) const
void putProduct(std::unique_ptr< WrapperBase > edp) const final
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
void checkType(WrapperBase const &prod) const
std::atomic< bool > prefetchRequested_
void resetProductData_(bool deleteEarly) final
WaitingTaskList & waitingTasks() const
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:968
Provenance const * provenance() const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
TypeID unwrappedTypeID() const
void mergeProduct(std::shared_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
SwitchProducerProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
reader
Definition: DQM.py:105
void resetProductData_(bool deleteEarly) override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
bool unscheduledWasNotRun_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool singleProduct_() const override
Log< level::Error, false > LogError
bool productUnavailable_() const override
TypeWithDict const & unwrappedType() const
bool isAtEndTransition() const
void setupUnscheduled(UnscheduledConfigurator const &) final
void reset()
Resets access to the resource so that added tasks will wait.
ProductProvenanceRetriever const * provRetriever_
void connectTo(ProductResolverBase const &iOther, Principal const *) final
assert(be >=bs)
std::string const & processName() const
ProductData const & getProductData() const final
WrapperBase const * wrapper() const
Definition: ProductData.h:35
void resetProductData_(bool deleteEarly) override
void connectTo(ProductResolverBase const &, Principal const *) final
void setProductID(ProductID const &pid)
Definition: ProductData.h:58
Definition: Electron.h:6
DataManagingOrAliasProductResolver & realProduct_
UnscheduledAuxiliary const * aux_
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductProvenance const * productProvenancePtr_() const override
UnscheduledAuxiliary const * aux_
oneapi::tbb::task_group * group() const noexcept
bool singleProduct_() const override
void emit(Args &&... args) const
Definition: Signal.h:48
ProductProvenanceLookup const * store() const
Definition: Provenance.h:60
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
Definition: Worker.cc:240
void setProductID_(ProductID const &pid) override
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
UnscheduledAuxiliary const * aux_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token, oneapi::tbb::task_group *) const
virtual bool isFromCurrentProcess() const =0
void setupUnscheduled(UnscheduledConfigurator const &iConfigure) final
std::string const & moduleLabel() const
SwitchBaseProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
std::string const & className() const
ProductProvenance const * productProvenancePtr_() const final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::string const & branchName() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:52
std::atomic< bool > prefetchRequested_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
DataManagingOrAliasProductResolver const & realProduct() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void resetProductData_(bool deleteEarly) override
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool isFromCurrentProcess() const final
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
static constexpr const ProductStatus defaultStatus_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< bool > prefetchRequested_
std::atomic< unsigned int > lastCheckIndex_
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:177
void setProductID_(ProductID const &pid) override
ProductProvenance const * branchIDToProvenance(BranchID const &bid) const
void setMergeableRunProductMetadata(MergeableRunProductMetadataBase const *mrpm)
Definition: ProductData.h:60
ProductProvenance const * productProvenancePtr_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > prod) const override
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:36
void setProductID_(ProductID const &pid) final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
ServiceToken lock() const
Definition: ServiceToken.h:101
void prefetchAsync(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
virtual size_t transformIndex(edm::BranchDescription const &) const =0
Definition: Worker.cc:239
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
Resolution resolveProductImpl(Resolution) const
std::atomic< ProductStatus > theStatus_
ModuleDescription const * description() const
Definition: Worker.h:198
std::vector< bool > ambiguous_
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:81
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void resetProductData_(bool deleteEarly) override
static constexpr unsigned int kUnsetOffset
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Provenance const * provenance_() const override
void setupUnscheduled(UnscheduledConfigurator const &) final
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
void setProvenance(ProductProvenanceLookup const *provRetriever)
Definition: ProductData.h:56
bool isPresent() const
Definition: WrapperBase.h:30
def load(fileName)
Definition: svgfig.py:547
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
ProductStatus defaultStatus() const
BranchDescription const & branchDescription() const
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void setProductID_(ProductID const &pid) override
void setMergeableRunProductMetadata_(MergeableRunProductMetadata const *) override
DataManagingOrAliasProductResolver & realProduct_
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preModuleDelayedGetSignal_
Transition transition() const
Definition: GlobalContext.h:55
std::vector< ProductResolverIndex > matchingHolders_
void resetProductData_(bool deleteEarly) override
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool availableOnlyAtEndTransition() const
HLT enums.
unsigned int unsetIndexValue() const
StreamContext const * getStreamContext() const
void retrieveAndMerge_(Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
void setProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:27
Provenance const * provenance_() const override
bool productWasDeleted_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
Resolution resolveProductImpl(FUNC resolver) const
void resetProductData_(bool deleteEarly) override=0
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::atomic< bool > prefetchRequested_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Log< level::Warning, false > LogWarning
void resetProductData_(bool deleteEarly) override
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postModuleDelayedGetSignal_
UnscheduledAuxiliary const * auxiliary() const
std::atomic< bool > & prefetchRequested() const
static ParentageRegistry * instance()
BranchDescription const & branchDescription_() const override
EventTransitionInfo const & eventTransitionInfo() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:570
void setProductID_(ProductID const &pid) override
ProductProvenance const * productProvenancePtr_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
bool insertMapped(value_type const &v)
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
def move(src, dest)
Definition: eostools.py:511
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
DelayedReader * reader() const
Definition: Principal.h:187
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void setProductProvenanceRetriever(ProductProvenanceRetriever const *provRetriever)
void insertIntoSet(ProductProvenance provenanceProduct) const
static HepMC::HEPEVT_Wrapper wrapper
ParentContext const & parent() const
BranchID const & originalBranchID() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final