CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
23 
24 #include <cassert>
25 #include <utility>
26 
27 static constexpr unsigned int kUnsetOffset = 0;
28 static constexpr unsigned int kAmbiguousOffset = 1;
29 static constexpr unsigned int kMissingOffset = 2;
30 
31 namespace edm {
32 
35  exception << "DataManagingProductResolver::resolveProduct_: The product matching all criteria was already deleted\n"
36  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
37  << "Looking for module label: " << moduleLabel() << "\n"
38  << "Looking for productInstanceName: " << productInstanceName() << "\n"
39  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
40  << "This means there is a configuration error.\n"
41  << "The module which is asking for this data must be configured to state that it will read this data.";
42  throw exception;
43  }
44 
45  //This is a templated function in order to avoid calling another virtual function
46  template <bool callResolver, typename FUNC>
48  if (productWasDeleted()) {
50  }
51  auto presentStatus = status();
52 
53  if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
54  //if resolver fails because of exception or not setting product
55  // make sure the status goes to failed
56  auto failedStatusSetter = [this](ProductStatus* iPresentStatus) {
57  if (this->status() == ProductStatus::ResolveNotRun) {
58  this->setFailedStatus();
59  }
60  *iPresentStatus = this->status();
61  };
62  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
63  failedStatusSetter);
64 
65  //If successful, this will call setProduct
66  resolver();
67  }
68 
69  if (presentStatus == ProductStatus::ProductSet) {
70  auto pd = &getProductData();
71  if (pd->wrapper()->isPresent()) {
72  return Resolution(pd);
73  }
74  }
75 
76  return Resolution(nullptr);
77  }
78 
80  std::shared_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
81  // if its not mergeable and the previous read failed, go ahead and use this one
83  setProduct(std::move(iFrom));
84  return;
85  }
86 
88  if (not iFrom) {
89  return;
90  }
91 
92  checkType(*iFrom);
93 
95  if (original->isMergeable()) {
96  if (original->isPresent() != iFrom->isPresent()) {
98  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
99  << "Was trying to merge objects where one product had been put in the input file and the other had not "
100  "been."
101  << "\n"
102  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
103  << "that need to be merged in the first place.\n";
104  }
105  if (original->isPresent()) {
107  if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
108  original->mergeProduct(iFrom.get());
109  } else {
111  mergeableRunProductMetadata->getMergeDecision(desc.processName());
112  if (decision == MergeableRunProductMetadata::MERGE) {
113  original->mergeProduct(iFrom.get());
114  } else if (decision == MergeableRunProductMetadata::REPLACE) {
115  // Note this swaps the content of the product where the
116  // both products branches are present and the product is
117  // also present (was put) in the branch. A module might
118  // have already gotten a pointer to the product so we
119  // keep those pointers valid. This does not call swap
120  // on the Wrapper.
121  original->swapProduct(iFrom.get());
122  }
123  // If the decision is IGNORE, do nothing
124  }
125  }
126  // If both have isPresent false, do nothing
127 
128  } else if (original->hasIsProductEqual()) {
129  if (original->isPresent() && iFrom->isPresent()) {
130  if (!original->isProductEqual(iFrom.get())) {
131  auto const& bd = branchDescription();
132  edm::LogError("RunLumiMerging")
133  << "ProductResolver::mergeTheProduct\n"
134  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
135  << "Using the first, ignoring the second\n"
136  << "className = " << bd.className() << "\n"
137  << "moduleLabel = " << bd.moduleLabel() << "\n"
138  << "instance = " << bd.productInstanceName() << "\n"
139  << "process = " << bd.processName() << "\n";
140  }
141  } else if (!original->isPresent() && iFrom->isPresent()) {
142  setProduct(std::move(iFrom));
143  }
144  // if not iFrom->isPresent(), do nothing
145  } else {
146  auto const& bd = branchDescription();
147  edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
148  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
149  << "Using the first, ignoring the second in merge\n"
150  << "className = " << bd.className() << "\n"
151  << "moduleLabel = " << bd.moduleLabel() << "\n"
152  << "instance = " << bd.productInstanceName() << "\n"
153  << "process = " << bd.processName() << "\n";
154  if (!original->isPresent() && iFrom->isPresent()) {
155  setProduct(std::move(iFrom));
156  }
157  // In other cases, do nothing
158  }
159  }
160 
161  namespace {
162  cms::Exception& extendException(cms::Exception& e, BranchDescription const& bd, ModuleCallingContext const* mcc) {
163  e.addContext(std::string("While reading from source ") + bd.className() + " " + bd.moduleLabel() + " '" +
164  bd.productInstanceName() + "' " + bd.processName());
165  if (mcc) {
166  edm::exceptionContext(e, *mcc);
167  }
168  return e;
169  }
170  } // namespace
172  Principal const& principal, bool, SharedResourcesAcquirer*, ModuleCallingContext const* mcc) const {
173  return resolveProductImpl<true>([this, &principal, mcc]() {
174  auto branchType = principal.branchType();
175  if (branchType == InLumi || branchType == InRun) {
176  //delayed get has not been allowed with Run or Lumis
177  // The file may already be closed so the reader is invalid
178  return;
179  }
180  if (mcc and branchType == InEvent and aux_) {
182  }
183 
184  auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
185  if (branchType == InEvent and aux_) {
186  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
187  }
188  }));
189 
190  if (auto reader = principal.reader()) {
191  std::unique_lock<std::recursive_mutex> guard;
192  if (auto sr = reader->sharedResources().second) {
193  guard = std::unique_lock<std::recursive_mutex>(*sr);
194  }
195  if (not productResolved()) {
196  try {
197  //another thread could have beaten us here
198  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
199  } catch (cms::Exception& e) {
200  throw extendException(e, branchDescription(), mcc);
201  } catch (std::exception const& e) {
202  auto newExcept = edm::Exception(errors::StdException) << e.what();
203  throw extendException(newExcept, branchDescription(), mcc);
204  }
205  }
206  }
207  });
208  }
209 
211  Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
212  if (auto reader = principal.reader()) {
213  std::unique_lock<std::recursive_mutex> guard;
214  if (auto sr = reader->sharedResources().second) {
215  guard = std::unique_lock<std::recursive_mutex>(*sr);
216  }
217 
218  //Can't use resolveProductImpl since it first checks to see
219  // if the product was already retrieved and then returns if it is
220  auto edp(reader->getProduct(branchDescription().branchID(), &principal));
221 
222  if (edp.get() != nullptr) {
223  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
225  << "Missing definition of member function swap for branch name " << branchDescription().branchName()
226  << "\n"
227  << "Mergeable data types written to a Run must have the swap member function defined"
228  << "\n";
229  }
231  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
232  setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
233  } else { // status() == ResolveFailed && branchDescription().isMergeable()
235  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
236  << "The product branch was dropped in the first run or lumi fragment and present in a later one"
237  << "\n"
238  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
239  << "that need to be merged in the first place.\n";
240  }
241  } else if (status() == defaultStatus()) {
242  setFailedStatus();
243  } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
245  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
246  << "The product branch was present in first run or lumi fragment and dropped in a later one"
247  << "\n"
248  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
249  << "that need to be merged in the first place.\n";
250  }
251  // Do nothing in other case. status is ResolveFailed already or
252  // it is not mergeable and the status is ProductSet
253  }
254  }
255 
257  std::shared_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
258  if (status() == defaultStatus()) {
259  //resolveProduct has not been called or it failed
261  } else {
262  mergeProduct(std::move(prod), mergeableRunProductMetadata);
263  }
264  }
265 
268  }
269 
271  Principal const& principal,
272  bool skipCurrentProcess,
273  ServiceToken const& token,
275  ModuleCallingContext const* mcc) const {
276  //need to try changing m_prefetchRequested before adding to m_waitingTasks
277  bool expected = false;
278  bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
279  m_waitingTasks.add(waitTask);
280 
281  if (prefetchRequested) {
282  ServiceWeakToken weakToken = token;
283  auto workToDo = [this, mcc, &principal, weakToken]() {
284  //need to make sure Service system is activated on the reading thread
285  ServiceRegistry::Operate operate(weakToken.lock());
286  // Caught exception is propagated via WaitingTaskList
287  CMS_SA_ALLOW try {
288  resolveProductImpl<true>([this, &principal, mcc]() {
289  if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
290  return;
291  }
292  if (auto reader = principal.reader()) {
293  std::unique_lock<std::recursive_mutex> guard;
294  if (auto sr = reader->sharedResources().second) {
295  guard = std::unique_lock<std::recursive_mutex>(*sr);
296  }
297  if (not productResolved()) {
298  try {
299  //another thread could have finished this while we were waiting
300  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
301  } catch (cms::Exception& e) {
302  throw extendException(e, branchDescription(), mcc);
303  } catch (std::exception const& e) {
304  auto newExcept = edm::Exception(errors::StdException) << e.what();
305  throw extendException(newExcept, branchDescription(), mcc);
306  }
307  }
308  }
309  });
310  } catch (...) {
311  this->m_waitingTasks.doneWaiting(std::current_exception());
312  return;
313  }
314  this->m_waitingTasks.doneWaiting(nullptr);
315  };
316 
317  SerialTaskQueueChain* queue = nullptr;
318  if (auto reader = principal.reader()) {
319  if (auto shared_res = reader->sharedResources().first) {
320  queue = &(shared_res->serialQueueChain());
321  }
322  }
323  if (queue) {
324  queue->push(*waitTask.group(), workToDo);
325  } else {
326  //Have to create a new task
327  auto t = make_functor_task(workToDo);
328  waitTask.group()->run([t]() {
329  TaskSentry s{t};
330  t->execute();
331  });
332  }
333  }
334  }
335 
337  if (not deleteEarly) {
338  m_prefetchRequested = false;
340  }
342  }
343 
345  aux_ = iConfigure.auxiliary();
346  }
347 
349 
350  void PutOnReadInputProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
351  if (status() != defaultStatus()) {
353  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
354  }
355 
356  setProduct(std::move(edp)); // ProductResolver takes ownership
357  }
358 
360 
362  bool skipCurrentProcess,
364  ModuleCallingContext const*) const {
365  return resolveProductImpl<false>([]() { return; });
366  }
367 
369  Principal const& principal,
370  bool skipCurrentProcess,
371  ServiceToken const& token,
373  ModuleCallingContext const* mcc) const {}
374 
375  void PutOnReadInputProductResolver::putOrMergeProduct(std::unique_ptr<WrapperBase> edp) const {
376  setOrMergeProduct(std::move(edp), nullptr);
377  }
378 
380  bool skipCurrentProcess,
382  ModuleCallingContext const*) const {
383  if (!skipCurrentProcess) {
384  //'false' means never call the lambda function
385  return resolveProductImpl<false>([]() { return; });
386  }
387  return Resolution(nullptr);
388  }
389 
391  Principal const& principal,
392  bool skipCurrentProcess,
393  ServiceToken const& token,
395  ModuleCallingContext const* mcc) const {
396  if (not skipCurrentProcess) {
397  if (branchDescription().branchType() == InProcess &&
399  // This is an accessInputProcessBlock transition
400  // We cannot access produced products in those transitions
401  // except for in SubProcesses where they should have already run.
402  return;
403  }
405  if (not mcc->parent().isAtEndTransition()) {
406  return;
407  }
408  }
409 
410  if (waitingTasks_) {
411  // using a waiting task to do a callback guarantees that the
412  // waitingTasks_ list (from the worker) will be released from
413  // waiting even if the module does not put this data product
414  // or the module has an exception while running
415  waitingTasks_->add(waitTask);
416  }
417  }
418  }
419 
421  auto worker = iConfigure.findWorker(branchDescription().moduleLabel());
422  if (worker) {
423  waitingTasks_ = &worker->waitingTaskList();
424  }
425  }
426 
428  aux_ = iConfigure.auxiliary();
430  }
431 
433  bool skipCurrentProcess,
435  ModuleCallingContext const*) const {
436  if (!skipCurrentProcess and worker_) {
437  return resolveProductImpl<false>([] {});
438  }
439  return Resolution(nullptr);
440  }
441 
443  Principal const& principal,
444  bool skipCurrentProcess,
445  ServiceToken const& token,
447  ModuleCallingContext const* mcc) const {
448  if (skipCurrentProcess) {
449  return;
450  }
451  if (worker_ == nullptr) {
452  throw cms::Exception("LogicError") << "UnscheduledProductResolver::prefetchAsync_() called with null worker_. "
453  "This should not happen, please contact framework developers.";
454  }
455  //need to try changing prefetchRequested_ before adding to waitingTasks_
456  bool expected = false;
457  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
458  waitingTasks_.add(waitTask);
459  if (prefetchRequested) {
460  //Have to create a new task which will make sure the state for UnscheduledProductResolver
461  // is properly set after the module has run
462  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
463  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
464  // state for the case where an exception occurs during the call to the function.
465  // Caught exception is propagated via WaitingTaskList
466  CMS_SA_ALLOW try {
467  resolveProductImpl<true>([iPtr]() {
468  if (iPtr) {
469  std::rethrow_exception(*iPtr);
470  }
471  });
472  } catch (...) {
473  waitingTasks_.doneWaiting(std::current_exception());
474  return;
475  }
476  waitingTasks_.doneWaiting(nullptr);
477  });
478 
479  ParentContext parentContext(mcc);
482  WaitingTaskHolder(*waitTask.group(), t),
483  info,
484  token,
485  info.principal().streamID(),
486  parentContext,
487  mcc->getStreamContext());
488  }
489  }
490 
492  if (not deleteEarly) {
493  prefetchRequested_ = false;
495  }
497  }
498 
500  aux_ = iConfigure.auxiliary();
502  // worker can be missing if the corresponding module is
503  // unscheduled and none of its products are consumed
504  if (worker_) {
506  }
507  }
508 
510  bool skipCurrentProcess,
512  ModuleCallingContext const*) const {
513  if (!skipCurrentProcess and worker_) {
514  return resolveProductImpl<false>([] {});
515  }
516  return Resolution(nullptr);
517  }
518 
519  void TransformingProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
520  // Override putProduct() to not set the resolver status to
521  // ResolveFailed when the Event::commit_() checks which produced
522  // products were actually produced and which not, because the
523  // transforming products are never produced by time of commit_()
524  // by construction.
525  if (edp) {
527  }
528  }
529 
531  Principal const& principal,
532  bool skipCurrentProcess,
533  ServiceToken const& token,
535  ModuleCallingContext const* mcc) const {
536  if (skipCurrentProcess) {
537  return;
538  }
539  if (worker_ == nullptr) {
540  throw cms::Exception("LogicError") << "TransformingProductResolver::prefetchAsync_() called with null worker_. "
541  "This should not happen, please contact framework developers.";
542  }
543  //need to try changing prefetchRequested_ before adding to waitingTasks_
544  bool expected = false;
545  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
546  waitingTasks_.add(waitTask);
547  if (prefetchRequested) {
548  //Have to create a new task which will make sure the state for TransformingProductResolver
549  // is properly set after the module has run
550  auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
551  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
552  // state for the case where an exception occurs during the call to the function.
553  // Caught exception is propagated via WaitingTaskList
554  CMS_SA_ALLOW try {
555  resolveProductImpl<true>([iPtr]() {
556  if (iPtr) {
557  std::rethrow_exception(*iPtr);
558  }
559  });
560  } catch (...) {
561  waitingTasks_.doneWaiting(std::current_exception());
562  return;
563  }
564  waitingTasks_.doneWaiting(nullptr);
565  });
566 
567  //This gives a lifetime greater than this call
568  ParentContext parent(mcc);
570 
573  index_,
574  info.principal(),
575  token,
576  info.principal().streamID(),
577  mcc_,
578  mcc->getStreamContext());
579  }
580  }
581 
583  if (not deleteEarly) {
584  prefetchRequested_ = false;
586  }
588  }
589 
590  void ProducedProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
591  if (status() != defaultStatus()) {
593  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
594  }
595 
596  setProduct(std::move(edp)); // ProductResolver takes ownership
597  }
598 
599  bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
600 
602 
604  // Check if the types match.
605  TypeID typeID(prod.dynamicTypeInfo());
607  // Types do not match.
609  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
610  << "It is supposed to be of type " << branchDescription().className() << ".\n"
611  << "It is actually of type " << typeID.className() << ".\n";
612  }
613  }
614 
615  void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
616  if (edp) {
617  checkType(*edp);
620  } else {
621  setFailedStatus();
622  }
623  }
624  void DataManagingProductResolver::setProduct(std::shared_ptr<WrapperBase> edp) const {
625  if (edp) {
626  checkType(*edp);
629  } else {
630  setFailedStatus();
631  }
632  }
633 
634  // This routine returns true if it is known that currently there is no real product.
635  // If there is a real product, it returns false.
636  // If it is not known if there is a real product, it returns false.
638  auto presentStatus = status();
639  if (presentStatus == ProductStatus::ProductSet) {
640  return !(getProductData().wrapper()->isPresent());
641  }
642  return presentStatus != ProductStatus::ResolveNotRun;
643  }
644 
646  auto s = status();
647  return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
648  }
649 
650  // This routine returns true if the product was deleted early in order to save memory
652 
653  bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
654  if (iSkipCurrentProcess and isFromCurrentProcess()) {
655  return false;
656  }
658  if (getProductData().wrapper()->isPresent()) {
659  return true;
660  }
661  }
662  return false;
663  }
664 
666  productData_.setProvenance(provRetriever);
667  }
668 
670 
672  MergeableRunProductMetadata const* mrpm) {
674  }
675 
677  return provenance()->productProvenance();
678  }
679 
683  }
684  if (deleteEarly) {
686  } else {
687  resetStatus();
688  }
689  }
690 
691  bool DataManagingProductResolver::singleProduct_() const { return true; }
692 
695  }
696 
698 
700  return provenance()->productProvenance();
701  }
702 
704 
705  bool AliasProductResolver::singleProduct_() const { return true; }
706 
707  SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd,
709  : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false) {
710  // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
711  Parentage p;
712  p.setParents(std::vector<BranchID>{realProduct.branchDescription().originalBranchID()});
713  parentageID_ = p.id();
715  }
716 
717  void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
719  << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
720  << "Contact a Framework developer\n";
721  }
722 
725  }
726 
728  if (res.data() == nullptr)
729  return res;
730  return Resolution(&productData_);
731  }
732 
734  // SwitchProducer will never put anything in the event, and
735  // "false" will make Event::commit_() to call putProduct() with
736  // null unique_ptr<WrapperBase> to signal that the produce() was
737  // run.
738  return false;
739  }
740 
742  productData_.setProvenance(provRetriever);
743  }
744 
746  // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
748  }
749 
752  realProduct_.resetProductData_(deleteEarly);
753  if (not deleteEarly) {
754  prefetchRequested_ = false;
756  }
757  }
758 
760  // update provenance
762  // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
763  productData_.unsafe_setWrapper(realProduct().getProductData().sharedConstWrapper());
764  }
765 
766  SwitchProducerProductResolver::SwitchProducerProductResolver(std::shared_ptr<BranchDescription const> bd,
768  : SwitchBaseProductResolver(std::move(bd), realProduct), status_(defaultStatus_) {}
769 
771  bool skipCurrentProcess,
773  ModuleCallingContext const* mcc) const {
775  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
776  }
777  return Resolution(nullptr);
778  }
779 
781  Principal const& principal,
782  bool skipCurrentProcess,
783  ServiceToken const& token,
785  ModuleCallingContext const* mcc) const {
786  if (skipCurrentProcess) {
787  return;
788  }
789  if (branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
790  return;
791  }
792 
793  //need to try changing prefetchRequested before adding to waitingTasks
794  bool expected = false;
795  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
796  waitingTasks().add(waitTask);
797 
798  if (doPrefetchRequested) {
799  //using a waiting task to do a callback guarantees that
800  // the waitingTasks() list will be released from waiting even
801  // if the module does not put this data product or the
802  // module has an exception while running
803  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
804  if (nullptr != iException) {
805  waitingTasks().doneWaiting(*iException);
806  } else {
808  waitingTasks().doneWaiting(std::exception_ptr());
809  }
810  });
811  worker()->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
812  }
813  }
814 
815  void SwitchProducerProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
816  if (status_ != defaultStatus_) {
818  << "Attempt to insert more than one product for a branch " << branchDescription().branchName()
819  << "This makes no sense for SwitchProducerProductResolver.\nContact a Framework developer";
820  }
821  // Let's use ResolveFailed to signal that produce() was called, as
822  // there is no real product in this resolver
824  bool expected = false;
825  if (prefetchRequested().compare_exchange_strong(expected, true)) {
827  waitingTasks().doneWaiting(std::exception_ptr());
828  }
829  }
830 
832  // if produce() was run (ResolveFailed), ask from the real resolver
834  return realProduct().productUnavailable();
835  }
836  return true;
837  }
838 
841  if (not deleteEarly) {
843  }
844  }
845 
847  bool skipCurrentProcess,
849  ModuleCallingContext const* mcc) const {
850  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
851  }
852 
854  Principal const& principal,
855  bool skipCurrentProcess,
856  ServiceToken const& token,
858  ModuleCallingContext const* mcc) const {
859  if (skipCurrentProcess) {
860  return;
861  }
862 
863  //need to try changing prefetchRequested_ before adding to waitingTasks_
864  bool expected = false;
865  bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
866  waitingTasks().add(waitTask);
867 
868  if (doPrefetchRequested) {
869  //using a waiting task to do a callback guarantees that
870  // the waitingTasks() list will be released from waiting even
871  // if the module does not put this data product or the
872  // module has an exception while running
873  auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
874  if (nullptr != iException) {
875  waitingTasks().doneWaiting(*iException);
876  } else {
878  waitingTasks().doneWaiting(std::exception_ptr());
879  }
880  });
882  WaitingTaskHolder(*waitTask.group(), waiting), principal, skipCurrentProcess, token, sra, mcc);
883  }
884  }
885 
887  provRetriever_ = provRetriever;
888  }
889 
891 
893  return provRetriever_ ? provRetriever_->branchIDToProvenance(bd_->originalBranchID()) : nullptr;
894  }
895 
897 
898  bool ParentProcessProductResolver::singleProduct_() const { return true; }
899 
901  // In principle, this ought to be fixed. I noticed one hits this error
902  // when in a SubProcess and calling the Event::getProvenance function
903  // with a BranchID to a branch from an earlier SubProcess or the top
904  // level process and this branch is not kept in this SubProcess. It might
905  // be possible to hit this in other contexts. I say it ought to be
906  // fixed because one does not encounter this issue if the SubProcesses
907  // are split into genuinely different processes (in principle that
908  // ought to give identical behavior and results). No user has ever
909  // reported this issue which has been around for some time and it was only
910  // noticed when testing some rare corner cases after modifying Core code.
911  // After discussing this with Chris we decided that at least for the moment
912  // there are higher priorities than fixing this ... I converted it so it
913  // causes an exception instead of a seg fault. The issue that may need to
914  // be addressed someday is how ProductResolvers for non-kept branches are
915  // connected to earlier SubProcesses.
917  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
918  << "Contact a Framework developer\n";
919  }
920 
921  NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
922  std::vector<bool> const& ambiguous,
923  bool madeAtEnd)
924  : matchingHolders_(matchingHolders),
925  ambiguous_(ambiguous),
926  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
927  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
928  prefetchRequested_(false),
929  skippingPrefetchRequested_(false),
930  madeAtEnd_{madeAtEnd} {
931  assert(ambiguous_.size() == matchingHolders_.size());
932  }
933 
935  Principal const& principal,
936  bool skipCurrentProcess,
938  ModuleCallingContext const* mcc) const {
939  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
940  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
941  }
942 
944  bool skipCurrentProcess,
946  ModuleCallingContext const* mcc) const {
947  //See if we've already cached which Resolver we should call or if
948  // we know it is ambiguous
949  const unsigned int choiceSize = ambiguous_.size();
950 
951  //madeAtEnd_==true and not at end transition is the same as skipping the current process
952  if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
953  skipCurrentProcess = not mcc->parent().isAtEndTransition();
954  }
955 
956  unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
957  if (checkCacheIndex != choiceSize + kUnsetOffset) {
958  if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
960  } else if (checkCacheIndex == choiceSize + kMissingOffset) {
961  return Resolution(nullptr);
962  }
963  return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
964  }
965 
966  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
967 
968  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
969  for (unsigned int k : lookupProcessOrder) {
970  assert(k < ambiguous_.size());
971  if (k == 0)
972  break; // Done
973  if (ambiguous_[k]) {
974  updateCacheIndex = choiceSize + kAmbiguousOffset;
976  }
978  auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
979  if (resolution.data() != nullptr) {
980  updateCacheIndex = k;
981  return resolution;
982  }
983  }
984  }
985 
986  updateCacheIndex = choiceSize + kMissingOffset;
987  return Resolution(nullptr);
988  }
989 
991  Principal const& principal,
992  bool skipCurrentProcess,
993  ServiceToken const& token,
995  ModuleCallingContext const* mcc) const {
996  bool timeToMakeAtEnd = true;
997  if (madeAtEnd_ and mcc) {
998  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
999  }
1000 
1001  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
1002  if (not skipCurrentProcess and timeToMakeAtEnd) {
1003  //need to try changing prefetchRequested_ before adding to waitingTasks_
1004  bool expected = false;
1005  bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
1006  waitingTasks_.add(waitTask);
1007 
1008  if (prefetchRequested) {
1009  //we are the first thread to request
1010  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token, waitTask.group());
1011  }
1012  } else {
1013  skippingWaitingTasks_.add(waitTask);
1014  bool expected = false;
1015  if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
1016  //we are the first thread to request
1017  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token, waitTask.group());
1018  }
1019  }
1020  }
1021 
1022  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
1023  ProductResolverIndex iIndex,
1024  std::exception_ptr iExceptPtr) const {
1025  if (not iSkipCurrentProcess) {
1026  lastCheckIndex_ = iIndex;
1027  waitingTasks_.doneWaiting(iExceptPtr);
1028  } else {
1029  lastSkipCurrentCheckIndex_ = iIndex;
1030  skippingWaitingTasks_.doneWaiting(iExceptPtr);
1031  }
1032  }
1033 
1034  namespace {
1035  class TryNextResolverWaitingTask : public edm::WaitingTask {
1036  public:
1037  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
1038  unsigned int iResolverIndex,
1039  Principal const* iPrincipal,
1041  ModuleCallingContext const* iMCC,
1042  bool iSkipCurrentProcess,
1043  ServiceToken iToken,
1044  oneapi::tbb::task_group* iGroup)
1045  : resolver_(iResolver),
1046  principal_(iPrincipal),
1047  sra_(iSRA),
1048  mcc_(iMCC),
1049  group_(iGroup),
1050  serviceToken_(iToken),
1051  index_(iResolverIndex),
1052  skipCurrentProcess_(iSkipCurrentProcess) {}
1053 
1054  void execute() final {
1055  auto exceptPtr = exceptionPtr();
1056  if (exceptPtr) {
1057  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, exceptPtr);
1058  } else {
1059  if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
1060  resolver_->tryPrefetchResolverAsync(
1061  index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
1062  }
1063  }
1064  }
1065 
1066  private:
1067  NoProcessProductResolver const* resolver_;
1068  Principal const* principal_;
1070  ModuleCallingContext const* mcc_;
1071  oneapi::tbb::task_group* group_;
1072  ServiceWeakToken serviceToken_;
1073  unsigned int index_;
1074  bool skipCurrentProcess_;
1075  };
1076  } // namespace
1077 
1078  void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
1079  Principal const& principal,
1080  bool iSkipCurrentProcess,
1081  std::exception_ptr iExceptPtr) const {
1082  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1083  auto k = lookupProcessOrder[iProcessingIndex];
1084 
1085  setCache(iSkipCurrentProcess, k, iExceptPtr);
1086  }
1087 
1088  bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
1089  Principal const& principal,
1090  bool iSkipCurrentProcess) const {
1091  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1092  auto k = lookupProcessOrder[iProcessingIndex];
1093  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1094 
1095  if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
1096  setCache(iSkipCurrentProcess, k, nullptr);
1097  return true;
1098  }
1099  return false;
1100  }
1101 
1102  void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
1103  Principal const& principal,
1104  bool skipCurrentProcess,
1106  ModuleCallingContext const* mcc,
1108  oneapi::tbb::task_group* group) const {
1109  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1110  auto index = iProcessingIndex;
1111 
1112  const unsigned int choiceSize = ambiguous_.size();
1113  unsigned int newCacheIndex = choiceSize + kMissingOffset;
1114  while (index < lookupProcessOrder.size()) {
1115  auto k = lookupProcessOrder[index];
1116  if (k == 0) {
1117  break;
1118  }
1119  assert(k < ambiguous_.size());
1120  if (ambiguous_[k]) {
1121  newCacheIndex = choiceSize + kAmbiguousOffset;
1122  break;
1123  }
1125  //make new task
1126 
1127  auto task = new TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token, group);
1128  WaitingTaskHolder hTask(*group, task);
1129  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1130 
1131  //Make sure the Services are available on this thread
1133 
1134  productResolver->prefetchAsync(hTask, principal, skipCurrentProcess, token, sra, mcc);
1135  return;
1136  }
1137  ++index;
1138  }
1139  //data product unavailable
1140  setCache(skipCurrentProcess, newCacheIndex, nullptr);
1141  }
1142 
1144 
1146 
1148 
1149  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1150 
1152  // This function should never receive 'true'. On the other hand,
1153  // nothing should break if a 'true' is passed, because
1154  // NoProcessProductResolver just forwards the resolve
1155  const auto resetValue = unsetIndexValue();
1156  lastCheckIndex_ = resetValue;
1157  lastSkipCurrentCheckIndex_ = resetValue;
1158  prefetchRequested_ = false;
1160  waitingTasks_.reset();
1162  }
1163 
1164  bool NoProcessProductResolver::singleProduct_() const { return false; }
1165 
1168  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1169  << "Contact a Framework developer\n";
1170  }
1171 
1174  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1175  << "Contact a Framework developer\n";
1176  }
1177 
1180  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1181  << "Contact a Framework developer\n";
1182  }
1183 
1186  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1187  << "Contact a Framework developer\n";
1188  }
1189 
1190  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1192  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1193  << "Contact a Framework developer\n";
1194  }
1195 
1198  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1199  << "Contact a Framework developer\n";
1200  }
1201 
1202  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1204  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1205  << "Contact a Framework developer\n";
1206  }
1207 
1210  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1211  << "Contact a Framework developer\n";
1212  }
1213 
1216  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1217  << "Contact a Framework developer\n";
1218  }
1219 
1220  //---- SingleChoiceNoProcessProductResolver ----------------
1222  Principal const& principal,
1223  bool skipCurrentProcess,
1225  ModuleCallingContext const* mcc) const {
1226  //NOTE: Have to lookup the other ProductResolver each time rather than cache
1227  // it's pointer since it appears the pointer can change at some later stage
1229  ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1230  }
1231 
1233  Principal const& principal,
1234  bool skipCurrentProcess,
1235  ServiceToken const& token,
1237  ModuleCallingContext const* mcc) const {
1239  ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1240  }
1241 
1243 
1245 
1247 
1249 
1251 
1254  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1255  << "Contact a Framework developer\n";
1256  }
1257 
1260  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1261  << "Contact a Framework developer\n";
1262  }
1263 
1266  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1267  << "Contact a Framework developer\n";
1268  }
1269 
1272  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1273  << "Contact a Framework developer\n";
1274  }
1275 
1277  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1278  "implemented and should never be called.\n"
1279  << "Contact a Framework developer\n";
1280  }
1281 
1284  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1285  << "Contact a Framework developer\n";
1286  }
1287 
1288  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1289  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not "
1290  "implemented and should never be called.\n"
1291  << "Contact a Framework developer\n";
1292  }
1293 
1296  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1297  << "Contact a Framework developer\n";
1298  }
1299 
1302  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1303  << "Contact a Framework developer\n";
1304  }
1305 
1306 } // namespace edm
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
size
Write out results.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
void setProductID(ProductID const &pid)
ProductData const & getProductData() const final
ProductProvenance const * productProvenancePtr_() const override
void setOrMergeProduct(std::shared_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
BranchDescription const & branchDescription_() const override
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
void resetProductData_(bool deleteEarly) override
void putProduct(std::unique_ptr< WrapperBase > edp) const override
static const TGPicture * info(bool iBackgroundIsBlack)
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
#define CMS_SA_ALLOW
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
static constexpr unsigned int kAmbiguousOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
bool singleProduct_() const override
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:24
static constexpr unsigned int kMissingOffset
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:197
std::shared_ptr< BranchDescription const > bd_
Provenance const & provenance() const
Definition: ProductData.h:33
void resetProductData_(bool deleteEarly) override
GlobalContext const * globalContext() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
BranchType const & branchType() const
void setProductID_(ProductID const &pid) final
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Worker * findWorker(std::string const &iLabel) const
void putProduct(std::unique_ptr< WrapperBase > edp) const final
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
void checkType(WrapperBase const &prod) const
std::atomic< bool > prefetchRequested_
void resetProductData_(bool deleteEarly) final
WaitingTaskList & waitingTasks() const
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:968
Provenance const * provenance() const
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
TypeID unwrappedTypeID() const
void mergeProduct(std::shared_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
SwitchProducerProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
reader
Definition: DQM.py:105
void resetProductData_(bool deleteEarly) override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
bool unscheduledWasNotRun_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool singleProduct_() const override
Log< level::Error, false > LogError
bool productUnavailable_() const override
TypeWithDict const & unwrappedType() const
bool isAtEndTransition() const
void setupUnscheduled(UnscheduledConfigurator const &) final
void reset()
Resets access to the resource so that added tasks will wait.
ProductProvenanceRetriever const * provRetriever_
void connectTo(ProductResolverBase const &iOther, Principal const *) final
assert(be >=bs)
std::string const & processName() const
ProductData const & getProductData() const final
WrapperBase const * wrapper() const
Definition: ProductData.h:35
void resetProductData_(bool deleteEarly) override
void connectTo(ProductResolverBase const &, Principal const *) final
void setProductID(ProductID const &pid)
Definition: ProductData.h:58
Definition: Electron.h:6
DataManagingOrAliasProductResolver & realProduct_
UnscheduledAuxiliary const * aux_
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductProvenance const * productProvenancePtr_() const override
UnscheduledAuxiliary const * aux_
oneapi::tbb::task_group * group() const noexcept
bool singleProduct_() const override
void emit(Args &&... args) const
Definition: Signal.h:48
ProductProvenanceLookup const * store() const
Definition: Provenance.h:60
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
Definition: Worker.cc:240
void setProductID_(ProductID const &pid) override
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
UnscheduledAuxiliary const * aux_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token, oneapi::tbb::task_group *) const
virtual bool isFromCurrentProcess() const =0
void setupUnscheduled(UnscheduledConfigurator const &iConfigure) final
std::string const & moduleLabel() const
SwitchBaseProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
std::string const & className() const
ProductProvenance const * productProvenancePtr_() const final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::string const & branchName() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:52
std::atomic< bool > prefetchRequested_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
DataManagingOrAliasProductResolver const & realProduct() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void resetProductData_(bool deleteEarly) override
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool isFromCurrentProcess() const final
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
static constexpr const ProductStatus defaultStatus_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< bool > prefetchRequested_
std::atomic< unsigned int > lastCheckIndex_
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:177
void setProductID_(ProductID const &pid) override
ProductProvenance const * branchIDToProvenance(BranchID const &bid) const
void setMergeableRunProductMetadata(MergeableRunProductMetadataBase const *mrpm)
Definition: ProductData.h:60
ProductProvenance const * productProvenancePtr_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > prod) const override
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:36
void setProductID_(ProductID const &pid) final
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
ServiceToken lock() const
Definition: ServiceToken.h:101
void prefetchAsync(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
virtual size_t transformIndex(edm::BranchDescription const &) const =0
Definition: Worker.cc:239
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
Resolution resolveProductImpl(Resolution) const
std::atomic< ProductStatus > theStatus_
ModuleDescription const * description() const
Definition: Worker.h:198
std::vector< bool > ambiguous_
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:81
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
void resetProductData_(bool deleteEarly) override
static constexpr unsigned int kUnsetOffset
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) override
Provenance const * provenance_() const override
void setupUnscheduled(UnscheduledConfigurator const &) final
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
void setProvenance(ProductProvenanceLookup const *provRetriever)
Definition: ProductData.h:56
bool isPresent() const
Definition: WrapperBase.h:30
def load(fileName)
Definition: svgfig.py:547
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
ProductStatus defaultStatus() const
BranchDescription const & branchDescription() const
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void setProductID_(ProductID const &pid) override
void setMergeableRunProductMetadata_(MergeableRunProductMetadata const *) override
DataManagingOrAliasProductResolver & realProduct_
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preModuleDelayedGetSignal_
Transition transition() const
Definition: GlobalContext.h:55
std::vector< ProductResolverIndex > matchingHolders_
void resetProductData_(bool deleteEarly) override
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool availableOnlyAtEndTransition() const
HLT enums.
unsigned int unsetIndexValue() const
StreamContext const * getStreamContext() const
void retrieveAndMerge_(Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
void setProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:27
Provenance const * provenance_() const override
bool productWasDeleted_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
Resolution resolveProductImpl(FUNC resolver) const
void resetProductData_(bool deleteEarly) override=0
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
std::atomic< bool > prefetchRequested_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Log< level::Warning, false > LogWarning
void resetProductData_(bool deleteEarly) override
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postModuleDelayedGetSignal_
UnscheduledAuxiliary const * auxiliary() const
std::atomic< bool > & prefetchRequested() const
static ParentageRegistry * instance()
BranchDescription const & branchDescription_() const override
EventTransitionInfo const & eventTransitionInfo() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:570
void setProductID_(ProductID const &pid) override
ProductProvenance const * productProvenancePtr_() const override
void prefetchAsync_(WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
bool insertMapped(value_type const &v)
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
def move(src, dest)
Definition: eostools.py:511
void setProductProvenanceRetriever_(ProductProvenanceRetriever const *provRetriever) final
DelayedReader * reader() const
Definition: Principal.h:187
void putProduct(std::unique_ptr< WrapperBase > edp) const override
void setProductProvenanceRetriever(ProductProvenanceRetriever const *provRetriever)
void insertIntoSet(ProductProvenance provenanceProduct) const
static HepMC::HEPEVT_Wrapper wrapper
ParentContext const & parent() const
BranchID const & originalBranchID() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final