CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Member Functions | Private Attributes
edm::InputProductResolver Class Reference

#include <ProductResolvers.h>

Inheritance diagram for edm::InputProductResolver:
edm::DataManagingProductResolver edm::ProductResolverBase

Public Member Functions

 InputProductResolver (std::shared_ptr< BranchDescription const > bd)
 
virtual void setupUnscheduled (UnscheduledConfigurator const &) overridefinal
 
- Public Member Functions inherited from edm::DataManagingProductResolver
virtual void connectTo (ProductResolverBase const &, Principal const *) overridefinal
 
 DataManagingProductResolver (std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
 
void resetStatus ()
 
template<bool callResolver, typename FUNC >
ProductResolverBase::Resolution resolveProductImpl (FUNC resolver) const
 
- Public Member Functions inherited from edm::ProductResolverBase
BranchDescription const & branchDescription () const
 
std::string const & moduleLabel () const
 
ProductResolverBaseoperator= (ProductResolverBase const &)=delete
 
void prefetchAsync (WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
 
std::string const & processName () const
 
ProductID const & productID () const
 
std::string const & productInstanceName () const
 
ProductProvenance const * productProvenancePtr () const
 
bool productResolved () const
 
 ProductResolverBase ()
 
 ProductResolverBase (ProductResolverBase const &)=delete
 
TypeID productType () const
 
bool productUnavailable () const
 
bool productWasDeleted () const
 
bool productWasFetchedAndIsValid (bool iSkipCurrentProcess) const
 
Provenance const * provenance () const
 
bool provenanceAvailable () const
 
void putOrMergeProduct (std::unique_ptr< WrapperBase > edp) const
 
void putProduct (std::unique_ptr< WrapperBase > edp) const
 
void resetBranchDescription (std::shared_ptr< BranchDescription const > bd)
 
virtual void resetFailedFromThisProcess ()
 
void resetProductData ()
 
std::string const & resolvedModuleLabel () const
 
Resolution resolveProduct (Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
 
void retrieveAndMerge (Principal const &principal) const
 
void setProcessHistory (ProcessHistory const &ph)
 
void setProvenance (ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
 
bool singleProduct () const
 
StableProvenance const * stableProvenance () const
 
void unsafe_deleteProduct () const
 
bool unscheduledWasNotRun () const
 
void write (std::ostream &os) const
 
virtual ~ProductResolverBase ()
 

Private Member Functions

virtual bool isFromCurrentProcess () const overridefinal
 
virtual void prefetchAsync_ (WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
 
virtual void putProduct_ (std::unique_ptr< WrapperBase > edp) const override
 
virtual void resetProductData_ (bool deleteEarly) override
 
virtual Resolution resolveProduct_ (Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
 
virtual void retrieveAndMerge_ (Principal const &principal) const override
 
virtual bool unscheduledWasNotRun_ () const overridefinal
 

Private Attributes

UnscheduledAuxiliary const * aux_
 
std::atomic< bool > m_prefetchRequested
 
WaitingTaskList m_waitingTasks
 

Additional Inherited Members

- Public Types inherited from edm::DataManagingProductResolver
enum  ProductStatus {
  ProductStatus::ProductSet, ProductStatus::NotPut, ProductStatus::ResolveFailed, ProductStatus::ResolveNotRun,
  ProductStatus::ProductDeleted
}
 
- Protected Member Functions inherited from edm::DataManagingProductResolver
ProductStatus defaultStatus () const
 
template<bool callResolver, typename FUNC >
Resolution resolveProductImpl (FUNC resolver) const
 
void setFailedStatus () const
 
void setProduct (std::unique_ptr< WrapperBase > edp) const
 
ProductStatus status () const
 

Detailed Description

Definition at line 96 of file ProductResolvers.h.

Constructor & Destructor Documentation

edm::InputProductResolver::InputProductResolver ( std::shared_ptr< BranchDescription const >  bd)
inlineexplicit

Definition at line 98 of file ProductResolvers.h.

98  :
100  m_prefetchRequested{ false },
101  aux_{nullptr} {}
UnscheduledAuxiliary const * aux_
DataManagingProductResolver(std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
std::atomic< bool > m_prefetchRequested

Member Function Documentation

bool edm::InputProductResolver::isFromCurrentProcess ( ) const
finaloverrideprivatevirtual

Implements edm::DataManagingProductResolver.

Definition at line 256 of file ProductResolvers.cc.

256  {
257  return false;
258  }
void edm::InputProductResolver::prefetchAsync_ ( WaitingTask waitTask,
Principal const &  principal,
bool  skipCurrentProcess,
SharedResourcesAcquirer sra,
ModuleCallingContext const *  mcc 
) const
overrideprivatevirtual

Implements edm::ProductResolverBase.

Definition at line 190 of file ProductResolvers.cc.

References edm::WaitingTaskList::add(), edm::ProductResolverBase::branchDescription(), edm::WaitingTaskList::doneWaiting(), edm::InEvent, edm::ServiceRegistry::instance(), m_prefetchRequested, m_waitingTasks, edm::make_functor_task(), edm::ServiceRegistry::presentToken(), cmsRelvalreport::principal(), edm::ProductResolverBase::productResolved(), edm::SerialTaskQueueChain::push(), edm::ProductResolverBase::putProduct(), mps_fire::queue, matplotRender::reader, t, and unpackBuffers-CaloStage2::token.

194  {
195  m_waitingTasks.add(waitTask);
196 
197  bool expected = false;
198  if( m_prefetchRequested.compare_exchange_strong(expected, true) ) {
199 
200  //need to make sure Service system is activated on the reading thread
202  auto workToDo = [this, mcc, &principal, token] () {
204  try {
205  resolveProductImpl<true>([this,&principal,mcc]() {
206  if(principal.branchType() != InEvent) { return; }
207  if(auto reader = principal.reader()) {
208  std::unique_lock<std::recursive_mutex> guard;
209  if(auto sr = reader->sharedResources().second) {
210  guard =std::unique_lock<std::recursive_mutex>(*sr);
211  }
212  if ( not productResolved()) {
213  //another thread could have finished this while we were waiting
214  putProduct( reader->getProduct(BranchKey(branchDescription()), &principal, mcc));
215  }
216  }
217  });
218  } catch(...) {
219  this->m_waitingTasks.doneWaiting(std::current_exception());
220  return;
221  }
222  this->m_waitingTasks.doneWaiting(nullptr);
223  };
224 
225  SerialTaskQueueChain* queue = nullptr;
226  if(auto reader = principal.reader()) {
227  if (auto shared_res = reader->sharedResources().first) {
228  queue = &(shared_res->serialQueueChain());
229  }
230  }
231  if(queue) {
232  queue->push(workToDo);
233  } else {
234  //Have to create a new task
235  auto t = make_functor_task(tbb::task::allocate_root(),
236  workToDo);
237  tbb::task::spawn(*t);
238  }
239  }
240  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void add(WaitingTask *)
Adds task to the waiting list.
WaitingTaskList m_waitingTasks
ServiceToken presentToken() const
void putProduct(std::unique_ptr< WrapperBase > edp) const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
static ServiceRegistry & instance()
std::atomic< bool > m_prefetchRequested
BranchDescription const & branchDescription() const
void edm::InputProductResolver::putProduct_ ( std::unique_ptr< WrapperBase edp) const
overrideprivatevirtual

Implements edm::ProductResolverBase.

Definition at line 438 of file ProductResolvers.cc.

References eostools::move(), edm::ProductResolverBase::productResolved(), and edm::DataManagingProductResolver::setProduct().

438  {
439  if ( not productResolved()) {
440  //Another thread could have set this
441  setProduct(std::move(edp));
442  }
443  }
def move
Definition: eostools.py:510
void setProduct(std::unique_ptr< WrapperBase > edp) const
void edm::InputProductResolver::resetProductData_ ( bool  deleteEarly)
overrideprivatevirtual

Reimplemented from edm::DataManagingProductResolver.

Definition at line 243 of file ProductResolvers.cc.

References m_prefetchRequested, m_waitingTasks, edm::WaitingTaskList::reset(), and edm::DataManagingProductResolver::resetProductData_().

243  {
244  m_prefetchRequested = false;
247  }
virtual void resetProductData_(bool deleteEarly) override
WaitingTaskList m_waitingTasks
void reset()
Resets access to the resource so that added tasks will wait.
std::atomic< bool > m_prefetchRequested
ProductResolverBase::Resolution edm::InputProductResolver::resolveProduct_ ( Principal const &  principal,
bool  skipCurrentProcess,
SharedResourcesAcquirer sra,
ModuleCallingContext const *  mcc 
) const
overrideprivatevirtual

Implements edm::ProductResolverBase.

Definition at line 113 of file ProductResolvers.cc.

References aux_, edm::ProductResolverBase::branchDescription(), revisionDML::branchType(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, edm::make_sentry(), cmsRelvalreport::principal(), edm::ProductResolverBase::productResolved(), edm::ProductResolverBase::putProduct(), and matplotRender::reader.

116  {
117  return resolveProductImpl<true>([this,&principal,mcc]() {
118  auto branchType = principal.branchType();
119  if(branchType != InEvent) {
120  //delayed get has not been allowed with Run or Lumis
121  // The file may already be closed so the reader is invalid
122  return;
123  }
124  if(mcc and (branchType == InEvent) and aux_) {
125  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
126  }
127 
128  auto sentry( make_sentry(mcc,
129  [this, branchType](ModuleCallingContext const* iContext){
130  if(branchType == InEvent and aux_) {
131  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }
132  }));
133 
134  if(auto reader=principal.reader()) {
135  std::unique_lock<std::recursive_mutex> guard;
136  if(auto sr = reader->sharedResources().second) {
137  guard =std::unique_lock<std::recursive_mutex>(*sr);
138  }
139  if ( not productResolved()) {
140  //another thread could have beaten us here
141  putProduct( reader->getProduct(BranchKey(branchDescription()), &principal, mcc));
142  }
143  }
144  });
145 
146  }
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:29
UnscheduledAuxiliary const * aux_
void putProduct(std::unique_ptr< WrapperBase > edp) const
BranchDescription const & branchDescription() const
void edm::InputProductResolver::retrieveAndMerge_ ( Principal const &  principal) const
overrideprivatevirtual

Reimplemented from edm::ProductResolverBase.

Definition at line 149 of file ProductResolvers.cc.

References edm::ProductResolverBase::branchDescription(), edm::DataManagingProductResolver::defaultStatus(), eostools::move(), edm::ProductResolverBase::putOrMergeProduct(), edm::Principal::reader(), matplotRender::reader, edm::DataManagingProductResolver::setFailedStatus(), and edm::DataManagingProductResolver::status().

149  {
150  if(auto reader = principal.reader()) {
151 
152  std::unique_lock<std::recursive_mutex> guard;
153  if(auto sr = reader->sharedResources().second) {
154  guard =std::unique_lock<std::recursive_mutex>(*sr);
155  }
156 
157  //Can't use resolveProductImpl since it first checks to see
158  // if the product was already retrieved and then returns if it is
159  BranchKey const bk = BranchKey(branchDescription());
160  std::unique_ptr<WrapperBase> edp(reader->getProduct(bk, &principal));
161 
162  if(edp.get() != nullptr) {
164  } else if( status()== defaultStatus()) {
165  setFailedStatus();
166  }
167  }
168  }
ProductStatus status() const
def move
Definition: eostools.py:510
ProductStatus defaultStatus() const
BranchDescription const & branchDescription() const
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp) const
void edm::InputProductResolver::setupUnscheduled ( UnscheduledConfigurator const &  iConfigure)
finaloverridevirtual

Reimplemented from edm::ProductResolverBase.

Definition at line 250 of file ProductResolvers.cc.

References aux_, and edm::UnscheduledConfigurator::auxiliary().

250  {
251  aux_ = iConfigure.auxiliary();
252  }
UnscheduledAuxiliary const * aux_
virtual bool edm::InputProductResolver::unscheduledWasNotRun_ ( ) const
inlinefinaloverrideprivatevirtual

Implements edm::ProductResolverBase.

Definition at line 122 of file ProductResolvers.h.

122 {return false;}

Member Data Documentation

UnscheduledAuxiliary const* edm::InputProductResolver::aux_
private

Definition at line 128 of file ProductResolvers.h.

Referenced by resolveProduct_(), and setupUnscheduled().

std::atomic<bool> edm::InputProductResolver::m_prefetchRequested
mutableprivate

Definition at line 126 of file ProductResolvers.h.

Referenced by prefetchAsync_(), and resetProductData_().

WaitingTaskList edm::InputProductResolver::m_waitingTasks
mutableprivate

Definition at line 127 of file ProductResolvers.h.

Referenced by prefetchAsync_(), and resetProductData_().