16 std::shared_ptr<ActivityRegistry>
areg,
18 std::atomic<bool>* stopProcessingEvent,
30 pathContext_(path_name, streamContext, bitpos, pathType),
31 stopProcessingEvent_(stopProcessingEvent){
33 for (
auto& workerInPath :
workers_) {
39 timesRun_(r.timesRun_),
40 timesPassed_(r.timesPassed_),
41 timesFailed_(r.timesFailed_),
42 timesExcept_(r.timesExcept_),
47 act_table_(r.act_table_),
49 earlyDeleteHelpers_(r.earlyDeleteHelpers_),
50 pathContext_(r.pathContext_),
51 stopProcessingEvent_(r.stopProcessingEvent_){
53 for (
auto& workerInPath :
workers_) {
70 bool should_continue =
true;
79 should_continue =
false;
96 std::ostringstream ost;
97 ost <<
"If you wish to continue processing events after a " << pNF <<
" exception,\n" <<
98 "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the configuration.\n";
106 return should_continue;
117 std::ostringstream ost;
121 if (begin && branchType ==
InRun) {
122 ost <<
"Calling beginRun";
124 else if (begin && branchType ==
InLumi) {
125 ost <<
"Calling beginLuminosityBlock";
127 else if (!begin && branchType ==
InLumi) {
128 ost <<
"Calling endLuminosityBlock";
130 else if (!begin && branchType ==
InRun) {
131 ost <<
"Calling endRun";
135 ost <<
"Calling unknown function";
141 ost <<
"Running path '" << pathContext.
pathName() <<
"'";
144 ost <<
"Processing ";
169 using std::placeholders::_1;
178 std::vector<EarlyDeleteHelper*>
temp;
179 temp.reserve(iWorkerToDeleter.size());
182 if(
found != iWorkerToDeleter.end()) {
183 temp.push_back(
found->second);
184 found->second->addedToPath();
187 std::vector<EarlyDeleteHelper*> tempCorrectSize(temp.begin(),temp.end());
194 helper->pathFinished(iEvent);
213 finished(-1,
true, std::exception_ptr(), iStreamContext);
222 unsigned int iModuleIndex,
228 auto& worker =
workers_[iModuleIndex];
229 bool shouldContinue = worker.checkResultsOfRunWorker(
true);
230 std::exception_ptr finalException;
232 std::unique_ptr<cms::Exception> pEx;
234 std::rethrow_exception(*iException);
236 pEx = std::make_unique<cms::Exception>(oldEx);
239 std::ostringstream ost;
242 worker.getWorker()->description(), ost.str());
244 worker.skipWorker(iEP);
245 finalException = std::exception_ptr();
247 shouldContinue =
false;
248 finalException = std::current_exception();
252 shouldContinue =
false;
254 auto const nextIndex = iModuleIndex +1;
255 if (shouldContinue and nextIndex <
workers_.size()) {
260 if (not shouldContinue) {
268 finished(iModuleIndex, shouldContinue, finalException, iContext);
283 iException = std::current_exception();
298 [
this, iNextModuleIndex, &iEP,&iES, iID, iContext,
token](std::exception_ptr
const* iException)
301 this->
workerFinished(iException, iNextModuleIndex, iEP,iES,iID,iContext);
304 workers_[iNextModuleIndex].runWorkerAsync<
std::string const & pathName() const
void recordStatus(int nwrwue, bool isEvent)
void handleEarlyFinish(EventPrincipal const &)
static const std::string & codeToString(Code)
-----------— implementation details ---------------—
std::vector< EarlyDeleteHelper * > earlyDeleteHelpers_
std::vector< WorkerInPath > WorkersInPath
void add(WaitingTask *)
Adds task to the waiting list.
EventID const & id() const
std::string const & moduleName() const
std::string const & category() const
exception_actions::ActionCodes find(const std::string &category) const
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventPrincipal const &, EventSetup const &, StreamID const &, StreamContext const *)
WaitingTaskList waitingTasks_
void reset()
Resets access to the resource so that added tasks will wait.
ServiceToken presentToken() const
std::string const & moduleLabel() const
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void workerFinished(std::exception_ptr const *iException, unsigned int iModuleIndex, EventPrincipal const &iEP, EventSetup const &iES, StreamID const &iID, StreamContext const *iContext)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void processOneOccurrenceAsync(WaitingTask *, EventPrincipal const &, EventSetup const &, StreamID const &, StreamContext const *)
static ServiceRegistry & instance()
void addAdditionalInfo(std::string const &info)
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
void updateCounters(bool succeed, bool isEvent)
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void addContext(std::string const &context)
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id)
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
ExceptionToActionTable const * act_table_
Worker const * getWorker(size_type i) const
void finished(int iModuleIndex, bool iSucceeded, std::exception_ptr, StreamContext const *)
std::atomic< bool > * stopProcessingEvent_
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)