CMS 3D CMS Logo

streamTransitionAsync.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_streamTransitionAsync_h
2 #define FWCore_Framework_streamTransitionAsync_h
3 // -*- C++ -*-
4 //
5 // Package: FWCore/Framework
6 // Function: streamTransitionAsync
7 //
16 //
17 // Original Author: Chris Jones
18 // Created: Tue, 06 Sep 2016 16:04:26 GMT
19 //
20 
21 // system include files
27 
28 // user include files
29 
30 // forward declarations
31 
32 namespace edm {
33  class EventSetupImpl;
34  class LuminosityBlockPrincipal;
35  class RunPrincipal;
36 
37  //This is code in common between beginStreamRun and beginStreamLuminosityBlock
39  WaitingTaskHolder iHolder,
40  SubProcess& iSubProcess,
41  unsigned int i,
42  LuminosityBlockPrincipal& iPrincipal,
43  IOVSyncValue const& iTS,
44  std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
45  iSubProcess.doStreamBeginLuminosityBlockAsync(std::move(iHolder), i, iPrincipal, iTS, iEventSetupImpls);
46  }
47 
49  WaitingTaskHolder iHolder,
50  SubProcess& iSubProcess,
51  unsigned int i,
52  RunPrincipal& iPrincipal,
53  IOVSyncValue const& iTS,
54  std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
55  iSubProcess.doStreamBeginRunAsync(std::move(iHolder), i, iPrincipal, iTS, iEventSetupImpls);
56  }
57 
59  WaitingTaskHolder iHolder,
60  SubProcess& iSubProcess,
61  unsigned int i,
62  LuminosityBlockPrincipal& iPrincipal,
63  IOVSyncValue const& iTS,
64  std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls,
65  bool cleaningUpAfterException) {
67  std::move(iHolder), i, iPrincipal, iTS, iEventSetupImpls, cleaningUpAfterException);
68  }
69 
71  WaitingTaskHolder iHolder,
72  SubProcess& iSubProcess,
73  unsigned int i,
74  RunPrincipal& iPrincipal,
75  IOVSyncValue const& iTS,
76  std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls,
77  bool cleaningUpAfterException) {
78  iSubProcess.doStreamEndRunAsync(std::move(iHolder), i, iPrincipal, iTS, iEventSetupImpls, cleaningUpAfterException);
79  }
80 
81  template <typename Traits, typename P, typename SC>
83  WaitingTaskHolder iWait,
84  Schedule& iSchedule,
85  unsigned int iStreamIndex,
86  P& iPrincipal,
87  IOVSyncValue const& iTS,
88  EventSetupImpl const& iES,
89  std::vector<std::shared_ptr<const EventSetupImpl>> const*
90  iEventSetupImpls, // always null for runs until we enable concurrent run processing
91  ServiceToken const& token,
92  SC& iSubProcesses) {
93  //When we are done processing the stream for this process,
94  // we need to run the stream for all SubProcesses
95  //NOTE: The subprocesses set their own service tokens
96  auto subs = make_waiting_task(
97  tbb::task::allocate_root(),
98  [&iSubProcesses, iWait, iStreamIndex, &iPrincipal, iTS, iEventSetupImpls](
99  std::exception_ptr const* iPtr) mutable {
100  if (iPtr) {
101  auto excpt = *iPtr;
102  auto delayError =
103  make_waiting_task(tbb::task::allocate_root(),
104  [iWait, excpt](std::exception_ptr const*) mutable { iWait.doneWaiting(excpt); });
105  WaitingTaskHolder h(delayError);
106  for (auto& subProcess : iSubProcesses) {
107  subProcessDoStreamBeginTransitionAsync(h, subProcess, iStreamIndex, iPrincipal, iTS, iEventSetupImpls);
108  };
109  } else {
110  for (auto& subProcess : iSubProcesses) {
112  iWait, subProcess, iStreamIndex, iPrincipal, iTS, iEventSetupImpls);
113  };
114  }
115  });
116 
117  WaitingTaskHolder h(subs);
118  iSchedule.processOneStreamAsync<Traits>(std::move(h), iStreamIndex, iPrincipal, iES, token);
119  }
120 
121  template <typename Traits, typename P, typename SC>
123  Schedule& iSchedule,
124  unsigned int iNStreams,
125  P& iPrincipal,
126  IOVSyncValue const& iTS,
127  EventSetupImpl const& iES,
128  std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls,
129  ServiceToken const& token,
130  SC& iSubProcesses) {
131  WaitingTaskHolder holdUntilAllStreamsCalled(iWait);
132  for (unsigned int i = 0; i < iNStreams; ++i) {
133  beginStreamTransitionAsync<Traits>(
134  WaitingTaskHolder(iWait), iSchedule, i, iPrincipal, iTS, iES, iEventSetupImpls, token, iSubProcesses);
135  }
136  }
137 
138  template <typename Traits, typename P, typename SC>
140  Schedule& iSchedule,
141  unsigned int iStreamIndex,
142  P& iPrincipal,
143  IOVSyncValue const& iTS,
144  EventSetupImpl const& iES,
145  std::vector<std::shared_ptr<const EventSetupImpl>> const*
146  iEventSetupImpls, // always null for runs until we enable concurrent run processing
147  ServiceToken const& token,
148  SC& iSubProcesses,
149  bool cleaningUpAfterException) {
150  //When we are done processing the stream for this process,
151  // we need to run the stream for all SubProcesses
152  //NOTE: The subprocesses set their own service tokens
153 
154  auto subs = make_waiting_task(
155  tbb::task::allocate_root(),
156  [&iSubProcesses, iWait, iStreamIndex, &iPrincipal, iTS, iEventSetupImpls, cleaningUpAfterException](
157  std::exception_ptr const* iPtr) mutable {
158  if (iPtr) {
159  auto excpt = *iPtr;
160  auto delayError =
161  make_waiting_task(tbb::task::allocate_root(),
162  [iWait, excpt](std::exception_ptr const*) mutable { iWait.doneWaiting(excpt); });
163  WaitingTaskHolder h(delayError);
164  for (auto& subProcess : iSubProcesses) {
166  h, subProcess, iStreamIndex, iPrincipal, iTS, iEventSetupImpls, cleaningUpAfterException);
167  }
168  } else {
169  for (auto& subProcess : iSubProcesses) {
171  iWait, subProcess, iStreamIndex, iPrincipal, iTS, iEventSetupImpls, cleaningUpAfterException);
172  }
173  }
174  });
175 
176  iSchedule.processOneStreamAsync<Traits>(
177  WaitingTaskHolder(subs), iStreamIndex, iPrincipal, iES, token, cleaningUpAfterException);
178  }
179 
180  template <typename Traits, typename P, typename SC>
182  Schedule& iSchedule,
183  unsigned int iNStreams,
184  P& iPrincipal,
185  IOVSyncValue const& iTS,
186  EventSetupImpl const& iES,
187  std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls,
188  ServiceToken const& iToken,
189  SC& iSubProcesses,
190  bool cleaningUpAfterException) {
191  for (unsigned int i = 0; i < iNStreams; ++i) {
192  endStreamTransitionAsync<Traits>(
193  iWait, iSchedule, i, iPrincipal, iTS, iES, iEventSetupImpls, iToken, iSubProcesses, cleaningUpAfterException);
194  }
195  }
196 }; // namespace edm
197 
198 #endif
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder, SubProcess &iSubProcess, unsigned int i, LuminosityBlockPrincipal &iPrincipal, IOVSyncValue const &iTS, std::vector< std::shared_ptr< const EventSetupImpl >> const *iEventSetupImpls)
void beginStreamsTransitionAsync(WaitingTask *iWait, Schedule &iSchedule, unsigned int iNStreams, P &iPrincipal, IOVSyncValue const &iTS, EventSetupImpl const &iES, std::vector< std::shared_ptr< const EventSetupImpl >> const *iEventSetupImpls, ServiceToken const &token, SC &iSubProcesses)
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, std::vector< std::shared_ptr< const EventSetupImpl >> const *, bool cleaningUpAfterException)
Definition: SubProcess.cc:642
void endStreamsTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iNStreams, P &iPrincipal, IOVSyncValue const &iTS, EventSetupImpl const &iES, std::vector< std::shared_ptr< const EventSetupImpl >> const *iEventSetupImpls, ServiceToken const &iToken, SC &iSubProcesses, bool cleaningUpAfterException)
void endStreamTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iStreamIndex, P &iPrincipal, IOVSyncValue const &iTS, EventSetupImpl const &iES, std::vector< std::shared_ptr< const EventSetupImpl >> const *iEventSetupImpls, ServiceToken const &token, SC &iSubProcesses, bool cleaningUpAfterException)
void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder, SubProcess &iSubProcess, unsigned int i, LuminosityBlockPrincipal &iPrincipal, IOVSyncValue const &iTS, std::vector< std::shared_ptr< const EventSetupImpl >> const *iEventSetupImpls, bool cleaningUpAfterException)
void doneWaiting(std::exception_ptr iExcept)
void processOneStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamID, typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token, bool cleaningUpAfterException=false)
Definition: Schedule.h:318
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts, std::vector< std::shared_ptr< const EventSetupImpl >> const *, bool cleaningUpAfterException)
Definition: SubProcess.cc:600
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
std::pair< OmniClusterRef, TrackingParticleRef > P
HLT enums.
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
Definition: SubProcess.cc:621
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
Definition: SubProcess.cc:580
def move(src, dest)
Definition: eostools.py:511
void beginStreamTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iStreamIndex, P &iPrincipal, IOVSyncValue const &iTS, EventSetupImpl const &iES, std::vector< std::shared_ptr< const EventSetupImpl >> const *iEventSetupImpls, ServiceToken const &token, SC &iSubProcesses)