CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
4 #include "Worker.h"
5 #include "UnscheduledAuxiliary.h"
17 
18 #include <cassert>
19 #include <utility>
20 
// Offsets added to the number of candidate resolvers (ambiguous_.size()) to build
// the sentinel values stored in NoProcessProductResolver's cached lookup indices:
//   kUnsetOffset     - nothing has been cached yet
//   kAmbiguousOffset - the lookup was found to be ambiguous
//   kMissingOffset   - no candidate resolver supplied the product
static constexpr unsigned int kUnsetOffset = 0;
static constexpr unsigned int kAmbiguousOffset = 1;
static constexpr unsigned int kMissingOffset = 2;
24 
25 namespace edm {
26 
29  exception << "ProductResolverBase::resolveProduct_: The product matching all criteria was already deleted\n"
30  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
31  << "Looking for module label: " << moduleLabel() << "\n"
32  << "Looking for productInstanceName: " << productInstanceName() << "\n"
33  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
34  << "This means there is a configuration error.\n"
35  << "The module which is asking for this data must be configured to state that it will read this data.";
36  throw exception;
37 
38  }
39 
40  //This is a templated function in order to avoid calling another virtual function
41  template <bool callResolver, typename FUNC>
44 
45  if(productWasDeleted()) {
47  }
48  auto presentStatus = status();
49 
50  if(callResolver && presentStatus == ProductStatus::ResolveNotRun) {
51  //if resolver fails because of exception or not setting product
52  // make sure the status goes to failed
53  auto failedStatusSetter = [this](ProductStatus* presentStatus) {
54  if(this->status() == ProductStatus::ResolveNotRun) {
55  this->setFailedStatus();
56  }
57  *presentStatus = this->status();
58  };
59  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus, failedStatusSetter);
60 
61  //If successful, this will call setProduct
62  resolver();
63  }
64 
65 
66  if (presentStatus == ProductStatus::ProductSet) {
67  auto pd = &getProductData();
68  if(pd->wrapper()->isPresent()) {
69  return Resolution(pd);
70  }
71  }
72 
73  return Resolution(nullptr);
74  }
75 
// Combines an incoming run/lumi product (iFrom) with the one referred to by `original`.
// A null iFrom is ignored. Otherwise the type is checked and then, depending on what
// `original` supports, the products are merged, compared for equality, or the
// incoming one is dropped with a warning.
// NOTE(review): the declaration of `original` is not visible in this listing;
// presumably it is the wrapper already held by this resolver - confirm.
// NOTE(review): the log text says "mergeTheProduct" while this function is mergeProduct.
void
DataManagingProductResolver::mergeProduct(std::unique_ptr<WrapperBase> iFrom) const {
  // Nothing to merge.
  if(not iFrom) { return;}

  checkType(*iFrom);

  if(original->isMergeable()) {
    original->mergeProduct(iFrom.get());
  } else if(original->hasIsProductEqual()) {
    // Not mergeable but comparable: only report when the two copies disagree.
    if(!original->isProductEqual(iFrom.get())) {
      auto const& bd = branchDescription();
      edm::LogError("RunLumiMerging")
        << "ProductResolver::mergeTheProduct\n"
        << "Two run/lumi products for the same run/lumi which should be equal are not\n"
        << "Using the first, ignoring the second\n"
        << "className = " << bd.className() << "\n"
        << "moduleLabel = " << bd.moduleLabel() << "\n"
        << "instance = " << bd.productInstanceName() << "\n"
        << "process = " << bd.processName() << "\n";
    }
  } else {
    // Neither mergeable nor comparable: the incoming product is ignored.
    auto const& bd = branchDescription();
    edm::LogWarning("RunLumiMerging")
      << "ProductResolver::mergeTheProduct\n"
      << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
      << "Using the first, ignoring the second in merge\n"
      << "className = " << bd.className() << "\n"
      << "moduleLabel = " << bd.moduleLabel() << "\n"
      << "instance = " << bd.productInstanceName() << "\n"
      << "process = " << bd.processName() << "\n";
  }
}
110 
111 
114  bool,
116  ModuleCallingContext const* mcc) const {
117  return resolveProductImpl<true>([this,&principal,mcc]() {
118  auto branchType = principal.branchType();
119  if(branchType != InEvent) {
120  //delayed get has not been allowed with Run or Lumis
121  // The file may already be closed so the reader is invalid
122  return;
123  }
124  if(mcc and (branchType == InEvent) and aux_) {
125  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
126  }
127 
128  auto sentry( make_sentry(mcc,
129  [this, branchType](ModuleCallingContext const* iContext){
130  if(branchType == InEvent and aux_) {
131  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }
132  }));
133 
134  if(auto reader=principal.reader()) {
135  std::unique_lock<std::recursive_mutex> guard;
136  if(auto sr = reader->sharedResources().second) {
137  guard =std::unique_lock<std::recursive_mutex>(*sr);
138  }
139  if ( not productResolved()) {
140  //another thread could have beaten us here
141  putProduct( reader->getProduct(BranchKey(branchDescription()), &principal, mcc));
142  }
143  }
144  });
145 
146  }
147 
148  void
150  if(auto reader = principal.reader()) {
151 
152  std::unique_lock<std::recursive_mutex> guard;
153  if(auto sr = reader->sharedResources().second) {
154  guard =std::unique_lock<std::recursive_mutex>(*sr);
155  }
156 
157  //Can't use resolveProductImpl since it first checks to see
158  // if the product was already retrieved and then returns if it is
159  BranchKey const bk = BranchKey(branchDescription());
160  std::unique_ptr<WrapperBase> edp(reader->getProduct(bk, &principal));
161 
162  if(edp.get() != nullptr) {
164  } else if( status()== defaultStatus()) {
165  setFailedStatus();
166  }
167  }
168  }
169 
170 
171  template<typename F>
172  class FunctorTask : public tbb::task {
173  public:
174  explicit FunctorTask( F f): func_(f) {}
175 
176  task* execute() override {
177  func_();
178  return nullptr;
179  };
180 
181  private:
182  F func_;
183  };
184 
185  template< typename ALLOC, typename F>
186  FunctorTask<F>* make_functor_task( ALLOC&& iAlloc, F f) {
187  return new (iAlloc) FunctorTask<F>(f);
188  }
189 
191  Principal const& principal,
192  bool skipCurrentProcess,
194  ModuleCallingContext const* mcc) const {
195  m_waitingTasks.add(waitTask);
196 
197  bool expected = false;
198  if( m_prefetchRequested.compare_exchange_strong(expected, true) ) {
199 
200  //need to make sure Service system is activated on the reading thread
202  auto workToDo = [this, mcc, &principal, token] () {
204  try {
205  resolveProductImpl<true>([this,&principal,mcc]() {
206  if(principal.branchType() != InEvent) { return; }
207  if(auto reader = principal.reader()) {
208  std::unique_lock<std::recursive_mutex> guard;
209  if(auto sr = reader->sharedResources().second) {
210  guard =std::unique_lock<std::recursive_mutex>(*sr);
211  }
212  if ( not productResolved()) {
213  //another thread could have finished this while we were waiting
214  putProduct( reader->getProduct(BranchKey(branchDescription()), &principal, mcc));
215  }
216  }
217  });
218  } catch(...) {
219  this->m_waitingTasks.doneWaiting(std::current_exception());
220  return;
221  }
222  this->m_waitingTasks.doneWaiting(nullptr);
223  };
224 
225  SerialTaskQueueChain* queue = nullptr;
226  if(auto reader = principal.reader()) {
227  if (auto shared_res = reader->sharedResources().first) {
228  queue = &(shared_res->serialQueueChain());
229  }
230  }
231  if(queue) {
232  queue->push(workToDo);
233  } else {
234  //Have to create a new task
235  auto t = make_functor_task(tbb::task::allocate_root(),
236  workToDo);
237  tbb::task::spawn(*t);
238  }
239  }
240  }
241 
242  void
244  m_prefetchRequested = false;
247  }
248 
249  void
251  aux_ = iConfigure.auxiliary();
252  }
253 
254 
255  bool
257  return false;
258  }
259 
260 
263  bool skipCurrentProcess,
265  ModuleCallingContext const*) const {
266  if (!skipCurrentProcess) {
267  //'false' means never call the lambda function
268  return resolveProductImpl<false>([](){return;});
269  }
270  return Resolution(nullptr);
271  }
272 
274  Principal const& principal,
275  bool skipCurrentProcess,
277  ModuleCallingContext const* mcc) const {
278  if(not skipCurrentProcess) {
279  m_waitingTasks.add(waitTask);
280 
281  bool expected = false;
282  if(worker_ and prefetchRequested_.compare_exchange_strong(expected,true)) {
283  //using a waiting task to do a callback guarantees that
284  // the m_waitingTasks list will be released from waiting even
285  // if the module does not put this data product or the
286  // module has an exception while running
287 
288  auto waiting = make_waiting_task(tbb::task::allocate_root(),
289  [this](std::exception_ptr const * iException) {
290  if(nullptr != iException) {
291  m_waitingTasks.doneWaiting(*iException);
292  } else {
293  m_waitingTasks.doneWaiting(std::exception_ptr());
294  }
295  });
296  worker_->callWhenDoneAsync(waiting);
297  }
298  }
299  }
300 
// After a product is put, releases any tasks waiting on this resolver.
// NOTE(review): one line of this body is not visible in this listing, so what
// happens to `edp` itself cannot be seen here.
void
PuttableProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
  bool expected = false;
  // Only the caller that flips the flag from false to true signals the waiters.
  if(prefetchRequested_.compare_exchange_strong(expected,true)) {
    // An empty exception_ptr is passed: there is no error to report.
    m_waitingTasks.doneWaiting(std::exception_ptr());
  }
}
309 
310 
311  void
315  prefetchRequested_ = false;
316  }
317 
318  void
321  }
322 
323 
324  void
326  aux_ = iConfigure.auxiliary();
328  assert(worker_ != nullptr);
329 
330  }
331 
334  bool skipCurrentProcess,
336  ModuleCallingContext const* mcc) const {
337  if (!skipCurrentProcess and worker_) {
338  return resolveProductImpl<true>(
339  [&principal,this,sra,mcc]() {
340  try {
341  auto const& event = static_cast<EventPrincipal const&>(principal);
342  ParentContext parentContext(mcc);
344 
345  auto workCall = [this,&event,&parentContext,mcc] () {
346  auto sentry( make_sentry(mcc,[this](ModuleCallingContext const* iContext){aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }));
347 
349  event,
350  *(aux_->eventSetup()),
351  event.streamID(),
352  parentContext,
353  mcc->getStreamContext());
354  };
355 
356  if (sra) {
357  assert(false);
358  } else {
359  workCall();
360  }
361 
362  }
363  catch (cms::Exception & ex) {
364  std::ostringstream ost;
365  ost << "Calling produce method for unscheduled module "
366  << worker_->description().moduleName() << "/'"
367  << worker_->description().moduleLabel() << "'";
368  ex.addContext(ost.str());
369  throw;
370  }
371  });
372  }
373  return Resolution(nullptr);
374  }
375 
376  void
378  Principal const& principal,
379  bool skipCurrentProcess,
381  ModuleCallingContext const* mcc) const
382  {
383  if(skipCurrentProcess) { return; }
384  waitingTasks_.add(waitTask);
385  bool expected = false;
386  if(prefetchRequested_.compare_exchange_strong(expected, true)) {
387 
388  //Have to create a new task which will make sure the state for UnscheduledProductResolver
389  // is properly set after the module has run
390  auto t = make_waiting_task(tbb::task::allocate_root(),
391  [this,&principal, skipCurrentProcess,sra,mcc](std::exception_ptr const* iPtr)
392  {
393  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
394  // state for the case where an exception occurs during the call to the function.
395  try {
396  resolveProductImpl<true>([iPtr]() {
397  if ( iPtr) {
398  std::rethrow_exception(*iPtr);
399  }
400  });
401  } catch(...) {
402  waitingTasks_.doneWaiting(std::current_exception());
403  return;
404  }
405  waitingTasks_.doneWaiting(nullptr);
406  } );
407  auto const& event = static_cast<EventPrincipal const&>(principal);
408  ParentContext parentContext(mcc);
409 
411  event,
412  *(aux_->eventSetup()),
413  event.streamID(),
414  parentContext,
415  mcc->getStreamContext());
416  }
417  }
418 
419  void
421  prefetchRequested_ = false;
424  }
425 
426 
// Stores a product created in the current process.
void
ProducedProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
  // A status other than the default is treated as a second insertion on this branch.
  if(status() != defaultStatus()) {
    // NOTE(review): the statement that begins this stream expression is not visible in this listing.
    << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
  }

  setProduct(std::move(edp)); // ProductResolver takes ownership
}
436 
437  void
438  InputProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
439  if ( not productResolved()) {
440  //Another thread could have set this
441  setProduct(std::move(edp));
442  }
443  }
444 
445  bool
447  return true;
448  }
449 
450  void
453  resetProductData_(false);
454  }
455  }
456 
457 
458  void
460  assert(false);
461  }
462 
463  void
464  DataManagingProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const {
465  if(not prod) {return;}
466  if(status() == defaultStatus()) {
467  //resolveProduct has not been called or it failed
468  putProduct(std::move(prod));
469  } else {
470  mergeProduct(std::move(prod));
471  }
472  }
473 
474 
475 
476  void
478  // Check if the types match.
479  TypeID typeID(prod.dynamicTypeInfo());
480  if(typeID != branchDescription().unwrappedTypeID()) {
481  // Types do not match.
483  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
484  << "It is supposed to be of type " << branchDescription().className() << ".\n"
485  << "It is actually of type " << typeID.className() << ".\n";
486  }
487  }
488 
489 
// Records the outcome of resolving a product: a non-null `edp` is type-checked,
// a null `edp` marks this resolver as failed.
// NOTE(review): the lines following checkType in the non-null branch are not
// visible in this listing.
void
DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
  if(edp) {
    checkType(*edp);
  } else {
    // No product was supplied.
    setFailedStatus();
  }
}
500  // This routine returns true if it is known that currently there is no real product.
501  // If there is a real product, it returns false.
502  // If it is not known if there is a real product, it returns false.
503  bool
505  auto presentStatus = status();
506  if(presentStatus == ProductStatus::ProductSet) {
507  return !(getProductData().wrapper()->isPresent());
508  }
509  return presentStatus != ProductStatus::ResolveNotRun;
510  }
511 
512  bool
514  auto s = status();
515  return (s != defaultStatus() ) or (s == ProductStatus::ProductDeleted);
516  }
517 
518 
519  // This routine returns true if the product was deleted early in order to save memory
520  bool
523  }
524 
525  bool
527  if (iSkipCurrentProcess and isFromCurrentProcess() ) {
528  return false;
529  }
531  if(getProductData().wrapper()->isPresent()) {
532  return true;
533  }
534  }
535  return false;
536  }
537 
538 
540  productData_.setProvenance(provRetriever,ph,pid);
541  }
542 
545  }
546 
548  return provenance()->productProvenance();
549  }
550 
554  }
555  if(deleteEarly) {
557  } else {
558  resetStatus();
559  }
560  }
561 
563  return true;
564  }
565 
567  realProduct_.setProvenance(provRetriever,ph,pid);
568  }
569 
572  }
573 
575  return provenance()->productProvenance();
576  }
577 
579  realProduct_.resetProductData_(deleteEarly);
580  }
581 
583  return true;
584  }
585 
// Unsupported for AliasProductResolver; the message states it should never be called.
// NOTE(review): the statement that begins the stream expression is not visible in this listing.
void AliasProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
  << "AliasProductResolver::putProduct_() not implemented and should never be called.\n"
  << "Contact a Framework developer\n";
}
591 
// Unsupported for AliasProductResolver; the message states it should never be called.
// NOTE(review): the statement that begins the stream expression is not visible in this listing.
void AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
  << "AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
  << "Contact a Framework developer\n";
}
597 
598 
599 
601  provRetriever_ = provRetriever;
602  }
603 
605  }
606 
608  return provRetriever_? provRetriever_->branchIDToProvenance(bd_->branchID()): nullptr;
609  }
610 
612  }
613 
615  return true;
616  }
617 
// Unsupported for ParentProcessProductResolver; the message states it should never be called.
// NOTE(review): the statement that begins the stream expression is not visible in this listing.
void ParentProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
  << "ParentProcessProductResolver::putProduct_() not implemented and should never be called.\n"
  << "Contact a Framework developer\n";
}
623 
// Unsupported for ParentProcessProductResolver; the message states it should never be called.
// NOTE(review): the statement that begins the stream expression is not visible in this listing.
void ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
  << "ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
  << "Contact a Framework developer\n";
}
629 
// Constructs a resolver that selects among several candidate resolvers.
//  matchingHolders: indices used to look up the candidate ProductResolvers in a Principal
//  ambiguous:       one flag per candidate; must have the same length as matchingHolders
// Both cached lookup indices start at the "unset" sentinel (size + kUnsetOffset)
// and both prefetch flags start cleared.
NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
                         std::vector<bool> const& ambiguous) :
  matchingHolders_(matchingHolders),
  ambiguous_(ambiguous),
  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
  prefetchRequested_(false),
  skippingPrefetchRequested_(false) {
  assert(ambiguous_.size() == matchingHolders_.size());
}
641 
644  Principal const& principal,
645  bool skipCurrentProcess,
647  ModuleCallingContext const* mcc) const {
648  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
649  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
650  }
651 
652 
655  bool skipCurrentProcess,
657  ModuleCallingContext const* mcc) const {
658  //See if we've already cached which Resolver we should call or if
659  // we know it is ambiguous
660  const unsigned int choiceSize = ambiguous_.size();
661  {
662  unsigned int checkCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
663  if( checkCacheIndex != choiceSize +kUnsetOffset) {
664  if (checkCacheIndex == choiceSize+kAmbiguousOffset) {
666  } else if(checkCacheIndex == choiceSize+kMissingOffset) {
667  return Resolution(nullptr);
668  }
669  return tryResolver(checkCacheIndex, principal, skipCurrentProcess,
670  sra,mcc);
671  }
672  }
673 
674  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
675 
676  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
677  for(unsigned int k : lookupProcessOrder) {
678  assert(k < ambiguous_.size());
679  if(k == 0) break; // Done
680  if(ambiguous_[k]) {
681  updateCacheIndex = choiceSize + kAmbiguousOffset;
683  }
685  auto resolution = tryResolver(k,principal, skipCurrentProcess, sra,mcc);
686  if(resolution.data() != nullptr) {
687  updateCacheIndex = k;
688  return resolution;
689  }
690  }
691  }
692 
693  updateCacheIndex = choiceSize + kMissingOffset;
694  return Resolution(nullptr);
695  }
696 
697  void
699  Principal const& principal,
700  bool skipCurrentProcess,
702  ModuleCallingContext const* mcc) const {
703  if(not skipCurrentProcess) {
704  waitingTasks_.add(waitTask);
705 
706  bool expected = false;
707  if( prefetchRequested_.compare_exchange_strong(expected,true)) {
708  //we are the first thread to request
709  tryPrefetchResolverAsync(0, principal, skipCurrentProcess, sra, mcc, ServiceRegistry::instance().presentToken());
710  }
711  } else {
712  skippingWaitingTasks_.add(waitTask);
713  bool expected = false;
714  if( skippingPrefetchRequested_.compare_exchange_strong(expected,true)) {
715  //we are the first thread to request
716  tryPrefetchResolverAsync(0, principal, skipCurrentProcess, sra, mcc, ServiceRegistry::instance().presentToken());
717  }
718  }
719  }
720 
// Caches the outcome of a lookup and releases the tasks waiting on it.
//  iSkipCurrentProcess: selects which cache / waiting list is updated
//  iIndex:              value to cache (a choice index or one of the sentinel values)
//  iExceptPtr:          exception to hand to the waiting tasks, or null when there is none
void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
                                        ProductResolverIndex iIndex,
                                        std::exception_ptr iExceptPtr) const {
  if( not iSkipCurrentProcess) {
    lastCheckIndex_ = iIndex;
    waitingTasks_.doneWaiting(iExceptPtr);
  } else {
    // NOTE(review): the body of this branch is not visible in this listing.
  }
}
732 
namespace {
  // Waiting task executed after the prefetch of one candidate resolver has finished.
  // On failure it reports the exception to the owning NoProcessProductResolver;
  // otherwise, if that candidate did not provide valid data, it starts the
  // prefetch of the next candidate (index_ + 1).
  // NOTE(review): the constructor parameter `iSRA` and the member `sra_` are used
  // below but their declarations are not visible in this listing.
  class TryNextResolverWaitingTask : public edm::WaitingTask {
  public:

    TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
                               unsigned int iResolverIndex,
                               Principal const* iPrincipal,
                               ModuleCallingContext const* iMCC,
                               bool iSkipCurrentProcess,
                               ServiceToken iToken) :
    resolver_(iResolver),
    principal_(iPrincipal),
    sra_(iSRA),
    mcc_(iMCC),
    serviceToken_(iToken),
    index_(iResolverIndex),
    skipCurrentProcess_(iSkipCurrentProcess){}

    tbb::task* execute() override {
      // A non-null pointer means the prefetch that this task waited on ended with an exception.
      auto exceptPtr =exceptionPtr();
      if(exceptPtr) {
        resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
      } else {
        // dataValidFromResolver returns true when the candidate's data is usable;
        // only move on to the next candidate when it is not.
        if(not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
          resolver_->tryPrefetchResolverAsync(index_+1,
                                              *principal_,
                                              skipCurrentProcess_,
                                              sra_,
                                              mcc_,
                                              serviceToken_);
        }
      }
      return nullptr;
    }

  private:
    NoProcessProductResolver const* resolver_;   // resolver that created this task
    Principal const* principal_;
    ModuleCallingContext const* mcc_;
    ServiceToken serviceToken_;
    unsigned int index_;                         // position in the lookup order that was just tried
    bool skipCurrentProcess_;
  };
}
779 
780  void
781  NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
782  Principal const& principal,
783  bool iSkipCurrentProcess,
784  std::exception_ptr iExceptPtr) const {
785  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
786  auto k = lookupProcessOrder[iProcessingIndex];
787 
788  setCache(iSkipCurrentProcess, k, iExceptPtr);
789  }
790 
791 
792  bool
793  NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
794  Principal const& principal,
795  bool iSkipCurrentProcess) const {
796  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
797  auto k = lookupProcessOrder[iProcessingIndex];
798  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
799 
800  if(productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
801 
802  setCache(iSkipCurrentProcess, k, nullptr);
803  return true;
804  }
805  return false;
806  }
807 
808 
809  void
811  Principal const& principal,
812  bool skipCurrentProcess,
814  ModuleCallingContext const* mcc,
815  ServiceToken token) const {
816  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
817  auto index = iProcessingIndex;
818 
819  const unsigned int choiceSize = ambiguous_.size();
820  unsigned int newCacheIndex = choiceSize + kMissingOffset;
821  while(index < lookupProcessOrder.size()) {
822  auto k = lookupProcessOrder[index];
823  if(k==0) {
824  break;
825  }
826  assert(k < ambiguous_.size());
827  if(ambiguous_[k]) {
828  newCacheIndex = choiceSize + kAmbiguousOffset;
829  break;
830  }
832  //make new task
833 
834  auto task = new (tbb::task::allocate_root()) TryNextResolverWaitingTask(
835  this,
836  index,
837  &principal,
838  sra,
839  mcc,
840  skipCurrentProcess,
841  token
842  );
843  task->increment_ref_count();
844  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
845 
846  //Make sure the Services are available on this thread
847  ServiceRegistry::Operate guard(token);
848 
849  productResolver->prefetchAsync(task,
850  principal,
851  skipCurrentProcess,
852  sra, mcc);
853  if(0 == task->decrement_ref_count()) {
854  tbb::task::spawn(*task);
855  }
856  return;
857  }
858  ++index;
859  }
860  //data product unavailable
861  setCache(skipCurrentProcess, newCacheIndex, nullptr);
862  }
863 
865  }
866 
868  }
869 
871  return nullptr;
872  }
873 
874  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size()+kUnsetOffset; }
875 
877  const auto resetValue = unsetIndexValue();
878  lastCheckIndex_ = resetValue;
879  lastSkipCurrentCheckIndex_ = resetValue;
880  prefetchRequested_ = false;
884  }
885 
887  return false;
888  }
889 
892  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
893  << "Contact a Framework developer\n";
894  }
895 
898  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
899  << "Contact a Framework developer\n";
900  }
901 
904  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
905  << "Contact a Framework developer\n";
906  }
907 
910  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
911  << "Contact a Framework developer\n";
912  }
913 
// Unsupported for NoProcessProductResolver; the message states it should never be called.
// NOTE(review): the statement that begins the stream expression is not visible in this listing.
bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
  << "Contact a Framework developer\n";
}
919 
// Unsupported for NoProcessProductResolver; the message states it should never be called.
// NOTE(review): the statement that begins the stream expression is not visible in this listing.
void NoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
  << "NoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
  << "Contact a Framework developer\n";
}
925 
// Unsupported for NoProcessProductResolver; the message states it should never be called.
// NOTE(review): the statement that begins the stream expression is not visible in this listing.
void NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
  << "NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
  << "Contact a Framework developer\n";
}
931 
934  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
935  << "Contact a Framework developer\n";
936  }
937 
// Unsupported for NoProcessProductResolver; the message states it should never be called.
// NOTE(review): the statement that begins the stream expression is not visible in this listing.
void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
  << "Contact a Framework developer\n";
}
943 
946  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
947  << "Contact a Framework developer\n";
948  }
949 
952  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
953  << "Contact a Framework developer\n";
954 
955  }
956 
957  //---- SingleChoiceNoProcessProductResolver ----------------
959  Principal const& principal,
960  bool skipCurrentProcess,
962  ModuleCallingContext const* mcc) const
963  {
//NOTE: Have to look up the other ProductResolver each time rather than cache
// its pointer since it appears the pointer can change at some later stage
967  ->resolveProduct(principal,
968  skipCurrentProcess, sra, mcc);
969  }
970 
972  Principal const& principal,
973  bool skipCurrentProcess,
975  ModuleCallingContext const* mcc) const {
977  ->prefetchAsync(waitTask,principal,
978  skipCurrentProcess, sra, mcc);
979  }
980 
982  }
983 
985  }
986 
988  return nullptr;
989  }
990 
992  }
993 
995  return false;
996  }
997 
1000  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1001  << "Contact a Framework developer\n";
1002  }
1003 
1006  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1007  << "Contact a Framework developer\n";
1008  }
1009 
1012  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1013  << "Contact a Framework developer\n";
1014  }
1015 
1018  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1019  << "Contact a Framework developer\n";
1020  }
1021 
1024  << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1025  << "Contact a Framework developer\n";
1026  }
1027 
1028 
// Unsupported for SingleChoiceNoProcessProductResolver; the message states it should never be called.
// NOTE(review): the statement that begins the stream expression is not visible in this listing.
void SingleChoiceNoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
  << "SingleChoiceNoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
  << "Contact a Framework developer\n";
}
1034 
// Unsupported for SingleChoiceNoProcessProductResolver; the message states it should never be called.
// NOTE(review): the statement that begins the stream expression is not visible in this listing.
void SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
  << "SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
  << "Contact a Framework developer\n";
}
1040 
1043  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1044  << "Contact a Framework developer\n";
1045  }
1046 
// Unsupported for SingleChoiceNoProcessProductResolver; the message states it should never be called.
// NOTE(review): the statement that begins the stream expression is not visible in this listing.
void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
  << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
  << "Contact a Framework developer\n";
}
1052 
1055  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1056  << "Contact a Framework developer\n";
1057  }
1058 
1061  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1062  << "Contact a Framework developer\n";
1063 
1064  }
1065 
1066 }
virtual ProductProvenance const * productProvenancePtr_() const override
ProducedProductResolver & realProduct_
virtual void setupUnscheduled(UnscheduledConfigurator const &) overridefinal
std::string const & branchName() const
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:29
virtual void resetProductData_(bool deleteEarly) override
virtual bool productWasDeleted_() const override
Provenance const * provenance() const
void callWhenDoneAsync(WaitingTask *task)
Definition: Worker.h:98
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const overridefinal
void setProcessHistory(ProcessHistory const &ph)
Definition: ProductData.h:59
virtual bool singleProduct_() const override
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
virtual void setProcessHistory_(ProcessHistory const &ph) overridefinal
virtual void resetProductData_(bool deleteEarly) override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void prefetchAsync(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void checkType(WrapperBase const &prod) const
std::type_info const & dynamicTypeInfo() const
Definition: WrapperBase.h:38
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
unsigned int unsetIndexValue() const
std::shared_ptr< BranchDescription const > bd_
virtual bool productResolved_() const overridefinal
virtual void resetProductData_(bool deleteEarly) override
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void push(const T &iAction)
asynchronously pushes functor iAction into queue
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
UnscheduledAuxiliary const * aux_
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
void add(WaitingTask *)
Adds task to the waiting list.
list original
Definition: definitions.py:57
assert(m_qm.get())
virtual void setProcessHistory_(ProcessHistory const &ph) override
ProductStatus status() const
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
WaitingTaskList m_waitingTasks
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
WrapperBase const * wrapper() const
Definition: ProductData.h:32
virtual void resetProductData_(bool deleteEarly) override
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const overridefinal
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:31
virtual bool unscheduledWasNotRun_() const override
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
virtual bool singleProduct_() const override
virtual bool productUnavailable_() const override
std::atomic< bool > prefetchRequested_
EventSetup const * eventSetup() const
void reset()
Resets access to the resource so that added tasks will wait.
ProductProvenanceRetriever const * provRetriever_
static unsigned int kUnsetOffset
virtual BranchDescription const & branchDescription_() const override
ServiceToken presentToken() const
virtual void resetProductData_(bool deleteEarly) override
std::string const & processName() const
virtual Provenance const * provenance_() const override
#define constexpr
bool isPresent() const
Definition: WrapperBase.h:22
static unsigned int kMissingOffset
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void putProduct(std::unique_ptr< WrapperBase > edp) const
virtual bool singleProduct_() const override
virtual bool productResolved_() const override final
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:36
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
virtual void setupUnscheduled(UnscheduledConfigurator const &) override final
virtual void resetFailedFromThisProcess() override
UnscheduledAuxiliary const * aux_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void setProvenance(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
Definition: ProductData.h:63
virtual bool isFromCurrentProcess() const =0
virtual bool isFromCurrentProcess() const override final
std::string const & className() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
virtual BranchDescription const & branchDescription_() const override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
task * execute() override
void resetProductData()
Definition: ProductData.h:51
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:190
std::atomic< bool > prefetchRequested_
def move
Definition: eostools.py:510
def load
Definition: svgfig.py:546
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
TypeID unwrappedTypeID() const
virtual void resetProductData_(bool deleteEarly) override
static ServiceRegistry & instance()
virtual void setProcessHistory_(ProcessHistory const &ph) override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void mergeProduct(std::unique_ptr< WrapperBase > edp) const
double f[11][100]
std::atomic< bool > prefetchRequested_
std::atomic< bool > m_prefetchRequested
std::atomic< unsigned int > lastCheckIndex_
virtual bool isFromCurrentProcess() const override final
virtual bool productUnavailable_() const override final
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setProcessHistory(ProcessHistory const &ph)
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual ProductProvenance const * productProvenancePtr_() const override final
void setProduct(std::unique_ptr< WrapperBase > edp) const
std::atomic< bool > skippingPrefetchRequested_
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual ProductProvenance const * productProvenancePtr_() const override
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
DelayedReader * reader() const
Definition: Principal.h:178
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override final
virtual bool productWasDeleted_() const override final
UnscheduledAuxiliary const * auxiliary() const
virtual void connectTo(ProductResolverBase const &iOther, Principal const *) override final
virtual void resetProductData_(bool deleteEarly) override
static unsigned int kAmbiguousOffset
virtual bool singleProduct_() const override
ProductStatus defaultStatus() const
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
virtual void setProcessHistory_(ProcessHistory const &ph) override
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
tuple pid
Definition: sysUtil.py:22
virtual void retrieveAndMerge_(Principal const &principal) const override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void addContext(std::string const &context)
Definition: Exception.cc:227
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preModuleDelayedGetSignal_
std::vector< ProductResolverIndex > matchingHolders_
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token) const
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override final
virtual bool singleProduct_() const override final
BranchDescription const & branchDescription() const
Worker * findWorker(std::string const &iLabel) const
ProductProvenance const * branchIDToProvenance(BranchID const &bid) const
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:442
virtual bool unscheduledWasNotRun_() const override
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
virtual void setProcessHistory_(ProcessHistory const &ph) override
void setProvenance(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
virtual bool productWasDeleted_() const override
virtual void setupUnscheduled(UnscheduledConfigurator const &) override final
virtual void connectTo(ProductResolverBase const &iOther, Principal const *) override final
ProductData const & getProductData() const
virtual Provenance const * provenance_() const override
virtual void connectTo(ProductResolverBase const &, Principal const *) override final
volatile std::atomic< bool > shutdown_flag false
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:602
virtual void resetProductData_(bool deleteEarly) override
virtual ProductProvenance const * productProvenancePtr_() const override
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postModuleDelayedGetSignal_
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:281
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous)
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
virtual bool productUnavailable_() const override
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp) const
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
virtual ProductProvenance const * productProvenancePtr_() const override
virtual bool productResolved_() const override final
tuple size
Write out results.
void emit(Args &&...args) const
Definition: Signal.h:47
std::string const & moduleLabel() const
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:34
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
static HepMC::HEPEVT_Wrapper wrapper
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override