CMS 3D CMS Logo

Schedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Schedule_h
2 #define FWCore_Framework_Schedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in objects held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The Schedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The Schedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
84 
85 #include <array>
86 #include <map>
87 #include <memory>
88 #include <mutex>
89 #include <set>
90 #include <string>
91 #include <vector>
92 #include <sstream>
93 #include <utility>
94 
95 namespace edm {
96 
97  namespace service {
98  class TriggerNamesService;
99  }
100  namespace eventsetup {  // must match the eventsetup:: qualifier used in the beginJob/changeModule signatures
101  class ESRecordsToProductResolverIndices;
102  }
103 
104  class BranchIDListHelper;
105  class EventTransitionInfo;
106  class ExceptionCollector;
109  class ProductRegistry;
111  class StreamSchedule;
112  class GlobalSchedule;
113  struct TriggerTimingReport;
114  class ModuleRegistry;
118  class TriggerResultInserter;
119  class PathStatusInserter;
120  class EndPathStatusInserter;
121  class WaitingTaskHolder;
122 
123  class Schedule {
124  public:
125  typedef std::vector<std::string> vstring;
126  typedef std::vector<Worker*> AllWorkers;
127  typedef std::vector<edm::propagate_const<std::shared_ptr<OutputModuleCommunicator>>> AllOutputModuleCommunicators;
128 
129  typedef std::vector<Worker*> Workers;
130 
131  Schedule(ParameterSet& proc_pset,
132  service::TriggerNamesService const& tns,
133  ProductRegistry& pregistry,
135  std::shared_ptr<ActivityRegistry> areg,
136  std::shared_ptr<ProcessConfiguration const> processConfiguration,
138  ProcessContext const* processContext,
139  ModuleTypeResolverMaker const* resolverMaker);
140  void finishSetup(ParameterSet& proc_pset,
141  service::TriggerNamesService const& tns,
142  ProductRegistry& preg,
143  BranchIDListHelper& branchIDListHelper,
144  ProcessBlockHelperBase& processBlockHelper,
145  ThinnedAssociationsHelper& thinnedAssociationsHelper,
146  SubProcessParentageHelper const* subProcessParentageHelper,
147  std::shared_ptr<ActivityRegistry> areg,
148  std::shared_ptr<ProcessConfiguration> processConfiguration,
149  bool hasSubprocesses,
150  PreallocationConfiguration const& prealloc,
151  ProcessContext const* processContext);
152 
154  unsigned int iStreamID,
156  ServiceToken const& token);
157 
158  template <typename T>
160  typename T::TransitionInfoType& transitionInfo,
161  ServiceToken const& token,
162  bool cleaningUpAfterException = false);
163 
164  template <typename T>
166  unsigned int iStreamID,
167  typename T::TransitionInfoType& transitionInfo,
168  ServiceToken const& token,
169  bool cleaningUpAfterException = false);
170 
171  void beginJob(ProductRegistry const&,
173  ProcessBlockHelperBase const&,
175  ProcessContext const&);
176  void endJob(ExceptionCollector& collector);
177  void sendFwkSummaryToMessageLogger() const;
178 
179  void beginStream(unsigned int streamID);
180  void endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;
181 
182  // Write the luminosity block
184  LuminosityBlockPrincipal const& lbp,
185  ProcessContext const*,
187 
188  // Write the run
189  void writeRunAsync(WaitingTaskHolder iTask,
190  RunPrincipal const& rp,
191  ProcessContext const*,
194 
196  ProcessBlockPrincipal const&,
197  ProcessContext const*,
199 
200  // Call closeFile() on all OutputModules.
201  void closeOutputFiles();
202 
203  // Call openFiles() on all OutputModules
204  void openOutputFiles(FileBlock& fb);
205 
206  // Call respondToOpenInputFile() on all Modules
207  void respondToOpenInputFile(FileBlock const& fb);
208 
209  // Call respondToCloseInputFile() on all Modules
210  void respondToCloseInputFile(FileBlock const& fb);
211 
212  // Call shouldWeCloseFile() on all OutputModules.
213  bool shouldWeCloseOutput() const;
214 
217 
221  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
222 
224  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
225 
229  void triggerPaths(std::vector<std::string>& oLabelsToFill) const;
230 
232  void endPaths(std::vector<std::string>& oLabelsToFill) const;
233 
235  void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
236 
239  void moduleDescriptionsInPath(std::string const& iPathLabel,
240  std::vector<ModuleDescription const*>& descriptions,
241  unsigned int hint) const;
242 
245  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
246  std::vector<ModuleDescription const*>& descriptions,
247  unsigned int hint) const;
248 
250  std::vector<ModuleDescription const*>& allModuleDescriptions,
251  std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
252  std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>&
253  modulesWhoseProductsAreConsumedBy,
254  std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
255  ProductRegistry const& preg) const;
256 
260  int totalEvents() const;
261 
264  int totalEventsPassed() const;
265 
268  int totalEventsFailed() const;
269 
272  void getTriggerReport(TriggerReport& rep) const;
273 
277 
279  bool terminate() const;
280 
282  void clearCounters();
283 
286  bool changeModule(std::string const& iLabel,
287  ParameterSet const& iPSet,
288  const ProductRegistry& iRegistry,
290 
292  void deleteModule(std::string const& iLabel, ActivityRegistry* areg);
293 
294  void initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
295  std::multimap<std::string, std::string> const& referencesToBranches,
296  std::vector<std::string> const& modulesToSkip,
297  edm::ProductRegistry const& preg);
298 
300  AllWorkers const& allWorkers() const;
301 
304 
305  private:
306  void limitOutput(ParameterSet const& proc_pset,
307  BranchIDLists const& branchIDLists,
308  SubProcessParentageHelper const* subProcessParentageHelper);
309 
310  std::shared_ptr<TriggerResultInserter const> resultsInserter() const {
312  }
313  std::shared_ptr<TriggerResultInserter>& resultsInserter() { return get_underlying_safe(resultsInserter_); }
314  std::shared_ptr<ModuleRegistry const> moduleRegistry() const { return get_underlying_safe(moduleRegistry_); }
315  std::shared_ptr<ModuleRegistry>& moduleRegistry() { return get_underlying_safe(moduleRegistry_); }
316 
318  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>> pathStatusInserters_;
319  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>> endPathStatusInserters_;
321  std::vector<edm::propagate_const<std::shared_ptr<StreamSchedule>>> streamSchedules_;
322  //In the future, we will have one GlobalSchedule per simultaneous transition
324 
327 
329 
330  std::vector<std::string> const* pathNames_;
331  std::vector<std::string> const* endPathNames_;
333  };
334 
335  template <typename T>  // T selects the transition; hands the work to the StreamSchedule at index iStreamID
336  void Schedule::processOneStreamAsync(WaitingTaskHolder iTaskHolder,
337  unsigned int iStreamID,
338  typename T::TransitionInfoType& transitionInfo,
339  ServiceToken const& token,
340  bool cleaningUpAfterException) {
341  assert(iStreamID < streamSchedules_.size());  // the stream index must refer to an existing StreamSchedule
342  streamSchedules_[iStreamID]->processOneStreamAsync<T>(
343  std::move(iTaskHolder), transitionInfo, token, cleaningUpAfterException);
344  }
345 
346  template <typename T>  // T selects the transition; hands the work to the GlobalSchedule held in globalSchedule_
347  void Schedule::processOneGlobalAsync(WaitingTaskHolder iTaskHolder,
348  typename T::TransitionInfoType& transitionInfo,
349  ServiceToken const& token,
350  bool cleaningUpAfterException) {
351  globalSchedule_->processOneGlobalAsync<T>(iTaskHolder, transitionInfo, token, cleaningUpAfterException);
352  }
353 
354 } // namespace edm
355 #endif
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process
Definition: Schedule.cc:1306
bool terminate() const
Return whether each output module has reached its maximum count.
Definition: Schedule.cc:768
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1385
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void writeProcessBlockAsync(WaitingTaskHolder iTask, ProcessBlockPrincipal const &, ProcessContext const *, ActivityRegistry *)
Definition: Schedule.cc:1110
void respondToCloseInputFile(FileBlock const &fb)
Definition: Schedule.cc:1183
int totalEvents() const
Definition: Schedule.cc:1392
void processOneStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamID, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
Definition: Schedule.h:336
std::vector< Worker * > AllWorkers
Definition: Schedule.h:126
void processOneGlobalAsync(WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
Definition: Schedule.h:347
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
Definition: Schedule.cc:1312
static std::mutex mutex
Definition: Proxy.cc:8
std::vector< std::string > const * pathNames_
Definition: Schedule.h:330
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1375
void convertCurrentProcessAlias(std::string const &processName)
Convert "@currentProcess" in InputTag process names to the actual current process name...
Definition: Schedule.cc:1294
Definition: config.py:1
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
Definition: Schedule.h:328
assert(be >=bs)
void processOneEventAsync(WaitingTaskHolder iTask, unsigned int iStreamID, EventTransitionInfo &, ServiceToken const &token)
Definition: Schedule.cc:1206
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:323
std::shared_ptr< TriggerResultInserter > & resultsInserter()
Definition: Schedule.h:313
void deleteModule(std::string const &iLabel, ActivityRegistry *areg)
Deletes module with label iLabel.
Definition: Schedule.cc:1263
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: Schedule.cc:1281
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter > > > pathStatusInserters_
Definition: Schedule.h:318
bool changeModule(std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry, eventsetup::ESRecordsToProductResolverIndices const &)
Definition: Schedule.cc:1214
void fillModuleAndConsumesInfo(std::vector< ModuleDescription const *> &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int >> &moduleIDToIndex, std::array< std::vector< std::vector< ModuleDescription const *>>, NumBranchTypes > &modulesWhoseProductsAreConsumedBy, std::vector< std::vector< ModuleProcessName >> &modulesInPreviousProcessesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
Definition: Schedule.cc:1324
Schedule(ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &config, ProcessContext const *processContext, ModuleTypeResolverMaker const *resolverMaker)
Definition: Schedule.cc:486
edm::propagate_const< std::shared_ptr< TriggerResultInserter > > resultsInserter_
Definition: Schedule.h:317
void clearCounters()
Clear all the counters in the trigger report.
Definition: Schedule.cc:1416
std::vector< std::string > vstring
Definition: Schedule.h:125
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists, SubProcessParentageHelper const *subProcessParentageHelper)
Definition: Schedule.cc:727
void respondToOpenInputFile(FileBlock const &fb)
Definition: Schedule.cc:1178
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:320
rep
Definition: cuy.py:1189
void beginStream(unsigned int streamID)
Definition: Schedule.cc:1196
void writeRunAsync(WaitingTaskHolder iTask, RunPrincipal const &rp, ProcessContext const *, ActivityRegistry *, MergeableRunProductMetadata const *)
Definition: Schedule.cc:1077
int totalEventsPassed() const
Definition: Schedule.cc:1400
std::shared_ptr< ModuleRegistry const > moduleRegistry() const
Definition: Schedule.h:314
std::vector< Worker * > Workers
Definition: Schedule.h:129
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
Definition: Schedule.cc:1300
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:326
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1292
void endStream(unsigned int streamID, ExceptionCollector &collector, std::mutex &collectorMutex) noexcept
Definition: Schedule.cc:1201
bool shouldWeCloseOutput() const
Definition: Schedule.cc:1169
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
Definition: Schedule.cc:1318
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter > > > endPathStatusInserters_
Definition: Schedule.h:319
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:325
std::vector< edm::propagate_const< std::shared_ptr< OutputModuleCommunicator > > > AllOutputModuleCommunicators
Definition: Schedule.h:127
bool wantSummary_
Definition: Schedule.h:332
std::vector< std::string > const * endPathNames_
Definition: Schedule.h:331
HLT enums.
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
Definition: Schedule.cc:1308
void finishSetup(ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &preg, BranchIDListHelper &branchIDListHelper, ProcessBlockHelperBase &processBlockHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, SubProcessParentageHelper const *subProcessParentageHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool hasSubprocesses, PreallocationConfiguration const &prealloc, ProcessContext const *processContext)
Definition: Schedule.cc:578
std::shared_ptr< ModuleRegistry > & moduleRegistry()
Definition: Schedule.h:315
void sendFwkSummaryToMessageLogger() const
Definition: Schedule.cc:798
std::shared_ptr< TriggerResultInserter const > resultsInserter() const
Definition: Schedule.h:310
void openOutputFiles(FileBlock &fb)
Definition: Schedule.cc:1072
void writeLumiAsync(WaitingTaskHolder iTask, LuminosityBlockPrincipal const &lbp, ProcessContext const *, ActivityRegistry *)
Definition: Schedule.cc:1140
long double T
void initializeEarlyDelete(std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
Definition: Schedule.cc:1271
void endJob(ExceptionCollector &collector)
Definition: Schedule.cc:783
int totalEventsFailed() const
Definition: Schedule.cc:1408
def move(src, dest)
Definition: eostools.py:511
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &, PathsAndConsumesOfModulesBase const &, ProcessContext const &)
Definition: Schedule.cc:1188
void closeOutputFiles()
Definition: Schedule.cc:1064
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
Definition: Schedule.cc:1304