1 #ifndef FWCore_Framework_Worker_h 2 #define FWCore_Framework_Worker_h 63 #include <unordered_map> 68 class ProductResolverIndexHelper;
69 class ProductResolverIndexAndSkipBit;
72 class ProductRegistry;
73 class ThinnedAssociationsHelper;
75 namespace workerhelper {
79 namespace eventsetup {
// Boolean conversion: yields true when at least one of serial_ and limited_
// is a non-null pointer, false when both are null.
// NOTE(review): the enclosing class is not visible here; presumably these are
// the two alternative queue pointers of the task-queue adaptor — confirm.
95 operator bool() {
return serial_ !=
nullptr or limited_ !=
nullptr; }
// Four pure-virtual, const, bool-returning queries; every concrete subclass
// must implement all of them.
// NOTE(review): judging by the names, each reports whether the module takes
// part in the corresponding transition kind (global/stream x
// Run/LuminosityBlock) — confirm against the implementations.
121 virtual bool wantsGlobalRuns()
const = 0;
122 virtual bool wantsGlobalLuminosityBlocks()
const = 0;
123 virtual bool wantsStreamRuns()
const = 0;
124 virtual bool wantsStreamLuminosityBlocks()
const = 0;
129 template <
typename T>
130 bool doWork(
typename T::MyPrincipal
const&,
134 typename T::Context
const* context);
142 template <
typename T>
144 typename T::MyPrincipal
const&,
149 typename T::Context
const* context);
151 template <
typename T>
153 typename T::MyPrincipal
const&,
158 typename T::Context
const* context);
160 template <
typename T>
161 std::exception_ptr runModuleDirectly(
typename T::MyPrincipal
const& ep,
165 typename T::Context
const* context);
177 implRegisterThinnedAssociations(registry, helper);
181 cached_exception_ = std::exception_ptr();
183 waitingTasks_.reset();
184 workStarted_ =
false;
185 numberOfPathsLeftToRun_ = numberOfPathsOn_;
// Declaration only; the definition is outside this view. Takes the
// ActivityRegistry as a std::shared_ptr passed by value.
// NOTE(review): presumably stored in actReg_, which other code in this file
// dereferences (actReg_->...) and calls get() on — confirm.
194 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
201 virtual void resolvePutIndicies(
203 std::unordered_multimap<
std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>>
const&
206 virtual void modulesWhoseProductsAreConsumed(
207 std::vector<ModuleDescription const*>&
modules,
209 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
// Pure virtual: returns a std::vector<ConsumesInfo> by value.
// NOTE(review): presumably one entry per product the module declares it
// consumes — ConsumesInfo is declared outside this view; confirm.
213 virtual std::vector<ConsumesInfo> consumesInfo()
const = 0;
// Pure virtual: returns a value of type Types.
// NOTE(review): Types is declared outside this view; presumably an
// enumeration of module categories — confirm.
215 virtual Types moduleType()
const = 0;
218 timesRun_.store(0, std::memory_order_release);
219 timesVisited_.store(0, std::memory_order_release);
220 timesPassed_.store(0, std::memory_order_release);
221 timesFailed_.store(0, std::memory_order_release);
222 timesExcept_.store(0, std::memory_order_release);
// Returns the current value of the timesRun_ counter, read with an acquire
// load. Elsewhere in this file the counter is reset with a release store of 0
// and bumped with a relaxed fetch_add(1).
227 int timesRun()
const {
return timesRun_.load(std::memory_order_acquire); }
// Returns the current value of the timesVisited_ counter, read with an
// acquire load. Elsewhere in this file the counter is reset with a release
// store of 0 and bumped with a relaxed fetch_add(1).
228 int timesVisited()
const {
return timesVisited_.load(std::memory_order_acquire); }
// Returns the current value of the timesPassed_ counter, read with an
// acquire load. Elsewhere in this file the counter is reset with a release
// store of 0 and bumped with a relaxed fetch_add(1).
229 int timesPassed()
const {
return timesPassed_.load(std::memory_order_acquire); }
// Returns the current value of the timesFailed_ counter, read with an
// acquire load. Elsewhere in this file the counter is reset with a release
// store of 0 and bumped with a relaxed fetch_add(1).
230 int timesFailed()
const {
return timesFailed_.load(std::memory_order_acquire); }
// Returns the current value of the timesExcept_ counter, read with an
// acquire load. Elsewhere in this file the counter is reset with a release
// store of 0 and bumped with a relaxed fetch_add(1).
231 int timesExcept()
const {
return timesExcept_.load(std::memory_order_acquire); }
// Pure-virtual const query returning bool; must be implemented by concrete
// subclasses.
// NOTE(review): presumably true when the wrapped module has an "accumulate"
// capability — the meaning is not established in this view; confirm.
236 virtual bool hasAccumulator()
const = 0;
239 template <
typename O>
// Pure virtual: receives a vector of ProductResolverIndexAndSkipBit by
// non-const reference, so the implementation can modify it.
// NOTE(review): presumably appends the items needed for event selection —
// confirm whether implementations append or overwrite.
244 virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
// Pure-virtual const query returning bool.
// NOTE(review): presumably tells the caller whether a selection step must run
// before the module itself — confirm against the call sites.
245 virtual bool implNeedToRunSelection()
const = 0;
254 virtual bool implDoStreamBegin(
StreamID id,
258 virtual bool implDoStreamEnd(
StreamID id,
266 virtual bool implDoStreamBegin(
StreamID id,
270 virtual bool implDoStreamEnd(
StreamID id,
// Pure-virtual hooks with no return value; concrete subclasses must implement
// all four. The job hooks take no arguments, the stream hooks take a StreamID
// by value.
// NOTE(review): presumably invoked by the non-virtual public wrappers of the
// same name minus the "impl" prefix, which are outside this view — confirm.
277 virtual void implBeginJob() = 0;
278 virtual void implEndJob() = 0;
279 virtual void implBeginStream(
StreamID) = 0;
280 virtual void implEndStream(
StreamID) = 0;
287 template <
typename T>
288 bool runModule(
typename T::MyPrincipal
const&,
292 typename T::Context
const* context);
// Pure virtual: takes a BranchType and a vector of
// ProductResolverIndexAndSkipBit by non-const reference (an output argument).
// NOTE(review): presumably fills the vector with the items the module will
// get for that branch type — confirm append-vs-overwrite semantics.
294 virtual void itemsToGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
// Pure virtual with the same signature shape as itemsToGet.
// NOTE(review): presumably the "may get" counterpart, i.e. items that are
// only conditionally read — confirm.
295 virtual void itemsMayGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
// Pure virtual: for the given BranchType, returns a const reference to a
// vector of ProductResolverIndexAndSkipBit. Because a reference is returned,
// the referenced vector must outlive the caller's use of it.
// NOTE(review): presumably the storage is owned by the implementing object —
// confirm.
297 virtual std::vector<ProductResolverIndexAndSkipBit>
const& itemsToGetFrom(
BranchType)
const = 0;
// Pure virtual: returns a const reference to a vector of
// ProductResolverIndex. Because a reference is returned, the referenced
// vector must outlive the caller's use of it.
// NOTE(review): presumably the indices of products the module is expected to
// put into the Event — confirm.
299 virtual std::vector<ProductResolverIndex>
const& itemsShouldPutInEvent()
const = 0;
301 virtual void preActionBeforeRunEventAsync(
WaitingTask* iTask,
// Pure-virtual hooks with no return value, each taking the FileBlock by
// const reference.
// NOTE(review): presumably called from the public respondToOpenInputFile /
// respondToCloseInputFile wrappers, whose bodies are outside this view —
// confirm.
305 virtual void implRespondToOpenInputFile(
FileBlock const&
fb) = 0;
306 virtual void implRespondToCloseInputFile(
FileBlock const& fb) = 0;
325 template <
typename T>
330 std::ostringstream iost;
339 bool shouldRethrowException(std::exception_ptr iPtr,
344 template <
bool IS_EVENT>
347 timesPassed_.fetch_add(1, std::memory_order_relaxed);
353 template <
bool IS_EVENT>
356 timesFailed_.fetch_add(1, std::memory_order_relaxed);
362 template <
bool IS_EVENT>
365 timesExcept_.fetch_add(1, std::memory_order_relaxed);
367 cached_exception_ = iException;
369 return cached_exception_;
375 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
// Pure-virtual const query returning bool. Code in this file tests it
// together with T::isEvent_ (`T::isEvent_ && hasAcquire()` and
// `T::isEvent_ && !m_worker->hasAcquire()`) to choose between code paths.
// NOTE(review): presumably true when the module has a separate "acquire"
// step — confirm.
378 virtual bool hasAcquire()
const = 0;
380 template <
typename T>
381 std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr
const* iEPtr,
382 typename T::MyPrincipal
const& ep,
386 typename T::Context
const* context);
393 void runAcquireAfterAsyncPrefetch(std::exception_ptr
const* iEPtr,
// Declaration only; the definition is outside this view. Takes a pointer to a
// const std::exception_ptr plus the ParentContext by const reference, and
// returns a std::exception_ptr by value.
// NOTE(review): presumably iEPtr may be null (other exception-pointer
// arguments in this file are passed as nullptr) and the result is the
// exception, if any, that should be propagated — confirm.
399 std::exception_ptr handleExternalWorkException(std::exception_ptr
const* iEPtr,
ParentContext const& parentContext);
401 template <
typename T>
405 typename T::MyPrincipal
const& ep,
410 typename T::Context
const* context)
414 m_streamID(streamID),
415 m_parentContext(parentContext),
417 m_serviceToken(token) {}
439 std::exception_ptr temp_excptr;
440 auto excptr = exceptionPtr();
441 if (T::isEvent_ && !m_worker->hasAcquire()) {
444 m_worker->emitPostModuleEventPrefetchingSignal();
446 temp_excptr = std::current_exception();
448 excptr = &temp_excptr;
454 if (
auto queue = m_worker->serializeRunModule()) {
457 auto f = [worker = m_worker,
460 streamID = m_streamID,
461 parentContext = m_parentContext,
462 sContext = m_context,
463 serviceToken = m_serviceToken]() {
471 std::exception_ptr* ptr =
nullptr;
472 worker->template runModuleAfterAsyncPrefetch<T>(ptr,
principal, es, streamID, parentContext, sContext);
477 gQueue->push([
queue, gQueue,
f]()
mutable {
488 m_worker->runModuleAfterAsyncPrefetch<
T>(excptr, m_principal, m_es, m_streamID, m_parentContext, m_context);
506 template <
typename T,
typename DUMMY =
void>
510 typename T::MyPrincipal
const& ep,
518 template <
typename DUMMY>
530 m_parentContext(parentContext),
532 m_serviceToken(token) {}
540 std::exception_ptr temp_excptr;
541 auto excptr = exceptionPtr();
544 m_worker->emitPostModuleEventPrefetchingSignal();
546 temp_excptr = std::current_exception();
548 excptr = &temp_excptr;
553 if (
auto queue = m_worker->serializeRunModule()) {
556 queue.push([worker = m_worker,
559 parentContext = m_parentContext,
560 serviceToken = m_serviceToken,
561 holder = m_holder]() {
565 std::exception_ptr* ptr =
nullptr;
566 worker->runAcquireAfterAsyncPrefetch(ptr,
principal, es, parentContext, holder);
572 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_principal, m_es, m_parentContext,
std::move(m_holder));
619 std::atomic<
bool> workStarted_;
620 bool ranAcquireWithoutException_;
624 template <
typename T>
625 class ModuleSignalSentry {
628 typename T::Context
const* context,
630 : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
632 T::preModuleSignal(a_, context, moduleCallingContext_);
635 ~ModuleSignalSentry() {
637 T::postModuleSignal(a_, context_, moduleCallingContext_);
642 typename T::Context
const* context_;
648 namespace workerhelper {
661 return iWorker->
implDo(ep, es, mcc);
681 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
700 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
719 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
738 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
758 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
777 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
797 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
816 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
826 template <
typename T>
828 typename T::MyPrincipal
const& ep,
833 typename T::Context
const* context) {
838 waitingTasks_.add(task);
840 timesVisited_.fetch_add(1, std::memory_order_relaxed);
843 bool expected =
false;
844 if (workStarted_.compare_exchange_strong(expected,
true)) {
852 new (tbb::task::allocate_root())
RunModuleTask<T>(
this, ep, es, token, streamID, parentContext, context);
859 auto p = m_task.load();
866 auto t = m_task.load();
867 m_task.store(
nullptr);
871 std::atomic<edm::WaitingTask*> m_task;
874 auto ownRunTask = std::make_shared<DestroyTask>(runTask);
877 [ownRunTask, parentContext, &ep, token,
this](std::exception_ptr
const*)
mutable {
879 prefetchAsync(ownRunTask->release(), token, parentContext, ep);
881 prePrefetchSelectionAsync(selectionTask, token, streamID, &ep);
884 new (tbb::task::allocate_root())
RunModuleTask<T>(
this, ep, es, token, streamID, parentContext, context);
885 if (T::isEvent_ && hasAcquire()) {
888 moduleTask =
new (tbb::task::allocate_root())
891 prefetchAsync(moduleTask, token, parentContext, ep);
896 template <
typename T>
898 typename T::MyPrincipal
const& ep,
902 typename T::Context
const* context) {
903 std::exception_ptr exceptionPtr;
907 if (shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
908 exceptionPtr = *iEPtr;
909 setException<T::isEvent_>(exceptionPtr);
911 setPassed<T::isEvent_>();
916 runModule<T>(ep, es, streamID, parentContext, context);
918 exceptionPtr = std::current_exception();
921 waitingTasks_.doneWaiting(exceptionPtr);
925 template <
typename T>
927 typename T::MyPrincipal
const&
principal,
932 typename T::Context
const* context) {
936 waitingTasks_.add(task);
937 bool expected =
false;
938 if (workStarted_.compare_exchange_strong(expected,
true)) {
939 auto toDo = [
this, &
principal, &es, streamID, parentContext, context, serviceToken]() {
940 std::exception_ptr exceptionPtr;
945 this->runModule<T>(
principal, es, streamID, parentContext, context);
947 exceptionPtr = std::current_exception();
949 this->waitingTasks_.doneWaiting(exceptionPtr);
951 if (
auto queue = this->serializeRunModule()) {
955 tbb::task::spawn(*task);
960 template <
typename T>
965 typename T::Context
const* context) {
967 timesVisited_.fetch_add(1, std::memory_order_relaxed);
979 std::rethrow_exception(cached_exception_);
983 bool expected =
false;
984 if (not workStarted_.compare_exchange_strong(expected,
true)) {
987 waitTask->increment_ref_count();
989 waitingTasks_.add(waitTask.get());
991 waitTask->wait_for_all();
1001 std::rethrow_exception(cached_exception_);
1012 std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_, resetContext);
1018 waitTask->set_ref_count(2);
1020 waitTask->decrement_ref_count();
1021 waitTask->wait_for_all();
1023 if (state() !=
Ready) {
1033 auto sentryFunc = [
this](
void*) { emitPostModuleEventPrefetchingSignal(); };
1034 std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(), sentryFunc);
1037 waitTask->set_ref_count(2);
1040 waitTask->decrement_ref_count();
1041 waitTask->wait_for_all();
1043 if (waitTask->exceptionPtr() !=
nullptr) {
1045 if (shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
1046 setException<T::isEvent_>(*waitTask->exceptionPtr());
1047 waitingTasks_.doneWaiting(cached_exception_);
1048 std::rethrow_exception(cached_exception_);
1050 setPassed<T::isEvent_>();
1051 waitingTasks_.doneWaiting(
nullptr);
1058 prefetchSentry.release();
1059 if (
auto queue = serializeRunModule()) {
1061 queue.pushAndWait([&]() {
1065 rc = runModule<T>(ep, es, streamID, parentContext, context);
1071 rc = runModule<T>(ep, es, streamID, parentContext, context);
1075 if (state_ == Exception) {
1076 waitingTasks_.doneWaiting(cached_exception_);
1077 std::rethrow_exception(cached_exception_);
1080 waitingTasks_.doneWaiting(
nullptr);
1084 template <
typename T>
1089 typename T::Context
const* context) {
1096 timesRun_.fetch_add(1, std::memory_order_relaxed);
1105 setPassed<T::isEvent_>();
1107 setFailed<T::isEvent_>();
1113 if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
1114 assert(not cached_exception_);
1115 setException<T::isEvent_>(std::current_exception());
1116 std::rethrow_exception(cached_exception_);
1118 rc = setPassed<T::isEvent_>();
1125 template <
typename T>
1130 typename T::Context
const* context) {
1131 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1132 std::exception_ptr
const* prefetchingException =
nullptr;
1133 return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
void push(T &&iAction)
asynchronously pushes functor iAction into queue
static SerialTaskQueue * enableGlobalQueue(Worker *)
tbb::task * execute() override
std::atomic< int > timesVisited_
static SerialTaskQueue * pauseGlobalQueue(Worker *)
ModuleDescription const & description() const
T::Context const * m_context
void callWhenDoneAsync(WaitingTask *task)
EventSetupImpl const & m_es
T::MyPrincipal const & m_principal
std::atomic< int > numberOfPathsLeftToRun_
std::exception_ptr setException(std::exception_ptr iException)
virtual bool wantsGlobalRuns() const =0
static bool needToRunSelection(Worker const *iWorker)
static SerialTaskQueue * enableGlobalQueue(Worker *)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool wantsTransition(Worker const *iWorker)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
static bool needToRunSelection(Worker const *iWorker)
virtual bool implDo(EventPrincipal const &, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
ParentContext const m_parentContext
ParentContext const m_parentContext
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
static SerialTaskQueue * enableGlobalQueue(Worker *)
std::atomic< int > timesExcept_
static bool wantsTransition(Worker const *iWorker)
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
EventPrincipal const & m_principal
static bool needToRunSelection(Worker const *iWorker)
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
std::atomic< int > timesFailed_
WaitingTask * m_runModuleTask
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetupImpl const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
static bool needToRunSelection(Worker const *iWorker)
static SerialTaskQueue * enableGlobalQueue(Worker *)
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
ActivityRegistry * activityRegistry()
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
void exceptionContext(std::ostream &, GlobalContext const &)
virtual bool implDoBegin(RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
ExceptionToActionTable const * actions_
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
ServiceToken presentToken() const
bool runModule(typename T::MyPrincipal const &, EventSetupImpl const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetupImpl const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
tbb::task * execute() override
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetupImpl const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
AcquireTask(Worker *worker, EventPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static bool needToRunSelection(Worker const *iWorker)
static bool needToRunSelection(Worker const *iWorker)
bool resume()
Resumes processing if the queue was paused.
static bool wantsTransition(Worker const *iWorker)
EnableQueueGuard(EnableQueueGuard &&iGuard)
static bool needToRunSelection(Worker const *iWorker)
virtual bool implNeedToRunSelection() const =0
static SerialTaskQueue * enableGlobalQueue(Worker *)
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
ServiceToken m_serviceToken
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
ModuleDescription const * descPtr() const
EnableQueueGuard(SerialTaskQueue *iQueue)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
static ServiceRegistry & instance()
static bool wantsTransition(Worker const *iWorker)
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
virtual bool wantsStreamLuminosityBlocks() const =0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
tbb::task * execute() override
bool doWork(typename T::MyPrincipal const &, EventSetupImpl const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
std::atomic< State > state_
static SerialTaskQueue * enableGlobalQueue(Worker *)
static SerialTaskQueue * enableGlobalQueue(Worker *)
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
WaitingTaskWithArenaHolder m_holder
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
TransitionIDValue(T const &iP)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
AcquireTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
static bool wantsTransition(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static bool wantsTransition(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
std::string value() const override
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual SerialTaskQueue * globalRunsQueue()=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
void respondToOpenInputFile(FileBlock const &fb)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
virtual bool wantsGlobalLuminosityBlocks() const =0
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
void respondToCloseInputFile(FileBlock const &fb)
virtual bool wantsStreamRuns() const =0
virtual ~TransitionIDValueBase()
virtual bool implDoEnd(RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
void emitPostModuleEventPrefetchingSignal()
ParentContext const m_parentContext
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
std::atomic< int > timesPassed_
static bool needToRunSelection(Worker const *iWorker)
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
static bool needToRunSelection(Worker const *iWorker)
auto wrap(F iFunc) -> decltype(iFunc())
static bool wantsTransition(Worker const *iWorker)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
std::atomic< int > timesRun_
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetupImpl const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
EventSetupImpl const & m_es
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
ServiceToken m_serviceToken
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0