CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
23 
24 #include <cassert>
25 #include <utility>
26 
// Offsets that NoProcessProductResolver adds to the number of candidate
// resolvers (ambiguous_.size()) to build cache values which can never be
// confused with a real lookup index:
//   size + kUnsetOffset      -> nothing has been cached yet
//   size + kAmbiguousOffset  -> the lookup is known to be ambiguous
//   size + kMissingOffset    -> the lookup is known to find no product
static constexpr unsigned int kUnsetOffset{0U};
static constexpr unsigned int kAmbiguousOffset{1U};
static constexpr unsigned int kMissingOffset{2U};
30 
31 namespace edm {
32 
35  exception << "DataManagingProductResolver::resolveProduct_: The product matching all criteria was already deleted\n"
36  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
37  << "Looking for module label: " << moduleLabel() << "\n"
38  << "Looking for productInstanceName: " << productInstanceName() << "\n"
39  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
40  << "This means there is a configuration error.\n"
41  << "The module which is asking for this data must be configured to state that it will read this data.";
42  throw exception;
43  }
44 
45  //This is a templated function in order to avoid calling another virtual function
46  template <bool callResolver, typename FUNC>
48  if (productWasDeleted()) {
50  }
51  auto presentStatus = status();
52 
53  if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
54  //if resolver fails because of exception or not setting product
55  // make sure the status goes to failed
56  auto failedStatusSetter = [this](ProductStatus* iPresentStatus) {
57  if (this->status() == ProductStatus::ResolveNotRun) {
58  this->setFailedStatus();
59  }
60  *iPresentStatus = this->status();
61  };
62  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
63  failedStatusSetter);
64 
65  //If successful, this will call setProduct
66  resolver();
67  }
68 
69  if (presentStatus == ProductStatus::ProductSet) {
70  auto pd = &getProductData();
71  if (pd->wrapper()->isPresent()) {
72  return Resolution(pd);
73  }
74  }
75 
76  return Resolution(nullptr);
77  }
78 
80  std::shared_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
81  // if its not mergeable and the previous read failed, go ahead and use this one
83  setProduct(std::move(iFrom));
84  return;
85  }
86 
88  if (not iFrom) {
89  return;
90  }
91 
92  checkType(*iFrom);
93 
95  if (original->isMergeable()) {
96  if (original->isPresent() != iFrom->isPresent()) {
98  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
99  << "Was trying to merge objects where one product had been put in the input file and the other had not "
100  "been."
101  << "\n"
102  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
103  << "that need to be merged in the first place.\n";
104  }
105  if (original->isPresent()) {
107  if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
108  original->mergeProduct(iFrom.get());
109  } else {
111  mergeableRunProductMetadata->getMergeDecision(desc.processName());
112  if (decision == MergeableRunProductMetadata::MERGE) {
113  original->mergeProduct(iFrom.get());
114  } else if (decision == MergeableRunProductMetadata::REPLACE) {
115  // Note this swaps the content of the product where the
116  // both products branches are present and the product is
117  // also present (was put) in the branch. A module might
118  // have already gotten a pointer to the product so we
119  // keep those pointers valid. This does not call swap
120  // on the Wrapper.
121  original->swapProduct(iFrom.get());
122  }
123  // If the decision is IGNORE, do nothing
124  }
125  }
126  // If both have isPresent false, do nothing
127 
128  } else if (original->hasIsProductEqual()) {
129  if (original->isPresent() && iFrom->isPresent()) {
130  if (!original->isProductEqual(iFrom.get())) {
131  auto const& bd = branchDescription();
132  edm::LogError("RunLumiMerging")
133  << "ProductResolver::mergeTheProduct\n"
134  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
135  << "Using the first, ignoring the second\n"
136  << "className = " << bd.className() << "\n"
137  << "moduleLabel = " << bd.moduleLabel() << "\n"
138  << "instance = " << bd.productInstanceName() << "\n"
139  << "process = " << bd.processName() << "\n";
140  }
141  } else if (!original->isPresent() && iFrom->isPresent()) {
142  setProduct(std::move(iFrom));
143  }
144  // if not iFrom->isPresent(), do nothing
145  } else {
146  auto const& bd = branchDescription();
147  edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
148  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
149  << "Using the first, ignoring the second in merge\n"
150  << "className = " << bd.className() << "\n"
151  << "moduleLabel = " << bd.moduleLabel() << "\n"
152  << "instance = " << bd.productInstanceName() << "\n"
153  << "process = " << bd.processName() << "\n";
154  if (!original->isPresent() && iFrom->isPresent()) {
155  setProduct(std::move(iFrom));
156  }
157  // In other cases, do nothing
158  }
159  }
160 
161  namespace {
162  void extendException(cms::Exception& e, BranchDescription const& bd, ModuleCallingContext const* mcc) {
163  e.addContext(std::string("While reading from source ") + bd.className() + " " + bd.moduleLabel() + " '" +
164  bd.productInstanceName() + "' " + bd.processName());
165  if (mcc) {
166  edm::exceptionContext(e, *mcc);
167  }
168  }
169  } // namespace
171  Principal const& principal, bool, SharedResourcesAcquirer*, ModuleCallingContext const* mcc) const {
172  return resolveProductImpl<true>([this, &principal, mcc]() {
173  auto branchType = principal.branchType();
174  if (branchType == InLumi || branchType == InRun) {
175  //delayed get has not been allowed with Run or Lumis
176  // The file may already be closed so the reader is invalid
177  return;
178  }
179  if (mcc and branchType == InEvent and aux_) {
181  }
182 
183  auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
184  if (branchType == InEvent and aux_) {
185  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
186  }
187  }));
188 
189  if (auto reader = principal.reader()) {
190  std::unique_lock<std::recursive_mutex> guard;
191  if (auto sr = reader->sharedResources().second) {
192  guard = std::unique_lock<std::recursive_mutex>(*sr);
193  }
194  if (not productResolved()) {
195  try {
196  //another thread could have beaten us here
197  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
198  } catch (cms::Exception& e) {
199  extendException(e, branchDescription(), mcc);
200  throw;
201  } catch (std::exception const& e) {
202  auto newExcept = edm::Exception(errors::StdException) << e.what();
203  extendException(newExcept, branchDescription(), mcc);
204  throw newExcept;
205  }
206  }
207  }
208  });
209  }
210 
212  Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
213  if (auto reader = principal.reader()) {
214  std::unique_lock<std::recursive_mutex> guard;
215  if (auto sr = reader->sharedResources().second) {
216  guard = std::unique_lock<std::recursive_mutex>(*sr);
217  }
218 
219  //Can't use resolveProductImpl since it first checks to see
220  // if the product was already retrieved and then returns if it is
221  auto edp(reader->getProduct(branchDescription().branchID(), &principal));
222 
223  if (edp.get() != nullptr) {
224  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
226  << "Missing definition of member function swap for branch name " << branchDescription().branchName()
227  << "\n"
228  << "Mergeable data types written to a Run must have the swap member function defined"
229  << "\n";
230  }
232  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
233  setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
234  } else { // status() == ResolveFailed && branchDescription().isMergeable()
236  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
237  << "The product branch was dropped in the first run or lumi fragment and present in a later one"
238  << "\n"
239  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
240  << "that need to be merged in the first place.\n";
241  }
242  } else if (status() == defaultStatus()) {
243  setFailedStatus();
244  } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
246  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
247  << "The product branch was present in first run or lumi fragment and dropped in a later one"
248  << "\n"
249  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
250  << "that need to be merged in the first place.\n";
251  }
252  // Do nothing in other case. status is ResolveFailed already or
253  // it is not mergeable and the status is ProductSet
254  }
255  }
256 
258  std::shared_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
259  if (status() == defaultStatus()) {
260  //resolveProduct has not been called or it failed
262  } else {
263  mergeProduct(std::move(prod), mergeableRunProductMetadata);
264  }
265  }
266 
269  }
270 
272  Principal const& principal,
273  bool skipCurrentProcess,
274  ServiceToken const& token,
276  ModuleCallingContext const* mcc) const {
277  //need to try changing m_prefetchRequested before adding to m_waitingTasks
278  bool expected = false;
279  bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
280  m_waitingTasks.add(waitTask);
281 
282  if (prefetchRequested) {
283  ServiceWeakToken weakToken = token;
284  auto workToDo = [this, mcc, &principal, weakToken]() {
285  //need to make sure Service system is activated on the reading thread
286  ServiceRegistry::Operate operate(weakToken.lock());
287  // Caught exception is propagated via WaitingTaskList
288  CMS_SA_ALLOW try {
289  resolveProductImpl<true>([this, &principal, mcc]() {
290  if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
291  return;
292  }
293  if (auto reader = principal.reader()) {
294  std::unique_lock<std::recursive_mutex> guard;
295  if (auto sr = reader->sharedResources().second) {
296  guard = std::unique_lock<std::recursive_mutex>(*sr);
297  }
298  if (not productResolved()) {
299  try {
300  //another thread could have finished this while we were waiting
301  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
302  } catch (cms::Exception& e) {
303  extendException(e, branchDescription(), mcc);
304  throw;
305  } catch (std::exception const& e) {
306  auto newExcept = edm::Exception(errors::StdException) << e.what();
307  extendException(newExcept, branchDescription(), mcc);
308  throw newExcept;
309  }
310  }
311  }
312  });
313  } catch (...) {
314  this->m_waitingTasks.doneWaiting(std::current_exception());
315  return;
316  }
317  this->m_waitingTasks.doneWaiting(nullptr);
318  };
319 
320  SerialTaskQueueChain* queue = nullptr;
321  if (auto reader = principal.reader()) {
322  if (auto shared_res = reader->sharedResources().first) {
323  queue = &(shared_res->serialQueueChain());
324  }
325  }
326  if (queue) {
327  queue->push(*waitTask.group(), workToDo);
328  } else {
329  //Have to create a new task
330  auto t = make_functor_task(workToDo);
331  waitTask.group()->run([t]() {
332  TaskSentry s{t};
333  t->execute();
334  });
335  }
336  }
337  }
338 
340  if (not deleteEarly) {
341  m_prefetchRequested = false;
343  }
345  }
346 
348  aux_ = iConfigure.auxiliary();
349  }
350 
352 
// Hands a product to this resolver through setProduct().
// The status() != defaultStatus() branch guards against a second insertion on
// the same branch; its message reads "Attempt to insert more than one product".
// NOTE(review): listing line 355, the statement that starts the "<<" chain
// below (presumably the throw), is missing from this extract -- confirm against
// the real file before relying on this listing.
353  void PutOnReadInputProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
354  if (status() != defaultStatus()) {
356  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
357  }
358 
359  setProduct(std::move(edp)); // ProductResolver takes ownership
360  }
361 
363 
365  bool skipCurrentProcess,
367  ModuleCallingContext const*) const {
368  return resolveProductImpl<false>([]() { return; });
369  }
370 
372  Principal const& principal,
373  bool skipCurrentProcess,
374  ServiceToken const& token,
376  ModuleCallingContext const* mcc) const {}
377 
378  void PutOnReadInputProductResolver::putOrMergeProduct(std::unique_ptr<WrapperBase> edp) const {
379  setOrMergeProduct(std::move(edp), nullptr);
380  }
381 
383  bool skipCurrentProcess,
385  ModuleCallingContext const*) const {
386  if (!skipCurrentProcess) {
387  //'false' means never call the lambda function
388  return resolveProductImpl<false>([]() { return; });
389  }
390  return Resolution(nullptr);
391  }
392 
394  Principal const& principal,
395  bool skipCurrentProcess,
396  ServiceToken const& token,
398  ModuleCallingContext const* mcc) const {
399  if (not skipCurrentProcess) {
400  if (branchDescription().branchType() == InProcess &&
402  // This is an accessInputProcessBlock transition
403  // We cannot access produced products in those transitions
404  // except for in SubProcesses where they should have already run.
405  return;
406  }
408  if (not mcc->parent().isAtEndTransition()) {
409  return;
410  }
411  }
412 
413  if (waitingTasks_) {
414  // using a waiting task to do a callback guarantees that the
415  // waitingTasks_ list (from the worker) will be released from
416  // waiting even if the module does not put this data product
417  // or the module has an exception while running
418  waitingTasks_->add(waitTask);
419  }
420  }
421  }
422 
424  auto worker = iConfigure.findWorker(branchDescription().moduleLabel());
425  if (worker) {
426  waitingTasks_ = &worker->waitingTaskList();
427  }
428  }
429 
431  aux_ = iConfigure.auxiliary();
433  }
434 
436  bool skipCurrentProcess,
438  ModuleCallingContext const*) const {
439  if (!skipCurrentProcess and worker_) {
440  return resolveProductImpl<false>([] {});
441  }
442  return Resolution(nullptr);
443  }
444 
446  Principal const& principal,
447  bool skipCurrentProcess,
448  ServiceToken const& token,
450  ModuleCallingContext const* mcc) const {
451  if (skipCurrentProcess) {
452  return;
453  }
454  if (worker_ == nullptr) {
455  throw cms::Exception("LogicError") << "UnscheduledProductResolver::prefetchAsync_() called with null worker_. "
456  "This should not happen, please contact framework developers.";
457  }
458  //need to try changing prefetchRequested_ before adding to waitingTasks_
459  bool expected = false;
460  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
461  waitingTasks_.add(waitTask);
462  if (prefetchRequested) {
463  //Have to create a new task which will make sure the state for UnscheduledProductResolver
464  // is properly set after the module has run
465  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
466  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
467  // state for the case where an exception occurs during the call to the function.
468  // Caught exception is propagated via WaitingTaskList
469  CMS_SA_ALLOW try {
470  resolveProductImpl<true>([iPtr]() {
471  if (iPtr) {
472  std::rethrow_exception(*iPtr);
473  }
474  });
475  } catch (...) {
476  waitingTasks_.doneWaiting(std::current_exception());
477  return;
478  }
479  waitingTasks_.doneWaiting(nullptr);
480  });
481 
482  ParentContext parentContext(mcc);
485  WaitingTaskHolder(*waitTask.group(), t),
486  info,
487  token,
488  info.principal().streamID(),
489  parentContext,
490  mcc->getStreamContext());
491  }
492  }
493 
495  if (not deleteEarly) {
496  prefetchRequested_ = false;
498  }
500  }
501 
503  aux_ = iConfigure.auxiliary();
505  // worker can be missing if the corresponding module is
506  // unscheduled and none of its products are consumed
507  if (worker_) {
509  }
510  }
511 
513  bool skipCurrentProcess,
515  ModuleCallingContext const*) const {
516  if (!skipCurrentProcess and worker_) {
517  return resolveProductImpl<false>([] {});
518  }
519  return Resolution(nullptr);
520  }
521 
// putProduct() override for transforming resolvers: a null edp is ignored so
// that the resolver status is left untouched (see the rationale below).
// NOTE(review): the statement inside `if (edp)` (listing line 529) is missing
// from this extract, so what is done with a non-null edp cannot be confirmed
// from here.
522  void TransformingProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
523  // Override putProduct() to not set the resolver status to
524  // ResolveFailed when the Event::commit_() checks which produced
525  // products were actually produced and which not, because the
526  // transforming products are never produced by time of commit_()
527  // by construction.
528  if (edp) {
530  }
531  }
532 
534  Principal const& principal,
535  bool skipCurrentProcess,
536  ServiceToken const& token,
538  ModuleCallingContext const* mcc) const {
539  if (skipCurrentProcess) {
540  return;
541  }
542  if (worker_ == nullptr) {
543  throw cms::Exception("LogicError") << "TransformingProductResolver::prefetchAsync_() called with null worker_. "
544  "This should not happen, please contact framework developers.";
545  }
546  //need to try changing prefetchRequested_ before adding to waitingTasks_
547  bool expected = false;
548  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
549  waitingTasks_.add(waitTask);
550  if (prefetchRequested) {
551  //Have to create a new task which will make sure the state for TransformingProductResolver
552  // is properly set after the module has run
553  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
554  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
555  // state for the case where an exception occurs during the call to the function.
556  // Caught exception is propagated via WaitingTaskList
557  CMS_SA_ALLOW try {
558  resolveProductImpl<true>([iPtr]() {
559  if (iPtr) {
560  std::rethrow_exception(*iPtr);
561  }
562  });
563  } catch (...) {
564  waitingTasks_.doneWaiting(std::current_exception());
565  return;
566  }
567  waitingTasks_.doneWaiting(nullptr);
568  });
569 
570  //This gives a lifetime greater than this call
571  ParentContext parent(mcc);
573 
576  index_,
577  info.principal(),
578  token,
579  info.principal().streamID(),
580  mcc_,
581  mcc->getStreamContext());
582  }
583  }
584 
586  if (not deleteEarly) {
587  prefetchRequested_ = false;
589  }
591  }
592 
// Hands a produced product to this resolver through setProduct().
// The status() != defaultStatus() branch guards against a second insertion on
// the same branch; its message reads "Attempt to insert more than one product".
// NOTE(review): listing line 595, the statement that starts the "<<" chain
// below (presumably the throw), is missing from this extract -- confirm against
// the real file.
593  void ProducedProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
594  if (status() != defaultStatus()) {
596  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
597  }
598 
599  setProduct(std::move(edp)); // ProductResolver takes ownership
600  }
601 
602  bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
603 
605 
607  // Check if the types match.
608  TypeID typeID(prod.dynamicTypeInfo());
610  // Types do not match.
612  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
613  << "It is supposed to be of type " << branchDescription().className() << ".\n"
614  << "It is actually of type " << typeID.className() << ".\n";
615  }
616  }
617 
// setProduct overload taking a std::unique_ptr<WrapperBase>.
//  - non-null edp: the product's type is validated with checkType().
//    NOTE(review): listing lines 621-622, which follow checkType() (presumably
//    storing the wrapper and updating the status), are missing from this extract.
//  - null edp: the resolver is marked failed through setFailedStatus().
618  void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
619  if (edp) {
620  checkType(*edp);
623  } else {
624  setFailedStatus();
625  }
626  }
// setProduct overload taking a std::shared_ptr<WrapperBase>.
//  - non-null edp: the product's type is validated with checkType().
//    NOTE(review): listing lines 630-631, which follow checkType() (presumably
//    storing the wrapper and updating the status), are missing from this extract.
//  - null edp: the resolver is marked failed through setFailedStatus().
627  void DataManagingProductResolver::setProduct(std::shared_ptr<WrapperBase> edp) const {
628  if (edp) {
629  checkType(*edp);
632  } else {
633  setFailedStatus();
634  }
635  }
636 
637  // This routine returns true if it is known that currently there is no real product.
638  // If there is a real product, it returns false.
639  // If it is not known if there is a real product, it returns false.
641  auto presentStatus = status();
642  if (presentStatus == ProductStatus::ProductSet) {
643  return !(getProductData().wrapper()->isPresent());
644  }
645  return presentStatus != ProductStatus::ResolveNotRun;
646  }
647 
649  auto s = status();
650  return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
651  }
652 
653  // This routine returns true if the product was deleted early in order to save memory
655 
// Reports whether a product has been fetched and is actually present.
//  - Returns false right away when the caller skips the current process and
//    this resolver belongs to the current process.
//  - Returns true when the wrapper held in getProductData() reports isPresent().
//    NOTE(review): the condition that opens the enclosing block (listing line
//    660) is missing from this extract; presumably it tests the resolver
//    status -- confirm against the real file.
//  - Returns false in every other case.
656  bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
657  if (iSkipCurrentProcess and isFromCurrentProcess()) {
658  return false;
659  }
661  if (getProductData().wrapper()->isPresent()) {
662  return true;
663  }
664  }
665  return false;
666  }
667 
669  productData_.setProvenance(provRetriever);
670  }
671 
673 
675  MergeableRunProductMetadata const* mrpm) {
677  }
678 
680  return provenance()->productProvenance();
681  }
682 
686  }
687  if (deleteEarly) {
689  } else {
690  resetStatus();
691  }
692  }
693 
694  bool DataManagingProductResolver::singleProduct_() const { return true; }
695 
698  }
699 
701 
703  return provenance()->productProvenance();
704  }
705 
707 
708  bool AliasProductResolver::singleProduct_() const { return true; }
709 
// Constructor: stores the reference to the real resolver, builds productData_
// from the branch description, clears the prefetch flag, and precomputes
// parentageID_ from a Parentage whose only parent is the real product's
// original branch ID.
// NOTE(review): listing line 711 (the rest of the parameter list, which declares
// `realProduct`) and listing line 717 (the last statement of the body) are
// missing from this extract.
710  SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd,
712  : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false) {
713  // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
714  Parentage p;
715  p.setParents(std::vector<BranchID>{realProduct.branchDescription().originalBranchID()});
716  parentageID_ = p.id();
718  }
719 
// Not supported for switch resolvers: the body only builds an error message
// stating that connectTo() is not implemented and should never be called.
// Neither parameter is used in the visible code.
// NOTE(review): listing line 721, the statement that starts the "<<" chain
// (presumably the throw), is missing from this extract.
720  void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
722  << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
723  << "Contact a Framework developer\n";
724  }
725 
728  }
729 
731  if (res.data() == nullptr)
732  return res;
733  return Resolution(&productData_);
734  }
735 
737  // SwitchProducer will never put anything in the event, and
738  // "false" will make Event::commit_() to call putProduct() with
739  // null unique_ptr<WrapperBase> to signal that the produce() was
740  // run.
741  return false;
742  }
743 
745  productData_.setProvenance(provRetriever);
746  }
747 
749  // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
751  }
752 
755  realProduct_.resetProductData_(deleteEarly);
756  if (not deleteEarly) {
757  prefetchRequested_ = false;
759  }
760  }
761 
763  // update provenance
765  // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
766  productData_.unsafe_setWrapper(realProduct().getProductData().sharedConstWrapper());
767  }
768 
// Constructor: delegates to SwitchBaseProductResolver and starts with status_
// equal to defaultStatus_.
// NOTE(review): listing line 770 (the rest of the parameter list, which declares
// `realProduct`) is missing from this extract.
769  SwitchProducerProductResolver::SwitchProducerProductResolver(std::shared_ptr<BranchDescription const> bd,
771  : SwitchBaseProductResolver(std::move(bd), realProduct), status_(defaultStatus_) {}
772 
774  bool skipCurrentProcess,
776  ModuleCallingContext const* mcc) const {
778  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
779  }
780  return Resolution(nullptr);
781  }
782 
784  Principal const& principal,
785  bool skipCurrentProcess,
786  ServiceToken const& token,
788  ModuleCallingContext const* mcc) const {
789  if (skipCurrentProcess) {
790  return;
791  }
792  if (branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
793  return;
794  }
795 
796  //need to try changing prefetchRequested before adding to waitingTasks
797  bool expected = false;
798  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
799  waitingTasks().add(waitTask);
800 
801  if (doPrefetchRequested) {
802  //using a waiting task to do a callback guarantees that
803  // the waitingTasks() list will be released from waiting even
804  // if the module does not put this data product or the
805  // module has an exception while running
806  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
807  if (nullptr != iException) {
808  waitingTasks().doneWaiting(*iException);
809  } else {
811  waitingTasks().doneWaiting(std::exception_ptr());
812  }
813  });
814  worker()->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
815  }
816  }
817 
// Called to record that produce() ran for the switch producer; edp itself is
// not used in the visible code.
//  - If status_ already differs from defaultStatus_ an error message about a
//    repeated insertion is built.
//  - If no prefetch had been requested yet (the compare-exchange on
//    prefetchRequested() succeeds), the waiting tasks are released with an
//    empty exception_ptr.
// NOTE(review): listing lines 820 (start of the "<<" chain, presumably the
// throw), 826 (the statement the "ResolveFailed" comment refers to) and 829 are
// missing from this extract.
// NOTE(review): the message below has no space or newline between the branch
// name and "This makes no sense", so the two run together in the output.
818  void SwitchProducerProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
819  if (status_ != defaultStatus_) {
821  << "Attempt to insert more than one product for a branch " << branchDescription().branchName()
822  << "This makes no sense for SwitchProducerProductResolver.\nContact a Framework developer";
823  }
824  // Let's use ResolveFailed to signal that produce() was called, as
825  // there is no real product in this resolver
827  bool expected = false;
828  if (prefetchRequested().compare_exchange_strong(expected, true)) {
830  waitingTasks().doneWaiting(std::exception_ptr());
831  }
832  }
833 
835  // if produce() was run (ResolveFailed), ask from the real resolver
837  return realProduct().productUnavailable();
838  }
839  return true;
840  }
841 
844  if (not deleteEarly) {
846  }
847  }
848 
850  bool skipCurrentProcess,
852  ModuleCallingContext const* mcc) const {
853  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
854  }
855 
857  Principal const& principal,
858  bool skipCurrentProcess,
859  ServiceToken const& token,
861  ModuleCallingContext const* mcc) const {
862  if (skipCurrentProcess) {
863  return;
864  }
865 
866  //need to try changing prefetchRequested_ before adding to waitingTasks_
867  bool expected = false;
868  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
869  waitingTasks().add(waitTask);
870 
871  if (doPrefetchRequested) {
872  //using a waiting task to do a callback guarantees that
873  // the waitingTasks() list will be released from waiting even
874  // if the module does not put this data product or the
875  // module has an exception while running
876  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
877  if (nullptr != iException) {
878  waitingTasks().doneWaiting(*iException);
879  } else {
881  waitingTasks().doneWaiting(std::exception_ptr());
882  }
883  });
885  WaitingTaskHolder(*waitTask.group(), waiting), principal, skipCurrentProcess, token, sra, mcc);
886  }
887  }
888 
890  provRetriever_ = provRetriever;
891  }
892 
894 
896  return provRetriever_ ? provRetriever_->branchIDToProvenance(bd_->originalBranchID()) : nullptr;
897  }
898 
900 
901  bool ParentProcessProductResolver::singleProduct_() const { return true; }
902 
904  // In principle, this ought to be fixed. I noticed one hits this error
905  // when in a SubProcess and calling the Event::getProvenance function
906  // with a BranchID to a branch from an earlier SubProcess or the top
907  // level process and this branch is not kept in this SubProcess. It might
908  // be possible to hit this in other contexts. I say it ought to be
909  // fixed because one does not encounter this issue if the SubProcesses
910  // are split into genuinely different processes (in principle that
911  // ought to give identical behavior and results). No user has ever
912  // reported this issue which has been around for some time and it was only
913  // noticed when testing some rare corner cases after modifying Core code.
914  // After discussing this with Chris we decided that at least for the moment
915  // there are higher priorities than fixing this ... I converted it so it
916  // causes an exception instead of a seg fault. The issue that may need to
917  // be addressed someday is how ProductResolvers for non-kept branches are
918  // connected to earlier SubProcesses.
920  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
921  << "Contact a Framework developer\n";
922  }
923 
924  NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
925  std::vector<bool> const& ambiguous,
926  bool madeAtEnd)
927  : matchingHolders_(matchingHolders),
928  ambiguous_(ambiguous),
929  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
930  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
931  prefetchRequested_(false),
932  skippingPrefetchRequested_(false),
933  madeAtEnd_{madeAtEnd} {
934  assert(ambiguous_.size() == matchingHolders_.size());
935  }
936 
938  Principal const& principal,
939  bool skipCurrentProcess,
941  ModuleCallingContext const* mcc) const {
942  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
943  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
944  }
945 
947  bool skipCurrentProcess,
949  ModuleCallingContext const* mcc) const {
950  //See if we've already cached which Resolver we should call or if
951  // we know it is ambiguous
952  const unsigned int choiceSize = ambiguous_.size();
953 
954  //madeAtEnd_==true and not at end transition is the same as skipping the current process
955  if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
956  skipCurrentProcess = not mcc->parent().isAtEndTransition();
957  }
958 
959  unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
960  if (checkCacheIndex != choiceSize + kUnsetOffset) {
961  if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
963  } else if (checkCacheIndex == choiceSize + kMissingOffset) {
964  return Resolution(nullptr);
965  }
966  return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
967  }
968 
969  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
970 
971  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
972  for (unsigned int k : lookupProcessOrder) {
973  assert(k < ambiguous_.size());
974  if (k == 0)
975  break; // Done
976  if (ambiguous_[k]) {
977  updateCacheIndex = choiceSize + kAmbiguousOffset;
979  }
981  auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
982  if (resolution.data() != nullptr) {
983  updateCacheIndex = k;
984  return resolution;
985  }
986  }
987  }
988 
989  updateCacheIndex = choiceSize + kMissingOffset;
990  return Resolution(nullptr);
991  }
992 
994  Principal const& principal,
995  bool skipCurrentProcess,
996  ServiceToken const& token,
998  ModuleCallingContext const* mcc) const {
999  bool timeToMakeAtEnd = true;
1000  if (madeAtEnd_ and mcc) {
1001  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
1002  }
1003 
1004  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
1005  if (not skipCurrentProcess and timeToMakeAtEnd) {
1006  //need to try changing prefetchRequested_ before adding to waitingTasks_
1007  bool expected = false;
1008  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
1009  waitingTasks_.add(waitTask);
1010 
1011  if (prefetchRequested) {
1012  //we are the first thread to request
1013  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token, waitTask.group());
1014  }
1015  } else {
1016  skippingWaitingTasks_.add(waitTask);
1017  bool expected = false;
1018  if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
1019  //we are the first thread to request
1020  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token, waitTask.group());
1021  }
1022  }
1023  }
1024 
1025  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
1026  ProductResolverIndex iIndex,
1027  std::exception_ptr iExceptPtr) const {
1028  if (not iSkipCurrentProcess) {
1029  lastCheckIndex_ = iIndex;
1030  waitingTasks_.doneWaiting(iExceptPtr);
1031  } else {
1032  lastSkipCurrentCheckIndex_ = iIndex;
1033  skippingWaitingTasks_.doneWaiting(iExceptPtr);
1034  }
1035  }
1036 
1037  namespace {
// Waiting task that reacts to the completion of one candidate resolver's prefetch.
// NOTE: the constructor parameter line numbered 1043 and the member line numbered
// 1072 are missing from this listing; iSRA and sra_ used below come from those lines.
1038  class TryNextResolverWaitingTask : public edm::WaitingTask {
1039  public:
// Captures everything execute() needs; the token is kept in a ServiceWeakToken member.
1040  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
1041  unsigned int iResolverIndex,
1042  Principal const* iPrincipal,
1044  ModuleCallingContext const* iMCC,
1045  bool iSkipCurrentProcess,
1046  ServiceToken iToken,
1047  oneapi::tbb::task_group* iGroup)
1048  : resolver_(iResolver),
1049  principal_(iPrincipal),
1050  sra_(iSRA),
1051  mcc_(iMCC),
1052  group_(iGroup),
1053  serviceToken_(iToken),
1054  index_(iResolverIndex),
1055  skipCurrentProcess_(iSkipCurrentProcess) {}
1056 
// If an exception pointer is present, report the failure for index_ to the owning
// resolver. Otherwise, when the resolver at index_ did not yield valid data, ask the
// owner to continue the search at index_ + 1.
1057  void execute() final {
1058  auto exceptPtr = exceptionPtr();
1059  if (exceptPtr) {
1060  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, exceptPtr);
1061  } else {
1062  if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
// The weak token is turned back into a ServiceToken via lock() for the next attempt.
1063  resolver_->tryPrefetchResolverAsync(
1064  index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
1065  }
1066  }
1067  }
1068 
1069  private:
// Non-owning raw pointers: nothing in this class deletes them.
1070  NoProcessProductResolver const* resolver_;
1071  Principal const* principal_;
1073  ModuleCallingContext const* mcc_;
1074  oneapi::tbb::task_group* group_;
1075  ServiceWeakToken serviceToken_;
// Position within the principal's lookupProcessOrder() that this task was created for.
1076  unsigned int index_;
1077  bool skipCurrentProcess_;
1078  };
1079  } // namespace
1080 
1081  void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
1082  Principal const& principal,
1083  bool iSkipCurrentProcess,
1084  std::exception_ptr iExceptPtr) const {
1085  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1086  auto k = lookupProcessOrder[iProcessingIndex];
1087 
1088  setCache(iSkipCurrentProcess, k, iExceptPtr);
1089  }
1090 
1091  bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
1092  Principal const& principal,
1093  bool iSkipCurrentProcess) const {
1094  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1095  auto k = lookupProcessOrder[iProcessingIndex];
1096  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1097 
1098  if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
1099  setCache(iSkipCurrentProcess, k, nullptr);
1100  return true;
1101  }
1102  return false;
1103  }
1104 
// Walks principal.lookupProcessOrder() starting at iProcessingIndex and starts an
// asynchronous prefetch on a candidate resolver; if the walk ends without starting
// one, the cache is set to the "missing" or "ambiguous" value instead.
// NOTE: the lines numbered 1108, 1110, 1127 and 1135 are missing from this listing.
// The names sra and token used below are declared on the missing parameter lines,
// and the closing brace on line 1139 belongs to a condition opened on line 1127.
1105  void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
1106  Principal const& principal,
1107  bool skipCurrentProcess,
1109  ModuleCallingContext const* mcc,
1111  oneapi::tbb::task_group* group) const {
1112  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1113  auto index = iProcessingIndex;
1114 
1115  const unsigned int choiceSize = ambiguous_.size();
// Result cached when the loop finishes without starting a prefetch.
1116  unsigned int newCacheIndex = choiceSize + kMissingOffset;
1117  while (index < lookupProcessOrder.size()) {
1118  auto k = lookupProcessOrder[index];
// An entry of 0 ends the search.
1119  if (k == 0) {
1120  break;
1121  }
1122  assert(k < ambiguous_.size());
// An ambiguous choice also ends the search, with the "ambiguous" value cached.
1123  if (ambiguous_[k]) {
1124  newCacheIndex = choiceSize + kAmbiguousOffset;
1125  break;
1126  }
1128  //make new task
1129 
// The follow-up task remembers the current position so it can continue at index + 1.
// NOTE(review): the task is created with a raw new; presumably the WaitingTaskHolder
// below takes over its lifetime -- confirm against WaitingTaskHolder.
1130  auto task = new TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token, group);
1131  WaitingTaskHolder hTask(*group, task);
1132  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1133 
1134  //Make sure the Services are available on this thread
1136 
// Once a prefetch has been started the function returns; setCache() is not called here.
1137  productResolver->prefetchAsync(hTask, principal, skipCurrentProcess, token, sra, mcc);
1138  return;
1139  }
1140  ++index;
1141  }
1142  //data product unavailable
1143  setCache(skipCurrentProcess, newCacheIndex, nullptr);
1144  }
1145 
1147 
1149 
1151 
1152  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1153 
// Body of a reset routine; its signature line (1154) and the statements numbered
// 1162 and 1164 are missing from this listing.
1155  // This function should never receive 'true'. On the other hand,
1156  // nothing should break if a 'true' is passed, because
1157  // NoProcessProductResolver just forwards the resolve
// Both cached lookup results go back to the "unset" value.
1158  const auto resetValue = unsetIndexValue();
1159  lastCheckIndex_ = resetValue;
1160  lastSkipCurrentCheckIndex_ = resetValue;
// Allow a new prefetch request and make newly added tasks wait again.
// NOTE(review): the missing lines presumably reset the "skipping" counterparts -- confirm.
1161  prefetchRequested_ = false;
1163  waitingTasks_.reset();
1165  }
1166 
1167  bool NoProcessProductResolver::singleProduct_() const { return false; }
1168 
// The fragments below are the message parts of "not implemented" stubs. For each
// one, the listing has dropped the signature line and/or the line that starts the
// streamed expression (presumably a throw of a LogicError exception -- confirm
// against the full file). The function concerned is named inside each message.

// Stub for unscheduledWasNotRun_().
1171  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1172  << "Contact a Framework developer\n";
1173  }
1174 
// Stub for productUnavailable_().
1177  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1178  << "Contact a Framework developer\n";
1179  }
1180 
// Stub for productResolved_().
1183  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1184  << "Contact a Framework developer\n";
1185  }
1186 
// Stub for productWasDeleted_().
1189  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1190  << "Contact a Framework developer\n";
1191  }
1192 
// Stub for productWasFetchedAndIsValid_(); the parameter is deliberately left unnamed.
1193  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1195  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1196  << "Contact a Framework developer\n";
1197  }
1198 
// Stub for branchDescription_().
1201  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1202  << "Contact a Framework developer\n";
1203  }
1204 
// Stub for resetBranchDescription_(); the shared_ptr argument is unnamed and unused.
1205  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1207  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1208  << "Contact a Framework developer\n";
1209  }
1210 
// Stub for provenance_().
1213  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1214  << "Contact a Framework developer\n";
1215  }
1216 
// Stub for connectTo().
1219  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1220  << "Contact a Framework developer\n";
1221  }
1222 
1223  //---- SingleChoiceNoProcessProductResolver ----------------
// Fragment of a resolve routine that forwards to another resolver's resolveProduct().
// The signature line (1224), the parameter line (1227) and the line that obtains
// the target resolver (1231) are missing from this listing.
1225  Principal const& principal,
1226  bool skipCurrentProcess,
1228  ModuleCallingContext const* mcc) const {
1229  //NOTE: Have to lookup the other ProductResolver each time rather than cache
1230  // its pointer since it appears the pointer can change at some later stage
// All arguments are passed through unchanged; the forwarded call's result is what
// this statement yields.
1232  ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1233  }
1234 
// Fragment of a prefetch routine that forwards to another resolver's prefetchAsync().
// The signature line (1235), the parameter line (1239) and the line that obtains
// the target resolver (1241) are missing from this listing.
1236  Principal const& principal,
1237  bool skipCurrentProcess,
1238  ServiceToken const& token,
1240  ModuleCallingContext const* mcc) const {
// The waiting task and every other argument are handed over unchanged.
1242  ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1243  }
1244 
1246 
1248 
1250 
1252 
1254 
// The fragments below are "not implemented" stubs whose signature lines (and, for
// most, the line starting the streamed expression) are missing from this listing.
// The function concerned is named inside each message. Where the start of the
// expression is visible (line 1280) it throws an Exception built from
// errors::LogicError; the others presumably do the same -- confirm against the full file.

// Stub for unscheduledWasNotRun_().
1257  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1258  << "Contact a Framework developer\n";
1259  }
1260 
// Stub for productUnavailable_().
1263  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1264  << "Contact a Framework developer\n";
1265  }
1266 
// Stub for productResolved_().
1269  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1270  << "Contact a Framework developer\n";
1271  }
1272 
// Stub for productWasDeleted_().
1275  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1276  << "Contact a Framework developer\n";
1277  }
1278 
// Stub for productWasFetchedAndIsValid_(): throws unconditionally.
1280  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1281  "implemented and should never be called.\n"
1282  << "Contact a Framework developer\n";
1283  }
1284 
// Stub for branchDescription_().
1287  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1288  << "Contact a Framework developer\n";
1289  }
1290 
1291  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1292  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not "
1293  "implemented and should never be called.\n"
1294  << "Contact a Framework developer\n";
1295  }
1296 
// Message parts of two more "not implemented" stubs; their signature lines and the
// lines starting the streamed expressions are missing from this listing
// (presumably a throw of a LogicError exception -- confirm against the full file).

// Stub for provenance_().
1299  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1300  << "Contact a Framework developer\n";
1301  }
1302 
// Stub for connectTo().
1305  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1306  << "Contact a Framework developer\n";
1307  }
1308 
1309 } // namespace edm
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
size
Write out results.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
void setProductID(ProductID const &pid)
ProductData const & getProductData() const final
ProductProvenance const * productProvenancePtr_() const override
void setOrMergeProduct(std::shared_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
BranchDescription const & branchDescription_() const override
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
void resetProductData_(bool deleteEarly) override
void putProduct(std::unique_ptr< WrapperBase > edp) const override
static const TGPicture * info(bool iBackgroundIsBlack)
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
#define CMS_SA_ALLOW
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
static constexpr unsigned int kAmbiguousOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
bool singleProduct_() const override
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:24
static constexpr unsigned int kMissingOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:197
std::shared_ptr< BranchDescription const > bd_
Provenance const & provenance() const
Definition: ProductData.h:33
void resetProductData_(bool deleteEarly) override
GlobalContext const * globalContext() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
BranchType const & branchType() const
void setProductID_(ProductID const &pid) final
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Worker * findWorker(std::string const &iLabel) const
void putProduct(std::unique_ptr< WrapperBase > edp) const final
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
void checkType(WrapperBase const &prod) const
std::atomic< bool > prefetchRequested_
void resetProductData_(bool deleteEarly) final
WaitingTaskList & waitingTasks() const
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:968
Provenance const * provenance() const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
TypeID unwrappedTypeID() const
void mergeProduct(std::shared_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
SwitchProducerProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
reader
Definition: DQM.py:105
void resetProductData_(bool deleteEarly) override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
bool unscheduledWasNotRun_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool singleProduct_() const override
Log< level::Error, false > LogError
bool productUnavailable_() const override
TypeWithDict const & unwrappedType() const
bool isAtEndTransition() const
void setupUnscheduled(UnscheduledConfigurator const &) final
void reset()
Resets access to the resource so that added tasks will wait.
ProductProvenanceRetriever const * provRetriever_
void connectTo(ProductResolverBase const &iOther, Principal const *) final
assert(be >=bs)
std::string const & processName() const
ProductData const & getProductData() const final
WrapperBase const * wrapper() const
Definition: ProductData.h:35
void resetProductData_(bool deleteEarly) override
void connectTo(ProductResolverBase const &, Principal const *) final
void setProductID(ProductID const &pid)
Definition: ProductData.h:58
Definition: Electron.h:6
DataManagingOrAliasProductResolver & realProduct_
UnscheduledAuxiliary const * aux_
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductProvenance const * productProvenancePtr_() const override
UnscheduledAuxiliary const * aux_
oneapi::tbb::task_group * group() const noexcept
bool singleProduct_() const override
void emit(Args &&... args) const
Definition: Signal.h:48
ProductProvenanceLookup const * store() const
Definition: Provenance.h:60
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
Definition: Worker.cc:240
void setProductID_(ProductID const &pid) override
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
UnscheduledAuxiliary const * aux_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token, oneapi::tbb::task_group *) const
virtual bool isFromCurrentProcess() const =0
void setupUnscheduled(UnscheduledConfigurator const &iConfigure) final
std::string const & moduleLabel() const
SwitchBaseProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
std::string const & className() const
ProductProvenance const * productProvenancePtr_() const final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::string const & branchName() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:52
std::atomic< bool > prefetchRequested_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
DataManagingOrAliasProductResolver const & realProduct() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void resetProductData_(bool deleteEarly) override
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool isFromCurrentProcess() const final
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
static constexpr const ProductStatus defaultStatus_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< bool > prefetchRequested_
std::atomic< unsigned int > lastCheckIndex_
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:177
void setProductID_(ProductID const &pid) override
ProductProvenance const * branchIDToProvenance(BranchID const &bid) const
void setMergeableRunProductMetadata(MergeableRunProductMetadataBase const *mrpm)
Definition: ProductData.h:60
ProductProvenance const * productProvenancePtr_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > prod) const override
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:36
void setProductID_(ProductID const &pid) final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
ServiceToken lock() const
Definition: ServiceToken.h:101
void prefetchAsync(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
virtual size_t transformIndex(edm::BranchDescription const &) const =0
Definition: Worker.cc:239
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
Resolution resolveProductImpl(Resolution) const
std::atomic< ProductStatus > theStatus_
ModuleDescription const * description() const
Definition: Worker.h:198
std::vector< bool > ambiguous_
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:81
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void resetProductData_(bool deleteEarly) override
static constexpr unsigned int kUnsetOffset
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Provenance const * provenance_() const override
void setupUnscheduled(UnscheduledConfigurator const &) final
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
void setProvenance(ProductProvenanceLookup const *provRetriever)
Definition: ProductData.h:56
bool isPresent() const
Definition: WrapperBase.h:30
def load(fileName)
Definition: svgfig.py:547
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
ProductStatus defaultStatus() const
BranchDescription const & branchDescription() const
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void setProductID_(ProductID const &pid) override
void setMergeableRunProductMetadata_(MergeableRunProductMetadata const *) override
DataManagingOrAliasProductResolver & realProduct_
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preModuleDelayedGetSignal_
Transition transition() const
Definition: GlobalContext.h:55
std::vector< ProductResolverIndex > matchingHolders_
void resetProductData_(bool deleteEarly) override
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool availableOnlyAtEndTransition() const
HLT enums.
unsigned int unsetIndexValue() const
StreamContext const * getStreamContext() const
void retrieveAndMerge_(Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
void setProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:27
Provenance const * provenance_() const override
bool productWasDeleted_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
Resolution resolveProductImpl(FUNC resolver) const
void resetProductData_(bool deleteEarly) override=0
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::atomic< bool > prefetchRequested_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Log< level::Warning, false > LogWarning
void resetProductData_(bool deleteEarly) override
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postModuleDelayedGetSignal_
UnscheduledAuxiliary const * auxiliary() const
std::atomic< bool > & prefetchRequested() const
static ParentageRegistry * instance()
BranchDescription const & branchDescription_() const override
EventTransitionInfo const & eventTransitionInfo() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:570
void setProductID_(ProductID const &pid) override
ProductProvenance const * productProvenancePtr_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
bool insertMapped(value_type const &v)
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
def move(src, dest)
Definition: eostools.py:511
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
DelayedReader * reader() const
Definition: Principal.h:187
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void setProductProvenanceRetriever(ProductProvenanceRetriever const *provRetriever)
void insertIntoSet(ProductProvenance provenanceProduct) const
static HepMC::HEPEVT_Wrapper wrapper
ParentContext const & parent() const
BranchID const & originalBranchID() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final