CMS 3D CMS Logo

ProductResolvers.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_ProductResolvers_h
2 #define FWCore_Framework_ProductResolvers_h
3 
4 /*----------------------------------------------------------------------
5 
6 ProductResolver: A collection of information related to a single WrapperBase or
7 a set of related EDProducts. This is the storage unit of such information.
8 
9 ----------------------------------------------------------------------*/
19 
20 #include <memory>
21 #include <atomic>
22 
23 #include <string>
24 
25 namespace edm {
26  class MergeableRunProductMetadata;
27  class ProductProvenanceRetriever;
28  class DelayedReader;
29  class ModuleCallingContext;
31  class Principal;
33  class Worker;
34  class ServiceToken;
35 
37  public:
38  enum class ProductStatus {
39  ProductSet,
40  NotPut,
44  };
45 
46  DataManagingProductResolver(std::shared_ptr<BranchDescription const> bd,ProductStatus iDefaultStatus): ProductResolverBase(),
47  productData_(bd),
48  theStatus_(iDefaultStatus),
49  defaultStatus_(iDefaultStatus){}
50 
51  void connectTo(ProductResolverBase const&, Principal const*) final;
52 
54 
55  //Give AliasProductResolver access
56  void resetProductData_(bool deleteEarly) override;
57 
58  protected:
59  void setProduct(std::unique_ptr<WrapperBase> edp) const;
60  ProductStatus status() const { return theStatus_;}
63  //Handle the boilerplate code needed for resolveProduct_
64  template <bool callResolver, typename FUNC>
65  Resolution resolveProductImpl( FUNC resolver) const;
67 
68  private:
69 
70  void throwProductDeletedException() const;
71  void checkType(WrapperBase const& prod) const;
72  ProductData const& getProductData() const {return productData_;}
73  virtual bool isFromCurrentProcess() const = 0;
74  // merges the product with the pre-existing product
75  void mergeProduct(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata const*) const;
76 
77  void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const final;
78  bool productUnavailable_() const final;
79  bool productResolved_() const final;
80  bool productWasDeleted_() const final;
81  bool productWasFetchedAndIsValid_( bool iSkipCurrentProcess) const final;
82 
84  void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) final {productData_.resetBranchDescription(bd);}
85  Provenance const* provenance_() const final {return &productData_.provenance();}
86 
87  std::string const& resolvedModuleLabel_() const final {return moduleLabel();}
88  void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) final;
89  void setProcessHistory_(ProcessHistory const& ph) final;
90  ProductProvenance const* productProvenancePtr_() const final;
91  bool singleProduct_() const final;
92 
94  mutable std::atomic<ProductStatus> theStatus_;
96  };
97 
99  public:
100  explicit InputProductResolver(std::shared_ptr<BranchDescription const> bd) :
102  m_prefetchRequested{ false },
103  aux_{nullptr} {}
104 
105  void setupUnscheduled(UnscheduledConfigurator const&) final;
106 
107  private:
108  bool isFromCurrentProcess() const final;
109 
110 
112  bool skipCurrentProcess,
114  ModuleCallingContext const* mcc) const override;
115  void prefetchAsync_(WaitingTask* waitTask,
116  Principal const& principal,
117  bool skipCurrentProcess,
118  ServiceToken const& token,
120  ModuleCallingContext const* mcc) const override;
121  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
122 
123  void retrieveAndMerge_(Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const override;
124 
126 
127  bool unscheduledWasNotRun_() const final {return false;}
128 
129  void resetProductData_(bool deleteEarly) override;
130 
131  mutable std::atomic<bool> m_prefetchRequested;
133  UnscheduledAuxiliary const* aux_; //provides access to the delayedGet signals
134 
135 
136  };
137 
139  public:
140  ProducedProductResolver(std::shared_ptr<BranchDescription const> bd, ProductStatus iDefaultStatus) : DataManagingProductResolver(bd, iDefaultStatus) {assert(bd->produced());}
141 
142  protected:
143  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
144  private:
145  bool isFromCurrentProcess() const final;
146 
147  };
148 
150  public:
151  explicit PuttableProductResolver(std::shared_ptr<BranchDescription const> bd) : ProducedProductResolver(bd, ProductStatus::NotPut), worker_(nullptr), prefetchRequested_(false) {}
152 
153  void setupUnscheduled(UnscheduledConfigurator const&) final;
154 
155  private:
157  bool skipCurrentProcess,
159  ModuleCallingContext const* mcc) const override;
160  void prefetchAsync_(WaitingTask* waitTask,
161  Principal const& principal,
162  bool skipCurrentProcess,
163  ServiceToken const& token,
165  ModuleCallingContext const* mcc) const override;
166  bool unscheduledWasNotRun_() const override {return false;}
167 
168  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
169  void resetProductData_(bool deleteEarly) override;
170 
173  mutable std::atomic<bool> prefetchRequested_;
174 
175  };
176 
178  public:
179  explicit UnscheduledProductResolver(std::shared_ptr<BranchDescription const> bd) :
181  aux_(nullptr),
182  prefetchRequested_(false){}
183 
184  void setupUnscheduled(UnscheduledConfigurator const&) final;
185 
186  private:
188  bool skipCurrentProcess,
190  ModuleCallingContext const* mcc) const override;
191  void prefetchAsync_(WaitingTask* waitTask,
192  Principal const& principal,
193  bool skipCurrentProcess,
194  ServiceToken const& token,
196  ModuleCallingContext const* mcc) const override;
197  bool unscheduledWasNotRun_() const override {return status() == ProductStatus::ResolveNotRun;}
198 
199  void resetProductData_(bool deleteEarly) override;
200 
204  mutable std::atomic<bool> prefetchRequested_;
205  };
206 
208  public:
210  explicit AliasProductResolver(std::shared_ptr<BranchDescription const> bd, ProducedProductResolver& realProduct) : ProductResolverBase(), realProduct_(realProduct), bd_(bd) {}
211 
212  void connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) final {
213  realProduct_.connectTo(iOther, iParentPrincipal );
214  };
215 
216  private:
218  bool skipCurrentProcess,
220  ModuleCallingContext const* mcc) const override {
221  return realProduct_.resolveProduct(principal, skipCurrentProcess, sra, mcc);}
222  void prefetchAsync_(WaitingTask* waitTask,
223  Principal const& principal,
224  bool skipCurrentProcess,
225  ServiceToken const& token,
227  ModuleCallingContext const* mcc) const override {
228  realProduct_.prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
229  }
230  bool unscheduledWasNotRun_() const override {return realProduct_.unscheduledWasNotRun();}
231  bool productUnavailable_() const override {return realProduct_.productUnavailable();}
232  bool productResolved_() const final {
233  return realProduct_.productResolved(); }
234  bool productWasDeleted_() const override {return realProduct_.productWasDeleted();}
235  bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final {
236  return realProduct_.productWasFetchedAndIsValid(iSkipCurrentProcess);
237  }
238 
239  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
240  void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const final;
241  BranchDescription const& branchDescription_() const override {return *bd_;}
242  void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override {bd_ = bd;}
243  Provenance const* provenance_() const final { return realProduct_.provenance(); }
244 
245  std::string const& resolvedModuleLabel_() const override {return realProduct_.moduleLabel();}
246  void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override;
247  void setProcessHistory_(ProcessHistory const& ph) override;
248  ProductProvenance const* productProvenancePtr_() const override;
249  void resetProductData_(bool deleteEarly) override;
250  bool singleProduct_() const override;
251 
253  std::shared_ptr<BranchDescription const> bd_;
254  };
255 
257  public:
259  explicit ParentProcessProductResolver(std::shared_ptr<BranchDescription const> bd) : ProductResolverBase(), realProduct_(nullptr), bd_(bd), provRetriever_(nullptr), parentPrincipal_(nullptr) {}
260 
261  void connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) final {
262  realProduct_ = &iOther;
263  parentPrincipal_ = iParentPrincipal;
264  };
265 
266  private:
268  bool skipCurrentProcess,
270  ModuleCallingContext const* mcc) const override {
271  skipCurrentProcess = false;
272  return realProduct_->resolveProduct(*parentPrincipal_, skipCurrentProcess, sra, mcc);
273  }
274  void prefetchAsync_(WaitingTask* waitTask,
275  Principal const& principal,
276  bool skipCurrentProcess,
277  ServiceToken const& token,
279  ModuleCallingContext const* mcc) const override {
280  skipCurrentProcess = false;
281  realProduct_->prefetchAsync( waitTask, *parentPrincipal_, skipCurrentProcess, token, sra, mcc);
282  }
283  bool unscheduledWasNotRun_() const override {
284  if (realProduct_) return realProduct_->unscheduledWasNotRun();
285  throwNullRealProduct();
286  return false;
287  }
288  bool productUnavailable_() const override {return realProduct_->productUnavailable();}
289  bool productResolved_() const final { return realProduct_->productResolved(); }
290  bool productWasDeleted_() const override {return realProduct_->productWasDeleted();}
291  bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override {
292  iSkipCurrentProcess = false;
293  return realProduct_->productWasFetchedAndIsValid(iSkipCurrentProcess);
294  }
295 
296  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
297  void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const final;
298  BranchDescription const& branchDescription_() const override {return *bd_;}
299  void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override {bd_ = bd;}
300  Provenance const* provenance_() const final {return realProduct_->provenance();
301  }
302  std::string const& resolvedModuleLabel_() const override {return realProduct_->moduleLabel();}
303  void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override;
304  void setProcessHistory_(ProcessHistory const& ph) override;
305  ProductProvenance const* productProvenancePtr_() const override;
306  void resetProductData_(bool deleteEarly) override;
307  bool singleProduct_() const override;
308  void throwNullRealProduct() const;
309 
311  std::shared_ptr<BranchDescription const> bd_;
314  };
315 
317  public:
319  NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
320  std::vector<bool> const& ambiguous, bool madeAtEnd);
321 
322  void connectTo(ProductResolverBase const& iOther, Principal const*) final ;
323 
324  void tryPrefetchResolverAsync(unsigned int iProcessingIndex,
325  Principal const& principal,
326  bool skipCurrentProcess,
328  ModuleCallingContext const* mcc,
329  ServiceToken token) const;
330 
331  bool dataValidFromResolver(unsigned int iProcessingIndex,
332  Principal const& principal,
333  bool iSkipCurrentProcess) const;
334 
335  void prefetchFailed(unsigned int iProcessingIndex,
336  Principal const& principal,
337  bool iSkipCurrentProcess,
338  std::exception_ptr iExceptPtr) const;
339  private:
340  unsigned int unsetIndexValue() const;
341  Resolution resolveProduct_(Principal const& principal,
342  bool skipCurrentProcess,
344  ModuleCallingContext const* mcc) const override;
345  void prefetchAsync_(WaitingTask* waitTask,
346  Principal const& principal,
347  bool skipCurrentProcess,
348  ServiceToken const& token,
350  ModuleCallingContext const* mcc) const override;
351  bool unscheduledWasNotRun_() const override;
352  bool productUnavailable_() const override;
353  bool productWasDeleted_() const override;
354  bool productResolved_() const final;
355  bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override;
356 
357  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
358  void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const final;
359  BranchDescription const& branchDescription_() const override;
360  void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override;
361  Provenance const* provenance_() const override;
362 
363  std::string const& resolvedModuleLabel_() const override {return moduleLabel();}
364  void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override;
365  void setProcessHistory_(ProcessHistory const& ph) override;
366  ProductProvenance const* productProvenancePtr_() const override;
367  void resetProductData_(bool deleteEarly) override;
368  bool singleProduct_() const override;
369 
370  Resolution tryResolver(unsigned int index,
371  Principal const& principal,
372  bool skipCurrentProcess,
374  ModuleCallingContext const* mcc) const;
375 
376  void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const;
377 
378  std::vector<ProductResolverIndex> matchingHolders_;
379  std::vector<bool> ambiguous_;
382  mutable std::atomic<unsigned int> lastCheckIndex_;
383  mutable std::atomic<unsigned int> lastSkipCurrentCheckIndex_;
384  mutable std::atomic<bool> prefetchRequested_;
385  mutable std::atomic<bool> skippingPrefetchRequested_;
386  const bool madeAtEnd_;
387  };
388 
390  public:
393  ProductResolverBase(), realResolverIndex_(iChoice) {}
394 
395  void connectTo(ProductResolverBase const& iOther, Principal const*) final ;
396 
397  private:
399  bool skipCurrentProcess,
401  ModuleCallingContext const* mcc) const override;
402  void prefetchAsync_(WaitingTask* waitTask,
403  Principal const& principal,
404  bool skipCurrentProcess,
405  ServiceToken const& token,
407  ModuleCallingContext const* mcc) const override;
408  bool unscheduledWasNotRun_() const override;
409  bool productUnavailable_() const override;
410  bool productWasDeleted_() const override;
411  bool productResolved_() const final;
412  bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override;
413 
414  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
415  void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const final;
416  BranchDescription const& branchDescription_() const override;
417  void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override;
418  Provenance const* provenance_() const override;
419 
420  std::string const& resolvedModuleLabel_() const override {return moduleLabel();}
421  void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override;
422  void setProcessHistory_(ProcessHistory const& ph) override;
423  ProductProvenance const* productProvenancePtr_() const override;
424  void resetProductData_(bool deleteEarly) override;
425  bool singleProduct_() const override;
426 
428  };
429 
430 }
431 
432 #endif
ProducedProductResolver & realProduct_
bool productResolved_() const final
Provenance const & provenance() const
Definition: ProductData.h:32
bool unscheduledWasNotRun_() const override
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final
unsigned int ProductResolverIndex
std::shared_ptr< BranchDescription const > const & branchDescription() const
Definition: ProductData.h:28
void checkType(WrapperBase const &prod) const
ParentProcessProductResolver(std::shared_ptr< BranchDescription const > bd)
ProducedProductResolver::ProductStatus ProductStatus
std::string const & resolvedModuleLabel_() const override
ProductStatus const defaultStatus_
std::shared_ptr< BranchDescription const > bd_
bool productUnavailable_() const override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void resetProductData_(bool deleteEarly) override
UnscheduledProductResolver(std::shared_ptr< BranchDescription const > bd)
UnscheduledAuxiliary const * aux_
ProductStatus status() const
Provenance const * provenance_() const final
virtual void setupUnscheduled(UnscheduledConfigurator const &)
std::string const & resolvedModuleLabel_() const final
WaitingTaskList m_waitingTasks
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const =0
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const =0
DataManagingProductResolver(std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
Provenance const * provenance_() const final
#define nullptr
ProducedProductResolver(std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
std::atomic< bool > prefetchRequested_
ProductProvenanceRetriever const * provRetriever_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void connectTo(ProductResolverBase const &, Principal const *) final
void resetBranchDescription(std::shared_ptr< BranchDescription const > bd)
Definition: ProductData.cc:29
std::string const & resolvedModuleLabel_() const override
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
bool productUnavailable_() const override
BranchDescription const & branchDescription_() const override
ProducedProductResolver::ProductStatus ProductStatus
UnscheduledAuxiliary const * aux_
def principal(options)
ProductResolverBase const * realProduct_
void mergeProduct(std::unique_ptr< WrapperBase > edp, MergeableRunProductMetadata const *) const
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
InputProductResolver(std::shared_ptr< BranchDescription const > bd)
std::atomic< bool > prefetchRequested_
bool productWasDeleted_() const override
BranchDescription const & branchDescription_() const final
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
std::atomic< bool > prefetchRequested_
std::atomic< bool > m_prefetchRequested
std::atomic< unsigned int > lastCheckIndex_
bool unscheduledWasNotRun_() const override
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) final
bool unscheduledWasNotRun_() const override
AliasProductResolver(std::shared_ptr< BranchDescription const > bd, ProducedProductResolver &realProduct)
ProducedProductResolver::ProductStatus ProductStatus
virtual bool isFromCurrentProcess() const =0
void setProduct(std::unique_ptr< WrapperBase > edp) const
std::atomic< bool > skippingPrefetchRequested_
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void setMergeableRunProductMetadata_(MergeableRunProductMetadata const *)
bool unscheduledWasNotRun_() const override
virtual void retrieveAndMerge_(Principal const &principal, MergeableRunProductMetadata const *mergeableRunProductMetadata) const
virtual bool unscheduledWasNotRun_() const =0
ProductStatus defaultStatus() const
void setMergeableRunProductMetadataInProductData(MergeableRunProductMetadata const *)
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod, MergeableRunProductMetadata const *mergeableRunProductMetadata) const final
BranchDescription const & branchDescription_() const override
std::vector< ProductResolverIndex > matchingHolders_
HLT enums.
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
bool productWasDeleted_() const override
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) final
ProductProvenance const * productProvenancePtr_() const final
ProducedProductResolver::ProductStatus ProductStatus
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
ProductData const & getProductData() const
std::shared_ptr< BranchDescription const > bd_
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, ServiceToken const &token, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::string const & resolvedModuleLabel_() const override
bool unscheduledWasNotRun_() const final
PuttableProductResolver(std::shared_ptr< BranchDescription const > bd)
Provenance const * provenance_() const final
SingleChoiceNoProcessProductResolver(ProductResolverIndex iChoice)
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const =0
void setProcessHistory_(ProcessHistory const &ph) final
std::string const & moduleLabel() const