78 #include "boost/range/adaptor/reversed.hpp"
102 class SendSourceTerminationSignalIfException {
105 ~SendSourceTerminationSignalIfException() {
110 void completedSuccessfully() { reg_ =
nullptr; }
119 namespace chain = waiting_task::chain;
124 std::shared_ptr<ProductRegistry> preg,
125 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
126 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
127 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
128 std::shared_ptr<ActivityRegistry> areg,
129 std::shared_ptr<ProcessConfiguration const> processConfiguration,
132 if (main_input ==
nullptr) {
134 <<
"There must be exactly one source in the configuration.\n"
135 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
140 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
143 filler->fill(descriptions);
148 std::ostringstream ost;
149 ost <<
"Validating configuration of input source of type " << modtype;
165 processConfiguration.get(),
172 thinnedAssociationsHelper,
176 common.maxSecondsUntilRampdown_,
179 areg->preSourceConstructionSignal_(md);
180 std::unique_ptr<InputSource>
input;
183 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
186 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
187 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
190 std::ostringstream ost;
191 ost <<
"Constructing input source of type " << modtype;
204 filler->fill(descriptions);
217 std::vector<std::string>
const& loopers) {
218 std::shared_ptr<EDLooperBase> vLooper;
220 assert(1 == loopers.size());
222 for (
auto const& looperName : loopers) {
235 std::vector<std::string>
const& defaultServices,
236 std::vector<std::string>
const& forcedServices)
239 branchIDListHelper_(),
242 espController_(new eventsetup::EventSetupsController),
245 processConfiguration_(),
251 deferredExceptionPtrIsSet_(
false),
255 beginJobCalled_(
false),
256 shouldWeStop_(
false),
257 fileModeNoMerge_(
false),
258 exceptionMessageFiles_(),
259 exceptionMessageRuns_(),
260 exceptionMessageLumis_(
false),
261 forceLooperToEnd_(
false),
262 looperBeginJobRun_(
false),
263 forceESCacheClearOnNewRun_(
false),
264 eventSetupDataToExcludeFromPrefetching_() {
266 processDesc->addServices(defaultServices, forcedServices);
267 init(processDesc, iToken, iLegacy);
271 std::vector<std::string>
const& defaultServices,
272 std::vector<std::string>
const& forcedServices)
275 branchIDListHelper_(),
278 espController_(new eventsetup::EventSetupsController),
281 processConfiguration_(),
287 deferredExceptionPtrIsSet_(
false),
291 beginJobCalled_(
false),
292 shouldWeStop_(
false),
293 fileModeNoMerge_(
false),
294 exceptionMessageFiles_(),
295 exceptionMessageRuns_(),
296 exceptionMessageLumis_(
false),
297 forceLooperToEnd_(
false),
298 looperBeginJobRun_(
false),
299 forceESCacheClearOnNewRun_(
false),
300 asyncStopRequestedWhileProcessingEvents_(
false),
301 eventSetupDataToExcludeFromPrefetching_() {
303 processDesc->addServices(defaultServices, forcedServices);
312 branchIDListHelper_(),
315 espController_(new eventsetup::EventSetupsController),
318 processConfiguration_(),
324 deferredExceptionPtrIsSet_(
false),
328 beginJobCalled_(
false),
329 shouldWeStop_(
false),
330 fileModeNoMerge_(
false),
331 exceptionMessageFiles_(),
332 exceptionMessageRuns_(),
333 exceptionMessageLumis_(
false),
334 forceLooperToEnd_(
false),
335 looperBeginJobRun_(
false),
336 forceESCacheClearOnNewRun_(
false),
337 asyncStopRequestedWhileProcessingEvents_(
false),
338 eventSetupDataToExcludeFromPrefetching_() {
353 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
357 bool const hasSubProcesses = !subProcessVParameterSet.empty();
370 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
377 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
383 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
387 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
388 if (nConcurrentRuns != 1) {
390 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
392 unsigned int nConcurrentLumis =
393 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
394 if (nConcurrentLumis == 0) {
395 nConcurrentLumis = 2;
401 if (!loopers.empty()) {
403 if (
nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
404 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number "
405 "of concurrent runs, and the number of concurrent lumis "
406 "are all being reset to 1. Loopers cannot currently support "
407 "values greater than 1.";
409 nConcurrentLumis = 1;
413 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
423 unsigned int maxConcurrentIOVs = nConcurrentLumis;
441 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
447 auto& serviceSets = processDesc->getServicesPSets();
456 handler->willBeUsingThreads();
463 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
465 *
parameterSet,
items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
468 if (!loopers.empty()) {
479 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
489 items.branchIDListHelper(),
491 items.thinnedAssociationsHelper(),
493 items.processConfiguration(),
516 auto ep = std::make_shared<EventPrincipal>(
preg(),
543 for (
auto& subProcessPSet : subProcessVParameterSet) {
599 actReg_->preallocateSignal_(bounds);
603 std::vector<ModuleProcessName> consumedBySubProcesses;
606 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
607 if (consumedBySubProcesses.empty()) {
609 }
else if (not
c.empty()) {
610 std::vector<ModuleProcessName>
tmp;
611 tmp.reserve(consumedBySubProcesses.size() +
c.size());
613 consumedBySubProcesses.end(),
616 std::back_inserter(
tmp));
625 not unusedModules.empty()) {
628 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
629 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
631 "therefore they are deleted before beginJob transition.";
666 ex.
addContext(
"Calling beginJob for the source");
672 constexpr
bool mustPrefetchMayGet =
true;
674 auto const runLookup =
preg_->productLookup(
InRun);
675 auto const lumiLookup =
preg_->productLookup(
InLumi);
678 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
679 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
681 looper_->updateLookup(
esp_->recordsToProxyIndices());
685 actReg_->postBeginJobSignal_();
696 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
705 c.call([&subProcess,
i]() { subProcess.doEndStream(
i); });
709 c.call([actReg]() { actReg->preEndJobSignal_(); });
718 c.call([actReg]() { actReg->postEndJobSignal_(); });
727 return schedule_->getAllModuleDescriptions();
743 #include "TransitionProcessors.icc"
747 bool returnValue =
false;
763 SendSourceTerminationSignalIfException sentry(
actReg_.get());
767 itemType =
input_->nextItemType();
771 sentry.completedSuccessfully();
784 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
803 bool firstTime =
true;
813 auto trans =
fp.processFiles(*
this);
824 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
833 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
834 e.addAdditionalInfo(message);
835 if (
e.alreadyPrinted()) {
841 if (
e.alreadyPrinted()) {
847 if (
e.alreadyPrinted()) {
859 FDEBUG(1) <<
" \treadFile\n";
861 SendSourceTerminationSignalIfException sentry(
actReg_.get());
866 if (size < preg_->
size()) {
873 sentry.completedSuccessfully();
878 SendSourceTerminationSignalIfException sentry(
actReg_.get());
879 input_->closeFile(
fb_.get(), cleaningUpAfterException);
880 sentry.completedSuccessfully();
882 FDEBUG(1) <<
"\tcloseInputFile\n";
890 FDEBUG(1) <<
"\topenOutputFiles\n";
897 FDEBUG(1) <<
"\tcloseOutputFiles\n";
903 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
907 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
915 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
925 FDEBUG(1) <<
"\tstartingNewLoop\n";
931 looper_->setModuleChanger(&changer);
933 looper_->setModuleChanger(
nullptr);
939 FDEBUG(1) <<
"\tendOfLoop\n";
946 FDEBUG(1) <<
"\trewind\n";
951 FDEBUG(1) <<
"\tprepareForNextLoop\n";
955 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
958 if (subProcess.shouldWeCloseOutput()) {
968 FDEBUG(1) <<
"\tdoErrorStuff\n";
969 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n"
970 <<
"and went to the error state\n"
971 <<
"Will attempt to terminate processing normally\n"
972 <<
"(IF using the looper the next loop will be attempted)\n"
973 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
984 beginGlobalTransitionAsync<Traits>(
989 }
while (not globalWaitTask.
done());
992 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
994 beginProcessBlockSucceeded =
true;
998 input_->fillProcessBlockHelper();
1000 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1007 beginGlobalTransitionAsync<Traits>(
1012 }
while (not globalWaitTask.
done());
1014 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1021 }
while (not writeWaitTask.
done());
1045 cleaningUpAfterException);
1048 }
while (not globalWaitTask.
done());
1050 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1053 if (beginProcessBlockSucceeded) {
1058 }
while (not writeWaitTask.
done());
1072 bool& globalBeginSucceeded,
1073 bool& eventSetupForInstanceSucceeded) {
1074 globalBeginSucceeded =
false;
1077 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1080 sentry.completedSuccessfully();
1088 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1090 eventSetupForInstanceSucceeded =
true;
1091 sentry.completedSuccessfully();
1093 auto const& es =
esp_->eventSetupImpl();
1101 }) |
then([
this, &es](
auto nextTask)
mutable {
1109 }
while (not waitTask.
done());
1119 chain::first([&runPrincipal, &es,
this](
auto waitTask) {
1121 beginGlobalTransitionAsync<Traits>(
1123 }) |
then([&globalBeginSucceeded,
run](
auto waitTask)
mutable {
1124 globalBeginSucceeded =
true;
1125 FDEBUG(1) <<
"\tbeginRun " <<
run <<
"\n";
1126 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto waitTask) {
1128 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto waitTask) {
1134 }
while (not globalWaitTask.
done());
1136 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1154 }
while (not streamLoopWaitTask.
done());
1156 std::rethrow_exception(*(streamLoopWaitTask.
exceptionPtr()));
1159 FDEBUG(1) <<
"\tstreamBeginRun " <<
run <<
"\n";
1167 bool globalBeginSucceeded,
1168 bool cleaningUpAfterException,
1169 bool eventSetupForInstanceSucceeded) {
1170 if (eventSetupForInstanceSucceeded) {
1172 endRun(phid,
run, globalBeginSucceeded, cleaningUpAfterException);
1174 if (globalBeginSucceeded) {
1182 }
while (not
t.done());
1184 if (
t.exceptionPtr()) {
1185 std::rethrow_exception(*
t.exceptionPtr());
1194 bool globalBeginSucceeded,
1195 bool cleaningUpAfterException) {
1203 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1205 sentry.completedSuccessfully();
1207 auto const& es =
esp_->eventSetupImpl();
1208 if (globalBeginSucceeded) {
1221 cleaningUpAfterException);
1224 }
while (not streamLoopWaitTask.
done());
1226 std::rethrow_exception(*(streamLoopWaitTask.
exceptionPtr()));
1229 FDEBUG(1) <<
"\tstreamEndRun " <<
run <<
"\n";
1237 chain::first([
this, &runPrincipal, &es, cleaningUpAfterException](
auto nextTask) {
1240 endGlobalTransitionAsync<Traits>(
1242 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto nextTask) {
1244 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto nextTask) {
1250 }
while (not globalWaitTask.
done());
1252 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1255 FDEBUG(1) <<
"\tendRun " <<
run <<
"\n";
1266 input_->luminosityBlockAuxiliary()->beginTime()),
1272 }
while (not waitTask.
done());
1281 std::shared_ptr<void>
const& iRunResource,
1300 status->resetResources();
1316 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1319 sentry.completedSuccessfully();
1325 rng->preBeginLumi(lb);
1336 beginGlobalTransitionAsync<Traits>(
1343 status->globalBeginDidSucceed();
1347 }) |
then([
this,
status](std::exception_ptr
const* iPtr,
auto holder)
mutable {
1349 status->resetResources();
1350 holder.doneWaiting(*iPtr);
1353 status->globalBeginDidSucceed();
1366 auto eventSetupImpls = &
status->eventSetupImpls();
1367 auto lp =
status->lumiPrincipal().get();
1370 event.setLuminosityBlockPrincipal(lp);
1374 beginStreamTransitionAsync<Traits>(
1376 }) |
then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
auto nextTask) {
1377 if (exceptionFromBeginStreamLumi) {
1379 tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1391 status->resetResources();
1398 [
this, lumiWorkLambda =
std::move(lumiWork), iHolder](std::exception_ptr
const* iPtr)
mutable {
1415 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1421 iSync, queueLumiWorkTaskHolder,
status->endIOVWaitingTasks(),
status->eventSetupImpls());
1422 sentry.completedSuccessfully();
1424 queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1436 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1443 iSync, queueLumiWorkTaskHolder,
status->endIOVWaitingTasks(),
status->eventSetupImpls());
1444 sentry.completedSuccessfully();
1447 queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1456 status->needToContinueLumi();
1457 status->startProcessingEvents();
1460 unsigned int streamIndex = 0;
1461 tbb::task_arena arena{tbb::task_arena::attach()};
1465 iHolder.
group()->run(
1472 tmp.doneWaiting(*iPtr);
1479 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1482 auto& lp = *(iLumiStatus->lumiPrincipal());
1483 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1484 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1486 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1489 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1494 endGlobalTransitionAsync<Traits>(
1496 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1498 if (didGlobalBeginSucceed) {
1507 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iPtr,
auto nextTask)
mutable {
1508 std::exception_ptr ptr;
1520 ptr = std::current_exception();
1525 status->resumeGlobalLumiQueue();
1529 ptr = std::current_exception();
1537 status->resetResources();
1541 ptr = std::current_exception();
1563 if (
status->streamFinishedLumi()) {
1573 lumiStatus->setEndTime();
1577 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1578 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1580 if (lumiStatus->didGlobalBeginSucceed()) {
1581 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1583 lumiPrincipal.endTime());
1586 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1592 cleaningUpAfterException);
1609 }
while (not globalWaitTask.
done());
1611 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1617 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1618 input_->readProcessBlock(processBlockPrincipal);
1619 sentry.completedSuccessfully();
1625 <<
"Illegal attempt to insert run into cache\n"
1626 <<
"Contact a Framework Developer\n";
1628 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
1636 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1638 sentry.completedSuccessfully();
1640 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1642 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1649 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1650 input_->readAndMergeRun(*runPrincipal);
1651 sentry.completedSuccessfully();
1653 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1654 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1660 <<
"Illegal attempt to insert lumi into cache\n"
1661 <<
"Run is invalid\n"
1662 <<
"Contact a Framework Developer\n";
1666 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1668 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1670 sentry.completedSuccessfully();
1678 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or
1679 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1680 input_->processHistoryRegistry().reducedProcessHistoryID(
1681 input_->luminosityBlockAuxiliary()->processHistoryID()));
1682 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1684 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1686 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1688 sentry.completedSuccessfully();
1690 return input_->luminosityBlock();
1696 task.doneWaiting(*iExcept);
1700 s.writeProcessBlockAsync(
task, processBlockType);
1716 [
this, phid,
run,
task, mergeableRunProductMetadata](std::exception_ptr
const* iExcept)
mutable {
1718 task.doneWaiting(*iExcept);
1722 s.writeRunAsync(
task, phid,
run, mergeableRunProductMetadata);
1731 mergeableRunProductMetadata);
1737 FDEBUG(1) <<
"\tdeleteRunFromCache " <<
run <<
"\n";
1743 task.doneWaiting(*iExcept);
1747 s.writeLumiAsync(
task, lumiPrincipal);
1789 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1801 input_->luminosityBlockAuxiliary()->beginTime()));
1818 bool expected =
false;
1837 auto recursionTask =
make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1841 bool expected =
false;
1859 if (
status->isLumiEnding()) {
1875 bool expected =
false;
1877 auto e = std::current_exception();
1890 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1894 sentry.completedSuccessfully();
1896 FDEBUG(1) <<
"\treadEvent\n";
1910 rng->postEventRead(
ev);
1915 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
1920 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1922 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
1926 }) |
then([pep](
auto nextTask) {
1927 FDEBUG(1) <<
"\tprocessEvent\n";
1928 pep->clearEventPrincipal();
1933 bool randomAccess =
input_->randomAccess();
1942 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1961 FDEBUG(1) <<
"\tshouldWeStop\n";
1966 if (subProcess.terminate()) {
1982 bool expected =
false;
1991 std::unique_ptr<LogSystem>
s;
1992 for (
auto worker :
schedule_->allWorkers()) {
1993 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1995 s = std::make_unique<LogSystem>(
"ModulesSynchingOnLumis");
1996 (*s) <<
"The following modules require synchronizing on LuminosityBlock boundaries:";
1998 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2004 std::unique_ptr<LogSystem>
s;
2005 for (
auto worker :
schedule_->allWorkers()) {
2008 s = std::make_unique<LogSystem>(
"LegacyModules");
2009 (*s) <<
"The following legacy modules are configured. Support for legacy modules\n"
2010 "is going to end soon. These modules need to be converted to have type\n"
2011 "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2013 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();