CMS 3D CMS Logo

ProductResolvers.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "ProductResolvers.h"
4 #include "Worker.h"
5 #include "UnscheduledAuxiliary.h"
20 
21 #include <cassert>
22 #include <utility>
23 
// Offsets added to ambiguous_.size() to form the sentinel values kept in
// NoProcessProductResolver's cached indices (lastCheckIndex_ and
// lastSkipCurrentCheckIndex_). Indices of real choices are asserted to be
// smaller than ambiguous_.size(), so the sentinels cannot collide with them.
// kUnsetOffset: nothing has been cached yet (initial and reset value).
24 static constexpr unsigned int kUnsetOffset = 0;
// kAmbiguousOffset: the lookup reached an entry flagged in ambiguous_.
25 static constexpr unsigned int kAmbiguousOffset = 1;
// kMissingOffset: the lookup finished without any resolver providing data.
26 static constexpr unsigned int kMissingOffset = 2;
27 
28 namespace edm {
29 
32  exception << "ProductResolverBase::resolveProduct_: The product matching all criteria was already deleted\n"
33  << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
34  << "Looking for module label: " << moduleLabel() << "\n"
35  << "Looking for productInstanceName: " << productInstanceName() << "\n"
36  << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
37  << "This means there is a configuration error.\n"
38  << "The module which is asking for this data must be configured to state that it will read this data.";
39  throw exception;
40 
41  }
42 
43  //This is a templated function in order to avoid calling another virtual function
44  template <bool callResolver, typename FUNC>
47 
48  if(productWasDeleted()) {
50  }
51  auto presentStatus = status();
52 
53  if(callResolver && presentStatus == ProductStatus::ResolveNotRun) {
54  //if resolver fails because of exception or not setting product
55  // make sure the status goes to failed
56  auto failedStatusSetter = [this](ProductStatus* presentStatus) {
57  if(this->status() == ProductStatus::ResolveNotRun) {
58  this->setFailedStatus();
59  }
60  *presentStatus = this->status();
61  };
62  std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus, failedStatusSetter);
63 
64  //If successful, this will call setProduct
65  resolver();
66  }
67 
68 
69  if (presentStatus == ProductStatus::ProductSet) {
70  auto pd = &getProductData();
71  if(pd->wrapper()->isPresent()) {
72  return Resolution(pd);
73  }
74  }
75 
76  return Resolution(nullptr);
77  }
78 
79  void
80  DataManagingProductResolver::mergeProduct(std::unique_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
81 
82  // if it's not mergeable and the previous read failed, go ahead and use this one
84  setProduct(std::move(iFrom));
85  return;
86  }
87 
88  assert(status() == ProductStatus::ProductSet);
89  if(not iFrom) { return;}
90 
91  checkType(*iFrom);
92 
94  if (original->isMergeable()) {
95  if (original->isPresent() != iFrom->isPresent()) {
97  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
98  << "Was trying to merge objects where one product had been put in the input file and the other had not been." << "\n"
99  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
100  << "that need to be merged in the first place.\n";
101  }
102  if (original->isPresent()) {
103 
104  BranchDescription const& desc = branchDescription_();
105  if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
106  original->mergeProduct(iFrom.get());
107  } else {
108  MergeableRunProductMetadata::MergeDecision decision = mergeableRunProductMetadata->getMergeDecision(desc.processName());
109  if (decision == MergeableRunProductMetadata::MERGE) {
110  original->mergeProduct(iFrom.get());
111  } else if (decision == MergeableRunProductMetadata::REPLACE) {
112  // Note this swaps the content of the product where
113  // both products' branches are present and the product is
114  // also present (was put) in the branch. A module might
115  // have already gotten a pointer to the product so we
116  // keep those pointers valid. This does not call swap
117  // on the Wrapper.
118  original->swapProduct(iFrom.get());
119  }
120  // If the decision is IGNORE, do nothing
121  }
122  }
123  // If both have isPresent false, do nothing
124 
125  } else if(original->hasIsProductEqual()) {
126  if (original->isPresent() && iFrom->isPresent()) {
127  if(!original->isProductEqual(iFrom.get())) {
128  auto const& bd = branchDescription();
129  edm::LogError("RunLumiMerging")
130  << "ProductResolver::mergeTheProduct\n"
131  << "Two run/lumi products for the same run/lumi which should be equal are not\n"
132  << "Using the first, ignoring the second\n"
133  << "className = " << bd.className() << "\n"
134  << "moduleLabel = " << bd.moduleLabel() << "\n"
135  << "instance = " << bd.productInstanceName() << "\n"
136  << "process = " << bd.processName() << "\n";
137  }
138  } else if (!original->isPresent() && iFrom->isPresent()) {
139  setProduct(std::move(iFrom));
140  }
141  // if not iFrom->isPresent(), do nothing
142  } else {
143  auto const& bd = branchDescription();
144  edm::LogWarning("RunLumiMerging")
145  << "ProductResolver::mergeTheProduct\n"
146  << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
147  << "Using the first, ignoring the second in merge\n"
148  << "className = " << bd.className() << "\n"
149  << "moduleLabel = " << bd.moduleLabel() << "\n"
150  << "instance = " << bd.productInstanceName() << "\n"
151  << "process = " << bd.processName() << "\n";
152  if (!original->isPresent() && iFrom->isPresent()) {
153  setProduct(std::move(iFrom));
154  }
155  // In other cases, do nothing
156  }
157  }
158 
161  bool,
163  ModuleCallingContext const* mcc) const {
164  return resolveProductImpl<true>([this,&principal,mcc]() {
165  auto branchType = principal.branchType();
166  if(branchType != InEvent) {
167  //delayed get has not been allowed with Run or Lumis
168  // The file may already be closed so the reader is invalid
169  return;
170  }
171  if(mcc and (branchType == InEvent) and aux_) {
172  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
173  }
174 
175  auto sentry( make_sentry(mcc,
176  [this, branchType](ModuleCallingContext const* iContext){
177  if(branchType == InEvent and aux_) {
178  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }
179  }));
180 
181  if(auto reader=principal.reader()) {
182  std::unique_lock<std::recursive_mutex> guard;
183  if(auto sr = reader->sharedResources().second) {
184  guard =std::unique_lock<std::recursive_mutex>(*sr);
185  }
186  if ( not productResolved()) {
187  //another thread could have beaten us here
188  putProduct( reader->getProduct(branchDescription().branchID(), &principal, mcc));
189  }
190  }
191  });
192 
193  }
194 
195  void
197 
198  if(auto reader = principal.reader()) {
199 
200  std::unique_lock<std::recursive_mutex> guard;
201  if(auto sr = reader->sharedResources().second) {
202  guard =std::unique_lock<std::recursive_mutex>(*sr);
203  }
204 
205  //Can't use resolveProductImpl since it first checks to see
206  // if the product was already retrieved and then returns if it is
207  std::unique_ptr<WrapperBase> edp(reader->getProduct(branchDescription().branchID(), &principal));
208 
209  if (edp.get() != nullptr) {
210  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
212  << "Missing definition of member function swap for branch name " << branchDescription().branchName() << "\n"
213  << "Mergeable data types written to a Run must have the swap member function defined" << "\n";
214  }
215  if (status() == defaultStatus() ||
217  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
218  putOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
219  } else { // status() == ResolveFailed && branchDescription().isMergeable()
221  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
222  << "The product branch was dropped in the first run or lumi fragment and present in a later one" << "\n"
223  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
224  << "that need to be merged in the first place.\n";
225  }
226  } else if (status() == defaultStatus()) {
227  setFailedStatus();
228  } else if ( status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
230  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
231  << "The product branch was present in first run or lumi fragment and dropped in a later one" << "\n"
232  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
233  << "that need to be merged in the first place.\n";
234  }
235  // Do nothing in other case. status is ResolveFailed already or
236  // it is not mergeable and the status is ProductSet
237  }
238  }
239 
240  void
243  }
244 
246  Principal const& principal,
247  bool skipCurrentProcess,
248  ServiceToken const& token,
250  ModuleCallingContext const* mcc) const {
251  m_waitingTasks.add(waitTask);
252 
253  bool expected = false;
254  if( m_prefetchRequested.compare_exchange_strong(expected, true) ) {
255 
256  auto workToDo = [this, mcc, &principal, token] () {
257  //need to make sure Service system is activated on the reading thread
258  ServiceRegistry::Operate guard(token);
259  try {
260  resolveProductImpl<true>([this,&principal,mcc]() {
261  if(principal.branchType() != InEvent) { return; }
262  if(auto reader = principal.reader()) {
263  std::unique_lock<std::recursive_mutex> guard;
264  if(auto sr = reader->sharedResources().second) {
265  guard =std::unique_lock<std::recursive_mutex>(*sr);
266  }
267  if ( not productResolved()) {
268  //another thread could have finished this while we were waiting
269  putProduct( reader->getProduct(branchDescription().branchID(), &principal, mcc));
270  }
271  }
272  });
273  } catch(...) {
274  this->m_waitingTasks.doneWaiting(std::current_exception());
275  return;
276  }
277  this->m_waitingTasks.doneWaiting(nullptr);
278  };
279 
280  SerialTaskQueueChain* queue = nullptr;
281  if(auto reader = principal.reader()) {
282  if (auto shared_res = reader->sharedResources().first) {
283  queue = &(shared_res->serialQueueChain());
284  }
285  }
286  if(queue) {
287  queue->push(workToDo);
288  } else {
289  //Have to create a new task
290  auto t = make_functor_task(tbb::task::allocate_root(),
291  workToDo);
292  tbb::task::spawn(*t);
293  }
294  }
295  }
296 
297  void
299  m_prefetchRequested = false;
300  m_waitingTasks.reset();
302  }
303 
304  void
306  aux_ = iConfigure.auxiliary();
307  }
308 
309 
310  bool
312  return false;
313  }
314 
315 
318  bool skipCurrentProcess,
320  ModuleCallingContext const*) const {
321  if (!skipCurrentProcess) {
322  //'false' means never call the lambda function
323  return resolveProductImpl<false>([](){return;});
324  }
325  return Resolution(nullptr);
326  }
327 
329  Principal const& principal,
330  bool skipCurrentProcess,
331  ServiceToken const& token,
333  ModuleCallingContext const* mcc) const {
334  if(not skipCurrentProcess) {
335  if(branchDescription().availableOnlyAtEndTransition() and mcc ) {
336  if( not mcc->parent().isAtEndTransition() ) {
337  return;
338  }
339  }
340  m_waitingTasks.add(waitTask);
341 
342  bool expected = false;
343  if(worker_ and prefetchRequested_.compare_exchange_strong(expected,true)) {
344  //using a waiting task to do a callback guarantees that
345  // the m_waitingTasks list will be released from waiting even
346  // if the module does not put this data product or the
347  // module has an exception while running
348 
349  auto waiting = make_waiting_task(tbb::task::allocate_root(),
350  [this](std::exception_ptr const * iException) {
351  if(nullptr != iException) {
352  m_waitingTasks.doneWaiting(*iException);
353  } else {
354  m_waitingTasks.doneWaiting(std::exception_ptr());
355  }
356  });
357  worker_->callWhenDoneAsync(waiting);
358  }
359  }
360  }
361 
362  void
363  PuttableProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
365  bool expected = false;
366  if(prefetchRequested_.compare_exchange_strong(expected,true)) {
367  m_waitingTasks.doneWaiting(std::exception_ptr());
368  }
369  }
370 
371 
372  void
374  m_waitingTasks.reset();
376  prefetchRequested_ = false;
377  }
378 
379  void
381  worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
382  }
383 
384 
385  void
387  aux_ = iConfigure.auxiliary();
388  worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
389  assert(worker_ != nullptr);
390 
391  }
392 
395  bool skipCurrentProcess,
397  ModuleCallingContext const* mcc) const {
398  if (!skipCurrentProcess and worker_) {
399  return resolveProductImpl<true>(
400  [&principal,this,sra,mcc]() {
401  try {
402  auto const& event = static_cast<EventPrincipal const&>(principal);
403  ParentContext parentContext(mcc);
404  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
405 
406  auto workCall = [this,&event,&parentContext,mcc] () {
407  auto sentry( make_sentry(mcc,[this](ModuleCallingContext const* iContext){aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }));
408 
410  event,
411  *(aux_->eventSetup()),
412  event.streamID(),
413  parentContext,
414  mcc->getStreamContext());
415  };
416 
417  if (sra) {
418  assert(false);
419  } else {
420  workCall();
421  }
422 
423  }
424  catch (cms::Exception & ex) {
425  std::ostringstream ost;
426  ost << "Calling produce method for unscheduled module "
427  << worker_->description().moduleName() << "/'"
428  << worker_->description().moduleLabel() << "'";
429  ex.addContext(ost.str());
430  throw;
431  }
432  });
433  }
434  return Resolution(nullptr);
435  }
436 
437  void
439  Principal const& principal,
440  bool skipCurrentProcess,
441  ServiceToken const& token,
443  ModuleCallingContext const* mcc) const
444  {
445  if(skipCurrentProcess) { return; }
446  waitingTasks_.add(waitTask);
447  bool expected = false;
448  if(prefetchRequested_.compare_exchange_strong(expected, true)) {
449 
450  //Have to create a new task which will make sure the state for UnscheduledProductResolver
451  // is properly set after the module has run
452  auto t = make_waiting_task(tbb::task::allocate_root(),
453  [this](std::exception_ptr const* iPtr)
454  {
455  //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
456  // state for the case where an exception occurs during the call to the function.
457  try {
458  resolveProductImpl<true>([iPtr]() {
459  if ( iPtr) {
460  std::rethrow_exception(*iPtr);
461  }
462  });
463  } catch(...) {
464  waitingTasks_.doneWaiting(std::current_exception());
465  return;
466  }
467  waitingTasks_.doneWaiting(nullptr);
468  } );
469  auto const& event = static_cast<EventPrincipal const&>(principal);
470  ParentContext parentContext(mcc);
471 
473  event,
474  *(aux_->eventSetup()),
475  token,
476  event.streamID(),
477  parentContext,
478  mcc->getStreamContext());
479  }
480  }
481 
482  void
484  prefetchRequested_ = false;
485  waitingTasks_.reset();
487  }
488 
489 
490  void
491  ProducedProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
492  if(status() != defaultStatus()) {
494  << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
495  }
496 
497  setProduct(std::move(edp)); // ProductResolver takes ownership
498  }
499 
// Stores edp as this resolver's product unless a product is already resolved.
// edp is taken by value: when productResolved() is true it is not forwarded
// and is destroyed when the parameter goes out of scope.
// NOTE(review): setProduct presumably resolves to
// DataManagingProductResolver::setProduct, which sets the failed status for a
// null edp -- confirm against the class declaration.
500  void
501  InputProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
502  if ( not productResolved()) {
503  //Another thread could have set this
504  setProduct(std::move(edp));
505  }
506  }
507 
508  bool
510  return true;
511  }
512 
513  void
515  assert(false);
516  }
517 
// Either installs prod as the product or merges it into the existing one.
//   prod: product to store or merge; a null pointer makes the call a no-op.
//   mergeableRunProductMetadata: passed through unchanged to mergeProduct();
//     not used on the putProduct() path.
// The choice is made on status(): equal to defaultStatus() -> putProduct(),
// anything else -> mergeProduct().
518  void
519  DataManagingProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
520  if(not prod) {return;}
521  if(status() == defaultStatus()) {
522  //resolveProduct has not been called or it failed
       // NOTE(review): only defaultStatus() is tested here; whether a failed
       // resolve also reports defaultStatus() is not visible in this block.
523  putProduct(std::move(prod));
524  } else {
525  mergeProduct(std::move(prod), mergeableRunProductMetadata);
526  }
527  }
528 
529 
530 
531  void
533  // Check if the types match.
534  TypeID typeID(prod.dynamicTypeInfo());
536  // Types do not match.
538  << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
539  << "It is supposed to be of type " << branchDescription().className() << ".\n"
540  << "It is actually of type " << typeID.className() << ".\n";
541  }
542  }
543 
544 
545  void
546  DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
547  if(edp) {
548  checkType(*edp);
551  } else {
552  setFailedStatus();
553  }
554  }
555  // This routine returns true if it is known that currently there is no real product.
556  // If there is a real product, it returns false.
557  // If it is not known if there is a real product, it returns false.
558  bool
560  auto presentStatus = status();
561  if(presentStatus == ProductStatus::ProductSet) {
562  return !(getProductData().wrapper()->isPresent());
563  }
564  return presentStatus != ProductStatus::ResolveNotRun;
565  }
566 
567  bool
569  auto s = status();
570  return (s != defaultStatus() ) or (s == ProductStatus::ProductDeleted);
571  }
572 
573 
574  // This routine returns true if the product was deleted early in order to save memory
575  bool
578  }
579 
580  bool
582  if (iSkipCurrentProcess and isFromCurrentProcess() ) {
583  return false;
584  }
586  if(getProductData().wrapper()->isPresent()) {
587  return true;
588  }
589  }
590  return false;
591  }
592 
593 
595  productData_.setProvenance(provRetriever,ph,pid);
596  }
597 
600  }
601 
604  }
605 
607  return provenance()->productProvenance();
608  }
609 
613  }
614  if(deleteEarly) {
616  } else {
617  resetStatus();
618  }
619  }
620 
622  return true;
623  }
624 
626  realProduct_.setProvenance(provRetriever,ph,pid);
627  }
628 
630  realProduct_.setProcessHistory(ph);
631  }
632 
634  return provenance()->productProvenance();
635  }
636 
638  realProduct_.resetProductData_(deleteEarly);
639  }
640 
642  return true;
643  }
644 
645  void AliasProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
647  << "AliasProductResolver::putProduct_() not implemented and should never be called.\n"
648  << "Contact a Framework developer\n";
649  }
650 
651  void AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata const*) const {
653  << "AliasProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata const*) not implemented and should never be called.\n"
654  << "Contact a Framework developer\n";
655  }
656 
657 
658 
660  provRetriever_ = provRetriever;
661  }
662 
664  }
665 
667  return provRetriever_? provRetriever_->branchIDToProvenance(bd_->originalBranchID()): nullptr;
668  }
669 
671  }
672 
674  return true;
675  }
676 
677  void ParentProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
679  << "ParentProcessProductResolver::putProduct_() not implemented and should never be called.\n"
680  << "Contact a Framework developer\n";
681  }
682 
683  void ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata const*) const {
685  << "ParentProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata const*) not implemented and should never be called.\n"
686  << "Contact a Framework developer\n";
687  }
688 
690  // In principle, this ought to be fixed. I noticed one hits this error
691  // when in a SubProcess and calling the Event::getProvenance function
692  // with a BranchID to a branch from an earlier SubProcess or the top
693  // level process and this branch is not kept in this SubProcess. It might
694  // be possible to hit this in other contexts. I say it ought to be
695  // fixed because one does not encounter this issue if the SubProcesses
696  // are split into genuinely different processes (in principle that
697  // ought to give identical behavior and results). No user has ever
698  // reported this issue which has been around for some time and it was only
699  // noticed when testing some rare corner cases after modifying Core code.
700  // After discussing this with Chris we decided that at least for the moment
701  // there are higher priorities than fixing this ... I converted it so it
702  // causes an exception instead of a seg fault. The issue that may need to
703  // be addressed someday is how ProductResolvers for non-kept branches are
704  // connected to earlier SubProcesses.
706  << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
707  << "Contact a Framework developer\n";
708  }
709 
711  NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
712  std::vector<bool> const& ambiguous,
713  bool madeAtEnd) :
714  matchingHolders_(matchingHolders),
715  ambiguous_(ambiguous),
716  lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
717  lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
718  prefetchRequested_(false),
719  skippingPrefetchRequested_(false),
720  madeAtEnd_{madeAtEnd}
721  {
722  assert(ambiguous_.size() == matchingHolders_.size());
723  }
724 
727  Principal const& principal,
728  bool skipCurrentProcess,
730  ModuleCallingContext const* mcc) const {
731  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
732  return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
733  }
734 
735 
738  bool skipCurrentProcess,
740  ModuleCallingContext const* mcc) const {
741  //See if we've already cached which Resolver we should call or if
742  // we know it is ambiguous
743  const unsigned int choiceSize = ambiguous_.size();
744 
745  //madeAtEnd_==true and not at end transition is the same as skipping the current process
746  if( (not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
747  skipCurrentProcess = not mcc->parent().isAtEndTransition();
748  }
749 
750  unsigned int checkCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
751  if( checkCacheIndex != choiceSize +kUnsetOffset) {
752  if (checkCacheIndex == choiceSize+kAmbiguousOffset) {
754  } else if(checkCacheIndex == choiceSize+kMissingOffset) {
755  return Resolution(nullptr);
756  }
757  return tryResolver(checkCacheIndex, principal, skipCurrentProcess,
758  sra,mcc);
759  }
760 
761  std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
762 
763  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
764  for(unsigned int k : lookupProcessOrder) {
765  assert(k < ambiguous_.size());
766  if(k == 0) break; // Done
767  if(ambiguous_[k]) {
768  updateCacheIndex = choiceSize + kAmbiguousOffset;
770  }
772  auto resolution = tryResolver(k,principal, skipCurrentProcess, sra,mcc);
773  if(resolution.data() != nullptr) {
774  updateCacheIndex = k;
775  return resolution;
776  }
777  }
778  }
779 
780  updateCacheIndex = choiceSize + kMissingOffset;
781  return Resolution(nullptr);
782  }
783 
784  void
786  Principal const& principal,
787  bool skipCurrentProcess,
788  ServiceToken const& token,
790  ModuleCallingContext const* mcc) const {
791  bool timeToMakeAtEnd = true;
792  if(madeAtEnd_ and mcc) {
793  timeToMakeAtEnd = mcc->parent().isAtEndTransition();
794  }
795 
796  //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
797  if(not skipCurrentProcess and timeToMakeAtEnd) {
798  waitingTasks_.add(waitTask);
799 
800  bool expected = false;
801  if( prefetchRequested_.compare_exchange_strong(expected,true)) {
802  //we are the first thread to request
803  tryPrefetchResolverAsync(0, principal, false, sra, mcc, token);
804  }
805  } else {
806  skippingWaitingTasks_.add(waitTask);
807  bool expected = false;
808  if( skippingPrefetchRequested_.compare_exchange_strong(expected,true)) {
809  //we are the first thread to request
810  tryPrefetchResolverAsync(0, principal, true, sra, mcc, token);
811  }
812  }
813  }
814 
815  void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
816  ProductResolverIndex iIndex,
817  std::exception_ptr iExceptPtr) const {
818  if( not iSkipCurrentProcess) {
819  lastCheckIndex_ = iIndex;
820  waitingTasks_.doneWaiting(iExceptPtr);
821  } else {
824  }
825  }
826 
827  namespace {
828  class TryNextResolverWaitingTask : public edm::WaitingTask {
829  public:
830 
831  TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
832  unsigned int iResolverIndex,
833  Principal const* iPrincipal,
835  ModuleCallingContext const* iMCC,
836  bool iSkipCurrentProcess,
837  ServiceToken iToken) :
838  resolver_(iResolver),
839  principal_(iPrincipal),
840  sra_(iSRA),
841  mcc_(iMCC),
842  serviceToken_(iToken),
843  index_(iResolverIndex),
844  skipCurrentProcess_(iSkipCurrentProcess){}
845 
846  tbb::task* execute() override {
847  auto exceptPtr =exceptionPtr();
848  if(exceptPtr) {
849  resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, *exceptPtr);
850  } else {
851  if(not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
852  resolver_->tryPrefetchResolverAsync(index_+1,
853  *principal_,
854  skipCurrentProcess_,
855  sra_,
856  mcc_,
857  serviceToken_);
858  }
859  }
860  return nullptr;
861  }
862 
863  private:
864  NoProcessProductResolver const* resolver_;
865  Principal const* principal_;
867  ModuleCallingContext const* mcc_;
868  ServiceToken serviceToken_;
869  unsigned int index_;
870  bool skipCurrentProcess_;
871  };
872  }
873 
// Reports a failed prefetch to setCache().
//   iProcessingIndex: position within principal.lookupProcessOrder(); it is
//     used with operator[] and is not range-checked here.
//   principal: supplies the lookup order.
//   iSkipCurrentProcess: forwarded to setCache() to select which cache and
//     waiting list are updated.
//   iExceptPtr: the exception to hand to setCache().
874  void
875  NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
876  Principal const& principal,
877  bool iSkipCurrentProcess,
878  std::exception_ptr iExceptPtr) const {
879  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
       // k is the entry stored at that position, not the position itself.
880  auto k = lookupProcessOrder[iProcessingIndex];
881 
882  setCache(iSkipCurrentProcess, k, iExceptPtr);
883  }
884 
885 
// Checks whether the resolver selected by a lookup-order position already
// holds a fetched, valid product.
//   iProcessingIndex: position within principal.lookupProcessOrder(); used
//     with operator[] and not range-checked here.
//   principal: supplies the lookup order and the resolver for
//     matchingHolders_[k].
//   iSkipCurrentProcess: forwarded to productWasFetchedAndIsValid() and to
//     setCache().
// Returns true after recording k through setCache() with a null exception;
// returns false, leaving the cache untouched, when the product is not valid.
886  bool
887  NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
888  Principal const& principal,
889  bool iSkipCurrentProcess) const {
890  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
       // k is the entry stored at that position; it indexes matchingHolders_.
891  auto k = lookupProcessOrder[iProcessingIndex];
892  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
893 
894  if(productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
895 
896  setCache(iSkipCurrentProcess, k, nullptr);
897  return true;
898  }
899  return false;
900  }
901 
902 
903  void
905  Principal const& principal,
906  bool skipCurrentProcess,
908  ModuleCallingContext const* mcc,
909  ServiceToken token) const {
910  std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
911  auto index = iProcessingIndex;
912 
913  const unsigned int choiceSize = ambiguous_.size();
914  unsigned int newCacheIndex = choiceSize + kMissingOffset;
915  while(index < lookupProcessOrder.size()) {
916  auto k = lookupProcessOrder[index];
917  if(k==0) {
918  break;
919  }
920  assert(k < ambiguous_.size());
921  if(ambiguous_[k]) {
922  newCacheIndex = choiceSize + kAmbiguousOffset;
923  break;
924  }
926  //make new task
927 
928  auto task = new (tbb::task::allocate_root()) TryNextResolverWaitingTask(
929  this,
930  index,
931  &principal,
932  sra,
933  mcc,
934  skipCurrentProcess,
935  token
936  );
937  task->increment_ref_count();
938  ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
939 
940  //Make sure the Services are available on this thread
941  ServiceRegistry::Operate guard(token);
942 
943  productResolver->prefetchAsync(task,
944  principal,
945  skipCurrentProcess,
946  token,
947  sra, mcc);
948  if(0 == task->decrement_ref_count()) {
949  tbb::task::spawn(*task);
950  }
951  return;
952  }
953  ++index;
954  }
955  //data product unavailable
956  setCache(skipCurrentProcess, newCacheIndex, nullptr);
957  }
958 
960  }
961 
963  }
964 
966  return nullptr;
967  }
968 
// Returns the sentinel meaning "no resolver index cached yet":
// ambiguous_.size() + kUnsetOffset. This is the value the constructor puts in
// lastCheckIndex_ and the value the reset code in this file assigns to both
// cached indices.
969  inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size()+kUnsetOffset; }
970 
972  const auto resetValue = unsetIndexValue();
973  lastCheckIndex_ = resetValue;
974  lastSkipCurrentCheckIndex_ = resetValue;
975  prefetchRequested_ = false;
979  }
980 
982  return false;
983  }
984 
987  << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
988  << "Contact a Framework developer\n";
989  }
990 
993  << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
994  << "Contact a Framework developer\n";
995  }
996 
999  << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1000  << "Contact a Framework developer\n";
1001  }
1002 
1005  << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1006  << "Contact a Framework developer\n";
1007  }
1008 
1009  bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1011  << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1012  << "Contact a Framework developer\n";
1013  }
1014 
1015  void NoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
1017  << "NoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
1018  << "Contact a Framework developer\n";
1019  }
1020 
1021  void NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata const*) const {
1023  << "NoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata const*) not implemented and should never be called.\n"
1024  << "Contact a Framework developer\n";
1025  }
1026 
1029  << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1030  << "Contact a Framework developer\n";
1031  }
1032 
1033  void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1035  << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1036  << "Contact a Framework developer\n";
1037  }
1038 
1041  << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1042  << "Contact a Framework developer\n";
1043  }
1044 
1047  << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1048  << "Contact a Framework developer\n";
1049 
1050  }
1051 
1052  //---- SingleChoiceNoProcessProductResolver ----------------
1054  Principal const& principal,
1055  bool skipCurrentProcess,
1057  ModuleCallingContext const* mcc) const
1058  {
1059  //NOTE: Have to lookup the other ProductResolver each time rather than cache
1060  // its pointer since it appears the pointer can change at some later stage
1061  return principal.getProductResolverByIndex(realResolverIndex_)
1062  ->resolveProduct(principal,
1063  skipCurrentProcess, sra, mcc);
1064  }
1065 
1067  Principal const& principal,
1068  bool skipCurrentProcess,
1069  ServiceToken const& token,
1071  ModuleCallingContext const* mcc) const {
1072  principal.getProductResolverByIndex(realResolverIndex_)
1073  ->prefetchAsync(waitTask,principal,
1074  skipCurrentProcess, token, sra, mcc);
1075  }
1076 
1078  }
1079 
1081  }
1082 
1084  return nullptr;
1085  }
1086 
1088  }
1089 
1091  return false;
1092  }
1093 
1096  << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1097  << "Contact a Framework developer\n";
1098  }
1099 
1102  << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1103  << "Contact a Framework developer\n";
1104  }
1105 
1108  << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1109  << "Contact a Framework developer\n";
1110  }
1111 
1114  << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1115  << "Contact a Framework developer\n";
1116  }
1117 
1120  << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1121  << "Contact a Framework developer\n";
1122  }
1123 
1124 
1125  void SingleChoiceNoProcessProductResolver::putProduct_(std::unique_ptr<WrapperBase> ) const {
1127  << "SingleChoiceNoProcessProductResolver::putProduct_() not implemented and should never be called.\n"
1128  << "Contact a Framework developer\n";
1129  }
1130 
1131  void SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata const*) const {
1133  << "SingleChoiceNoProcessProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata const*) not implemented and should never be called.\n"
1134  << "Contact a Framework developer\n";
1135  }
1136 
1139  << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1140  << "Contact a Framework developer\n";
1141  }
1142 
1143  void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1145  << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1146  << "Contact a Framework developer\n";
1147  }
1148 
1151  << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1152  << "Contact a Framework developer\n";
1153  }
1154 
1157  << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1158  << "Contact a Framework developer\n";
1159 
1160  }
1161 
1162 }
size
Write out results.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::string const & branchName() const
std::string const & productInstanceName() const
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:29
void resetProductData_(bool deleteEarly) override
Provenance const * provenance() const
BranchType const & branchType() const
void setProcessHistory(ProcessHistory const &ph)
Definition: ProductData.h:61
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
void resetProductData_(bool deleteEarly) override
void checkType(WrapperBase const &prod) const
std::type_info const & dynamicTypeInfo() const
Definition: WrapperBase.h:46
void prefetchAsync(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
unsigned int unsetIndexValue() const
void resetProductData_(bool deleteEarly) override
void setMergeableRunProductMetadata_(MergeableRunProductMetadata const *) override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const
void add(WaitingTask *)
Adds task to the waiting list.
void setProcessHistory_(ProcessHistory const &ph) override
ProductStatus status() const
bool singleProduct_() const override
bool unscheduledWasNotRun_() const override
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
Resolution tryResolver(unsigned int index, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
WrapperBase const * wrapper() const
Definition: ProductData.h:34
std::string const & processName() const
void resetProductData_(bool deleteEarly) override
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:35
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Resolution resolveProduct(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void reset()
Resets access to the resource so that added tasks will wait.
void connectTo(ProductResolverBase const &iOther, Principal const *) final
BranchDescription const & branchDescription_() const override
bool productResolved_() const final
static unsigned int kUnsetOffset
void resetProductData_(bool deleteEarly) override
std::string const & processName() const
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const final
void connectTo(ProductResolverBase const &, Principal const *) final
#define constexpr
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
bool isPresent() const
Definition: WrapperBase.h:30
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp, MergeableRunProductMetadata const *mergeableRunProductMetadata=nullptr) const
ProductProvenance const * productProvenancePtr_() const override
static unsigned int kMissingOffset
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void putProduct(std::unique_ptr< WrapperBase > edp) const
void unsafe_setWrapper(std::unique_ptr< WrapperBase > iValue) const
Definition: ProductData.cc:38
void prefetchFailed(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess, std::exception_ptr iExceptPtr) const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void setProvenance(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
Definition: ProductData.h:65
def principal(options)
bool isFromCurrentProcess() const final
void mergeProduct(std::unique_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
std::string const & className() const
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
void resetProductData()
Definition: ProductData.h:53
bool singleProduct_() const override
Provenance const * provenance_() const override
std::vector< unsigned int > const & lookupProcessOrder() const
Definition: Principal.h:191
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const final
Provenance const * provenance_() const override
bool dataValidFromResolver(unsigned int iProcessingIndex, Principal const &principal, bool iSkipCurrentProcess) const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
TypeID unwrappedTypeID() const
void resetProductData_(bool deleteEarly) override
BranchDescription const & branchDescription_() const final
void setProcessHistory_(ProcessHistory const &ph) override
ProductProvenance const * productProvenancePtr_() const override
bool isFromCurrentProcess() const final
bool productUnavailable_() const override
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
ProductProvenance const * productProvenancePtr_() const override
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
std::atomic< bool > prefetchRequested_
ProductProvenance const * productProvenancePtr_() const override
std::atomic< unsigned int > lastCheckIndex_
TypeWithDict const & unwrappedType() const
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) final
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
void setMergeableRunProductMetadata(MergeableRunProductMetadataBase const *mrpm)
Definition: ProductData.h:71
std::type_info const & unvalidatedTypeInfo() const
Definition: TypeWithDict.h:76
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
virtual bool isFromCurrentProcess() const =0
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
int k[5][pyjets_maxn]
void setProduct(std::unique_ptr< WrapperBase > edp) const
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
NoProcessProductResolver(std::vector< ProductResolverIndex > const &matchingHolders, std::vector< bool > const &ambiguous, bool madeAtEnd)
std::atomic< bool > skippingPrefetchRequested_
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
DelayedReader * reader() const
Definition: Principal.h:183
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const final
UnscheduledAuxiliary const * auxiliary() const
void resetProductData_(bool deleteEarly) override
static unsigned int kAmbiguousOffset
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductStatus defaultStatus() const
bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void setProcessHistory_(ProcessHistory const &ph) override
void setupUnscheduled(UnscheduledConfigurator const &) final
def load(fileName)
Definition: svgfig.py:546
BranchDescription const & branchDescription_() const override
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const final
void addContext(std::string const &context)
Definition: Exception.cc:227
bool singleProduct_() const override
void retrieveAndMerge_(Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
std::vector< ProductResolverIndex > matchingHolders_
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void tryPrefetchResolverAsync(unsigned int iProcessingIndex, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc, ServiceToken token) const
HLT enums.
BranchDescription const & branchDescription() const
Worker * findWorker(std::string const &iLabel) const
ConstProductResolverPtr getProductResolverByIndex(ProductResolverIndex const &oid) const
Definition: Principal.cc:520
ProductProvenance const * productProvenancePtr_() const final
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void setProcessHistory_(ProcessHistory const &ph) override
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void setupUnscheduled(UnscheduledConfigurator const &) final
ProductData const & getProductData() const
bool productWasDeleted_() const override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
void resetProductData_(bool deleteEarly) override
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) override
void setProcessHistory_(ProcessHistory const &ph) final
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const final
def move(src, dest)
Definition: eostools.py:511
def branchType(schema, name)
Definition: revisionDML.py:113
std::string const & moduleLabel() const
WrapperBase * unsafe_wrapper() const
Definition: ProductData.h:36
static HepMC::HEPEVT_Wrapper wrapper
void putProduct_(std::unique_ptr< WrapperBase > edp) const override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override