84 #include "boost/range/adaptor/reversed.hpp" 96 #include "oneapi/tbb/task.h" 104 class PauseQueueSentry {
107 ~PauseQueueSentry() { queue_.resume(); }
116 namespace chain = waiting_task::chain;
119 std::unique_ptr<InputSource>
makeInput(
unsigned int moduleIndex,
122 std::shared_ptr<ProductRegistry> preg,
123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
125 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
126 std::shared_ptr<ActivityRegistry> areg,
127 std::shared_ptr<ProcessConfiguration const> processConfiguration,
130 if (main_input ==
nullptr) {
132 <<
"There must be exactly one source in the configuration.\n" 133 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
138 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
141 filler->fill(descriptions);
146 std::ostringstream ost;
147 ost <<
"Validating configuration of input source of type " << modtype;
163 processConfiguration.get(),
170 thinnedAssociationsHelper,
174 common.maxSecondsUntilRampdown_,
177 areg->preSourceConstructionSignal_(md);
178 std::unique_ptr<InputSource>
input;
181 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
184 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
188 std::ostringstream ost;
189 ost <<
"Constructing input source of type " << modtype;
200 std::vector<std::string>
const& loopers) {
201 std::shared_ptr<EDLooperBase> vLooper;
203 assert(1 == loopers.size());
205 for (
auto const& looperName : loopers) {
217 std::vector<std::string>
const& defaultServices,
218 std::vector<std::string>
const& forcedServices)
221 branchIDListHelper_(),
225 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
228 processConfiguration_(),
234 deferredExceptionPtrIsSet_(
false),
238 beginJobCalled_(
false),
239 shouldWeStop_(
false),
240 fileModeNoMerge_(
false),
241 exceptionMessageFiles_(),
242 exceptionMessageRuns_(
false),
243 exceptionMessageLumis_(
false),
244 forceLooperToEnd_(
false),
245 looperBeginJobRun_(
false),
246 forceESCacheClearOnNewRun_(
false),
247 eventSetupDataToExcludeFromPrefetching_() {
249 processDesc->addServices(defaultServices, forcedServices);
250 init(processDesc, iToken, iLegacy);
254 std::vector<std::string>
const& defaultServices,
255 std::vector<std::string>
const& forcedServices)
258 branchIDListHelper_(),
262 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
265 processConfiguration_(),
271 deferredExceptionPtrIsSet_(
false),
275 beginJobCalled_(
false),
276 shouldWeStop_(
false),
277 fileModeNoMerge_(
false),
278 exceptionMessageFiles_(),
279 exceptionMessageRuns_(
false),
280 exceptionMessageLumis_(
false),
281 forceLooperToEnd_(
false),
282 looperBeginJobRun_(
false),
283 forceESCacheClearOnNewRun_(
false),
284 eventSetupDataToExcludeFromPrefetching_() {
286 processDesc->addServices(defaultServices, forcedServices);
295 branchIDListHelper_(),
299 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
302 processConfiguration_(),
308 deferredExceptionPtrIsSet_(
false),
312 beginJobCalled_(
false),
313 shouldWeStop_(
false),
314 fileModeNoMerge_(
false),
315 exceptionMessageFiles_(),
316 exceptionMessageRuns_(
false),
317 exceptionMessageLumis_(
false),
318 forceLooperToEnd_(
false),
319 looperBeginJobRun_(
false),
320 forceESCacheClearOnNewRun_(
false),
321 eventSetupDataToExcludeFromPrefetching_() {
336 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
340 bool const hasSubProcesses = !subProcessVParameterSet.empty();
353 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
361 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
367 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
371 unsigned int nConcurrentLumis =
372 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
373 if (nConcurrentLumis == 0) {
374 nConcurrentLumis = 2;
376 if (nConcurrentLumis > nStreams) {
377 nConcurrentLumis = nStreams;
379 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
380 if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381 nConcurrentRuns = nConcurrentLumis;
384 if (!loopers.empty()) {
386 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number " 388 "of concurrent runs, and the number of concurrent lumis " 389 "are all being reset to 1. Loopers cannot currently support " 390 "values greater than 1.";
392 nConcurrentLumis = 1;
396 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
400 if (nThreads > 1
or nStreams > 1) {
401 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
410 unsigned int maxConcurrentIOVs =
411 3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
417 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
420 if (not hasSubProcesses) {
424 auto referencePSets =
425 optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>(
"holdsReferencesToDeleteEarly");
426 for (
auto const&
pset : referencePSets) {
428 auto references =
pset.getParameter<std::vector<std::string>>(
"references");
429 for (
auto const& ref : references) {
434 optionsPset.getUntrackedParameter<std::vector<std::string>>(
"modulesToIgnoreForDeleteEarly");
441 auto& serviceSets = processDesc->getServicesPSets();
451 handler->willBeUsingThreads();
458 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
463 if (!loopers.empty()) {
474 runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
483 std::optional<ScheduleItems::MadeModules> madeModules;
486 tbb::task_group
group;
489 auto tempReg = std::make_shared<ProductRegistry>();
492 group.run([&,
this]() {
500 group.run([&,
this, tempReg]() {
506 items.branchIDListHelper(),
508 items.thinnedAssociationsHelper(),
510 items.processConfiguration(),
515 items.preg()->addFromInput(*tempReg);
545 auto ep = std::make_shared<EventPrincipal>(
preg(),
557 auto rp = std::make_unique<RunPrincipal>(
578 for (
auto& subProcessPSet : subProcessVParameterSet) {
650 std::vector<ModuleProcessName> consumedBySubProcesses;
653 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654 if (consumedBySubProcesses.empty()) {
656 }
else if (not
c.empty()) {
657 std::vector<ModuleProcessName>
tmp;
658 tmp.reserve(consumedBySubProcesses.size() +
c.size());
660 consumedBySubProcesses.end(),
663 std::back_inserter(
tmp));
672 not unusedModules.empty()) {
675 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
676 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 678 "therefore they are deleted before beginJob transition.";
695 schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *
preg_);
725 ex.
addContext(
"Calling beginJob for the source");
731 constexpr
bool mustPrefetchMayGet =
true;
733 auto const runLookup =
preg_->productLookup(
InRun);
734 auto const lumiLookup =
preg_->productLookup(
InLumi);
737 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
738 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
740 looper_->updateLookup(
esp_->recordsToProxyIndices());
744 actReg_->postBeginJobSignal_();
746 oneapi::tbb::task_group
group;
749 first([
this](
auto nextTask) {
751 first([
i,
this](
auto nextTask) {
766 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
773 oneapi::tbb::task_group
group;
781 first([
this,
i, &
c, &collectorMutex](
auto nextTask) {
782 std::exception_ptr
ep;
787 ep = std::current_exception();
790 std::lock_guard<std::mutex>
l(collectorMutex);
791 c.call([&
ep]() { std::rethrow_exception(
ep); });
793 }) |
then([
this,
i, &
c, &collectorMutex](
auto nextTask) {
795 first([
this,
i, &
c, &collectorMutex, &subProcess](
auto nextTask) {
796 std::exception_ptr
ep;
799 subProcess.doEndStream(
i);
801 ep = std::current_exception();
804 std::lock_guard<std::mutex>
l(collectorMutex);
805 c.call([&
ep]() { std::rethrow_exception(
ep); });
812 waitTask.waitNoThrow();
815 c.call([actReg]() { actReg->preEndJobSignal_(); });
824 c.call([actReg]() { actReg->postEndJobSignal_(); });
833 return schedule_->getAllModuleDescriptions();
845 #include "TransitionProcessors.icc" 849 bool returnValue =
false;
864 itemType =
input_->nextItemType();
868 sentry.completedSuccessfully();
890 bool firstTime =
true;
900 auto trans =
fp.processFiles(*
this);
911 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
920 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
921 e.addAdditionalInfo(message);
922 if (
e.alreadyPrinted()) {
928 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
929 e.addAdditionalInfo(message);
930 if (
e.alreadyPrinted()) {
936 if (
e.alreadyPrinted()) {
946 FDEBUG(1) <<
" \treadFile\n";
952 streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
956 streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
960 if (size < preg_->
size()) {
967 sentry.completedSuccessfully();
973 input_->closeFile(
fb_.get(), cleaningUpAfterException);
974 sentry.completedSuccessfully();
976 FDEBUG(1) <<
"\tcloseInputFile\n";
984 FDEBUG(1) <<
"\topenOutputFiles\n";
991 FDEBUG(1) <<
"\tcloseOutputFiles\n";
997 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
1001 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1009 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1019 FDEBUG(1) <<
"\tstartingNewLoop\n";
1025 looper_->setModuleChanger(&changer);
1027 looper_->setModuleChanger(
nullptr);
1033 FDEBUG(1) <<
"\tendOfLoop\n";
1040 FDEBUG(1) <<
"\trewind\n";
1045 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1049 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1052 if (subProcess.shouldWeCloseOutput()) {
1058 return schedule_->shouldWeCloseOutput();
1062 FDEBUG(1) <<
"\tdoErrorStuff\n";
1063 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n" 1064 <<
"and went to the error state\n" 1065 <<
"Will attempt to terminate processing normally\n" 1066 <<
"(IF using the looper the next loop will be attempted)\n" 1067 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1078 beginGlobalTransitionAsync<Traits>(
1081 globalWaitTask.wait();
1082 beginProcessBlockSucceeded =
true;
1086 input_->fillProcessBlockHelper();
1088 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1095 beginGlobalTransitionAsync<Traits>(
1098 globalWaitTask.wait();
1102 writeWaitTask.wait();
1123 cleaningUpAfterException);
1124 globalWaitTask.wait();
1126 if (beginProcessBlockSucceeded) {
1129 writeWaitTask.wait();
1152 runStatus->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
1158 runStatus->setHolderOfTaskInProcessRuns(holder);
1175 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1182 status->endIOVWaitingTasks(),
1183 status->eventSetupImpls(),
1188 }) |
chain::then([
this,
status, iSync](std::exception_ptr
const* iException,
auto nextTask) {
1196 actReg_->postESSyncIOVSignal_.emit(iSync);
1202 if (postRunQueueTask.taskHasFailed()) {
1203 status->resetBeginResources();
1211 *postRunQueueTask.group(), [
this, postSourceTask = postRunQueueTask,
status]()
mutable {
1215 if (postSourceTask.taskHasFailed()) {
1216 status->resetBeginResources();
1218 status->resumeGlobalRunQueue();
1228 sentry.completedSuccessfully();
1235 oneapi::tbb::task_group
group;
1240 }) |
then([
this, &es](
auto nextTask)
mutable {
1251 status->setStopBeforeProcessingRun(
true);
1252 nextTask.doneWaiting(std::current_exception());
1254 }) |
then([
this,
status, &es](
auto nextTask) {
1255 if (
status->stopBeforeProcessingRun()) {
1260 beginGlobalTransitionAsync<Traits>(
1263 if (
status->stopBeforeProcessingRun()) {
1266 status->globalBeginDidSucceed();
1268 if (
status->stopBeforeProcessingRun()) {
1274 if (
status->stopBeforeProcessingRun()) {
1279 }) |
then([
this,
status](std::exception_ptr
const* iException,
auto holder)
mutable {
1280 bool precedingTasksSucceeded =
true;
1282 precedingTasksSucceeded =
false;
1287 if (
status->stopBeforeProcessingRun()) {
1289 status->resetBeginResources();
1291 status->resumeGlobalRunQueue();
1297 auto globalEndRunTask =
1300 status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1305 status->resetBeginResources();
1307 status->resumeGlobalRunQueue();
1308 holder.doneWaiting(std::current_exception());
1323 *holder.group(), [
this,
status, precedingTasksSucceeded, holder]()
mutable {
1328 [
this,
i,
status, precedingTasksSucceeded, holder]()
mutable {
1333 if (
status->streamFinishedBeginRun()) {
1336 status->resetBeginResources();
1346 status->resetBeginResources();
1353 status->resetBeginResources();
1355 status->resumeGlobalRunQueue();
1356 postSourceTask.doneWaiting(std::current_exception());
1360 status->resetBeginResources();
1362 status->resumeGlobalRunQueue();
1363 postRunQueueTask.doneWaiting(std::current_exception());
1367 status->resetBeginResources();
1369 nextTask.doneWaiting(std::current_exception());
1375 std::shared_ptr<RunProcessingStatus>
status,
1376 bool precedingTasksSucceeded,
1385 chain::first([
this, iStream, precedingTasksSucceeded](
auto nextTask) {
1386 if (precedingTasksSucceeded) {
1391 beginStreamTransitionAsync<Traits>(
1394 }) |
then([
this, iStream](std::exception_ptr
const* exceptionFromBeginStreamRun,
auto nextTask) {
1395 if (exceptionFromBeginStreamRun) {
1396 nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1408 if (
status->streamFinishedBeginRun()) {
1409 status->resetBeginResources();
1416 RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1429 iRunStatus->endIOVWaitingTasksEndRun(),
1430 iRunStatus->eventSetupImplsEndRun(),
1434 }) |
chain::then([
this, iRunStatus, ts](std::exception_ptr
const* iException,
auto nextTask) {
1436 iRunStatus->setEndingEventSetupSucceeded(
false);
1448 streamQueues_[
i].push(*nextTask.group(), [
this,
i, nextTask]()
mutable {
1475 tmp.doneWaiting(iException);
1480 auto& runPrincipal = *(iRunStatus->runPrincipal());
1481 bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1482 bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1483 EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(
esp_->subProcessIndex());
1484 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1485 bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1489 chain::first([
this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1491 if (endingEventSetupSucceeded) {
1494 endGlobalTransitionAsync<Traits>(
1499 [
this, &runPrincipal, &es](
auto nextTask) {
1503 [
this, &runPrincipal, &es](
auto nextTask) {
1507 ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1508 [
this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](
auto nextTask) {
1510 writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1514 didGlobalBeginSucceed,
1515 mergeableRunProductMetadata,
1516 endingEventSetupSucceeded](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1517 if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1518 mergeableRunProductMetadata->postWriteRun();
1525 std::exception_ptr ptr;
1532 ptr = std::current_exception();
1536 status->resumeGlobalRunQueue();
1540 ptr = std::current_exception();
1544 status->resetEndResources();
1548 ptr = std::current_exception();
1552 if (ptr && !iException) {
1578 if (runStatus->streamFinishedRun()) {
1579 runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1590 if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1592 auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1593 bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1595 auto& runPrincipal = *runStatus->runPrincipal();
1598 endStreamTransitionAsync<Traits>(
std::move(runDoneTaskHolder),
1604 cleaningUpAfterException);
1616 runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1618 runStatus->setHolderOfTaskInProcessRuns(holder);
1626 std::shared_ptr<RunProcessingStatus> iRunStatus,
1628 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1634 status->endIOVWaitingTasks(),
1635 status->eventSetupImpls(),
1639 }) |
chain::then([
this,
status, iRunStatus, iSync](std::exception_ptr
const* iException,
auto nextTask) {
1648 actReg_->postESSyncIOVSignal_.emit(iSync);
1654 if (postLumiQueueTask.taskHasFailed()) {
1655 status->resetResources();
1664 *postLumiQueueTask.group(),
1665 [
this, postSourceTask = postLumiQueueTask,
status, iRunStatus]()
mutable {
1669 if (postSourceTask.taskHasFailed()) {
1670 status->resetResources();
1682 sentry.completedSuccessfully();
1688 rng->preBeginLumi(lb);
1697 }) |
then([
this,
status, &es, &lumiPrincipal](
auto nextTask) {
1700 beginGlobalTransitionAsync<Traits>(
1706 status->globalBeginDidSucceed();
1709 }) |
then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto holder)
mutable {
1711 status->resetResources();
1718 status->globalBeginDidSucceed();
1721 status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1735 auto eventSetupImpls = &
status->eventSetupImpls();
1736 auto lp =
status->lumiPrincipal().get();
1739 event.setLuminosityBlockPrincipal(lp);
1743 beginStreamTransitionAsync<Traits>(
std::move(nextTask),
1750 then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
1752 if (exceptionFromBeginStreamLumi) {
1754 copyHolder.
doneWaiting(*exceptionFromBeginStreamLumi);
1765 status->resetResources();
1773 status->resetResources();
1781 status->resetResources();
1797 status->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
1803 unsigned int streamIndex = 0;
1804 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1808 nextTask.group()->run(
1818 tmp.doneWaiting(iException);
1823 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1826 auto& lp = *(iLumiStatus->lumiPrincipal());
1827 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1828 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1830 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1833 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1838 endGlobalTransitionAsync<Traits>(
1840 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1842 if (didGlobalBeginSucceed) {
1851 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1857 std::exception_ptr ptr;
1865 ptr = std::current_exception();
1871 ptr = std::current_exception();
1876 status->resetResources();
1877 status->globalEndRunHolderDoneWaiting();
1881 ptr = std::current_exception();
1885 if (ptr && !iException) {
1904 if (
status->streamFinishedLumi()) {
1914 lumiStatus->setEndTime();
1917 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1918 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1920 if (lumiStatus->didGlobalBeginSucceed()) {
1921 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1924 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1930 cleaningUpAfterException);
1946 globalWaitTask.wait();
1953 input_->readProcessBlock(processBlockPrincipal);
1954 sentry.completedSuccessfully();
1960 rp->setAux(*
input_->runAuxiliary());
1964 sentry.completedSuccessfully();
1966 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1978 input_->readAndMergeRun(runPrincipal);
1979 sentry.completedSuccessfully();
1986 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1990 sentry.completedSuccessfully();
1998 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1999 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2000 input_->processHistoryRegistry().reducedProcessHistoryID(
2001 input_->luminosityBlockAuxiliary()->processHistoryID()));
2002 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
2004 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
2008 sentry.completedSuccessfully();
2021 s.writeProcessBlockAsync(nextTask, processBlockType);
2037 s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2062 s.writeLumiAsync(nextTask, lumiPrincipal);
2072 iStatus.
lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2085 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2089 status->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
2090 if (
status->holderOfTaskInProcessRuns().taskHasFailed()) {
2091 status->setStopBeforeProcessingRun(
true);
2098 status->setStopBeforeProcessingRun(
true);
2099 holder.doneWaiting(std::current_exception());
2112 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2116 iLumiStatus->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
2121 holder.doneWaiting(std::current_exception());
2129 iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2134 input_->luminosityBlockAuxiliary()->beginTime()),
2150 unsigned int iStreamIndex,
2182 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2197 iStatus.
lumiPrincipal()->runPrincipal().reducedProcessHistoryID() !=
input_->reducedProcessHistoryID()))) {
2216 auto recursionTask =
2217 make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iEventException)
mutable {
2218 if (iEventException) {
2232 if (not
status->haveStartedNextLumiOrEndedRun()) {
2233 status->startNextLumiOrEndRun();
2237 input_->luminosityBlockAuxiliary()->beginTime()),
2256 if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2257 runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2279 sentry.completedSuccessfully();
2281 FDEBUG(1) <<
"\treadEvent\n";
2295 rng->postEventRead(
ev);
2300 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
2305 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2307 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
2311 }) |
then([pep](
auto nextTask) {
2312 FDEBUG(1) <<
"\tprocessEvent\n";
2313 pep->clearEventPrincipal();
2318 bool randomAccess =
input_->randomAccess();
2327 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2345 FDEBUG(1) <<
"\tshouldWeStop\n";
2350 if (subProcess.terminate()) {
2366 bool expected =
false;
2376 ex <<
"The framework is configured to use at least two streams, but the following modules\n" 2377 <<
"require synchronizing on LuminosityBlock boundaries:";
2379 for (
auto worker :
schedule_->allWorkers()) {
2380 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2382 ex <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2386 ex <<
"\n\nThe situation can be fixed by either\n" 2387 <<
" * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n" 2388 <<
" * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2394 std::unique_ptr<LogSystem>
s;
2395 for (
auto worker :
schedule_->allWorkers()) {
2396 if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2398 s = std::make_unique<LogSystem>(
"ModulesSynchingOnRuns");
2399 (*s) <<
"The following modules require synchronizing on Run boundaries:";
2401 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2407 std::unique_ptr<LogSystem>
s;
2408 for (
auto worker :
schedule_->allWorkers()) {
2411 s = std::make_unique<LogSystem>(
"LegacyModules");
2412 (*s) <<
"The following legacy modules are configured. Support for legacy modules\n" 2413 "is going to end soon. These modules need to be converted to have type\n" 2414 "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2416 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
LuminosityBlockNumber_t luminosityBlock() const
std::atomic< bool > exceptionMessageLumis_
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void clearRunPrincipal(RunProcessingStatus &)
void setExceptionMessageRuns()
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
int totalEventsFailed() const
InputSource::ItemType nextTransitionType()
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::shared_ptr< RunPrincipal > readRun()
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
EventProcessingState eventProcessingState() const
SerialTaskQueue streamQueuesInserter_
void endUnfinishedRun(bool cleaningUpAfterException)
void setExceptionMessageFiles(std::string &message)
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
std::shared_ptr< EDLooperBase const > looper() const
void ensureAvailableAccelerators(edm::ParameterSet const ¶meterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
InputSource::ItemType lastTransitionType() const
std::shared_ptr< RunPrincipal > & runPrincipal()
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
StreamID streamID() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
constexpr auto then(O &&iO)
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
std::atomic< bool > exceptionMessageRuns_
void mergeAuxiliary(RunAuxiliary const &aux)
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
ProcessBlockPrincipal & processBlockPrincipal() const
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
bool firstItemAfterLumiMerge_
oneapi::tbb::task_group * group() const noexcept
std::multimap< std::string, std::string > referencesToBranches_
unsigned int numberOfThreads() const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
bool taskHasFailed() const noexcept
std::vector< std::string > modulesToIgnoreForDeleteEarly_
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
ShouldWriteRun shouldWriteRun() const
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
int merge(int argc, char *argv[])
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
static ServiceRegistry & instance()
void clear()
Not thread safe.
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
bool lastOperationSucceeded() const
unsigned int numberOfRuns() const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void releaseBeginRunResources(unsigned int iStream)
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
Log< level::Info, false > LogInfo
void readAndMergeRun(RunProcessingStatus &)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
unsigned int numberOfLuminosityBlocks() const
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
EventSetupImpl const & eventSetupImpl(unsigned subProcessIndex) const
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemType processRuns()
MergeableRunProductMetadata * mergeableRunProductMetadata()
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
bool forceESCacheClearOnNewRun_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
std::vector< std::shared_ptr< const EventSetupImpl > > & eventSetupImpls()
std::atomic< unsigned int > streamRunActive_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
void closeInputFile(bool cleaningUpAfterException)
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void readProcessBlock(ProcessBlockPrincipal &)
bool adjustToNewProductRegistry(ProductRegistry const &reg)
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
auto lastTask(edm::WaitingTaskHolder iTask)
T const & get(Event const &event, InputTag const &tag) noexcept(false)
void endUnfinishedLumi(bool cleaningUpAfterException)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void continueLumiAsync(WaitingTaskHolder)
Transition requestedTransition() const
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
Log< level::System, true > LogAbsolute
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To. This is based on ActivityRegistry.h
Helper function to determine trigger accepts.
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
void setExceptionMessageLumis()
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void setEventProcessingState(EventProcessingState val)
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void inputProcessBlocks()
bool insertMapped(value_type const &v)
static Registry * instance()
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)