CMS 3D CMS Logo

OutputModuleCommunicatorT.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 
17 
19 
24 
25 namespace {
26 
27  template <typename F>
28  void async(edm::one::OutputModuleBase& iMod, oneapi::tbb::task_group& iGroup, F&& iFunc) {
29  iMod.sharedResourcesAcquirer().serialQueueChain().push(iGroup, std::move(iFunc));
30  }
31 
32  template <typename F>
33  void async(edm::limited::OutputModuleBase& iMod, oneapi::tbb::task_group& iGroup, F&& iFunc) {
34  iMod.queue().push(iGroup, std::move(iFunc));
35  }
36 
37  template <typename F>
38  void async(edm::global::OutputModuleBase&, oneapi::tbb::task_group& iGroup, F iFunc) {
39  //NOTE, need the functor since group can not run a 'mutable' lambda
40  auto t = edm::make_functor_task(iFunc);
41  iGroup.run([t]() {
43  t->execute();
44  });
45  }
46 } // namespace
47 
48 namespace edm {
49 
50  template <typename T>
52  module().doCloseFile();
53  }
54 
55  template <typename T>
57  return module().shouldWeCloseFile();
58  }
59 
60  template <typename T>
62  module().doOpenFile(fb);
63  }
64 
65  template <typename T>
67  ProcessBlockPrincipal const& processBlockPrincipal,
68  ProcessContext const* processContext,
69  ActivityRegistry* activityRegistry) noexcept {
76  processContext);
77  auto t = [&mod = module(),
78  &processBlockPrincipal,
79  globalContext,
80  token,
81  desc = &description(),
82  activityRegistry,
83  iTask]() mutable {
84  std::exception_ptr ex;
85  // Caught exception is propagated via WaitingTaskHolder
86  CMS_SA_ALLOW try {
88  ParentContext parentContext(&globalContext);
90  ModuleContextSentry moduleContextSentry(&mcc, parentContext);
91  activityRegistry->preModuleWriteProcessBlockSignal_(globalContext, mcc);
92  auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
93  ar->postModuleWriteProcessBlockSignal_(globalContext, mcc);
94  }));
95  mod.doWriteProcessBlock(processBlockPrincipal, &mcc);
96  } catch (...) {
97  ex = std::current_exception();
98  }
99  iTask.doneWaiting(ex);
100  };
101  async(module(), *iTask.group(), std::move(t));
102  }
103 
104  template <typename T>
106  WaitingTaskHolder iTask,
107  edm::RunPrincipal const& rp,
108  ProcessContext const* processContext,
109  ActivityRegistry* activityRegistry,
110  MergeableRunProductMetadata const* mergeableRunProductMetadata) noexcept {
113  LuminosityBlockID(rp.run(), 0),
114  rp.index(),
116  rp.endTime(),
117  processContext);
118  auto t = [&mod = module(),
119  &rp,
120  globalContext,
121  token,
122  desc = &description(),
123  activityRegistry,
124  mergeableRunProductMetadata,
125  iTask]() mutable {
126  std::exception_ptr ex;
127  // Caught exception is propagated via WaitingTaskHolder
128  CMS_SA_ALLOW try {
130  ParentContext parentContext(&globalContext);
132  ModuleContextSentry moduleContextSentry(&mcc, parentContext);
133  activityRegistry->preModuleWriteRunSignal_(globalContext, mcc);
134  auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
135  ar->postModuleWriteRunSignal_(globalContext, mcc);
136  }));
137  mod.doWriteRun(rp, &mcc, mergeableRunProductMetadata);
138  } catch (...) {
139  ex = std::current_exception();
140  }
141  iTask.doneWaiting(ex);
142  };
143  async(module(), *iTask.group(), std::move(t));
144  }
145 
146  template <typename T>
149  ProcessContext const* processContext,
150  ActivityRegistry* activityRegistry) noexcept {
153  lbp.id(),
154  lbp.runPrincipal().index(),
155  lbp.index(),
156  lbp.beginTime(),
157  processContext);
158  auto t = [&mod = module(), &lbp, activityRegistry, token, globalContext, desc = &description(), iTask]() mutable {
159  std::exception_ptr ex;
160  // Caught exception is propagated via WaitingTaskHolder
161  CMS_SA_ALLOW try {
163 
164  ParentContext parentContext(&globalContext);
166  ModuleContextSentry moduleContextSentry(&mcc, parentContext);
167  activityRegistry->preModuleWriteLumiSignal_(globalContext, mcc);
168  auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
169  ar->postModuleWriteLumiSignal_(globalContext, mcc);
170  }));
171  mod.doWriteLuminosityBlock(lbp, &mcc);
172  } catch (...) {
173  ex = std::current_exception();
174  }
175  iTask.doneWaiting(ex);
176  };
177  async(module(), *iTask.group(), std::move(t));
178  }
179 
180  template <typename T>
182  return module().wantAllEvents();
183  }
184 
185  template <typename T>
187  return module().limitReached();
188  }
189 
190  template <typename T>
192  module().configure(desc);
193  }
194 
195  template <typename T>
197  return module().keptProducts();
198  }
199 
200  template <typename T>
203  ProcessBlockHelperBase const& processBlockHelper) {
204  module().selectProducts(preg, helper, processBlockHelper);
205  }
206 
207  template <typename T>
209  std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
210  bool anyProductProduced) {
211  module().setEventSelectionInfo(outputModulePathPositions, anyProductProduced);
212  }
213 
214  template <typename T>
216  return module().description();
217  }
218 
219  namespace impl {
220  std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(void*) {
221  return std::unique_ptr<edm::OutputModuleCommunicator>{};
222  }
223  std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::global::OutputModuleBase* iMod) {
224  return std::make_unique<OutputModuleCommunicatorT<edm::global::OutputModuleBase>>(iMod);
225  }
226  std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::one::OutputModuleBase* iMod) {
227  return std::make_unique<OutputModuleCommunicatorT<edm::one::OutputModuleBase>>(iMod);
228  }
229  std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::limited::OutputModuleBase* iMod) {
230  return std::make_unique<OutputModuleCommunicatorT<edm::limited::OutputModuleBase>>(iMod);
231  }
232  } // namespace impl
233 } // namespace edm
234 
235 namespace edm {
236  template class OutputModuleCommunicatorT<one::OutputModuleBase>;
237  template class OutputModuleCommunicatorT<global::OutputModuleBase>;
238  template class OutputModuleCommunicatorT<limited::OutputModuleBase>;
239 } // namespace edm
void selectProducts(edm::ProductRegistry const &preg, ThinnedAssociationsHelper const &, ProcessBlockHelperBase const &) override
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
SharedResourcesAcquirer & sharedResourcesAcquirer()
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
#define CMS_SA_ALLOW
void writeLumiAsync(WaitingTaskHolder iTask, edm::LuminosityBlockPrincipal const &lbp, ProcessContext const *, ActivityRegistry *) noexcept override
Definition: helper.py:1
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
ModuleDescription const & description() const override
PostModuleWriteProcessBlock postModuleWriteProcessBlockSignal_
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
PostModuleWriteLumi postModuleWriteLumiSignal_
void configure(edm::OutputModuleDescription const &desc) override
void writeRunAsync(WaitingTaskHolder iTask, edm::RunPrincipal const &rp, ProcessContext const *, ActivityRegistry *, MergeableRunProductMetadata const *) noexcept override
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
edm::SelectedProductsForBranchType const & keptProducts() const override
SerialTaskQueueChain & serialQueueChain() const
static ServiceRegistry & instance()
std::unique_ptr< edm::OutputModuleCommunicator > createCommunicatorIfNeeded(void *)
PostModuleWriteRun postModuleWriteRunSignal_
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void setEventSelectionInfo(std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced) override
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
void writeProcessBlockAsync(WaitingTaskHolder iTask, ProcessBlockPrincipal const &, ProcessContext const *, ActivityRegistry *) noexcept override
HLT enums.
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void openFile(edm::FileBlock const &fb) override
ServiceToken presentToken() const
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue