23 std::shared_ptr<ActivityRegistry> areg,
25 std::atomic<bool>* stopProcessingEvent,
38 pathContext_(path_name, streamContext, bitpos, pathType),
39 stopProcessingEvent_(stopProcessingEvent),
40 pathStatusInserter_(nullptr),
41 pathStatusInserterWorker_(nullptr) {
42 for (
auto& workerInPath :
workers_) {
49 : timesRun_(
r.timesRun_),
50 timesPassed_(
r.timesPassed_),
51 timesFailed_(
r.timesFailed_),
52 timesExcept_(
r.timesExcept_),
53 failedModuleIndex_(
r.failedModuleIndex_),
58 act_table_(
r.act_table_),
60 pathContext_(
r.pathContext_),
61 stopProcessingEvent_(
r.stopProcessingEvent_),
62 pathStatusInserter_(
r.pathStatusInserter_),
63 pathStatusInserterWorker_(
r.pathStatusInserterWorker_) {
64 for (
auto& workerInPath :
workers_) {
77 if (
e.context().empty()) {
80 bool should_continue =
true;
89 should_continue =
false;
// When the exception's category equals pNF, compose a hint for the user
// (how to add 'ProductNotFound' to SkipEvent in the "options" PSet) in a
// local stream and attach it to the exception via addAdditionalInfo().
// NOTE(review): the tail of the streamed message is not visible in this
// excerpt; only the portion shown below is described here.
103 if (
e.category() == pNF) {
104 std::ostringstream ost;
105 ost <<
"If you wish to continue processing events after a " << pNF <<
" exception,\n"
106 <<
"add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
108 e.addAdditionalInfo(ost.str());
116 return should_continue;
126 std::ostringstream ost;
127 ost <<
"Running path '" << pathContext.
pathName() <<
"'";
130 ost <<
"Processing ";
// Append a label to `ost` chosen from the (begin, branchType) combination:
//   begin  + InRun  -> "stream begin Run"
//   begin  + InLumi -> "stream begin LuminosityBlock "
//   !begin + InLumi -> "stream end LuminosityBlock "
//   !begin + InRun  -> "stream end Run "
// and otherwise fall through to the isEvent case.
// NOTE(review): "stream begin Run" is the only label without a trailing
// space. If more text is streamed after this chain (not visible here), the
// begin-Run message will run together with it -- confirm and align if so.
133 if (begin && branchType ==
InRun) {
134 ost <<
"stream begin Run";
135 }
else if (begin && branchType ==
InLumi) {
136 ost <<
"stream begin LuminosityBlock ";
137 }
else if (!begin && branchType ==
InLumi) {
138 ost <<
"stream end LuminosityBlock ";
139 }
else if (!begin && branchType ==
InRun) {
140 ost <<
"stream end Run ";
141 }
else if (isEvent) {
// Each evaluation of the loop condition tries to swap stateLock_ from
// `expected` (initially false) to true.
// NOTE(review): assuming stateLock_ is a std::atomic<bool>,
// compare_exchange_strong returns true when the swap SUCCEEDS and, on
// failure, overwrites `expected` with the current value. Looping *while* it
// returns true therefore looks inverted for a lock-acquire spin: if the flag
// is already true the condition is false on the first try and control falls
// through without owning the lock. The loop body is not visible in this
// excerpt -- confirm the intended condition (likely negated) before changing.
150 bool expected =
false;
151 while (
stateLock_.compare_exchange_strong(expected,
true)) {
199 using std::placeholders::_1;
207 if (
found != iWorkerToDeleter.end()) {
208 found->second->addedToPath();
237 finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
245 unsigned int iModuleIndex,
250 tbb::task_group& iGroup) {
256 auto& worker =
workers_[iModuleIndex];
257 bool shouldContinue = worker.checkResultsOfRunWorker(
true);
258 std::exception_ptr finalException;
260 std::unique_ptr<cms::Exception> pEx;
262 std::rethrow_exception(*iException);
264 pEx = std::unique_ptr<cms::Exception>(oldEx.
clone());
268 std::ostringstream ost;
280 worker.skipWorker(iEP);
281 finalException = std::exception_ptr();
283 shouldContinue =
false;
284 finalException = std::current_exception();
293 shouldContinue =
false;
295 auto const nextIndex = iModuleIndex + 1;
296 if (shouldContinue and nextIndex <
workers_.size()) {
297 if (not worker.runConcurrently()) {
304 if (not shouldContinue) {
307 if (not shouldContinue and not worker.runConcurrently()) {
309 for (
auto it =
workers_.begin() + nextIndex, itEnd =
workers_.end(); it != itEnd; ++it) {
316 finished(finalException, iContext, iInfo, iID);
333 std::exception_ptr jException =
336 if (jException && not iException) {
337 iException = jException;
341 if (not iException) {
342 iException = std::current_exception();
353 tbb::task_group& iGroup) {
// Start both indices at iNextModuleIndex. The loop below keeps iterating
// while lastModuleIndex is not the final index of workers_ AND the worker at
// lastModuleIndex reports runConcurrently().
// workers_.size() is cast to int so the comparison with the signed
// lastModuleIndex is done in signed arithmetic.
// NOTE(review): the loop body is not visible here; presumably it advances
// lastModuleIndex -- confirm.
355 const int firstModuleIndex = iNextModuleIndex;
356 int lastModuleIndex = firstModuleIndex;
357 while (lastModuleIndex + 1 != static_cast<int>(
workers_.size()) and
workers_[lastModuleIndex].runConcurrently()) {
360 for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
362 auto nextTask =
make_waiting_task([
this, lastModuleIndex,
info = iInfo, iID, iContext, weakToken, &iGroup](
363 std::exception_ptr
const* iException) {