CMS 3D CMS Logo

streamTransitionAsync.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_streamTransitionAsync_h
2 #define FWCore_Framework_streamTransitionAsync_h
3 // -*- C++ -*-
4 //
5 // Package: FWCore/Framework
6 // Function: streamTransitionAsync
7 //
16 //
17 // Original Author: Chris Jones
18 // Created: Tue, 06 Sep 2016 16:04:26 GMT
19 //
20 
21 // system include files
26 
27 // user include files
28 
29 // forward declarations
30 
31 namespace edm {
32  class IOVSyncValue;
33  class EventSetupImpl;
34  class LuminosityBlockPrincipal;
35  class RunPrincipal;
36 
37  //This is code in common between beginStreamRun and beginStreamLuminosityBlock
39  SubProcess& iSubProcess,
40  unsigned int i,
41  LuminosityBlockPrincipal& iPrincipal,
42  IOVSyncValue const& iTS) {
43  iSubProcess.doStreamBeginLuminosityBlockAsync(std::move(iHolder), i, iPrincipal, iTS);
44  }
45 
47  SubProcess& iSubProcess,
48  unsigned int i,
49  RunPrincipal& iPrincipal,
50  IOVSyncValue const& iTS) {
51  iSubProcess.doStreamBeginRunAsync(std::move(iHolder), i, iPrincipal, iTS);
52  }
53 
55  SubProcess& iSubProcess,
56  unsigned int i,
57  LuminosityBlockPrincipal& iPrincipal,
58  IOVSyncValue const& iTS,
59  bool cleaningUpAfterException) {
60  iSubProcess.doStreamEndLuminosityBlockAsync(std::move(iHolder), i, iPrincipal, iTS, cleaningUpAfterException);
61  }
62 
64  SubProcess& iSubProcess,
65  unsigned int i,
66  RunPrincipal& iPrincipal,
67  IOVSyncValue const& iTS,
68  bool cleaningUpAfterException) {
69  iSubProcess.doStreamEndRunAsync(std::move(iHolder), i, iPrincipal, iTS, cleaningUpAfterException);
70  }
71 
72  template <typename Traits, typename P, typename SC>
74  Schedule& iSchedule,
75  unsigned int iStreamIndex,
76  P& iPrincipal,
77  IOVSyncValue const& iTS,
78  EventSetupImpl const& iES,
79  ServiceToken const& token,
80  SC& iSubProcesses) {
81  //When we are done processing the stream for this process,
82  // we need to run the stream for all SubProcesses
83  //NOTE: The subprocesses set their own service tokens
84  auto subs = make_waiting_task(
85  tbb::task::allocate_root(),
86  [&iSubProcesses, iWait, iStreamIndex, &iPrincipal, iTS](std::exception_ptr const* iPtr) mutable {
87  if (iPtr) {
88  auto excpt = *iPtr;
89  auto delayError =
90  make_waiting_task(tbb::task::allocate_root(),
91  [iWait, excpt](std::exception_ptr const*) mutable { iWait.doneWaiting(excpt); });
92  WaitingTaskHolder h(delayError);
93  for (auto& subProcess : iSubProcesses) {
94  subProcessDoStreamBeginTransitionAsync(h, subProcess, iStreamIndex, iPrincipal, iTS);
95  };
96  } else {
97  for (auto& subProcess : iSubProcesses) {
98  subProcessDoStreamBeginTransitionAsync(iWait, subProcess, iStreamIndex, iPrincipal, iTS);
99  };
100  }
101  });
102 
103  WaitingTaskHolder h(subs);
104  iSchedule.processOneStreamAsync<Traits>(std::move(h), iStreamIndex, iPrincipal, iES, token);
105  }
106 
107  template <typename Traits, typename P, typename SC>
109  Schedule& iSchedule,
110  unsigned int iNStreams,
111  P& iPrincipal,
112  IOVSyncValue const& iTS,
113  EventSetupImpl const& iES,
114  ServiceToken const& token,
115  SC& iSubProcesses) {
116  WaitingTaskHolder holdUntilAllStreamsCalled(iWait);
117  for (unsigned int i = 0; i < iNStreams; ++i) {
118  beginStreamTransitionAsync<Traits>(
119  WaitingTaskHolder(iWait), iSchedule, i, iPrincipal, iTS, iES, token, iSubProcesses);
120  }
121  }
122 
123  template <typename Traits, typename P, typename SC>
125  Schedule& iSchedule,
126  unsigned int iStreamIndex,
127  P& iPrincipal,
128  IOVSyncValue const& iTS,
129  EventSetupImpl const& iES,
130  ServiceToken const& token,
131  SC& iSubProcesses,
132  bool cleaningUpAfterException) {
133  //When we are done processing the stream for this process,
134  // we need to run the stream for all SubProcesses
135  //NOTE: The subprocesses set their own service tokens
136 
137  auto subs =
138  make_waiting_task(tbb::task::allocate_root(),
139  [&iSubProcesses, iWait, iStreamIndex, &iPrincipal, iTS, cleaningUpAfterException](
140  std::exception_ptr const* iPtr) mutable {
141  if (iPtr) {
142  auto excpt = *iPtr;
143  auto delayError = make_waiting_task(
144  tbb::task::allocate_root(),
145  [iWait, excpt](std::exception_ptr const*) mutable { iWait.doneWaiting(excpt); });
146  WaitingTaskHolder h(delayError);
147  for (auto& subProcess : iSubProcesses) {
149  h, subProcess, iStreamIndex, iPrincipal, iTS, cleaningUpAfterException);
150  }
151  } else {
152  for (auto& subProcess : iSubProcesses) {
154  iWait, subProcess, iStreamIndex, iPrincipal, iTS, cleaningUpAfterException);
155  }
156  }
157  });
158 
159  iSchedule.processOneStreamAsync<Traits>(
160  WaitingTaskHolder(subs), iStreamIndex, iPrincipal, iES, token, cleaningUpAfterException);
161  }
162 
163  template <typename Traits, typename P, typename SC>
165  Schedule& iSchedule,
166  unsigned int iNStreams,
167  P& iPrincipal,
168  IOVSyncValue const& iTS,
169  EventSetupImpl const& iES,
170  ServiceToken const& iToken,
171  SC& iSubProcesses,
172  bool cleaningUpAfterException) {
173  for (unsigned int i = 0; i < iNStreams; ++i) {
174  endStreamTransitionAsync<Traits>(
175  iWait, iSchedule, i, iPrincipal, iTS, iES, iToken, iSubProcesses, cleaningUpAfterException);
176  }
177  }
178 }; // namespace edm
179 
180 #endif
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:557
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:569
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:600
void endStreamsTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iNStreams, P &iPrincipal, IOVSyncValue const &iTS, EventSetupImpl const &iES, ServiceToken const &iToken, SC &iSubProcesses, bool cleaningUpAfterException)
void doneWaiting(std::exception_ptr iExcept)
void endStreamTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iStreamIndex, P &iPrincipal, IOVSyncValue const &iTS, EventSetupImpl const &iES, ServiceToken const &token, SC &iSubProcesses, bool cleaningUpAfterException)
void processOneStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamID, typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token, bool cleaningUpAfterException=false)
Definition: Schedule.h:318
void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder, SubProcess &iSubProcess, unsigned int i, LuminosityBlockPrincipal &iPrincipal, IOVSyncValue const &iTS, bool cleaningUpAfterException)
void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder, SubProcess &iSubProcess, unsigned int i, LuminosityBlockPrincipal &iPrincipal, IOVSyncValue const &iTS)
void beginStreamsTransitionAsync(WaitingTask *iWait, Schedule &iSchedule, unsigned int iNStreams, P &iPrincipal, IOVSyncValue const &iTS, EventSetupImpl const &iES, ServiceToken const &token, SC &iSubProcesses)
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:588
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
std::pair< OmniClusterRef, TrackingParticleRef > P
HLT enums.
void beginStreamTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iStreamIndex, P &iPrincipal, IOVSyncValue const &iTS, EventSetupImpl const &iES, ServiceToken const &token, SC &iSubProcesses)
def move(src, dest)
Definition: eostools.py:511