21 std::shared_ptr<ActivityRegistry> areg,
23 std::atomic<bool>* stopProcessingEvent,
36 pathContext_(path_name, streamContext, bitpos, pathType),
37 stopProcessingEvent_(stopProcessingEvent),
38 pathStatusInserter_(nullptr),
39 pathStatusInserterWorker_(nullptr) {
40 for (
auto& workerInPath :
workers_) {
47 : timesRun_(
r.timesRun_),
48 timesPassed_(
r.timesPassed_),
49 timesFailed_(
r.timesFailed_),
50 timesExcept_(
r.timesExcept_),
51 failedModuleIndex_(
r.failedModuleIndex_),
56 act_table_(
r.act_table_),
58 pathContext_(
r.pathContext_),
59 stopProcessingEvent_(
r.stopProcessingEvent_),
60 pathStatusInserter_(
r.pathStatusInserter_),
61 pathStatusInserterWorker_(
r.pathStatusInserterWorker_) {
62 for (
auto& workerInPath :
workers_) {
75 if (
e.context().empty()) {
78 bool should_continue =
true;
87 should_continue =
false;
101 if (
e.category() == pNF) {
102 std::ostringstream ost;
103 ost <<
"If you wish to continue processing events after a " << pNF <<
" exception,\n"
104 <<
"add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
106 e.addAdditionalInfo(ost.str());
114 return should_continue;
124 std::ostringstream ost;
125 ost <<
"Running path '" << pathContext.
pathName() <<
"'";
128 ost <<
"Processing ";
132 ost <<
"stream begin Run";
134 ost <<
"stream begin LuminosityBlock ";
136 ost <<
"stream end LuminosityBlock ";
138 ost <<
"stream end Run ";
139 }
else if (isEvent) {
148 bool expected =
false;
149 while (
stateLock_.compare_exchange_strong(expected,
true)) {
197 using std::placeholders::_1;
205 if (
found != iWorkerToDeleter.end()) {
206 found->second->addedToPath();
236 finished(std::exception_ptr(), iStreamContext, iEP, iES, iStreamID);
244 unsigned int iModuleIndex,
254 auto& worker =
workers_[iModuleIndex];
255 bool shouldContinue = worker.checkResultsOfRunWorker(
true);
256 std::exception_ptr finalException;
258 std::unique_ptr<cms::Exception> pEx;
260 std::rethrow_exception(*iException);
262 pEx = std::unique_ptr<cms::Exception>(oldEx.
clone());
266 std::ostringstream ost;
273 worker.getWorker()->description(),
276 worker.skipWorker(iEP);
277 finalException = std::exception_ptr();
279 shouldContinue =
false;
280 finalException = std::current_exception();
289 shouldContinue =
false;
291 auto const nextIndex = iModuleIndex + 1;
292 if (shouldContinue and nextIndex <
workers_.size()) {
293 if (not worker.runConcurrently()) {
300 if (not shouldContinue) {
303 if (not shouldContinue and not worker.runConcurrently()) {
305 for (
auto it =
workers_.begin() + nextIndex, itEnd =
workers_.end(); it != itEnd; ++it) {
312 finished(finalException, iContext, iEP, iES, iID);
330 std::exception_ptr jException =
333 if (jException && not iException) {
334 iException = jException;
338 if (not iException) {
339 iException = std::current_exception();
352 const int firstModuleIndex = iNextModuleIndex;
353 int lastModuleIndex = firstModuleIndex;
354 while (lastModuleIndex + 1 != static_cast<int>(
workers_.size()) and
workers_[lastModuleIndex].runConcurrently()) {
357 for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
359 tbb::task::allocate_root(),
360 [
this, lastModuleIndex, &iEP, &iES, iID, iContext,
token = iToken](std::exception_ptr
const* iException) {
364 nextTask, iEP, iES, iToken, iID, iContext);