74 #include "boost/range/adaptor/reversed.hpp"
98 class SendSourceTerminationSignalIfException {
101 ~SendSourceTerminationSignalIfException() {
// Clears the stored reg_ pointer. Call sites in this file invoke it right
// after the guarded source call has returned normally. Presumably the
// destructor (body not visible in this excerpt) only emits its signal while
// reg_ is still set -- TODO confirm against the full class.
106 void completedSuccessfully() { reg_ =
nullptr; }
119 std::shared_ptr<ProductRegistry> preg,
120 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
121 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
122 std::shared_ptr<ActivityRegistry> areg,
123 std::shared_ptr<ProcessConfiguration const> processConfiguration,
// A null main_input is treated as "no usable source in the configuration".
// The text below is the diagnostic for that case; the statement that receives
// the streamed text begins on a line that is not visible in this excerpt.
126 if (main_input ==
nullptr) {
128 <<
"There must be exactly one source in the configuration.\n"
129 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
134 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
137 filler->fill(descriptions);
// Build a short text naming the source type (modtype) being validated.
// How ost is consumed afterwards is not visible in this excerpt.
142 std::ostringstream ost;
143 ost <<
"Validating configuration of input source of type " << modtype;
159 processConfiguration.get(),
165 thinnedAssociationsHelper,
169 common.maxSecondsUntilRampdown_,
// Emit the pre-source-construction signal for md.
172 areg->preSourceConstructionSignal_(md);
173 std::unique_ptr<InputSource>
input;
// Guard object: a shared_ptr that holds nullptr but owns a custom deleter.
// The deleter still runs when sentry is destroyed, and it emits the
// post-source-construction signal. md is captured by reference, so md must
// outlive sentry; areg is captured by value.
176 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
// Connect the source's pre/post event-read signals to the same-named signals
// on areg. std::cref passes a reference wrapper to the areg signal rather
// than a copy of it.
179 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
180 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
183 std::ostringstream ost;
184 ost <<
"Constructing input source of type " << modtype;
195 std::shared_ptr<EDLooperBase> vLooper;
// Looper names are read from the "@all_loopers" parameter and copied into a
// local vector.
197 std::vector<std::string> loopers =
params.getParameter<std::vector<std::string>>(
"@all_loopers");
199 if (loopers.empty()) {
// Requires exactly one looper name. Presumably only reached when the list is
// non-empty (the body of the empty branch is not visible) -- TODO confirm.
203 assert(1 == loopers.size());
// Iterator-based walk over the looper names; the rest of the loop header and
// its body are not visible in this excerpt.
205 for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd;
218 std::vector<std::string>
const& defaultServices,
219 std::vector<std::string>
const& forcedServices)
222 branchIDListHelper_(),
225 espController_(new eventsetup::EventSetupsController),
228 processConfiguration_(),
234 deferredExceptionPtrIsSet_(
false),
238 beginJobCalled_(
false),
239 shouldWeStop_(
false),
240 fileModeNoMerge_(
false),
241 exceptionMessageFiles_(),
242 exceptionMessageRuns_(),
243 exceptionMessageLumis_(
false),
244 forceLooperToEnd_(
false),
245 looperBeginJobRun_(
false),
246 forceESCacheClearOnNewRun_(
false),
247 eventSetupDataToExcludeFromPrefetching_() {
249 processDesc->addServices(defaultServices, forcedServices);
250 init(processDesc, iToken, iLegacy);
254 std::vector<std::string>
const& defaultServices,
255 std::vector<std::string>
const& forcedServices)
258 branchIDListHelper_(),
261 espController_(new eventsetup::EventSetupsController),
264 processConfiguration_(),
270 deferredExceptionPtrIsSet_(
false),
274 beginJobCalled_(
false),
275 shouldWeStop_(
false),
276 fileModeNoMerge_(
false),
277 exceptionMessageFiles_(),
278 exceptionMessageRuns_(),
279 exceptionMessageLumis_(
false),
280 forceLooperToEnd_(
false),
281 looperBeginJobRun_(
false),
282 forceESCacheClearOnNewRun_(
false),
283 asyncStopRequestedWhileProcessingEvents_(
false),
284 eventSetupDataToExcludeFromPrefetching_() {
286 processDesc->addServices(defaultServices, forcedServices);
295 branchIDListHelper_(),
298 espController_(new eventsetup::EventSetupsController),
301 processConfiguration_(),
307 deferredExceptionPtrIsSet_(
false),
311 beginJobCalled_(
false),
312 shouldWeStop_(
false),
313 fileModeNoMerge_(
false),
314 exceptionMessageFiles_(),
315 exceptionMessageRuns_(),
316 exceptionMessageLumis_(
false),
317 forceLooperToEnd_(
false),
318 looperBeginJobRun_(
false),
319 forceESCacheClearOnNewRun_(
false),
320 asyncStopRequestedWhileProcessingEvents_(
false),
321 eventSetupDataToExcludeFromPrefetching_() {
336 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
340 bool const hasSubProcesses = !subProcessVParameterSet.empty();
353 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
// Thread count, read from the untracked "numberOfThreads" option.
360 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
// Stream count, read from the untracked "numberOfStreams" option.
366 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
// Log the thread and stream counts under the "ThreadStreamSetup" category.
// NOTE(review): lines between the reads above and this log are not visible,
// so the values may have been adjusted before being logged.
371 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " <<
nThreads <<
"\nsetting # streams " << nStreams;
// Concurrent-run count from the untracked "numberOfConcurrentRuns" option.
373 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
// Any value other than 1 is rejected with the diagnostic below; the statement
// that receives the text starts on a line not visible here.
374 if (nConcurrentRuns != 1) {
376 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
// Concurrent-lumi count from "numberOfConcurrentLuminosityBlocks".
378 unsigned int nConcurrentLumis =
379 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
// A configured value of 0 falls back to the concurrent-run count.
380 if (nConcurrentLumis == 0) {
381 nConcurrentLumis = nConcurrentRuns;
404 auto& serviceSets = processDesc->getServicesPSets();
413 handler->willBeUsingThreads();
420 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
431 nConcurrentLumis = 1;
438 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
446 items.branchIDListHelper(),
447 items.thinnedAssociationsHelper(),
449 items.processConfiguration(),
471 auto ep = std::make_shared<EventPrincipal>(
preg(),
488 for (
auto& subProcessPSet : subProcessVParameterSet) {
536 actReg_->preallocateSignal_(bounds);
564 ex.
addContext(
"Calling beginJob for the source");
571 actReg_->postBeginJobSignal_();
582 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
591 c.call([&subProcess,
i]() { subProcess.doEndStream(
i); });
595 c.call([actReg]() { actReg->preEndJobSignal_(); });
604 c.call([actReg]() { actReg->postEndJobSignal_(); });
613 return schedule_->getAllModuleDescriptions();
631 #include "TransitionProcessors.icc"
635 bool returnValue =
false;
651 SendSourceTerminationSignalIfException sentry(
actReg_.get());
655 itemType =
input_->nextItemType();
659 sentry.completedSuccessfully();
672 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
691 bool firstTime =
true;
701 auto trans =
fp.processFiles(*
this);
712 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
721 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
722 e.addAdditionalInfo(message);
723 if (
e.alreadyPrinted()) {
729 if (
e.alreadyPrinted()) {
735 if (
e.alreadyPrinted()) {
747 FDEBUG(1) <<
" \treadFile\n";
749 SendSourceTerminationSignalIfException sentry(
actReg_.get());
754 if (size < preg_->
size()) {
761 sentry.completedSuccessfully();
// Only ask the source to close a file when a file block is held (fb_ non-null).
765 if (
fb_.get() !=
nullptr) {
// Guard the source call: if closeFile throws, completedSuccessfully() is
// never reached and sentry is destroyed with its registry pointer still set.
766 SendSourceTerminationSignalIfException sentry(
actReg_.get());
767 input_->closeFile(
fb_.get(), cleaningUpAfterException);
768 sentry.completedSuccessfully();
// Debug trace naming this step.
770 FDEBUG(1) <<
"\tcloseInputFile\n";
774 if (
fb_.get() !=
nullptr) {
778 FDEBUG(1) <<
"\topenOutputFiles\n";
782 if (
fb_.get() !=
nullptr) {
786 FDEBUG(1) <<
"\tcloseOutputFiles\n";
791 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
792 if (
fb_.get() !=
nullptr) {
796 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
800 if (
fb_.get() !=
nullptr) {
804 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
814 FDEBUG(1) <<
"\tstartingNewLoop\n";
820 looper_->setModuleChanger(&changer);
822 looper_->setModuleChanger(
nullptr);
828 FDEBUG(1) <<
"\tendOfLoop\n";
835 FDEBUG(1) <<
"\trewind\n";
840 FDEBUG(1) <<
"\tprepareForNextLoop\n";
844 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
847 if (subProcess.shouldWeCloseOutput()) {
857 FDEBUG(1) <<
"\tdoErrorStuff\n";
// Emit an error-level message under the "StateMachine" category. Only the
// message text is visible here; no control flow is part of this statement.
858 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n"
859 <<
"and went to the error state\n"
860 <<
"Will attempt to terminate processing normally\n"
861 <<
"(IF using the looper the next loop will be attempted)\n"
862 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
867 bool& globalBeginSucceeded,
868 bool& eventSetupForInstanceSucceeded) {
// Start pessimistic: the out-flag is set to true only further down, after the
// global wait has finished without a stored exception.
869 globalBeginSucceeded =
false;
// Guard around a source-related step (the guarded call itself is on a line
// not visible here). completedSuccessfully() clears the guard's registry
// pointer once that step has returned.
872 SendSourceTerminationSignalIfException sentry(
actReg_.get());
875 sentry.completedSuccessfully();
// Second guard, same pattern. NOTE(review): the two sentry declarations
// presumably live in separate inner scopes whose braces are not visible.
883 SendSourceTerminationSignalIfException sentry(
actReg_.get());
// Report through the out-flag that this step was reached; it is set before
// the guard is cleared.
885 eventSetupForInstanceSucceeded =
true;
886 sentry.completedSuccessfully();
888 auto const& es =
esp_->eventSetupImpl();
// The reference count is raised before the asynchronous work is launched
// (launch code not visible). wait_for_all() then waits on globalWaitTask, and
// any exception stored in the task is rethrown on this thread.
898 globalWaitTask->increment_ref_count();
907 globalWaitTask->wait_for_all();
908 if (globalWaitTask->exceptionPtr() !=
nullptr) {
909 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
// Reached only when no stored exception was rethrown above.
912 globalBeginSucceeded =
true;
// Debug trace with the run value.
913 FDEBUG(1) <<
"\tbeginRun " <<
run <<
"\n";
920 streamLoopWaitTask->increment_ref_count();
924 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
934 streamLoopWaitTask->wait_for_all();
935 if (streamLoopWaitTask->exceptionPtr() !=
nullptr) {
936 std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
939 FDEBUG(1) <<
"\tstreamBeginRun " <<
run <<
"\n";
947 bool globalBeginSucceeded,
948 bool cleaningUpAfterException,
949 bool eventSetupForInstanceSucceeded) {
950 if (eventSetupForInstanceSucceeded) {
952 endRun(phid,
run, globalBeginSucceeded, cleaningUpAfterException);
954 if (globalBeginSucceeded) {
956 t->increment_ref_count();
963 if (
t->exceptionPtr()) {
964 std::rethrow_exception(*
t->exceptionPtr());
973 bool globalBeginSucceeded,
974 bool cleaningUpAfterException) {
982 SendSourceTerminationSignalIfException sentry(
actReg_.get());
984 sentry.completedSuccessfully();
986 auto const& es =
esp_->eventSetupImpl();
987 if (globalBeginSucceeded) {
990 streamLoopWaitTask->increment_ref_count();
1003 cleaningUpAfterException);
1005 streamLoopWaitTask->wait_for_all();
1006 if (streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1007 std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
1010 FDEBUG(1) <<
"\tstreamEndRun " <<
run <<
"\n";
1016 globalWaitTask->increment_ref_count();
1027 cleaningUpAfterException);
1028 globalWaitTask->wait_for_all();
1029 if (globalWaitTask->exceptionPtr() !=
nullptr) {
1030 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1033 FDEBUG(1) <<
"\tendRun " <<
run <<
"\n";
1041 waitTask->increment_ref_count();
1049 input_->luminosityBlockAuxiliary()->beginTime()),
1053 waitTask->wait_for_all();
1055 if (waitTask->exceptionPtr() !=
nullptr) {
1056 std::rethrow_exception(*(waitTask->exceptionPtr()));
1062 std::shared_ptr<void>
const& iRunResource,
1081 status->resetResources();
1096 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1099 sentry.completedSuccessfully();
1105 rng->preBeginLumi(lb);
1112 tbb::task::allocate_root(), [
this, holder = iHolder,
status, ts](std::exception_ptr
const* iPtr)
mutable {
1114 status->resetResources();
1115 holder.doneWaiting(*iPtr);
1117 status->globalBeginDidSucceed();
1127 status->resetResources();
1128 holder.doneWaiting(std::current_exception());
1141 std::exception_ptr
const* exceptionFromBeginStreamLumi)
mutable {
1142 if (exceptionFromBeginStreamLumi) {
1144 tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1154 auto eventSetupImpls = &
status->eventSetupImpls();
1155 auto lp =
status->lumiPrincipal().get();
1158 event.setLuminosityBlockPrincipal(lp);
1179 beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1184 &
status->eventSetupImpls(),
1189 status->resetResources();
1196 tbb::task::allocate_root(),
1197 [
this, lumiWorkLambda =
std::move(lumiWork), iHolder](std::exception_ptr
const* iPtr)
mutable {
1214 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1220 iSync, queueLumiWorkTaskHolder,
status->endIOVWaitingTasks(),
status->eventSetupImpls());
1221 sentry.completedSuccessfully();
1223 queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1235 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1242 iSync, queueLumiWorkTaskHolder,
status->endIOVWaitingTasks(),
status->eventSetupImpls());
1243 sentry.completedSuccessfully();
1246 queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1255 status->needToContinueLumi();
1256 status->startProcessingEvents();
1259 unsigned int streamIndex = 0;
1273 tmp.doneWaiting(*iPtr);
1280 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1283 auto& lp = *(iLumiStatus->lumiPrincipal());
1284 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1285 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1287 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1290 tbb::task::allocate_root(),
1292 std::exception_ptr ptr;
1300 auto& lumiPrincipal = *(
status->lumiPrincipal());
1305 ptr = std::current_exception();
1316 ptr = std::current_exception();
1321 status->resumeGlobalLumiQueue();
1325 ptr = std::current_exception();
1333 status->resetResources();
1337 ptr = std::current_exception();
1347 tbb::task::allocate_root(),
1349 std::exception_ptr
const* iExcept)
mutable {
1351 task.doneWaiting(*iExcept);
1354 if (didGlobalBeginSucceed) {
1372 cleaningUpAfterException);
1377 [
this, iStreamIndex, iTask](std::exception_ptr
const* iPtr)
mutable {
1388 if (
status->streamFinishedLumi()) {
1398 lumiStatus->setEndTime();
1402 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1403 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1405 if (lumiStatus->didGlobalBeginSucceed()) {
1406 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1408 lumiPrincipal.endTime());
1410 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1419 cleaningUpAfterException);
1426 globalWaitTask->increment_ref_count();
1435 globalWaitTask->wait_for_all();
1436 if (globalWaitTask->exceptionPtr() !=
nullptr) {
1437 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1445 <<
"Illegal attempt to insert run into cache\n"
1446 <<
"Contact a Framework Developer\n";
1448 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
1456 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1458 sentry.completedSuccessfully();
1460 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1462 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
// Guard the source call: if readAndMergeRun throws, completedSuccessfully()
// is skipped and sentry is destroyed with its registry pointer still set.
1469 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1470 input_->readAndMergeRun(*runPrincipal);
1471 sentry.completedSuccessfully();
// Sanity check: the source and the run principal must report the same
// reduced process history ID.
1473 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
// Return the pair (run principal's reduced process history ID, input_->run()).
1474 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1480 <<
"Illegal attempt to insert lumi into cache\n"
1481 <<
"Run is invalid\n"
1482 <<
"Contact a Framework Developer\n";
1486 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1488 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1490 sentry.completedSuccessfully();
1498 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or
1499 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1500 input_->processHistoryRegistry().reducedProcessHistoryID(
1501 input_->luminosityBlockAuxiliary()->processHistoryID()));
1502 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1504 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1506 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1508 sentry.completedSuccessfully();
1510 return input_->luminosityBlock();
1518 tbb::task::allocate_root(),
1519 [
this, phid,
run,
task, mergeableRunProductMetadata](std::exception_ptr
const* iExcept)
mutable {
1521 task.doneWaiting(*iExcept);
1525 s.writeRunAsync(
task, phid,
run, mergeableRunProductMetadata);
1534 mergeableRunProductMetadata);
1540 FDEBUG(1) <<
"\tdeleteRunFromCache " <<
run <<
"\n";
1545 [
this,
task, &lumiPrincipal](std::exception_ptr
const* iExcept)
mutable {
1547 task.doneWaiting(*iExcept);
1551 s.writeLumiAsync(
task, lumiPrincipal);
1593 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1605 input_->luminosityBlockAuxiliary()->beginTime()));
1622 bool expected =
false;
1642 tbb::task::allocate_root(), [
this, iTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1646 bool expected =
false;
1664 if (
status->isLumiEnding()) {
1680 bool expected =
false;
1682 auto e = std::current_exception();
1695 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1699 sentry.completedSuccessfully();
1701 FDEBUG(1) <<
"\treadEvent\n";
1716 rng->postEventRead(
ev);
1720 tbb::task::allocate_root(), [
this, pep, iHolder, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1727 FDEBUG(1) <<
"\tprocessEvent\n";
1728 pep->clearEventPrincipal();
1737 afterProcessTask =
std::move(finalizeEventTask);
1742 tbb::task::allocate_root(),
1743 [
this, pep, finalizeEventTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1749 subProcess.doEventAsync(finalizeEventTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1762 bool randomAccess =
input_->randomAccess();
1771 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1790 FDEBUG(1) <<
"\tshouldWeStop\n";
1795 if (subProcess.terminate()) {
1811 bool expected =
false;
// Message object for the report below. It starts out null and is assigned
// inside the loop when a matching worker is found.
1820 std::unique_ptr<LogSystem>
s;
// Visit every worker known to the schedule.
1821 for (
auto worker :
schedule_->allWorkers()) {
// Select workers that want global luminosity blocks and whose
// globalLuminosityBlocksQueue() evaluates to true.
1822 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
// Create the message under the "ModulesSynchingOnLumis" category and write
// its header line. NOTE(review): original line 1823 is not visible;
// presumably it restricts this to the first matching worker -- TODO confirm.
1824 s = std::make_unique<LogSystem>(
"ModulesSynchingOnLumis");
1825 (*s) <<
"The following modules require synchronizing on LuminosityBlock boundaries:";
// Append this worker's module name and label on a new line.
1827 (*s) <<
"\n " << worker->description().moduleName() <<
" " << worker->description().moduleLabel();