CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
edm::InputProductResolver Class Reference

#include <ProductResolvers.h>

Inheritance diagram for edm::InputProductResolver:
edm::DataManagingProductResolver edm::DataManagingOrAliasProductResolver edm::ProductResolverBase

Public Member Functions

 InputProductResolver (std::shared_ptr< BranchDescription const > bd)
 
void setupUnscheduled (UnscheduledConfigurator const &) final
 
- Public Member Functions inherited from edm::DataManagingProductResolver
void connectTo (ProductResolverBase const &, Principal const *) final
 
 DataManagingProductResolver (std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
 
void resetStatus ()
 
- Public Member Functions inherited from edm::DataManagingOrAliasProductResolver
 DataManagingOrAliasProductResolver ()
 
- Public Member Functions inherited from edm::ProductResolverBase
BranchDescription const & branchDescription () const
 
std::string const & moduleLabel () const
 
ProductResolverBase & operator= (ProductResolverBase const &)=delete
 
void prefetchAsync (WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
 
std::string const & processName () const
 
ProductID const & productID () const
 
std::string const & productInstanceName () const
 
ProductProvenance const * productProvenancePtr () const
 
bool productResolved () const
 
 ProductResolverBase ()
 
 ProductResolverBase (ProductResolverBase const &)=delete
 
TypeID productType () const
 
bool productUnavailable () const
 
bool productWasDeleted () const
 
bool productWasFetchedAndIsValid (bool iSkipCurrentProcess) const
 
Provenance const * provenance () const
 
bool provenanceAvailable () const
 
void putOrMergeProduct (std::unique_ptr< WrapperBase > edp, MergeableRunProductMetadata const *mergeableRunProductMetadata=nullptr) const
 
void putProduct (std::unique_ptr< WrapperBase > edp) const
 
void resetBranchDescription (std::shared_ptr< BranchDescription const > bd)
 
void resetProductData ()
 
std::string const & resolvedModuleLabel () const
 
Resolution resolveProduct (Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
 
void retrieveAndMerge (Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
 
void setMergeableRunProductMetadata (MergeableRunProductMetadata const *mrpm)
 
void setProductID (ProductID const &pid)
 
void setProductProvenanceRetriever (ProductProvenanceRetriever const *provRetriever)
 
bool singleProduct () const
 
StableProvenance const * stableProvenance () const
 
void unsafe_deleteProduct () const
 
bool unscheduledWasNotRun () const
 
void write (std::ostream &os) const
 
virtual ~ProductResolverBase ()
 

Private Member Functions

bool isFromCurrentProcess () const final
 
void prefetchAsync_ (WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
 
void putProduct_ (std::unique_ptr< WrapperBase > edp) const override
 
void resetProductData_ (bool deleteEarly) override
 
Resolution resolveProduct_ (Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
 
void retrieveAndMerge_ (Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
 
void setMergeableRunProductMetadata_ (MergeableRunProductMetadata const *) override
 
bool unscheduledWasNotRun_ () const final
 

Private Attributes

UnscheduledAuxiliary const * aux_
 
std::atomic< bool > m_prefetchRequested
 
WaitingTaskList m_waitingTasks
 

Additional Inherited Members

- Public Types inherited from edm::DataManagingProductResolver
enum  ProductStatus {
  ProductStatus::ProductSet, ProductStatus::NotPut, ProductStatus::ResolveFailed, ProductStatus::ResolveNotRun,
  ProductStatus::ProductDeleted
}
 
- Protected Member Functions inherited from edm::DataManagingProductResolver
ProductStatus defaultStatus () const
 
ProductData const & getProductData () const final
 
template<bool callResolver, typename FUNC >
ProductResolverBase::Resolution resolveProductImpl (FUNC resolver) const
 
template<bool callResolver, typename FUNC >
Resolution resolveProductImpl (FUNC resolver) const
 
void setFailedStatus () const
 
void setMergeableRunProductMetadataInProductData (MergeableRunProductMetadata const *)
 
void setProduct (std::unique_ptr< WrapperBase > edp) const
 
ProductStatus status () const
 

Detailed Description

Definition at line 104 of file ProductResolvers.h.

Constructor & Destructor Documentation

◆ InputProductResolver()

edm::InputProductResolver::InputProductResolver ( std::shared_ptr< BranchDescription const >  bd)
inline explicit

Member Function Documentation

◆ isFromCurrentProcess()

bool edm::InputProductResolver::isFromCurrentProcess ( ) const
final private virtual

Implements edm::DataManagingProductResolver.

Definition at line 313 of file ProductResolvers.cc.

313 { return false; }

◆ prefetchAsync_()

void edm::InputProductResolver::prefetchAsync_ ( WaitingTask *  waitTask,
Principal const &  principal,
bool  skipCurrentProcess,
ServiceToken const &  token,
SharedResourcesAcquirer *  sra,
ModuleCallingContext const *  mcc 
) const
override private virtual

Implements edm::ProductResolverBase.

Definition at line 246 of file ProductResolvers.cc.

251  {
252  //need to try changing m_prefetchRequested before adding to m_waitingTasks
253  bool expected = false;
254  bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
255  m_waitingTasks.add(waitTask);
256 
257  if (prefetchRequested) {
258  auto workToDo = [this, mcc, &principal, token]() {
259  //need to make sure Service system is activated on the reading thread
260  ServiceRegistry::Operate operate(token);
261  // Caught exception is propagated via WaitingTaskList
262  CMS_SA_ALLOW try {
263  resolveProductImpl<true>([this, &principal, mcc]() {
264  if (principal.branchType() != InEvent) {
265  return;
266  }
267  if (auto reader = principal.reader()) {
268  std::unique_lock<std::recursive_mutex> guard;
269  if (auto sr = reader->sharedResources().second) {
270  guard = std::unique_lock<std::recursive_mutex>(*sr);
271  }
272  if (not productResolved()) {
273  //another thread could have finished this while we were waiting
274  putProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
275  }
276  }
277  });
278  } catch (...) {
279  this->m_waitingTasks.doneWaiting(std::current_exception());
280  return;
281  }
282  this->m_waitingTasks.doneWaiting(nullptr);
283  };
284 
285  SerialTaskQueueChain* queue = nullptr;
286  if (auto reader = principal.reader()) {
287  if (auto shared_res = reader->sharedResources().first) {
288  queue = &(shared_res->serialQueueChain());
289  }
290  }
291  if (queue) {
292  queue->push(workToDo);
293  } else {
294  //Have to create a new task
295  auto t = make_functor_task(tbb::task::allocate_root(), workToDo);
296  tbb::task::spawn(*t);
297  }
298  }
299  }

References edm::WaitingTaskList::add(), edm::ProductResolverBase::branchDescription(), CMS_SA_ALLOW, edm::WaitingTaskList::doneWaiting(), edm::InEvent, m_prefetchRequested, m_waitingTasks, edm::make_functor_task(), edm::ProductResolverBase::productResolved(), edm::ProductResolverBase::putProduct(), createBeamHaloJobs::queue, DQM::reader, submitPVValidationJobs::t, and unpackBuffers-CaloStage2::token.

◆ putProduct_()

void edm::InputProductResolver::putProduct_ ( std::unique_ptr< WrapperBase >  edp) const
override private virtual

Implements edm::ProductResolverBase.

Definition at line 482 of file ProductResolvers.cc.

482  {
483  if (not productResolved()) {
484  //Another thread could have set this
485  setProduct(std::move(edp));
486  }
487  }

References eostools::move(), edm::ProductResolverBase::productResolved(), and edm::DataManagingProductResolver::setProduct().

◆ resetProductData_()

void edm::InputProductResolver::resetProductData_ ( bool  deleteEarly)
override private virtual

Reimplemented from edm::DataManagingProductResolver.

Definition at line 301 of file ProductResolvers.cc.

301  {
302  if (not deleteEarly) {
303  m_prefetchRequested = false;
304  m_waitingTasks.reset();
305  }
306  DataManagingProductResolver::resetProductData_(deleteEarly);
307  }

References m_prefetchRequested, m_waitingTasks, edm::WaitingTaskList::reset(), and edm::DataManagingProductResolver::resetProductData_().

◆ resolveProduct_()

ProductResolverBase::Resolution edm::InputProductResolver::resolveProduct_ ( Principal const &  principal,
bool  skipCurrentProcess,
SharedResourcesAcquirer *  sra,
ModuleCallingContext const *  mcc 
) const
override private virtual

Implements edm::ProductResolverBase.

Definition at line 162 of file ProductResolvers.cc.

165  {
166  return resolveProductImpl<true>([this, &principal, mcc]() {
167  auto branchType = principal.branchType();
168  if (branchType == InLumi || branchType == InRun) {
169  //delayed get has not been allowed with Run or Lumis
170  // The file may already be closed so the reader is invalid
171  return;
172  }
173  if (mcc and (branchType == InEvent || branchType == InProcess) and aux_) {
174  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()), *mcc);
175  }
176 
177  auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
178  if ((branchType == InEvent || branchType == InProcess) and aux_) {
179  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
180  }
181  }));
182 
183  if (auto reader = principal.reader()) {
184  std::unique_lock<std::recursive_mutex> guard;
185  if (auto sr = reader->sharedResources().second) {
186  guard = std::unique_lock<std::recursive_mutex>(*sr);
187  }
188  if (not productResolved()) {
189  //another thread could have beaten us here
190  putProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
191  }
192  }
193  });
194  }

References aux_, edm::ProductResolverBase::branchDescription(), edm::Principal::branchType(), edm::signalslot::Signal< T >::emit(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, edm::InLumi, edm::InProcess, edm::InRun, edm::make_sentry(), edm::UnscheduledAuxiliary::postModuleDelayedGetSignal_, edm::UnscheduledAuxiliary::preModuleDelayedGetSignal_, edm::ProductResolverBase::productResolved(), edm::ProductResolverBase::putProduct(), DQM::reader, and edm::Principal::reader().

◆ retrieveAndMerge_()

void edm::InputProductResolver::retrieveAndMerge_ ( Principal const &  principal,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
) const
override private virtual

Reimplemented from edm::ProductResolverBase.

Definition at line 196 of file ProductResolvers.cc.

197  {
198  if (auto reader = principal.reader()) {
199  std::unique_lock<std::recursive_mutex> guard;
200  if (auto sr = reader->sharedResources().second) {
201  guard = std::unique_lock<std::recursive_mutex>(*sr);
202  }
203 
204  //Can't use resolveProductImpl since it first checks to see
205  // if the product was already retrieved and then returns if it is
206  std::unique_ptr<WrapperBase> edp(reader->getProduct(branchDescription().branchID(), &principal));
207 
208  if (edp.get() != nullptr) {
209  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
210  throw Exception(errors::LogicError)
211  << "Missing definition of member function swap for branch name " << branchDescription().branchName()
212  << "\n"
213  << "Mergeable data types written to a Run must have the swap member function defined"
214  << "\n";
215  }
216  if (status() == defaultStatus() || status() == ProductStatus::ProductSet ||
217  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
218  putOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
219  } else { // status() == ResolveFailed && branchDescription().isMergeable()
220  throw Exception(errors::MismatchedInputFiles)
221  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
222  << "The product branch was dropped in the first run or lumi fragment and present in a later one"
223  << "\n"
224  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
225  << "that need to be merged in the first place.\n";
226  }
227  } else if (status() == defaultStatus()) {
228  setFailedStatus();
229  } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
230  throw Exception(errors::MismatchedInputFiles)
231  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
232  << "The product branch was present in first run or lumi fragment and dropped in a later one"
233  << "\n"
234  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
235  << "that need to be merged in the first place.\n";
236  }
237  // Do nothing in other case. status is ResolveFailed already or
238  // it is not mergeable and the status is ProductSet
239  }
240  }

References edm::ProductResolverBase::branchDescription(), edm::BranchDescription::branchName(), edm::BranchDescription::branchType(), edm::DataManagingProductResolver::defaultStatus(), Exception, edm::InRun, edm::errors::LogicError, edm::errors::MismatchedInputFiles, eostools::move(), edm::DataManagingProductResolver::ProductSet, edm::ProductResolverBase::putOrMergeProduct(), DQM::reader, edm::Principal::reader(), edm::DataManagingProductResolver::ResolveFailed, edm::DataManagingProductResolver::setFailedStatus(), and edm::DataManagingProductResolver::status().

◆ setMergeableRunProductMetadata_()

void edm::InputProductResolver::setMergeableRunProductMetadata_ ( MergeableRunProductMetadata const *  mrpm)
override private virtual

◆ setupUnscheduled()

void edm::InputProductResolver::setupUnscheduled ( UnscheduledConfigurator const &  iConfigure)
final virtual

Reimplemented from edm::ProductResolverBase.

Definition at line 309 of file ProductResolvers.cc.

309  {
310  aux_ = iConfigure.auxiliary();
311  }

References aux_, and edm::UnscheduledConfigurator::auxiliary().

◆ unscheduledWasNotRun_()

bool edm::InputProductResolver::unscheduledWasNotRun_ ( ) const
inline final private virtual

Implements edm::ProductResolverBase.

Definition at line 131 of file ProductResolvers.h.

131 { return false; }

Member Data Documentation

◆ aux_

UnscheduledAuxiliary const* edm::InputProductResolver::aux_
private

Definition at line 137 of file ProductResolvers.h.

Referenced by resolveProduct_(), and setupUnscheduled().

◆ m_prefetchRequested

std::atomic<bool> edm::InputProductResolver::m_prefetchRequested
mutable private

Definition at line 135 of file ProductResolvers.h.

Referenced by prefetchAsync_(), and resetProductData_().

◆ m_waitingTasks

WaitingTaskList edm::InputProductResolver::m_waitingTasks
mutable private

Definition at line 136 of file ProductResolvers.h.

Referenced by prefetchAsync_(), and resetProductData_().

edm::errors::MismatchedInputFiles
Definition: EDMException.h:52
edm::ProductResolverBase::putProduct
void putProduct(std::unique_ptr< WrapperBase > edp) const
Definition: ProductResolverBase.h:163
edm::DataManagingProductResolver::DataManagingProductResolver
DataManagingProductResolver(std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
Definition: ProductResolvers.h:50
edm::ProductResolverBase::putOrMergeProduct
void putOrMergeProduct(std::unique_ptr< WrapperBase > edp, MergeableRunProductMetadata const *mergeableRunProductMetadata=nullptr) const
Definition: ProductResolverBase.h:166
edm::signalslot::Signal::emit
void emit(Args &&... args) const
Definition: Signal.h:48
edm::errors::LogicError
Definition: EDMException.h:37
deep_tau::DeepTauBase::BasicDiscriminator
BasicDiscriminator
Definition: DeepTauBase.h:115
edm::WaitingTaskList::add
void add(WaitingTask *)
Adds task to the waiting list.
Definition: WaitingTaskList.cc:90
edm::make_functor_task
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
edm::ProductResolverBase::productResolved
bool productResolved() const
Definition: ProductResolverBase.h:90
edm::DataManagingProductResolver::setMergeableRunProductMetadataInProductData
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
Definition: ProductResolvers.cc:564
edm::DataManagingProductResolver::defaultStatus
ProductStatus defaultStatus() const
Definition: ProductResolvers.h:65
edm::InRun
Definition: BranchType.h:11
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::WaitingTaskList::reset
void reset()
Resets access to the resource so that added tasks will wait.
Definition: WaitingTaskList.cc:52
edm::InProcess
Definition: BranchType.h:11
edm::DataManagingProductResolver::setProduct
void setProduct(std::unique_ptr< WrapperBase > edp) const
Definition: ProductResolvers.cc:518
DQM.reader
reader
Definition: DQM.py:105
edm::WaitingTaskList::doneWaiting
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
Definition: WaitingTaskList.cc:170
edm::InEvent
Definition: BranchType.h:11
edm::InputProductResolver::m_prefetchRequested
std::atomic< bool > m_prefetchRequested
Definition: ProductResolvers.h:135
edm::UnscheduledAuxiliary::preModuleDelayedGetSignal_
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preModuleDelayedGetSignal_
Definition: UnscheduledAuxiliary.h:43
createBeamHaloJobs.queue
queue
Definition: createBeamHaloJobs.py:343
edm::ServiceRegistry::Operate
friend class Operate
Definition: ServiceRegistry.h:54
edm::InputProductResolver::aux_
UnscheduledAuxiliary const * aux_
Definition: ProductResolvers.h:137
edm::InLumi
Definition: BranchType.h:11
edm::DataManagingProductResolver::ProductStatus::ProductSet
edm::DataManagingProductResolver::ProductStatus::ResolveNotRun
edm::BranchDescription::branchType
BranchType const & branchType() const
Definition: BranchDescription.h:121
edm::BranchDescription::branchName
std::string const & branchName() const
Definition: BranchDescription.h:119
edm::DataManagingProductResolver::resetProductData_
void resetProductData_(bool deleteEarly) override
Definition: ProductResolvers.cc:573
edm::DataManagingProductResolver::status
ProductStatus status() const
Definition: ProductResolvers.h:64
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::ProductResolverBase::branchDescription
BranchDescription const & branchDescription() const
Definition: ProductResolverBase.h:110
SerialTaskQueueChain
Exception
Definition: hltDiff.cc:246
edm::DataManagingProductResolver::setFailedStatus
void setFailedStatus() const
Definition: ProductResolvers.h:66
edm::make_sentry
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
edm::UnscheduledAuxiliary::postModuleDelayedGetSignal_
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postModuleDelayedGetSignal_
Definition: UnscheduledAuxiliary.h:44
edm::InputProductResolver::m_waitingTasks
WaitingTaskList m_waitingTasks
Definition: ProductResolvers.h:136
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::DataManagingProductResolver::ProductStatus::ResolveFailed
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:318