CMS 3D CMS Logo

SubProcess.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_SubProcess_h
2 #define FWCore_Framework_SubProcess_h
3 
20 
22 
23 #include <map>
24 #include <memory>
25 #include <set>
26 
27 namespace edm {
28  class ActivityRegistry;
29  class BranchDescription;
30  class BranchIDListHelper;
31  class HistoryAppender;
32  class IOVSyncValue;
33  class ParameterSet;
34  class ProductRegistry;
35  class PreallocationConfiguration;
36  class ThinnedAssociationsHelper;
37  class SubProcessParentageHelper;
38  class WaitingTaskHolder;
39 
40  namespace eventsetup {
41  class EventSetupsController;
42  }
43  class SubProcess : public EDConsumerBase {
44  public:
46  ParameterSet const& topLevelParameterSet,
47  std::shared_ptr<ProductRegistry const> parentProductRegistry,
48  std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
49  ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
50  SubProcessParentageHelper const& parentSubProcessParentageHelper,
52  ActivityRegistry& parentActReg,
53  ServiceToken const& token,
55  PreallocationConfiguration const& preallocConfig,
56  ProcessContext const* parentProcessContext);
57 
58  virtual ~SubProcess();
59 
60  SubProcess(SubProcess const&) = delete; // Disallow copying
61  SubProcess& operator=(SubProcess const&) = delete; // Disallow copying
62  SubProcess(SubProcess&&) = default; // Allow Moving
63  SubProcess& operator=(SubProcess&&) = default; // Allow moving
64 
65  //From OutputModule
66  void selectProducts(ProductRegistry const& preg,
67  ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
68  std::map<BranchID, bool>& keepAssociation);
69 
70  SelectedProductsForBranchType const& keptProducts() const {return keptProducts_;}
71 
72  void doBeginJob();
73  void doEndJob();
74 
75  void doEventAsync(WaitingTaskHolder iHolder,
76  EventPrincipal const& principal);
77 
78  void doBeginRun(RunPrincipal const& principal, IOVSyncValue const& ts);
79 
80  void doEndRun(RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);
81 
82  void doBeginLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts);
83 
84  void doEndLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);
85 
86 
87  void doBeginStream(unsigned int);
88  void doEndStream(unsigned int);
89  void doStreamBeginRun(unsigned int iID, RunPrincipal const& principal, IOVSyncValue const& ts);
90  void doStreamBeginRunAsync(WaitingTaskHolder iHolder,
91  unsigned int iID,
92  RunPrincipal const& principal,
93  IOVSyncValue const& ts);
94 
95  void doStreamEndRun(unsigned int iID, RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);
96  void doStreamEndRunAsync(WaitingTaskHolder iHolder,
97  unsigned int iID, RunPrincipal const& principal,
98  IOVSyncValue const& ts,
99  bool cleaningUpAfterException);
100 
101  void doStreamBeginLuminosityBlock(unsigned int iID, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts);
102  void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder,
103  unsigned int iID,
104  LuminosityBlockPrincipal const& principal,
105  IOVSyncValue const& ts);
106 
107  void doStreamEndLuminosityBlock(unsigned int iID, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);
108  void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder,
109  unsigned int iID,
110  LuminosityBlockPrincipal const& principal,
111  IOVSyncValue const& ts,
112  bool cleaningUpAfterException);
113 
114 
115  // Write the luminosity block
116  void writeLumi(ProcessHistoryID const& parentPhID, int runNumber, int lumiNumber);
117 
118  void deleteLumiFromCache(ProcessHistoryID const& parentPhID, int runNumber, int lumiNumber);
119 
120  // Write the run
121  void writeRun(ProcessHistoryID const& parentPhID, int runNumber);
122 
123  void deleteRunFromCache(ProcessHistoryID const& parentPhID, int runNumber);
124 
125  // Call closeFile() on all OutputModules.
127  ServiceRegistry::Operate operate(serviceToken_);
128  schedule_->closeOutputFiles();
129  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
130  }
131 
132  // Call openNewFileIfNeeded() on all OutputModules
134  ServiceRegistry::Operate operate(serviceToken_);
135  schedule_->openNewOutputFilesIfNeeded();
136  for_all(subProcesses_, [](auto& subProcess) { subProcess.openNewOutputFilesIfNeeded(); });
137  }
138 
139  // Call openFiles() on all OutputModules
141  ServiceRegistry::Operate operate(serviceToken_);
142  schedule_->openOutputFiles(fb);
143  for_all(subProcesses_, [&fb](auto& subProcess) { subProcess.openOutputFiles(fb); });
144  }
145 
146  void updateBranchIDListHelper(BranchIDLists const&);
147 
148  // Call respondToOpenInputFile() on all Modules
149  void respondToOpenInputFile(FileBlock const& fb);
150 
151  // Call respondToCloseInputFile() on all Modules
153  ServiceRegistry::Operate operate(serviceToken_);
154  schedule_->respondToCloseInputFile(fb);
155  for_all(subProcesses_, [&fb](auto& subProcess) { subProcess.respondToCloseInputFile(fb); });
156  }
157 
158  // Call shouldWeCloseFile() on all OutputModules.
159  bool shouldWeCloseOutput() const {
160  ServiceRegistry::Operate operate(serviceToken_);
161  if(schedule_->shouldWeCloseOutput()) {
162  return true;
163  }
164  for(auto const& subProcess : subProcesses_) {
165  if(subProcess.shouldWeCloseOutput()) {
166  return true;
167  }
168  }
169  return false;
170  }
171 
173  ServiceRegistry::Operate operate(serviceToken_);
174  schedule_->preForkReleaseResources();
175  for_all(subProcesses_, [](auto& subProcess){ subProcess.preForkReleaseResources(); });
176  }
177 
178  void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {
179  ServiceRegistry::Operate operate(serviceToken_);
180  schedule_->postForkReacquireResources(iChildIndex, iNumberOfChildren);
181  for_all(subProcesses_, [iChildIndex, iNumberOfChildren](auto& subProcess){ subProcess.postForkReacquireResources(iChildIndex, iNumberOfChildren); });
182  }
183 
185 
189  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
190 
194  int totalEvents() const {
195  return schedule_->totalEvents();
196  }
197 
199  int totalEventsPassed() const {
200  ServiceRegistry::Operate operate(serviceToken_);
201  return schedule_->totalEventsPassed();
202  }
203 
206  int totalEventsFailed() const {
207  ServiceRegistry::Operate operate(serviceToken_);
208  return schedule_->totalEventsFailed();
209  }
210 
213  void enableEndPaths(bool active) {
214  ServiceRegistry::Operate operate(serviceToken_);
215  schedule_->enableEndPaths(active);
216  for_all(subProcesses_, [active](auto& subProcess){ subProcess.enableEndPaths(active); });
217  }
218 
220  bool endPathsEnabled() const {
221  ServiceRegistry::Operate operate(serviceToken_);
222  return schedule_->endPathsEnabled();
223  }
224 
228  ServiceRegistry::Operate operate(serviceToken_);
229  schedule_->getTriggerReport(rep);
230  }
231 
234  bool terminate() const {
235  ServiceRegistry::Operate operate(serviceToken_);
236  if(schedule_->terminate()) {
237  return true;
238  }
239  for(auto const& subProcess : subProcesses_) {
240  if(subProcess.terminate()) {
241  return true;
242  }
243  }
244  return false;
245  }
246 
248  void clearCounters() {
249  ServiceRegistry::Operate operate(serviceToken_);
250  schedule_->clearCounters();
251  for_all(subProcesses_, [](auto& subProcess){ subProcess.clearCounters(); });
252  }
253 
254  private:
255  void beginJob();
256  void endJob();
257  void processAsync(WaitingTaskHolder iHolder, EventPrincipal const& e);
258  void beginRun(RunPrincipal const& r, IOVSyncValue const& ts);
259  void endRun(RunPrincipal const& r, IOVSyncValue const& ts, bool cleaningUpAfterException);
260  void beginLuminosityBlock(LuminosityBlockPrincipal const& lb, IOVSyncValue const& ts);
261  void endLuminosityBlock(LuminosityBlockPrincipal const& lb, IOVSyncValue const& ts, bool cleaningUpAfterException);
262 
263  void propagateProducts(BranchType type, Principal const& parentPrincipal, Principal& principal) const;
264  void fixBranchIDListsForEDAliases(std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID);
265  void keepThisBranch(BranchDescription const& desc,
266  std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
267  std::set<BranchID>& keptProductsInEvent);
268 
269  std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID() {
270  return droppedBranchIDToKeptBranchID_;
271  }
272 
273  std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {return get_underlying_safe(branchIDListHelper_);}
274  std::shared_ptr<BranchIDListHelper>& branchIDListHelper() {return get_underlying_safe(branchIDListHelper_);}
275  std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {return get_underlying_safe(thinnedAssociationsHelper_);}
276  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper() {return get_underlying_safe(thinnedAssociationsHelper_);}
277 
278  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
280  std::shared_ptr<ProductRegistry const> parentPreg_;
281  std::shared_ptr<ProductRegistry const> preg_;
285  std::unique_ptr<ExceptionToActionTable const> act_table_;
286  std::shared_ptr<ProcessConfiguration const> processConfiguration_;
289  //We require 1 history for each Run, Lumi and Stream
290  // The vectors first hold Stream info, then Lumi then Run
291  unsigned int historyLumiOffset_;
292  unsigned int historyRunOffset_;
293  std::vector<ProcessHistoryRegistry> processHistoryRegistries_;
294  std::vector<HistoryAppender> historyAppenders_;
298  std::map<ProcessHistoryID, ProcessHistoryID> parentToChildPhID_;
299  std::vector<SubProcess> subProcesses_;
301 
302  // keptProducts_ are pointers to the BranchDescription objects describing
303  // the branches we are to write.
304  //
305  // We do not own the BranchDescriptions to which we point.
309 
310  //EventSelection
314 
315  // needed because of possible EDAliases.
316  // filled in only if key and value are different.
317  std::map<BranchID::value_type, BranchID::value_type> droppedBranchIDToKeptBranchID_;
318 
319  };
320 
321  // free function
322  std::vector<ParameterSet> popSubProcessVParameterSet(ParameterSet& parameterSet);
323 }
324 #endif
unsigned int historyRunOffset_
Definition: SubProcess.h:292
unsigned int historyLumiOffset_
Definition: SubProcess.h:291
ParameterSetID selector_config_id_
Definition: SubProcess.h:312
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper()
Definition: SubProcess.h:276
type
Definition: HCALResponse.h:21
void enableEndPaths(bool active)
Definition: SubProcess.h:213
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
int totalEventsFailed() const
Definition: SubProcess.h:206
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: SubProcess.h:273
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
Definition: SubProcess.h:293
int totalEvents() const
Definition: SubProcess.h:194
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: SubProcess.h:283
std::vector< SubProcess > subProcesses_
Definition: SubProcess.h:299
Definition: Hash.h:43
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: SubProcess.h:288
SelectedProductsForBranchType const & keptProducts() const
Definition: SubProcess.h:70
bool terminate() const
Definition: SubProcess.h:234
void openOutputFiles(FileBlock &fb)
Definition: SubProcess.h:140
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: SubProcess.h:178
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
Definition: SubProcess.h:269
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: SubProcess.h:297
int totalEventsPassed() const
Return the number of events which have been passed by one or more trigger paths.
Definition: SubProcess.h:199
bool endPathsEnabled() const
Return true if end_paths are active, and false if they are inactive.
Definition: SubProcess.h:220
BranchType
Definition: BranchType.h:11
void beginJob()
Definition: Breakpoints.cc:15
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void openNewOutputFilesIfNeeded()
Definition: SubProcess.h:133
ServiceToken serviceToken_
Definition: SubProcess.h:279
def principal(options)
bool shouldWeCloseOutput() const
Definition: SubProcess.h:159
SelectedProductsForBranchType keptProducts_
Definition: SubProcess.h:306
ProductSelectorRules productSelectorRules_
Definition: SubProcess.h:307
void closeOutputFiles()
Definition: SubProcess.h:126
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: SubProcess.h:296
edm::propagate_const< std::unique_ptr< ParameterSet > > processParameterSet_
Definition: SubProcess.h:300
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
rep
Definition: cuy.py:1188
void preForkReleaseResources()
Definition: SubProcess.h:172
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: SubProcess.h:285
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:692
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
Definition: SubProcess.h:317
ProductSelector productSelector_
Definition: SubProcess.h:308
detail::TriggerResultsBasedEventSelector selectors_
Definition: SubProcess.h:313
std::map< ProcessHistoryID, ProcessHistoryID > parentToChildPhID_
Definition: SubProcess.h:298
void respondToCloseInputFile(FileBlock const &fb)
Definition: SubProcess.h:152
std::shared_ptr< ActivityRegistry > actReg_
Definition: SubProcess.h:278
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: SubProcess.h:282
void getTriggerReport(TriggerReport &rep) const
Definition: SubProcess.h:227
std::vector< HistoryAppender > historyAppenders_
Definition: SubProcess.h:294
HLT enums.
std::shared_ptr< ProductRegistry const > parentPreg_
Definition: SubProcess.h:280
std::shared_ptr< BranchIDListHelper > & branchIDListHelper()
Definition: SubProcess.h:274
ProcessContext processContext_
Definition: SubProcess.h:287
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: SubProcess.h:275
void clearCounters()
Clear all the counters in the trigger report.
Definition: SubProcess.h:248
std::shared_ptr< ProductRegistry const > preg_
Definition: SubProcess.h:281
PrincipalCache principalCache_
Definition: SubProcess.h:295
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< SubProcessParentageHelper > > subProcessParentageHelper_
Definition: SubProcess.h:284
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: SubProcess.h:286
def operate(timelog, memlog, json_f, num)