CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
4 #include "Worker.h"
5 #include "UnscheduledAuxiliary.h"
19 
20 #include <cassert>
21 #include <utility>
22 
// Sentinel offsets for the cached lookup index kept by NoProcessProductResolver.
// Each offset is added to ambiguous_.size(); real indices are asserted to be
// smaller than ambiguous_.size(), so a sentinel never equals a real index.
// Cache has not been filled in yet.
23 static constexpr unsigned int kUnsetOffset = 0;
// The lookup reached an entry flagged in ambiguous_.
24 static constexpr unsigned int kAmbiguousOffset = 1;
// The lookup finished without finding a product.
25 static constexpr unsigned int kMissingOffset = 2;
26 
27 namespace edm {
28 
31  exception << "ProductResolverBase::resolveProduct_: The product matching all criteria was already deleted\n"
32  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
33  << "Looking for module label: " << moduleLabel() << "\n"
34  << "Looking for productInstanceName: " << productInstanceName() << "\n"
35  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
36  << "This means there is a configuration error.\n"
37  << "The module which is asking for this data must be configured to state that it will read this data.";
38  throw exception;
39 
40  }
41 
42  //This is a templated function in order to avoid calling another virtual function
43  template <bool callResolver, typename FUNC>
46 
47  if(productWasDeleted()) {
49  }
50  auto presentStatus = status();
51 
52  if(callResolver && presentStatus == ProductStatus::ResolveNotRun) {
53  //if resolver fails because of exception or not setting product
54  // make sure the status goes to failed
55  auto failedStatusSetter = [this](ProductStatus* presentStatus) {
56  if(this->status() == ProductStatus::ResolveNotRun) {
57  this->setFailedStatus();
58  }
59  *presentStatus = this->status();
60  };
61  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus, failedStatusSetter);
62 
63  //If successful, this will call setProduct
64  resolver();
65  }
66 
67 
68  if (presentStatus == ProductStatus::ProductSet) {
69  auto pd = &getProductData();
70  if(pd->wrapper()->isPresent()) {
71  return Resolution(pd);
72  }
73  }
74 
75  return Resolution(nullptr);
76  }
77 
78  void
79  DataManagingProductResolver::mergeProduct(std::unique_ptr<WrapperBase> iFrom) const {
80  assert(status() == ProductStatus::ProductSet);
81  if(not iFrom) { return;}
82 
83  checkType(*iFrom);
84 
86  if(original->isMergeable()) {
87  original->mergeProduct(iFrom.get());
88  } else if(original->hasIsProductEqual()) {
89  if(!original->isProductEqual(iFrom.get())) {
90  auto const& bd = branchDescription();
91  edm::LogError("RunLumiMerging")
92  << "ProductResolver::mergeTheProduct\n"
93  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
94  << "Using the first, ignoring the second\n"
95  << "className = " << bd.className() << "\n"
96  << "moduleLabel = " << bd.moduleLabel() << "\n"
97  << "instance = " << bd.productInstanceName() << "\n"
98  << "process = " << bd.processName() << "\n";
99  }
100  } else {
101  auto const& bd = branchDescription();
102  edm::LogWarning("RunLumiMerging")
103  << "ProductResolver::mergeTheProduct\n"
104  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
105  << "Using the first, ignoring the second in merge\n"
106  << "className = " << bd.className() << "\n"
107  << "moduleLabel = " << bd.moduleLabel() << "\n"
108  << "instance = " << bd.productInstanceName() << "\n"
109  << "process = " << bd.processName() << "\n";
110  }
111  }
112 
113 
116  bool,
118  ModuleCallingContext const* mcc) const {
119  return resolveProductImpl<true>([this,&principal,mcc]() {
120  auto branchType = principal.branchType();
121  if(branchType != InEvent) {
122  //delayed get has not been allowed with Run or Lumis
123  // The file may already be closed so the reader is invalid
124  return;
125  }
126  if(mcc and (branchType == InEvent) and aux_) {
127  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
128  }
129 
130  auto sentry( make_sentry(mcc,
131  [this, branchType](ModuleCallingContext const* iContext){
132  if(branchType == InEvent and aux_) {
133  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }
134  }));
135 
136  if(auto reader=principal.reader()) {
137  std::unique_lock<std::recursive_mutex> guard;
138  if(auto sr = reader->sharedResources().second) {
139  guard =std::unique_lock<std::recursive_mutex>(*sr);
140  }
141  if ( not productResolved()) {
142  //another thread could have beaten us here
143  putProduct( reader->getProduct(branchDescription().branchID(), &principal, mcc));
144  }
145  }
146  });
147 
148  }
149 
150  void
152  if(auto reader = principal.reader()) {
153 
154  std::unique_lock<std::recursive_mutex> guard;
155  if(auto sr = reader->sharedResources().second) {
156  guard =std::unique_lock<std::recursive_mutex>(*sr);
157  }
158 
159  //Can't use resolveProductImpl since it first checks to see
160  // if the product was already retrieved and then returns if it is
161  std::unique_ptr<WrapperBase> edp(reader->getProduct(branchDescription().branchID(), &principal));
162 
163  if(edp.get() != nullptr) {
165  } else if( status()== defaultStatus()) {
166  setFailedStatus();
167  }
168  }
169  }
170 
171 
173  Principal const& principal,
174  bool skipCurrentProcess,
175  ServiceToken const& token,
177  ModuleCallingContext const* mcc) const {
178  m_waitingTasks.add(waitTask);
179 
180  bool expected = false;
181  if( m_prefetchRequested.compare_exchange_strong(expected, true) ) {
182 
183  auto workToDo = [this, mcc, &principal, token] () {
184  //need to make sure Service system is activated on the reading thread
185  ServiceRegistry::Operate guard(token);
186  try {
187  resolveProductImpl<true>([this,&principal,mcc]() {
188  if(principal.branchType() != InEvent) { return; }
189  if(auto reader = principal.reader()) {
190  std::unique_lock<std::recursive_mutex> guard;
191  if(auto sr = reader->sharedResources().second) {
192  guard =std::unique_lock<std::recursive_mutex>(*sr);
193  }
194  if ( not productResolved()) {
195  //another thread could have finished this while we were waiting
196  putProduct( reader->getProduct(branchDescription().branchID(), &principal, mcc));
197  }
198  }
199  });
200  } catch(...) {
201  this->m_waitingTasks.doneWaiting(std::current_exception());
202  return;
203  }
204  this->m_waitingTasks.doneWaiting(nullptr);
205  };
206 
207  SerialTaskQueueChain* queue = nullptr;
208  if(auto reader = principal.reader()) {
209  if (auto shared_res = reader->sharedResources().first) {
210  queue = &(shared_res->serialQueueChain());
211  }
212  }
213  if(queue) {
214  queue->push(workToDo);
215  } else {
216  //Have to create a new task
217  auto t = make_functor_task(tbb::task::allocate_root(),
218  workToDo);
219  tbb::task::spawn(*t);
220  }
221  }
222  }
223 
224  void
226  m_prefetchRequested = false;
227  m_waitingTasks.reset();
229  }
230 
231  void
233  aux_ = iConfigure.auxiliary();
234  }
235 
236 
237  bool
239  return false;
240  }
241 
242 
245  bool skipCurrentProcess,
247  ModuleCallingContext const*) const {
248  if (!skipCurrentProcess) {
249  //'false' means never call the lambda function
250  return resolveProductImpl<false>([](){return;});
251  }
252  return Resolution(nullptr);
253  }
254 
256  Principal const& principal,
257  bool skipCurrentProcess,
258  ServiceToken const& token,
260  ModuleCallingContext const* mcc) const {
261  if(not skipCurrentProcess) {
262  if(branchDescription().availableOnlyAtEndTransition() and mcc ) {
263  if( not mcc->parent().isAtEndTransition() ) {
264  return;
265  }
266  }
267  m_waitingTasks.add(waitTask);
268 
269  bool expected = false;
270  if(worker_ and prefetchRequested_.compare_exchange_strong(expected,true)) {
271  //using a waiting task to do a callback guarantees that
272  // the m_waitingTasks list will be released from waiting even
273  // if the module does not put this data product or the
274  // module has an exception while running
275 
276  auto waiting = make_waiting_task(tbb::task::allocate_root(),
277  [this](std::exception_ptr const * iException) {
278  if(nullptr != iException) {
279  m_waitingTasks.doneWaiting(*iException);
280  } else {
281  m_waitingTasks.doneWaiting(std::exception_ptr());
282  }
283  });
284  worker_->callWhenDoneAsync(waiting);
285  }
286  }
287  }
288 
289  void
290  PuttableProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
292  bool expected = false;
293  if(prefetchRequested_.compare_exchange_strong(expected,true)) {
294  m_waitingTasks.doneWaiting(std::exception_ptr());
295  }
296  }
297 
298 
299  void
301  m_waitingTasks.reset();
303  prefetchRequested_ = false;
304  }
305 
306  void
308  worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
309  }
310 
311 
312  void
314  aux_ = iConfigure.auxiliary();
315  worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
316  assert(worker_ != nullptr);
317 
318  }
319 
322  bool skipCurrentProcess,
324  ModuleCallingContext const* mcc) const {
325  if (!skipCurrentProcess and worker_) {
326  return resolveProductImpl<true>(
327  [&principal,this,sra,mcc]() {
328  try {
329  auto const& event = static_cast<EventPrincipal const&>(principal);
330  ParentContext parentContext(mcc);
331  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
332 
333  auto workCall = [this,&event,&parentContext,mcc] () {
334  auto sentry( make_sentry(mcc,[this](ModuleCallingContext const* iContext){aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }));
335 
337  event,
338  *(aux_->eventSetup()),
339  event.streamID(),
340  parentContext,
341  mcc->getStreamContext());
342  };
343 
344  if (sra) {
345  assert(false);
346  } else {
347  workCall();
348  }
349 
350  }
351  catch (cms::Exception & ex) {
352  std::ostringstream ost;
353  ost << "Calling produce method for unscheduled module "
354  << worker_->description().moduleName() << "/'"
355  << worker_->description().moduleLabel() << "'";
356  ex.addContext(ost.str());
357  throw;
358  }
359  });
360  }
361  return Resolution(nullptr);
362  }
363 
364  void
366  Principal const& principal,
367  bool skipCurrentProcess,
368  ServiceToken const& token,
370  ModuleCallingContext const* mcc) const
371  {
372  if(skipCurrentProcess) { return; }
373  waitingTasks_.add(waitTask);
374  bool expected = false;
375  if(prefetchRequested_.compare_exchange_strong(expected, true)) {
376 
377  //Have to create a new task which will make sure the state for UnscheduledProductResolver
378  // is properly set after the module has run
379  auto t = make_waiting_task(tbb::task::allocate_root(),
380  [this](std::exception_ptr const* iPtr)
381  {
382  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
383  // state for the case where an exception occurs during the call to the function.
384  try {
385  resolveProductImpl<true>([iPtr]() {
386  if ( iPtr) {
387  std::rethrow_exception(*iPtr);
388  }
389  });
390  } catch(...) {
391  waitingTasks_.doneWaiting(std::current_exception());
392  return;
393  }
394  waitingTasks_.doneWaiting(nullptr);
395  } );
396  auto const& event = static_cast<EventPrincipal const&>(principal);
397  ParentContext parentContext(mcc);
398 
400  event,
401  *(aux_->eventSetup()),
402  token,
403  event.streamID(),
404  parentContext,
405  mcc->getStreamContext());
406  }
407  }
408 
409  void
411  prefetchRequested_ = false;
412  waitingTasks_.reset();
414  }
415 
416 
417  void
418  ProducedProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
419  if(status() != defaultStatus()) {
421  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
422  }
423 
424  setProduct(std::move(edp)); // ProductResolver takes ownership
425  }
426 
// Stores edp as this resolver's product, but only when productResolved()
// reports that nothing has been resolved yet. If a product is already
// resolved, edp is not forwarded and is released when the by-value
// parameter goes out of scope.
427  void
428  InputProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
429  if ( not productResolved()) {
430  //Another thread could have set this
431  setProduct(std::move(edp));
432  }
433  }
434 
435  bool
437  return true;
438  }
439 
440  void
442  assert(false);
443  }
444 
// Adds prod to this resolver.
//  - A null prod is ignored.
//  - If status() still equals defaultStatus(), prod is stored via putProduct().
//  - Otherwise prod is handed to mergeProduct() to be combined with the
//    product that is already held.
// Ownership of prod is transferred in both non-null cases.
445  void
446  DataManagingProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const {
447  if(not prod) {return;}
448  if(status() == defaultStatus()) {
449  //resolveProduct has not been called or it failed
450  putProduct(std::move(prod));
451  } else {
452  mergeProduct(std::move(prod));
453  }
454  }
455 
456 
457 
458  void
460  // Check if the types match.
461  TypeID typeID(prod.dynamicTypeInfo());
463  // Types do not match.
465  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
466  << "It is supposed to be of type " << branchDescription().className() << ".\n"
467  << "It is actually of type " << typeID.className() << ".\n";
468  }
469  }
470 
471 
472  void
473  DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
474  if(edp) {
475  checkType(*edp);
478  } else {
479  setFailedStatus();
480  }
481  }
482  // This routine returns true if it is known that currently there is no real product.
483  // If there is a real product, it returns false.
484  // If it is not known if there is a real product, it returns false.
485  bool
487  auto presentStatus = status();
488  if(presentStatus == ProductStatus::ProductSet) {
489  return !(getProductData().wrapper()->isPresent());
490  }
491  return presentStatus != ProductStatus::ResolveNotRun;
492  }
493 
494  bool
496  auto s = status();
497  return (s != defaultStatus() ) or (s == ProductStatus::ProductDeleted);
498  }
499 
500 
501  // This routine returns true if the product was deleted early in order to save memory
502  bool
505  }
506 
507  bool
509  if (iSkipCurrentProcess and isFromCurrentProcess() ) {
510  return false;
511  }
513  if(getProductData().wrapper()->isPresent()) {
514  return true;
515  }
516  }
517  return false;
518  }
519 
520 
522  productData_.setProvenance(provRetriever,ph,pid);
523  }
524 
527  }
528 
530  return provenance()->productProvenance();
531  }
532 
536  }
537  if(deleteEarly) {
539  } else {
540  resetStatus();
541  }
542  }
543 
545  return true;
546  }
547 
549  realProduct_.setProvenance(provRetriever,ph,pid);
550  }
551 
553  realProduct_.setProcessHistory(ph);
554  }
555 
557  return provenance()->productProvenance();
558  }
559 
561  realProduct_.resetProductData_(deleteEarly);
562  }
563 
565  return true;
566  }
567 
568  void AliasProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
570  << "AliasProductResolver::putProduct_() not implemented and should never be called.\n"
571  << "Contact a Framework developer\n";
572  }
573 
574  void AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
576  << "AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
577  << "Contact a Framework developer\n";
578  }
579 
580 
581 
583  provRetriever_ = provRetriever;
584  }
585 
587  }
588 
590  return provRetriever_? provRetriever_->branchIDToProvenance(bd_->originalBranchID()): nullptr;
591  }
592 
594  }
595 
597  return true;
598  }
599 
600  void ParentProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
602  << "ParentProcessProductResolver::putProduct_() not implemented and should never be called.\n"
603  << "Contact a Framework developer\n";
604  }
605 
606  void ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
608  << "ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
609  << "Contact a Framework developer\n";
610  }
611 
613  // In principle, this ought to be fixed. I noticed one hits this error
614  // when in a SubProcess and calling the Event::getProvenance function
615  // with a BranchID to a branch from an earlier SubProcess or the top
616  // level process and this branch is not kept in this SubProcess. It might
617  // be possible to hit this in other contexts. I say it ought to be
618  // fixed because one does not encounter this issue if the SubProcesses
619  // are split into genuinely different processes (in principle that
620  // ought to give identical behavior and results). No user has ever
621  // reported this issue which has been around for some time and it was only
622  // noticed when testing some rare corner cases after modifying Core code.
623  // After discussing this with Chris we decided that at least for the moment
624  // there are higher priorities than fixing this ... I converted it so it
625  // causes an exception instead of a seg fault. The issue that may need to
626  // be addressed someday is how ProductResolvers for non-kept branches are
627  // connected to earlier SubProcesses.
629  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
630  << "Contact a Framework developer\n";
631  }
632 
634  NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
635  std::vector<bool> const& ambiguous,
636  bool madeAtEnd) :
637  matchingHolders_(matchingHolders),
638  ambiguous_(ambiguous),
639  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
640  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
641  prefetchRequested_(false),
642  skippingPrefetchRequested_(false),
643  madeAtEnd_{madeAtEnd}
644  {
645  assert(ambiguous_.size() == matchingHolders_.size());
646  }
647 
650  Principal const& principal,
651  bool skipCurrentProcess,
653  ModuleCallingContext const* mcc) const {
654  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
655  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
656  }
657 
658 
661  bool skipCurrentProcess,
663  ModuleCallingContext const* mcc) const {
664  //See if we've already cached which Resolver we should call or if
665  // we know it is ambiguous
666  const unsigned int choiceSize = ambiguous_.size();
667 
668  //madeAtEnd_==true and not at end transition is the same as skipping the current process
669  if( (not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
670  skipCurrentProcess = not mcc->parent().isAtEndTransition();
671  }
672 
673  unsigned int checkCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
674  if( checkCacheIndex != choiceSize +kUnsetOffset) {
675  if (checkCacheIndex == choiceSize+kAmbiguousOffset) {
677  } else if(checkCacheIndex == choiceSize+kMissingOffset) {
678  return Resolution(nullptr);
679  }
680  return tryResolver(checkCacheIndex, principal, skipCurrentProcess,
681  sra,mcc);
682  }
683 
684  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
685 
686  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
687  for(unsigned int k : lookupProcessOrder) {
688  assert(k < ambiguous_.size());
689  if(k == 0) break; // Done
690  if(ambiguous_[k]) {
691  updateCacheIndex = choiceSize + kAmbiguousOffset;
693  }
695  auto resolution = tryResolver(k,principal, skipCurrentProcess, sra,mcc);
696  if(resolution.data() != nullptr) {
697  updateCacheIndex = k;
698  return resolution;
699  }
700  }
701  }
702 
703  updateCacheIndex = choiceSize + kMissingOffset;
704  return Resolution(nullptr);
705  }
706 
707  void
709  Principal const& principal,
710  bool skipCurrentProcess,
711  ServiceToken const& token,
713  ModuleCallingContext const* mcc) const {
714  bool timeToMakeAtEnd = true;
715  if(madeAtEnd_ and mcc) {
716  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
717  }
718 
719  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
720  if(not skipCurrentProcess and timeToMakeAtEnd) {
721  waitingTasks_.add(waitTask);
722 
723  bool expected = false;
724  if( prefetchRequested_.compare_exchange_strong(expected,true)) {
725  //we are the first thread to request
726  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token);
727  }
728  } else {
729  skippingWaitingTasks_.add(waitTask);
730  bool expected = false;
731  if( skippingPrefetchRequested_.compare_exchange_strong(expected,true)) {
732  //we are the first thread to request
733  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token);
734  }
735  }
736  }
737 
738  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
739  ProductResolverIndex iIndex,
740  std::exception_ptr iExceptPtr) const {
741  if( not iSkipCurrentProcess) {
742  lastCheckIndex_ = iIndex;
743  waitingTasks_.doneWaiting(iExceptPtr);
744  } else {
747  }
748  }
749 
750  namespace {
751  class TryNextResolverWaitingTask : public edm::WaitingTask {
752  public:
753 
754  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
755  unsigned int iResolverIndex,
756  Principal const* iPrincipal,
758  ModuleCallingContext const* iMCC,
759  bool iSkipCurrentProcess,
760  ServiceToken iToken) :
761  resolver_(iResolver),
762  principal_(iPrincipal),
763  sra_(iSRA),
764  mcc_(iMCC),
765  serviceToken_(iToken),
766  index_(iResolverIndex),
767  skipCurrentProcess_(iSkipCurrentProcess){}
768 
769  tbb::task* execute() override {
770  auto exceptPtr =exceptionPtr();
771  if(exceptPtr) {
772  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
773  } else {
774  if(not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
775  resolver_->tryPrefetchResolverAsync(index_+1,
776  *principal_,
777  skipCurrentProcess_,
778  sra_,
779  mcc_,
780  serviceToken_);
781  }
782  }
783  return nullptr;
784  }
785 
786  private:
787  NoProcessProductResolver const* resolver_;
788  Principal const* principal_;
790  ModuleCallingContext const* mcc_;
791  ServiceToken serviceToken_;
792  unsigned int index_;
793  bool skipCurrentProcess_;
794  };
795  }
796 
// Reports that a prefetch attempt ended with an exception.
//   iProcessingIndex     position in principal.lookupProcessOrder() that was being tried
//   principal            supplies the lookup order
//   iSkipCurrentProcess  forwarded unchanged to setCache()
//   iExceptPtr           the exception, forwarded unchanged to setCache()
// The lookupProcessOrder entry found at iProcessingIndex is passed to
// setCache() as the index value.
// NOTE(review): iProcessingIndex is not range-checked here; presumably the
// caller guarantees it is a valid position - confirm.
797  void
798  NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
799  Principal const& principal,
800  bool iSkipCurrentProcess,
801  std::exception_ptr iExceptPtr) const {
802  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
803  auto k = lookupProcessOrder[iProcessingIndex];
804 
805  setCache(iSkipCurrentProcess, k, iExceptPtr);
806  }
807 
808 
// Checks whether the resolver selected by position iProcessingIndex of
// principal.lookupProcessOrder() holds a fetched, valid product.
//   iProcessingIndex     position in principal.lookupProcessOrder()
//   principal            supplies the lookup order and the resolvers
//   iSkipCurrentProcess  forwarded to productWasFetchedAndIsValid() and setCache()
// Returns true after recording the matching lookupProcessOrder entry through
// setCache() with a null exception; returns false, caching nothing, otherwise.
809  bool
810  NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
811  Principal const& principal,
812  bool iSkipCurrentProcess) const {
813  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
// k indexes matchingHolders_, whose entry is the index of the resolver to query
814  auto k = lookupProcessOrder[iProcessingIndex];
815  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
816 
817  if(productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
818 
// remember which entry supplied the product (no exception to report)
819  setCache(iSkipCurrentProcess, k, nullptr);
820  return true;
821  }
822  return false;
823  }
824 
825 
826  void
828  Principal const& principal,
829  bool skipCurrentProcess,
831  ModuleCallingContext const* mcc,
832  ServiceToken token) const {
833  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
834  auto index = iProcessingIndex;
835 
836  const unsigned int choiceSize = ambiguous_.size();
837  unsigned int newCacheIndex = choiceSize + kMissingOffset;
838  while(index < lookupProcessOrder.size()) {
839  auto k = lookupProcessOrder[index];
840  if(k==0) {
841  break;
842  }
843  assert(k < ambiguous_.size());
844  if(ambiguous_[k]) {
845  newCacheIndex = choiceSize + kAmbiguousOffset;
846  break;
847  }
849  //make new task
850 
851  auto task = new (tbb::task::allocate_root()) TryNextResolverWaitingTask(
852  this,
853  index,
854  &principal,
855  sra,
856  mcc,
857  skipCurrentProcess,
858  token
859  );
860  task->increment_ref_count();
861  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
862 
863  //Make sure the Services are available on this thread
864  ServiceRegistry::Operate guard(token);
865 
866  productResolver->prefetchAsync(task,
867  principal,
868  skipCurrentProcess,
869  token,
870  sra, mcc);
871  if(0 == task->decrement_ref_count()) {
872  tbb::task::spawn(*task);
873  }
874  return;
875  }
876  ++index;
877  }
878  //data product unavailable
879  setCache(skipCurrentProcess, newCacheIndex, nullptr);
880  }
881 
883  }
884 
886  }
887 
889  return nullptr;
890  }
891 
// Returns ambiguous_.size() + kUnsetOffset, the sentinel that marks a cached index as not yet filled in.
892  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size()+kUnsetOffset; }
893 
895  const auto resetValue = unsetIndexValue();
896  lastCheckIndex_ = resetValue;
897  lastSkipCurrentCheckIndex_ = resetValue;
898  prefetchRequested_ = false;
902  }
903 
905  return false;
906  }
907 
910  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
911  << "Contact a Framework developer\n";
912  }
913 
916  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
917  << "Contact a Framework developer\n";
918  }
919 
922  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
923  << "Contact a Framework developer\n";
924  }
925 
928  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
929  << "Contact a Framework developer\n";
930  }
931 
932  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
934  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
935  << "Contact a Framework developer\n";
936  }
937 
938  void NoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
940  << "NoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
941  << "Contact a Framework developer\n";
942  }
943 
944  void NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
946  << "NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
947  << "Contact a Framework developer\n";
948  }
949 
952  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
953  << "Contact a Framework developer\n";
954  }
955 
956  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
958  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
959  << "Contact a Framework developer\n";
960  }
961 
964  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
965  << "Contact a Framework developer\n";
966  }
967 
970  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
971  << "Contact a Framework developer\n";
972 
973  }
974 
975  //---- SingleChoiceNoProcessProductResolver ----------------
977  Principal const& principal,
978  bool skipCurrentProcess,
980  ModuleCallingContext const* mcc) const
981  {
982  //NOTE: Have to look up the other ProductResolver each time rather than cache
983  // its pointer since it appears the pointer can change at some later stage
984  return principal.getProductResolverByIndex(realResolverIndex_)
985  ->resolveProduct(principal,
986  skipCurrentProcess, sra, mcc);
987  }
988 
990  Principal const& principal,
991  bool skipCurrentProcess,
992  ServiceToken const& token,
994  ModuleCallingContext const* mcc) const {
995  principal.getProductResolverByIndex(realResolverIndex_)
996  ->prefetchAsync(waitTask,principal,
997  skipCurrentProcess, token, sra, mcc);
998  }
999 
1001  }
1002 
1004  }
1005 
1007  return nullptr;
1008  }
1009 
1011  }
1012 
1014  return false;
1015  }
1016 
1019  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1020  << "Contact a Framework developer\n";
1021  }
1022 
1025  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1026  << "Contact a Framework developer\n";
1027  }
1028 
1031  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1032  << "Contact a Framework developer\n";
1033  }
1034 
1037  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1038  << "Contact a Framework developer\n";
1039  }
1040 
1043  << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1044  << "Contact a Framework developer\n";
1045  }
1046 
1047 
1048  void SingleChoiceNoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
1050  << "SingleChoiceNoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
1051  << "Contact a Framework developer\n";
1052  }
1053 
1054  void SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
1056  << "SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
1057  << "Contact a Framework developer\n";
1058  }
1059 
1062  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1063  << "Contact a Framework developer\n";
1064  }
1065 
1066  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1068  << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1069  << "Contact a Framework developer\n";
1070  }
1071 
1074  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1075  << "Contact a Framework developer\n";
1076  }
1077 
1080  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1081  << "Contact a Framework developer\n";
1082 
1083  }
1084 
1085 }
size
Write out results.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::string const & branchName() const
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:29
void resetProductData_(bool deleteEarly) override
Provenance const * provenance() const
void setProcessHistory(ProcessHistory const &ph)
Definition: ProductData.h:59
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
void resetProductData_(bool deleteEarly) override
void checkType(WrapperBase const &prod) const
std::type_info const & dynamicTypeInfo() const
Definition: WrapperBase.h:43
void prefetchAsync(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
unsigned int unsetIndexValue() const
void resetProductData_(bool deleteEarly) override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
void add(WaitingTask *)
Adds task to the waiting list.
void setProcessHistory_(ProcessHistory const &ph) override
ProductStatus status() const
bool singleProduct_() const override
bool unscheduledWasNotRun_() const override
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
WrapperBase const * wrapper() const
Definition: ProductData.h:32
void resetProductData_(bool deleteEarly) override
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:31
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void reset()
Resets access to the resource so that added tasks will wait.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
BranchDescription const & branchDescription_() const override
bool productResolved_() const final
static unsigned int kUnsetOffset
void resetProductData_(bool deleteEarly) override
std::string const & processName() const
void retrieveAndMerge_(Principal const &principal) const override
void connectTo(ProductResolverBase const &, Principal const *) final
#define constexpr
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
bool isPresent() const
Definition: WrapperBase.h:27
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
ProductProvenance const * productProvenancePtr_() const override
static unsigned int kMissingOffset
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void putProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:38
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void setProvenance(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
Definition: ProductData.h:63
def principal(options)
bool isFromCurrentProcess() const final
std::string const & className() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:51
bool singleProduct_() const override
Provenance const * provenance_() const override
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:190
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
Provenance const * provenance_() const override
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
TypeID unwrappedTypeID() const
void resetProductData_(bool deleteEarly) override
void setProcessHistory_(ProcessHistory const &ph) override
ProductProvenance const * productProvenancePtr_() const override
bool isFromCurrentProcess() const final
bool productUnavailable_() const override
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void mergeProduct(std::unique_ptr< WrapperBase > edp) const
ProductProvenance const * productProvenancePtr_() const override
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
std::atomic< bool > prefetchRequested_
ProductProvenance const * productProvenancePtr_() const override
std::atomic< unsigned int > lastCheckIndex_
TypeWithDict const & unwrappedType() const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) final
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:76
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual bool isFromCurrentProcess() const =0
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
int k[5][pyjets_maxn]
void setProduct(std::unique_ptr< WrapperBase > edp) const
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
DelayedReader * reader() const
Definition: Principal.h:182
UnscheduledAuxiliary const * auxiliary() const
void resetProductData_(bool deleteEarly) override
static unsigned int kAmbiguousOffset
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductStatus defaultStatus() const
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void setProcessHistory_(ProcessHistory const &ph) override
void setupUnscheduled(UnscheduledConfigurator const &) final
def load(fileName)
Definition: svgfig.py:546
BranchDescription const & branchDescription_() const override
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void addContext(std::string const &context)
Definition: Exception.cc:227
bool singleProduct_() const override
std::vector< ProductResolverIndex > matchingHolders_
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token) const
HLT enums.
BranchDescription const & branchDescription() const
Worker * findWorker(std::string const &iLabel) const
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:520
ProductProvenance const * productProvenancePtr_() const final
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void setProcessHistory_(ProcessHistory const &ph) override
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductData const & getProductData() const
bool productWasDeleted_() const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void setProcessHistory_(ProcessHistory const &ph) final
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
def move(src, dest)
Definition: eostools.py:510
def branchType(schema, name)
Definition: revisionDML.py:112
std::string const & moduleLabel() const
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:34
static HepMC::HEPEVT_Wrapper wrapper
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override