CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
WorkerManager.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_WorkerManager_h
2 #define FWCore_Framework_WorkerManager_h
3 
4 /*
5 
6 */
7 
16 
17 #include <memory>
18 
19 #include <set>
20 #include <string>
21 #include <vector>
22 
23 namespace edm {
24  class ExceptionCollector;
25  class StreamID;
26  class StreamContext;
27  class ModuleRegistry;
28  class PreallocationConfiguration;
29 
30  class WorkerManager {
31  public:
32  typedef std::vector<Worker*> AllWorkers;
33 
34  WorkerManager(std::shared_ptr<ActivityRegistry> actReg, ExceptionToActionTable const& actions);
35 
36  WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
37  std::shared_ptr<ActivityRegistry> actReg,
42  std::shared_ptr<ProcessConfiguration> processConfiguration,
44  std::set<std::string>& unscheduledLabels,
45  std::vector<std::string>& shouldBeUsedLabels);
46 
47  void setOnDemandProducts(ProductRegistry& pregistry, std::set<std::string> const& unscheduledLabels) const;
48 
49  template <typename T, typename U>
50  void processOneOccurrence(typename T::MyPrincipal& principal,
51  EventSetup const& eventSetup,
52  StreamID streamID,
53  typename T::Context const* topContext,
54  U const* context,
55  bool cleaningUpAfterException = false);
56 
57  void beginJob(ProductRegistry const& iRegistry);
58  void endJob();
59  void endJob(ExceptionCollector& collector);
60 
61  void beginStream(StreamID iID, StreamContext& streamContext);
62  void endStream(StreamID iID, StreamContext& streamContext);
63 
64  AllWorkers const& allWorkers() const {return allWorkers_;}
65 
66  void addToAllWorkers(Worker* w);
67 
69 
73  std::shared_ptr<ProcessConfiguration const> processConfiguration,
74  std::string const& label);
75 
76  private:
77 
78  void resetAll();
79 
81 
82  std::shared_ptr<UnscheduledCallProducer const> unscheduled() const {return get_underlying_safe(unscheduled_);}
83  std::shared_ptr<UnscheduledCallProducer>& unscheduled() {return get_underlying_safe(unscheduled_);}
84 
87 
89 
91  };
92 
93  template <typename T, typename U>
94  void
95  WorkerManager::processOneOccurrence(typename T::MyPrincipal& ep,
96  EventSetup const& es,
97  StreamID streamID,
98  typename T::Context const* topContext,
99  U const* context,
100  bool cleaningUpAfterException) {
101  this->resetAll();
102 
103  try {
104  convertException::wrap([&]() {
105  try {
106  if (T::isEvent_) {
107  setupOnDemandSystem(dynamic_cast<EventPrincipal&>(ep), es);
108  } else {
109  //make sure the unscheduled items see this run or lumi rtansition
110  unscheduled_->runNow<T,U>(ep, es,streamID, topContext, context);
111  }
112  }
113  catch(cms::Exception& e) {
117  if (action == exception_actions::SkipEvent) {
118  printCmsExceptionWarning("SkipEvent", e);
119  } else {
120  throw;
121  }
122  }
123  });
124  }
125  catch(cms::Exception& ex) {
126  if (ex.context().empty()) {
127  addContextAndPrintException("Calling function WorkerManager::processOneOccurrence", ex, cleaningUpAfterException);
128  } else {
129  addContextAndPrintException("", ex, cleaningUpAfterException);
130  }
131  throw;
132  }
133  }
134 }
135 
136 #endif
std::shared_ptr< UnscheduledCallProducer const > unscheduled() const
Definition: WorkerManager.h:82
std::shared_ptr< UnscheduledCallProducer > & unscheduled()
Definition: WorkerManager.h:83
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
AllWorkers allWorkers_
Definition: WorkerManager.h:88
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
Definition: WorkerManager.h:95
assert(m_qm.get())
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void beginJob(ProductRegistry const &iRegistry)
edm::propagate_const< std::shared_ptr< UnscheduledCallProducer > > unscheduled_
Definition: WorkerManager.h:90
processConfiguration
Definition: Schedule.cc:374
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
actions
Definition: Schedule.cc:374
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:86
std::vector< Worker * > AllWorkers
Definition: WorkerManager.h:32
std::list< std::string > const & context() const
Definition: Exception.cc:191
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void beginStream(StreamID iID, StreamContext &streamContext)
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:64
string action
Definition: mps_fire.py:28
WorkerRegistry workerReg_
Definition: WorkerManager.h:85
The Registry of all workers that where requested Holds all instances of workers. In this implementati...
auto wrap(F iFunc) -> decltype(iFunc())
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
preg
Definition: Schedule.cc:374
long double T
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:68
void setupOnDemandSystem(EventPrincipal &principal, EventSetup const &es)
prealloc
Definition: Schedule.cc:374
void addToAllWorkers(Worker *w)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)