CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
ProductResolvers.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_ProductResolvers_h
2 #define FWCore_Framework_ProductResolvers_h
3 
4 /*----------------------------------------------------------------------
5 
6 ProductResolver: A collection of information related to a single WrapperBase or
7 a set of related EDProducts. This is the storage unit of such information.
8 
9 ----------------------------------------------------------------------*/
19 
20 #include <memory>
21 #include <atomic>
22 
23 #include <string>
24 
25 namespace edm {
26  class ProductProvenanceRetriever;
27  class DelayedReader;
28  class ModuleCallingContext;
30  class Principal;
32  class Worker;
33  class ServiceToken;
34 
36  public:
37  enum class ProductStatus {
38  ProductSet,
39  NotPut,
43  };
44 
45  DataManagingProductResolver(std::shared_ptr<BranchDescription const> bd,ProductStatus iDefaultStatus): ProductResolverBase(),
46  productData_(bd),
47  theStatus_(iDefaultStatus),
48  defaultStatus_(iDefaultStatus){}
49 
50  virtual void connectTo(ProductResolverBase const&, Principal const*) override final;
51 
53 
54  //Give AliasProductResolver access
55  virtual void resetProductData_(bool deleteEarly) override;
56 
57  protected:
58  void setProduct(std::unique_ptr<WrapperBase> edp) const;
59  ProductStatus status() const { return theStatus_;}
62  //Handle the boilerplate code needed for resolveProduct_
63  template <bool callResolver, typename FUNC>
64  Resolution resolveProductImpl( FUNC resolver) const;
65 
66  private:
67 
68  void throwProductDeletedException() const;
69  void checkType(WrapperBase const& prod) const;
70  ProductData const& getProductData() const {return productData_;}
71  virtual bool isFromCurrentProcess() const = 0;
72  // merges the product with the pre-existing product
73  void mergeProduct(std::unique_ptr<WrapperBase> edp) const;
74 
75  virtual void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const override final;
76  virtual bool productUnavailable_() const override final;
77  virtual bool productResolved_() const override final;
78  virtual bool productWasDeleted_() const override final;
79  virtual bool productWasFetchedAndIsValid_( bool iSkipCurrentProcess) const override final;
80 
82  virtual void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override final {productData_.resetBranchDescription(bd);}
83  virtual Provenance const* provenance_() const override final {return &productData_.provenance();}
84 
85  virtual std::string const& resolvedModuleLabel_() const override final {return moduleLabel();}
86  virtual void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override final;
87  virtual void setProcessHistory_(ProcessHistory const& ph) override final;
88  virtual ProductProvenance const* productProvenancePtr_() const override final;
89  virtual bool singleProduct_() const override final;
90 
92  mutable std::atomic<ProductStatus> theStatus_;
94  };
95 
97  public:
98  explicit InputProductResolver(std::shared_ptr<BranchDescription const> bd) :
99  DataManagingProductResolver(bd, ProductStatus::ResolveNotRun),
100  m_prefetchRequested{ false },
101  aux_{nullptr} {}
102 
103  virtual void setupUnscheduled(UnscheduledConfigurator const&) override final;
104 
105  private:
106  virtual bool isFromCurrentProcess() const override final;
107 
108 
109  virtual Resolution resolveProduct_(Principal const& principal,
110  bool skipCurrentProcess,
112  ModuleCallingContext const* mcc) const override;
113  virtual void prefetchAsync_(WaitingTask* waitTask,
114  Principal const& principal,
115  bool skipCurrentProcess,
117  ModuleCallingContext const* mcc) const override;
118  virtual void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
119 
120  virtual void retrieveAndMerge_(Principal const& principal) const override;
121 
122  virtual bool unscheduledWasNotRun_() const override final {return false;}
123 
124  virtual void resetProductData_(bool deleteEarly) override;
125 
126  mutable std::atomic<bool> m_prefetchRequested;
128  UnscheduledAuxiliary const* aux_; //provides access to the delayedGet signals
129 
130 
131  };
132 
134  public:
135  ProducedProductResolver(std::shared_ptr<BranchDescription const> bd, ProductStatus iDefaultStatus) : DataManagingProductResolver(bd, iDefaultStatus) {assert(bd->produced());}
136 
137  virtual void resetFailedFromThisProcess() override;
138 
139  protected:
140  virtual void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
141  private:
142  virtual bool isFromCurrentProcess() const override final;
143 
144  };
145 
147  public:
148  explicit PuttableProductResolver(std::shared_ptr<BranchDescription const> bd) : ProducedProductResolver(bd, ProductStatus::NotPut), worker_(nullptr), prefetchRequested_(false) {}
149 
150  virtual void setupUnscheduled(UnscheduledConfigurator const&) override final;
151 
152  private:
154  bool skipCurrentProcess,
156  ModuleCallingContext const* mcc) const override;
157  virtual void prefetchAsync_(WaitingTask* waitTask,
158  Principal const& principal,
159  bool skipCurrentProcess,
161  ModuleCallingContext const* mcc) const override;
162  virtual bool unscheduledWasNotRun_() const override {return false;}
163 
164  virtual void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
165  virtual void resetProductData_(bool deleteEarly) override;
166 
169  mutable std::atomic<bool> prefetchRequested_;
170 
171  };
172 
174  public:
175  explicit UnscheduledProductResolver(std::shared_ptr<BranchDescription const> bd) :
176  ProducedProductResolver(bd,ProductStatus::ResolveNotRun),
177  aux_(nullptr),
178  prefetchRequested_(false){}
179 
180  virtual void setupUnscheduled(UnscheduledConfigurator const&) override final;
181 
182  private:
184  bool skipCurrentProcess,
186  ModuleCallingContext const* mcc) const override;
187  virtual void prefetchAsync_(WaitingTask* waitTask,
188  Principal const& principal,
189  bool skipCurrentProcess,
191  ModuleCallingContext const* mcc) const override;
192  virtual bool unscheduledWasNotRun_() const override {return status() == ProductStatus::ResolveNotRun;}
193 
194  virtual void resetProductData_(bool deleteEarly) override;
195 
199  mutable std::atomic<bool> prefetchRequested_;
200  };
201 
203  public:
205  explicit AliasProductResolver(std::shared_ptr<BranchDescription const> bd, ProducedProductResolver& realProduct) : ProductResolverBase(), realProduct_(realProduct), bd_(bd) {}
206 
207  virtual void connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) override final {
208  realProduct_.connectTo(iOther, iParentPrincipal );
209  };
210 
211  private:
213  bool skipCurrentProcess,
215  ModuleCallingContext const* mcc) const override {
216  return realProduct_.resolveProduct(principal, skipCurrentProcess, sra, mcc);}
217  virtual void prefetchAsync_(WaitingTask* waitTask,
218  Principal const& principal,
219  bool skipCurrentProcess,
221  ModuleCallingContext const* mcc) const override {
222  realProduct_.prefetchAsync(waitTask, principal, skipCurrentProcess, sra, mcc);
223  }
224  virtual bool unscheduledWasNotRun_() const override {return realProduct_.unscheduledWasNotRun();}
225  virtual bool productUnavailable_() const override {return realProduct_.productUnavailable();}
226  virtual bool productResolved_() const override final {
227  return realProduct_.productResolved(); }
228  virtual bool productWasDeleted_() const override {return realProduct_.productWasDeleted();}
229  virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override final {
230  return realProduct_.productWasFetchedAndIsValid(iSkipCurrentProcess);
231  }
232 
233  virtual void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
234  virtual void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const override final;
235  virtual BranchDescription const& branchDescription_() const override {return *bd_;}
236  virtual void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override {bd_ = bd;}
237  virtual Provenance const* provenance_() const override final { return realProduct_.provenance(); }
238 
239  virtual std::string const& resolvedModuleLabel_() const override {return realProduct_.moduleLabel();}
240  virtual void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override;
241  virtual void setProcessHistory_(ProcessHistory const& ph) override;
242  virtual ProductProvenance const* productProvenancePtr_() const override;
243  virtual void resetProductData_(bool deleteEarly) override;
244  virtual bool singleProduct_() const override;
245 
247  std::shared_ptr<BranchDescription const> bd_;
248  };
249 
251  public:
253  explicit ParentProcessProductResolver(std::shared_ptr<BranchDescription const> bd) : ProductResolverBase(), realProduct_(nullptr), bd_(bd), provRetriever_(nullptr), parentPrincipal_(nullptr) {}
254 
255  virtual void connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) override final {
256  realProduct_ = &iOther;
257  parentPrincipal_ = iParentPrincipal;
258  };
259 
260  private:
262  bool skipCurrentProcess,
264  ModuleCallingContext const* mcc) const override {return realProduct_->resolveProduct(*parentPrincipal_, skipCurrentProcess, sra, mcc);}
265  virtual void prefetchAsync_(WaitingTask* waitTask,
266  Principal const& principal,
267  bool skipCurrentProcess,
269  ModuleCallingContext const* mcc) const override {
270  realProduct_->prefetchAsync( waitTask, *parentPrincipal_, skipCurrentProcess, sra, mcc);
271  }
272  virtual bool unscheduledWasNotRun_() const override {return realProduct_->unscheduledWasNotRun();}
273  virtual bool productUnavailable_() const override {return realProduct_->productUnavailable();}
274  virtual bool productResolved_() const override final { return realProduct_->productResolved(); }
275  virtual bool productWasDeleted_() const override {return realProduct_->productWasDeleted();}
276  virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override {
277  return realProduct_->productWasFetchedAndIsValid(iSkipCurrentProcess); }
278 
279  virtual void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
280  virtual void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const override final;
281  virtual BranchDescription const& branchDescription_() const override {return *bd_;}
282  virtual void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override {bd_ = bd;}
283  virtual Provenance const* provenance_() const override final {return realProduct_->provenance();
284  }
285  virtual std::string const& resolvedModuleLabel_() const override {return realProduct_->moduleLabel();}
286  virtual void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override;
287  virtual void setProcessHistory_(ProcessHistory const& ph) override;
288  virtual ProductProvenance const* productProvenancePtr_() const override;
289  virtual void resetProductData_(bool deleteEarly) override;
290  virtual bool singleProduct_() const override;
291 
293  std::shared_ptr<BranchDescription const> bd_;
296  };
297 
299  public:
301  NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
302  std::vector<bool> const& ambiguous);
303 
304  virtual void connectTo(ProductResolverBase const& iOther, Principal const*) override final ;
305 
306  void tryPrefetchResolverAsync(unsigned int iProcessingIndex,
308  bool skipCurrentProcess,
312 
313  bool dataValidFromResolver(unsigned int iProcessingIndex,
314  Principal const& principal,
315  bool iSkipCurrentProcess) const;
316 
317  void prefetchFailed(unsigned int iProcessingIndex,
318  Principal const& principal,
319  bool iSkipCurrentProcess,
320  std::exception_ptr iExceptPtr) const;
321  private:
322  unsigned int unsetIndexValue() const;
323  virtual Resolution resolveProduct_(Principal const& principal,
324  bool skipCurrentProcess,
326  ModuleCallingContext const* mcc) const override;
327  virtual void prefetchAsync_(WaitingTask* waitTask,
328  Principal const& principal,
329  bool skipCurrentProcess,
331  ModuleCallingContext const* mcc) const override;
332  virtual bool unscheduledWasNotRun_() const override;
333  virtual bool productUnavailable_() const override;
334  virtual bool productWasDeleted_() const override;
335  virtual bool productResolved_() const override final;
336  virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override;
337 
338  virtual void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
339  virtual void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const override final;
340  virtual BranchDescription const& branchDescription_() const override;
341  virtual void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override;
342  virtual Provenance const* provenance_() const override;
343 
344  virtual std::string const& resolvedModuleLabel_() const override {return moduleLabel();}
345  virtual void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override;
346  virtual void setProcessHistory_(ProcessHistory const& ph) override;
347  virtual ProductProvenance const* productProvenancePtr_() const override;
348  virtual void resetProductData_(bool deleteEarly) override;
349  virtual bool singleProduct_() const override;
350 
351  Resolution tryResolver(unsigned int index,
352  Principal const& principal,
353  bool skipCurrentProcess,
355  ModuleCallingContext const* mcc) const;
356 
357  void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const;
358 
359  std::vector<ProductResolverIndex> matchingHolders_;
360  std::vector<bool> ambiguous_;
363  mutable std::atomic<unsigned int> lastCheckIndex_;
364  mutable std::atomic<unsigned int> lastSkipCurrentCheckIndex_;
365  mutable std::atomic<bool> prefetchRequested_;
366  mutable std::atomic<bool> skippingPrefetchRequested_;
367  };
368 
370  public:
373  ProductResolverBase(), realResolverIndex_(iChoice) {}
374 
375  virtual void connectTo(ProductResolverBase const& iOther, Principal const*) override final ;
376 
377  private:
379  bool skipCurrentProcess,
381  ModuleCallingContext const* mcc) const override;
382  virtual void prefetchAsync_(WaitingTask* waitTask,
383  Principal const& principal,
384  bool skipCurrentProcess,
386  ModuleCallingContext const* mcc) const override;
387  virtual bool unscheduledWasNotRun_() const override;
388  virtual bool productUnavailable_() const override;
389  virtual bool productWasDeleted_() const override;
390  virtual bool productResolved_() const override final;
391  virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override;
392 
393  virtual void putProduct_(std::unique_ptr<WrapperBase> edp) const override;
394  virtual void putOrMergeProduct_(std::unique_ptr<WrapperBase> prod) const override final;
395  virtual BranchDescription const& branchDescription_() const override;
396  virtual void resetBranchDescription_(std::shared_ptr<BranchDescription const> bd) override;
397  virtual Provenance const* provenance_() const override;
398 
399  virtual std::string const& resolvedModuleLabel_() const override {return moduleLabel();}
400  virtual void setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) override;
401  virtual void setProcessHistory_(ProcessHistory const& ph) override;
402  virtual ProductProvenance const* productProvenancePtr_() const override;
403  virtual void resetProductData_(bool deleteEarly) override;
404  virtual bool singleProduct_() const override;
405 
407  };
408 
409 }
410 
411 #endif
ProducedProductResolver & realProduct_
Provenance const & provenance() const
Definition: ProductData.h:30
virtual bool productWasDeleted_() const override
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
unsigned int ProductResolverIndex
std::shared_ptr< BranchDescription const > const & branchDescription() const
Definition: ProductData.h:26
virtual void setProcessHistory_(ProcessHistory const &ph) overridefinal
virtual BranchDescription const & branchDescription_() const override
void checkType(WrapperBase const &prod) const
ParentProcessProductResolver(std::shared_ptr< BranchDescription const > bd)
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const =0
ProducedProductResolver::ProductStatus ProductStatus
ProductStatus const defaultStatus_
std::shared_ptr< BranchDescription const > bd_
virtual Resolution resolveProduct_(Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void resetProductData_(bool deleteEarly) override
virtual std::string const & resolvedModuleLabel_() const override
virtual Provenance const * provenance_() const overridefinal
UnscheduledProductResolver(std::shared_ptr< BranchDescription const > bd)
virtual bool unscheduledWasNotRun_() const =0
UnscheduledAuxiliary const * aux_
assert(m_qm.get())
virtual bool productWasDeleted_() const override
ProductStatus status() const
virtual void setupUnscheduled(UnscheduledConfigurator const &)
virtual void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) overridefinal
WaitingTaskList m_waitingTasks
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
DataManagingProductResolver(std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
virtual Provenance const * provenance_() const overridefinal
virtual bool productResolved_() const overridefinal
virtual std::string const & resolvedModuleLabel_() const override
#define nullptr
ProducedProductResolver(std::shared_ptr< BranchDescription const > bd, ProductStatus iDefaultStatus)
std::atomic< bool > prefetchRequested_
virtual bool unscheduledWasNotRun_() const override
ProductProvenanceRetriever const * provRetriever_
void resetBranchDescription(std::shared_ptr< BranchDescription const > bd)
Definition: ProductData.cc:27
ProducedProductResolver::ProductStatus ProductStatus
virtual bool productResolved_() const overridefinal
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const =0
UnscheduledAuxiliary const * aux_
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const override
virtual BranchDescription const & branchDescription_() const overridefinal
virtual BranchDescription const & branchDescription_() const override
virtual bool isFromCurrentProcess() const =0
ProductResolverBase const * realProduct_
std::atomic< unsigned int > lastSkipCurrentCheckIndex_
InputProductResolver(std::shared_ptr< BranchDescription const > bd)
virtual void resetFailedFromThisProcess()
virtual bool productResolved_() const overridefinal
std::atomic< bool > prefetchRequested_
virtual Provenance const * provenance_() const overridefinal
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) overridefinal
virtual bool productUnavailable_() const override
void mergeProduct(std::unique_ptr< WrapperBase > edp) const
virtual void putProduct_(std::unique_ptr< WrapperBase > edp) const =0
std::atomic< bool > prefetchRequested_
std::atomic< bool > m_prefetchRequested
std::atomic< unsigned int > lastCheckIndex_
virtual bool productUnavailable_() const overridefinal
AliasProductResolver(std::shared_ptr< BranchDescription const > bd, ProducedProductResolver &realProduct)
ProducedProductResolver::ProductStatus ProductStatus
virtual ProductProvenance const * productProvenancePtr_() const overridefinal
void setProduct(std::unique_ptr< WrapperBase > edp) const
std::atomic< bool > skippingPrefetchRequested_
std::atomic< ProductStatus > theStatus_
std::vector< bool > ambiguous_
Resolution resolveProductImpl(FUNC resolver) const
virtual void setProvenance_(ProductProvenanceRetriever const *provRetriever, ProcessHistory const &ph, ProductID const &pid) overridefinal
virtual bool productWasDeleted_() const overridefinal
virtual bool productUnavailable_() const override
ProductStatus defaultStatus() const
tuple pid
Definition: sysUtil.py:22
virtual void connectTo(ProductResolverBase const &iOther, Principal const *iParentPrincipal) overridefinal
std::vector< ProductResolverIndex > matchingHolders_
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const overridefinal
virtual bool singleProduct_() const overridefinal
virtual void retrieveAndMerge_(Principal const &principal) const
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
virtual void resetBranchDescription_(std::shared_ptr< BranchDescription const > bd) override
virtual std::string const & resolvedModuleLabel_() const overridefinal
ProducedProductResolver::ProductStatus ProductStatus
virtual bool unscheduledWasNotRun_() const override
ProductData const & getProductData() const
std::shared_ptr< BranchDescription const > bd_
virtual void connectTo(ProductResolverBase const &, Principal const *) overridefinal
volatile std::atomic< bool > shutdown_flag false
virtual void putOrMergeProduct_(std::unique_ptr< WrapperBase > prod) const overridefinal
PuttableProductResolver(std::shared_ptr< BranchDescription const > bd)
SingleChoiceNoProcessProductResolver(ProductResolverIndex iChoice)
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const overridefinal
virtual void prefetchAsync_(WaitingTask *waitTask, Principal const &principal, bool skipCurrentProcess, SharedResourcesAcquirer *sra, ModuleCallingContext const *mcc) const override
std::string const & moduleLabel() const