CMS 3D CMS Logo

streamTransitionAsync.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_streamTransitionAsync_h
2 #define FWCore_Framework_streamTransitionAsync_h
3 // -*- C++ -*-
4 //
5 // Package: FWCore/Framework
6 // Function: streamTransitionAsync
7 //
16 //
17 // Original Author: Chris Jones
18 // Created: Tue, 06 Sep 2016 16:04:26 GMT
19 //
20 
21 // system include files
26 
27 // user include files
28 
29 // forward declarations
30 
31 namespace edm {
32  class IOVSyncValue;
33  class EventSetup;
34  class LuminosityBlockPrincipal;
35  class RunPrincipal;
36 
37  //This is code in common between beginStreamRun and beginStreamLuminosityBlock
38  inline void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder,SubProcess& iSubProcess, unsigned int i, LuminosityBlockPrincipal& iPrincipal, IOVSyncValue const& iTS) {
39  iSubProcess.doStreamBeginLuminosityBlockAsync(std::move(iHolder),i,iPrincipal, iTS);
40  }
41 
42  inline void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder, SubProcess& iSubProcess, unsigned int i, RunPrincipal& iPrincipal, IOVSyncValue const& iTS) {
43  iSubProcess.doStreamBeginRunAsync(std::move(iHolder),i,iPrincipal, iTS);
44  }
45 
46  inline void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder, SubProcess& iSubProcess, unsigned int i, LuminosityBlockPrincipal& iPrincipal, IOVSyncValue const& iTS, bool cleaningUpAfterException) {
47  iSubProcess.doStreamEndLuminosityBlockAsync(std::move(iHolder),i,iPrincipal, iTS,cleaningUpAfterException);
48  }
49 
50  inline void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder, SubProcess& iSubProcess, unsigned int i, RunPrincipal& iPrincipal, IOVSyncValue const& iTS, bool cleaningUpAfterException) {
51  iSubProcess.doStreamEndRunAsync(std::move(iHolder), i ,iPrincipal, iTS, cleaningUpAfterException);
52  }
53 
54 
55  template<typename Traits, typename P, typename SC >
57  Schedule& iSchedule,
58  unsigned int iStreamIndex,
59  P& iPrincipal,
60  IOVSyncValue const & iTS,
61  EventSetup const& iES,
62  ServiceToken const& token,
63  SC& iSubProcesses) {
64  //When we are done processing the stream for this process,
65  // we need to run the stream for all SubProcesses
66  //NOTE: The subprocesses set their own service tokens
67  auto subs = make_waiting_task(tbb::task::allocate_root(), [&iSubProcesses, iWait,iStreamIndex,&iPrincipal,iTS](std::exception_ptr const* iPtr) mutable {
68  if(iPtr) {
69  auto excpt = *iPtr;
70  auto delayError = make_waiting_task(tbb::task::allocate_root(), [iWait,excpt](std::exception_ptr const* ) mutable {
71  iWait.doneWaiting(excpt);
72  });
73  WaitingTaskHolder h(delayError);
74  for(auto& subProcess: iSubProcesses){
75  subProcessDoStreamBeginTransitionAsync(h,subProcess,iStreamIndex,iPrincipal, iTS);
76 
77  };
78  } else {
79  for(auto& subProcess: iSubProcesses){
80  subProcessDoStreamBeginTransitionAsync(iWait,subProcess,iStreamIndex,iPrincipal, iTS);
81  };
82  }
83  });
84 
85  WaitingTaskHolder h(subs);
86  iSchedule.processOneStreamAsync<Traits>(std::move(h), iStreamIndex,iPrincipal, iES,token);
87  }
88 
89 
90  template<typename Traits, typename P, typename SC >
92  Schedule& iSchedule,
93  unsigned int iNStreams,
94  P& iPrincipal,
95  IOVSyncValue const & iTS,
96  EventSetup const& iES,
97  ServiceToken const& token,
98  SC& iSubProcesses)
99  {
100  WaitingTaskHolder holdUntilAllStreamsCalled(iWait);
101  for(unsigned int i=0; i<iNStreams;++i) {
102  beginStreamTransitionAsync<Traits>(WaitingTaskHolder(iWait), iSchedule,i,iPrincipal,iTS,iES,token, iSubProcesses);
103  }
104  }
105 
106  template<typename Traits, typename P, typename SC >
108  Schedule& iSchedule,
109  unsigned int iStreamIndex,
110  P& iPrincipal,
111  IOVSyncValue const & iTS,
112  EventSetup const& iES,
113  ServiceToken const& token,
114  SC& iSubProcesses,
115  bool cleaningUpAfterException)
116  {
117  //When we are done processing the stream for this process,
118  // we need to run the stream for all SubProcesses
119  //NOTE: The subprocesses set their own service tokens
120 
121  auto subs = make_waiting_task(tbb::task::allocate_root(), [&iSubProcesses, iWait,iStreamIndex,&iPrincipal,iTS,cleaningUpAfterException](std::exception_ptr const* iPtr) mutable {
122  if(iPtr) {
123  auto excpt = *iPtr;
124  auto delayError = make_waiting_task(tbb::task::allocate_root(), [iWait,excpt](std::exception_ptr const* ) mutable {
125  iWait.doneWaiting(excpt);
126  });
127  WaitingTaskHolder h(delayError);
128  for(auto& subProcess: iSubProcesses) {
129  subProcessDoStreamEndTransitionAsync(h,subProcess,iStreamIndex,iPrincipal, iTS,cleaningUpAfterException);
130  }
131  } else {
132  for(auto& subProcess: iSubProcesses) {
133  subProcessDoStreamEndTransitionAsync(iWait,subProcess,iStreamIndex,iPrincipal, iTS,cleaningUpAfterException);
134  }
135  }
136  });
137 
138  iSchedule.processOneStreamAsync<Traits>(WaitingTaskHolder(subs), iStreamIndex,iPrincipal, iES, token, cleaningUpAfterException);
139  }
140 
141  template<typename Traits, typename P, typename SC >
143  Schedule& iSchedule,
144  unsigned int iNStreams,
145  P& iPrincipal,
146  IOVSyncValue const & iTS,
147  EventSetup const& iES,
148  ServiceToken const& iToken,
149  SC& iSubProcesses,
150  bool cleaningUpAfterException)
151  {
152  for(unsigned int i=0; i<iNStreams;++i) {
153  endStreamTransitionAsync<Traits>(iWait,
154  iSchedule,i,
155  iPrincipal,iTS,iES, iToken,
156  iSubProcesses,cleaningUpAfterException);
157  }
158  }
159 };
160 
161 #endif
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:583
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:602
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:638
void beginStreamsTransitionAsync(WaitingTask *iWait, Schedule &iSchedule, unsigned int iNStreams, P &iPrincipal, IOVSyncValue const &iTS, EventSetup const &iES, ServiceToken const &token, SC &iSubProcesses)
void endStreamTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iStreamIndex, P &iPrincipal, IOVSyncValue const &iTS, EventSetup const &iES, ServiceToken const &token, SC &iSubProcesses, bool cleaningUpAfterException)
void doneWaiting(std::exception_ptr iExcept)
void beginStreamTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iStreamIndex, P &iPrincipal, IOVSyncValue const &iTS, EventSetup const &iES, ServiceToken const &token, SC &iSubProcesses)
void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder, SubProcess &iSubProcess, unsigned int i, LuminosityBlockPrincipal &iPrincipal, IOVSyncValue const &iTS, bool cleaningUpAfterException)
void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder, SubProcess &iSubProcess, unsigned int i, LuminosityBlockPrincipal &iPrincipal, IOVSyncValue const &iTS)
void processOneStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamID, typename T::MyPrincipal &principal, EventSetup const &eventSetup, ServiceToken const &token, bool cleaningUpAfterException=false)
Definition: Schedule.h:313
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:619
void endStreamsTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iNStreams, P &iPrincipal, IOVSyncValue const &iTS, EventSetup const &iES, ServiceToken const &iToken, SC &iSubProcesses, bool cleaningUpAfterException)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
std::pair< OmniClusterRef, TrackingParticleRef > P
HLT enums.
def move(src, dest)
Definition: eostools.py:511