CMS 3D CMS Logo

WorkerManager.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_WorkerManager_h
2 #define FWCore_Framework_WorkerManager_h
3 
12 
13 #include <memory>
14 #include <mutex>
15 #include <set>
16 #include <string>
17 #include <utility>
18 #include <vector>
19 
20 namespace edm {
21  class ExceptionCollector;
22  class ExceptionToActionTable;
23  class ModuleRegistry;
24  class ModuleTypeResolverMaker;
25  class PreallocationConfiguration;
26  class Worker;
27  namespace eventsetup {
29  }
30 
// WorkerManager class declaration, as rendered in this numbered listing.
// NOTE(review): the embedded listing numbers have gaps (e.g. 35->37, 54->56, 97->103), so
// several declaration lines -- including the names of some member functions and the
// private data members -- are not visible here. Confirm against the real header.
31  class WorkerManager {
32  public:
// Sequence of raw Worker pointers; the type returned by allWorkers() and unscheduledWorkers().
33  typedef std::vector<Worker*> AllWorkers;
34 
35  WorkerManager(std::shared_ptr<ActivityRegistry> actReg,
37  ModuleTypeResolverMaker const* typeResolverMaker);
// Move construction uses the compiler-generated member-wise move.
38  WorkerManager(WorkerManager&&) = default;
39 
40  WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
41  std::shared_ptr<ActivityRegistry> actReg,
43 
45 
47  ProductRegistry& preg,
48  PreallocationConfiguration const* prealloc,
49  std::shared_ptr<ProcessConfiguration const> processConfiguration,
51  std::set<std::string>& unscheduledLabels,
52  std::vector<std::string>& shouldBeUsedLabels);
53 
// Member template declared noexcept; T supplies TransitionInfoType and Context,
// U is the type of the `context` pointer.
54  template <typename T, typename U>
56  typename T::TransitionInfoType&,
57  ServiceToken const&,
58  StreamID,
59  typename T::Context const* topContext,
60  U const* context) noexcept;
61 
// Member template taking the transition info by const reference plus a ParentContext.
62  template <typename T>
64  typename T::TransitionInfoType const&,
65  ServiceToken const&,
66  StreamID,
67  ParentContext const&,
68  typename T::Context const*);
69 
70  void setupResolvers(Principal& principal);
72 
73  void beginJob(ProductRegistry const& iRegistry,
76  GlobalContext const&);
78 
79  void beginStream(StreamID, StreamContext const&);
80  void endStream(StreamID, StreamContext const&, ExceptionCollector&, std::mutex& collectorMutex) noexcept;
81 
// Read-only access to the allWorkers_ member.
82  AllWorkers const& allWorkers() const { return allWorkers_; }
// Read-only access to the workers held by the unscheduled_ member.
83  AllWorkers const& unscheduledWorkers() const { return unscheduled_.workers(); }
84 
85  void addToAllWorkers(Worker* w);
86 
// Dereferences the actionTable_ pointer member; presumably never null -- TODO confirm.
87  ExceptionToActionTable const& actionTable() const { return *actionTable_; }
88 
90  ProductRegistry& preg,
91  PreallocationConfiguration const* prealloc,
92  std::shared_ptr<ProcessConfiguration const> processConfiguration,
93  std::string const& label);
94 
95  void resetAll();
96 
// The private members (listing lines 98-102) are not visible in this rendering.
97  private:
103  };
104 
// Out-of-class definition of a WorkerManager member template.
// NOTE(review): the listing line that carries the function name and the leading `task`
// parameter (listing line 106) is missing here; the cross-reference index of this page
// gives the signature as processOneOccurrenceAsync(WaitingTaskHolder, ...). Confirm
// against the real header.
//
// Iterates allWorkers_ from last to first and, for each worker, calls
// doWorkNoPrefetchingAsync<T> with the same task, info, token, streamID and topContext,
// and a ParentContext built from `context`. Usable only for transition types whose
// T::isEvent_ is false (enforced at compile time).
105  template <typename T, typename U>
107  typename T::TransitionInfoType& info,
108  ServiceToken const& token,
109  StreamID streamID,
110  typename T::Context const* topContext,
111  U const* context) noexcept {
// Compile-time guard: this function is not to be instantiated for event transitions.
112  static_assert(!T::isEvent_);
113 
114  // Spawn them in reverse order. In the single-threaded case, at least, that makes
115  // them run in forward order (and makes forward order more likely with multiple threads).
116  for (auto it = allWorkers_.rbegin(), itEnd = allWorkers_.rend(); it != itEnd; ++it) {
117  Worker* worker = *it;
118 
// A separate ParentContext is constructed from `context` on every iteration.
119  ParentContext parentContext(context);
120 
121  // We do not need to run prefetching here because this only handles
122  // stream begin/end transitions for runs and lumis. There are no products
123  // put into the runs or lumis in stream transitions, so there can be
124  // no data dependencies which require prefetching. Prefetching is
125  // needed for global transitions, but they are run elsewhere.
126  // (One exception, the SecondaryEventProvider (used for mixing) sends
127  // global begin/end run/lumi transitions through here. They shouldn't
128  // need prefetching either and for some years nothing has been using
129  // that part of the code anyway...)
130  worker->doWorkNoPrefetchingAsync<T>(task, info, token, streamID, parentContext, topContext);
131  }
132  }
133 
// Out-of-class definition of a WorkerManager member template.
// NOTE(review): the listing line that carries the function name and the leading `task`
// parameter (listing line 135) is missing here; the cross-reference index of this page
// gives the signature as processAccumulatorsAsync(WaitingTaskHolder, ...). Confirm
// against the real header.
//
// Pure forwarding: hands every argument to unscheduled_.runAccumulatorsAsync<T>,
// moving `task` into the call and passing the rest unchanged.
134  template <typename T>
136  typename T::TransitionInfoType const& info,
137  ServiceToken const& token,
138  StreamID streamID,
139  ParentContext const& parentContext,
140  typename T::Context const* context) {
141  unscheduled_.runAccumulatorsAsync<T>(std::move(task), info, token, streamID, parentContext, context);
142  }
143 } // namespace edm
144 
145 #endif
static const TGPicture * info(bool iBackgroundIsBlack)
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
Definition: Worker.h:1200
worker_container const & workers() const
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
T w() const
UnscheduledCallProducer unscheduled_
void beginJob(ProductRegistry const &iRegistry, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &, GlobalContext const &)
static std::mutex mutex
Definition: Proxy.cc:8
AllWorkers allWorkers_
void setupOnDemandSystem(EventTransitionInfo const &)
AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:83
void deleteModuleIfExists(std::string const &moduleLabel)
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:99
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void const * lastSetupEventPrincipal_
std::vector< Worker * > AllWorkers
Definition: WorkerManager.h:33
char const * label
void endJob(ExceptionCollector &, GlobalContext const &)
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions, ModuleTypeResolverMaker const *typeResolverMaker)
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context) noexcept
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:82
void runAccumulatorsAsync(WaitingTaskHolder task, typename T::TransitionInfoType const &info, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context) noexcept
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:87
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
HLT enums.
void setupResolvers(Principal &principal)
WorkerRegistry workerReg_
Definition: WorkerManager.h:98
The Registry of all workers that were requested. Holds all instances of workers. In this implementati...
void endStream(StreamID, StreamContext const &, ExceptionCollector &, std::mutex &collectorMutex) noexcept
long double T
void beginStream(StreamID, StreamContext const &)
def move(src, dest)
Definition: eostools.py:511
void addToAllWorkers(Worker *w)