CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
edm::InputProductResolver Class Reference

#include <ProductResolvers.h>

Inheritance diagram for edm::InputProductResolver:
edm::DataManagingProductResolver edm::ProductResolverBase

Public Member Functions

 InputProductResolver (std::shared_ptr< BranchDescription const > bd)
 
void setupUnscheduled (UnscheduledConfigurator const &) final
 
- Public Member Functions inherited from edm::DataManagingProductResolver
void connectTo (ProductResolverBase const &, Principal const *) final
 
 DataManagingProductResolver (std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
 
void resetStatus ()
 
template<bool callResolver, typename FUNC >
ProductResolverBase::Resolution resolveProductImpl (FUNC resolver) const
 
- Public Member Functions inherited from edm::ProductResolverBase
BranchDescription const & branchDescription () const
 
std::string const & moduleLabel () const
 
ProductResolverBaseoperator= (ProductResolverBase const &)=delete
 
void prefetchAsync (WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
 
std::string const & processName () const
 
ProductID const & productID () const
 
std::string const & productInstanceName () const
 
ProductProvenance const * productProvenancePtr () const
 
bool productResolved () const
 
 ProductResolverBase ()
 
 ProductResolverBase (ProductResolverBase const &)=delete
 
TypeID productType () const
 
bool productUnavailable () const
 
bool productWasDeleted () const
 
bool productWasFetchedAndIsValid (bool iSkipCurrentProcess) const
 
Provenance const * provenance () const
 
bool provenanceAvailable () const
 
void putOrMergeProduct (std::unique_ptr< WrapperBase > edp, MergeableRunProductMetadata const *mergeableRunProductMetadata=nullptr) const
 
void putProduct (std::unique_ptr< WrapperBase > edp) const
 
void resetBranchDescription (std::shared_ptr< BranchDescription const > bd)
 
void resetProductData ()
 
std::string const & resolvedModuleLabel () const
 
Resolution resolveProduct (Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
 
void retrieveAndMerge (Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
 
void setMergeableRunProductMetadata (MergeableRunProductMetadata const *mrpm)
 
void setProcessHistory (ProcessHistory const &ph)
 
void setProvenance (ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid)
 
bool singleProduct () const
 
StableProvenance const * stableProvenance () const
 
void unsafe_deleteProduct () const
 
bool unscheduledWasNotRun () const
 
void write (std::ostream &os) const
 
virtual ~ProductResolverBase ()
 

Private Member Functions

bool isFromCurrentProcess () const final
 
void prefetchAsync_ (WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
 
void putProduct_ (std::unique_ptr< WrapperBase > edp) const override
 
void resetProductData_ (bool deleteEarly) override
 
Resolution resolveProduct_ (Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
 
void retrieveAndMerge_ (Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
 
void setMergeableRunProductMetadata_ (MergeableRunProductMetadata const *) override
 
bool unscheduledWasNotRun_ () const final
 

Private Attributes

UnscheduledAuxiliary const * aux_
 
std::atomic< bool > m_prefetchRequested
 
WaitingTaskList m_waitingTasks
 

Additional Inherited Members

- Public Types inherited from edm::DataManagingProductResolver
enum  ProductStatus {
  ProductStatus::ProductSet, ProductStatus::NotPut, ProductStatus::ResolveFailed, ProductStatus::ResolveNotRun,
  ProductStatus::ProductDeleted
}
 
- Protected Member Functions inherited from edm::DataManagingProductResolver
ProductStatus defaultStatus () const
 
template<bool callResolver, typename FUNC >
Resolution resolveProductImpl (FUNC resolver) const
 
void setFailedStatus () const
 
void setMergeableRunProductMetadataInProductData (MergeableRunProductMetadata const *)
 
void setProduct (std::unique_ptr< WrapperBase > edp) const
 
ProductStatus status () const
 

Detailed Description

Definition at line 98 of file ProductResolvers.h.

Constructor & Destructor Documentation

edm::InputProductResolver::InputProductResolver ( std::shared_ptr< BranchDescription const >  bd)
inlineexplicit

Member Function Documentation

bool edm::InputProductResolver::isFromCurrentProcess ( ) const
finalprivatevirtual

Implements edm::DataManagingProductResolver.

Definition at line 311 of file ProductResolvers.cc.

311  {
312  return false;
313  }
void edm::InputProductResolver::prefetchAsync_ ( WaitingTask waitTask,
Principal const &  principal,
bool  skipCurrentProcess,
ServiceToken const &  token,
SharedResourcesAcquirer sra,
ModuleCallingContext const *  mcc 
) const
overrideprivatevirtual

Implements edm::ProductResolverBase.

Definition at line 245 of file ProductResolvers.cc.

References edm::ProductResolverBase::branchDescription(), edm::InEvent, edm::make_functor_task(), cmsRelvalreport::principal(), edm::ProductResolverBase::productResolved(), edm::SerialTaskQueueChain::push(), edm::ProductResolverBase::putProduct(), createBeamHaloJobs::queue, matplotRender::reader, and lumiQTWidget::t.

250  {
251  m_waitingTasks.add(waitTask);
252 
253  bool expected = false;
254  if( m_prefetchRequested.compare_exchange_strong(expected, true) ) {
255 
256  auto workToDo = [this, mcc, &principal, token] () {
257  //need to make sure Service system is activated on the reading thread
258  ServiceRegistry::Operate guard(token);
259  try {
260  resolveProductImpl<true>([this,&principal,mcc]() {
261  if(principal.branchType() != InEvent) { return; }
262  if(auto reader = principal.reader()) {
263  std::unique_lock<std::recursive_mutex> guard;
264  if(auto sr = reader->sharedResources().second) {
265  guard =std::unique_lock<std::recursive_mutex>(*sr);
266  }
267  if ( not productResolved()) {
268  //another thread could have finished this while we were waiting
269  putProduct( reader->getProduct(branchDescription().branchID(), &principal, mcc));
270  }
271  }
272  });
273  } catch(...) {
274  this->m_waitingTasks.doneWaiting(std::current_exception());
275  return;
276  }
277  this->m_waitingTasks.doneWaiting(nullptr);
278  };
279 
280  SerialTaskQueueChain* queue = nullptr;
281  if(auto reader = principal.reader()) {
282  if (auto shared_res = reader->sharedResources().first) {
283  queue = &(shared_res->serialQueueChain());
284  }
285  }
286  if(queue) {
287  queue->push(workToDo);
288  } else {
289  //Have to create a new task
290  auto t = make_functor_task(tbb::task::allocate_root(),
291  workToDo);
292  tbb::task::spawn(*t);
293  }
294  }
295  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void add(WaitingTask *)
Adds task to the waiting list.
WaitingTaskList m_waitingTasks
void putProduct(std::unique_ptr< WrapperBase > edp) const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
def principal(options)
std::atomic< bool > m_prefetchRequested
BranchDescription const & branchDescription() const
void edm::InputProductResolver::putProduct_ ( std::unique_ptr< WrapperBase edp) const
overrideprivatevirtual

Implements edm::ProductResolverBase.

Definition at line 501 of file ProductResolvers.cc.

References eostools::move(), edm::ProductResolverBase::productResolved(), and edm::DataManagingProductResolver::setProduct().

501  {
502  if ( not productResolved()) {
503  //Another thread could have set this
504  setProduct(std::move(edp));
505  }
506  }
void setProduct(std::unique_ptr< WrapperBase > edp) const
def move(src, dest)
Definition: eostools.py:511
void edm::InputProductResolver::resetProductData_ ( bool  deleteEarly)
overrideprivatevirtual

Reimplemented from edm::DataManagingProductResolver.

Definition at line 298 of file ProductResolvers.cc.

References edm::DataManagingProductResolver::resetProductData_().

298  {
299  m_prefetchRequested = false;
302  }
void resetProductData_(bool deleteEarly) override
WaitingTaskList m_waitingTasks
void reset()
Resets access to the resource so that added tasks will wait.
std::atomic< bool > m_prefetchRequested
ProductResolverBase::Resolution edm::InputProductResolver::resolveProduct_ ( Principal const &  principal,
bool  skipCurrentProcess,
SharedResourcesAcquirer sra,
ModuleCallingContext const *  mcc 
) const
overrideprivatevirtual

Implements edm::ProductResolverBase.

Definition at line 160 of file ProductResolvers.cc.

References edm::ProductResolverBase::branchDescription(), revisionDML::branchType(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, edm::make_sentry(), cmsRelvalreport::principal(), edm::ProductResolverBase::productResolved(), edm::ProductResolverBase::putProduct(), and matplotRender::reader.

163  {
164  return resolveProductImpl<true>([this,&principal,mcc]() {
165  auto branchType = principal.branchType();
166  if(branchType != InEvent) {
167  //delayed get has not been allowed with Run or Lumis
168  // The file may already be closed so the reader is invalid
169  return;
170  }
171  if(mcc and (branchType == InEvent) and aux_) {
172  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()),*mcc);
173  }
174 
175  auto sentry( make_sentry(mcc,
176  [this, branchType](ModuleCallingContext const* iContext){
177  if(branchType == InEvent and aux_) {
178  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext); }
179  }));
180 
181  if(auto reader=principal.reader()) {
182  std::unique_lock<std::recursive_mutex> guard;
183  if(auto sr = reader->sharedResources().second) {
184  guard =std::unique_lock<std::recursive_mutex>(*sr);
185  }
186  if ( not productResolved()) {
187  //another thread could have beaten us here
188  putProduct( reader->getProduct(branchDescription().branchID(), &principal, mcc));
189  }
190  }
191  });
192 
193  }
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:29
UnscheduledAuxiliary const * aux_
void putProduct(std::unique_ptr< WrapperBase > edp) const
def principal(options)
BranchDescription const & branchDescription() const
def branchType(schema, name)
Definition: revisionDML.py:113
void edm::InputProductResolver::retrieveAndMerge_ ( Principal const &  principal,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
) const
overrideprivatevirtual

Reimplemented from edm::ProductResolverBase.

Definition at line 196 of file ProductResolvers.cc.

References edm::ProductResolverBase::branchDescription(), edm::BranchDescription::branchName(), edm::BranchDescription::branchType(), edm::DataManagingProductResolver::defaultStatus(), Exception, edm::InRun, edm::errors::LogicError, edm::errors::MismatchedInputFiles, eostools::move(), edm::DataManagingProductResolver::ProductSet, edm::ProductResolverBase::putOrMergeProduct(), edm::Principal::reader(), matplotRender::reader, edm::DataManagingProductResolver::ResolveFailed, edm::DataManagingProductResolver::setFailedStatus(), and edm::DataManagingProductResolver::status().

196  {
197 
198  if(auto reader = principal.reader()) {
199 
200  std::unique_lock<std::recursive_mutex> guard;
201  if(auto sr = reader->sharedResources().second) {
202  guard =std::unique_lock<std::recursive_mutex>(*sr);
203  }
204 
205  //Can't use resolveProductImpl since it first checks to see
206  // if the product was already retrieved and then returns if it is
207  std::unique_ptr<WrapperBase> edp(reader->getProduct(branchDescription().branchID(), &principal));
208 
209  if (edp.get() != nullptr) {
210  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
212  << "Missing definition of member function swap for branch name " << branchDescription().branchName() << "\n"
213  << "Mergeable data types written to a Run must have the swap member function defined" << "\n";
214  }
215  if (status() == defaultStatus() ||
217  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
218  putOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
219  } else { // status() == ResolveFailed && branchDescription().isMergeable()
221  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
222  << "The product branch was dropped in the first run or lumi fragment and present in a later one" << "\n"
223  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
224  << "that need to be merged in the first place.\n";
225  }
226  } else if (status() == defaultStatus()) {
227  setFailedStatus();
228  } else if ( status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
230  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
231  << "The product branch was present in first run or lumi fragment and dropped in a later one" << "\n"
232  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
233  << "that need to be merged in the first place.\n";
234  }
235  // Do nothing in other case. status is ResolveFailed already or
236  // it is not mergeable and the status is ProductSet
237  }
238  }
std::string const & branchName() const
BranchType const & branchType() const
ProductStatus status() const
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp, MergeableRunProductMetadata const *mergeableRunProductMetadata=nullptr) const
def principal(options)
ProductStatus defaultStatus() const
BranchDescription const & branchDescription() const
def move(src, dest)
Definition: eostools.py:511
void edm::InputProductResolver::setMergeableRunProductMetadata_ ( MergeableRunProductMetadata const *  mrpm)
overrideprivatevirtual

Reimplemented from edm::ProductResolverBase.

Definition at line 241 of file ProductResolvers.cc.

References edm::DataManagingProductResolver::setMergeableRunProductMetadataInProductData().

241  {
243  }
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
void edm::InputProductResolver::setupUnscheduled ( UnscheduledConfigurator const &  iConfigure)
finalvirtual

Reimplemented from edm::ProductResolverBase.

Definition at line 305 of file ProductResolvers.cc.

References edm::UnscheduledConfigurator::auxiliary().

305  {
306  aux_ = iConfigure.auxiliary();
307  }
UnscheduledAuxiliary const * aux_
bool edm::InputProductResolver::unscheduledWasNotRun_ ( ) const
inlinefinalprivatevirtual

Implements edm::ProductResolverBase.

Definition at line 127 of file ProductResolvers.h.

References edm::DataManagingProductResolver::resetProductData_().

127 {return false;}

Member Data Documentation

UnscheduledAuxiliary const* edm::InputProductResolver::aux_
private

Definition at line 133 of file ProductResolvers.h.

std::atomic<bool> edm::InputProductResolver::m_prefetchRequested
mutableprivate

Definition at line 131 of file ProductResolvers.h.

WaitingTaskList edm::InputProductResolver::m_waitingTasks
mutableprivate

Definition at line 132 of file ProductResolvers.h.