20 std::shared_ptr<ActivityRegistry> areg,
22 std::atomic<bool>* stopProcessingEvent,
34 pathContext_(path_name, streamContext, bitpos, pathType),
35 stopProcessingEvent_(stopProcessingEvent),
37 pathStatusInserterWorker_(
nullptr) {
38 for (
auto& workerInPath :
workers_) {
59 for (
auto& workerInPath :
workers_) {
74 bool should_continue =
true;
83 should_continue =
false;
101 std::ostringstream ost;
102 ost <<
"If you wish to continue processing events after a " << pNF <<
" exception,\n" 103 <<
"add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the " 113 return should_continue;
123 std::ostringstream ost;
124 ost <<
"Running path '" << pathContext.
pathName() <<
"'";
127 ost <<
"Processing ";
130 if (begin && branchType ==
InRun) {
131 ost <<
"stream begin Run";
132 }
else if (begin && branchType ==
InLumi) {
133 ost <<
"stream begin LuminosityBlock ";
134 }
else if (!begin && branchType ==
InLumi) {
135 ost <<
"stream end LuminosityBlock ";
136 }
else if (!begin && branchType ==
InRun) {
137 ost <<
"stream end Run ";
138 }
else if (isEvent) {
165 using std::placeholders::_1;
173 std::vector<EarlyDeleteHelper*>
temp;
174 temp.reserve(iWorkerToDeleter.size());
177 if (
found != iWorkerToDeleter.end()) {
178 temp.push_back(
found->second);
179 found->second->addedToPath();
182 std::vector<EarlyDeleteHelper*> tempCorrectSize(temp.begin(), temp.end());
193 helper->pathFinished(iEvent);
214 finished(-1,
true, std::exception_ptr(), iStreamContext, iEP, iES, iStreamID);
222 unsigned int iModuleIndex,
232 auto& worker =
workers_[iModuleIndex];
233 bool shouldContinue = worker.checkResultsOfRunWorker(
true);
234 std::exception_ptr finalException;
236 std::unique_ptr<cms::Exception> pEx;
238 std::rethrow_exception(*iException);
240 pEx = std::unique_ptr<cms::Exception>(oldEx.
clone());
243 std::ostringstream ost;
250 worker.getWorker()->description(),
253 worker.skipWorker(iEP);
254 finalException = std::exception_ptr();
256 shouldContinue =
false;
257 finalException = std::current_exception();
266 shouldContinue =
false;
268 auto const nextIndex = iModuleIndex + 1;
269 if (shouldContinue and nextIndex <
workers_.size()) {
274 if (not shouldContinue) {
276 for (
auto it =
workers_.begin() + nextIndex, itEnd =
workers_.end(); it != itEnd; ++it) {
281 finished(iModuleIndex, shouldContinue, finalException, iContext, iEP, iES, iID);
286 std::exception_ptr iException,
291 if (not iException) {
301 std::exception_ptr jException =
304 if (jException && not iException) {
305 iException = jException;
309 if (not iException) {
310 iException = std::current_exception();
323 tbb::task::allocate_root(),
324 [
this, iNextModuleIndex, &iEP, &iES, iID, iContext, token = iToken](std::exception_ptr
const* iException) {
325 this->
workerFinished(iException, iNextModuleIndex, iEP, iES, token, iID, iContext);
329 nextTask, iEP, iES, iToken, iID, iContext);
std::string const & pathName() const
void recordStatus(int nwrwue, bool isEvent)
void handleEarlyFinish(EventPrincipal const &)
roAction_t actions[nactions]
static const std::string & codeToString(Code)
-----------— implementation details ---------------—
std::vector< EarlyDeleteHelper * > earlyDeleteHelpers_
std::vector< WorkerInPath > WorkersInPath
void add(WaitingTask *)
Adds task to the waiting list.
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetupImpl const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
EventID const & id() const
void setPathStatus(StreamID const &, HLTPathStatus const &)
std::string const & category() const
exception_actions::ActionCodes find(const std::string &category) const
WaitingTaskList waitingTasks_
void reset()
Resets access to the resource so that added tasks will wait.
void processOneOccurrenceAsync(WaitingTask *, EventPrincipal const &, EventSetupImpl const &, ServiceToken const &, StreamID const &, StreamContext const *)
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
std::shared_ptr< HLTGlobalStatus > TrigResPtr
std::list< std::string > const & context() const
void addAdditionalInfo(std::string const &info)
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
void presetTaskAsFailed(std::exception_ptr iExcept)
void updateCounters(bool succeed, bool isEvent)
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void addContext(std::string const &context)
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id)
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
ExceptionToActionTable const * act_table_
Worker const * getWorker(size_type i) const
void finished(int iModuleIndex, bool iSucceeded, std::exception_ptr, StreamContext const *, EventPrincipal const &iEP, EventSetupImpl const &iES, StreamID const &streamID)
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventPrincipal const &, EventSetupImpl const &, ServiceToken const &, StreamID const &, StreamContext const *)
std::atomic< bool > * stopProcessingEvent_
def branchType(schema, name)
Worker * pathStatusInserterWorker_
void workerFinished(std::exception_ptr const *iException, unsigned int iModuleIndex, EventPrincipal const &iEP, EventSetupImpl const &iES, ServiceToken const &iToken, StreamID const &iID, StreamContext const *iContext)
virtual Exception * clone() const
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
PathStatusInserter * pathStatusInserter_