CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
streamTransitionAsync.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_streamTransitionAsync_h
2 #define FWCore_Framework_streamTransitionAsync_h
3 // -*- C++ -*-
4 //
5 // Package: FWCore/Framework
6 // Function: streamTransitionAsync
7 //
16 //
17 // Original Author: Chris Jones
18 // Created: Tue, 06 Sep 2016 16:04:26 GMT
19 //
20 
27 
28 #include <vector>
29 
30 namespace edm {
31 
32  //This is code in common between beginStreamRun and beginStreamLuminosityBlock
34  SubProcess& iSubProcess,
35  unsigned int i,
36  LumiTransitionInfo const& iTransitionInfo) {
37  iSubProcess.doStreamBeginLuminosityBlockAsync(std::move(iHolder), i, iTransitionInfo);
38  }
39 
41  SubProcess& iSubProcess,
42  unsigned int i,
43  RunTransitionInfo const& iTransitionInfo) {
44  iSubProcess.doStreamBeginRunAsync(std::move(iHolder), i, iTransitionInfo);
45  }
46 
48  SubProcess& iSubProcess,
49  unsigned int i,
50  LumiTransitionInfo const& iTransitionInfo,
51  bool cleaningUpAfterException) {
52  iSubProcess.doStreamEndLuminosityBlockAsync(std::move(iHolder), i, iTransitionInfo, cleaningUpAfterException);
53  }
54 
56  SubProcess& iSubProcess,
57  unsigned int i,
58  RunTransitionInfo const& iTransitionInfo,
59  bool cleaningUpAfterException) {
60  iSubProcess.doStreamEndRunAsync(std::move(iHolder), i, iTransitionInfo, cleaningUpAfterException);
61  }
62 
63  template <typename Traits>
65  Schedule& iSchedule,
66  unsigned int iStreamIndex,
67  typename Traits::TransitionInfoType& transitionInfo,
68  ServiceToken const& token,
69  std::vector<SubProcess>& iSubProcesses) {
70  //When we are done processing the stream for this process,
71  // we need to run the stream for all SubProcesses
72  //NOTE: The subprocesses set their own service tokens
73  using namespace edm::waiting_task;
74  chain::first([&](auto nextTask) {
75  iSchedule.processOneStreamAsync<Traits>(std::move(nextTask), iStreamIndex, transitionInfo, token);
76  }) |
78  [&iSubProcesses, iStreamIndex, info = transitionInfo](std::exception_ptr const* iPtr, auto nextTask) {
79  if (iPtr) {
80  auto excpt = *iPtr;
81  //defer handling exception until after sub processes run
82  chain::first([&](std::exception_ptr const*, auto nextTask) {
83  for (auto& subProcess : iSubProcesses) {
84  subProcessDoStreamBeginTransitionAsync(nextTask, subProcess, iStreamIndex, info);
85  };
86  }) | chain::then([excpt](std::exception_ptr const*, auto nextTask) { nextTask.doneWaiting(excpt); }) |
87  chain::runLast(nextTask);
88  } else {
89  for (auto& subProcess : iSubProcesses) {
90  subProcessDoStreamBeginTransitionAsync(nextTask, subProcess, iStreamIndex, info);
91  };
92  }
93  }) |
94  chain::runLast(iWait);
95  }
96 
97  template <typename Traits>
99  Schedule& iSchedule,
100  unsigned int iNStreams,
101  typename Traits::TransitionInfoType& transitionInfo,
102  ServiceToken const& token,
103  std::vector<SubProcess>& iSubProcesses) {
104  for (unsigned int i = 0; i < iNStreams; ++i) {
105  beginStreamTransitionAsync<Traits>(iWait, iSchedule, i, transitionInfo, token, iSubProcesses);
106  }
107  }
108 
109  template <typename Traits>
111  Schedule& iSchedule,
112  unsigned int iStreamIndex,
113  typename Traits::TransitionInfoType& transitionInfo,
114  ServiceToken const& token,
115  std::vector<SubProcess>& iSubProcesses,
116  bool cleaningUpAfterException) {
117  //When we are done processing the stream for this process,
118  // we need to run the stream for all SubProcesses
119  //NOTE: The subprocesses set their own service tokens
120 
121  using namespace edm::waiting_task;
122  chain::first([&](auto nextTask) {
123  iSchedule.processOneStreamAsync<Traits>(nextTask, iStreamIndex, transitionInfo, token, cleaningUpAfterException);
124  }) |
125  chain::then([&iSubProcesses, iStreamIndex, info = transitionInfo, cleaningUpAfterException](
126  std::exception_ptr const* iPtr, auto nextTask) {
127  if (iPtr) {
128  auto excpt = *iPtr;
129  chain::first([&](std::exception_ptr const*, auto nextTask) {
130  for (auto& subProcess : iSubProcesses) {
132  nextTask, subProcess, iStreamIndex, info, cleaningUpAfterException);
133  }
134  }) | chain::then([excpt](std::exception_ptr const*, auto nextTask) { nextTask.doneWaiting(excpt); }) |
135  chain::runLast(nextTask);
136  } else {
137  for (auto& subProcess : iSubProcesses) {
138  subProcessDoStreamEndTransitionAsync(nextTask, subProcess, iStreamIndex, info, cleaningUpAfterException);
139  }
140  }
141  }) |
142  chain::runLast(iWait);
143  }
144 
145  template <typename Traits>
147  Schedule& iSchedule,
148  unsigned int iNStreams,
149  typename Traits::TransitionInfoType& transitionInfo,
150  ServiceToken const& iToken,
151  std::vector<SubProcess>& iSubProcesses,
152  bool cleaningUpAfterException) {
153  for (unsigned int i = 0; i < iNStreams; ++i) {
154  endStreamTransitionAsync<Traits>(
155  iWait, iSchedule, i, transitionInfo, iToken, iSubProcesses, cleaningUpAfterException);
156  }
157  }
158 }; // namespace edm
159 
160 #endif
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &, bool cleaningUpAfterException)
Definition: SubProcess.cc:747
static const TGPicture * info(bool iBackgroundIsBlack)
void processOneStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamID, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
Definition: Schedule.h:317
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &, bool cleaningUpAfterException)
Definition: SubProcess.cc:723
constexpr auto then(O &&iO)
Definition: chain_first.h:277
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
def move
Definition: eostools.py:511
void beginStreamTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iStreamIndex, typename Traits::TransitionInfoType &transitionInfo, ServiceToken const &token, std::vector< SubProcess > &iSubProcesses)
void beginStreamsTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iNStreams, typename Traits::TransitionInfoType &transitionInfo, ServiceToken const &token, std::vector< SubProcess > &iSubProcesses)
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &)
Definition: SubProcess.cc:735
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &)
Definition: SubProcess.cc:713
void endStreamTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iStreamIndex, typename Traits::TransitionInfoType &transitionInfo, ServiceToken const &token, std::vector< SubProcess > &iSubProcesses, bool cleaningUpAfterException)
void endStreamsTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iNStreams, typename Traits::TransitionInfoType &transitionInfo, ServiceToken const &iToken, std::vector< SubProcess > &iSubProcesses, bool cleaningUpAfterException)
void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder, SubProcess &iSubProcess, unsigned int i, LumiTransitionInfo const &iTransitionInfo)
void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder, SubProcess &iSubProcess, unsigned int i, LumiTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)