CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
23 
24 #include <cassert>
25 #include <utility>
26 
// Offsets added to the number of candidate resolvers (ambiguous_.size()) to build
// out-of-range sentinel values for NoProcessProductResolver's cached lookup index:
//   size + kUnsetOffset     : no lookup result has been cached yet
//   size + kAmbiguousOffset : the lookup was found to be ambiguous
//   size + kMissingOffset   : no resolver produced data
namespace {
  constexpr unsigned int kUnsetOffset{0U};
  constexpr unsigned int kAmbiguousOffset{1U};
  constexpr unsigned int kMissingOffset{2U};
}  // namespace
30 
31 namespace edm {
32 
35  exception << "DataManagingProductResolver::resolveProduct_: The product matching all criteria was already deleted\n"
36  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
37  << "Looking for module label: " << moduleLabel() << "\n"
38  << "Looking for productInstanceName: " << productInstanceName() << "\n"
39  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
40  << "This means there is a configuration error.\n"
41  << "The module which is asking for this data must be configured to state that it will read this data.";
42  throw exception;
43  }
44 
45  //This is a templated function in order to avoid calling another virtual function
46  template <bool callResolver, typename FUNC>
48  if (productWasDeleted()) {
50  }
51  auto presentStatus = status();
52 
53  if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
54  //if resolver fails because of exception or not setting product
55  // make sure the status goes to failed
56  auto failedStatusSetter = [this](ProductStatus* iPresentStatus) {
57  if (this->status() == ProductStatus::ResolveNotRun) {
58  this->setFailedStatus();
59  }
60  *iPresentStatus = this->status();
61  };
62  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
63  failedStatusSetter);
64 
65  //If successful, this will call setProduct
66  resolver();
67  }
68 
69  if (presentStatus == ProductStatus::ProductSet) {
70  auto pd = &getProductData();
71  if (pd->wrapper()->isPresent()) {
72  return Resolution(pd);
73  }
74  }
75 
76  return Resolution(nullptr);
77  }
78 
80  std::shared_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
81  // if it's not mergeable and the previous read failed, go ahead and use this one
83  setProduct(std::move(iFrom));
84  return;
85  }
86 
88  if (not iFrom) {
89  return;
90  }
91 
92  checkType(*iFrom);
93 
95  if (original->isMergeable()) {
96  if (original->isPresent() != iFrom->isPresent()) {
98  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
99  << "Was trying to merge objects where one product had been put in the input file and the other had not "
100  "been."
101  << "\n"
102  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
103  << "that need to be merged in the first place.\n";
104  }
105  if (original->isPresent()) {
107  if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
108  original->mergeProduct(iFrom.get());
109  } else {
111  mergeableRunProductMetadata->getMergeDecision(desc.processName());
112  if (decision == MergeableRunProductMetadata::MERGE) {
113  original->mergeProduct(iFrom.get());
114  } else if (decision == MergeableRunProductMetadata::REPLACE) {
115  // Note: this swaps the contents of the two products in the
116  // case where both product branches are present and the product
117  // is also present (was put) in the branch. A module might
118  // have already gotten a pointer to the product, so we
119  // keep those pointers valid. This does not call swap
120  // on the Wrapper.
121  original->swapProduct(iFrom.get());
122  }
123  // If the decision is IGNORE, do nothing
124  }
125  }
126  // If both have isPresent false, do nothing
127 
128  } else if (original->hasIsProductEqual()) {
129  if (original->isPresent() && iFrom->isPresent()) {
130  if (!original->isProductEqual(iFrom.get())) {
131  auto const& bd = branchDescription();
132  edm::LogError("RunLumiMerging")
133  << "ProductResolver::mergeTheProduct\n"
134  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
135  << "Using the first, ignoring the second\n"
136  << "className = " << bd.className() << "\n"
137  << "moduleLabel = " << bd.moduleLabel() << "\n"
138  << "instance = " << bd.productInstanceName() << "\n"
139  << "process = " << bd.processName() << "\n";
140  }
141  } else if (!original->isPresent() && iFrom->isPresent()) {
142  setProduct(std::move(iFrom));
143  }
144  // if not iFrom->isPresent(), do nothing
145  } else {
146  auto const& bd = branchDescription();
147  edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
148  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
149  << "Using the first, ignoring the second in merge\n"
150  << "className = " << bd.className() << "\n"
151  << "moduleLabel = " << bd.moduleLabel() << "\n"
152  << "instance = " << bd.productInstanceName() << "\n"
153  << "process = " << bd.processName() << "\n";
154  if (!original->isPresent() && iFrom->isPresent()) {
155  setProduct(std::move(iFrom));
156  }
157  // In other cases, do nothing
158  }
159  }
160 
161  namespace {
162  void extendException(cms::Exception& e, BranchDescription const& bd, ModuleCallingContext const* mcc) {
163  e.addContext(std::string("While reading from source ") + bd.className() + " " + bd.moduleLabel() + " '" +
164  bd.productInstanceName() + "' " + bd.processName());
165  if (mcc) {
166  edm::exceptionContext(e, *mcc);
167  }
168  }
169  } // namespace
171  Principal const& principal, bool, SharedResourcesAcquirer*, ModuleCallingContext const* mcc) const {
172  return resolveProductImpl<true>([this, &principal, mcc]() {
173  auto branchType = principal.branchType();
174  if (branchType == InLumi || branchType == InRun) {
175  //delayed get has not been allowed with Run or Lumis
176  // The file may already be closed so the reader is invalid
177  return;
178  }
179  if (mcc and branchType == InEvent and aux_) {
181  }
182 
183  auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
184  if (branchType == InEvent and aux_) {
185  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
186  }
187  }));
188 
189  if (auto reader = principal.reader()) {
190  std::unique_lock<std::recursive_mutex> guard;
191  if (auto sr = reader->sharedResources().second) {
192  guard = std::unique_lock<std::recursive_mutex>(*sr);
193  }
194  if (not productResolved()) {
195  try {
196  //another thread could have beaten us here
197  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
198  } catch (cms::Exception& e) {
199  extendException(e, branchDescription(), mcc);
200  throw;
201  } catch (std::exception const& e) {
202  auto newExcept = edm::Exception(errors::StdException) << e.what();
203  extendException(newExcept, branchDescription(), mcc);
204  throw newExcept;
205  }
206  }
207  }
208  });
209  }
210 
212  Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
213  if (auto reader = principal.reader()) {
214  std::unique_lock<std::recursive_mutex> guard;
215  if (auto sr = reader->sharedResources().second) {
216  guard = std::unique_lock<std::recursive_mutex>(*sr);
217  }
218 
219  //Can't use resolveProductImpl since it first checks to see
220  // if the product was already retrieved and then returns if it is
221  auto edp(reader->getProduct(branchDescription().branchID(), &principal));
222 
223  if (edp.get() != nullptr) {
224  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
226  << "Missing definition of member function swap for branch name " << branchDescription().branchName()
227  << "\n"
228  << "Mergeable data types written to a Run must have the swap member function defined"
229  << "\n";
230  }
232  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
233  setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
234  } else { // status() == ResolveFailed && branchDescription().isMergeable()
236  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
237  << "The product branch was dropped in the first run or lumi fragment and present in a later one"
238  << "\n"
239  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
240  << "that need to be merged in the first place.\n";
241  }
242  } else if (status() == defaultStatus()) {
243  setFailedStatus();
244  } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
246  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
247  << "The product branch was present in first run or lumi fragment and dropped in a later one"
248  << "\n"
249  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
250  << "that need to be merged in the first place.\n";
251  }
252  // Do nothing in other case. status is ResolveFailed already or
253  // it is not mergeable and the status is ProductSet
254  }
255  }
256 
258  std::shared_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
259  if (status() == defaultStatus()) {
260  //resolveProduct has not been called or it failed
262  } else {
263  mergeProduct(std::move(prod), mergeableRunProductMetadata);
264  }
265  }
266 
269  }
270 
272  Principal const& principal,
273  bool skipCurrentProcess,
274  ServiceToken const& token,
276  ModuleCallingContext const* mcc) const {
277  //need to try changing m_prefetchRequested before adding to m_waitingTasks
278  bool expected = false;
279  bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
280  m_waitingTasks.add(waitTask);
281 
282  if (prefetchRequested) {
283  ServiceWeakToken weakToken = token;
284  auto workToDo = [this, mcc, &principal, weakToken]() {
285  //need to make sure Service system is activated on the reading thread
286  ServiceRegistry::Operate operate(weakToken.lock());
287  // Caught exception is propagated via WaitingTaskList
288  CMS_SA_ALLOW try {
289  resolveProductImpl<true>([this, &principal, mcc]() {
290  if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
291  return;
292  }
293  if (auto reader = principal.reader()) {
294  std::unique_lock<std::recursive_mutex> guard;
295  if (auto sr = reader->sharedResources().second) {
296  guard = std::unique_lock<std::recursive_mutex>(*sr);
297  }
298  if (not productResolved()) {
299  try {
300  //another thread could have finished this while we were waiting
301  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
302  } catch (cms::Exception& e) {
303  extendException(e, branchDescription(), mcc);
304  throw;
305  } catch (std::exception const& e) {
306  auto newExcept = edm::Exception(errors::StdException) << e.what();
307  extendException(newExcept, branchDescription(), mcc);
308  throw newExcept;
309  }
310  }
311  }
312  });
313  } catch (...) {
314  this->m_waitingTasks.doneWaiting(std::current_exception());
315  return;
316  }
317  this->m_waitingTasks.doneWaiting(nullptr);
318  };
319 
320  SerialTaskQueueChain* queue = nullptr;
321  if (auto reader = principal.reader()) {
322  if (auto shared_res = reader->sharedResources().first) {
323  queue = &(shared_res->serialQueueChain());
324  }
325  }
326  if (queue) {
327  queue->push(*waitTask.group(), workToDo);
328  } else {
329  //Have to create a new task
330  auto t = make_functor_task(workToDo);
331  waitTask.group()->run([t]() {
332  TaskSentry s{t};
333  t->execute();
334  });
335  }
336  }
337  }
338 
340  if (not deleteEarly) {
341  m_prefetchRequested = false;
343  }
345  }
346 
348  aux_ = iConfigure.auxiliary();
349  }
350 
352 
// Stores the product `edp` in this resolver by handing it to setProduct().
// If the status is no longer the default one, the code takes the
// "Attempt to insert more than one product" error path first.
353  void PutOnReadInputProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
354  if (status() != defaultStatus()) {
// NOTE(review): listing line 355, which opens the statement continued on the
// next line, is missing from this extract -- presumably a throw of an
// exception; confirm against the real file.
356  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
357  }
358 
359  setProduct(std::move(edp)); // ProductResolver takes ownership
360  }
361 
363 
365  bool skipCurrentProcess,
367  ModuleCallingContext const*) const {
368  return resolveProductImpl<false>([]() { return; });
369  }
370 
372  Principal const& principal,
373  bool skipCurrentProcess,
374  ServiceToken const& token,
376  ModuleCallingContext const* mcc) const {}
377 
378  void PutOnReadInputProductResolver::putOrMergeProduct(std::unique_ptr<WrapperBase> edp) const {
379  setOrMergeProduct(std::move(edp), nullptr);
380  }
381 
383  bool skipCurrentProcess,
385  ModuleCallingContext const*) const {
386  if (!skipCurrentProcess) {
387  //'false' means never call the lambda function
388  return resolveProductImpl<false>([]() { return; });
389  }
390  return Resolution(nullptr);
391  }
392 
394  Principal const& principal,
395  bool skipCurrentProcess,
396  ServiceToken const& token,
398  ModuleCallingContext const* mcc) const {
399  if (not skipCurrentProcess) {
400  if (branchDescription().branchType() == InProcess &&
402  // This is an accessInputProcessBlock transition
403  // We cannot access produced products in those transitions
404  // except for in SubProcesses where they should have already run.
405  return;
406  }
408  if (not mcc->parent().isAtEndTransition()) {
409  return;
410  }
411  }
412 
413  if (waitingTasks_) {
414  // using a waiting task to do a callback guarantees that the
415  // waitingTasks_ list (from the worker) will be released from
416  // waiting even if the module does not put this data product
417  // or the module has an exception while running
418  waitingTasks_->add(waitTask);
419  }
420  }
421  }
422 
424  auto worker = iConfigure.findWorker(branchDescription().moduleLabel());
425  if (worker) {
426  waitingTasks_ = &worker->waitingTaskList();
427  }
428  }
429 
431  aux_ = iConfigure.auxiliary();
433  }
434 
436  bool skipCurrentProcess,
438  ModuleCallingContext const*) const {
439  if (!skipCurrentProcess and worker_) {
440  return resolveProductImpl<false>([] {});
441  }
442  return Resolution(nullptr);
443  }
444 
446  Principal const& principal,
447  bool skipCurrentProcess,
448  ServiceToken const& token,
450  ModuleCallingContext const* mcc) const {
451  if (skipCurrentProcess) {
452  return;
453  }
454  if (worker_ == nullptr) {
455  throw cms::Exception("LogicError") << "UnscheduledProductResolver::prefetchAsync_() called with null worker_. "
456  "This should not happen, please contact framework developers.";
457  }
458  //need to try changing prefetchRequested_ before adding to waitingTasks_
459  bool expected = false;
460  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
461  waitingTasks_.add(waitTask);
462  if (prefetchRequested) {
463  //Have to create a new task which will make sure the state for UnscheduledProductResolver
464  // is properly set after the module has run
465  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
466  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
467  // state for the case where an exception occurs during the call to the function.
468  // Caught exception is propagated via WaitingTaskList
469  CMS_SA_ALLOW try {
470  resolveProductImpl<true>([iPtr]() {
471  if (iPtr) {
472  std::rethrow_exception(*iPtr);
473  }
474  });
475  } catch (...) {
476  waitingTasks_.doneWaiting(std::current_exception());
477  return;
478  }
479  waitingTasks_.doneWaiting(nullptr);
480  });
481 
482  ParentContext parentContext(mcc);
485  WaitingTaskHolder(*waitTask.group(), t),
486  info,
487  token,
488  info.principal().streamID(),
489  parentContext,
490  mcc->getStreamContext());
491  }
492  }
493 
495  if (not deleteEarly) {
496  prefetchRequested_ = false;
498  }
500  }
501 
503  aux_ = iConfigure.auxiliary();
505  // worker can be missing if the corresponding module is
506  // unscheduled and none of its products are consumed
507  if (worker_) {
509  }
510  }
511 
513  bool skipCurrentProcess,
515  ModuleCallingContext const*) const {
516  if (!skipCurrentProcess and worker_) {
517  return resolveProductImpl<false>([] {});
518  }
519  return Resolution(nullptr);
520  }
521 
// Accepts a product for this transforming resolver; only a non-null `edp`
// leads to any action, a null pointer is ignored.
522  void TransformingProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
523  // Override putProduct() to not set the resolver status to
524  // ResolveFailed when the Event::commit_() checks which produced
525  // products were actually produced and which not, because the
526  // transforming products are never produced by time of commit_()
527  // by construction.
528  if (edp) {
// NOTE(review): listing line 529 (the body of this branch) is missing from
// this extract; what is done with a non-null product cannot be seen here.
530  }
531  }
532 
534  Principal const& principal,
535  bool skipCurrentProcess,
536  ServiceToken const& token,
538  ModuleCallingContext const* mcc) const {
539  if (skipCurrentProcess) {
540  return;
541  }
542  if (worker_ == nullptr) {
543  throw cms::Exception("LogicError") << "TransformingProductResolver::prefetchAsync_() called with null worker_. "
544  "This should not happen, please contact framework developers.";
545  }
546  //need to try changing prefetchRequested_ before adding to waitingTasks_
547  bool expected = false;
548  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
549  waitingTasks_.add(waitTask);
550  if (prefetchRequested) {
551  //Have to create a new task which will make sure the state for TransformingProductResolver
552  // is properly set after the module has run
553  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
554  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
555  // state for the case where an exception occurs during the call to the function.
556  // Caught exception is propagated via WaitingTaskList
557  CMS_SA_ALLOW try {
558  resolveProductImpl<true>([iPtr]() {
559  if (iPtr) {
560  std::rethrow_exception(*iPtr);
561  }
562  });
563  } catch (...) {
564  waitingTasks_.doneWaiting(std::current_exception());
565  return;
566  }
567  waitingTasks_.doneWaiting(nullptr);
568  });
569 
570  //This gives a lifetime greater than this call
571  ParentContext parent(mcc);
574 
577  index_,
578  info.principal(),
579  token,
580  info.principal().streamID(),
581  mcc_,
582  mcc->getStreamContext());
583  }
584  }
585 
587  if (not deleteEarly) {
588  prefetchRequested_ = false;
590  }
592  }
593 
// Stores the product `edp` in this resolver by handing it to setProduct().
// If the status is no longer the default one, the code takes the
// "Attempt to insert more than one product" error path first.
594  void ProducedProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
595  if (status() != defaultStatus()) {
// NOTE(review): listing line 596, which opens the statement continued on the
// next line, is missing from this extract -- presumably a throw of an
// exception; confirm against the real file.
597  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
598  }
599 
600  setProduct(std::move(edp)); // ProductResolver takes ownership
601  }
602 
603  bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
604 
606 
608  // Check if the types match.
609  TypeID typeID(prod.dynamicTypeInfo());
611  // Types do not match.
613  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
614  << "It is supposed to be of type " << branchDescription().className() << ".\n"
615  << "It is actually of type " << typeID.className() << ".\n";
616  }
617  }
618 
// setProduct overload taking a unique_ptr.
// A non-null product is first type-checked with checkType(); a null product
// marks the resolver as failed via setFailedStatus().
619  void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
620  if (edp) {
621  checkType(*edp);
// NOTE(review): listing lines 622-623 are missing from this extract --
// presumably they store the wrapper and update the status; confirm against
// the real file.
624  } else {
625  setFailedStatus();
626  }
627  }
// setProduct overload taking a shared_ptr.
// A non-null product is first type-checked with checkType(); a null product
// marks the resolver as failed via setFailedStatus().
628  void DataManagingProductResolver::setProduct(std::shared_ptr<WrapperBase> edp) const {
629  if (edp) {
630  checkType(*edp);
// NOTE(review): listing lines 631-632 are missing from this extract --
// presumably they store the wrapper and update the status; confirm against
// the real file.
633  } else {
634  setFailedStatus();
635  }
636  }
637 
638  // This routine returns true if it is known that currently there is no real product.
639  // If there is a real product, it returns false.
640  // If it is not known if there is a real product, it returns false.
642  auto presentStatus = status();
643  if (presentStatus == ProductStatus::ProductSet) {
644  return !(getProductData().wrapper()->isPresent());
645  }
646  return presentStatus != ProductStatus::ResolveNotRun;
647  }
648 
650  auto s = status();
651  return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
652  }
653 
654  // This routine returns true if the product was deleted early in order to save memory
656 
// Returns true only when the held wrapper reports isPresent(); returns false
// when the current process is being skipped and this resolver belongs to it.
657  bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
658  if (iSkipCurrentProcess and isFromCurrentProcess()) {
659  return false;
660  }
// NOTE(review): listing line 661, the condition opening the block closed on
// line 665, is missing from this extract -- presumably a check that the
// product has been set; confirm against the real file.
662  if (getProductData().wrapper()->isPresent()) {
663  return true;
664  }
665  }
666  return false;
667  }
668 
670  productData_.setProvenance(provRetriever);
671  }
672 
674 
676  MergeableRunProductMetadata const* mrpm) {
678  }
679 
681  return provenance()->productProvenance();
682  }
683 
687  }
688  if (deleteEarly) {
690  } else {
691  resetStatus();
692  }
693  }
694 
695  bool DataManagingProductResolver::singleProduct_() const { return true; }
696 
699  }
700 
702 
704  return provenance()->productProvenance();
705  }
706 
708 
709  bool AliasProductResolver::singleProduct_() const { return true; }
710 
711  SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd,
713  : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false) {
714  // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
715  Parentage p;
716  p.setParents(std::vector<BranchID>{realProduct.branchDescription().originalBranchID()});
717  parentageID_ = p.id();
719  }
720 
// connectTo() is not supported by this resolver type: the body consists only
// of an error message saying it should never be called. Neither argument is
// used in the visible code.
721  void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
// NOTE(review): listing line 722, which opens the statement continued below,
// is missing from this extract -- presumably a throw of an exception; confirm
// against the real file.
723  << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
724  << "Contact a Framework developer\n";
725  }
726 
729  }
730 
732  if (res.data() == nullptr)
733  return res;
734  return Resolution(&productData_);
735  }
736 
738  // SwitchProducer will never put anything in the event, and
739  // returning "false" makes Event::commit_() call putProduct() with
740  // a null unique_ptr<WrapperBase> to signal that produce() was
741  // run.
742  return false;
743  }
744 
746  productData_.setProvenance(provRetriever);
747  }
748 
750  // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
752  }
753 
756  realProduct_.resetProductData_(deleteEarly);
757  if (not deleteEarly) {
758  prefetchRequested_ = false;
760  }
761  }
762 
764  // update provenance
766  // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
767  productData_.unsafe_setWrapper(realProduct().getProductData().sharedConstWrapper());
768  }
769 
770  SwitchProducerProductResolver::SwitchProducerProductResolver(std::shared_ptr<BranchDescription const> bd,
772  : SwitchBaseProductResolver(std::move(bd), realProduct), status_(defaultStatus_) {}
773 
775  bool skipCurrentProcess,
777  ModuleCallingContext const* mcc) const {
779  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
780  }
781  return Resolution(nullptr);
782  }
783 
785  Principal const& principal,
786  bool skipCurrentProcess,
787  ServiceToken const& token,
789  ModuleCallingContext const* mcc) const {
790  if (skipCurrentProcess) {
791  return;
792  }
793  if (branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
794  return;
795  }
796 
797  //need to try changing prefetchRequested before adding to waitingTasks
798  bool expected = false;
799  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
800  waitingTasks().add(waitTask);
801 
802  if (doPrefetchRequested) {
803  //using a waiting task to do a callback guarantees that
804  // the waitingTasks() list will be released from waiting even
805  // if the module does not put this data product or the
806  // module has an exception while running
807  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
808  if (nullptr != iException) {
809  waitingTasks().doneWaiting(*iException);
810  } else {
812  waitingTasks().doneWaiting(std::exception_ptr());
813  }
814  });
815  worker()->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
816  }
817  }
818 
// Called to record that a put happened for this switch-producer branch.
// The `edp` argument is not used in the visible code. A status other than
// defaultStatus_ leads to the "more than one product" error path.
819  void SwitchProducerProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
820  if (status_ != defaultStatus_) {
// NOTE(review): listing line 821, which opens the statement continued below,
// is missing from this extract -- presumably a throw of an exception; confirm
// against the real file.
822  << "Attempt to insert more than one product for a branch " << branchDescription().branchName()
823  << "This makes no sense for SwitchProducerProductResolver.\nContact a Framework developer";
824  }
825  // Let's use ResolveFailed to signal that produce() was called, as
826  // there is no real product in this resolver
// NOTE(review): listing line 827 is missing from this extract -- presumably
// the assignment of the status described by the comment above.
// Only the caller that flips the prefetch flag from false to true releases
// the waiting tasks.
828  bool expected = false;
829  if (prefetchRequested().compare_exchange_strong(expected, true)) {
// NOTE(review): listing line 830 is missing from this extract.
831  waitingTasks().doneWaiting(std::exception_ptr());
832  }
833  }
834 
836  // if produce() was run (ResolveFailed), ask from the real resolver
838  return realProduct().productUnavailable();
839  }
840  return true;
841  }
842 
845  if (not deleteEarly) {
847  }
848  }
849 
851  bool skipCurrentProcess,
853  ModuleCallingContext const* mcc) const {
854  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
855  }
856 
858  Principal const& principal,
859  bool skipCurrentProcess,
860  ServiceToken const& token,
862  ModuleCallingContext const* mcc) const {
863  if (skipCurrentProcess) {
864  return;
865  }
866 
867  //need to try changing prefetchRequested_ before adding to waitingTasks_
868  bool expected = false;
869  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
870  waitingTasks().add(waitTask);
871 
872  if (doPrefetchRequested) {
873  //using a waiting task to do a callback guarantees that
874  // the waitingTasks() list will be released from waiting even
875  // if the module does not put this data product or the
876  // module has an exception while running
877  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
878  if (nullptr != iException) {
879  waitingTasks().doneWaiting(*iException);
880  } else {
882  waitingTasks().doneWaiting(std::exception_ptr());
883  }
884  });
886  WaitingTaskHolder(*waitTask.group(), waiting), principal, skipCurrentProcess, token, sra, mcc);
887  }
888  }
889 
891  provRetriever_ = provRetriever;
892  }
893 
895 
897  return provRetriever_ ? provRetriever_->branchIDToProvenance(bd_->originalBranchID()) : nullptr;
898  }
899 
901 
902  bool ParentProcessProductResolver::singleProduct_() const { return true; }
903 
905  // In principle, this ought to be fixed. I noticed one hits this error
906  // when in a SubProcess and calling the Event::getProvenance function
907  // with a BranchID to a branch from an earlier SubProcess or the top
908  // level process and this branch is not kept in this SubProcess. It might
909  // be possible to hit this in other contexts. I say it ought to be
910  // fixed because one does not encounter this issue if the SubProcesses
911  // are split into genuinely different processes (in principle that
912  // ought to give identical behavior and results). No user has ever
913  // reported this issue which has been around for some time and it was only
914  // noticed when testing some rare corner cases after modifying Core code.
915  // After discussing this with Chris we decided that at least for the moment
916  // there are higher priorities than fixing this ... I converted it so it
917  // causes an exception instead of a seg fault. The issue that may need to
918  // be addressed someday is how ProductResolvers for non-kept branches are
919  // connected to earlier SubProcesses.
921  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
922  << "Contact a Framework developer\n";
923  }
924 
925  NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
926  std::vector<bool> const& ambiguous,
927  bool madeAtEnd)
928  : matchingHolders_(matchingHolders),
929  ambiguous_(ambiguous),
930  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
931  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
932  prefetchRequested_(false),
933  skippingPrefetchRequested_(false),
934  madeAtEnd_{madeAtEnd} {
935  assert(ambiguous_.size() == matchingHolders_.size());
936  }
937 
939  Principal const& principal,
940  bool skipCurrentProcess,
942  ModuleCallingContext const* mcc) const {
943  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
944  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
945  }
946 
948  bool skipCurrentProcess,
950  ModuleCallingContext const* mcc) const {
951  //See if we've already cached which Resolver we should call or if
952  // we know it is ambiguous
953  const unsigned int choiceSize = ambiguous_.size();
954 
955  //madeAtEnd_==true and not at end transition is the same as skipping the current process
956  if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
957  skipCurrentProcess = not mcc->parent().isAtEndTransition();
958  }
959 
960  unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
961  if (checkCacheIndex != choiceSize + kUnsetOffset) {
962  if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
964  } else if (checkCacheIndex == choiceSize + kMissingOffset) {
965  return Resolution(nullptr);
966  }
967  return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
968  }
969 
970  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
971 
972  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
973  for (unsigned int k : lookupProcessOrder) {
974  assert(k < ambiguous_.size());
975  if (k == 0)
976  break; // Done
977  if (ambiguous_[k]) {
978  updateCacheIndex = choiceSize + kAmbiguousOffset;
980  }
982  auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
983  if (resolution.data() != nullptr) {
984  updateCacheIndex = k;
985  return resolution;
986  }
987  }
988  }
989 
990  updateCacheIndex = choiceSize + kMissingOffset;
991  return Resolution(nullptr);
992  }
993 
995  Principal const& principal,
996  bool skipCurrentProcess,
997  ServiceToken const& token,
999  ModuleCallingContext const* mcc) const {
1000  bool timeToMakeAtEnd = true;
1001  if (madeAtEnd_ and mcc) {
1002  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
1003  }
1004 
1005  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
1006  if (not skipCurrentProcess and timeToMakeAtEnd) {
1007  //need to try changing prefetchRequested_ before adding to waitingTasks_
1008  bool expected = false;
1009  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
1010  waitingTasks_.add(waitTask);
1011 
1012  if (prefetchRequested) {
1013  //we are the first thread to request
1014  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token, waitTask.group());
1015  }
1016  } else {
1017  skippingWaitingTasks_.add(waitTask);
1018  bool expected = false;
1019  if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
1020  //we are the first thread to request
1021  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token, waitTask.group());
1022  }
1023  }
1024  }
1025 
1026  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
1027  ProductResolverIndex iIndex,
1028  std::exception_ptr iExceptPtr) const {
1029  if (not iSkipCurrentProcess) {
1030  lastCheckIndex_ = iIndex;
1031  waitingTasks_.doneWaiting(iExceptPtr);
1032  } else {
1033  lastSkipCurrentCheckIndex_ = iIndex;
1034  skippingWaitingTasks_.doneWaiting(iExceptPtr);
1035  }
1036  }
1037 
1038  namespace {
1039  class TryNextResolverWaitingTask : public edm::WaitingTask {
1040  public:
1041  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
1042  unsigned int iResolverIndex,
1043  Principal const* iPrincipal,
1045  ModuleCallingContext const* iMCC,
1046  bool iSkipCurrentProcess,
1047  ServiceToken iToken,
1048  oneapi::tbb::task_group* iGroup)
1049  : resolver_(iResolver),
1050  principal_(iPrincipal),
1051  sra_(iSRA),
1052  mcc_(iMCC),
1053  group_(iGroup),
1054  serviceToken_(iToken),
1055  index_(iResolverIndex),
1056  skipCurrentProcess_(iSkipCurrentProcess) {}
1057 
1058  void execute() final {
1059  auto exceptPtr = exceptionPtr();
1060  if (exceptPtr) {
1061  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, exceptPtr);
1062  } else {
1063  if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
1064  resolver_->tryPrefetchResolverAsync(
1065  index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
1066  }
1067  }
1068  }
1069 
1070  private:
1071  NoProcessProductResolver const* resolver_;
1072  Principal const* principal_;
1074  ModuleCallingContext const* mcc_;
1075  oneapi::tbb::task_group* group_;
1076  ServiceWeakToken serviceToken_;
1077  unsigned int index_;
1078  bool skipCurrentProcess_;
1079  };
1080  } // namespace
1081 
1082  void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
1083  Principal const& principal,
1084  bool iSkipCurrentProcess,
1085  std::exception_ptr iExceptPtr) const {
1086  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1087  auto k = lookupProcessOrder[iProcessingIndex];
1088 
1089  setCache(iSkipCurrentProcess, k, iExceptPtr);
1090  }
1091 
1092  bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
1093  Principal const& principal,
1094  bool iSkipCurrentProcess) const {
1095  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1096  auto k = lookupProcessOrder[iProcessingIndex];
1097  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1098 
1099  if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
1100  setCache(iSkipCurrentProcess, k, nullptr);
1101  return true;
1102  }
1103  return false;
1104  }
1105 
// Starts (or continues) the asynchronous walk over principal.lookupProcessOrder(),
// beginning at position iProcessingIndex.
//
// For the entry at the current position the function either
//  - stops the walk (entry is 0, or the entry is flagged in ambiguous_), or
//  - creates a TryNextResolverWaitingTask, hands it to prefetchAsync() of the
//    resolver found through matchingHolders_, and returns; that task decides
//    later whether the walk has to continue, or
//  - advances to the next position.
// If the walk ends without starting a prefetch, setCache() is called with the
// "missing" or "ambiguous" sentinel and a null exception pointer.
//
// NOTE(review): this text comes from a numbered listing with gaps. Listing lines
// 1109 and 1111 are absent, yet `sra` and `token` are used below, so two
// parameters are missing from the visible signature. Listing line 1128 is absent
// and the closing brace on listing line 1140 has no visible opener, so a guarding
// statement is missing there. Listing line 1136 (after the "Services" comment) is
// absent as well. Restore these from the real file before compiling.
1106  void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
1107  Principal const& principal,
1108  bool skipCurrentProcess,
1110  ModuleCallingContext const* mcc,
1112  oneapi::tbb::task_group* group) const {
1113  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1114  auto index = iProcessingIndex;
1115 
1116  const unsigned int choiceSize = ambiguous_.size();
// Result cached when the loop finishes without finding anything: "missing".
1117  unsigned int newCacheIndex = choiceSize + kMissingOffset;
1118  while (index < lookupProcessOrder.size()) {
1119  auto k = lookupProcessOrder[index];
// An entry of 0 ends the walk (the synchronous lookup above marks the same check "Done").
1120  if (k == 0) {
1121  break;
1122  }
1123  assert(k < ambiguous_.size());
// An entry flagged as ambiguous ends the walk with the "ambiguous" sentinel.
1124  if (ambiguous_[k]) {
1125  newCacheIndex = choiceSize + kAmbiguousOffset;
1126  break;
1127  }
1129  //make new task
1130 
// NOTE(review): allocated with plain `new`; presumably WaitingTaskHolder takes over
// responsibility for the task - confirm against WaitingTaskHolder.
1131  auto task = new TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token, group);
1132  WaitingTaskHolder hTask(*group, task);
1133  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1134 
1135  //Make sure the Services are available on this thread
1137 
// Hand off to the candidate resolver; hTask runs TryNextResolverWaitingTask::execute()
// afterwards, which continues with index + 1 when this candidate yields no valid data.
1138  productResolver->prefetchAsync(hTask, principal, skipCurrentProcess, token, sra, mcc);
1139  return;
1140  }
1141  ++index;
1142  }
1143  //data product unavailable
1144  setCache(skipCurrentProcess, newCacheIndex, nullptr);
1145  }
1146 
1148 
1150 
1152 
1153  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1154 
1156  // This function should never receive 'true'. On the other hand,
1157  // nothing should break if a 'true' is passed, because
1158  // NoProcessProductResolver just forwards the resolve
1159  const auto resetValue = unsetIndexValue();
1160  lastCheckIndex_ = resetValue;
1161  lastSkipCurrentCheckIndex_ = resetValue;
1162  prefetchRequested_ = false;
1164  waitingTasks_.reset();
1166  }
1167 
1168  bool NoProcessProductResolver::singleProduct_() const { return false; }
1169 
1172  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1173  << "Contact a Framework developer\n";
1174  }
1175 
1178  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1179  << "Contact a Framework developer\n";
1180  }
1181 
1184  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1185  << "Contact a Framework developer\n";
1186  }
1187 
1190  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1191  << "Contact a Framework developer\n";
1192  }
1193 
1194  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1196  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1197  << "Contact a Framework developer\n";
1198  }
1199 
1202  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1203  << "Contact a Framework developer\n";
1204  }
1205 
1206  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1208  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1209  << "Contact a Framework developer\n";
1210  }
1211 
1214  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1215  << "Contact a Framework developer\n";
1216  }
1217 
1220  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1221  << "Contact a Framework developer\n";
1222  }
1223 
1224  //---- SingleChoiceNoProcessProductResolver ----------------
1226  Principal const& principal,
1227  bool skipCurrentProcess,
1229  ModuleCallingContext const* mcc) const {
1230  //NOTE: Have to lookup the other ProductResolver each time rather than cache
1231  // it's pointer since it appears the pointer can change at some later stage
1233  ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1234  }
1235 
1237  Principal const& principal,
1238  bool skipCurrentProcess,
1239  ServiceToken const& token,
1241  ModuleCallingContext const* mcc) const {
1243  ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1244  }
1245 
1247 
1249 
1251 
1253 
1255 
1258  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1259  << "Contact a Framework developer\n";
1260  }
1261 
1264  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1265  << "Contact a Framework developer\n";
1266  }
1267 
1270  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1271  << "Contact a Framework developer\n";
1272  }
1273 
1276  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1277  << "Contact a Framework developer\n";
1278  }
1279 
1281  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1282  "implemented and should never be called.\n"
1283  << "Contact a Framework developer\n";
1284  }
1285 
1288  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1289  << "Contact a Framework developer\n";
1290  }
1291 
1292  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1293  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not "
1294  "implemented and should never be called.\n"
1295  << "Contact a Framework developer\n";
1296  }
1297 
1300  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1301  << "Contact a Framework developer\n";
1302  }
1303 
1306  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1307  << "Contact a Framework developer\n";
1308  }
1309 
1310 } // namespace edm
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void connectTo(ProductResolverBase const &iOther, Principal const *) final
void setProductID(ProductID const &pid)
ProductData const & getProductData() const final
ProductProvenance const * productProvenancePtr_() const override
void setOrMergeProduct(std::shared_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
BranchDescription const & branchDescription_() const override
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
void resetProductData_(bool deleteEarly) override
void putProduct(std::unique_ptr< WrapperBase > edp) const override
static const TGPicture * info(bool iBackgroundIsBlack)
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
#define CMS_SA_ALLOW
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
static constexpr unsigned int kAmbiguousOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
bool singleProduct_() const override
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:24
static constexpr unsigned int kMissingOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:191
std::shared_ptr< BranchDescription const > bd_
Provenance const & provenance() const
Definition: ProductData.h:33
void resetProductData_(bool deleteEarly) override
GlobalContext const * globalContext() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
BranchType const & branchType() const
void setProductID_(ProductID const &pid) final
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Worker * findWorker(std::string const &iLabel) const
void putProduct(std::unique_ptr< WrapperBase > edp) const final
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
void checkType(WrapperBase const &prod) const
std::atomic< bool > prefetchRequested_
void resetProductData_(bool deleteEarly) final
WaitingTaskList & waitingTasks() const
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:973
Provenance const * provenance() const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
TypeID unwrappedTypeID() const
std::string const & processName() const
void mergeProduct(std::shared_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
SwitchProducerProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
reader
Definition: DQM.py:105
void resetProductData_(bool deleteEarly) override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
bool unscheduledWasNotRun_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool singleProduct_() const override
Log< level::Error, false > LogError
bool productUnavailable_() const override
TypeWithDict const & unwrappedType() const
bool isAtEndTransition() const
void setupUnscheduled(UnscheduledConfigurator const &) final
void reset()
Resets access to the resource so that added tasks will wait.
ProductProvenanceRetriever const * provRetriever_
void connectTo(ProductResolverBase const &iOther, Principal const *) final
assert(be >=bs)
std::string const & processName() const
ProductData const & getProductData() const final
WrapperBase const * wrapper() const
Definition: ProductData.h:35
void resetProductData_(bool deleteEarly) override
void connectTo(ProductResolverBase const &, Principal const *) final
void setProductID(ProductID const &pid)
Definition: ProductData.h:58
Definition: Electron.h:6
DataManagingOrAliasProductResolver & realProduct_
UnscheduledAuxiliary const * aux_
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductProvenance const * productProvenancePtr_() const override
UnscheduledAuxiliary const * aux_
oneapi::tbb::task_group * group() const noexcept
bool singleProduct_() const override
void emit(Args &&... args) const
Definition: Signal.h:48
ProductProvenanceLookup const * store() const
Definition: Provenance.h:60
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
Definition: Worker.cc:245
void setProductID_(ProductID const &pid) override
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
UnscheduledAuxiliary const * aux_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token, oneapi::tbb::task_group *) const
virtual bool isFromCurrentProcess() const =0
void setupUnscheduled(UnscheduledConfigurator const &iConfigure) final
std::string const & moduleLabel() const
SwitchBaseProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
std::string const & className() const
ProductProvenance const * productProvenancePtr_() const final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::string const & branchName() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:52
std::atomic< bool > prefetchRequested_
std::string const & productInstanceName() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
DataManagingOrAliasProductResolver const & realProduct() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void resetProductData_(bool deleteEarly) override
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool isFromCurrentProcess() const final
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
static constexpr const ProductStatus defaultStatus_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< bool > prefetchRequested_
std::atomic< unsigned int > lastCheckIndex_
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:177
void setProductID_(ProductID const &pid) override
ProductProvenance const * branchIDToProvenance(BranchID const &bid) const
void setMergeableRunProductMetadata(MergeableRunProductMetadataBase const *mrpm)
Definition: ProductData.h:60
ProductProvenance const * productProvenancePtr_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > prod) const override
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:36
void setProductID_(ProductID const &pid) final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
ServiceToken lock() const
Definition: ServiceToken.h:101
void prefetchAsync(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
virtual size_t transformIndex(edm::BranchDescription const &) const =0
Definition: Worker.cc:244
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
Resolution resolveProductImpl(Resolution) const
std::atomic< ProductStatus > theStatus_
ModuleDescription const * description() const
Definition: Worker.h:198
std::vector< bool > ambiguous_
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:81
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void resetProductData_(bool deleteEarly) override
static constexpr unsigned int kUnsetOffset
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Provenance const * provenance_() const override
void setupUnscheduled(UnscheduledConfigurator const &) final
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
void setProvenance(ProductProvenanceLookup const *provRetriever)
Definition: ProductData.h:56
bool isPresent() const
Definition: WrapperBase.h:30
def load(fileName)
Definition: svgfig.py:547
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
ProductStatus defaultStatus() const
BranchDescription const & branchDescription() const
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void setProductID_(ProductID const &pid) override
void setMergeableRunProductMetadata_(MergeableRunProductMetadata const *) override
DataManagingOrAliasProductResolver & realProduct_
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preModuleDelayedGetSignal_
Transition transition() const
Definition: GlobalContext.h:55
std::vector< ProductResolverIndex > matchingHolders_
void resetProductData_(bool deleteEarly) override
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool availableOnlyAtEndTransition() const
HLT enums.
unsigned int unsetIndexValue() const
StreamContext const * getStreamContext() const
void retrieveAndMerge_(Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
void setProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:27
Provenance const * provenance_() const override
bool productWasDeleted_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
Resolution resolveProductImpl(FUNC resolver) const
void resetProductData_(bool deleteEarly) override=0
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::atomic< bool > prefetchRequested_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Log< level::Warning, false > LogWarning
void resetProductData_(bool deleteEarly) override
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postModuleDelayedGetSignal_
UnscheduledAuxiliary const * auxiliary() const
std::atomic< bool > & prefetchRequested() const
static ParentageRegistry * instance()
BranchDescription const & branchDescription_() const override
EventTransitionInfo const & eventTransitionInfo() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:563
std::string const & moduleLabel() const
void setProductID_(ProductID const &pid) override
ProductProvenance const * productProvenancePtr_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
bool insertMapped(value_type const &v)
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
def move(src, dest)
Definition: eostools.py:511
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
DelayedReader * reader() const
Definition: Principal.h:181
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void setProductProvenanceRetriever(ProductProvenanceRetriever const *provRetriever)
void insertIntoSet(ProductProvenance provenanceProduct) const
static HepMC::HEPEVT_Wrapper wrapper
ParentContext const & parent() const
BranchID const & originalBranchID() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final