23 std::shared_ptr<ActivityRegistry>
areg,
25 std::atomic<bool>* stopProcessingEvent,
31 failedModuleIndex_(workers.
size()),
38 pathContext_(path_name, streamContext, bitpos, pathType),
39 stopProcessingEvent_(stopProcessingEvent),
40 pathStatusInserter_(nullptr),
41 pathStatusInserterWorker_(nullptr) {
42 for (
auto& workerInPath :
workers_) {
49 : timesRun_(r.timesRun_),
50 timesPassed_(r.timesPassed_),
51 timesFailed_(r.timesFailed_),
52 timesExcept_(r.timesExcept_),
53 failedModuleIndex_(r.failedModuleIndex_),
58 act_table_(r.act_table_),
60 pathContext_(r.pathContext_),
61 stopProcessingEvent_(r.stopProcessingEvent_),
62 pathStatusInserter_(r.pathStatusInserter_),
63 pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
64 for (
auto& workerInPath :
workers_) {
80 bool should_continue =
true;
89 should_continue =
false;
104 std::ostringstream ost;
105 ost <<
"If you wish to continue processing events after a " << pNF <<
" exception,\n"
106 <<
"add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
116 return should_continue;
126 std::ostringstream ost;
127 ost <<
"Running path '" << pathContext.
pathName() <<
"'";
130 ost <<
"Processing ";
133 if (begin && branchType ==
InRun) {
134 ost <<
"stream begin Run";
135 }
else if (begin && branchType ==
InLumi) {
136 ost <<
"stream begin LuminosityBlock ";
137 }
else if (!begin && branchType ==
InLumi) {
138 ost <<
"stream end LuminosityBlock ";
139 }
else if (!begin && branchType ==
InRun) {
140 ost <<
"stream end Run ";
141 }
else if (isEvent) {
150 bool expected =
false;
151 while (
stateLock_.compare_exchange_strong(expected,
true)) {
199 using std::placeholders::_1;
207 if (
found != iWorkerToDeleter.end()) {
208 found->second->addedToPath();
237 finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
245 unsigned int iModuleIndex,
250 tbb::task_group& iGroup) {
256 auto& worker =
workers_[iModuleIndex];
257 bool shouldContinue = worker.checkResultsOfRunWorker(
true);
258 std::exception_ptr finalException;
260 std::unique_ptr<cms::Exception> pEx;
262 std::rethrow_exception(*iException);
264 pEx = std::unique_ptr<cms::Exception>(oldEx.
clone());
268 std::ostringstream ost;
280 worker.skipWorker(iEP);
281 finalException = std::exception_ptr();
283 shouldContinue =
false;
284 finalException = std::current_exception();
293 shouldContinue =
false;
295 auto const nextIndex = iModuleIndex + 1;
296 if (shouldContinue and nextIndex <
workers_.size()) {
297 if (not worker.runConcurrently()) {
304 if (not shouldContinue) {
307 if (not shouldContinue and not worker.runConcurrently()) {
309 for (
auto it =
workers_.begin() + nextIndex, itEnd =
workers_.end(); it != itEnd; ++it) {
316 finished(finalException, iContext, iInfo, iID);
333 std::exception_ptr jException =
336 if (jException && not iException) {
337 iException = jException;
341 if (not iException) {
342 iException = std::current_exception();
353 tbb::task_group& iGroup) {
355 const int firstModuleIndex = iNextModuleIndex;
356 int lastModuleIndex = firstModuleIndex;
357 while (lastModuleIndex + 1 != static_cast<int>(
workers_.size()) and
workers_[lastModuleIndex].runConcurrently()) {
360 for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
362 auto nextTask =
make_waiting_task([
this, lastModuleIndex,
info = iInfo, iID, iContext, weakToken, &iGroup](
363 std::exception_ptr
const* iException) {
void workerFinished(std::exception_ptr const *, unsigned int iModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, tbb::task_group &iGroup)
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
std::string const & pathName() const
ServiceToken lock() const
std::atomic< unsigned int > modulesToRun_
void recordStatus(int nwrwue, hlt::HLTState state)
static const std::string & codeToString(Code)
------------ implementation details ----------------
uint16_t *__restrict__ id
std::vector< WorkerInPath > WorkersInPath
void threadsafe_setFailedModuleInfo(int nwrwue, std::exception_ptr)
EventPrincipal & principal()
HLTState
status of a trigger path
EventID const & id() const
void setPathStatus(StreamID const &, HLTPathStatus const &)
std::string const & category() const
exception_actions::ActionCodes find(const std::string &category) const
WaitingTaskList waitingTasks_
void reset()
Resets access to the resource so that added tasks will wait.
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void finished(std::exception_ptr, StreamContext const *, EventTransitionInfo const &, StreamID const &)
std::list< std::string > const & context() const
void updateCounters(hlt::HLTState state)
std::atomic< bool > *const stopProcessingEvent_
void addAdditionalInfo(std::string const &info)
FunctorWaitingTask< F > * make_waiting_task(F f)
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, tbb::task_group &iGroup)
void add(tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
ExceptionToActionTable const *const act_table_
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
void presetTaskAsFailed(std::exception_ptr iExcept)
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
void addContext(std::string const &context)
std::shared_ptr< ActivityRegistry > const actReg_
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
tbb::task_group * group() const noexcept
Worker const * getWorker(size_type i) const
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id) const
std::atomic< bool > stateLock_
tuple size
Write out results.
Worker * pathStatusInserterWorker_
void processOneOccurrenceAsync(WaitingTaskHolder, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *)
virtual Exception * clone() const
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
PathStatusInserter * pathStatusInserter_