CMS 3D CMS Logo

ProductResolvers.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_ProductResolvers_h
2 #define FWCore_Framework_ProductResolvers_h
3 
4 /*----------------------------------------------------------------------
5 
6 ProductResolver: A collection of information related to a single WrapperBase or
7 a set of related EDProducts. This is the storage unit of such information.
8 
9 ----------------------------------------------------------------------*/
19 
20 #include <memory>
21 #include <atomic>
22 
23 #include <string>
24 
25 namespace edm {
26  class ProductProvenanceRetriever;
27  class DelayedReader;
28  class ModuleCallingContext;
30  class Principal;
32  class Worker;
33  class ServiceToken;
34 
36  public:
37  enum class ProductStatus {
38  ProductSet,
39  NotPut,
43  };
44 
45  DataManagingProductResolver(std::shared_ptr<BranchDescription const> bd,ProductStatus iDefaultStatus): ProductResolverBase(),
46  productData_(bd),
47  theStatus_(iDefaultStatus),
48  defaultStatus_(iDefaultStatus){}
49 
50  void connectTo(ProductResolverBase const&, Principal const*) final;
51 
53 
54  //Give AliasProductResolver access
55  void resetProductData_(bool deleteEarly) override;
56 
57  protected:
58  void setProduct(std::unique_ptr<WrapperBase> edp) const;
59  ProductStatus status() const { return theStatus_;}
62  //Handle the boilerplate code needed for resolveProduct_
63  template <bool callResolver, typename FUNC>
64  Resolution resolveProductImpl( FUNC resolver) const;
65 
66  private:
67 
68  void throwProductDeletedException() const;
69  void checkType(WrapperBase const& prod) const;
70  ProductData const& getProductData() const {return productData_;}
71  virtual bool isFromCurrentProcess() const = 0;
72  // merges the product with the pre-existing product
73  void mergeProduct(std::unique_ptr<WrapperBase> edp) const;
74 
75  void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const final;
76  bool productUnavailable_() const final;
77  bool productResolved_() const final;
78  bool productWasDeleted_() const final;
79  bool productWasFetchedAndIsValid_( bool iSkipCurrentProcess) const final;
80 
82  void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) final {productData_.resetBranchDescription(bd);}
83  Provenance const* provenance_() const final {return &productData_.provenance();}
84 
85  std::string const& resolvedModuleLabel_() const final {return moduleLabel();}
86  void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) final;
87  void setProcessHistory_(ProcessHistory const& ph) final;
88  ProductProvenance const* productProvenancePtr_() const final;
89  bool singleProduct_() const final;
90 
92  mutable std::atomic<ProductStatus> theStatus_;
94  };
95 
97  public:
98  explicit InputProductResolver(std::shared_ptr<BranchDescription const> bd) :
100  m_prefetchRequested{ false },
101  aux_{nullptr} {}
102 
103  void setupUnscheduled(UnscheduledConfigurator const&) final;
104 
105  private:
106  bool isFromCurrentProcess() const final;
107 
108 
110  bool skipCurrentProcess,
112  ModuleCallingContext const* mcc) const override;
113  void prefetchAsync_(WaitingTask* waitTask,
114  Principal const& principal,
115  bool skipCurrentProcess,
117  ModuleCallingContext const* mcc) const override;
118  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
119 
120  void retrieveAndMerge_(Principal const& principal) const override;
121 
122  bool unscheduledWasNotRun_() const final {return false;}
123 
124  void resetProductData_(bool deleteEarly) override;
125 
126  mutable std::atomic<bool> m_prefetchRequested;
128  UnscheduledAuxiliary const* aux_; //provides access to the delayedGet signals
129 
130 
131  };
132 
134  public:
135  ProducedProductResolver(std::shared_ptr<BranchDescription const> bd, ProductStatus iDefaultStatus) : DataManagingProductResolver(bd, iDefaultStatus) {assert(bd->produced());}
136 
137  void resetFailedFromThisProcess() override;
138 
139  protected:
140  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
141  private:
142  bool isFromCurrentProcess() const final;
143 
144  };
145 
147  public:
148  explicit PuttableProductResolver(std::shared_ptr<BranchDescription const> bd) : ProducedProductResolver(bd, ProductStatus::NotPut), worker_(nullptr), prefetchRequested_(false) {}
149 
150  void setupUnscheduled(UnscheduledConfigurator const&) final;
151 
152  private:
154  bool skipCurrentProcess,
156  ModuleCallingContext const* mcc) const override;
157  void prefetchAsync_(WaitingTask* waitTask,
158  Principal const& principal,
159  bool skipCurrentProcess,
161  ModuleCallingContext const* mcc) const override;
162  bool unscheduledWasNotRun_() const override {return false;}
163 
164  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
165  void resetProductData_(bool deleteEarly) override;
166 
169  mutable std::atomic<bool> prefetchRequested_;
170 
171  };
172 
174  public:
175  explicit UnscheduledProductResolver(std::shared_ptr<BranchDescription const> bd) :
177  aux_(nullptr),
178  prefetchRequested_(false){}
179 
180  void setupUnscheduled(UnscheduledConfigurator const&) final;
181 
182  private:
184  bool skipCurrentProcess,
186  ModuleCallingContext const* mcc) const override;
187  void prefetchAsync_(WaitingTask* waitTask,
188  Principal const& principal,
189  bool skipCurrentProcess,
191  ModuleCallingContext const* mcc) const override;
192  bool unscheduledWasNotRun_() const override {return status() == ProductStatus::ResolveNotRun;}
193 
194  void resetProductData_(bool deleteEarly) override;
195 
199  mutable std::atomic<bool> prefetchRequested_;
200  };
201 
203  public:
205  explicit AliasProductResolver(std::shared_ptr<BranchDescription const> bd, ProducedProductResolver& realProduct) : ProductResolverBase(), realProduct_(realProduct), bd_(bd) {}
206 
207  void connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) final {
208  realProduct_.connectTo(iOther, iParentPrincipal );
209  };
210 
211  private:
213  bool skipCurrentProcess,
215  ModuleCallingContext const* mcc) const override {
216  return realProduct_.resolveProduct(principal, skipCurrentProcess, sra, mcc);}
217  void prefetchAsync_(WaitingTask* waitTask,
218  Principal const& principal,
219  bool skipCurrentProcess,
221  ModuleCallingContext const* mcc) const override {
222  realProduct_.prefetchAsync(waitTask, principal, skipCurrentProcess, sra, mcc);
223  }
224  bool unscheduledWasNotRun_() const override {return realProduct_.unscheduledWasNotRun();}
225  bool productUnavailable_() const override {return realProduct_.productUnavailable();}
226  bool productResolved_() const final {
227  return realProduct_.productResolved(); }
228  bool productWasDeleted_() const override {return realProduct_.productWasDeleted();}
229  bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final {
230  return realProduct_.productWasFetchedAndIsValid(iSkipCurrentProcess);
231  }
232 
233  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
234  void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const final;
235  BranchDescription const& branchDescription_() const override {return *bd_;}
236  void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override {bd_ = bd;}
237  Provenance const* provenance_() const final { return realProduct_.provenance(); }
238 
239  std::string const& resolvedModuleLabel_() const override {return realProduct_.moduleLabel();}
240  void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override;
241  void setProcessHistory_(ProcessHistory const& ph) override;
242  ProductProvenance const* productProvenancePtr_() const override;
243  void resetProductData_(bool deleteEarly) override;
244  bool singleProduct_() const override;
245 
247  std::shared_ptr<BranchDescription const> bd_;
248  };
249 
251  public:
253  explicit ParentProcessProductResolver(std::shared_ptr<BranchDescription const> bd) : ProductResolverBase(), realProduct_(nullptr), bd_(bd), provRetriever_(nullptr), parentPrincipal_(nullptr) {}
254 
255  void connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) final {
256  realProduct_ = &iOther;
257  parentPrincipal_ = iParentPrincipal;
258  };
259 
260  private:
262  bool skipCurrentProcess,
264  ModuleCallingContext const* mcc) const override {
265  skipCurrentProcess = false;
266  return realProduct_->resolveProduct(*parentPrincipal_, skipCurrentProcess, sra, mcc);
267  }
268  void prefetchAsync_(WaitingTask* waitTask,
269  Principal const& principal,
270  bool skipCurrentProcess,
272  ModuleCallingContext const* mcc) const override {
273  skipCurrentProcess = false;
274  realProduct_->prefetchAsync( waitTask, *parentPrincipal_, skipCurrentProcess, sra, mcc);
275  }
276  bool unscheduledWasNotRun_() const override {
277  if (realProduct_) return realProduct_->unscheduledWasNotRun();
278  throwNullRealProduct();
279  return false;
280  }
281  bool productUnavailable_() const override {return realProduct_->productUnavailable();}
282  bool productResolved_() const final { return realProduct_->productResolved(); }
283  bool productWasDeleted_() const override {return realProduct_->productWasDeleted();}
284  bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override {
285  iSkipCurrentProcess = false;
286  return realProduct_->productWasFetchedAndIsValid(iSkipCurrentProcess);
287  }
288 
289  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
290  void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const final;
291  BranchDescription const& branchDescription_() const override {return *bd_;}
292  void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override {bd_ = bd;}
293  Provenance const* provenance_() const final {return realProduct_->provenance();
294  }
295  std::string const& resolvedModuleLabel_() const override {return realProduct_->moduleLabel();}
296  void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override;
297  void setProcessHistory_(ProcessHistory const& ph) override;
298  ProductProvenance const* productProvenancePtr_() const override;
299  void resetProductData_(bool deleteEarly) override;
300  bool singleProduct_() const override;
301  void throwNullRealProduct() const;
302 
304  std::shared_ptr<BranchDescription const> bd_;
307  };
308 
310  public:
312  NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
313  std::vector<bool> const& ambiguous);
314 
315  void connectTo(ProductResolverBase const& iOther, Principal const*) final ;
316 
317  void tryPrefetchResolverAsync(unsigned int iProcessingIndex,
318  Principal const& principal,
319  bool skipCurrentProcess,
321  ModuleCallingContext const* mcc,
322  ServiceToken token) const;
323 
324  bool dataValidFromResolver(unsigned int iProcessingIndex,
325  Principal const& principal,
326  bool iSkipCurrentProcess) const;
327 
328  void prefetchFailed(unsigned int iProcessingIndex,
329  Principal const& principal,
330  bool iSkipCurrentProcess,
331  std::exception_ptr iExceptPtr) const;
332  private:
333  unsigned int unsetIndexValue() const;
334  Resolution resolveProduct_(Principal const& principal,
335  bool skipCurrentProcess,
337  ModuleCallingContext const* mcc) const override;
338  void prefetchAsync_(WaitingTask* waitTask,
339  Principal const& principal,
340  bool skipCurrentProcess,
342  ModuleCallingContext const* mcc) const override;
343  bool unscheduledWasNotRun_() const override;
344  bool productUnavailable_() const override;
345  bool productWasDeleted_() const override;
346  bool productResolved_() const final;
347  bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override;
348 
349  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
350  void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const final;
351  BranchDescription const& branchDescription_() const override;
352  void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override;
353  Provenance const* provenance_() const override;
354 
355  std::string const& resolvedModuleLabel_() const override {return moduleLabel();}
356  void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override;
357  void setProcessHistory_(ProcessHistory const& ph) override;
358  ProductProvenance const* productProvenancePtr_() const override;
359  void resetProductData_(bool deleteEarly) override;
360  bool singleProduct_() const override;
361 
362  Resolution tryResolver(unsigned int index,
363  Principal const& principal,
364  bool skipCurrentProcess,
366  ModuleCallingContext const* mcc) const;
367 
368  void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const;
369 
370  std::vector<ProductResolverIndex> matchingHolders_;
371  std::vector<bool> ambiguous_;
374  mutable std::atomic<unsigned int> lastCheckIndex_;
375  mutable std::atomic<unsigned int> lastSkipCurrentCheckIndex_;
376  mutable std::atomic<bool> prefetchRequested_;
377  mutable std::atomic<bool> skippingPrefetchRequested_;
378  mutable std::atomic<bool> recheckedAtEnd_;
379  };
380 
382  public:
385  ProductResolverBase(), realResolverIndex_(iChoice) {}
386 
387  void connectTo(ProductResolverBase const& iOther, Principal const*) final ;
388 
389  private:
391  bool skipCurrentProcess,
393  ModuleCallingContext const* mcc) const override;
394  void prefetchAsync_(WaitingTask* waitTask,
395  Principal const& principal,
396  bool skipCurrentProcess,
398  ModuleCallingContext const* mcc) const override;
399  bool unscheduledWasNotRun_() const override;
400  bool productUnavailable_() const override;
401  bool productWasDeleted_() const override;
402  bool productResolved_() const final;
403  bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override;
404 
405  void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
406  void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const final;
407  BranchDescription const& branchDescription_() const override;
408  void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override;
409  Provenance const* provenance_() const override;
410 
411  std::string const& resolvedModuleLabel_() const override {return moduleLabel();}
412  void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override;
413  void setProcessHistory_(ProcessHistory const& ph) override;
414  ProductProvenance const* productProvenancePtr_() const override;
415  void resetProductData_(bool deleteEarly) override;
416  bool singleProduct_() const override;
417 
419  };
420 
421 }
422 
423 #endif
ProducedProductResolver & realProduct_
bool productResolved_() const final
Provenance const & provenance() const
Definition: ProductData.h:30
bool unscheduledWasNotRun_() const override
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final
unsigned int ProductResolverIndex
std::shared_ptr< BranchDescription const > const & branchDescription() const
Definition: ProductData.h:26
void checkType(WrapperBase const &prod) const
ParentProcessProductResolver(std::shared_ptr< BranchDescription const > bd)
ProducedProductResolver::ProductStatus ProductStatus
std::string const & resolvedModuleLabel_() const override
ProductStatus const defaultStatus_
std::shared_ptr< BranchDescription const > bd_
bool productUnavailable_() const override
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
void resetProductData_(bool deleteEarly) override
UnscheduledProductResolver(std::shared_ptr< BranchDescription const > bd)
UnscheduledAuxiliary const * aux_
ProductStatus status() const
Provenance const * provenance_() const final
virtual void setupUnscheduled(UnscheduledConfigurator const &)
std::string const & resolvedModuleLabel_() const final
WaitingTaskList m_waitingTasks
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const =0
DataManagingProductResolver(std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
Provenance const * provenance_() const final
#define nullptr
ProducedProductResolver(std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
std::atomic< bool > prefetchRequested_
ProductProvenanceRetriever const * provRetriever_
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
void connectTo(ProductResolverBase const &, Principal const *) final
void resetBranchDescription(std::shared_ptr< BranchDescription const > bd)
Definition: ProductData.cc:29
std::string const & resolvedModuleLabel_() const override
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
bool productUnavailable_() const override
BranchDescription const & branchDescription_() const override
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
ProducedProductResolver::ProductStatus ProductStatus
UnscheduledAuxiliary const * aux_
def principal(options)
ProductResolverBase const * realProduct_
void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) final
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
InputProductResolver(std::shared_ptr< BranchDescription const > bd)
virtual void resetFailedFromThisProcess()
std::atomic< bool > prefetchRequested_
bool productWasDeleted_() const override
BranchDescription const & branchDescription_() const final
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
void mergeProduct(std::unique_ptr< WrapperBase > edp) const
void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const final
std::atomic< bool > prefetchRequested_
std::atomic< bool > m_prefetchRequested
std::atomic< unsigned int > lastCheckIndex_
bool unscheduledWasNotRun_() const override
void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) final
bool unscheduledWasNotRun_() const override
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const =0
AliasProductResolver(std::shared_ptr< BranchDescription const > bd, ProducedProductResolver &realProduct)
ProducedProductResolver::ProductStatus ProductStatus
virtual bool isFromCurrentProcess() const =0
void setProduct(std::unique_ptr< WrapperBase > edp) const
std::atomic< bool > skippingPrefetchRequested_
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
bool unscheduledWasNotRun_() const override
virtual bool unscheduledWasNotRun_() const =0
ProductStatus defaultStatus() const
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const final
BranchDescription const & branchDescription_() const override
std::vector< ProductResolverIndex > matchingHolders_
HLT enums.
bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
bool productWasDeleted_() const override
virtual void retrieveAndMerge_(Principal const &principal) const
void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) final
ProductProvenance const * productProvenancePtr_() const final
ProducedProductResolver::ProductStatus ProductStatus
ProductData const & getProductData() const
std::shared_ptr< BranchDescription const > bd_
std::string const & resolvedModuleLabel_() const override
bool unscheduledWasNotRun_() const final
PuttableProductResolver(std::shared_ptr< BranchDescription const > bd)
Provenance const * provenance_() const final
SingleChoiceNoProcessProductResolver(ProductResolverIndex iChoice)
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const =0
void setProcessHistory_(ProcessHistory const &ph) final
std::atomic< bool > recheckedAtEnd_
void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::string const & moduleLabel() const