CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
WorkerManager.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_WorkerManager_h
2 #define FWCore_Framework_WorkerManager_h
3 
4 /*
5 
6 */
7 
15 
16 #include <memory>
17 
18 #include <set>
19 #include <string>
20 #include <vector>
21 
22 namespace edm {
23  class ExceptionCollector;
24  class StreamID;
25  class StreamContext;
26  class ModuleRegistry;
27  class PreallocationConfiguration;
28 
// WorkerManager: declares job-level (beginJob/endJob), stream-level
// (beginStream/endStream) and per-occurrence (processOneOccurrence) entry
// points, plus access to a list of Worker pointers (AllWorkers).
// NOTE(review): this listing skips several numbered lines (37-40, 42, 66-67,
// 69-71, 78-79, 81-82, 84), so some declarations are missing or only partly
// visible below — confirm against the original header before relying on it.
29  class WorkerManager {
30  public:
    // Container type returned by allWorkers(): non-smart pointers to Worker.
31  typedef std::vector<Worker*> AllWorkers;
32 
    // Construct from an activity registry and an exception-to-action table.
33  WorkerManager(std::shared_ptr<ActivityRegistry> actReg, ExceptionToActionTable const& actions);
34 
    // Construct from a module registry, an activity registry, a process
    // configuration and two label collections passed by non-const reference.
    // NOTE(review): listing lines 37-40 and 42 are absent, so additional
    // parameters are presumably missing from this view — confirm.
35  WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
36  std::shared_ptr<ActivityRegistry> actReg,
41  std::shared_ptr<ProcessConfiguration> processConfiguration,
43  std::set<std::string>& unscheduledLabels,
44  std::vector<std::string>& shouldBeUsedLabels);
45 
    // Const member taking the product registry and the set of unscheduled labels.
46  void setOnDemandProducts(ProductRegistry& pregistry, std::set<std::string> const& unscheduledLabels) const;
47 
    // Process one occurrence of the transition described by T; defined after
    // the class. cleaningUpAfterException defaults to false.
48  template <typename T, typename U>
49  void processOneOccurrence(typename T::MyPrincipal& principal,
50  EventSetup const& eventSetup,
51  StreamID streamID,
52  typename T::Context const* topContext,
53  U const* context,
54  bool cleaningUpAfterException = false);
55 
    // Job-level hooks; endJob has an overload that takes an ExceptionCollector.
56  void beginJob(ProductRegistry const& iRegistry);
57  void endJob();
58  void endJob(ExceptionCollector& collector);
59 
    // Stream-level hooks, each given the stream ID and its StreamContext.
60  void beginStream(StreamID iID, StreamContext& streamContext);
61  void endStream(StreamID iID, StreamContext& streamContext);
62 
    // Read-only access to the allWorkers_ member.
63  AllWorkers const& allWorkers() const {return allWorkers_;}
64 
65  void addToAllWorkers(Worker* w);
66 
68 
    // NOTE(review): the start of this declaration (listing lines 69-71) is not
    // visible; the trailing parameters match the getWorker(...) signature shown
    // in this page's cross-reference index — confirm against the original header.
72  std::shared_ptr<ProcessConfiguration const> processConfiguration,
73  std::string const& label);
74 
75  private:
76 
    // Called at the start of processOneOccurrence.
77  void resetAll();
78 
80 
83 
85 
    // NOTE(review): allWorkers_, actionTable_, workerReg_ and
    // setupOnDemandSystem are used or cross-referenced on this page but their
    // declarations (listing lines 78-84) are absent from this view.
    // Used by processOneOccurrence for non-event transitions (runNow).
86  std::shared_ptr<UnscheduledCallProducer> unscheduled_;
87  };
88 
// Process one occurrence of the transition described by T.
//
// Calls resetAll() first, then runs the work inside convertException::wrap:
//  - if T::isEvent_ is true, calls setupOnDemandSystem with the principal
//    cast to EventPrincipal&;
//  - otherwise calls unscheduled_->runNow<T,U>(...) with the same arguments.
// A cms::Exception raised by that work is swallowed (after a printed warning)
// only when its action is SkipEvent; otherwise it is rethrown. Any
// cms::Exception leaving the wrapper gets context added and printed via
// addContextAndPrintException and is then rethrown to the caller.
//
// Parameters:
//  ep       - principal for this occurrence (typename T::MyPrincipal).
//  es       - event setup forwarded to the called code.
//  streamID, topContext, context - forwarded unchanged to runNow.
//  cleaningUpAfterException - forwarded as the third argument of
//             addContextAndPrintException.
89  template <typename T, typename U>
90  void
91  WorkerManager::processOneOccurrence(typename T::MyPrincipal& ep,
92  EventSetup const& es,
93  StreamID streamID,
94  typename T::Context const* topContext,
95  U const* context,
96  bool cleaningUpAfterException) {
97  this->resetAll();
98 
99  try {
    // The lambda captures by reference and is invoked by convertException::wrap.
100  convertException::wrap([&]() {
101  try {
102  if (T::isEvent_) {
    // dynamic_cast to a reference type: a failed cast throws std::bad_cast.
103  setupOnDemandSystem(dynamic_cast<EventPrincipal&>(ep), es);
104  } else {
105  //make sure the unscheduled items see this run or lumi transition
106  unscheduled_->runNow<T,U>(ep, es,streamID, topContext, context);
107  }
108  }
109  catch(cms::Exception& e) {
    // NOTE(review): `action` is not declared in the visible lines (the listing
    // jumps from 109 to 111); presumably it is looked up from the exception's
    // category via the action table — confirm against the original header.
    // IgnoreCompletely and FailPath are asserted never to occur here.
111  assert (action != exception_actions::IgnoreCompletely);
112  assert (action != exception_actions::FailPath);
113  if (action == exception_actions::SkipEvent) {
    // SkipEvent: print a warning and do not propagate the exception.
114  printCmsExceptionWarning("SkipEvent", e);
115  } else {
116  throw;
117  }
118  }
119  });
120  }
121  catch(cms::Exception& ex) {
    // Name this function as context only when the exception has none yet.
122  if (ex.context().empty()) {
123  addContextAndPrintException("Calling function WorkerManager::processOneOccurrence", ex, cleaningUpAfterException);
124  } else {
125  addContextAndPrintException("", ex, cleaningUpAfterException);
126  }
    // Always rethrow after reporting.
127  throw;
128  }
129  }
130 }
131 
132 #endif
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
AllWorkers allWorkers_
Definition: WorkerManager.h:84
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
Definition: WorkerManager.h:91
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void beginJob(ProductRegistry const &iRegistry)
processConfiguration
Definition: Schedule.cc:370
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
actions
Definition: Schedule.cc:370
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:82
std::vector< Worker * > AllWorkers
Definition: WorkerManager.h:31
std::list< std::string > const & context() const
Definition: Exception.cc:191
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e, edm::JobReport *jobRep=0, int rc=-1)
void beginStream(StreamID iID, StreamContext &streamContext)
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:63
std::shared_ptr< UnscheduledCallProducer > unscheduled_
Definition: WorkerManager.h:86
WorkerRegistry workerReg_
Definition: WorkerManager.h:81
The registry of all workers that were requested. Holds all instances of workers. In this implementati...
auto wrap(F iFunc) -> decltype(iFunc())
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
preg
Definition: Schedule.cc:370
long double T
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:67
void setupOnDemandSystem(EventPrincipal &principal, EventSetup const &es)
prealloc
Definition: Schedule.cc:370
void addToAllWorkers(Worker *w)