23 std::shared_ptr<ActivityRegistry> areg,
25 std::atomic<bool>* stopProcessingEvent,
38 pathContext_(path_name, streamContext, bitpos, pathType),
39 stopProcessingEvent_(stopProcessingEvent),
40 pathStatusInserter_(nullptr),
41 pathStatusInserterWorker_(nullptr) {
42 for (
auto& workerInPath :
workers_) {
49 : timesRun_(r.timesRun_),
50 timesPassed_(r.timesPassed_),
51 timesFailed_(r.timesFailed_),
52 timesExcept_(r.timesExcept_),
53 failedModuleIndex_(r.failedModuleIndex_),
58 act_table_(r.act_table_),
60 pathContext_(r.pathContext_),
61 stopProcessingEvent_(r.stopProcessingEvent_),
62 pathStatusInserter_(r.pathStatusInserter_),
63 pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
64 for (
auto& workerInPath :
workers_) {
77 if (
e.context().empty()) {
80 bool should_continue =
true;
89 should_continue =
false;
103 if (
e.category() == pNF) {
104 std::ostringstream ost;
105 ost <<
"If you wish to continue processing events after a " << pNF <<
" exception,\n" 106 <<
"add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the " 108 e.addAdditionalInfo(ost.str());
116 return should_continue;
126 std::ostringstream ost;
127 ost <<
"Running path '" << pathContext.
pathName() <<
"'";
130 ost <<
"Processing ";
133 if (begin && branchType ==
InRun) {
134 ost <<
"stream begin Run";
135 }
else if (begin && branchType ==
InLumi) {
136 ost <<
"stream begin LuminosityBlock ";
137 }
else if (!begin && branchType ==
InLumi) {
138 ost <<
"stream end LuminosityBlock ";
139 }
else if (!begin && branchType ==
InRun) {
140 ost <<
"stream end Run ";
141 }
else if (isEvent) {
150 bool expected =
false;
151 while (
stateLock_.compare_exchange_strong(expected,
true)) {
199 using std::placeholders::_1;
207 if (
found != iWorkerToDeleter.end()) {
208 found->second->addedToPath();
237 finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
245 unsigned int iModuleIndex,
250 oneapi::tbb::task_group& iGroup) {
256 auto& worker =
workers_[iModuleIndex];
257 bool shouldContinue = worker.checkResultsOfRunWorker(
true);
258 std::exception_ptr finalException;
260 std::unique_ptr<cms::Exception> pEx;
262 std::rethrow_exception(*iException);
264 pEx = std::unique_ptr<cms::Exception>(oldEx.
clone());
272 std::ostringstream ost;
284 worker.skipWorker(iEP);
285 finalException = std::exception_ptr();
287 shouldContinue =
false;
288 finalException = std::current_exception();
297 shouldContinue =
false;
299 auto const nextIndex = iModuleIndex + 1;
300 if (shouldContinue and nextIndex <
workers_.size()) {
301 if (not worker.runConcurrently()) {
308 if (not shouldContinue) {
311 if (not shouldContinue and not worker.runConcurrently()) {
313 for (
auto it =
workers_.begin() + nextIndex, itEnd =
workers_.end(); it != itEnd; ++it) {
320 finished(finalException, iContext, iInfo, iID);
338 std::exception_ptr jException =
341 if (jException && not iException) {
342 iException = jException;
346 if (not iException) {
347 iException = std::current_exception();
358 oneapi::tbb::task_group& iGroup) {
360 const int firstModuleIndex = iNextModuleIndex;
361 int lastModuleIndex = firstModuleIndex;
362 while (lastModuleIndex + 1 != static_cast<int>(
workers_.size()) and
workers_[lastModuleIndex].runConcurrently()) {
365 for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
367 auto nextTask =
make_waiting_task([
this, lastModuleIndex,
info = iInfo, iID, iContext, weakToken, &iGroup](
368 std::exception_ptr
const* iException) {
Worker const * getWorker(size_type i) const
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id) const
std::atomic< unsigned int > modulesToRun_
void recordStatus(int nwrwue, hlt::HLTState state)
roAction_t actions[nactions]
static const std::string & codeToString(Code)
------------ implementation details ----------------
std::vector< WorkerInPath > WorkersInPath
void threadsafe_setFailedModuleInfo(int nwrwue, std::exception_ptr)
EventPrincipal & principal()
HLTState
status of a trigger path
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, oneapi::tbb::task_group &iGroup)
void setPathStatus(StreamID const &, HLTPathStatus const &)
WaitingTaskList waitingTasks_
void reset()
Resets access to the resource so that added tasks will wait.
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
void workerFinished(std::exception_ptr const *, unsigned int iModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, oneapi::tbb::task_group &iGroup)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
oneapi::tbb::task_group * group() const noexcept
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper *> const &)
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void finished(std::exception_ptr, StreamContext const *, EventTransitionInfo const &, StreamID const &)
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void updateCounters(hlt::HLTState state)
std::atomic< bool > *const stopProcessingEvent_
FunctorWaitingTask< F > * make_waiting_task(F f)
ServiceToken lock() const
ExceptionToActionTable const *const act_table_
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
void presetTaskAsFailed(std::exception_ptr iExcept)
exception_actions::ActionCodes find(const std::string &category) const
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
void addContext(std::string const &context)
std::shared_ptr< ActivityRegistry > const actReg_
std::string const & pathName() const
EventID const & id() const
std::atomic< bool > stateLock_
virtual Exception * clone() const
Worker * pathStatusInserterWorker_
void processOneOccurrenceAsync(WaitingTaskHolder, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
PathStatusInserter * pathStatusInserter_