23 std::shared_ptr<ActivityRegistry> areg,
25 std::atomic<bool>* stopProcessingEvent,
38 pathContext_(path_name, streamContext, bitpos, pathType),
39 stopProcessingEvent_(stopProcessingEvent),
40 pathStatusInserter_(nullptr),
41 pathStatusInserterWorker_(nullptr) {
42 for (
auto& workerInPath :
workers_) {
49 : timesRun_(
r.timesRun_),
50 timesPassed_(
r.timesPassed_),
51 timesFailed_(
r.timesFailed_),
52 timesExcept_(
r.timesExcept_),
53 failedModuleIndex_(
r.failedModuleIndex_),
58 act_table_(
r.act_table_),
60 pathContext_(
r.pathContext_),
61 stopProcessingEvent_(
r.stopProcessingEvent_),
62 pathStatusInserter_(
r.pathStatusInserter_),
63 pathStatusInserterWorker_(
r.pathStatusInserterWorker_) {
64 for (
auto& workerInPath :
workers_) {
77 if (
e.context().empty()) {
80 bool should_continue =
true;
89 should_continue =
false;
103 if (
e.category() == pNF) {
104 std::ostringstream ost;
105 ost <<
"If you wish to continue processing events after a " << pNF <<
" exception,\n"
106 <<
"add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
108 e.addAdditionalInfo(ost.str());
116 return should_continue;
126 std::ostringstream ost;
127 ost <<
"Running path '" << pathContext.
pathName() <<
"'";
130 ost <<
"Processing ";
// Append a human-readable name for the transition being processed, chosen from
// the (begin, branchType) combination; the final branch is taken for events.
133 if (begin && branchType ==
InRun) {
// NOTE(review): unlike the three sibling literals below, this one has no trailing
// space. If another value is streamed immediately after this chain, the text will
// run together for the begin-Run case only -- confirm whether that is intended.
134 ost <<
"stream begin Run";
135 }
else if (begin && branchType ==
InLumi) {
136 ost <<
"stream begin LuminosityBlock ";
137 }
else if (!begin && branchType ==
InLumi) {
138 ost <<
"stream end LuminosityBlock ";
139 }
else if (!begin && branchType ==
InRun) {
140 ost <<
"stream end Run ";
141 }
else if (isEvent) {
// Spin until this thread atomically flips stateLock_ from false to true.
// compare_exchange_strong returns true when the exchange SUCCEEDS, so the loop
// must keep retrying only while it fails. The previous condition looped on
// success and fell through on failure, which let a thread continue into the
// protected section without owning the lock.
// NOTE(review): a failed exchange overwrites `expected` with the observed value,
// so the loop body (not visible in this excerpt) must reset `expected` to false
// before the next attempt -- confirm.
150 bool expected =
false;
151 while (
not stateLock_.compare_exchange_strong(expected,
true)) {
199 using std::placeholders::_1;
207 if (
found != iWorkerToDeleter.end()) {
208 found->second->addedToPath();
237 finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
245 unsigned int iModuleIndex,
255 auto& worker =
workers_[iModuleIndex];
256 bool shouldContinue = worker.checkResultsOfRunWorker(
true);
257 std::exception_ptr finalException;
259 std::unique_ptr<cms::Exception> pEx;
261 std::rethrow_exception(*iException);
263 pEx = std::unique_ptr<cms::Exception>(oldEx.
clone());
267 std::ostringstream ost;
274 worker.getWorker()->description(),
277 worker.skipWorker(iEP);
278 finalException = std::exception_ptr();
280 shouldContinue =
false;
281 finalException = std::current_exception();
290 shouldContinue =
false;
292 auto const nextIndex = iModuleIndex + 1;
293 if (shouldContinue and nextIndex <
workers_.size()) {
294 if (not worker.runConcurrently()) {
301 if (not shouldContinue) {
304 if (not shouldContinue and not worker.runConcurrently()) {
306 for (
auto it =
workers_.begin() + nextIndex, itEnd =
workers_.end(); it != itEnd; ++it) {
313 finished(finalException, iContext, iInfo, iID);
330 std::exception_ptr jException =
333 if (jException && not iException) {
334 iException = jException;
338 if (not iException) {
339 iException = std::current_exception();
351 const int firstModuleIndex = iNextModuleIndex;
352 int lastModuleIndex = firstModuleIndex;
353 while (lastModuleIndex + 1 != static_cast<int>(
workers_.size()) and
workers_[lastModuleIndex].runConcurrently()) {
356 for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
358 tbb::task::allocate_root(),
359 [
this, lastModuleIndex,
info = iInfo, iID, iContext,
token = iToken](std::exception_ptr
const* iException) {
363 nextTask, iInfo, iToken, iID, iContext);