CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
streamTransitionAsync.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_streamTransitionAsync_h
2 #define FWCore_Framework_streamTransitionAsync_h
3 // -*- C++ -*-
4 //
5 // Package: FWCore/Framework
6 // Function: streamTransitionAsync
7 //
16 //
17 // Original Author: Chris Jones
18 // Created: Tue, 06 Sep 2016 16:04:26 GMT
19 //
20 
21 // system include files
26 
27 // user include files
28 
29 // forward declarations
30 
31 namespace edm {
32  class IOVSyncValue;
33  class EventSetup;
34  class LuminosityBlockPrincipal;
35  class RunPrincipal;
36 
37  //These overloads let the templated begin/end stream transition code below call the matching SubProcess method for either a RunPrincipal or a LuminosityBlockPrincipal
38  inline void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder,SubProcess& iSubProcess, unsigned int i, LuminosityBlockPrincipal& iPrincipal, IOVSyncValue const& iTS) {
39  iSubProcess.doStreamBeginLuminosityBlockAsync(std::move(iHolder),i,iPrincipal, iTS);
40  }
41 
42  inline void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder, SubProcess& iSubProcess, unsigned int i, RunPrincipal& iPrincipal, IOVSyncValue const& iTS) {
43  iSubProcess.doStreamBeginRunAsync(std::move(iHolder),i,iPrincipal, iTS);
44  }
45 
46  inline void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder, SubProcess& iSubProcess, unsigned int i, LuminosityBlockPrincipal& iPrincipal, IOVSyncValue const& iTS, bool cleaningUpAfterException) {
47  iSubProcess.doStreamEndLuminosityBlockAsync(std::move(iHolder),i,iPrincipal, iTS,cleaningUpAfterException);
48  }
49 
50  inline void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder, SubProcess& iSubProcess, unsigned int i, RunPrincipal& iPrincipal, IOVSyncValue const& iTS, bool cleaningUpAfterException) {
51  iSubProcess.doStreamEndRunAsync(std::move(iHolder), i ,iPrincipal, iTS, cleaningUpAfterException);
52  }
53 
54 
55  template<typename Traits, typename P, typename SC >
57  Schedule& iSchedule,
58  unsigned int iStreamIndex,
59  P& iPrincipal,
60  IOVSyncValue const & iTS,
61  EventSetup const& iES,
62  SC& iSubProcesses) {
64 
65  //When we are done processing the stream for this process,
66  // we need to run the stream for all SubProcesses
67  auto subs = make_waiting_task(tbb::task::allocate_root(), [&iSubProcesses, iWait,iStreamIndex,&iPrincipal,iTS,token](std::exception_ptr const* iPtr) mutable {
68  ServiceRegistry::Operate op(token);
69  if(iPtr) {
70  auto excpt = *iPtr;
71  auto delayError = make_waiting_task(tbb::task::allocate_root(), [iWait,token,excpt](std::exception_ptr const* ) mutable {
72  ServiceRegistry::Operate op(token);
73  iWait.doneWaiting(excpt);
74  });
75  WaitingTaskHolder h(delayError);
76  for_all(iSubProcesses, [&h,iStreamIndex, &iPrincipal, iTS](auto& subProcess){ subProcessDoStreamBeginTransitionAsync(h,subProcess,iStreamIndex,iPrincipal, iTS); });
77  } else {
78  for_all(iSubProcesses, [&iWait,iStreamIndex, &iPrincipal, iTS](auto& subProcess){ subProcessDoStreamBeginTransitionAsync(iWait,subProcess,iStreamIndex,iPrincipal, iTS); });
79  }
80  });
81 
82  WaitingTaskHolder h(subs);
83  iSchedule.processOneStreamAsync<Traits>(std::move(h), iStreamIndex,iPrincipal, iES);
84  }
85 
86 
87  template<typename Traits, typename P, typename SC >
89  Schedule& iSchedule,
90  unsigned int iNStreams,
91  P& iPrincipal,
92  IOVSyncValue const & iTS,
93  EventSetup const& iES,
94  SC& iSubProcesses)
95  {
96  WaitingTaskHolder holdUntilAllStreamsCalled(iWait);
97  for(unsigned int i=0; i<iNStreams;++i) {
98  beginStreamTransitionAsync<Traits>(WaitingTaskHolder(iWait), iSchedule,i,iPrincipal,iTS,iES,iSubProcesses);
99  }
100  }
101 
102  template<typename Traits, typename P, typename SC >
104  Schedule& iSchedule,
105  unsigned int iStreamIndex,
106  P& iPrincipal,
107  IOVSyncValue const & iTS,
108  EventSetup const& iES,
109  SC& iSubProcesses,
110  bool cleaningUpAfterException)
111  {
113 
114  //When we are done processing the stream for this process,
115  // we need to run the stream for all SubProcesses
116  auto subs = make_waiting_task(tbb::task::allocate_root(), [&iSubProcesses, iWait,iStreamIndex,&iPrincipal,iTS,token,cleaningUpAfterException](std::exception_ptr const* iPtr) mutable {
117  ServiceRegistry::Operate op(token);
118  if(iPtr) {
119  auto excpt = *iPtr;
120  auto delayError = make_waiting_task(tbb::task::allocate_root(), [iWait,token,excpt](std::exception_ptr const* ) mutable {
121  ServiceRegistry::Operate op(token);
122  iWait.doneWaiting(excpt);
123  });
124  WaitingTaskHolder h(delayError);
125  for_all(iSubProcesses, [&h,iStreamIndex, &iPrincipal, iTS,cleaningUpAfterException](auto& subProcess){
126  subProcessDoStreamEndTransitionAsync(h,subProcess,iStreamIndex,iPrincipal, iTS,cleaningUpAfterException); });
127  } else {
128  for_all(iSubProcesses, [&iWait,iStreamIndex, &iPrincipal, iTS,cleaningUpAfterException](auto& subProcess){
129  subProcessDoStreamEndTransitionAsync(iWait,subProcess,iStreamIndex,iPrincipal, iTS,cleaningUpAfterException); });
130  }
131  });
132 
133  WaitingTaskHolder h(subs);
134  iSchedule.processOneStreamAsync<Traits>(std::move(h), iStreamIndex,iPrincipal, iES,cleaningUpAfterException);
135 
136 
137  }
138 
139  template<typename Traits, typename P, typename SC >
141  Schedule& iSchedule,
142  unsigned int iNStreams,
143  P& iPrincipal,
144  IOVSyncValue const & iTS,
145  EventSetup const& iES,
146  SC& iSubProcesses,
147  bool cleaningUpAfterException)
148  {
149  WaitingTaskHolder holdUntilAllStreamsCalled(iWait);
150  for(unsigned int i=0; i<iNStreams;++i) {
151  endStreamTransitionAsync<Traits>(WaitingTaskHolder(iWait),
152  iSchedule,i,
153  iPrincipal,iTS,iES,
154  iSubProcesses,cleaningUpAfterException);
155  }
156  }
157 };
158 
159 #endif
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:551
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:571
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:608
ServiceToken presentToken() const
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void doneWaiting(std::exception_ptr iExcept)
void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder, SubProcess &iSubProcess, unsigned int i, LuminosityBlockPrincipal &iPrincipal, IOVSyncValue const &iTS, bool cleaningUpAfterException)
static ServiceRegistry & instance()
void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder, SubProcess &iSubProcess, unsigned int i, LuminosityBlockPrincipal &iPrincipal, IOVSyncValue const &iTS)
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:588
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
std::pair< OmniClusterRef, TrackingParticleRef > P
HLT enums.
void beginStreamsTransitionAsync(WaitingTask *iWait, Schedule &iSchedule, unsigned int iNStreams, P &iPrincipal, IOVSyncValue const &iTS, EventSetup const &iES, SC &iSubProcesses)
void endStreamTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iStreamIndex, P &iPrincipal, IOVSyncValue const &iTS, EventSetup const &iES, SC &iSubProcesses, bool cleaningUpAfterException)
void beginStreamTransitionAsync(WaitingTaskHolder iWait, Schedule &iSchedule, unsigned int iStreamIndex, P &iPrincipal, IOVSyncValue const &iTS, EventSetup const &iES, SC &iSubProcesses)
void endStreamsTransitionAsync(WaitingTask *iWait, Schedule &iSchedule, unsigned int iNStreams, P &iPrincipal, IOVSyncValue const &iTS, EventSetup const &iES, SC &iSubProcesses, bool cleaningUpAfterException)
void processOneStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamID, typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
Definition: Schedule.h:302
def move(src, dest)
Definition: eostools.py:510