CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
4 #include "Worker.h"
5 #include "UnscheduledAuxiliary.h"
18 
19 #include <cassert>
20 #include <utility>
21 
// Sentinel offsets for the resolver index cached by NoProcessProductResolver.
// Each offset is added to ambiguous_.size(), so the resulting value lies past
// every real index into ambiguous_ / matchingHolders_:
//   size + kUnsetOffset     : no lookup result has been cached yet
//   size + kAmbiguousOffset : the lookup hit an ambiguous entry
//   size + kMissingOffset   : the lookup finished without finding a product
static constexpr unsigned int kUnsetOffset = 0;
static constexpr unsigned int kAmbiguousOffset = 1;
static constexpr unsigned int kMissingOffset = 2;
25 
26 namespace edm {
27 
30  exception << "ProductResolverBase::resolveProduct_: The product matching all criteria was already deleted\n"
31  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
32  << "Looking for module label: " << moduleLabel() << "\n"
33  << "Looking for productInstanceName: " << productInstanceName() << "\n"
34  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
35  << "This means there is a configuration error.\n"
36  << "The module which is asking for this data must be configured to state that it will read this data.";
37  throw exception;
38 
39  }
40 
41  //This is a templated function in order to avoid calling another virtual function
42  template <bool callResolver, typename FUNC>
45 
46  if(productWasDeleted()) {
48  }
49  auto presentStatus = status();
50 
51  if(callResolver && presentStatus == ProductStatus::ResolveNotRun) {
52  //if resolver fails because of exception or not setting product
53  // make sure the status goes to failed
54  auto failedStatusSetter = [this](ProductStatus* presentStatus) {
55  if(this->status() == ProductStatus::ResolveNotRun) {
56  this->setFailedStatus();
57  }
58  *presentStatus = this->status();
59  };
60  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus, failedStatusSetter);
61 
62  //If successful, this will call setProduct
63  resolver();
64  }
65 
66 
67  if (presentStatus == ProductStatus::ProductSet) {
68  auto pd = &getProductData();
69  if(pd->wrapper()->isPresent()) {
70  return Resolution(pd);
71  }
72  }
73 
74  return Resolution(nullptr);
75  }
76 
77  void
78  DataManagingProductResolver::mergeProduct(std::unique_ptr<WrapperBase> iFrom) const {
79  assert(status() == ProductStatus::ProductSet);
80  if(not iFrom) { return;}
81 
82  checkType(*iFrom);
83 
85  if(original->isMergeable()) {
86  original->mergeProduct(iFrom.get());
87  } else if(original->hasIsProductEqual()) {
88  if(!original->isProductEqual(iFrom.get())) {
89  auto const& bd = branchDescription();
90  edm::LogError("RunLumiMerging")
91  << "ProductResolver::mergeTheProduct\n"
92  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
93  << "Using the first, ignoring the second\n"
94  << "className = " << bd.className() << "\n"
95  << "moduleLabel = " << bd.moduleLabel() << "\n"
96  << "instance = " << bd.productInstanceName() << "\n"
97  << "process = " << bd.processName() << "\n";
98  }
99  } else {
100  auto const& bd = branchDescription();
101  edm::LogWarning("RunLumiMerging")
102  << "ProductResolver::mergeTheProduct\n"
103  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
104  << "Using the first, ignoring the second in merge\n"
105  << "className = " << bd.className() << "\n"
106  << "moduleLabel = " << bd.moduleLabel() << "\n"
107  << "instance = " << bd.productInstanceName() << "\n"
108  << "process = " << bd.processName() << "\n";
109  }
110  }
111 
112 
115  bool,
117  ModuleCallingContext const* mcc) const {
118  return resolveProductImpl<true>([this,&principal,mcc]() {
119  auto branchType = principal.branchType();
120  if(branchType != InEvent) {
121  //delayed get has not been allowed with Run or Lumis
122  // The file may already be closed so the reader is invalid
123  return;
124  }
125  if(mcc and (branchType == InEvent) and aux_) {
126  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
127  }
128 
129  auto sentry( make_sentry(mcc,
130  [this, branchType](ModuleCallingContext const* iContext){
131  if(branchType == InEvent and aux_) {
132  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }
133  }));
134 
135  if(auto reader=principal.reader()) {
136  std::unique_lock<std::recursive_mutex> guard;
137  if(auto sr = reader->sharedResources().second) {
138  guard =std::unique_lock<std::recursive_mutex>(*sr);
139  }
140  if ( not productResolved()) {
141  //another thread could have beaten us here
142  putProduct( reader->getProduct(BranchKey(branchDescription()), &principal, mcc));
143  }
144  }
145  });
146 
147  }
148 
149  void
151  if(auto reader = principal.reader()) {
152 
153  std::unique_lock<std::recursive_mutex> guard;
154  if(auto sr = reader->sharedResources().second) {
155  guard =std::unique_lock<std::recursive_mutex>(*sr);
156  }
157 
158  //Can't use resolveProductImpl since it first checks to see
159  // if the product was already retrieved and then returns if it is
161  std::unique_ptr<WrapperBase> edp(reader->getProduct(bk, &principal));
162 
163  if(edp.get() != nullptr) {
165  } else if( status()== defaultStatus()) {
166  setFailedStatus();
167  }
168  }
169  }
170 
171 
173  Principal const& principal,
174  bool skipCurrentProcess,
176  ModuleCallingContext const* mcc) const {
177  m_waitingTasks.add(waitTask);
178 
179  bool expected = false;
180  if( m_prefetchRequested.compare_exchange_strong(expected, true) ) {
181 
182  //need to make sure Service system is activated on the reading thread
183  auto token = ServiceRegistry::instance().presentToken();
184  auto workToDo = [this, mcc, &principal, token] () {
185  ServiceRegistry::Operate guard(token);
186  try {
187  resolveProductImpl<true>([this,&principal,mcc]() {
188  if(principal.branchType() != InEvent) { return; }
189  if(auto reader = principal.reader()) {
190  std::unique_lock<std::recursive_mutex> guard;
191  if(auto sr = reader->sharedResources().second) {
192  guard =std::unique_lock<std::recursive_mutex>(*sr);
193  }
194  if ( not productResolved()) {
195  //another thread could have finished this while we were waiting
196  putProduct( reader->getProduct(BranchKey(branchDescription()), &principal, mcc));
197  }
198  }
199  });
200  } catch(...) {
201  this->m_waitingTasks.doneWaiting(std::current_exception());
202  return;
203  }
204  this->m_waitingTasks.doneWaiting(nullptr);
205  };
206 
207  SerialTaskQueueChain* queue = nullptr;
208  if(auto reader = principal.reader()) {
209  if (auto shared_res = reader->sharedResources().first) {
210  queue = &(shared_res->serialQueueChain());
211  }
212  }
213  if(queue) {
214  queue->push(workToDo);
215  } else {
216  //Have to create a new task
217  auto t = make_functor_task(tbb::task::allocate_root(),
218  workToDo);
219  tbb::task::spawn(*t);
220  }
221  }
222  }
223 
224  void
226  m_prefetchRequested = false;
227  m_waitingTasks.reset();
229  }
230 
231  void
233  aux_ = iConfigure.auxiliary();
234  }
235 
236 
237  bool
239  return false;
240  }
241 
242 
245  bool skipCurrentProcess,
247  ModuleCallingContext const*) const {
248  if (!skipCurrentProcess) {
249  //'false' means never call the lambda function
250  return resolveProductImpl<false>([](){return;});
251  }
252  return Resolution(nullptr);
253  }
254 
256  Principal const& principal,
257  bool skipCurrentProcess,
259  ModuleCallingContext const* mcc) const {
260  if(not skipCurrentProcess) {
261  m_waitingTasks.add(waitTask);
262 
263  bool expected = false;
264  if(worker_ and prefetchRequested_.compare_exchange_strong(expected,true)) {
265  //using a waiting task to do a callback guarantees that
266  // the m_waitingTasks list will be released from waiting even
267  // if the module does not put this data product or the
268  // module has an exception while running
269 
270  auto waiting = make_waiting_task(tbb::task::allocate_root(),
271  [this](std::exception_ptr const * iException) {
272  if(nullptr != iException) {
273  m_waitingTasks.doneWaiting(*iException);
274  } else {
275  m_waitingTasks.doneWaiting(std::exception_ptr());
276  }
277  });
278  worker_->callWhenDoneAsync(waiting);
279  }
280  }
281  }
282 
283  void
284  PuttableProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
286  bool expected = false;
287  if(prefetchRequested_.compare_exchange_strong(expected,true)) {
288  m_waitingTasks.doneWaiting(std::exception_ptr());
289  }
290  }
291 
292 
293  void
295  m_waitingTasks.reset();
297  prefetchRequested_ = false;
298  }
299 
300  void
302  worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
303  }
304 
305 
306  void
308  aux_ = iConfigure.auxiliary();
309  worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
310  assert(worker_ != nullptr);
311 
312  }
313 
316  bool skipCurrentProcess,
318  ModuleCallingContext const* mcc) const {
319  if (!skipCurrentProcess and worker_) {
320  return resolveProductImpl<true>(
321  [&principal,this,sra,mcc]() {
322  try {
323  auto const& event = static_cast<EventPrincipal const&>(principal);
324  ParentContext parentContext(mcc);
325  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
326 
327  auto workCall = [this,&event,&parentContext,mcc] () {
328  auto sentry( make_sentry(mcc,[this](ModuleCallingContext const* iContext){aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }));
329 
331  event,
332  *(aux_->eventSetup()),
333  event.streamID(),
334  parentContext,
335  mcc->getStreamContext());
336  };
337 
338  if (sra) {
339  assert(false);
340  } else {
341  workCall();
342  }
343 
344  }
345  catch (cms::Exception & ex) {
346  std::ostringstream ost;
347  ost << "Calling produce method for unscheduled module "
348  << worker_->description().moduleName() << "/'"
349  << worker_->description().moduleLabel() << "'";
350  ex.addContext(ost.str());
351  throw;
352  }
353  });
354  }
355  return Resolution(nullptr);
356  }
357 
358  void
360  Principal const& principal,
361  bool skipCurrentProcess,
363  ModuleCallingContext const* mcc) const
364  {
365  if(skipCurrentProcess) { return; }
366  waitingTasks_.add(waitTask);
367  bool expected = false;
368  if(prefetchRequested_.compare_exchange_strong(expected, true)) {
369 
370  //Have to create a new task which will make sure the state for UnscheduledProductResolver
371  // is properly set after the module has run
372  auto t = make_waiting_task(tbb::task::allocate_root(),
373  [this,&principal, skipCurrentProcess,sra,mcc](std::exception_ptr const* iPtr)
374  {
375  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
376  // state for the case where an exception occurs during the call to the function.
377  try {
378  resolveProductImpl<true>([iPtr]() {
379  if ( iPtr) {
380  std::rethrow_exception(*iPtr);
381  }
382  });
383  } catch(...) {
384  waitingTasks_.doneWaiting(std::current_exception());
385  return;
386  }
387  waitingTasks_.doneWaiting(nullptr);
388  } );
389  auto const& event = static_cast<EventPrincipal const&>(principal);
390  ParentContext parentContext(mcc);
391 
393  event,
394  *(aux_->eventSetup()),
395  event.streamID(),
396  parentContext,
397  mcc->getStreamContext());
398  }
399  }
400 
401  void
403  prefetchRequested_ = false;
404  waitingTasks_.reset();
406  }
407 
408 
409  void
410  ProducedProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
411  if(status() != defaultStatus()) {
413  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
414  }
415 
416  setProduct(std::move(edp)); // ProductResolver takes ownership
417  }
418 
419  void
420  InputProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
421  if ( not productResolved()) {
422  //Another thread could have set this
423  setProduct(std::move(edp));
424  }
425  }
426 
427  bool
429  return true;
430  }
431 
432  void
435  resetProductData_(false);
436  }
437  }
438 
439 
440  void
442  assert(false);
443  }
444 
445  void
446  DataManagingProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const {
447  if(not prod) {return;}
448  if(status() == defaultStatus()) {
449  //resolveProduct has not been called or it failed
450  putProduct(std::move(prod));
451  } else {
452  mergeProduct(std::move(prod));
453  }
454  }
455 
456 
457 
458  void
460  // Check if the types match.
461  TypeID typeID(prod.dynamicTypeInfo());
462  if(typeID != branchDescription().unwrappedTypeID()) {
463  // Types do not match.
465  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
466  << "It is supposed to be of type " << branchDescription().className() << ".\n"
467  << "It is actually of type " << typeID.className() << ".\n";
468  }
469  }
470 
471 
472  void
473  DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
474  if(edp) {
475  checkType(*edp);
478  } else {
479  setFailedStatus();
480  }
481  }
482  // This routine returns true if it is known that currently there is no real product.
483  // If there is a real product, it returns false.
484  // If it is not known if there is a real product, it returns false.
485  bool
487  auto presentStatus = status();
488  if(presentStatus == ProductStatus::ProductSet) {
489  return !(getProductData().wrapper()->isPresent());
490  }
491  return presentStatus != ProductStatus::ResolveNotRun;
492  }
493 
494  bool
496  auto s = status();
497  return (s != defaultStatus() ) or (s == ProductStatus::ProductDeleted);
498  }
499 
500 
501  // This routine returns true if the product was deleted early in order to save memory
502  bool
505  }
506 
507  bool
509  if (iSkipCurrentProcess and isFromCurrentProcess() ) {
510  return false;
511  }
513  if(getProductData().wrapper()->isPresent()) {
514  return true;
515  }
516  }
517  return false;
518  }
519 
520 
522  productData_.setProvenance(provRetriever,ph,pid);
523  }
524 
527  }
528 
530  return provenance()->productProvenance();
531  }
532 
536  }
537  if(deleteEarly) {
539  } else {
540  resetStatus();
541  }
542  }
543 
545  return true;
546  }
547 
549  realProduct_.setProvenance(provRetriever,ph,pid);
550  }
551 
553  realProduct_.setProcessHistory(ph);
554  }
555 
557  return provenance()->productProvenance();
558  }
559 
561  realProduct_.resetProductData_(deleteEarly);
562  }
563 
565  return true;
566  }
567 
568  void AliasProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
570  << "AliasProductResolver::putProduct_() not implemented and should never be called.\n"
571  << "Contact a Framework developer\n";
572  }
573 
574  void AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
576  << "AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
577  << "Contact a Framework developer\n";
578  }
579 
580 
581 
583  provRetriever_ = provRetriever;
584  }
585 
587  }
588 
590  return provRetriever_? provRetriever_->branchIDToProvenance(bd_->originalBranchID()): nullptr;
591  }
592 
594  }
595 
597  return true;
598  }
599 
600  void ParentProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
602  << "ParentProcessProductResolver::putProduct_() not implemented and should never be called.\n"
603  << "Contact a Framework developer\n";
604  }
605 
606  void ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
608  << "ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
609  << "Contact a Framework developer\n";
610  }
611 
613  // In principle, this ought to be fixed. I noticed one hits this error
614  // when in a SubProcess and calling the Event::getProvenance function
615  // with a BranchID to a branch from an earlier SubProcess or the top
616  // level process and this branch is not kept in this SubProcess. It might
617  // be possible to hit this in other contexts. I say it ought to be
618  // fixed because one does not encounter this issue if the SubProcesses
619  // are split into genuinely different processes (in principle that
620  // ought to give identical behavior and results). No user has ever
621  // reported this issue which has been around for some time and it was only
622  // noticed when testing some rare corner cases after modifying Core code.
623  // After discussing this with Chris we decided that at least for the moment
624  // there are higher priorities than fixing this ... I converted it so it
625  // causes an exception instead of a seg fault. The issue that may need to
626  // be addressed someday is how ProductResolvers for non-kept branches are
627  // connected to earlier SubProcesses.
629  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
630  << "Contact a Framework developer\n";
631  }
632 
634  NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
635  std::vector<bool> const& ambiguous) :
636  matchingHolders_(matchingHolders),
637  ambiguous_(ambiguous),
638  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
639  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
640  prefetchRequested_(false),
641  skippingPrefetchRequested_(false) {
642  assert(ambiguous_.size() == matchingHolders_.size());
643  }
644 
647  Principal const& principal,
648  bool skipCurrentProcess,
650  ModuleCallingContext const* mcc) const {
651  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
652  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
653  }
654 
655 
658  bool skipCurrentProcess,
660  ModuleCallingContext const* mcc) const {
661  //See if we've already cached which Resolver we should call or if
662  // we know it is ambiguous
663  const unsigned int choiceSize = ambiguous_.size();
664  {
665  unsigned int checkCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
666  if( checkCacheIndex != choiceSize +kUnsetOffset) {
667  if (checkCacheIndex == choiceSize+kAmbiguousOffset) {
669  } else if(checkCacheIndex == choiceSize+kMissingOffset) {
670  return Resolution(nullptr);
671  }
672  return tryResolver(checkCacheIndex, principal, skipCurrentProcess,
673  sra,mcc);
674  }
675  }
676 
677  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
678 
679  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
680  for(unsigned int k : lookupProcessOrder) {
681  assert(k < ambiguous_.size());
682  if(k == 0) break; // Done
683  if(ambiguous_[k]) {
684  updateCacheIndex = choiceSize + kAmbiguousOffset;
686  }
688  auto resolution = tryResolver(k,principal, skipCurrentProcess, sra,mcc);
689  if(resolution.data() != nullptr) {
690  updateCacheIndex = k;
691  return resolution;
692  }
693  }
694  }
695 
696  updateCacheIndex = choiceSize + kMissingOffset;
697  return Resolution(nullptr);
698  }
699 
700  void
702  Principal const& principal,
703  bool skipCurrentProcess,
705  ModuleCallingContext const* mcc) const {
706  if(not skipCurrentProcess) {
707  waitingTasks_.add(waitTask);
708 
709  bool expected = false;
710  if( prefetchRequested_.compare_exchange_strong(expected,true)) {
711  //we are the first thread to request
712  tryPrefetchResolverAsync(0, principal, skipCurrentProcess, sra, mcc, ServiceRegistry::instance().presentToken());
713  }
714  } else {
715  skippingWaitingTasks_.add(waitTask);
716  bool expected = false;
717  if( skippingPrefetchRequested_.compare_exchange_strong(expected,true)) {
718  //we are the first thread to request
719  tryPrefetchResolverAsync(0, principal, skipCurrentProcess, sra, mcc, ServiceRegistry::instance().presentToken());
720  }
721  }
722  }
723 
724  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
725  ProductResolverIndex iIndex,
726  std::exception_ptr iExceptPtr) const {
727  if( not iSkipCurrentProcess) {
728  lastCheckIndex_ = iIndex;
729  waitingTasks_.doneWaiting(iExceptPtr);
730  } else {
733  }
734  }
735 
736  namespace {
737  class TryNextResolverWaitingTask : public edm::WaitingTask {
738  public:
739 
740  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
741  unsigned int iResolverIndex,
742  Principal const* iPrincipal,
744  ModuleCallingContext const* iMCC,
745  bool iSkipCurrentProcess,
746  ServiceToken iToken) :
747  resolver_(iResolver),
748  principal_(iPrincipal),
749  sra_(iSRA),
750  mcc_(iMCC),
751  serviceToken_(iToken),
752  index_(iResolverIndex),
753  skipCurrentProcess_(iSkipCurrentProcess){}
754 
755  tbb::task* execute() override {
756  auto exceptPtr =exceptionPtr();
757  if(exceptPtr) {
758  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
759  } else {
760  if(not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
761  resolver_->tryPrefetchResolverAsync(index_+1,
762  *principal_,
763  skipCurrentProcess_,
764  sra_,
765  mcc_,
766  serviceToken_);
767  }
768  }
769  return nullptr;
770  }
771 
772  private:
773  NoProcessProductResolver const* resolver_;
774  Principal const* principal_;
776  ModuleCallingContext const* mcc_;
777  ServiceToken serviceToken_;
778  unsigned int index_;
779  bool skipCurrentProcess_;
780  };
781  }
782 
783  void
784  NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
785  Principal const& principal,
786  bool iSkipCurrentProcess,
787  std::exception_ptr iExceptPtr) const {
788  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
789  auto k = lookupProcessOrder[iProcessingIndex];
790 
791  setCache(iSkipCurrentProcess, k, iExceptPtr);
792  }
793 
794 
795  bool
796  NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
797  Principal const& principal,
798  bool iSkipCurrentProcess) const {
799  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
800  auto k = lookupProcessOrder[iProcessingIndex];
801  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
802 
803  if(productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
804 
805  setCache(iSkipCurrentProcess, k, nullptr);
806  return true;
807  }
808  return false;
809  }
810 
811 
812  void
814  Principal const& principal,
815  bool skipCurrentProcess,
817  ModuleCallingContext const* mcc,
818  ServiceToken token) const {
819  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
820  auto index = iProcessingIndex;
821 
822  const unsigned int choiceSize = ambiguous_.size();
823  unsigned int newCacheIndex = choiceSize + kMissingOffset;
824  while(index < lookupProcessOrder.size()) {
825  auto k = lookupProcessOrder[index];
826  if(k==0) {
827  break;
828  }
829  assert(k < ambiguous_.size());
830  if(ambiguous_[k]) {
831  newCacheIndex = choiceSize + kAmbiguousOffset;
832  break;
833  }
835  //make new task
836 
837  auto task = new (tbb::task::allocate_root()) TryNextResolverWaitingTask(
838  this,
839  index,
840  &principal,
841  sra,
842  mcc,
843  skipCurrentProcess,
844  token
845  );
846  task->increment_ref_count();
847  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
848 
849  //Make sure the Services are available on this thread
850  ServiceRegistry::Operate guard(token);
851 
852  productResolver->prefetchAsync(task,
853  principal,
854  skipCurrentProcess,
855  sra, mcc);
856  if(0 == task->decrement_ref_count()) {
857  tbb::task::spawn(*task);
858  }
859  return;
860  }
861  ++index;
862  }
863  //data product unavailable
864  setCache(skipCurrentProcess, newCacheIndex, nullptr);
865  }
866 
868  }
869 
871  }
872 
874  return nullptr;
875  }
876 
877  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size()+kUnsetOffset; }
878 
880  const auto resetValue = unsetIndexValue();
881  lastCheckIndex_ = resetValue;
882  lastSkipCurrentCheckIndex_ = resetValue;
883  prefetchRequested_ = false;
887  }
888 
890  return false;
891  }
892 
895  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
896  << "Contact a Framework developer\n";
897  }
898 
901  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
902  << "Contact a Framework developer\n";
903  }
904 
907  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
908  << "Contact a Framework developer\n";
909  }
910 
913  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
914  << "Contact a Framework developer\n";
915  }
916 
917  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
919  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
920  << "Contact a Framework developer\n";
921  }
922 
923  void NoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
925  << "NoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
926  << "Contact a Framework developer\n";
927  }
928 
929  void NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
931  << "NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
932  << "Contact a Framework developer\n";
933  }
934 
937  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
938  << "Contact a Framework developer\n";
939  }
940 
941  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
943  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
944  << "Contact a Framework developer\n";
945  }
946 
949  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
950  << "Contact a Framework developer\n";
951  }
952 
955  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
956  << "Contact a Framework developer\n";
957 
958  }
959 
960  //---- SingleChoiceNoProcessProductResolver ----------------
962  Principal const& principal,
963  bool skipCurrentProcess,
965  ModuleCallingContext const* mcc) const
966  {
967  //NOTE: Have to look up the other ProductResolver each time rather than cache
968  // its pointer since it appears the pointer can change at some later stage
969  return principal.getProductResolverByIndex(realResolverIndex_)
970  ->resolveProduct(principal,
971  skipCurrentProcess, sra, mcc);
972  }
973 
975  Principal const& principal,
976  bool skipCurrentProcess,
978  ModuleCallingContext const* mcc) const {
979  principal.getProductResolverByIndex(realResolverIndex_)
980  ->prefetchAsync(waitTask,principal,
981  skipCurrentProcess, sra, mcc);
982  }
983 
985  }
986 
988  }
989 
991  return nullptr;
992  }
993 
995  }
996 
998  return false;
999  }
1000 
1003  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1004  << "Contact a Framework developer\n";
1005  }
1006 
1009  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1010  << "Contact a Framework developer\n";
1011  }
1012 
1015  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1016  << "Contact a Framework developer\n";
1017  }
1018 
1021  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1022  << "Contact a Framework developer\n";
1023  }
1024 
1027  << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1028  << "Contact a Framework developer\n";
1029  }
1030 
1031 
1032  void SingleChoiceNoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
1034  << "SingleChoiceNoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
1035  << "Contact a Framework developer\n";
1036  }
1037 
1038  void SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const {
1040  << "SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) not implemented and should never be called.\n"
1041  << "Contact a Framework developer\n";
1042  }
1043 
1046  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1047  << "Contact a Framework developer\n";
1048  }
1049 
1050  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1052  << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1053  << "Contact a Framework developer\n";
1054  }
1055 
1058  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1059  << "Contact a Framework developer\n";
1060  }
1061 
1064  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1065  << "Contact a Framework developer\n";
1066 
1067  }
1068 
1069 }
virtual bool productResolved_() const override final
virtual void connectTo(ProductResolverBase const &iOther, Principal const *) override final
size
Write out results.
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::string const & branchName() const
std::string const & productInstanceName() const
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:29
virtual void resetProductData_(bool deleteEarly) override
Provenance const * provenance() const
void setProcessHistory(ProcessHistory const &ph)
Definition: ProductData.h:59
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
virtual void resetProductData_(bool deleteEarly) override
virtual bool productUnavailable_() const override final
void prefetchAsync(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void checkType(WrapperBase const &prod) const
std::type_info const & dynamicTypeInfo() const
Definition: WrapperBase.h:38
unsigned int unsetIndexValue() const
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void resetProductData_(bool deleteEarly) override
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void push(const T &iAction)
asynchronously pushes functor iAction into queue
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
void add(WaitingTask *)
Adds task to the waiting list.
virtual void setProcessHistory_(ProcessHistory const &ph) override
ProductStatus status() const
virtual bool singleProduct_() const override
virtual bool unscheduledWasNotRun_() const override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual void connectTo(ProductResolverBase const &iOther, Principal const *) override final
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override final
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
WrapperBase const * wrapper() const
Definition: ProductData.h:32
virtual bool productWasDeleted_() const override final
virtual bool singleProduct_() const override
virtual void resetProductData_(bool deleteEarly) override
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:31
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void reset()
Resets access to the resource so that added tasks will wait.
virtual BranchDescription const & branchDescription_() const override
static unsigned int kUnsetOffset
ServiceToken presentToken() const
virtual void resetProductData_(bool deleteEarly) override
std::string const & processName() const
virtual void retrieveAndMerge_(Principal const &principal) const override
#define constexpr
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
bool isPresent() const
Definition: WrapperBase.h:22
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual ProductProvenance const * productProvenancePtr_() const override
static unsigned int kMissingOffset
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void putProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:38
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
virtual void resetFailedFromThisProcess() override
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void setProvenance(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
Definition: ProductData.h:63
virtual void setProcessHistory_(ProcessHistory const &ph) override final
virtual bool isFromCurrentProcess() const override final
def principal(options)
std::string const & className() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:51
virtual bool singleProduct_() const override
virtual Provenance const * provenance_() const override
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:190
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
virtual Provenance const * provenance_() const override
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
TypeID unwrappedTypeID() const
virtual void resetProductData_(bool deleteEarly) override
static ServiceRegistry & instance()
virtual void setProcessHistory_(ProcessHistory const &ph) override
virtual ProductProvenance const * productProvenancePtr_() const override
virtual bool productUnavailable_() const override
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void mergeProduct(std::unique_ptr< WrapperBase > edp) const
virtual ProductProvenance const * productProvenancePtr_() const override
std::atomic< bool > prefetchRequested_
virtual ProductProvenance const * productProvenancePtr_() const override
std::atomic< unsigned int > lastCheckIndex_
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual bool productWasDeleted_() const override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override final
virtual bool isFromCurrentProcess() const =0
virtual ProductProvenance const * productProvenancePtr_() const override final
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
int k[5][pyjets_maxn]
void setProduct(std::unique_ptr< WrapperBase > edp) const
std::atomic< bool > skippingPrefetchRequested_
std::atomic< ProductStatus > theStatus_
virtual bool singleProduct_() const override final
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
virtual bool productResolved_() const override final
DelayedReader * reader() const
Definition: Principal.h:178
UnscheduledAuxiliary const * auxiliary() const
virtual void resetProductData_(bool deleteEarly) override
static unsigned int kAmbiguousOffset
ProductStatus defaultStatus() const
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
virtual void setProcessHistory_(ProcessHistory const &ph) override
def load(fileName)
Definition: svgfig.py:546
virtual BranchDescription const & branchDescription_() const override
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
virtual bool isFromCurrentProcess() const override final
virtual bool productUnavailable_() const override
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void setupUnscheduled(UnscheduledConfigurator const &) override final
virtual void setupUnscheduled(UnscheduledConfigurator const &) override final
virtual bool singleProduct_() const override
std::vector< ProductResolverIndex > matchingHolders_
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token) const
HLT enums.
BranchDescription const & branchDescription() const
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
Worker * findWorker(std::string const &iLabel) const
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:442
virtual bool productResolved_() const override final
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
virtual void setProcessHistory_(ProcessHistory const &ph) override
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
ProductData const & getProductData() const
virtual bool productWasDeleted_() const override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const override final
virtual void resetProductData_(bool deleteEarly) override
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous)
virtual bool unscheduledWasNotRun_() const override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
def move(src, dest)
Definition: eostools.py:510
def branchType(schema, name)
Definition: revisionDML.py:112
std::string const & moduleLabel() const
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:34
static HepMC::HEPEVT_Wrapper wrapper
virtual void connectTo(ProductResolverBase const &, Principal const *) override final
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual void setupUnscheduled(UnscheduledConfigurator const &) override final