CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
4 #include "Worker.h"
5 #include "UnscheduledAuxiliary.h"
18 
19 #include <cassert>
20 #include <utility>
21 
// Offsets added to the number of candidate resolvers (ambiguous_.size()) to build
// sentinel values for the cached indices kept by NoProcessProductResolver. Real
// indices are asserted to be < ambiguous_.size(), so these values cannot collide.
// Sentinel: no choice has been cached yet.
22 static constexpr unsigned int kUnsetOffset = 0;
// Sentinel: the lookup reached an entry flagged in ambiguous_.
23 static constexpr unsigned int kAmbiguousOffset = 1;
// Sentinel: the lookup finished without finding a product.
24 static constexpr unsigned int kMissingOffset = 2;
25 
26 namespace edm {
27 
30  exception << "ProductResolverBase::resolveProduct_: The product matching all criteria was already deleted\n"
31  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
32  << "Looking for module label: " << moduleLabel() << "\n"
33  << "Looking for productInstanceName: " << productInstanceName() << "\n"
34  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
35  << "This means there is a configuration error.\n"
36  << "The module which is asking for this data must be configured to state that it will read this data.";
37  throw exception;
38 
39  }
40 
41  //This is a templated function in order to avoid calling another virtual function
42  template <bool callResolver, typename FUNC>
45 
46  if(productWasDeleted()) {
48  }
49  auto presentStatus = status();
50 
51  if(callResolver && presentStatus == ProductStatus::ResolveNotRun) {
52  //if resolver fails because of exception or not setting product
53  // make sure the status goes to failed
54  auto failedStatusSetter = [this](ProductStatus* presentStatus) {
55  if(this->status() == ProductStatus::ResolveNotRun) {
56  this->setFailedStatus();
57  }
58  *presentStatus = this->status();
59  };
60  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus, failedStatusSetter);
61 
62  //If successful, this will call setProduct
63  resolver();
64  }
65 
66 
67  if (presentStatus == ProductStatus::ProductSet) {
68  auto pd = &getProductData();
69  if(pd->wrapper()->isPresent()) {
70  return Resolution(pd);
71  }
72  }
73 
74  return Resolution(nullptr);
75  }
76 
77  void
78  DataManagingProductResolver::mergeProduct(std::unique_ptr<WrapperBase> iFrom) const {
79  assert(status() == ProductStatus::ProductSet);
80  if(not iFrom) { return;}
81 
82  checkType(*iFrom);
83 
85  if(original->isMergeable()) {
86  original->mergeProduct(iFrom.get());
87  } else if(original->hasIsProductEqual()) {
88  if(!original->isProductEqual(iFrom.get())) {
89  auto const& bd = branchDescription();
90  edm::LogError("RunLumiMerging")
91  << "ProductResolver::mergeTheProduct\n"
92  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
93  << "Using the first, ignoring the second\n"
94  << "className = " << bd.className() << "\n"
95  << "moduleLabel = " << bd.moduleLabel() << "\n"
96  << "instance = " << bd.productInstanceName() << "\n"
97  << "process = " << bd.processName() << "\n";
98  }
99  } else {
100  auto const& bd = branchDescription();
101  edm::LogWarning("RunLumiMerging")
102  << "ProductResolver::mergeTheProduct\n"
103  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
104  << "Using the first, ignoring the second in merge\n"
105  << "className = " << bd.className() << "\n"
106  << "moduleLabel = " << bd.moduleLabel() << "\n"
107  << "instance = " << bd.productInstanceName() << "\n"
108  << "process = " << bd.processName() << "\n";
109  }
110  }
111 
112 
115  bool,
117  ModuleCallingContext const* mcc) const {
118  return resolveProductImpl<true>([this,&principal,mcc]() {
119  auto branchType = principal.branchType();
120  if(branchType != InEvent) {
121  //delayed get has not been allowed with Run or Lumis
122  // The file may already be closed so the reader is invalid
123  return;
124  }
125  if(mcc and (branchType == InEvent) and aux_) {
126  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
127  }
128 
129  auto sentry( make_sentry(mcc,
130  [this, branchType](ModuleCallingContext const* iContext){
131  if(branchType == InEvent and aux_) {
132  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }
133  }));
134 
135  if(auto reader=principal.reader()) {
136  std::unique_lock<std::recursive_mutex> guard;
137  if(auto sr = reader->sharedResources().second) {
138  guard =std::unique_lock<std::recursive_mutex>(*sr);
139  }
140  if ( not productResolved()) {
141  //another thread could have beaten us here
142  putProduct( reader->getProduct(BranchKey(branchDescription()), &principal, mcc));
143  }
144  }
145  });
146 
147  }
148 
149  void
151  if(auto reader = principal.reader()) {
152 
153  std::unique_lock<std::recursive_mutex> guard;
154  if(auto sr = reader->sharedResources().second) {
155  guard =std::unique_lock<std::recursive_mutex>(*sr);
156  }
157 
158  //Can't use resolveProductImpl since it first checks to see
159  // if the product was already retrieved and then returns if it is
161  std::unique_ptr<WrapperBase> edp(reader->getProduct(bk, &principal));
162 
163  if(edp.get() != nullptr) {
165  } else if( status()== defaultStatus()) {
166  setFailedStatus();
167  }
168  }
169  }
170 
171 
173  Principal const& principal,
174  bool skipCurrentProcess,
176  ModuleCallingContext const* mcc) const {
177  m_waitingTasks.add(waitTask);
178 
179  bool expected = false;
180  if( m_prefetchRequested.compare_exchange_strong(expected, true) ) {
181 
182  //need to make sure Service system is activated on the reading thread
183  auto token = ServiceRegistry::instance().presentToken();
184  auto workToDo = [this, mcc, &principal, token] () {
185  ServiceRegistry::Operate guard(token);
186  try {
187  resolveProductImpl<true>([this,&principal,mcc]() {
188  if(principal.branchType() != InEvent) { return; }
189  if(auto reader = principal.reader()) {
190  std::unique_lock<std::recursive_mutex> guard;
191  if(auto sr = reader->sharedResources().second) {
192  guard =std::unique_lock<std::recursive_mutex>(*sr);
193  }
194  if ( not productResolved()) {
195  //another thread could have finished this while we were waiting
196  putProduct( reader->getProduct(BranchKey(branchDescription()), &principal, mcc));
197  }
198  }
199  });
200  } catch(...) {
201  this->m_waitingTasks.doneWaiting(std::current_exception());
202  return;
203  }
204  this->m_waitingTasks.doneWaiting(nullptr);
205  };
206 
207  SerialTaskQueueChain* queue = nullptr;
208  if(auto reader = principal.reader()) {
209  if (auto shared_res = reader->sharedResources().first) {
210  queue = &(shared_res->serialQueueChain());
211  }
212  }
213  if(queue) {
214  queue->push(workToDo);
215  } else {
216  //Have to create a new task
217  auto t = make_functor_task(tbb::task::allocate_root(),
218  workToDo);
219  tbb::task::spawn(*t);
220  }
221  }
222  }
223 
224  void
226  m_prefetchRequested = false;
227  m_waitingTasks.reset();
229  }
230 
231  void
233  aux_ = iConfigure.auxiliary();
234  }
235 
236 
237  bool
239  return false;
240  }
241 
242 
245  bool skipCurrentProcess,
247  ModuleCallingContext const*) const {
248  if (!skipCurrentProcess) {
249  //'false' means never call the lambda function
250  return resolveProductImpl<false>([](){return;});
251  }
252  return Resolution(nullptr);
253  }
254 
256  Principal const& principal,
257  bool skipCurrentProcess,
259  ModuleCallingContext const* mcc) const {
260  if(not skipCurrentProcess) {
261  if(branchDescription().availableOnlyAtEndTransition() and
262  not principal.atEndTransition()) {
263  return;
264  }
265  m_waitingTasks.add(waitTask);
266 
267  bool expected = false;
268  if(worker_ and prefetchRequested_.compare_exchange_strong(expected,true)) {
269  //using a waiting task to do a callback guarantees that
270  // the m_waitingTasks list will be released from waiting even
271  // if the module does not put this data product or the
272  // module has an exception while running
273 
274  auto waiting = make_waiting_task(tbb::task::allocate_root(),
275  [this](std::exception_ptr const * iException) {
276  if(nullptr != iException) {
277  m_waitingTasks.doneWaiting(*iException);
278  } else {
279  m_waitingTasks.doneWaiting(std::exception_ptr());
280  }
281  });
282  worker_->callWhenDoneAsync(waiting);
283  }
284  }
285  }
286 
287  void
288  PuttableProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
290  bool expected = false;
291  if(prefetchRequested_.compare_exchange_strong(expected,true)) {
292  m_waitingTasks.doneWaiting(std::exception_ptr());
293  }
294  }
295 
296 
297  void
299  m_waitingTasks.reset();
301  prefetchRequested_ = false;
302  }
303 
304  void
306  worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
307  }
308 
309 
310  void
312  aux_ = iConfigure.auxiliary();
313  worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
314  assert(worker_ != nullptr);
315 
316  }
317 
320  bool skipCurrentProcess,
322  ModuleCallingContext const* mcc) const {
323  if (!skipCurrentProcess and worker_) {
324  return resolveProductImpl<true>(
325  [&principal,this,sra,mcc]() {
326  try {
327  auto const& event = static_cast<EventPrincipal const&>(principal);
328  ParentContext parentContext(mcc);
329  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
330 
331  auto workCall = [this,&event,&parentContext,mcc] () {
332  auto sentry( make_sentry(mcc,[this](ModuleCallingContext const* iContext){aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }));
333 
335  event,
336  *(aux_->eventSetup()),
337  event.streamID(),
338  parentContext,
339  mcc->getStreamContext());
340  };
341 
342  if (sra) {
343  assert(false);
344  } else {
345  workCall();
346  }
347 
348  }
349  catch (cms::Exception & ex) {
350  std::ostringstream ost;
351  ost << "Calling produce method for unscheduled module "
352  << worker_->description().moduleName() << "/'"
353  << worker_->description().moduleLabel() << "'";
354  ex.addContext(ost.str());
355  throw;
356  }
357  });
358  }
359  return Resolution(nullptr);
360  }
361 
362  void
364  Principal const& principal,
365  bool skipCurrentProcess,
367  ModuleCallingContext const* mcc) const
368  {
369  if(skipCurrentProcess) { return; }
370  waitingTasks_.add(waitTask);
371  bool expected = false;
372  if(prefetchRequested_.compare_exchange_strong(expected, true)) {
373 
374  //Have to create a new task which will make sure the state for UnscheduledProductResolver
375  // is properly set after the module has run
376  auto t = make_waiting_task(tbb::task::allocate_root(),
377  [this,&principal, skipCurrentProcess,sra,mcc](std::exception_ptr const* iPtr)
378  {
379  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
380  // state for the case where an exception occurs during the call to the function.
381  try {
382  resolveProductImpl<true>([iPtr]() {
383  if ( iPtr) {
384  std::rethrow_exception(*iPtr);
385  }
386  });
387  } catch(...) {
388  waitingTasks_.doneWaiting(std::current_exception());
389  return;
390  }
391  waitingTasks_.doneWaiting(nullptr);
392  } );
393  auto const& event = static_cast<EventPrincipal const&>(principal);
394  ParentContext parentContext(mcc);
395 
397  event,
398  *(aux_->eventSetup()),
399  event.streamID(),
400  parentContext,
401  mcc->getStreamContext());
402  }
403  }
404 
405  void
407  prefetchRequested_ = false;
408  waitingTasks_.reset();
410  }
411 
412 
413  void
414  ProducedProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
415  if(status() != defaultStatus()) {
417  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
418  }
419 
420  setProduct(std::move(edp)); // ProductResolver takes ownership
421  }
422 
// Hands edp to setProduct(), but only when productResolved() reports that
// nothing has been resolved yet. If a product is already resolved, edp is not
// stored and is released when the parameter goes out of scope.
423  void
424  InputProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
425  if ( not productResolved()) {
426  //Another thread could have set this
427  setProduct(std::move(edp));
428  }
429  }
430 
431  bool
433  return true;
434  }
435 
436  void
439  resetProductData_(false);
440  }
441  }
442 
443 
444  void
446  assert(false);
447  }
448 
// Either inserts or merges prod depending on the current status:
//  - a null prod is ignored;
//  - if status() still equals defaultStatus(), prod is passed to putProduct();
//  - otherwise prod is passed to mergeProduct().
449  void
450  DataManagingProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const {
451  if(not prod) {return;}
452  if(status() == defaultStatus()) {
453  //resolveProduct has not been called or it failed
454  putProduct(std::move(prod));
455  } else {
456  mergeProduct(std::move(prod));
457  }
458  }
459 
460 
461 
462  void
464  // Check if the types match.
465  TypeID typeID(prod.dynamicTypeInfo());
466  if(typeID != branchDescription().unwrappedTypeID()) {
467  // Types do not match.
469  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
470  << "It is supposed to be of type " << branchDescription().className() << ".\n"
471  << "It is actually of type " << typeID.className() << ".\n";
472  }
473  }
474 
475 
476  void
477  DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
478  if(edp) {
479  checkType(*edp);
482  } else {
483  setFailedStatus();
484  }
485  }
486  // This routine returns true if it is known that currently there is no real product.
487  // If there is a real product, it returns false.
488  // If it is not known if there is a real product, it returns false.
489  bool
491  auto presentStatus = status();
492  if(presentStatus == ProductStatus::ProductSet) {
493  return !(getProductData().wrapper()->isPresent());
494  }
495  return presentStatus != ProductStatus::ResolveNotRun;
496  }
497 
498  bool
500  auto s = status();
501  return (s != defaultStatus() ) or (s == ProductStatus::ProductDeleted);
502  }
503 
504 
505  // This routine returns true if the product was deleted early in order to save memory
506  bool
509  }
510 
511  bool
513  if (iSkipCurrentProcess and isFromCurrentProcess() ) {
514  return false;
515  }
517  if(getProductData().wrapper()->isPresent()) {
518  return true;
519  }
520  }
521  return false;
522  }
523 
524 
526  productData_.setProvenance(provRetriever,ph,pid);
527  }
528 
531  }
532 
534  return provenance()->productProvenance();
535  }
536 
540  }
541  if(deleteEarly) {
543  } else {
544  resetStatus();
545  }
546  }
547 
549  return true;
550  }
551 
553  realProduct_.setProvenance(provRetriever,ph,pid);
554  }
555 
557  realProduct_.setProcessHistory(ph);
558  }
559 
561  return provenance()->productProvenance();
562  }
563 
565  realProduct_.resetProductData_(deleteEarly);
566  }
567 
569  return true;
570  }
571 
572  void AliasProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
574  << "AliasProductResolver::putProduct_() not implemented and should never be called.\n"
575  << "Contact a Framework developer\n";
576  }
577 
578  void AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
580  << "AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
581  << "Contact a Framework developer\n";
582  }
583 
584 
585 
587  provRetriever_ = provRetriever;
588  }
589 
591  }
592 
594  return provRetriever_? provRetriever_->branchIDToProvenance(bd_->originalBranchID()): nullptr;
595  }
596 
598  }
599 
601  return true;
602  }
603 
604  void ParentProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
606  << "ParentProcessProductResolver::putProduct_() not implemented and should never be called.\n"
607  << "Contact a Framework developer\n";
608  }
609 
610  void ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
612  << "ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
613  << "Contact a Framework developer\n";
614  }
615 
617  // In principle, this ought to be fixed. I noticed one hits this error
618  // when in a SubProcess and calling the Event::getProvenance function
619  // with a BranchID to a branch from an earlier SubProcess or the top
620  // level process and this branch is not kept in this SubProcess. It might
621  // be possible to hit this in other contexts. I say it ought to be
622  // fixed because one does not encounter this issue if the SubProcesses
623  // are split into genuinely different processes (in principle that
624  // ought to give identical behavior and results). No user has ever
625  // reported this issue which has been around for some time and it was only
626  // noticed when testing some rare corner cases after modifying Core code.
627  // After discussing this with Chris we decided that at least for the moment
628  // there are higher priorities than fixing this ... I converted it so it
629  // causes an exception instead of a seg fault. The issue that may need to
630  // be addressed someday is how ProductResolvers for non-kept branches are
631  // connected to earlier SubProcesses.
633  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
634  << "Contact a Framework developer\n";
635  }
636 
638  NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
639  std::vector<bool> const& ambiguous) :
640  matchingHolders_(matchingHolders),
641  ambiguous_(ambiguous),
642  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
643  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
644  prefetchRequested_(false),
645  skippingPrefetchRequested_(false),
646  recheckedAtEnd_(false) {
647  assert(ambiguous_.size() == matchingHolders_.size());
648  }
649 
652  Principal const& principal,
653  bool skipCurrentProcess,
655  ModuleCallingContext const* mcc) const {
656  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
657  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
658  }
659 
660 
663  bool skipCurrentProcess,
665  ModuleCallingContext const* mcc) const {
666  //See if we've already cached which Resolver we should call or if
667  // we know it is ambiguous
668  const unsigned int choiceSize = ambiguous_.size();
669  {
670  if( (not principal.atEndTransition()) or
671  recheckedAtEnd_) {
672  unsigned int checkCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
673  if( checkCacheIndex != choiceSize +kUnsetOffset) {
674  if (checkCacheIndex == choiceSize+kAmbiguousOffset) {
676  } else if(checkCacheIndex == choiceSize+kMissingOffset) {
677  return Resolution(nullptr);
678  }
679  return tryResolver(checkCacheIndex, principal, skipCurrentProcess,
680  sra,mcc);
681  }
682  }
683  }
684 
685  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
686 
687  //make sure recheckedAtEnd_ set to true if needed
688  auto setTrue = [](std::atomic<bool>* iBool) { *iBool = true; };
689  using TrueGuard = std::unique_ptr<std::atomic<bool>, decltype(setTrue)>;
690  TrueGuard guard( principal.atEndTransition()?&recheckedAtEnd_:nullptr,setTrue);
691 
692  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
693  for(unsigned int k : lookupProcessOrder) {
694  assert(k < ambiguous_.size());
695  if(k == 0) break; // Done
696  if(ambiguous_[k]) {
697  updateCacheIndex = choiceSize + kAmbiguousOffset;
699  }
701  auto resolution = tryResolver(k,principal, skipCurrentProcess, sra,mcc);
702  if(resolution.data() != nullptr) {
703  updateCacheIndex = k;
704  return resolution;
705  }
706  }
707  }
708 
709  updateCacheIndex = choiceSize + kMissingOffset;
710  return Resolution(nullptr);
711  }
712 
713  void
715  Principal const& principal,
716  bool skipCurrentProcess,
718  ModuleCallingContext const* mcc) const {
719  if(not skipCurrentProcess) {
720  waitingTasks_.add(waitTask);
721 
722  //It is possible that a new product was added at the end transition
723  // so we need to recheck what to return
724  bool needToRecheckAtEnd = false;
725  if(principal.atEndTransition()) {
726  bool expected = false;
727  needToRecheckAtEnd = recheckedAtEnd_.compare_exchange_strong(expected,true);
728  if(needToRecheckAtEnd) {
729  prefetchRequested_=true;
730  }
731  }
732 
733  bool expected = false;
734  if( needToRecheckAtEnd or prefetchRequested_.compare_exchange_strong(expected,true)) {
735  //we are the first thread to request
736  tryPrefetchResolverAsync(0, principal, skipCurrentProcess, sra, mcc, ServiceRegistry::instance().presentToken());
737  }
738  } else {
739  skippingWaitingTasks_.add(waitTask);
740  bool expected = false;
741  if( skippingPrefetchRequested_.compare_exchange_strong(expected,true)) {
742  //we are the first thread to request
743  tryPrefetchResolverAsync(0, principal, skipCurrentProcess, sra, mcc, ServiceRegistry::instance().presentToken());
744  }
745  }
746  }
747 
748  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
749  ProductResolverIndex iIndex,
750  std::exception_ptr iExceptPtr) const {
751  if( not iSkipCurrentProcess) {
752  lastCheckIndex_ = iIndex;
753  waitingTasks_.doneWaiting(iExceptPtr);
754  } else {
757  }
758  }
759 
760  namespace {
761  class TryNextResolverWaitingTask : public edm::WaitingTask {
762  public:
763 
764  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
765  unsigned int iResolverIndex,
766  Principal const* iPrincipal,
768  ModuleCallingContext const* iMCC,
769  bool iSkipCurrentProcess,
770  ServiceToken iToken) :
771  resolver_(iResolver),
772  principal_(iPrincipal),
773  sra_(iSRA),
774  mcc_(iMCC),
775  serviceToken_(iToken),
776  index_(iResolverIndex),
777  skipCurrentProcess_(iSkipCurrentProcess){}
778 
779  tbb::task* execute() override {
780  auto exceptPtr =exceptionPtr();
781  if(exceptPtr) {
782  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
783  } else {
784  if(not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
785  resolver_->tryPrefetchResolverAsync(index_+1,
786  *principal_,
787  skipCurrentProcess_,
788  sra_,
789  mcc_,
790  serviceToken_);
791  }
792  }
793  return nullptr;
794  }
795 
796  private:
797  NoProcessProductResolver const* resolver_;
798  Principal const* principal_;
800  ModuleCallingContext const* mcc_;
801  ServiceToken serviceToken_;
802  unsigned int index_;
803  bool skipCurrentProcess_;
804  };
805  }
806 
// Records that prefetching failed for one step of the process lookup.
// Reads entry iProcessingIndex of principal.lookupProcessOrder() and forwards
// it, together with iExceptPtr and iSkipCurrentProcess, to setCache().
// NOTE(review): iProcessingIndex is used unchecked as a vector index; callers
// presumably guarantee it is in range - confirm.
807  void
808  NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
809  Principal const& principal,
810  bool iSkipCurrentProcess,
811  std::exception_ptr iExceptPtr) const {
812  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
813  auto k = lookupProcessOrder[iProcessingIndex];
814 
815  setCache(iSkipCurrentProcess, k, iExceptPtr);
816  }
817 
818 
// Checks whether the resolver chosen by one step of the process lookup holds a
// usable product.
// k is entry iProcessingIndex of principal.lookupProcessOrder(); the resolver is
// fetched from principal via matchingHolders_[k].
// Returns true, after calling setCache() with k and a null exception pointer, when
// that resolver's productWasFetchedAndIsValid(iSkipCurrentProcess) is true.
// Returns false otherwise, without calling setCache().
819  bool
820  NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
821  Principal const& principal,
822  bool iSkipCurrentProcess) const {
823  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
824  auto k = lookupProcessOrder[iProcessingIndex];
825  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
826 
827  if(productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
828 
829  setCache(iSkipCurrentProcess, k, nullptr);
830  return true;
831  }
832  return false;
833  }
834 
835 
836  void
838  Principal const& principal,
839  bool skipCurrentProcess,
841  ModuleCallingContext const* mcc,
842  ServiceToken token) const {
843  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
844  auto index = iProcessingIndex;
845 
846  const unsigned int choiceSize = ambiguous_.size();
847  unsigned int newCacheIndex = choiceSize + kMissingOffset;
848  while(index < lookupProcessOrder.size()) {
849  auto k = lookupProcessOrder[index];
850  if(k==0) {
851  break;
852  }
853  assert(k < ambiguous_.size());
854  if(ambiguous_[k]) {
855  newCacheIndex = choiceSize + kAmbiguousOffset;
856  break;
857  }
859  //make new task
860 
861  auto task = new (tbb::task::allocate_root()) TryNextResolverWaitingTask(
862  this,
863  index,
864  &principal,
865  sra,
866  mcc,
867  skipCurrentProcess,
868  token
869  );
870  task->increment_ref_count();
871  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
872 
873  //Make sure the Services are available on this thread
874  ServiceRegistry::Operate guard(token);
875 
876  productResolver->prefetchAsync(task,
877  principal,
878  skipCurrentProcess,
879  sra, mcc);
880  if(0 == task->decrement_ref_count()) {
881  tbb::task::spawn(*task);
882  }
883  return;
884  }
885  ++index;
886  }
887  //data product unavailable
888  setCache(skipCurrentProcess, newCacheIndex, nullptr);
889  }
890 
892  }
893 
895  }
896 
898  return nullptr;
899  }
900 
// Returns the "unset" sentinel for the cached indices: ambiguous_.size() + kUnsetOffset.
901  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size()+kUnsetOffset; }
902 
904  const auto resetValue = unsetIndexValue();
905  lastCheckIndex_ = resetValue;
906  lastSkipCurrentCheckIndex_ = resetValue;
907  prefetchRequested_ = false;
909  recheckedAtEnd_ = false;
912  }
913 
915  return false;
916  }
917 
920  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
921  << "Contact a Framework developer\n";
922  }
923 
926  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
927  << "Contact a Framework developer\n";
928  }
929 
932  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
933  << "Contact a Framework developer\n";
934  }
935 
938  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
939  << "Contact a Framework developer\n";
940  }
941 
942  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
944  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
945  << "Contact a Framework developer\n";
946  }
947 
948  void NoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
950  << "NoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
951  << "Contact a Framework developer\n";
952  }
953 
954  void NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
956  << "NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
957  << "Contact a Framework developer\n";
958  }
959 
962  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
963  << "Contact a Framework developer\n";
964  }
965 
966  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
968  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
969  << "Contact a Framework developer\n";
970  }
971 
974  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
975  << "Contact a Framework developer\n";
976  }
977 
980  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
981  << "Contact a Framework developer\n";
982 
983  }
984 
985  //---- SingleChoiceNoProcessProductResolver ----------------
987  Principal const& principal,
988  bool skipCurrentProcess,
990  ModuleCallingContext const* mcc) const
991  {
992  //NOTE: Have to look up the other ProductResolver each time rather than cache
993  // its pointer since it appears the pointer can change at some later stage
994  return principal.getProductResolverByIndex(realResolverIndex_)
995  ->resolveProduct(principal,
996  skipCurrentProcess, sra, mcc);
997  }
998 
1000  Principal const& principal,
1001  bool skipCurrentProcess,
1003  ModuleCallingContext const* mcc) const {
1004  principal.getProductResolverByIndex(realResolverIndex_)
1005  ->prefetchAsync(waitTask,principal,
1006  skipCurrentProcess, sra, mcc);
1007  }
1008 
1010  }
1011 
1013  }
1014 
1016  return nullptr;
1017  }
1018 
1020  }
1021 
1023  return false;
1024  }
1025 
1028  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1029  << "Contact a Framework developer\n";
1030  }
1031 
1034  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1035  << "Contact a Framework developer\n";
1036  }
1037 
1040  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1041  << "Contact a Framework developer\n";
1042  }
1043 
1046  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1047  << "Contact a Framework developer\n";
1048  }
1049 
1052  << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1053  << "Contact a Framework developer\n";
1054  }
1055 
1056 
1057  void SingleChoiceNoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
1059  << "SingleChoiceNoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
1060  << "Contact a Framework developer\n";
1061  }
1062 
1063  void SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
1065  << "SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
1066  << "Contact a Framework developer\n";
1067  }
1068 
1071  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1072  << "Contact a Framework developer\n";
1073  }
1074 
1075  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1077  << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1078  << "Contact a Framework developer\n";
1079  }
1080 
1083  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1084  << "Contact a Framework developer\n";
1085  }
1086 
1089  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1090  << "Contact a Framework developer\n";
1091 
1092  }
1093 
1094 }
virtual bool productResolved_() const override final
virtual void connectTo(ProductResolverBase const &iOther, Principal const *) override final
size
Write out results.
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::string const & branchName() const
std::string const & productInstanceName() const
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:29
virtual void resetProductData_(bool deleteEarly) override
Provenance const * provenance() const
void setProcessHistory(ProcessHistory const &ph)
Definition: ProductData.h:59
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
virtual void resetProductData_(bool deleteEarly) override
virtual bool productUnavailable_() const override final
void prefetchAsync(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void checkType(WrapperBase const &prod) const
std::type_info const & dynamicTypeInfo() const
Definition: WrapperBase.h:38
unsigned int unsetIndexValue() const
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void resetProductData_(bool deleteEarly) override
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void push(const T &iAction)
asynchronously pushes functor iAction into queue
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
void add(WaitingTask *)
Adds task to the waiting list.
virtual void setProcessHistory_(ProcessHistory const &ph) override
ProductStatus status() const
virtual bool singleProduct_() const override
virtual bool unscheduledWasNotRun_() const override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual void connectTo(ProductResolverBase const &iOther, Principal const *) override final
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override final
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
WrapperBase const * wrapper() const
Definition: ProductData.h:32
virtual bool productWasDeleted_() const override final
virtual bool singleProduct_() const override
virtual void resetProductData_(bool deleteEarly) override
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:31
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void reset()
Resets access to the resource so that added tasks will wait.
virtual BranchDescription const & branchDescription_() const override
static unsigned int kUnsetOffset
ServiceToken presentToken() const
virtual void resetProductData_(bool deleteEarly) override
std::string const & processName() const
virtual void retrieveAndMerge_(Principal const &principal) const override
#define constexpr
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
bool isPresent() const
Definition: WrapperBase.h:22
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual ProductProvenance const * productProvenancePtr_() const override
static unsigned int kMissingOffset
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void putProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:38
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
virtual void resetFailedFromThisProcess() override
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void setProvenance(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
Definition: ProductData.h:63
virtual void setProcessHistory_(ProcessHistory const &ph) override final
virtual bool isFromCurrentProcess() const override final
def principal(options)
std::string const & className() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:51
virtual bool singleProduct_() const override
virtual Provenance const * provenance_() const override
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:196
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
virtual Provenance const * provenance_() const override
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
TypeID unwrappedTypeID() const
virtual void resetProductData_(bool deleteEarly) override
static ServiceRegistry & instance()
virtual void setProcessHistory_(ProcessHistory const &ph) override
virtual ProductProvenance const * productProvenancePtr_() const override
virtual bool productUnavailable_() const override
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void mergeProduct(std::unique_ptr< WrapperBase > edp) const
virtual ProductProvenance const * productProvenancePtr_() const override
std::atomic< bool > prefetchRequested_
virtual ProductProvenance const * productProvenancePtr_() const override
std::atomic< unsigned int > lastCheckIndex_
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual bool productWasDeleted_() const override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override final
virtual bool isFromCurrentProcess() const =0
virtual ProductProvenance const * productProvenancePtr_() const override final
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
int k[5][pyjets_maxn]
void setProduct(std::unique_ptr< WrapperBase > edp) const
std::atomic< bool > skippingPrefetchRequested_
std::atomic< ProductStatus > theStatus_
virtual bool singleProduct_() const override final
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
virtual bool productResolved_() const override final
DelayedReader * reader() const
Definition: Principal.h:184
UnscheduledAuxiliary const * auxiliary() const
virtual void resetProductData_(bool deleteEarly) override
static unsigned int kAmbiguousOffset
ProductStatus defaultStatus() const
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
virtual void setProcessHistory_(ProcessHistory const &ph) override
def load(fileName)
Definition: svgfig.py:546
virtual BranchDescription const & branchDescription_() const override
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
virtual bool isFromCurrentProcess() const override final
virtual bool productUnavailable_() const override
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void setupUnscheduled(UnscheduledConfigurator const &) override final
virtual void setupUnscheduled(UnscheduledConfigurator const &) override final
virtual bool singleProduct_() const override
std::vector< ProductResolverIndex > matchingHolders_
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token) const
HLT enums.
BranchDescription const & branchDescription() const
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
Worker * findWorker(std::string const &iLabel) const
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:457
virtual bool productResolved_() const override final
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
virtual void setProcessHistory_(ProcessHistory const &ph) override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
ProductData const & getProductData() const
virtual bool productWasDeleted_() const override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
virtual void resetProductData_(bool deleteEarly) override
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous)
virtual bool unscheduledWasNotRun_() const override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
std::atomic< bool > recheckedAtEnd_
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
def move(src, dest)
Definition: eostools.py:510
def branchType(schema, name)
Definition: revisionDML.py:112
std::string const & moduleLabel() const
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:34
static HepMC::HEPEVT_Wrapper wrapper
virtual void connectTo(ProductResolverBase const &, Principal const *) override final
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
bool atEndTransition() const
Definition: Principal.h:90
virtual void setupUnscheduled(UnscheduledConfigurator const &) override final