CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
OutputModuleCommunicatorT.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 
17 
19 
24 
25 namespace {
26 
27  template <typename F>
28  void async(edm::one::OutputModuleBase& iMod, tbb::task_group& iGroup, F&& iFunc) {
29  iMod.sharedResourcesAcquirer().serialQueueChain().push(iGroup, std::move(iFunc));
30  }
31 
32  template <typename F>
33  void async(edm::limited::OutputModuleBase& iMod, tbb::task_group& iGroup, F&& iFunc) {
34  iMod.queue().push(iGroup, std::move(iFunc));
35  }
36 
// Dispatch helper for 'global' output modules: the module argument is unnamed
// and unused, and the callable is run through the task group itself.
//   iGroup - task group that runs the work
//   iFunc  - callable to run; taken by value and copied into a functor task
37  template <typename F>
38  void async(edm::global::OutputModuleBase&, tbb::task_group& iGroup, F iFunc) {
39  //NOTE, need the functor since group can not run a 'mutable' lambda
// make_functor_task hands back a raw task pointer; the lambda given to run()
// captures that pointer by copy, so the lambda itself need not be 'mutable'.
40  auto t = edm::make_functor_task(iFunc);
41  iGroup.run([t]() {
// NOTE(review): listing line 42 is absent from this extract; whatever releases
// the task object is presumably on that line -- confirm against the real file.
43  t->execute();
44  });
45  }
46 } // namespace
47 
48 namespace edm {
49 
// Forwards the request to the wrapped module by calling module().doCloseFile().
// NOTE(review): the signature line (listing line 51) is missing from this extract.
50  template <typename T>
52  module().doCloseFile();
53  }
54 
// Returns whatever the wrapped module's shouldWeCloseFile() reports.
// NOTE(review): the signature line (listing line 56) is missing from this extract.
55  template <typename T>
57  return module().shouldWeCloseFile();
58  }
59 
// Passes the file block `fb` on to the wrapped module's doOpenFile().
// NOTE(review): the signature line (listing line 61) is missing from this extract.
60  template <typename T>
62  module().doOpenFile(fb);
63  }
64 
// Packages a "write process block" call on the wrapped module into a task and
// schedules it through the file-local async() helper.
// NOTE(review): listing lines 66 and 70-75 (the start of the signature and the
// statements that set up `token` and `globalContext`) are missing from this
// extract; only the tail of that setup (`processContext);`) is visible.
65  template <typename T>
67  ProcessBlockPrincipal const& processBlockPrincipal,
68  ProcessContext const* processContext,
69  ActivityRegistry* activityRegistry) {
76  processContext);
// The module and the principal are captured by reference; everything else
// (context, token, description pointer, registry pointer, task holder) is
// captured by copy. NOTE(review): the referenced objects must outlive the
// task -- presumably guaranteed by the caller; confirm.
77  auto t = [&mod = module(),
78  &processBlockPrincipal,
79  globalContext,
80  token,
81  desc = &description(),
82  activityRegistry,
83  iTask]() mutable {
// Stays null unless the try block below throws.
84  std::exception_ptr ex;
85  // Caught exception is propagated via WaitingTaskHolder
86  CMS_SA_ALLOW try {
87  ServiceRegistry::Operate op(token);
88  ParentContext parentContext(&globalContext);
89  ModuleCallingContext mcc(desc);
90  ModuleContextSentry moduleContextSentry(&mcc, parentContext);
// Pre-signal is emitted explicitly; the post-signal is tied to `sentry`, so it
// is emitted when `sentry` is destroyed at the end of the try block, including
// when doWriteProcessBlock throws.
91  activityRegistry->preModuleWriteProcessBlockSignal_(globalContext, mcc);
92  auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
93  ar->postModuleWriteProcessBlockSignal_(globalContext, mcc);
94  }));
95  mod.doWriteProcessBlock(processBlockPrincipal, &mcc);
96  } catch (...) {
97  ex = std::current_exception();
98  }
// Always reached: reports completion with a null pointer on success or the
// captured exception on failure.
99  iTask.doneWaiting(ex);
100  };
// The async() overload is selected by the static type returned by module().
101  async(module(), *iTask.group(), std::move(t));
102  }
103 
// Packages a "write run" call on the wrapped module into a task and schedules
// it through the file-local async() helper.
// NOTE(review): listing lines 105, 110, 111 and 114 (the start of the signature
// and parts of the `token` / `globalContext` setup) are missing from this
// extract; the visible setup arguments use rp.run(), rp.index() and rp.endTime().
104  template <typename T>
106  edm::RunPrincipal const& rp,
107  ProcessContext const* processContext,
108  ActivityRegistry* activityRegistry,
109  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
112  LuminosityBlockID(rp.run(), 0),
113  rp.index(),
115  rp.endTime(),
116  processContext);
// The module and the run principal are captured by reference; the context,
// token, description pointer, registry pointer, metadata pointer and task
// holder are captured by copy. NOTE(review): the referenced objects must
// outlive the task -- presumably guaranteed by the caller; confirm.
117  auto t = [&mod = module(),
118  &rp,
119  globalContext,
120  token,
121  desc = &description(),
122  activityRegistry,
123  mergeableRunProductMetadata,
124  iTask]() mutable {
// Stays null unless the try block below throws.
125  std::exception_ptr ex;
126  // Caught exception is propagated via WaitingTaskHolder
127  CMS_SA_ALLOW try {
128  ServiceRegistry::Operate op(token);
129  ParentContext parentContext(&globalContext);
130  ModuleCallingContext mcc(desc);
131  ModuleContextSentry moduleContextSentry(&mcc, parentContext);
// Pre-signal is emitted explicitly; the post-signal is tied to `sentry`, so it
// is emitted when `sentry` is destroyed at the end of the try block, including
// when doWriteRun throws.
132  activityRegistry->preModuleWriteRunSignal_(globalContext, mcc);
133  auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
134  ar->postModuleWriteRunSignal_(globalContext, mcc);
135  }));
136  mod.doWriteRun(rp, &mcc, mergeableRunProductMetadata);
137  } catch (...) {
138  ex = std::current_exception();
139  }
// Always reached: reports completion with a null pointer on success or the
// captured exception on failure.
140  iTask.doneWaiting(ex);
141  };
// The async() overload is selected by the static type returned by module().
142  async(module(), *iTask.group(), std::move(t));
143  }
144 
// Packages a "write luminosity block" call on the wrapped module into a task
// and schedules it through the file-local async() helper.
// NOTE(review): listing lines 146, 147, 150 and 151 (the start of the signature
// and the beginning of the `token` / `globalContext` setup) are missing from
// this extract; the visible setup arguments use lbp.id(),
// lbp.runPrincipal().index(), lbp.index() and lbp.beginTime().
145  template <typename T>
148  ProcessContext const* processContext,
149  ActivityRegistry* activityRegistry) {
152  lbp.id(),
153  lbp.runPrincipal().index(),
154  lbp.index(),
155  lbp.beginTime(),
156  processContext);
// The module and the lumi principal are captured by reference; the registry
// pointer, token, context, description pointer and task holder are captured by
// copy. NOTE(review): the referenced objects must outlive the task --
// presumably guaranteed by the caller; confirm.
157  auto t = [&mod = module(), &lbp, activityRegistry, token, globalContext, desc = &description(), iTask]() mutable {
// Stays null unless the try block below throws.
158  std::exception_ptr ex;
159  // Caught exception is propagated via WaitingTaskHolder
160  CMS_SA_ALLOW try {
161  ServiceRegistry::Operate op(token);
162 
163  ParentContext parentContext(&globalContext);
164  ModuleCallingContext mcc(desc);
165  ModuleContextSentry moduleContextSentry(&mcc, parentContext);
// Pre-signal is emitted explicitly; the post-signal is tied to `sentry`, so it
// is emitted when `sentry` is destroyed at the end of the try block, including
// when doWriteLuminosityBlock throws.
166  activityRegistry->preModuleWriteLumiSignal_(globalContext, mcc);
167  auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
168  ar->postModuleWriteLumiSignal_(globalContext, mcc);
169  }));
170  mod.doWriteLuminosityBlock(lbp, &mcc);
171  } catch (...) {
172  ex = std::current_exception();
173  }
// Always reached: reports completion with a null pointer on success or the
// captured exception on failure.
174  iTask.doneWaiting(ex);
175  };
// The async() overload is selected by the static type returned by module().
176  async(module(), *iTask.group(), std::move(t));
177  }
178 
// Returns the wrapped module's wantAllEvents() result unchanged.
// NOTE(review): the signature line (listing line 180) is missing from this extract.
179  template <typename T>
181  return module().wantAllEvents();
182  }
183 
// Returns the wrapped module's limitReached() result unchanged.
// NOTE(review): the signature line (listing line 185) is missing from this extract.
184  template <typename T>
186  return module().limitReached();
187  }
188 
// Hands the description object `desc` to the wrapped module's configure().
// NOTE(review): the signature line (listing line 190) is missing from this extract.
189  template <typename T>
191  module().configure(desc);
192  }
193 
// Returns the wrapped module's keptProducts() result.
// NOTE(review): the signature line (listing line 195) is missing from this extract.
194  template <typename T>
196  return module().keptProducts();
197  }
198 
// Forwards all three arguments (`preg`, `helper`, `processBlockHelper`) to the
// wrapped module's selectProducts().
// NOTE(review): the first signature line (listing line 200, which declares
// `preg`) is missing from this extract.
199  template <typename T>
201  ThinnedAssociationsHelper const& helper,
202  ProcessBlockHelperBase const& processBlockHelper) {
203  module().selectProducts(preg, helper, processBlockHelper);
204  }
205 
// Forwards the path-position map and the `anyProductProduced` flag to the
// wrapped module's setEventSelectionInfo().
// NOTE(review): the first signature line (listing line 207) is missing from
// this extract.
206  template <typename T>
208  std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
209  bool anyProductProduced) {
210  module().setEventSelectionInfo(outputModulePathPositions, anyProductProduced);
211  }
212 
// Returns the wrapped module's description().
// NOTE(review): the signature line (listing line 214) is missing from this extract.
213  template <typename T>
215  return module().description();
216  }
217 
218  namespace impl {
219  std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(void*) {
220  return std::unique_ptr<edm::OutputModuleCommunicator>{};
221  }
222  std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::global::OutputModuleBase* iMod) {
223  return std::make_unique<OutputModuleCommunicatorT<edm::global::OutputModuleBase>>(iMod);
224  }
225  std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::one::OutputModuleBase* iMod) {
226  return std::make_unique<OutputModuleCommunicatorT<edm::one::OutputModuleBase>>(iMod);
227  }
228  std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::limited::OutputModuleBase* iMod) {
229  return std::make_unique<OutputModuleCommunicatorT<edm::limited::OutputModuleBase>>(iMod);
230  }
231  } // namespace impl
232 } // namespace edm
233 
234 namespace edm {
235  template class OutputModuleCommunicatorT<one::OutputModuleBase>;
236  template class OutputModuleCommunicatorT<global::OutputModuleBase>;
237  template class OutputModuleCommunicatorT<limited::OutputModuleBase>;
238 } // namespace edm
RunPrincipal const & runPrincipal() const
void selectProducts(edm::ProductRegistry const &preg, ThinnedAssociationsHelper const &, ProcessBlockHelperBase const &) override
edm::SelectedProductsForBranchType const & keptProducts() const override
SharedResourcesAcquirer & sharedResourcesAcquirer()
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
#define CMS_SA_ALLOW
Timestamp const & beginTime() const
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
PostModuleWriteProcessBlock postModuleWriteProcessBlockSignal_
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
PostModuleWriteLumi postModuleWriteLumiSignal_
void configure(edm::OutputModuleDescription const &desc) override
LuminosityBlockIndex index() const
RunNumber_t run() const
Definition: RunPrincipal.h:61
ServiceToken presentToken() const
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
void writeRunAsync(WaitingTaskHolder iTask, edm::RunPrincipal const &rp, ProcessContext const *, ActivityRegistry *, MergeableRunProductMetadata const *) override
void doneWaiting(std::exception_ptr iExcept)
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
def move
Definition: eostools.py:511
static ServiceRegistry & instance()
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
std::unique_ptr< edm::OutputModuleCommunicator > createCommunicatorIfNeeded(void *)
PostModuleWriteRun postModuleWriteRunSignal_
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void writeProcessBlockAsync(WaitingTaskHolder iTask, ProcessBlockPrincipal const &, ProcessContext const *, ActivityRegistry *) override
SerialTaskQueueChain & serialQueueChain() const
void writeLumiAsync(WaitingTaskHolder iTask, edm::LuminosityBlockPrincipal const &lbp, ProcessContext const *, ActivityRegistry *) override
void setEventSelectionInfo(std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced) override
ModuleDescription const & description() const override
PreModuleWriteRun preModuleWriteRunSignal_
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
tbb::task_group * group() const noexcept
RunIndex index() const
Definition: RunPrincipal.h:57
PreModuleWriteProcessBlock preModuleWriteProcessBlockSignal_
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
preg
Definition: Schedule.cc:687
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
void openFile(edm::FileBlock const &fb) override
tuple module
Definition: callgraph.py:69