77 #include "boost/range/adaptor/reversed.hpp"
101 class SendSourceTerminationSignalIfException {
104 ~SendSourceTerminationSignalIfException() {
109 void completedSuccessfully() { reg_ =
nullptr; }
122 std::shared_ptr<ProductRegistry> preg,
123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
125 std::shared_ptr<ActivityRegistry> areg,
126 std::shared_ptr<ProcessConfiguration const> processConfiguration,
129 if (main_input ==
nullptr) {
131 <<
"There must be exactly one source in the configuration.\n"
132 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
137 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
140 filler->fill(descriptions);
145 std::ostringstream ost;
146 ost <<
"Validating configuration of input source of type " << modtype;
162 processConfiguration.get(),
168 thinnedAssociationsHelper,
172 common.maxSecondsUntilRampdown_,
175 areg->preSourceConstructionSignal_(md);
176 std::unique_ptr<InputSource>
input;
179 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
182 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
183 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
186 std::ostringstream ost;
187 ost <<
"Constructing input source of type " << modtype;
200 filler->fill(descriptions);
213 std::shared_ptr<EDLooperBase> vLooper;
215 std::vector<std::string> loopers =
params.getParameter<std::vector<std::string>>(
"@all_loopers");
217 if (loopers.empty()) {
221 assert(1 == loopers.size());
223 for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd;
237 std::vector<std::string>
const& defaultServices,
238 std::vector<std::string>
const& forcedServices)
241 branchIDListHelper_(),
244 espController_(new eventsetup::EventSetupsController),
247 processConfiguration_(),
253 deferredExceptionPtrIsSet_(
false),
257 beginJobCalled_(
false),
258 shouldWeStop_(
false),
259 fileModeNoMerge_(
false),
260 exceptionMessageFiles_(),
261 exceptionMessageRuns_(),
262 exceptionMessageLumis_(
false),
263 forceLooperToEnd_(
false),
264 looperBeginJobRun_(
false),
265 forceESCacheClearOnNewRun_(
false),
266 eventSetupDataToExcludeFromPrefetching_() {
268 processDesc->addServices(defaultServices, forcedServices);
269 init(processDesc, iToken, iLegacy);
273 std::vector<std::string>
const& defaultServices,
274 std::vector<std::string>
const& forcedServices)
277 branchIDListHelper_(),
280 espController_(new eventsetup::EventSetupsController),
283 processConfiguration_(),
289 deferredExceptionPtrIsSet_(
false),
293 beginJobCalled_(
false),
294 shouldWeStop_(
false),
295 fileModeNoMerge_(
false),
296 exceptionMessageFiles_(),
297 exceptionMessageRuns_(),
298 exceptionMessageLumis_(
false),
299 forceLooperToEnd_(
false),
300 looperBeginJobRun_(
false),
301 forceESCacheClearOnNewRun_(
false),
302 asyncStopRequestedWhileProcessingEvents_(
false),
303 eventSetupDataToExcludeFromPrefetching_() {
305 processDesc->addServices(defaultServices, forcedServices);
314 branchIDListHelper_(),
317 espController_(new eventsetup::EventSetupsController),
320 processConfiguration_(),
326 deferredExceptionPtrIsSet_(
false),
330 beginJobCalled_(
false),
331 shouldWeStop_(
false),
332 fileModeNoMerge_(
false),
333 exceptionMessageFiles_(),
334 exceptionMessageRuns_(),
335 exceptionMessageLumis_(
false),
336 forceLooperToEnd_(
false),
337 looperBeginJobRun_(
false),
338 forceESCacheClearOnNewRun_(
false),
339 asyncStopRequestedWhileProcessingEvents_(
false),
340 eventSetupDataToExcludeFromPrefetching_() {
355 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
359 bool const hasSubProcesses = !subProcessVParameterSet.empty();
372 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
379 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
385 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
392 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
393 if (nConcurrentRuns != 1) {
395 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
397 unsigned int nConcurrentLumis =
398 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
399 if (nConcurrentLumis == 0) {
400 nConcurrentLumis = nConcurrentRuns;
419 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
425 auto& serviceSets = processDesc->getServicesPSets();
434 handler->willBeUsingThreads();
441 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
452 nConcurrentLumis = 1;
461 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
469 items.branchIDListHelper(),
470 items.thinnedAssociationsHelper(),
472 items.processConfiguration(),
494 auto ep = std::make_shared<EventPrincipal>(
preg(),
519 for (
auto& subProcessPSet : subProcessVParameterSet) {
574 actReg_->preallocateSignal_(bounds);
578 std::vector<ModuleProcessName> consumedBySubProcesses;
581 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
582 if (consumedBySubProcesses.empty()) {
584 }
else if (not
c.empty()) {
585 std::vector<ModuleProcessName>
tmp;
586 tmp.reserve(consumedBySubProcesses.size() +
c.size());
588 consumedBySubProcesses.end(),
591 std::back_inserter(
tmp));
600 not unusedModules.empty()) {
603 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
604 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
606 "therefore they are deleted before beginJob transition.";
639 ex.
addContext(
"Calling beginJob for the source");
645 constexpr
bool mustPrefetchMayGet =
true;
647 auto const runLookup =
preg_->productLookup(
InRun);
648 auto const lumiLookup =
preg_->productLookup(
InLumi);
651 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
652 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
654 looper_->updateLookup(
esp_->recordsToProxyIndices());
658 actReg_->postBeginJobSignal_();
669 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
678 c.call([&subProcess,
i]() { subProcess.doEndStream(
i); });
682 c.call([actReg]() { actReg->preEndJobSignal_(); });
691 c.call([actReg]() { actReg->postEndJobSignal_(); });
700 return schedule_->getAllModuleDescriptions();
716 #include "TransitionProcessors.icc"
720 bool returnValue =
false;
736 SendSourceTerminationSignalIfException sentry(
actReg_.get());
740 itemType =
input_->nextItemType();
744 sentry.completedSuccessfully();
757 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
776 bool firstTime =
true;
786 auto trans =
fp.processFiles(*
this);
797 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
806 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
807 e.addAdditionalInfo(message);
808 if (
e.alreadyPrinted()) {
814 if (
e.alreadyPrinted()) {
820 if (
e.alreadyPrinted()) {
832 FDEBUG(1) <<
" \treadFile\n";
834 SendSourceTerminationSignalIfException sentry(
actReg_.get());
839 if (size < preg_->
size()) {
846 sentry.completedSuccessfully();
851 SendSourceTerminationSignalIfException sentry(
actReg_.get());
852 input_->closeFile(
fb_.get(), cleaningUpAfterException);
853 sentry.completedSuccessfully();
855 FDEBUG(1) <<
"\tcloseInputFile\n";
863 FDEBUG(1) <<
"\topenOutputFiles\n";
870 FDEBUG(1) <<
"\tcloseOutputFiles\n";
876 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
880 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
888 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
898 FDEBUG(1) <<
"\tstartingNewLoop\n";
904 looper_->setModuleChanger(&changer);
906 looper_->setModuleChanger(
nullptr);
912 FDEBUG(1) <<
"\tendOfLoop\n";
919 FDEBUG(1) <<
"\trewind\n";
924 FDEBUG(1) <<
"\tprepareForNextLoop\n";
928 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
931 if (subProcess.shouldWeCloseOutput()) {
941 FDEBUG(1) <<
"\tdoErrorStuff\n";
942 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n"
943 <<
"and went to the error state\n"
944 <<
"Will attempt to terminate processing normally\n"
945 <<
"(IF using the looper the next loop will be attempted)\n"
946 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
957 beginGlobalTransitionAsync<Traits>(
962 }
while (not globalWaitTask.
done());
965 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
967 beginProcessBlockSucceeded =
true;
977 while (
input_->readProcessBlock()) {
985 beginGlobalTransitionAsync<Traits>(
990 }
while (not globalWaitTask.
done());
992 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
999 }
while (not writeWaitTask.
done());
1023 cleaningUpAfterException);
1026 }
while (not globalWaitTask.
done());
1028 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1031 if (beginProcessBlockSucceeded) {
1036 }
while (not writeWaitTask.
done());
1050 bool& globalBeginSucceeded,
1051 bool& eventSetupForInstanceSucceeded) {
1052 globalBeginSucceeded =
false;
1055 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1058 sentry.completedSuccessfully();
1066 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1068 eventSetupForInstanceSucceeded =
true;
1069 sentry.completedSuccessfully();
1071 auto const& es =
esp_->eventSetupImpl();
1082 beginGlobalTransitionAsync<Traits>(
1086 }
while (not globalWaitTask.
done());
1088 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1091 globalBeginSucceeded =
true;
1092 FDEBUG(1) <<
"\tbeginRun " <<
run <<
"\n";
1111 }
while (not streamLoopWaitTask.
done());
1113 std::rethrow_exception(*(streamLoopWaitTask.
exceptionPtr()));
1116 FDEBUG(1) <<
"\tstreamBeginRun " <<
run <<
"\n";
1124 bool globalBeginSucceeded,
1125 bool cleaningUpAfterException,
1126 bool eventSetupForInstanceSucceeded) {
1127 if (eventSetupForInstanceSucceeded) {
1129 endRun(phid,
run, globalBeginSucceeded, cleaningUpAfterException);
1131 if (globalBeginSucceeded) {
1139 }
while (not
t.done());
1141 if (
t.exceptionPtr()) {
1142 std::rethrow_exception(*
t.exceptionPtr());
1151 bool globalBeginSucceeded,
1152 bool cleaningUpAfterException) {
1160 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1162 sentry.completedSuccessfully();
1164 auto const& es =
esp_->eventSetupImpl();
1165 if (globalBeginSucceeded) {
1178 cleaningUpAfterException);
1181 }
while (not streamLoopWaitTask.
done());
1183 std::rethrow_exception(*(streamLoopWaitTask.
exceptionPtr()));
1186 FDEBUG(1) <<
"\tstreamEndRun " <<
run <<
"\n";
1200 cleaningUpAfterException);
1203 }
while (not globalWaitTask.
done());
1205 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1208 FDEBUG(1) <<
"\tendRun " <<
run <<
"\n";
1222 input_->luminosityBlockAuxiliary()->beginTime()),
1228 }
while (not waitTask.
done());
1237 std::shared_ptr<void>
const& iRunResource,
1256 status->resetResources();
1272 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1275 sentry.completedSuccessfully();
1281 rng->preBeginLumi(lb);
1285 auto beginStreamsTask =
1288 status->resetResources();
1289 holder.doneWaiting(*iPtr);
1291 status->globalBeginDidSucceed();
1301 status->resetResources();
1302 holder.doneWaiting(std::current_exception());
1314 std::exception_ptr
const* exceptionFromBeginStreamLumi)
mutable {
1315 if (exceptionFromBeginStreamLumi) {
1317 tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1327 auto eventSetupImpls = &
status->eventSetupImpls();
1328 auto lp =
status->lumiPrincipal().get();
1331 event.setLuminosityBlockPrincipal(lp);
1333 beginStreamTransitionAsync<Traits>(
WaitingTaskHolder(*holder.group(), eventTask),
1351 beginGlobalTransitionAsync<Traits>(
1355 status->resetResources();
1362 [
this, lumiWorkLambda =
std::move(lumiWork), iHolder](std::exception_ptr
const* iPtr)
mutable {
1379 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1385 iSync, queueLumiWorkTaskHolder,
status->endIOVWaitingTasks(),
status->eventSetupImpls());
1386 sentry.completedSuccessfully();
1388 queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1400 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1407 iSync, queueLumiWorkTaskHolder,
status->endIOVWaitingTasks(),
status->eventSetupImpls());
1408 sentry.completedSuccessfully();
1411 queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1420 status->needToContinueLumi();
1421 status->startProcessingEvents();
1424 unsigned int streamIndex = 0;
1425 tbb::task_arena arena{tbb::task_arena::attach()};
1429 iHolder.
group()->run(
1436 tmp.doneWaiting(*iPtr);
1443 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1446 auto& lp = *(iLumiStatus->lumiPrincipal());
1447 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1448 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1450 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1453 tbb::task_group& taskGroup = *iTask.
group();
1456 std::exception_ptr ptr;
1464 auto& lumiPrincipal = *(
status->lumiPrincipal());
1469 ptr = std::current_exception();
1480 ptr = std::current_exception();
1485 status->resumeGlobalLumiQueue();
1489 ptr = std::current_exception();
1497 status->resetResources();
1501 ptr = std::current_exception();
1511 [
this, didGlobalBeginSucceed, &lumiPrincipal = lp,
task =
WaitingTaskHolder(taskGroup, finalTaskForThisLumi)](
1512 std::exception_ptr
const* iExcept)
mutable {
1514 task.doneWaiting(*iExcept);
1517 if (didGlobalBeginSucceed) {
1532 cleaningUpAfterException);
1547 if (
status->streamFinishedLumi()) {
1557 lumiStatus->setEndTime();
1561 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1562 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1564 if (lumiStatus->didGlobalBeginSucceed()) {
1565 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1567 lumiPrincipal.endTime());
1570 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1576 cleaningUpAfterException);
1593 }
while (not globalWaitTask.
done());
1595 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1603 <<
"Illegal attempt to insert run into cache\n"
1604 <<
"Contact a Framework Developer\n";
1606 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
1614 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1616 sentry.completedSuccessfully();
1618 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1620 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1627 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1628 input_->readAndMergeRun(*runPrincipal);
1629 sentry.completedSuccessfully();
1631 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1632 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1638 <<
"Illegal attempt to insert lumi into cache\n"
1639 <<
"Run is invalid\n"
1640 <<
"Contact a Framework Developer\n";
1644 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1646 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1648 sentry.completedSuccessfully();
1656 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or
1657 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1658 input_->processHistoryRegistry().reducedProcessHistoryID(
1659 input_->luminosityBlockAuxiliary()->processHistoryID()));
1660 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1662 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1664 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1666 sentry.completedSuccessfully();
1668 return input_->luminosityBlock();
1674 task.doneWaiting(*iExcept);
1678 s.writeProcessBlockAsync(
task, processBlockType);
1694 [
this, phid,
run,
task, mergeableRunProductMetadata](std::exception_ptr
const* iExcept)
mutable {
1696 task.doneWaiting(*iExcept);
1700 s.writeRunAsync(
task, phid,
run, mergeableRunProductMetadata);
1709 mergeableRunProductMetadata);
1715 FDEBUG(1) <<
"\tdeleteRunFromCache " <<
run <<
"\n";
1721 task.doneWaiting(*iExcept);
1725 s.writeLumiAsync(
task, lumiPrincipal);
1767 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1779 input_->luminosityBlockAuxiliary()->beginTime()));
1796 bool expected =
false;
1815 auto recursionTask =
make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1819 bool expected =
false;
1837 if (
status->isLumiEnding()) {
1853 bool expected =
false;
1855 auto e = std::current_exception();
1868 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1872 sentry.completedSuccessfully();
1874 FDEBUG(1) <<
"\treadEvent\n";
1888 rng->postEventRead(
ev);
1892 *iHolder.
group(),
make_waiting_task([
this, pep, iHolder, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1899 FDEBUG(1) <<
"\tprocessEvent\n";
1900 pep->clearEventPrincipal();
1909 afterProcessTask =
std::move(finalizeEventTask);
1915 make_waiting_task([
this, pep, finalizeEventTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1921 subProcess.doEventAsync(finalizeEventTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1935 bool randomAccess =
input_->randomAccess();
1944 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1963 FDEBUG(1) <<
"\tshouldWeStop\n";
1968 if (subProcess.terminate()) {
1984 bool expected =
false;
1993 std::unique_ptr<LogSystem>
s;
1994 for (
auto worker :
schedule_->allWorkers()) {
1995 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1997 s = std::make_unique<LogSystem>(
"ModulesSynchingOnLumis");
1998 (*s) <<
"The following modules require synchronizing on LuminosityBlock boundaries:";
2000 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();