CMS 3D CMS Logo

OutputModuleCommunicatorT.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 
17 
19 
24 
25 namespace {
26 
27  template <typename F>
28  void async(edm::one::OutputModuleBase& iMod, tbb::task_group& iGroup, F&& iFunc) {
29  iMod.sharedResourcesAcquirer().serialQueueChain().push(iGroup, std::move(iFunc));
30  }
31 
32  template <typename F>
33  void async(edm::limited::OutputModuleBase& iMod, tbb::task_group& iGroup, F&& iFunc) {
34  iMod.queue().push(iGroup, std::move(iFunc));
35  }
36 
37  template <typename F>
38  void async(edm::global::OutputModuleBase&, tbb::task_group& iGroup, F iFunc) {
39  //NOTE, need the functor since group can not run a 'mutable' lambda
40  auto t = edm::make_functor_task(iFunc);
41  iGroup.run([t]() {
43  t->execute();
44  });
45  }
46 } // namespace
47 
48 namespace edm {
49 
50  template <typename T>
52  module().doCloseFile();
53  }
54 
55  template <typename T>
57  return module().shouldWeCloseFile();
58  }
59 
60  template <typename T>
62  module().doOpenFile(fb);
63  }
64 
65  template <typename T>
67  ProcessBlockPrincipal const& processBlockPrincipal,
68  ProcessContext const* processContext,
69  ActivityRegistry* activityRegistry) {
76  processContext);
77  auto t = [&mod = module(),
78  &processBlockPrincipal,
79  globalContext,
80  token,
81  desc = &description(),
82  activityRegistry,
83  iTask]() mutable {
84  std::exception_ptr ex;
85  // Caught exception is propagated via WaitingTaskHolder
86  CMS_SA_ALLOW try {
88  ParentContext parentContext(&globalContext);
90  ModuleContextSentry moduleContextSentry(&mcc, parentContext);
91  activityRegistry->preModuleWriteProcessBlockSignal_(globalContext, mcc);
92  auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
93  ar->postModuleWriteProcessBlockSignal_(globalContext, mcc);
94  }));
95  mod.doWriteProcessBlock(processBlockPrincipal, &mcc);
96  } catch (...) {
97  ex = std::current_exception();
98  }
99  iTask.doneWaiting(ex);
100  };
101  async(module(), *iTask.group(), std::move(t));
102  }
103 
104  template <typename T>
106  edm::RunPrincipal const& rp,
107  ProcessContext const* processContext,
108  ActivityRegistry* activityRegistry,
109  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
112  LuminosityBlockID(rp.run(), 0),
113  rp.index(),
115  rp.endTime(),
116  processContext);
117  auto t = [&mod = module(),
118  &rp,
119  globalContext,
120  token,
121  desc = &description(),
122  activityRegistry,
123  mergeableRunProductMetadata,
124  iTask]() mutable {
125  std::exception_ptr ex;
126  // Caught exception is propagated via WaitingTaskHolder
127  CMS_SA_ALLOW try {
129  ParentContext parentContext(&globalContext);
131  ModuleContextSentry moduleContextSentry(&mcc, parentContext);
132  activityRegistry->preModuleWriteRunSignal_(globalContext, mcc);
133  auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
134  ar->postModuleWriteRunSignal_(globalContext, mcc);
135  }));
136  mod.doWriteRun(rp, &mcc, mergeableRunProductMetadata);
137  } catch (...) {
138  ex = std::current_exception();
139  }
140  iTask.doneWaiting(ex);
141  };
142  async(module(), *iTask.group(), std::move(t));
143  }
144 
145  template <typename T>
148  ProcessContext const* processContext,
149  ActivityRegistry* activityRegistry) {
152  lbp.id(),
153  lbp.runPrincipal().index(),
154  lbp.index(),
155  lbp.beginTime(),
156  processContext);
157  auto t = [&mod = module(), &lbp, activityRegistry, token, globalContext, desc = &description(), iTask]() mutable {
158  std::exception_ptr ex;
159  // Caught exception is propagated via WaitingTaskHolder
160  CMS_SA_ALLOW try {
162 
163  ParentContext parentContext(&globalContext);
165  ModuleContextSentry moduleContextSentry(&mcc, parentContext);
166  activityRegistry->preModuleWriteLumiSignal_(globalContext, mcc);
167  auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
168  ar->postModuleWriteLumiSignal_(globalContext, mcc);
169  }));
170  mod.doWriteLuminosityBlock(lbp, &mcc);
171  } catch (...) {
172  ex = std::current_exception();
173  }
174  iTask.doneWaiting(ex);
175  };
176  async(module(), *iTask.group(), std::move(t));
177  }
178 
179  template <typename T>
181  return module().wantAllEvents();
182  }
183 
184  template <typename T>
186  return module().limitReached();
187  }
188 
189  template <typename T>
191  module().configure(desc);
192  }
193 
194  template <typename T>
196  return module().keptProducts();
197  }
198 
199  template <typename T>
202  module().selectProducts(preg, helper);
203  }
204 
205  template <typename T>
207  std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
208  bool anyProductProduced) {
209  module().setEventSelectionInfo(outputModulePathPositions, anyProductProduced);
210  }
211 
212  template <typename T>
214  return module().description();
215  }
216 
217  namespace impl {
218  std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(void*) {
219  return std::unique_ptr<edm::OutputModuleCommunicator>{};
220  }
221  std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::global::OutputModuleBase* iMod) {
222  return std::make_unique<OutputModuleCommunicatorT<edm::global::OutputModuleBase>>(iMod);
223  }
224  std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::one::OutputModuleBase* iMod) {
225  return std::make_unique<OutputModuleCommunicatorT<edm::one::OutputModuleBase>>(iMod);
226  }
227  std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::limited::OutputModuleBase* iMod) {
228  return std::make_unique<OutputModuleCommunicatorT<edm::limited::OutputModuleBase>>(iMod);
229  }
230  } // namespace impl
231 } // namespace edm
232 
233 namespace edm {
234  template class OutputModuleCommunicatorT<one::OutputModuleBase>;
235  template class OutputModuleCommunicatorT<global::OutputModuleBase>;
236  template class OutputModuleCommunicatorT<limited::OutputModuleBase>;
237 } // namespace edm
edm::SharedResourcesAcquirer::serialQueueChain
SerialTaskQueueChain & serialQueueChain() const
Definition: SharedResourcesAcquirer.h:54
edm::limited::OutputModuleBase::queue
LimitedTaskQueue & queue()
Definition: OutputModuleBase.h:119
edm::RunPrincipal::endTime
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
edm::LuminosityBlockPrincipal::runPrincipal
RunPrincipal const & runPrincipal() const
Definition: LuminosityBlockPrincipal.h:45
ModuleCallingContext.h
edm::OutputModuleCommunicatorT::writeRunAsync
void writeRunAsync(WaitingTaskHolder iTask, edm::RunPrincipal const &rp, ProcessContext const *, ActivityRegistry *, MergeableRunProductMetadata const *) override
Definition: OutputModuleCommunicatorT.cc:105
ServiceRegistry.h
edm::ModuleContextSentry
Definition: ModuleContextSentry.h:11
WaitingTaskHolder.h
edm::global::OutputModuleBase
Definition: OutputModuleBase.h:64
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::ActivityRegistry::preModuleWriteRunSignal_
PreModuleWriteRun preModuleWriteRunSignal_
Definition: ActivityRegistry.h:1004
edm::ProcessContext
Definition: ProcessContext.h:27
edmLumisInFiles.description
description
Definition: edmLumisInFiles.py:11
mod
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
FunctorTask.h
edm::LuminosityBlockPrincipal
Definition: LuminosityBlockPrincipal.h:31
edm::GlobalContext::Transition::kWriteProcessBlock
edm::OutputModuleCommunicatorT::selectProducts
void selectProducts(edm::ProductRegistry const &preg, ThinnedAssociationsHelper const &) override
Definition: OutputModuleCommunicatorT.cc:200
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:93
edm::TaskSentry
Definition: TaskBase.h:50
edm::ActivityRegistry::postModuleWriteLumiSignal_
PostModuleWriteLumi postModuleWriteLumiSignal_
Definition: ActivityRegistry.h:1023
edm::ProcessBlockPrincipal
Definition: ProcessBlockPrincipal.h:22
LuminosityBlockID.h
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::ModuleDescription
Definition: ModuleDescription.h:21
edm::one::OutputModuleBase::sharedResourcesAcquirer
SharedResourcesAcquirer & sharedResourcesAcquirer()
Definition: OutputModuleBase.h:115
F
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
edm::ProductRegistry
Definition: ProductRegistry.h:37
ActivityRegistry.h
edm::make_functor_task
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
edm::ActivityRegistry::postModuleWriteRunSignal_
PostModuleWriteRun postModuleWriteRunSignal_
Definition: ActivityRegistry.h:1009
edm::FileBlock
Definition: FileBlock.h:20
edm::OutputModuleCommunicatorT::configure
void configure(edm::OutputModuleDescription const &desc) override
Definition: OutputModuleCommunicatorT.cc:190
alignCSCRings.s
s
Definition: alignCSCRings.py:92
edm::OutputModuleCommunicatorT::description
ModuleDescription const & description() const override
Definition: OutputModuleCommunicatorT.cc:213
edm::OutputModuleCommunicatorT::writeProcessBlockAsync
void writeProcessBlockAsync(WaitingTaskHolder iTask, ProcessBlockPrincipal const &, ProcessContext const *, ActivityRegistry *) override
Definition: OutputModuleCommunicatorT.cc:66
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
edm::MergeableRunProductMetadata
Definition: MergeableRunProductMetadata.h:52
make_sentry.h
edm::OutputModuleCommunicatorT::writeLumiAsync
void writeLumiAsync(WaitingTaskHolder iTask, edm::LuminosityBlockPrincipal const &lbp, ProcessContext const *, ActivityRegistry *) override
Definition: OutputModuleCommunicatorT.cc:146
edm::one::OutputModuleBase
Definition: OutputModuleBase.h:67
edm::LuminosityBlockID
Definition: LuminosityBlockID.h:31
edm::OutputModuleCommunicatorT::closeFile
void closeFile() override
Definition: OutputModuleCommunicatorT.cc:51
edm::OutputModuleCommunicatorT::limitReached
bool limitReached() const override
Definition: OutputModuleCommunicatorT.cc:185
edm::ParentContext
Definition: ParentContext.h:27
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::RunPrincipal::index
RunIndex index() const
Definition: RunPrincipal.h:57
edm::GlobalContext::Transition::kWriteLuminosityBlock
edm::ThinnedAssociationsHelper
Definition: ThinnedAssociationsHelper.h:37
edm::LimitedTaskQueue::push
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
Definition: LimitedTaskQueue.h:115
edm::GlobalContext
Definition: GlobalContext.h:29
edm::impl::createCommunicatorIfNeeded
std::unique_ptr< edm::OutputModuleCommunicator > createCommunicatorIfNeeded(void *)
Definition: OutputModuleCommunicatorT.cc:218
GlobalContext.h
edm::LuminosityBlockIndex::invalidLuminosityBlockIndex
static LuminosityBlockIndex invalidLuminosityBlockIndex()
Definition: LuminosityBlockIndex.cc:9
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:32
edm::ActivityRegistry::postModuleWriteProcessBlockSignal_
PostModuleWriteProcessBlock postModuleWriteProcessBlockSignal_
Definition: ActivityRegistry.h:997
edm::ServiceRegistry::presentToken
ServiceToken presentToken() const
Definition: ServiceRegistry.cc:63
edm::OutputModuleCommunicatorT::keptProducts
edm::SelectedProductsForBranchType const & keptProducts() const override
Definition: OutputModuleCommunicatorT.cc:195
thread_safety_macros.h
RunPrincipal.h
helper
Definition: helper.py:1
edm::RunPrincipal::run
RunNumber_t run() const
Definition: RunPrincipal.h:61
edm::limited::OutputModuleBase
Definition: OutputModuleBase.h:65
edm::SerialTaskQueueChain::push
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:75
trackerHitRTTI::vector
Definition: trackerHitRTTI.h:21
edm::GlobalContext::Transition::kWriteRun
OutputModuleCommunicatorT.h
edm::OutputModuleCommunicatorT::wantAllEvents
bool wantAllEvents() const override
Definition: OutputModuleCommunicatorT.cc:180
edm::ServiceRegistry::instance
static ServiceRegistry & instance()
Definition: ServiceRegistry.cc:90
edm::RunIndex::invalidRunIndex
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
edm::OutputModuleCommunicatorT::setEventSelectionInfo
void setEventSelectionInfo(std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced) override
Definition: OutputModuleCommunicatorT.cc:206
LuminosityBlockPrincipal.h
OutputModuleBase.h
edm::Timestamp::invalidTimestamp
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
ParentContext.h
edm::OutputModuleCommunicatorT::shouldWeCloseFile
bool shouldWeCloseFile() const override
Definition: OutputModuleCommunicatorT.cc:56
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
edm::SelectedProductsForBranchType
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
Definition: SelectedProducts.h:13
impl
Definition: trackAlgoPriorityOrder.h:18
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::LuminosityBlockPrincipal::id
LuminosityBlockID id() const
Definition: LuminosityBlockPrincipal.h:53
edm::make_sentry
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:30
LuminosityBlockIndex.h
edm::ActivityRegistry::preModuleWriteProcessBlockSignal_
PreModuleWriteProcessBlock preModuleWriteProcessBlockSignal_
Definition: ActivityRegistry.h:990
edm::OutputModuleCommunicatorT::openFile
void openFile(edm::FileBlock const &fb) override
Definition: OutputModuleCommunicatorT.cc:61
OutputModuleBase.h
genParticles_cff.map
map
Definition: genParticles_cff.py:11
edm::OutputModuleDescription
Definition: OutputModuleDescription.h:17
edm::LuminosityBlockPrincipal::index
LuminosityBlockIndex index() const
Definition: LuminosityBlockPrincipal.h:51
edm::LuminosityBlockPrincipal::beginTime
Timestamp const & beginTime() const
Definition: LuminosityBlockPrincipal.h:55
OutputModuleBase.h
edm::RunPrincipal
Definition: RunPrincipal.h:34
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
ModuleContextSentry.h
benchmark_cfg.fb
fb
Definition: benchmark_cfg.py:14
edm::WaitingTaskHolder::group
tbb::task_group * group() const noexcept
Definition: WaitingTaskHolder.h:77
edm::ModuleCallingContext
Definition: ModuleCallingContext.h:29
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316