CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
edm::DelayedReaderInputProductResolver Class Reference

#include <ProductResolvers.h>

Inheritance diagram for edm::DelayedReaderInputProductResolver:
edm::MergeableInputProductResolver edm::DataManagingProductResolver edm::DataManagingOrAliasProductResolver edm::ProductResolverBase

Public Member Functions

 DelayedReaderInputProductResolver (std::shared_ptr< BranchDescription const > bd)
 
void setupUnscheduled (UnscheduledConfigurator const &) final
 
- Public Member Functions inherited from edm::MergeableInputProductResolver
 MergeableInputProductResolver (std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
 
- Public Member Functions inherited from edm::DataManagingProductResolver
void connectTo (ProductResolverBase const &, Principal const *) final
 
 DataManagingProductResolver (std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
 
void resetStatus ()
 
template<bool callResolver, typename FUNC >
ProductResolverBase::Resolution resolveProductImpl (FUNC resolver) const
 
- Public Member Functions inherited from edm::DataManagingOrAliasProductResolver
 DataManagingOrAliasProductResolver ()
 
- Public Member Functions inherited from edm::ProductResolverBase
BranchDescription const & branchDescription () const
 
std::string const & moduleLabel () const
 
ProductResolverBaseoperator= (ProductResolverBase const &)=delete
 
void prefetchAsync (WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
 
std::string const & processName () const
 
ProductID const & productID () const
 
std::string const & productInstanceName () const
 
ProductProvenance const * productProvenancePtr () const
 
bool productResolved () const
 
 ProductResolverBase ()
 
 ProductResolverBase (ProductResolverBase const &)=delete
 
TypeID productType () const
 
bool productUnavailable () const
 
bool productWasDeleted () const
 
bool productWasFetchedAndIsValid (bool iSkipCurrentProcess) const
 
Provenance const * provenance () const
 
bool provenanceAvailable () const
 
void resetBranchDescription (std::shared_ptr< BranchDescription const > bd)
 
void resetProductData ()
 
std::string const & resolvedModuleLabel () const
 
Resolution resolveProduct (Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const
 
void retrieveAndMerge (Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
 
void setMergeableRunProductMetadata (MergeableRunProductMetadata const *mrpm)
 
void setProductID (ProductID const &pid)
 
void setProductProvenanceRetriever (ProductProvenanceRetriever const *provRetriever)
 
bool singleProduct () const
 
StableProvenance const * stableProvenance () const
 
void unsafe_deleteProduct () const
 
bool unscheduledWasNotRun () const
 
void write (std::ostream &os) const
 
virtual ~ProductResolverBase ()
 

Private Member Functions

bool isFromCurrentProcess () const final
 
void prefetchAsync_ (WaitingTaskHolder waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
 
void resetProductData_ (bool deleteEarly) override
 
Resolution resolveProduct_ (Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
 
void retrieveAndMerge_ (Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const override
 
void setMergeableRunProductMetadata_ (MergeableRunProductMetadata const *) override
 
bool unscheduledWasNotRun_ () const final
 

Private Attributes

UnscheduledAuxiliary const * aux_
 
std::atomic< bool > m_prefetchRequested
 
WaitingTaskList m_waitingTasks
 

Additional Inherited Members

- Public Types inherited from edm::DataManagingProductResolver
enum  ProductStatus {
  ProductStatus::ProductSet, ProductStatus::NotPut, ProductStatus::ResolveFailed, ProductStatus::ResolveNotRun,
  ProductStatus::ProductDeleted
}
 
- Protected Member Functions inherited from edm::MergeableInputProductResolver
void mergeProduct (std::shared_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
 
void setOrMergeProduct (std::shared_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
 
- Protected Member Functions inherited from edm::DataManagingProductResolver
void checkType (WrapperBase const &prod) const
 
ProductStatus defaultStatus () const
 
ProductData const & getProductData () const final
 
template<bool callResolver, typename FUNC >
Resolution resolveProductImpl (FUNC resolver) const
 
void setFailedStatus () const
 
void setMergeableRunProductMetadataInProductData (MergeableRunProductMetadata const *)
 
void setProduct (std::unique_ptr< WrapperBase > edp) const
 
void setProduct (std::shared_ptr< WrapperBase > edp) const
 
ProductStatus status () const
 

Detailed Description

Definition at line 119 of file ProductResolvers.h.

Constructor & Destructor Documentation

◆ DelayedReaderInputProductResolver()

edm::DelayedReaderInputProductResolver::DelayedReaderInputProductResolver ( std::shared_ptr< BranchDescription const >  bd)
inlineexplicit

Definition at line 121 of file ProductResolvers.h.

123  assert(bd->onDemand());
124  assert(not bd->produced());
125  }
MergeableInputProductResolver(std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
assert(be >=bs)
UnscheduledAuxiliary const * aux_

Member Function Documentation

◆ isFromCurrentProcess()

bool edm::DelayedReaderInputProductResolver::isFromCurrentProcess ( ) const
finalprivatevirtual

Implements edm::DataManagingProductResolver.

Definition at line 351 of file ProductResolvers.cc.

351 { return false; }

◆ prefetchAsync_()

void edm::DelayedReaderInputProductResolver::prefetchAsync_ ( WaitingTaskHolder  waitTask,
Principal const &  principal,
bool  skipCurrentProcess,
ServiceToken const &  token,
SharedResourcesAcquirer sra,
ModuleCallingContext const *  mcc 
) const
overrideprivatevirtual

Implements edm::ProductResolverBase.

Definition at line 271 of file ProductResolvers.cc.

References edm::WaitingTaskList::add(), edm::ProductResolverBase::branchDescription(), CMS_SA_ALLOW, edm::WaitingTaskList::doneWaiting(), MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, edm::WaitingTaskHolder::group(), edm::InEvent, edm::InProcess, edm::ServiceWeakToken::lock(), m_prefetchRequested, m_waitingTasks, edm::make_functor_task(), edm::ProductResolverBase::productResolved(), createBeamHaloJobs::queue, DQM::reader, alignCSCRings::s, edm::DataManagingProductResolver::setProduct(), edm::errors::StdException, submitPVValidationJobs::t, and unpackBuffers-CaloStage2::token.

276  {
277  //need to try changing m_prefetchRequested before adding to m_waitingTasks
278  bool expected = false;
279  bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
280  m_waitingTasks.add(waitTask);
281 
282  if (prefetchRequested) {
283  ServiceWeakToken weakToken = token;
284  auto workToDo = [this, mcc, &principal, weakToken]() {
285  //need to make sure Service system is activated on the reading thread
286  ServiceRegistry::Operate operate(weakToken.lock());
287  // Caught exception is propagated via WaitingTaskList
288  CMS_SA_ALLOW try {
289  resolveProductImpl<true>([this, &principal, mcc]() {
290  if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
291  return;
292  }
293  if (auto reader = principal.reader()) {
294  std::unique_lock<std::recursive_mutex> guard;
295  if (auto sr = reader->sharedResources().second) {
296  guard = std::unique_lock<std::recursive_mutex>(*sr);
297  }
298  if (not productResolved()) {
299  try {
300  //another thread could have finished this while we were waiting
301  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
302  } catch (cms::Exception& e) {
303  extendException(e, branchDescription(), mcc);
304  throw;
305  } catch (std::exception const& e) {
306  auto newExcept = edm::Exception(errors::StdException) << e.what();
307  extendException(newExcept, branchDescription(), mcc);
308  throw newExcept;
309  }
310  }
311  }
312  });
313  } catch (...) {
314  this->m_waitingTasks.doneWaiting(std::current_exception());
315  return;
316  }
317  this->m_waitingTasks.doneWaiting(nullptr);
318  };
319 
320  SerialTaskQueueChain* queue = nullptr;
321  if (auto reader = principal.reader()) {
322  if (auto shared_res = reader->sharedResources().first) {
323  queue = &(shared_res->serialQueueChain());
324  }
325  }
326  if (queue) {
327  queue->push(*waitTask.group(), workToDo);
328  } else {
329  //Have to create a new task
330  auto t = make_functor_task(workToDo);
331  waitTask.group()->run([t]() {
332  TaskSentry s{t};
333  t->execute();
334  });
335  }
336  }
337  }
#define CMS_SA_ALLOW
reader
Definition: DQM.py:105
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
BranchDescription const & branchDescription() const
void setProduct(std::unique_ptr< WrapperBase > edp) const

◆ resetProductData_()

void edm::DelayedReaderInputProductResolver::resetProductData_ ( bool  deleteEarly)
overrideprivatevirtual

Reimplemented from edm::DataManagingProductResolver.

Definition at line 339 of file ProductResolvers.cc.

References m_prefetchRequested, m_waitingTasks, edm::WaitingTaskList::reset(), and edm::DataManagingProductResolver::resetProductData_().

339  {
340  if (not deleteEarly) {
341  m_prefetchRequested = false;
343  }
345  }
void resetProductData_(bool deleteEarly) override
void reset()
Resets access to the resource so that added tasks will wait.

◆ resolveProduct_()

ProductResolverBase::Resolution edm::DelayedReaderInputProductResolver::resolveProduct_ ( Principal const &  principal,
bool  skipCurrentProcess,
SharedResourcesAcquirer sra,
ModuleCallingContext const *  mcc 
) const
overrideprivatevirtual

Implements edm::ProductResolverBase.

Definition at line 170 of file ProductResolvers.cc.

References aux_, edm::ProductResolverBase::branchDescription(), MillePedeFileConverter_cfg::e, edm::signalslot::Signal< T >::emit(), cppFunctionSkipper::exception, Exception, edm::ModuleCallingContext::getStreamContext(), edm::InEvent, edm::InLumi, edm::InRun, edm::make_sentry(), edm::UnscheduledAuxiliary::postModuleDelayedGetSignal_, edm::UnscheduledAuxiliary::preModuleDelayedGetSignal_, edm::ProductResolverBase::productResolved(), DQM::reader, edm::DataManagingProductResolver::setProduct(), and edm::errors::StdException.

171  {
172  return resolveProductImpl<true>([this, &principal, mcc]() {
173  auto branchType = principal.branchType();
174  if (branchType == InLumi || branchType == InRun) {
175  //delayed get has not been allowed with Run or Lumis
176  // The file may already be closed so the reader is invalid
177  return;
178  }
179  if (mcc and branchType == InEvent and aux_) {
180  aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()), *mcc);
181  }
182 
183  auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
184  if (branchType == InEvent and aux_) {
185  aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
186  }
187  }));
188 
189  if (auto reader = principal.reader()) {
190  std::unique_lock<std::recursive_mutex> guard;
191  if (auto sr = reader->sharedResources().second) {
192  guard = std::unique_lock<std::recursive_mutex>(*sr);
193  }
194  if (not productResolved()) {
195  try {
196  //another thread could have beaten us here
197  setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
198  } catch (cms::Exception& e) {
199  extendException(e, branchDescription(), mcc);
200  throw;
201  } catch (std::exception const& e) {
202  auto newExcept = edm::Exception(errors::StdException) << e.what();
203  extendException(newExcept, branchDescription(), mcc);
204  throw newExcept;
205  }
206  }
207  }
208  });
209  }
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
reader
Definition: DQM.py:105
UnscheduledAuxiliary const * aux_
void emit(Args &&... args) const
Definition: Signal.h:48
BranchDescription const & branchDescription() const
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preModuleDelayedGetSignal_
void setProduct(std::unique_ptr< WrapperBase > edp) const
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postModuleDelayedGetSignal_

◆ retrieveAndMerge_()

void edm::DelayedReaderInputProductResolver::retrieveAndMerge_ ( Principal const &  principal,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
) const
overrideprivatevirtual

Reimplemented from edm::ProductResolverBase.

Definition at line 211 of file ProductResolvers.cc.

References edm::ProductResolverBase::branchDescription(), edm::BranchDescription::branchName(), edm::BranchDescription::branchType(), edm::DataManagingProductResolver::defaultStatus(), Exception, edm::InRun, edm::errors::LogicError, edm::errors::MismatchedInputFiles, eostools::move(), edm::DataManagingProductResolver::ProductSet, DQM::reader, edm::Principal::reader(), edm::DataManagingProductResolver::ResolveFailed, edm::DataManagingProductResolver::setFailedStatus(), edm::MergeableInputProductResolver::setOrMergeProduct(), and edm::DataManagingProductResolver::status().

212  {
213  if (auto reader = principal.reader()) {
214  std::unique_lock<std::recursive_mutex> guard;
215  if (auto sr = reader->sharedResources().second) {
216  guard = std::unique_lock<std::recursive_mutex>(*sr);
217  }
218 
219  //Can't use resolveProductImpl since it first checks to see
220  // if the product was already retrieved and then returns if it is
221  auto edp(reader->getProduct(branchDescription().branchID(), &principal));
222 
223  if (edp.get() != nullptr) {
224  if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
226  << "Missing definition of member function swap for branch name " << branchDescription().branchName()
227  << "\n"
228  << "Mergeable data types written to a Run must have the swap member function defined"
229  << "\n";
230  }
232  (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
233  setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
234  } else { // status() == ResolveFailed && branchDescription().isMergeable()
236  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
237  << "The product branch was dropped in the first run or lumi fragment and present in a later one"
238  << "\n"
239  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
240  << "that need to be merged in the first place.\n";
241  }
242  } else if (status() == defaultStatus()) {
243  setFailedStatus();
244  } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
246  << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
247  << "The product branch was present in first run or lumi fragment and dropped in a later one"
248  << "\n"
249  << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
250  << "that need to be merged in the first place.\n";
251  }
252  // Do nothing in other case. status is ResolveFailed already or
253  // it is not mergeable and the status is ProductSet
254  }
255  }
void setOrMergeProduct(std::shared_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
BranchType const & branchType() const
reader
Definition: DQM.py:105
std::string const & branchName() const
ProductStatus defaultStatus() const
BranchDescription const & branchDescription() const
def move(src, dest)
Definition: eostools.py:511

◆ setMergeableRunProductMetadata_()

void edm::DelayedReaderInputProductResolver::setMergeableRunProductMetadata_ ( MergeableRunProductMetadata const *  mrpm)
overrideprivatevirtual

Reimplemented from edm::ProductResolverBase.

Definition at line 267 of file ProductResolvers.cc.

References edm::DataManagingProductResolver::setMergeableRunProductMetadataInProductData().

267  {
269  }
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)

◆ setupUnscheduled()

void edm::DelayedReaderInputProductResolver::setupUnscheduled ( UnscheduledConfigurator const &  iConfigure)
finalvirtual

Reimplemented from edm::ProductResolverBase.

Definition at line 347 of file ProductResolvers.cc.

References aux_, and edm::UnscheduledConfigurator::auxiliary().

347  {
348  aux_ = iConfigure.auxiliary();
349  }
UnscheduledAuxiliary const * aux_

◆ unscheduledWasNotRun_()

bool edm::DelayedReaderInputProductResolver::unscheduledWasNotRun_ ( ) const
inlinefinalprivatevirtual

Implements edm::ProductResolverBase.

Definition at line 148 of file ProductResolvers.h.

148 { return false; }

Member Data Documentation

◆ aux_

UnscheduledAuxiliary const* edm::DelayedReaderInputProductResolver::aux_
private

Definition at line 154 of file ProductResolvers.h.

Referenced by resolveProduct_(), and setupUnscheduled().

◆ m_prefetchRequested

std::atomic<bool> edm::DelayedReaderInputProductResolver::m_prefetchRequested
mutableprivate

Definition at line 152 of file ProductResolvers.h.

Referenced by prefetchAsync_(), and resetProductData_().

◆ m_waitingTasks

WaitingTaskList edm::DelayedReaderInputProductResolver::m_waitingTasks
mutableprivate

Definition at line 153 of file ProductResolvers.h.

Referenced by prefetchAsync_(), and resetProductData_().