CMS 3D CMS Logo

SecondaryEventProvider.cc
Go to the documentation of this file.
11 #include "oneapi/tbb/task_arena.h"
12 
13 #include <mutex>
14 
15 namespace {
16  template <typename T, typename U>
17  void processOneOccurrence(edm::WorkerManager& manager,
18  typename T::TransitionInfoType& info,
19  edm::StreamID streamID,
20  typename T::Context const* topContext,
21  U const* context,
22  bool cleaningUpAfterException = false) {
23  manager.resetAll();
24 
25  if (manager.allWorkers().empty())
26  return;
27 
29  //we need the arena to guarantee that the syncWait will return to this thread
30  // and not cause this callstack to possibly be moved to a new thread
31  tbb::task_arena localArena{tbb::this_task_arena::max_concurrency()};
32  std::exception_ptr exceptPtr = localArena.execute([&]() {
33  return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
34  manager.processOneOccurrenceAsync<T, U>(std::move(iHolder), info, token, streamID, topContext, context);
35  });
36  });
37 
38  if (exceptPtr) {
39  try {
40  edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
41  } catch (cms::Exception& ex) {
42  if (ex.context().empty()) {
43  edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, cleaningUpAfterException);
44  } else {
45  edm::addContextAndPrintException("", ex, cleaningUpAfterException);
46  }
47  throw;
48  }
49  }
50  }
51 } // namespace
52 
53 namespace edm {
54  SecondaryEventProvider::SecondaryEventProvider(std::vector<ParameterSet>& psets,
55  ProductRegistry& preg,
56  std::shared_ptr<ProcessConfiguration> processConfiguration)
57  : exceptionToActionTable_(new ExceptionToActionTable),
58  // no type resolver for modules in SecondaryEventProvider for now
59  workerManager_(std::make_shared<ActivityRegistry>(), *exceptionToActionTable_, nullptr) {
60  std::vector<std::string> shouldBeUsedLabels;
61  std::set<std::string> unscheduledLabels;
62  const PreallocationConfiguration preallocConfig;
63  for (auto& pset : psets) {
64  std::string label = pset.getParameter<std::string>("@module_label");
66  pset, preg, &preallocConfig, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
67  }
68  if (!unscheduledLabels.empty()) {
69  preg.setUnscheduledProducts(unscheduledLabels);
70  }
71  } // SecondaryEventProvider::SecondaryEventProvider
72 
75  GlobalContext const& globalContext) {
76  ProcessBlockHelper dummyProcessBlockHelper;
77  workerManager_.beginJob(iRegistry, iIndices, dummyProcessBlockHelper, globalContext);
78  }
79 
80  //NOTE: When the Stream interfaces are propagated to the modules, this code must be updated
81  // to also send the stream based transitions
83  const EventSetupImpl& setup,
84  ModuleCallingContext const* mcc,
85  StreamContext& sContext) {
87  processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>>(
89  processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>>(
90  workerManager_, info, sContext.streamID(), &sContext, mcc);
91  }
92 
94  const EventSetupImpl& setup,
95  ModuleCallingContext const* mcc,
96  StreamContext& sContext) {
98  processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>>(
100  processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>>(
101  workerManager_, info, sContext.streamID(), &sContext, mcc);
102  }
103 
105  const EventSetupImpl& setup,
106  ModuleCallingContext const* mcc,
107  StreamContext& sContext) {
109  processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>>(
110  workerManager_, info, sContext.streamID(), &sContext, mcc);
111  processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>>(
112  workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
113  }
114 
116  const EventSetupImpl& setup,
117  ModuleCallingContext const* mcc,
118  StreamContext& sContext) {
120  processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>>(
121  workerManager_, info, sContext.streamID(), &sContext, mcc);
122  processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>>(
123  workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
124  }
125 
127  const EventSetupImpl& setup,
128  StreamContext& sContext) {
132 
133  if (workerManager_.unscheduledWorkers().empty()) {
134  return;
135  }
137  //we need the arena to guarantee that the syncWait will return to this thread
138  // and not cause this callstack to possibly be moved to a new thread
139  ParentContext pc(&sContext);
140  std::exception_ptr exceptPtr = tbb::this_task_arena::isolate([&]() {
141  return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
142  for (auto& worker : workerManager_.unscheduledWorkers()) {
144  iHolder, info, token, sContext.streamID(), pc, &sContext);
145  }
146  });
147  });
148  if (exceptPtr) {
149  try {
150  edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
151  } catch (cms::Exception& ex) {
152  if (ex.context().empty()) {
153  edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, false);
154  } else {
155  edm::addContextAndPrintException("", ex, false);
156  }
157  throw;
158  }
159  }
160  }
161 
163  workerManager_.beginStream(iID, sContext);
164  }
165 
167  StreamContext const& sContext,
168  ExceptionCollector& exceptionCollector) {
169  // In this context the mutex is not needed because these things are not
170  // executing concurrently but in general the WorkerManager needs one.
171  std::mutex exceptionCollectorMutex;
172  workerManager_.endStream(iID, sContext, exceptionCollector, exceptionCollectorMutex);
173  }
174 } // namespace edm
static const TGPicture * info(bool iBackgroundIsBlack)
void setupPileUpEvent(EventPrincipal &ep, const EventSetupImpl &setup, StreamContext &sContext)
std::exception_ptr syncWait(F &&iFunc)
void beginJob(ProductRegistry const &iRegistry, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &, GlobalContext const &)
static std::mutex mutex
Definition: Proxy.cc:8
void setupOnDemandSystem(EventTransitionInfo const &)
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:83
static StreamID invalidStreamID()
Definition: StreamID.h:45
void endStream(edm::StreamID, StreamContext const &, ExceptionCollector &)
char const * label
void setUnscheduledProducts(std::set< std::string > const &unscheduledLabels)
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context) noexcept
StreamID const & streamID() const
Definition: StreamContext.h:55
void beginRun(RunPrincipal &run, const edm::EventSetupImpl &setup, ModuleCallingContext const *, StreamContext &sContext)
static ServiceRegistry & instance()
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:82
void endLuminosityBlock(LuminosityBlockPrincipal &lumi, const edm::EventSetupImpl &setup, ModuleCallingContext const *, StreamContext &sContext)
void beginJob(ProductRegistry const &iRegistry, eventsetup::ESRecordsToProductResolverIndices const &, GlobalContext const &)
void endRun(RunPrincipal &run, const edm::EventSetupImpl &setup, ModuleCallingContext const *, StreamContext &sContext)
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
SecondaryEventProvider(std::vector< ParameterSet > &psets, ProductRegistry &pregistry, std::shared_ptr< ProcessConfiguration > processConfiguration)
HLT enums.
void setupResolvers(Principal &principal)
void beginStream(edm::StreamID, StreamContext const &)
auto wrap(F iFunc) -> decltype(iFunc())
void beginLuminosityBlock(LuminosityBlockPrincipal &lumi, const edm::EventSetupImpl &setup, ModuleCallingContext const *, StreamContext &sContext)
void endStream(StreamID, StreamContext const &, ExceptionCollector &, std::mutex &collectorMutex) noexcept
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T
void beginStream(StreamID, StreamContext const &)
ServiceToken presentToken() const
def move(src, dest)
Definition: eostools.py:511