CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
4 #include "Worker.h"
5 #include "UnscheduledAuxiliary.h"
21 
22 #include <cassert>
23 #include <utility>
24 
// Offsets that are added to the number of candidate resolvers to build the
// sentinel values stored in the cached lookup index: "not determined yet",
// "ambiguous" and "missing".
static constexpr unsigned int kUnsetOffset{0U};
static constexpr unsigned int kAmbiguousOffset{kUnsetOffset + 1U};
static constexpr unsigned int kMissingOffset{kAmbiguousOffset + 1U};
28 
29 namespace edm {
30 
33  exception << "ProductResolverBase::resolveProduct_: The product matching all criteria was already deleted\n"
34  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
35  << "Looking for module label: " << moduleLabel() << "\n"
36  << "Looking for productInstanceName: " << productInstanceName() << "\n"
37  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
38  << "This means there is a configuration error.\n"
39  << "The module which is asking for this data must be configured to state that it will read this data.";
40  throw exception;
41  }
42 
// Shared implementation behind the resolveProduct_ overrides: optionally runs the
// supplied `resolver` callable and then reports the outcome as a Resolution.
//   callResolver - compile-time switch; when false the callable is never invoked.
//   returns      - Resolution(&product data) when the status is ProductSet and the
//                  wrapper reports isPresent(); Resolution(nullptr) otherwise.
// NOTE(review): the signature line (listing line 45) and the statement inside the
// productWasDeleted() branch (listing line 47) are absent from this view;
// presumably the branch raises the "product already deleted" exception - confirm.
43  //This is a templated function in order to avoid calling another virtual function
44  template <bool callResolver, typename FUNC>
46  if (productWasDeleted()) {
48  }
49  auto presentStatus = status();
50 
51  if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
52  //if resolver fails because of exception or not setting product
53  // make sure the status goes to failed
    // The lambda below is installed as the deleter of a unique_ptr that points at the
    // local presentStatus, so it runs when this scope is left (normally or through an
    // exception thrown by resolver()) and refreshes presentStatus from status().
54  auto failedStatusSetter = [this](ProductStatus* presentStatus) {
55  if (this->status() == ProductStatus::ResolveNotRun) {
56  this->setFailedStatus();
57  }
58  *presentStatus = this->status();
59  };
60  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
61  failedStatusSetter);
62 
63  //If successful, this will call setProduct
64  resolver();
65  }
66 
    // Only a product that was set AND whose wrapper is present counts as resolved.
67  if (presentStatus == ProductStatus::ProductSet) {
68  auto pd = &getProductData();
69  if (pd->wrapper()->isPresent()) {
70  return Resolution(pd);
71  }
72  }
73 
74  return Resolution(nullptr);
75  }
76 
// Combines an incoming wrapper (iFrom) with the product this resolver already holds.
//   iFrom                       - incoming product; a null pointer is ignored once the
//                                 status has been asserted to be ProductSet.
//   mergeableRunProductMetadata - may be null; when non-null and the branch type is
//                                 InRun it supplies the MERGE / REPLACE / IGNORE decision.
// Three strategies are visible below, chosen from the held wrapper:
//   1. isMergeable()       -> mergeProduct()/swapProduct() on the held wrapper
//   2. hasIsProductEqual() -> keep the first, log an error if the two differ
//   3. neither             -> log a warning and keep the first
// In cases 2 and 3 the incoming product is adopted via setProduct() only when the held
// one is not present and the incoming one is.
// NOTE(review): listing lines 80, 92, 95 and 108 are absent from this view. Presumably
// 80 is the condition guarding the early setProduct(), 92 defines `original`, 95 opens
// the exception whose message follows, and 108 declares `decision` - confirm.
77  void DataManagingProductResolver::mergeProduct(std::unique_ptr<WrapperBase> iFrom,
78  MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
79  // if its not mergeable and the previous read failed, go ahead and use this one
81  setProduct(std::move(iFrom));
82  return;
83  }
84 
85  assert(status() == ProductStatus::ProductSet);
86  if (not iFrom) {
87  return;
88  }
89 
90  checkType(*iFrom);
91 
93  if (original->isMergeable()) {
    // Presence mismatch between the two fragments: the message below is the
    // diagnostic built for that case.
94  if (original->isPresent() != iFrom->isPresent()) {
96  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
97  << "Was trying to merge objects where one product had been put in the input file and the other had not "
98  "been."
99  << "\n"
100  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
101  << "that need to be merged in the first place.\n";
102  }
103  if (original->isPresent()) {
104  BranchDescription const& desc = branchDescription_();
    // Without metadata, or for anything other than a Run branch, always merge.
105  if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
106  original->mergeProduct(iFrom.get());
107  } else {
109  mergeableRunProductMetadata->getMergeDecision(desc.processName());
110  if (decision == MergeableRunProductMetadata::MERGE) {
111  original->mergeProduct(iFrom.get());
112  } else if (decision == MergeableRunProductMetadata::REPLACE) {
113  // Note this swaps the content of the product where the
114  // both products branches are present and the product is
115  // also present (was put) in the branch. A module might
116  // have already gotten a pointer to the product so we
117  // keep those pointers valid. This does not call swap
118  // on the Wrapper.
119  original->swapProduct(iFrom.get());
120  }
121  // If the decision is IGNORE, do nothing
122  }
123  }
124  // If both have isPresent false, do nothing
125 
126  } else if (original->hasIsProductEqual()) {
127  if (original->isPresent() && iFrom->isPresent()) {
128  if (!original->isProductEqual(iFrom.get())) {
129  auto const& bd = branchDescription();
130  edm::LogError("RunLumiMerging")
131  << "ProductResolver::mergeTheProduct\n"
132  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
133  << "Using the first, ignoring the second\n"
134  << "className = " << bd.className() << "\n"
135  << "moduleLabel = " << bd.moduleLabel() << "\n"
136  << "instance = " << bd.productInstanceName() << "\n"
137  << "process = " << bd.processName() << "\n";
138  }
139  } else if (!original->isPresent() && iFrom->isPresent()) {
140  setProduct(std::move(iFrom));
141  }
142  // if not iFrom->isPresent(), do nothing
143  } else {
144  auto const& bd = branchDescription();
145  edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
146  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
147  << "Using the first, ignoring the second in merge\n"
148  << "className = " << bd.className() << "\n"
149  << "moduleLabel = " << bd.moduleLabel() << "\n"
150  << "instance = " << bd.productInstanceName() << "\n"
151  << "process = " << bd.processName() << "\n";
152  if (!original->isPresent() && iFrom->isPresent()) {
153  setProduct(std::move(iFrom));
154  }
155  // In other cases, do nothing
156  }
157  }
158 
160  bool,
162  ModuleCallingContext const* mcc) const {
163  return resolveProductImpl<true>([this, &principal, mcc]() {
164  auto branchType = principal.branchType();
165  if (branchType != InEvent) {
166  //delayed get has not been allowed with Run or Lumis
167  // The file may already be closed so the reader is invalid
168  return;
169  }
170  if (mcc and (branchType == InEvent) and aux_) {
171  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()), *mcc);
172  }
173 
174  auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
175  if (branchType == InEvent and aux_) {
176  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
177  }
178  }));
179 
180  if (auto reader = principal.reader()) {
181  std::unique_lock<std::recursive_mutex> guard;
182  if (auto sr = reader->sharedResources().second) {
183  guard = std::unique_lock<std::recursive_mutex>(*sr);
184  }
185  if (not productResolved()) {
186  //another thread could have beaten us here
187  putProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
188  }
189  }
190  });
191  }
192 
194  MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
195  if (auto reader = principal.reader()) {
196  std::unique_lock<std::recursive_mutex> guard;
197  if (auto sr = reader->sharedResources().second) {
198  guard = std::unique_lock<std::recursive_mutex>(*sr);
199  }
200 
201  //Can't use resolveProductImpl since it first checks to see
202  // if the product was already retrieved and then returns if it is
203  std::unique_ptr<WrapperBase> edp(reader->getProduct(branchDescription().branchID(), &principal));
204 
205  if (edp.get() != nullptr) {
206  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
208  << "Missing definition of member function swap for branch name " << branchDescription().branchName()
209  << "\n"
210  << "Mergeable data types written to a Run must have the swap member function defined"
211  << "\n";
212  }
214  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
215  putOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
216  } else { // status() == ResolveFailed && branchDescription().isMergeable()
218  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
219  << "The product branch was dropped in the first run or lumi fragment and present in a later one"
220  << "\n"
221  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
222  << "that need to be merged in the first place.\n";
223  }
224  } else if (status() == defaultStatus()) {
225  setFailedStatus();
226  } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
228  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
229  << "The product branch was present in first run or lumi fragment and dropped in a later one"
230  << "\n"
231  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
232  << "that need to be merged in the first place.\n";
233  }
234  // Do nothing in other case. status is ResolveFailed already or
235  // it is not mergeable and the status is ProductSet
236  }
237  }
238 
241  }
242 
244  Principal const& principal,
245  bool skipCurrentProcess,
246  ServiceToken const& token,
248  ModuleCallingContext const* mcc) const {
249  m_waitingTasks.add(waitTask);
250 
251  bool expected = false;
252  if (m_prefetchRequested.compare_exchange_strong(expected, true)) {
253  auto workToDo = [this, mcc, &principal, token]() {
254  //need to make sure Service system is activated on the reading thread
255  ServiceRegistry::Operate guard(token);
256  try {
257  resolveProductImpl<true>([this, &principal, mcc]() {
258  if (principal.branchType() != InEvent) {
259  return;
260  }
261  if (auto reader = principal.reader()) {
262  std::unique_lock<std::recursive_mutex> guard;
263  if (auto sr = reader->sharedResources().second) {
264  guard = std::unique_lock<std::recursive_mutex>(*sr);
265  }
266  if (not productResolved()) {
267  //another thread could have finished this while we were waiting
268  putProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
269  }
270  }
271  });
272  } catch (...) {
273  this->m_waitingTasks.doneWaiting(std::current_exception());
274  return;
275  }
276  this->m_waitingTasks.doneWaiting(nullptr);
277  };
278 
279  SerialTaskQueueChain* queue = nullptr;
280  if (auto reader = principal.reader()) {
281  if (auto shared_res = reader->sharedResources().first) {
282  queue = &(shared_res->serialQueueChain());
283  }
284  }
285  if (queue) {
286  queue->push(workToDo);
287  } else {
288  //Have to create a new task
289  auto t = make_functor_task(tbb::task::allocate_root(), workToDo);
290  tbb::task::spawn(*t);
291  }
292  }
293  }
294 
296  m_prefetchRequested = false;
297  m_waitingTasks.reset();
299  }
300 
302  aux_ = iConfigure.auxiliary();
303  }
304 
305  bool InputProductResolver::isFromCurrentProcess() const { return false; }
306 
308  bool skipCurrentProcess,
310  ModuleCallingContext const*) const {
311  if (!skipCurrentProcess) {
312  //'false' means never call the lambda function
313  return resolveProductImpl<false>([]() { return; });
314  }
315  return Resolution(nullptr);
316  }
317 
319  Principal const& principal,
320  bool skipCurrentProcess,
321  ServiceToken const& token,
323  ModuleCallingContext const* mcc) const {
324  if (not skipCurrentProcess) {
325  if (branchDescription().availableOnlyAtEndTransition() and mcc) {
326  if (not mcc->parent().isAtEndTransition()) {
327  return;
328  }
329  }
330  m_waitingTasks.add(waitTask);
331 
332  bool expected = false;
333  if (worker_ and prefetchRequested_.compare_exchange_strong(expected, true)) {
334  //using a waiting task to do a callback guarantees that
335  // the m_waitingTasks list will be released from waiting even
336  // if the module does not put this data product or the
337  // module has an exception while running
338 
339  auto waiting = make_waiting_task(tbb::task::allocate_root(), [this](std::exception_ptr const* iException) {
340  if (nullptr != iException) {
341  m_waitingTasks.doneWaiting(*iException);
342  } else {
343  m_waitingTasks.doneWaiting(std::exception_ptr());
344  }
345  });
346  worker_->callWhenDoneAsync(waiting);
347  }
348  }
349  }
350 
// Accepts a product for this resolver and releases anyone waiting on it.
// The visible part atomically flips prefetchRequested_ from false to true; only the
// caller that performs the flip signals m_waitingTasks, passing an empty
// exception_ptr.
// NOTE(review): listing line 352 is absent from this view; presumably it hands `edp`
// to the base-class putProduct_ before the flag is touched - confirm.
351  void PuttableProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
353  bool expected = false;
354  if (prefetchRequested_.compare_exchange_strong(expected, true)) {
355  m_waitingTasks.doneWaiting(std::exception_ptr());
356  }
357  }
358 
360  m_waitingTasks.reset();
362  prefetchRequested_ = false;
363  }
364 
366  worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
367  }
368 
370  aux_ = iConfigure.auxiliary();
371  worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
372  assert(worker_ != nullptr);
373  }
374 
376  bool skipCurrentProcess,
378  ModuleCallingContext const* mcc) const {
379  if (!skipCurrentProcess and worker_) {
380  return resolveProductImpl<true>([&principal, this, sra, mcc]() {
381  try {
382  auto const& event = static_cast<EventPrincipal const&>(principal);
383  ParentContext parentContext(mcc);
384  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()), *mcc);
385 
386  auto workCall = [this, &event, &parentContext, mcc]() {
387  auto sentry(make_sentry(mcc, [this](ModuleCallingContext const* iContext) {
388  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
389  }));
390 
392  event, *(aux_->eventSetup()), event.streamID(), parentContext, mcc->getStreamContext());
393  };
394 
395  if (sra) {
396  assert(false);
397  } else {
398  workCall();
399  }
400 
401  } catch (cms::Exception& ex) {
402  std::ostringstream ost;
403  ost << "Calling produce method for unscheduled module " << worker_->description().moduleName() << "/'"
404  << worker_->description().moduleLabel() << "'";
405  ex.addContext(ost.str());
406  throw;
407  }
408  });
409  }
410  return Resolution(nullptr);
411  }
412 
414  Principal const& principal,
415  bool skipCurrentProcess,
416  ServiceToken const& token,
418  ModuleCallingContext const* mcc) const {
419  if (skipCurrentProcess) {
420  return;
421  }
422  waitingTasks_.add(waitTask);
423  bool expected = false;
424  if (prefetchRequested_.compare_exchange_strong(expected, true)) {
425  //Have to create a new task which will make sure the state for UnscheduledProductResolver
426  // is properly set after the module has run
427  auto t = make_waiting_task(tbb::task::allocate_root(), [this](std::exception_ptr const* iPtr) {
428  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
429  // state for the case where an exception occurs during the call to the function.
430  try {
431  resolveProductImpl<true>([iPtr]() {
432  if (iPtr) {
433  std::rethrow_exception(*iPtr);
434  }
435  });
436  } catch (...) {
437  waitingTasks_.doneWaiting(std::current_exception());
438  return;
439  }
440  waitingTasks_.doneWaiting(nullptr);
441  });
442  auto const& event = static_cast<EventPrincipal const&>(principal);
443  ParentContext parentContext(mcc);
444 
446  t, event, *(aux_->eventSetup()), token, event.streamID(), parentContext, mcc->getStreamContext());
447  }
448  }
449 
451  prefetchRequested_ = false;
452  waitingTasks_.reset();
454  }
455 
// Stores a produced product in this resolver via setProduct().
// When status() differs from defaultStatus() a diagnostic about a repeated insert on
// the branch is built first.
// NOTE(review): the head of that diagnostic statement (listing line 458) is absent
// from this view; presumably it throws, so setProduct() is not reached in that
// case - confirm.
456  void ProducedProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
457  if (status() != defaultStatus()) {
459  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
460  }
461 
462  setProduct(std::move(edp)); // ProductResolver takes ownership
463  }
464 
465  void InputProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
466  if (not productResolved()) {
467  //Another thread could have set this
468  setProduct(std::move(edp));
469  }
470  }
471 
472  bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
473 
474  void DataManagingProductResolver::connectTo(ProductResolverBase const& iOther, Principal const*) { assert(false); }
475 
477  std::unique_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
478  if (not prod) {
479  return;
480  }
481  if (status() == defaultStatus()) {
482  //resolveProduct has not been called or it failed
483  putProduct(std::move(prod));
484  } else {
485  mergeProduct(std::move(prod), mergeableRunProductMetadata);
486  }
487  }
488 
490  // Check if the types match.
491  TypeID typeID(prod.dynamicTypeInfo());
493  // Types do not match.
495  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
496  << "It is supposed to be of type " << branchDescription().className() << ".\n"
497  << "It is actually of type " << typeID.className() << ".\n";
498  }
499  }
500 
// Installs `edp` as this resolver's product.
// A non-null wrapper is type-checked with checkType(); a null wrapper marks the
// resolver as failed through setFailedStatus().
// NOTE(review): listing lines 504-505 are absent from this view; presumably they
// store the wrapper in the product data and set the ProductSet status - confirm.
501  void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
502  if (edp) {
503  checkType(*edp);
506  } else {
507  setFailedStatus();
508  }
509  }
510  // This routine returns true if it is known that currently there is no real product.
511  // If there is a real product, it returns false.
512  // If it is not known if there is a real product, it returns false.
514  auto presentStatus = status();
515  if (presentStatus == ProductStatus::ProductSet) {
516  return !(getProductData().wrapper()->isPresent());
517  }
518  return presentStatus != ProductStatus::ResolveNotRun;
519  }
520 
522  auto s = status();
523  return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
524  }
525 
526  // This routine returns true if the product was deleted early in order to save memory
528 
// Reports whether a present product is held, honouring iSkipCurrentProcess.
// Returns false immediately when the caller skips the current process and this
// resolver reports isFromCurrentProcess(); returns true when the held wrapper
// reports isPresent(); false otherwise.
// NOTE(review): listing line 533 is absent from this view; it opens the block closed
// on listing line 537 and presumably tests that the status is ProductSet before the
// wrapper is dereferenced - confirm.
529  bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
530  if (iSkipCurrentProcess and isFromCurrentProcess()) {
531  return false;
532  }
534  if (getProductData().wrapper()->isPresent()) {
535  return true;
536  }
537  }
538  return false;
539  }
540 
542  ProcessHistory const& ph,
543  ProductID const& pid) {
544  productData_.setProvenance(provRetriever, ph, pid);
545  }
546 
548  MergeableRunProductMetadata const* mrpm) {
550  }
551 
553 
555  return provenance()->productProvenance();
556  }
557 
561  }
562  if (deleteEarly) {
564  } else {
565  resetStatus();
566  }
567  }
568 
569  bool DataManagingProductResolver::singleProduct_() const { return true; }
570 
572  ProcessHistory const& ph,
573  ProductID const& pid) {
574  realProduct_.setProvenance(provRetriever, ph, pid);
575  }
576 
577  void AliasProductResolver::setProcessHistory_(ProcessHistory const& ph) { realProduct_.setProcessHistory(ph); }
578 
580  return provenance()->productProvenance();
581  }
582 
583  void AliasProductResolver::resetProductData_(bool deleteEarly) { realProduct_.resetProductData_(deleteEarly); }
584 
585  bool AliasProductResolver::singleProduct_() const { return true; }
586 
// Unsupported for alias resolvers: the body only builds a "not implemented"
// diagnostic; the incoming wrapper is unnamed and unused.
// NOTE(review): the head of the statement that receives this message (listing line
// 588) is absent from this view; presumably it throws an exception - confirm.
587  void AliasProductResolver::putProduct_(std::unique_ptr<WrapperBase>) const {
589  << "AliasProductResolver::putProduct_() not implemented and should never be called.\n"
590  << "Contact a Framework developer\n";
591  }
592 
// Unsupported for alias resolvers: the body only builds a "not implemented"
// diagnostic; neither `edp` nor the metadata pointer is used in the visible code.
// NOTE(review): the head of the statement that receives this message (listing line
// 595) is absent from this view; presumably it throws an exception - confirm.
593  void AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp,
594  MergeableRunProductMetadata const*) const {
596  << "AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata "
597  "const*) not implemented and should never be called.\n"
598  << "Contact a Framework developer\n";
599  }
600 
// Builds a switch resolver around `realProduct`, taking ownership of the branch
// description for its own product data. prefetchRequested_ starts false and status_
// starts at defaultStatus_. The parentage ID is computed once here from a Parentage
// whose only parent is the real product's original branch ID.
// NOTE(review): listing line 602 (the remaining constructor parameter, presumably the
// `realProduct` reference) and listing line 608 are absent from this view - confirm
// what line 608 does with the computed parentage.
601  SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd,
603  : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false), status_(defaultStatus_) {
604  // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
605  Parentage p;
606  p.setParents(std::vector<BranchID>{realProduct.branchDescription().originalBranchID()});
607  parentageID_ = p.id();
609  }
610 
// Unsupported for switch resolvers: the body only builds a "not implemented"
// diagnostic; neither parameter is used in the visible code.
// NOTE(review): the head of the statement that receives this message (listing line
// 612) is absent from this view; presumably it throws an exception - confirm.
611  void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
613  << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
614  << "Contact a Framework developer\n";
615  }
616 
619  }
620 
622  if (res.data() == nullptr)
623  return res;
624  // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
626  return Resolution(&productData_);
627  }
628 
630  // SwitchProducer will never put anything in the event, and
631  // "false" will make Event::commit_() to call putProduct() with
632  // null unique_ptr<WrapperBase> to signal that the produce() was
633  // run.
634  return false;
635  }
636 
// Records that a put happened on this switch resolver and wakes waiting tasks.
// A second call (status_ no longer defaultStatus_) builds a diagnostic. Afterwards
// prefetchRequested_ is flipped from false to true, and the caller that performs the
// flip signals waitingTasks_ with an empty exception_ptr.
// NOTE(review): listing line 639 (head of the diagnostic statement, presumably a
// throw) and listing line 645 (presumably the status_ assignment described by the
// comment on lines 643-644) are absent from this view - confirm.
// NOTE(review): nothing is streamed between branchName() and "This makes no sense",
// so the branch name and that sentence run together in the emitted message.
637  void SwitchBaseProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
638  if (status_ != defaultStatus_) {
640  << "Attempt to insert more than one product for a branch " << branchDescription().branchName()
641  << "This makes no sense for SwitchBaseProductResolver.\nContact a Framework developer";
642  }
643  // Let's use ResolveFailed to signal that produce() was called, as
644  // there is no real product in this resolver
646  bool expected = false;
647  if (prefetchRequested_.compare_exchange_strong(expected, true)) {
648  waitingTasks_.doneWaiting(std::exception_ptr());
649  }
650  }
651 
// Unsupported for switch resolvers: the body only builds a "not implemented"
// diagnostic; neither `edp` nor the metadata pointer is used in the visible code.
// NOTE(review): the head of the statement that receives this message (listing line
// 654) is absent from this view; presumably it throws an exception - confirm.
652  void SwitchBaseProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp,
653  MergeableRunProductMetadata const*) const {
655  << "SwitchBaseProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, "
656  "MergeableRunProductMetadata const*) not implemented and should never be called.\n"
657  << "Contact a Framework developer\n";
658  }
659 
661  ProcessHistory const& ph,
662  ProductID const& pid) {
663  // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
664  provRetriever->insertIntoSet(ProductProvenance(branchDescription().branchID(), parentageID_));
665  productData_.setProvenance(provRetriever, ph, pid);
666  }
667 
670  realProduct_.resetProductData_(deleteEarly);
671  if (not deleteEarly) {
673  }
674  }
675 
677  bool skipCurrentProcess,
679  ModuleCallingContext const* mcc) const {
681  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
682  }
683  return Resolution(nullptr);
684  }
685 
687  Principal const& principal,
688  bool skipCurrentProcess,
689  ServiceToken const& token,
691  ModuleCallingContext const* mcc) const {
692  if (skipCurrentProcess) {
693  return;
694  }
695  if (branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
696  return;
697  }
698  waitingTasks().add(waitTask);
699 
700  bool expected = false;
701  if (prefetchRequested().compare_exchange_strong(expected, true)) {
702  //using a waiting task to do a callback guarantees that
703  // the waitingTasks() list will be released from waiting even
704  // if the module does not put this data product or the
705  // module has an exception while running
706  auto waiting = make_waiting_task(tbb::task::allocate_root(), [this](std::exception_ptr const* iException) {
707  if (nullptr != iException) {
708  waitingTasks().doneWaiting(*iException);
709  } else {
710  waitingTasks().doneWaiting(std::exception_ptr());
711  }
712  });
713  worker()->callWhenDoneAsync(waiting);
714  }
715  }
716 
718  // if produce() was run (ResolveFailed), ask from the real resolver
720  return realProduct().productUnavailable();
721  }
722  return true;
723  }
724 
726  bool skipCurrentProcess,
728  ModuleCallingContext const* mcc) const {
729  return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
730  }
731 
733  Principal const& principal,
734  bool skipCurrentProcess,
735  ServiceToken const& token,
737  ModuleCallingContext const* mcc) const {
738  if (skipCurrentProcess) {
739  return;
740  }
741  realProduct().prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
742  }
743 
745  ProcessHistory const& ph,
746  ProductID const& pid) {
747  provRetriever_ = provRetriever;
748  }
749 
751 
753  return provRetriever_ ? provRetriever_->branchIDToProvenance(bd_->originalBranchID()) : nullptr;
754  }
755 
757 
758  bool ParentProcessProductResolver::singleProduct_() const { return true; }
759 
// Unsupported for parent-process resolvers: the body only builds a "not implemented"
// diagnostic; the incoming wrapper is unnamed and unused.
// NOTE(review): the head of the statement that receives this message (listing line
// 761) is absent from this view; presumably it throws an exception - confirm.
760  void ParentProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase>) const {
762  << "ParentProcessProductResolver::putProduct_() not implemented and should never be called.\n"
763  << "Contact a Framework developer\n";
764  }
765 
// Unsupported for parent-process resolvers: the body only builds a "not implemented"
// diagnostic; neither `edp` nor the metadata pointer is used in the visible code.
// NOTE(review): the head of the statement that receives this message (listing line
// 768) is absent from this view; presumably it throws an exception - confirm.
766  void ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp,
767  MergeableRunProductMetadata const*) const {
769  << "ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, "
770  "MergeableRunProductMetadata const*) not implemented and should never be called.\n"
771  << "Contact a Framework developer\n";
772  }
773 
775  // In principle, this ought to be fixed. I noticed one hits this error
776  // when in a SubProcess and calling the Event::getProvenance function
777  // with a BranchID to a branch from an earlier SubProcess or the top
778  // level process and this branch is not kept in this SubProcess. It might
779  // be possible to hit this in other contexts. I say it ought to be
780  // fixed because one does not encounter this issue if the SubProcesses
781  // are split into genuinely different processes (in principle that
782  // ought to give identical behavior and results). No user has ever
783  // reported this issue which has been around for some time and it was only
784  // noticed when testing some rare corner cases after modifying Core code.
785  // After discussing this with Chris we decided that at least for the moment
786  // there are higher priorities than fixing this ... I converted it so it
787  // causes an exception instead of a seg fault. The issue that may need to
788  // be addressed someday is how ProductResolvers for non-kept branches are
789  // connected to earlier SubProcesses.
791  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
792  << "Contact a Framework developer\n";
793  }
794 
// Builds a resolver that chooses among several candidate resolvers.
//   matchingHolders - resolver indices of the candidates (copied)
//   ambiguous       - per-candidate ambiguity flags (copied); must have the same
//                     length as matchingHolders, which is asserted below
//   madeAtEnd       - stored in madeAtEnd_
// Both cached lookup indices start at the "unset" sentinel, i.e. the number of
// candidates plus kUnsetOffset.
// NOTE(review): listing line 802 is absent from this view; presumably it initialises
// prefetchRequested_ alongside skippingPrefetchRequested_ - confirm.
795  NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
796  std::vector<bool> const& ambiguous,
797  bool madeAtEnd)
798  : matchingHolders_(matchingHolders),
799  ambiguous_(ambiguous),
800  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
801  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
803  skippingPrefetchRequested_(false),
804  madeAtEnd_{madeAtEnd} {
805  assert(ambiguous_.size() == matchingHolders_.size());
806  }
807 
809  Principal const& principal,
810  bool skipCurrentProcess,
812  ModuleCallingContext const* mcc) const {
813  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
814  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
815  }
816 
818  bool skipCurrentProcess,
820  ModuleCallingContext const* mcc) const {
821  //See if we've already cached which Resolver we should call or if
822  // we know it is ambiguous
823  const unsigned int choiceSize = ambiguous_.size();
824 
825  //madeAtEnd_==true and not at end transition is the same as skipping the current process
826  if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
827  skipCurrentProcess = not mcc->parent().isAtEndTransition();
828  }
829 
830  unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
831  if (checkCacheIndex != choiceSize + kUnsetOffset) {
832  if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
834  } else if (checkCacheIndex == choiceSize + kMissingOffset) {
835  return Resolution(nullptr);
836  }
837  return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
838  }
839 
840  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
841 
842  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
843  for (unsigned int k : lookupProcessOrder) {
844  assert(k < ambiguous_.size());
845  if (k == 0)
846  break; // Done
847  if (ambiguous_[k]) {
848  updateCacheIndex = choiceSize + kAmbiguousOffset;
850  }
852  auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
853  if (resolution.data() != nullptr) {
854  updateCacheIndex = k;
855  return resolution;
856  }
857  }
858  }
859 
860  updateCacheIndex = choiceSize + kMissingOffset;
861  return Resolution(nullptr);
862  }
863 
865  Principal const& principal,
866  bool skipCurrentProcess,
867  ServiceToken const& token,
869  ModuleCallingContext const* mcc) const {
870  bool timeToMakeAtEnd = true;
871  if (madeAtEnd_ and mcc) {
872  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
873  }
874 
875  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
876  if (not skipCurrentProcess and timeToMakeAtEnd) {
877  waitingTasks_.add(waitTask);
878 
879  bool expected = false;
880  if (prefetchRequested_.compare_exchange_strong(expected, true)) {
881  //we are the first thread to request
882  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token);
883  }
884  } else {
885  skippingWaitingTasks_.add(waitTask);
886  bool expected = false;
887  if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
888  //we are the first thread to request
889  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token);
890  }
891  }
892  }
893 
// Records the outcome of a lookup and releases the tasks waiting on it.
//   iSkipCurrentProcess - selects which cache/waiting list is updated
//   iIndex              - value to store in the cached lookup index
//   iExceptPtr          - passed on to doneWaiting()
// In the non-skipping case lastCheckIndex_ is written before waitingTasks_ is
// signalled.
// NOTE(review): the body of the else branch (listing lines 901-902) is absent from
// this view; presumably it mirrors the first branch using
// lastSkipCurrentCheckIndex_ and skippingWaitingTasks_ - confirm.
894  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
895  ProductResolverIndex iIndex,
896  std::exception_ptr iExceptPtr) const {
897  if (not iSkipCurrentProcess) {
898  lastCheckIndex_ = iIndex;
899  waitingTasks_.doneWaiting(iExceptPtr);
900  } else {
903  }
904  }
905 
// File-local helper task used while prefetching through a NoProcessProductResolver.
// When executed it inspects the result of the prefetch for position index_:
//   - an exception was recorded  -> resolver_->prefetchFailed(...)
//   - no valid data at index_    -> resolver_->tryPrefetchResolverAsync(index_ + 1, ...)
//   - valid data                 -> nothing further is called from here
// All pointers held by the task are non-owning raw pointers.
// NOTE(review): listing lines 912 and 940 are absent from this view; presumably they
// are the `iSRA` constructor parameter and the `sra_` member used below - confirm.
906  namespace {
907  class TryNextResolverWaitingTask : public edm::WaitingTask {
908  public:
909  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
910  unsigned int iResolverIndex,
911  Principal const* iPrincipal,
913  ModuleCallingContext const* iMCC,
914  bool iSkipCurrentProcess,
915  ServiceToken iToken)
916  : resolver_(iResolver),
917  principal_(iPrincipal),
918  sra_(iSRA),
919  mcc_(iMCC),
920  serviceToken_(iToken),
921  index_(iResolverIndex),
922  skipCurrentProcess_(iSkipCurrentProcess) {}
923 
    // Always returns nullptr; any follow-up work is started through resolver_.
924  tbb::task* execute() override {
925  auto exceptPtr = exceptionPtr();
926  if (exceptPtr) {
927  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
928  } else {
929  if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
930  resolver_->tryPrefetchResolverAsync(
931  index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_);
932  }
933  }
934  return nullptr;
935  }
936 
937  private:
938  NoProcessProductResolver const* resolver_;
939  Principal const* principal_;
941  ModuleCallingContext const* mcc_;
942  ServiceToken serviceToken_;
943  unsigned int index_;
944  bool skipCurrentProcess_;
945  };
946  } // namespace
947 
948  void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
949  Principal const& principal,
950  bool iSkipCurrentProcess,
951  std::exception_ptr iExceptPtr) const {
952  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
953  auto k = lookupProcessOrder[iProcessingIndex];
954 
955  setCache(iSkipCurrentProcess, k, iExceptPtr);
956  }
957 
958  bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
959  Principal const& principal,
960  bool iSkipCurrentProcess) const {
961  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
962  auto k = lookupProcessOrder[iProcessingIndex];
963  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
964 
965  if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
966  setCache(iSkipCurrentProcess, k, nullptr);
967  return true;
968  }
969  return false;
970  }
971 
// Starts an asynchronous prefetch on the next usable candidate, beginning at position
// iProcessingIndex of the principal's lookup order.
// The loop stops at the first 0 entry in the lookup order or at an ambiguous
// candidate; in those cases, and when the order is exhausted, the outcome (missing or
// ambiguous sentinel) is stored through setCache with a null exception.
// When a candidate is chosen, a TryNextResolverWaitingTask is created for it and the
// candidate's prefetchAsync is called with that task; the function then returns and
// the task decides what happens next.
// NOTE(review): listing line 975 (presumably the `sra` parameter) and listing line
// 993 (the condition opening the block closed on listing line 1009, presumably a
// validity test on matchingHolders_[k]) are absent from this view - confirm.
972  void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
973  Principal const& principal,
974  bool skipCurrentProcess,
976  ModuleCallingContext const* mcc,
977  ServiceToken token) const {
978  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
979  auto index = iProcessingIndex;
980 
981  const unsigned int choiceSize = ambiguous_.size();
    // Default outcome if no candidate is usable: "missing".
982  unsigned int newCacheIndex = choiceSize + kMissingOffset;
983  while (index < lookupProcessOrder.size()) {
984  auto k = lookupProcessOrder[index];
985  if (k == 0) {
986  break;
987  }
988  assert(k < ambiguous_.size());
989  if (ambiguous_[k]) {
990  newCacheIndex = choiceSize + kAmbiguousOffset;
991  break;
992  }
994  //make new task
995 
996  auto task = new (tbb::task::allocate_root())
997  TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token);
    // The reference count is raised before prefetchAsync and lowered afterwards; the
    // task is spawned from here only if the count is back at zero at that point.
998  task->increment_ref_count();
999  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1000 
1001  //Make sure the Services are available on this thread
1002  ServiceRegistry::Operate guard(token);
1003 
1004  productResolver->prefetchAsync(task, principal, skipCurrentProcess, token, sra, mcc);
1005  if (0 == task->decrement_ref_count()) {
1006  tbb::task::spawn(*task);
1007  }
1008  return;
1009  }
1010  ++index;
1011  }
1012  //data product unavailable
1013  setCache(skipCurrentProcess, newCacheIndex, nullptr);
1014  }
1015 
1017  ProcessHistory const&,
1018  ProductID const&) {}
1019 
1021 
1023 
1024  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1025 
1027  const auto resetValue = unsetIndexValue();
1028  lastCheckIndex_ = resetValue;
1029  lastSkipCurrentCheckIndex_ = resetValue;
1030  prefetchRequested_ = false;
1032  waitingTasks_.reset();
1034  }
1035 
1036  bool NoProcessProductResolver::singleProduct_() const { return false; }
1037 
1040  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1041  << "Contact a Framework developer\n";
1042  }
1043 
1046  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1047  << "Contact a Framework developer\n";
1048  }
1049 
1052  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1053  << "Contact a Framework developer\n";
1054  }
1055 
1058  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1059  << "Contact a Framework developer\n";
1060  }
1061 
1062  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1064  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1065  << "Contact a Framework developer\n";
1066  }
1067 
1068  void NoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase>) const {
1070  << "NoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
1071  << "Contact a Framework developer\n";
1072  }
1073 
1074  void NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp,
1075  MergeableRunProductMetadata const*) const {
1077  << "NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata "
1078  "const*) not implemented and should never be called.\n"
1079  << "Contact a Framework developer\n";
1080  }
1081 
1084  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1085  << "Contact a Framework developer\n";
1086  }
1087 
1088  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1090  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1091  << "Contact a Framework developer\n";
1092  }
1093 
1096  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1097  << "Contact a Framework developer\n";
1098  }
1099 
1102  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1103  << "Contact a Framework developer\n";
1104  }
1105 
1106  //---- SingleChoiceNoProcessProductResolver ----------------
1108  Principal const& principal,
1109  bool skipCurrentProcess,
1111  ModuleCallingContext const* mcc) const {
1112  //NOTE: Have to lookup the other ProductResolver each time rather than cache
1113  // it's pointer since it appears the pointer can change at some later stage
1114  return principal.getProductResolverByIndex(realResolverIndex_)
1115  ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1116  }
1117 
1119  Principal const& principal,
1120  bool skipCurrentProcess,
1121  ServiceToken const& token,
1123  ModuleCallingContext const* mcc) const {
1124  principal.getProductResolverByIndex(realResolverIndex_)
1125  ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1126  }
1127 
1129  ProcessHistory const&,
1130  ProductID const&) {}
1131 
1133 
1135 
1137 
1139 
1142  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1143  << "Contact a Framework developer\n";
1144  }
1145 
1148  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1149  << "Contact a Framework developer\n";
1150  }
1151 
1154  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1155  << "Contact a Framework developer\n";
1156  }
1157 
1160  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1161  << "Contact a Framework developer\n";
1162  }
1163 
1165  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1166  "implemented and should never be called.\n"
1167  << "Contact a Framework developer\n";
1168  }
1169 
1170  void SingleChoiceNoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase>) const {
1172  << "SingleChoiceNoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
1173  << "Contact a Framework developer\n";
1174  }
1175 
1176  void SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp,
1177  MergeableRunProductMetadata const*) const {
1179  << "SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, "
1180  "MergeableRunProductMetadata const*) not implemented and should never be called.\n"
1181  << "Contact a Framework developer\n";
1182  }
1183 
1186  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1187  << "Contact a Framework developer\n";
1188  }
1189 
1190  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1191  throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not "
1192  "implemented and should never be called.\n"
1193  << "Contact a Framework developer\n";
1194  }
1195 
1198  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1199  << "Contact a Framework developer\n";
1200  }
1201 
1204  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1205  << "Contact a Framework developer\n";
1206  }
1207 
1208 } // namespace edm
static constexpr const ProductStatus defaultStatus_
size
Write out results.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::string const & branchName() const
std::string const & productInstanceName() const
bool isAtEndTransition() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
DataManagingOrAliasProductResolver const & realProduct() const
void resetProductData_(bool deleteEarly) override
Provenance const * provenance() const
BranchType const & branchType() const
void callWhenDoneAsync(WaitingTask *task)
Definition: Worker.h:167
WaitingTaskList & waitingTasks() const
void setProcessHistory(ProcessHistory const &ph)
Definition: ProductData.h:61
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
void resetProductData_(bool deleteEarly) override
SwitchBaseProductResolver(std::shared_ptr< BranchDescription const > bd, DataManagingOrAliasProductResolver &realProduct)
void checkType(WrapperBase const &prod) const
std::type_info const & dynamicTypeInfo() const
Definition: WrapperBase.h:46
Resolution resolveProductImpl(Resolution) const
void prefetchAsync(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
unsigned int unsetIndexValue() const
void putOrMergeProduct_(std::unique_ptr< WrapperBase > edp, MergeableRunProductMetadata const *mergeableRunProductMetadata) const final
void resetProductData_(bool deleteEarly) override
void setMergeableRunProductMetadata_(MergeableRunProductMetadata const *) override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
ParentageID id() const
Definition: Parentage.cc:23
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
void add(WaitingTask *)
Adds task to the waiting list.
std::atomic< bool > prefetchRequested_
void setProcessHistory_(ProcessHistory const &ph) override
ProductStatus status() const
bool singleProduct_() const override
bool unscheduledWasNotRun_() const override
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
WrapperBase const * wrapper() const
Definition: ProductData.h:34
std::string const & processName() const
void resetProductData_(bool deleteEarly) override
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:35
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void reset()
Resets access to the resource so that added tasks will wait.
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) final
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
void connectTo(ProductResolverBase const &iOther, Principal const *) final
BranchDescription const & branchDescription_() const override
static unsigned int kUnsetOffset
void resetProductData_(bool deleteEarly) override
std::string const & processName() const
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const final
void connectTo(ProductResolverBase const &, Principal const *) final
ParentContext const & parent() const
Definition: Electron.h:6
void putProduct_(std::unique_ptr< WrapperBase > edp) const final
DataManagingOrAliasProductResolver & realProduct_
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
bool isPresent() const
Definition: WrapperBase.h:30
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp, MergeableRunProductMetadata const *mergeableRunProductMetadata=nullptr) const
ProductProvenance const * productProvenancePtr_() const override
static unsigned int kMissingOffset
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void putProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:38
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void setParents(std::vector< BranchID > const &parents)
Definition: Parentage.h:46
void setProvenance(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
Definition: ProductData.h:65
def principal(options)
void setupUnscheduled(UnscheduledConfigurator const &iConfigure) final
bool isFromCurrentProcess() const final
void mergeProduct(std::unique_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
std::string const & className() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:53
bool singleProduct_() const override
Provenance const * provenance_() const override
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:196
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const final
Provenance const * provenance_() const override
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
TypeID unwrappedTypeID() const
void resetProductData_(bool deleteEarly) override
BranchDescription const & branchDescription_() const final
void setProcessHistory_(ProcessHistory const &ph) override
ProductProvenance const * productProvenancePtr_() const override
bool isFromCurrentProcess() const final
bool productUnavailable_() const override
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
ProductProvenance const * productProvenancePtr_() const override
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
std::atomic< bool > prefetchRequested_
ProductProvenance const * productProvenancePtr_() const override
std::atomic< unsigned int > lastCheckIndex_
TypeWithDict const & unwrappedType() const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) final
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as "!*" was in the past
void setMergeableRunProductMetadata(MergeableRunProductMetadataBase const *mrpm)
Definition: ProductData.h:71
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:81
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual bool isFromCurrentProcess() const =0
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const final
int k[5][pyjets_maxn]
void setProduct(std::unique_ptr< WrapperBase > edp) const
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
DelayedReader * reader() const
Definition: Principal.h:186
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const final
UnscheduledAuxiliary const * auxiliary() const
void resetProductData_(bool deleteEarly) override
void insertIntoSet(ProductProvenance provenanceProduct) const
static unsigned int kAmbiguousOffset
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductStatus defaultStatus() const
ProductData const * data() const
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void setProcessHistory_(ProcessHistory const &ph) override
void setupUnscheduled(UnscheduledConfigurator const &) final
def load(fileName)
Definition: svgfig.py:547
BranchDescription const & branchDescription_() const override
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
void resetProductData_(bool deleteEarly) final
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const final
void addContext(std::string const &context)
Definition: Exception.cc:165
bool singleProduct_() const override
void retrieveAndMerge_(Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
std::vector< ProductResolverIndex > matchingHolders_
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token) const
HLT enums.
BranchDescription const & branchDescription() const
Worker * findWorker(std::string const &iLabel) const
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:535
ProductProvenance const * productProvenancePtr_() const final
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void setProcessHistory_(ProcessHistory const &ph) override
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductData const & getProductData() const
void resetProductData_(bool deleteEarly) override=0
BranchID const & originalBranchID() const
bool productWasDeleted_() const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
void resetProductData_(bool deleteEarly) override
std::atomic< bool > & prefetchRequested() const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
static ParentageRegistry * instance()
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void setProcessHistory_(ProcessHistory const &ph) final
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
bool insertMapped(value_type const &v)
std::shared_ptr< WrapperBase const > sharedConstWrapper() const
Definition: ProductData.h:36
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const final
def move(src, dest)
Definition: eostools.py:511
def branchType(schema, name)
Definition: revisionDML.py:114
#define constexpr
std::string const & moduleLabel() const
ProductStatus status() const
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:35
static HepMC::HEPEVT_Wrapper wrapper
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final