24 std::shared_ptr<ActivityRegistry> areg,
31 failedModuleIndex_(
workers.size()),
38 pathContext_(path_name, streamContext, bitpos, pathType),
39 pathStatusInserter_(nullptr),
40 pathStatusInserterWorker_(nullptr) {
41 for (
auto& workerInPath :
workers_) {
48 : timesRun_(r.timesRun_),
49 timesPassed_(r.timesPassed_),
50 timesFailed_(r.timesFailed_),
51 timesExcept_(r.timesExcept_),
52 failedModuleIndex_(r.failedModuleIndex_),
57 act_table_(r.act_table_),
59 pathContext_(r.pathContext_),
60 pathStatusInserter_(r.pathStatusInserter_),
61 pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
62 for (
auto& workerInPath :
workers_) {
69 if (
e.context().empty()) {
79 bool expected =
false;
82 s <<
"Path " <<
name() <<
" applying TryToContinue on";
90 if (
e.category() == pNF) {
91 std::ostringstream ost;
92 ost <<
"If you wish to continue processing events after a " << pNF <<
" exception,\n" 93 <<
"add \"TryToContinue = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the " 95 e.addAdditionalInfo(ost.str());
111 std::ostringstream ost;
112 ost <<
"Running path '" << pathContext.
pathName() <<
"'";
115 ost <<
"Processing ";
118 if (begin && branchType ==
InRun) {
119 ost <<
"stream begin Run";
120 }
else if (begin && branchType ==
InLumi) {
121 ost <<
"stream begin LuminosityBlock ";
122 }
else if (!begin && branchType ==
InLumi) {
123 ost <<
"stream end LuminosityBlock ";
124 }
else if (!begin && branchType ==
InRun) {
125 ost <<
"stream end Run ";
126 }
else if (isEvent) {
135 bool expected =
false;
136 while (
stateLock_.compare_exchange_strong(expected,
true)) {
184 using std::placeholders::_1;
192 if (
found != iWorkerToDeleter.end()) {
193 found->second->addedToPath();
223 finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
231 unsigned int iModuleIndex,
236 oneapi::tbb::task_group& iGroup) {
242 auto& worker =
workers_[iModuleIndex];
243 bool shouldContinue = worker.checkResultsOfRunWorker(
true);
244 std::exception_ptr finalException;
246 shouldContinue =
false;
247 std::unique_ptr<cms::Exception> pEx;
249 std::rethrow_exception(*iException);
251 pEx = std::unique_ptr<cms::Exception>(oldEx.
clone());
259 std::ostringstream ost;
265 worker.skipWorker(iEP);
267 finalException = std::current_exception();
275 auto const nextIndex = iModuleIndex + 1;
276 if (shouldContinue and nextIndex <
workers_.size()) {
277 if (not worker.runConcurrently()) {
284 if (not shouldContinue) {
287 if (not shouldContinue and not worker.runConcurrently()) {
289 for (
auto it =
workers_.begin() + nextIndex, itEnd =
workers_.end(); it != itEnd; ++it) {
296 finished(finalException, iContext, iInfo, iID);
314 std::exception_ptr jException =
317 if (jException && not iException) {
318 iException = jException;
322 if (not iException) {
323 iException = std::current_exception();
334 oneapi::tbb::task_group& iGroup) {
336 const int firstModuleIndex = iNextModuleIndex;
337 int lastModuleIndex = firstModuleIndex;
338 while (lastModuleIndex + 1 != static_cast<int>(
workers_.size()) and
workers_[lastModuleIndex].runConcurrently()) {
341 for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
343 auto nextTask =
make_waiting_task([
this, lastModuleIndex,
info = iInfo, iID, iContext, weakToken, &iGroup](
344 std::exception_ptr
const* iException) {
Worker const * getWorker(size_type i) const
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
std::atomic< unsigned int > modulesToRun_
void recordStatus(int nwrwue, hlt::HLTState state)
roAction_t actions[nactions]
static const std::string & codeToString(Code)
-----------— implementation details ---------------—
std::vector< WorkerInPath > WorkersInPath
EventPrincipal & principal()
HLTState
status of a trigger path
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, oneapi::tbb::task_group &iGroup)
void setPathStatus(StreamID const &, HLTPathStatus const &)
WaitingTaskList waitingTasks_
void reset()
Resets access to the resource so that added tasks will wait.
void handleWorkerFailure(cms::Exception &e, int nwrwue, ModuleDescription const &, std::string const &id)
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
void workerFinished(std::exception_ptr const *, unsigned int iModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, oneapi::tbb::task_group &iGroup)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
oneapi::tbb::task_group * group() const noexcept
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper *> const &)
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void finished(std::exception_ptr, StreamContext const *, EventTransitionInfo const &, StreamID const &)
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void updateCounters(hlt::HLTState state)
FunctorWaitingTask< F > * make_waiting_task(F f)
ServiceToken lock() const
ExceptionToActionTable const *const act_table_
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
void presetTaskAsFailed(std::exception_ptr iExcept)
exception_actions::ActionCodes find(const std::string &category) const
void addContext(std::string const &context)
std::shared_ptr< ActivityRegistry > const actReg_
std::atomic< bool > printedException_
std::string const & pathName() const
EventID const & id() const
std::string const & name() const
std::atomic< bool > stateLock_
virtual Exception * clone() const
Worker * pathStatusInserterWorker_
void processOneOccurrenceAsync(WaitingTaskHolder, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *)
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, PathContext::PathType pathType)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void threadsafe_setFailedModuleInfo(int nwrwue, bool iExceptionHappened)
PathStatusInserter * pathStatusInserter_