CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
4 #include "Worker.h"
5 #include "UnscheduledAuxiliary.h"
19 
20 #include <cassert>
21 #include <utility>
22 
// Sentinel offsets for the choice index cached by NoProcessProductResolver.
// Each sentinel is formed as ambiguous_.size() + offset; real candidate indices are
// asserted to be below ambiguous_.size(), so the two ranges do not overlap.
// No decision cached yet: resolveProduct_ has to walk the lookup order.
23 static constexpr unsigned int kUnsetOffset = 0;
// The walk reached a candidate that is flagged in ambiguous_.
24 static constexpr unsigned int kAmbiguousOffset = 1;
// The walk finished without finding data; resolveProduct_ then returns Resolution(nullptr).
25 static constexpr unsigned int kMissingOffset = 2;
26 
27 namespace edm {
28 
31  exception << "ProductResolverBase::resolveProduct_: The product matching all criteria was already deleted\n"
32  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
33  << "Looking for module label: " << moduleLabel() << "\n"
34  << "Looking for productInstanceName: " << productInstanceName() << "\n"
35  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
36  << "This means there is a configuration error.\n"
37  << "The module which is asking for this data must be configured to state that it will read this data.";
38  throw exception;
39 
40  }
41 
// Shared resolution helper used by the resolveProduct_/prefetchAsync_ implementations.
// callResolver == false means the supplied functor is never invoked and only the current
// state is inspected. Returns a Resolution wrapping the product data when a product is set
// and present, otherwise Resolution(nullptr).
// NOTE(review): the signature (listing lines 44-45) is not visible in this listing; the name
// resolveProductImpl and the functor parameter name `resolver` are taken from the call sites
// and the body.
42  //This is a templated function in order to avoid calling another virtual function
43  template <bool callResolver, typename FUNC>
46 
47  if(productWasDeleted()) {
// NOTE(review): the statement for the deleted-product case (listing line 48) is missing here.
49  }
50  auto presentStatus = status();
51 
52  if(callResolver && presentStatus == ProductStatus::ResolveNotRun) {
53  //if resolver fails because of exception or not setting product
54  // make sure the status goes to failed
// The lambda is installed as the deleter of failedStatusGuard, so it runs when this scope is
// left, whether normally or through an exception, and refreshes presentStatus afterwards.
55  auto failedStatusSetter = [this](ProductStatus* presentStatus) {
56  if(this->status() == ProductStatus::ResolveNotRun) {
57  this->setFailedStatus();
58  }
59  *presentStatus = this->status();
60  };
61  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus, failedStatusSetter);
62 
63  //If successful, this will call setProduct
64  resolver();
65  }
66 
67 
// presentStatus now reflects the state after the resolver (if any) has run.
68  if (presentStatus == ProductStatus::ProductSet) {
69  auto pd = &getProductData();
// A wrapper can be set yet report that it holds no product; that counts as not found.
70  if(pd->wrapper()->isPresent()) {
71  return Resolution(pd);
72  }
73  }
74 
75  return Resolution(nullptr);
76  }
77 
// Combines a second copy of a run/lumi product (iFrom) with the product already held.
// Precondition (asserted): the status is ProductSet. A null iFrom is a no-op.
//  - mergeable product: iFrom is merged into the held product;
//  - product with an equality check only: a mismatch is reported through LogError;
//  - neither: a LogWarning is issued.
// In the last two cases the held product is left as it is ("Using the first").
78  void
79  DataManagingProductResolver::mergeProduct(std::unique_ptr<WrapperBase> iFrom) const {
80  assert(status() == ProductStatus::ProductSet);
81  if(not iFrom) { return;}
82 
// Type check of the incoming wrapper before it is used.
83  checkType(*iFrom);
84 
// NOTE(review): the statement that defines `original` (listing line 85) is missing from this
// listing; presumably it is the wrapper currently stored in this resolver — confirm.
86  if(original->isMergeable()) {
87  original->mergeProduct(iFrom.get());
88  } else if(original->hasIsProductEqual()) {
89  if(!original->isProductEqual(iFrom.get())) {
90  auto const& bd = branchDescription();
91  edm::LogError("RunLumiMerging")
92  << "ProductResolver::mergeTheProduct\n"
93  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
94  << "Using the first, ignoring the second\n"
95  << "className = " << bd.className() << "\n"
96  << "moduleLabel = " << bd.moduleLabel() << "\n"
97  << "instance = " << bd.productInstanceName() << "\n"
98  << "process = " << bd.processName() << "\n";
99  }
100  } else {
101  auto const& bd = branchDescription();
102  edm::LogWarning("RunLumiMerging")
103  << "ProductResolver::mergeTheProduct\n"
104  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
105  << "Using the first, ignoring the second in merge\n"
106  << "className = " << bd.className() << "\n"
107  << "moduleLabel = " << bd.moduleLabel() << "\n"
108  << "instance = " << bd.productInstanceName() << "\n"
109  << "process = " << bd.processName() << "\n";
110  }
111  }
112 
113 
116  bool,
118  ModuleCallingContext const* mcc) const {
119  return resolveProductImpl<true>([this,&principal,mcc]() {
120  auto branchType = principal.branchType();
121  if(branchType != InEvent) {
122  //delayed get has not been allowed with Run or Lumis
123  // The file may already be closed so the reader is invalid
124  return;
125  }
126  if(mcc and (branchType == InEvent) and aux_) {
127  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
128  }
129 
130  auto sentry( make_sentry(mcc,
131  [this, branchType](ModuleCallingContext const* iContext){
132  if(branchType == InEvent and aux_) {
133  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }
134  }));
135 
136  if(auto reader=principal.reader()) {
137  std::unique_lock<std::recursive_mutex> guard;
138  if(auto sr = reader->sharedResources().second) {
139  guard =std::unique_lock<std::recursive_mutex>(*sr);
140  }
141  if ( not productResolved()) {
142  //another thread could have beaten us here
143  putProduct( reader->getProduct(BranchKey(branchDescription()), &principal, mcc));
144  }
145  }
146  });
147 
148  }
149 
150  void
152  if(auto reader = principal.reader()) {
153 
154  std::unique_lock<std::recursive_mutex> guard;
155  if(auto sr = reader->sharedResources().second) {
156  guard =std::unique_lock<std::recursive_mutex>(*sr);
157  }
158 
159  //Can't use resolveProductImpl since it first checks to see
160  // if the product was already retrieved and then returns if it is
162  std::unique_ptr<WrapperBase> edp(reader->getProduct(bk, &principal));
163 
164  if(edp.get() != nullptr) {
166  } else if( status()== defaultStatus()) {
167  setFailedStatus();
168  }
169  }
170  }
171 
172 
174  Principal const& principal,
175  bool skipCurrentProcess,
176  ServiceToken const& token,
178  ModuleCallingContext const* mcc) const {
179  m_waitingTasks.add(waitTask);
180 
181  bool expected = false;
182  if( m_prefetchRequested.compare_exchange_strong(expected, true) ) {
183 
184  auto workToDo = [this, mcc, &principal, token] () {
185  //need to make sure Service system is activated on the reading thread
186  ServiceRegistry::Operate guard(token);
187  try {
188  resolveProductImpl<true>([this,&principal,mcc]() {
189  if(principal.branchType() != InEvent) { return; }
190  if(auto reader = principal.reader()) {
191  std::unique_lock<std::recursive_mutex> guard;
192  if(auto sr = reader->sharedResources().second) {
193  guard =std::unique_lock<std::recursive_mutex>(*sr);
194  }
195  if ( not productResolved()) {
196  //another thread could have finished this while we were waiting
197  putProduct( reader->getProduct(BranchKey(branchDescription()), &principal, mcc));
198  }
199  }
200  });
201  } catch(...) {
202  this->m_waitingTasks.doneWaiting(std::current_exception());
203  return;
204  }
205  this->m_waitingTasks.doneWaiting(nullptr);
206  };
207 
208  SerialTaskQueueChain* queue = nullptr;
209  if(auto reader = principal.reader()) {
210  if (auto shared_res = reader->sharedResources().first) {
211  queue = &(shared_res->serialQueueChain());
212  }
213  }
214  if(queue) {
215  queue->push(workToDo);
216  } else {
217  //Have to create a new task
218  auto t = make_functor_task(tbb::task::allocate_root(),
219  workToDo);
220  tbb::task::spawn(*t);
221  }
222  }
223  }
224 
225  void
227  m_prefetchRequested = false;
228  m_waitingTasks.reset();
230  }
231 
232  void
234  aux_ = iConfigure.auxiliary();
235  }
236 
237 
238  bool
240  return false;
241  }
242 
243 
246  bool skipCurrentProcess,
248  ModuleCallingContext const*) const {
249  if (!skipCurrentProcess) {
250  //'false' means never call the lambda function
251  return resolveProductImpl<false>([](){return;});
252  }
253  return Resolution(nullptr);
254  }
255 
257  Principal const& principal,
258  bool skipCurrentProcess,
259  ServiceToken const& token,
261  ModuleCallingContext const* mcc) const {
262  if(not skipCurrentProcess) {
263  if(branchDescription().availableOnlyAtEndTransition() and mcc ) {
264  if( not mcc->parent().isAtEndTransition() ) {
265  return;
266  }
267  }
268  m_waitingTasks.add(waitTask);
269 
270  bool expected = false;
271  if(worker_ and prefetchRequested_.compare_exchange_strong(expected,true)) {
272  //using a waiting task to do a callback guarantees that
273  // the m_waitingTasks list will be released from waiting even
274  // if the module does not put this data product or the
275  // module has an exception while running
276 
277  auto waiting = make_waiting_task(tbb::task::allocate_root(),
278  [this](std::exception_ptr const * iException) {
279  if(nullptr != iException) {
280  m_waitingTasks.doneWaiting(*iException);
281  } else {
282  m_waitingTasks.doneWaiting(std::exception_ptr());
283  }
284  });
285  worker_->callWhenDoneAsync(waiting);
286  }
287  }
288  }
289 
// Stores a product and, when no prefetch had been requested for it, releases the tasks
// waiting on this resolver.
// NOTE(review): the statement that actually stores edp (listing line 292) is missing from
// this listing; presumably it forwards edp to the base-class putProduct_ — confirm.
290  void
291  PuttableProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
293  bool expected = false;
// The exchange succeeds only if prefetchRequested_ was still false, i.e. prefetchAsync_ has
// not registered its own completion callback with the worker; in that case the waiters are
// released here with an empty exception_ptr. The flag is left set to true.
294  if(prefetchRequested_.compare_exchange_strong(expected,true)) {
295  m_waitingTasks.doneWaiting(std::exception_ptr());
296  }
297  }
298 
299 
300  void
302  m_waitingTasks.reset();
304  prefetchRequested_ = false;
305  }
306 
307  void
309  worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
310  }
311 
312 
313  void
315  aux_ = iConfigure.auxiliary();
316  worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
317  assert(worker_ != nullptr);
318 
319  }
320 
323  bool skipCurrentProcess,
325  ModuleCallingContext const* mcc) const {
326  if (!skipCurrentProcess and worker_) {
327  return resolveProductImpl<true>(
328  [&principal,this,sra,mcc]() {
329  try {
330  auto const& event = static_cast<EventPrincipal const&>(principal);
331  ParentContext parentContext(mcc);
332  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
333 
334  auto workCall = [this,&event,&parentContext,mcc] () {
335  auto sentry( make_sentry(mcc,[this](ModuleCallingContext const* iContext){aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }));
336 
338  event,
339  *(aux_->eventSetup()),
340  event.streamID(),
341  parentContext,
342  mcc->getStreamContext());
343  };
344 
345  if (sra) {
346  assert(false);
347  } else {
348  workCall();
349  }
350 
351  }
352  catch (cms::Exception & ex) {
353  std::ostringstream ost;
354  ost << "Calling produce method for unscheduled module "
355  << worker_->description().moduleName() << "/'"
356  << worker_->description().moduleLabel() << "'";
357  ex.addContext(ost.str());
358  throw;
359  }
360  });
361  }
362  return Resolution(nullptr);
363  }
364 
365  void
367  Principal const& principal,
368  bool skipCurrentProcess,
369  ServiceToken const& token,
371  ModuleCallingContext const* mcc) const
372  {
373  if(skipCurrentProcess) { return; }
374  waitingTasks_.add(waitTask);
375  bool expected = false;
376  if(prefetchRequested_.compare_exchange_strong(expected, true)) {
377 
378  //Have to create a new task which will make sure the state for UnscheduledProductResolver
379  // is properly set after the module has run
380  auto t = make_waiting_task(tbb::task::allocate_root(),
381  [this](std::exception_ptr const* iPtr)
382  {
383  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
384  // state for the case where an exception occurs during the call to the function.
385  try {
386  resolveProductImpl<true>([iPtr]() {
387  if ( iPtr) {
388  std::rethrow_exception(*iPtr);
389  }
390  });
391  } catch(...) {
392  waitingTasks_.doneWaiting(std::current_exception());
393  return;
394  }
395  waitingTasks_.doneWaiting(nullptr);
396  } );
397  auto const& event = static_cast<EventPrincipal const&>(principal);
398  ParentContext parentContext(mcc);
399 
401  event,
402  *(aux_->eventSetup()),
403  token,
404  event.streamID(),
405  parentContext,
406  mcc->getStreamContext());
407  }
408  }
409 
410  void
412  prefetchRequested_ = false;
413  waitingTasks_.reset();
415  }
416 
417 
// Stores a product for a branch produced in this process. A status other than the
// default one is reported with the "more than one product" message below.
418  void
419  ProducedProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
420  if(status() != defaultStatus()) {
// NOTE(review): the head of this streamed statement (listing line 421) is missing from the
// listing — presumably an exception is thrown here; confirm against the real source.
422  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
423  }
424 
425  setProduct(std::move(edp)); // ProductResolver takes ownership
426  }
427 
// Stores a product read from the input, unless the product has already been resolved.
// When it has, edp is simply dropped when this function returns.
428  void
429  InputProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
430  if ( not productResolved()) {
431  //Another thread could have set this
432  setProduct(std::move(edp));
433  }
434  }
435 
436  bool
438  return true;
439  }
440 
441  void
443  assert(false);
444  }
445 
// Either stores prod or merges it into the existing product, depending on the status.
// A null prod is ignored. With the default status the product is put; with any other
// status it is handed to mergeProduct (which asserts that a product is already set).
446  void
447  DataManagingProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const {
448  if(not prod) {return;}
449  if(status() == defaultStatus()) {
450  //resolveProduct has not been called or it failed
451  putProduct(std::move(prod));
452  } else {
453  mergeProduct(std::move(prod));
454  }
455  }
456 
457 
458 
459  void
461  // Check if the types match.
462  TypeID typeID(prod.dynamicTypeInfo());
464  // Types do not match.
466  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
467  << "It is supposed to be of type " << branchDescription().className() << ".\n"
468  << "It is actually of type " << typeID.className() << ".\n";
469  }
470  }
471 
472 
// Installs edp as this resolver's product. A null edp marks the resolution as failed.
473  void
474  DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
475  if(edp) {
// Type check of the wrapper before it is stored.
476  checkType(*edp);
// NOTE(review): the statements that store the wrapper and update the status (listing lines
// 477-478) are missing from this listing — confirm against the real source.
479  } else {
480  setFailedStatus();
481  }
482  }
483  // This routine returns true if it is known that currently there is no real product.
484  // If there is a real product, it returns false.
485  // If it is not known if there is a real product, it returns false.
486  bool
488  auto presentStatus = status();
489  if(presentStatus == ProductStatus::ProductSet) {
490  return !(getProductData().wrapper()->isPresent());
491  }
492  return presentStatus != ProductStatus::ResolveNotRun;
493  }
494 
495  bool
497  auto s = status();
498  return (s != defaultStatus() ) or (s == ProductStatus::ProductDeleted);
499  }
500 
501 
502  // This routine returns true if the product was deleted early in order to save memory
503  bool
506  }
507 
508  bool
510  if (iSkipCurrentProcess and isFromCurrentProcess() ) {
511  return false;
512  }
514  if(getProductData().wrapper()->isPresent()) {
515  return true;
516  }
517  }
518  return false;
519  }
520 
521 
523  productData_.setProvenance(provRetriever,ph,pid);
524  }
525 
528  }
529 
531  return provenance()->productProvenance();
532  }
533 
537  }
538  if(deleteEarly) {
540  } else {
541  resetStatus();
542  }
543  }
544 
546  return true;
547  }
548 
550  realProduct_.setProvenance(provRetriever,ph,pid);
551  }
552 
554  realProduct_.setProcessHistory(ph);
555  }
556 
558  return provenance()->productProvenance();
559  }
560 
562  realProduct_.resetProductData_(deleteEarly);
563  }
564 
566  return true;
567  }
568 
// Unsupported for alias resolvers: the message below states this must never be called.
// NOTE(review): the head of the streamed statement (listing line 570) is missing from this
// listing — presumably an exception is thrown; confirm.
569  void AliasProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
571  << "AliasProductResolver::putProduct_() not implemented and should never be called.\n"
572  << "Contact a Framework developer\n";
573  }
574 
// Unsupported for alias resolvers: the message below states this must never be called.
// edp is not used in the visible part of the body.
// NOTE(review): the head of the streamed statement (listing line 576) is missing from this
// listing — presumably an exception is thrown; confirm.
575  void AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
577  << "AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
578  << "Contact a Framework developer\n";
579  }
580 
581 
582 
584  provRetriever_ = provRetriever;
585  }
586 
588  }
589 
591  return provRetriever_? provRetriever_->branchIDToProvenance(bd_->originalBranchID()): nullptr;
592  }
593 
595  }
596 
598  return true;
599  }
600 
// Unsupported for parent-process resolvers: the message below states this must never be called.
// NOTE(review): the head of the streamed statement (listing line 602) is missing from this
// listing — presumably an exception is thrown; confirm.
601  void ParentProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
603  << "ParentProcessProductResolver::putProduct_() not implemented and should never be called.\n"
604  << "Contact a Framework developer\n";
605  }
606 
// Unsupported for parent-process resolvers: the message below states this must never be called.
// edp is not used in the visible part of the body.
// NOTE(review): the head of the streamed statement (listing line 608) is missing from this
// listing — presumably an exception is thrown; confirm.
607  void ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
609  << "ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
610  << "Contact a Framework developer\n";
611  }
612 
614  // In principle, this ought to be fixed. I noticed one hits this error
615  // when in a SubProcess and calling the Event::getProvenance function
616  // with a BranchID to a branch from an earlier SubProcess or the top
617  // level process and this branch is not kept in this SubProcess. It might
618  // be possible to hit this in other contexts. I say it ought to be
619  // fixed because one does not encounter this issue if the SubProcesses
620  // are split into genuinely different processes (in principle that
621  // ought to give identical behavior and results). No user has ever
622  // reported this issue which has been around for some time and it was only
623  // noticed when testing some rare corner cases after modifying Core code.
624  // After discussing this with Chris we decided that at least for the moment
625  // there are higher priorities than fixing this ... I converted it so it
626  // causes an exception instead of a seg fault. The issue that may need to
627  // be addressed someday is how ProductResolvers for non-kept branches are
628  // connected to earlier SubProcesses.
630  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
631  << "Contact a Framework developer\n";
632  }
633 
// Constructor. Keeps the candidate resolver indices together with the parallel ambiguity
// flags (asserted below to have the same length), starts both cached choices at the
// "unset" sentinel (ambiguous_.size() + kUnsetOffset) and clears both prefetch flags.
// NOTE(review): the class qualifier line preceding this one is missing from the listing.
635  NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
636  std::vector<bool> const& ambiguous,
637  bool madeAtEnd) :
638  matchingHolders_(matchingHolders),
639  ambiguous_(ambiguous),
// NOTE(review): the next two initialisers read ambiguous_ and lastCheckIndex_ respectively,
// so they depend on the member declaration order in the header — confirm.
640  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
641  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
642  prefetchRequested_(false),
643  skippingPrefetchRequested_(false),
644  madeAtEnd_{madeAtEnd}
645  {
646  assert(ambiguous_.size() == matchingHolders_.size());
647  }
648 
651  Principal const& principal,
652  bool skipCurrentProcess,
654  ModuleCallingContext const* mcc) const {
655  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
656  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
657  }
658 
659 
662  bool skipCurrentProcess,
664  ModuleCallingContext const* mcc) const {
665  //See if we've already cached which Resolver we should call or if
666  // we know it is ambiguous
667  const unsigned int choiceSize = ambiguous_.size();
668 
669  //madeAtEnd_==true and not at end transition is the same as skipping the current process
670  if( (not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
671  skipCurrentProcess = not mcc->parent().isAtEndTransition();
672  }
673 
674  unsigned int checkCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
675  if( checkCacheIndex != choiceSize +kUnsetOffset) {
676  if (checkCacheIndex == choiceSize+kAmbiguousOffset) {
678  } else if(checkCacheIndex == choiceSize+kMissingOffset) {
679  return Resolution(nullptr);
680  }
681  return tryResolver(checkCacheIndex, principal, skipCurrentProcess,
682  sra,mcc);
683  }
684 
685  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
686 
687  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
688  for(unsigned int k : lookupProcessOrder) {
689  assert(k < ambiguous_.size());
690  if(k == 0) break; // Done
691  if(ambiguous_[k]) {
692  updateCacheIndex = choiceSize + kAmbiguousOffset;
694  }
696  auto resolution = tryResolver(k,principal, skipCurrentProcess, sra,mcc);
697  if(resolution.data() != nullptr) {
698  updateCacheIndex = k;
699  return resolution;
700  }
701  }
702  }
703 
704  updateCacheIndex = choiceSize + kMissingOffset;
705  return Resolution(nullptr);
706  }
707 
708  void
710  Principal const& principal,
711  bool skipCurrentProcess,
712  ServiceToken const& token,
714  ModuleCallingContext const* mcc) const {
715  bool timeToMakeAtEnd = true;
716  if(madeAtEnd_ and mcc) {
717  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
718  }
719 
720  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
721  if(not skipCurrentProcess and timeToMakeAtEnd) {
722  waitingTasks_.add(waitTask);
723 
724  bool expected = false;
725  if( prefetchRequested_.compare_exchange_strong(expected,true)) {
726  //we are the first thread to request
727  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token);
728  }
729  } else {
730  skippingWaitingTasks_.add(waitTask);
731  bool expected = false;
732  if( skippingPrefetchRequested_.compare_exchange_strong(expected,true)) {
733  //we are the first thread to request
734  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token);
735  }
736  }
737  }
738 
// Records the outcome of an asynchronous lookup and releases the tasks waiting on it.
//  iSkipCurrentProcess  selects which cache / waiting list the outcome belongs to
//  iIndex               chosen candidate index, or one of the sentinel values
//  iExceptPtr           exception to hand to the waiting tasks (may be empty)
739  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
740  ProductResolverIndex iIndex,
741  std::exception_ptr iExceptPtr) const {
742  if( not iSkipCurrentProcess) {
743  lastCheckIndex_ = iIndex;
744  waitingTasks_.doneWaiting(iExceptPtr);
745  } else {
// NOTE(review): the body of this branch (listing lines 746-747) is missing from the listing;
// presumably it mirrors the branch above using the skip-current-process members — confirm.
748  }
749  }
750 
751  namespace {
// Task handed to a candidate resolver's prefetchAsync by tryPrefetchResolverAsync.
// When executed it either reports the failure recorded for that candidate, or, if the
// candidate did not yield valid data, continues the search with the next lookup position.
752  class TryNextResolverWaitingTask : public edm::WaitingTask {
753  public:
754 
// Captures everything needed to resume the search later; pointers are stored as given
// and not owned by this task.
// NOTE(review): the iSRA parameter (listing line 758) and the sra_ member declaration
// (listing line 790) are missing from this listing.
755  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
756  unsigned int iResolverIndex,
757  Principal const* iPrincipal,
759  ModuleCallingContext const* iMCC,
760  bool iSkipCurrentProcess,
761  ServiceToken iToken) :
762  resolver_(iResolver),
763  principal_(iPrincipal),
764  sra_(iSRA),
765  mcc_(iMCC),
766  serviceToken_(iToken),
767  index_(iResolverIndex),
768  skipCurrentProcess_(iSkipCurrentProcess){}
769 
770  tbb::task* execute() override {
771  auto exceptPtr =exceptionPtr();
772  if(exceptPtr) {
// The prefetch of this candidate ended with an exception: report it and stop searching.
773  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
774  } else {
// dataValidFromResolver caches the choice itself when the data is valid; only when it is
// not do we move on to the next position in the lookup order.
775  if(not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
776  resolver_->tryPrefetchResolverAsync(index_+1,
777  *principal_,
778  skipCurrentProcess_,
779  sra_,
780  mcc_,
781  serviceToken_);
782  }
783  }
784  return nullptr;
785  }
786 
787  private:
788  NoProcessProductResolver const* resolver_;
789  Principal const* principal_;
791  ModuleCallingContext const* mcc_;
792  ServiceToken serviceToken_;
// Position in the principal's lookup order that this task belongs to.
793  unsigned int index_;
794  bool skipCurrentProcess_;
795  };
796  }
797 
// Invoked by TryNextResolverWaitingTask when the prefetch for the candidate at position
// iProcessingIndex of the principal's lookup order carried an exception.
// Translates the position into the candidate index and passes that index together with
// the exception to setCache for the given skip mode.
798  void
799  NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
800  Principal const& principal,
801  bool iSkipCurrentProcess,
802  std::exception_ptr iExceptPtr) const {
803  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
// iProcessingIndex is a position in the lookup order, k is the candidate index stored there.
804  auto k = lookupProcessOrder[iProcessingIndex];
805 
806  setCache(iSkipCurrentProcess, k, iExceptPtr);
807  }
808 
809 
// Checks whether the candidate at position iProcessingIndex of the principal's lookup
// order holds a product that was fetched and is valid.
// Returns true after recording that candidate through setCache (with no exception);
// returns false, without calling setCache, otherwise.
810  bool
811  NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
812  Principal const& principal,
813  bool iSkipCurrentProcess) const {
814  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
// Position in the lookup order -> candidate index -> the resolver registered for it.
815  auto k = lookupProcessOrder[iProcessingIndex];
816  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
817 
818  if(productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
819 
820  setCache(iSkipCurrentProcess, k, nullptr);
821  return true;
822  }
823  return false;
824  }
825 
826 
827  void
829  Principal const& principal,
830  bool skipCurrentProcess,
832  ModuleCallingContext const* mcc,
833  ServiceToken token) const {
834  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
835  auto index = iProcessingIndex;
836 
837  const unsigned int choiceSize = ambiguous_.size();
838  unsigned int newCacheIndex = choiceSize + kMissingOffset;
839  while(index < lookupProcessOrder.size()) {
840  auto k = lookupProcessOrder[index];
841  if(k==0) {
842  break;
843  }
844  assert(k < ambiguous_.size());
845  if(ambiguous_[k]) {
846  newCacheIndex = choiceSize + kAmbiguousOffset;
847  break;
848  }
850  //make new task
851 
852  auto task = new (tbb::task::allocate_root()) TryNextResolverWaitingTask(
853  this,
854  index,
855  &principal,
856  sra,
857  mcc,
858  skipCurrentProcess,
859  token
860  );
861  task->increment_ref_count();
862  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
863 
864  //Make sure the Services are available on this thread
865  ServiceRegistry::Operate guard(token);
866 
867  productResolver->prefetchAsync(task,
868  principal,
869  skipCurrentProcess,
870  token,
871  sra, mcc);
872  if(0 == task->decrement_ref_count()) {
873  tbb::task::spawn(*task);
874  }
875  return;
876  }
877  ++index;
878  }
879  //data product unavailable
880  setCache(skipCurrentProcess, newCacheIndex, nullptr);
881  }
882 
884  }
885 
887  }
888 
890  return nullptr;
891  }
892 
// Returns the sentinel meaning "no choice cached yet": the number of entries in ambiguous_ plus kUnsetOffset.
893  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size()+kUnsetOffset; }
894 
896  const auto resetValue = unsetIndexValue();
897  lastCheckIndex_ = resetValue;
898  lastSkipCurrentCheckIndex_ = resetValue;
899  prefetchRequested_ = false;
903  }
904 
906  return false;
907  }
908 
911  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
912  << "Contact a Framework developer\n";
913  }
914 
917  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
918  << "Contact a Framework developer\n";
919  }
920 
923  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
924  << "Contact a Framework developer\n";
925  }
926 
929  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
930  << "Contact a Framework developer\n";
931  }
932 
// Unsupported for this resolver type: the message below states this must never be called.
// The argument is unnamed and unused.
// NOTE(review): the head of the streamed statement (listing line 934) is missing from this
// listing — presumably an exception is thrown, which would explain the absent return; confirm.
933  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
935  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
936  << "Contact a Framework developer\n";
937  }
938 
// Unsupported for this resolver type: the message below states this must never be called.
// NOTE(review): the head of the streamed statement (listing line 940) is missing from this
// listing — presumably an exception is thrown; confirm.
939  void NoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
941  << "NoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
942  << "Contact a Framework developer\n";
943  }
944 
// Unsupported for this resolver type: the message below states this must never be called.
// edp is not used in the visible part of the body.
// NOTE(review): the head of the streamed statement (listing line 946) is missing from this
// listing — presumably an exception is thrown; confirm.
945  void NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
947  << "NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
948  << "Contact a Framework developer\n";
949  }
950 
953  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
954  << "Contact a Framework developer\n";
955  }
956 
// Unsupported for this resolver type: the message below states this must never be called.
// The argument is unnamed and unused.
// NOTE(review): the head of the streamed statement (listing line 958) is missing from this
// listing — presumably an exception is thrown; confirm.
957  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
959  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
960  << "Contact a Framework developer\n";
961  }
962 
965  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
966  << "Contact a Framework developer\n";
967  }
968 
971  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
972  << "Contact a Framework developer\n";
973 
974  }
975 
976  //---- SingleChoiceNoProcessProductResolver ----------------
978  Principal const& principal,
979  bool skipCurrentProcess,
981  ModuleCallingContext const* mcc) const
982  {
983  //NOTE: Have to look up the other ProductResolver each time rather than cache
984  // its pointer since it appears the pointer can change at some later stage
985  return principal.getProductResolverByIndex(realResolverIndex_)
986  ->resolveProduct(principal,
987  skipCurrentProcess, sra, mcc);
988  }
989 
991  Principal const& principal,
992  bool skipCurrentProcess,
993  ServiceToken const& token,
995  ModuleCallingContext const* mcc) const {
996  principal.getProductResolverByIndex(realResolverIndex_)
997  ->prefetchAsync(waitTask,principal,
998  skipCurrentProcess, token, sra, mcc);
999  }
1000 
1002  }
1003 
1005  }
1006 
1008  return nullptr;
1009  }
1010 
1012  }
1013 
1015  return false;
1016  }
1017 
1020  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1021  << "Contact a Framework developer\n";
1022  }
1023 
1026  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1027  << "Contact a Framework developer\n";
1028  }
1029 
1032  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1033  << "Contact a Framework developer\n";
1034  }
1035 
1038  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1039  << "Contact a Framework developer\n";
1040  }
1041 
1044  << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1045  << "Contact a Framework developer\n";
1046  }
1047 
1048 
// Unsupported for this resolver type: the message below states this must never be called.
// NOTE(review): the head of the streamed statement (listing line 1050) is missing from this
// listing — presumably an exception is thrown; confirm.
1049  void SingleChoiceNoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
1051  << "SingleChoiceNoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
1052  << "Contact a Framework developer\n";
1053  }
1054 
// Unsupported for this resolver type: the message below states this must never be called.
// edp is not used in the visible part of the body.
// NOTE(review): the head of the streamed statement (listing line 1056) is missing from this
// listing — presumably an exception is thrown; confirm.
1055  void SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
1057  << "SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
1058  << "Contact a Framework developer\n";
1059  }
1060 
1063  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1064  << "Contact a Framework developer\n";
1065  }
1066 
// Unsupported for this resolver type: the message below states this must never be called.
// The argument is unnamed and unused.
// NOTE(review): the head of the streamed statement (listing line 1068) is missing from this
// listing — presumably an exception is thrown; confirm.
1067  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1069  << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1070  << "Contact a Framework developer\n";
1071  }
1072 
1075  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1076  << "Contact a Framework developer\n";
1077  }
1078 
1081  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1082  << "Contact a Framework developer\n";
1083 
1084  }
1085 
1086 }
size
Write out results.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::string const & branchName() const
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:29
void resetProductData_(bool deleteEarly) override
Provenance const * provenance() const
void setProcessHistory(ProcessHistory const &ph)
Definition: ProductData.h:59
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
void resetProductData_(bool deleteEarly) override
void checkType(WrapperBase const &prod) const
std::type_info const & dynamicTypeInfo() const
Definition: WrapperBase.h:43
void prefetchAsync(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
unsigned int unsetIndexValue() const
void resetProductData_(bool deleteEarly) override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
void add(WaitingTask *)
Adds task to the waiting list.
void setProcessHistory_(ProcessHistory const &ph) override
ProductStatus status() const
bool singleProduct_() const override
bool unscheduledWasNotRun_() const override
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
WrapperBase const * wrapper() const
Definition: ProductData.h:32
void resetProductData_(bool deleteEarly) override
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:31
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void reset()
Resets access to the resource so that added tasks will wait.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
BranchDescription const & branchDescription_() const override
bool productResolved_() const final
static unsigned int kUnsetOffset
void resetProductData_(bool deleteEarly) override
std::string const & processName() const
void retrieveAndMerge_(Principal const &principal) const override
void connectTo(ProductResolverBase const &, Principal const *) final
#define constexpr
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
bool isPresent() const
Definition: WrapperBase.h:27
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
ProductProvenance const * productProvenancePtr_() const override
static unsigned int kMissingOffset
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void putProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:38
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void setProvenance(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
Definition: ProductData.h:63
def principal(options)
bool isFromCurrentProcess() const final
std::string const & className() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:51
bool singleProduct_() const override
Provenance const * provenance_() const override
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:190
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
Provenance const * provenance_() const override
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
TypeID unwrappedTypeID() const
void resetProductData_(bool deleteEarly) override
void setProcessHistory_(ProcessHistory const &ph) override
ProductProvenance const * productProvenancePtr_() const override
bool isFromCurrentProcess() const final
bool productUnavailable_() const override
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void mergeProduct(std::unique_ptr< WrapperBase > edp) const
ProductProvenance const * productProvenancePtr_() const override
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
std::atomic< bool > prefetchRequested_
ProductProvenance const * productProvenancePtr_() const override
std::atomic< unsigned int > lastCheckIndex_
TypeWithDict const & unwrappedType() const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) final
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:75
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual bool isFromCurrentProcess() const =0
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
int k[5][pyjets_maxn]
void setProduct(std::unique_ptr< WrapperBase > edp) const
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
DelayedReader * reader() const
Definition: Principal.h:182
UnscheduledAuxiliary const * auxiliary() const
void resetProductData_(bool deleteEarly) override
static unsigned int kAmbiguousOffset
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductStatus defaultStatus() const
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void setProcessHistory_(ProcessHistory const &ph) override
void setupUnscheduled(UnscheduledConfigurator const &) final
def load(fileName)
Definition: svgfig.py:546
BranchDescription const & branchDescription_() const override
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void addContext(std::string const &context)
Definition: Exception.cc:227
bool singleProduct_() const override
std::vector< ProductResolverIndex > matchingHolders_
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token) const
HLT enums.
BranchDescription const & branchDescription() const
Worker * findWorker(std::string const &iLabel) const
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:513
ProductProvenance const * productProvenancePtr_() const final
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void setProcessHistory_(ProcessHistory const &ph) override
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductData const & getProductData() const
bool productWasDeleted_() const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void setProcessHistory_(ProcessHistory const &ph) final
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
def move(src, dest)
Definition: eostools.py:510
def branchType(schema, name)
Definition: revisionDML.py:112
std::string const & moduleLabel() const
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:34
static HepMC::HEPEVT_Wrapper wrapper
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override