84 #include "boost/range/adaptor/reversed.hpp" 96 #include "oneapi/tbb/task.h" 104 class PauseQueueSentry {
107 ~PauseQueueSentry() { queue_.resume(); }
116 namespace chain = waiting_task::chain;
119 std::unique_ptr<InputSource>
makeInput(
unsigned int moduleIndex,
122 std::shared_ptr<ProductRegistry> preg,
123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
125 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
126 std::shared_ptr<ActivityRegistry> areg,
127 std::shared_ptr<ProcessConfiguration const> processConfiguration,
130 if (main_input ==
nullptr) {
132 <<
"There must be exactly one source in the configuration.\n" 133 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
138 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
141 filler->fill(descriptions);
146 std::ostringstream ost;
147 ost <<
"Validating configuration of input source of type " << modtype;
163 processConfiguration.get(),
170 thinnedAssociationsHelper,
174 common.maxSecondsUntilRampdown_,
177 areg->preSourceConstructionSignal_(md);
178 std::unique_ptr<InputSource>
input;
181 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
184 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
188 std::ostringstream ost;
189 ost <<
"Constructing input source of type " << modtype;
200 std::vector<std::string>
const& loopers) {
201 std::shared_ptr<EDLooperBase> vLooper;
203 assert(1 == loopers.size());
205 for (
auto const& looperName : loopers) {
217 std::vector<std::string>
const& defaultServices,
218 std::vector<std::string>
const& forcedServices)
221 branchIDListHelper_(),
225 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
228 processConfiguration_(),
234 deferredExceptionPtrIsSet_(
false),
238 beginJobCalled_(
false),
239 shouldWeStop_(
false),
240 fileModeNoMerge_(
false),
241 exceptionMessageFiles_(),
242 exceptionMessageRuns_(
false),
243 exceptionMessageLumis_(
false),
244 forceLooperToEnd_(
false),
245 looperBeginJobRun_(
false),
246 forceESCacheClearOnNewRun_(
false),
247 eventSetupDataToExcludeFromPrefetching_() {
249 processDesc->addServices(defaultServices, forcedServices);
250 init(processDesc, iToken, iLegacy);
254 std::vector<std::string>
const& defaultServices,
255 std::vector<std::string>
const& forcedServices)
258 branchIDListHelper_(),
262 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
265 processConfiguration_(),
271 deferredExceptionPtrIsSet_(
false),
275 beginJobCalled_(
false),
276 shouldWeStop_(
false),
277 fileModeNoMerge_(
false),
278 exceptionMessageFiles_(),
279 exceptionMessageRuns_(
false),
280 exceptionMessageLumis_(
false),
281 forceLooperToEnd_(
false),
282 looperBeginJobRun_(
false),
283 forceESCacheClearOnNewRun_(
false),
284 eventSetupDataToExcludeFromPrefetching_() {
286 processDesc->addServices(defaultServices, forcedServices);
295 branchIDListHelper_(),
299 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
302 processConfiguration_(),
308 deferredExceptionPtrIsSet_(
false),
312 beginJobCalled_(
false),
313 shouldWeStop_(
false),
314 fileModeNoMerge_(
false),
315 exceptionMessageFiles_(),
316 exceptionMessageRuns_(
false),
317 exceptionMessageLumis_(
false),
318 forceLooperToEnd_(
false),
319 looperBeginJobRun_(
false),
320 forceESCacheClearOnNewRun_(
false),
321 eventSetupDataToExcludeFromPrefetching_() {
336 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
340 bool const hasSubProcesses = !subProcessVParameterSet.empty();
353 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
361 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
367 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
371 unsigned int nConcurrentLumis =
372 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
373 if (nConcurrentLumis == 0) {
374 nConcurrentLumis = 2;
376 if (nConcurrentLumis > nStreams) {
377 nConcurrentLumis = nStreams;
379 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
380 if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381 nConcurrentRuns = nConcurrentLumis;
384 if (!loopers.empty()) {
386 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number " 388 "of concurrent runs, and the number of concurrent lumis " 389 "are all being reset to 1. Loopers cannot currently support " 390 "values greater than 1.";
392 nConcurrentLumis = 1;
396 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
400 if (nThreads > 1
or nStreams > 1) {
401 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
410 unsigned int maxConcurrentIOVs =
411 3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
417 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
420 if (not hasSubProcesses) {
424 auto referencePSets =
425 optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>(
"holdsReferencesToDeleteEarly");
426 for (
auto const&
pset : referencePSets) {
428 auto references =
pset.getParameter<std::vector<std::string>>(
"references");
429 for (
auto const& ref : references) {
434 optionsPset.getUntrackedParameter<std::vector<std::string>>(
"modulesToIgnoreForDeleteEarly");
441 auto& serviceSets = processDesc->getServicesPSets();
451 handler->willBeUsingThreads();
458 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
463 if (!loopers.empty()) {
474 runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
483 std::optional<ScheduleItems::MadeModules> madeModules;
486 tbb::task_group
group;
489 auto tempReg = std::make_shared<ProductRegistry>();
492 group.run([&,
this]() {
500 group.run([&,
this, tempReg]() {
506 items.branchIDListHelper(),
508 items.thinnedAssociationsHelper(),
510 items.processConfiguration(),
515 items.preg()->addFromInput(*tempReg);
545 auto ep = std::make_shared<EventPrincipal>(
preg(),
557 auto rp = std::make_unique<RunPrincipal>(
578 for (
auto& subProcessPSet : subProcessVParameterSet) {
650 std::vector<ModuleProcessName> consumedBySubProcesses;
653 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654 if (consumedBySubProcesses.empty()) {
656 }
else if (not
c.empty()) {
657 std::vector<ModuleProcessName>
tmp;
658 tmp.reserve(consumedBySubProcesses.size() +
c.size());
660 consumedBySubProcesses.end(),
663 std::back_inserter(
tmp));
672 not unusedModules.empty()) {
675 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
676 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 678 "therefore they are deleted before beginJob transition.";
695 schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *
preg_);
725 ex.
addContext(
"Calling beginJob for the source");
731 constexpr
bool mustPrefetchMayGet =
true;
733 auto const runLookup =
preg_->productLookup(
InRun);
734 auto const lumiLookup =
preg_->productLookup(
InLumi);
737 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
738 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
740 looper_->updateLookup(
esp_->recordsToResolverIndices());
744 actReg_->postBeginJobSignal_();
746 oneapi::tbb::task_group
group;
749 first([
this](
auto nextTask) {
751 first([
i,
this](
auto nextTask) {
766 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
773 oneapi::tbb::task_group
group;
781 first([
this,
i, &
c, &collectorMutex](
auto nextTask) {
782 std::exception_ptr
ep;
787 ep = std::current_exception();
790 std::lock_guard<std::mutex>
l(collectorMutex);
791 c.call([&
ep]() { std::rethrow_exception(
ep); });
793 }) |
then([
this,
i, &
c, &collectorMutex](
auto nextTask) {
795 first([
this,
i, &
c, &collectorMutex, &subProcess](
auto nextTask) {
796 std::exception_ptr
ep;
799 subProcess.doEndStream(
i);
801 ep = std::current_exception();
804 std::lock_guard<std::mutex>
l(collectorMutex);
805 c.call([&
ep]() { std::rethrow_exception(
ep); });
812 waitTask.waitNoThrow();
815 c.call([actReg]() { actReg->preEndJobSignal_(); });
824 c.call([actReg]() { actReg->postEndJobSignal_(); });
833 return schedule_->getAllModuleDescriptions();
845 #include "TransitionProcessors.icc" 849 bool returnValue =
false;
854 edm::LogSystem(
"ShutdownSignal") <<
"an external signal was sent to shutdown the job early.";
867 itemType =
input_->nextItemType();
871 sentry.completedSuccessfully();
888 actReg_->beginProcessingSignal_();
889 auto endSignal = [](
ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
890 std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(
actReg_.get(), endSignal);
895 bool firstTime =
true;
905 auto trans =
fp.processFiles(*
this);
916 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
925 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
926 e.addAdditionalInfo(message);
927 if (
e.alreadyPrinted()) {
933 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
934 e.addAdditionalInfo(message);
935 if (
e.alreadyPrinted()) {
941 if (
e.alreadyPrinted()) {
951 FDEBUG(1) <<
" \treadFile\n";
957 streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
961 streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
965 if (size < preg_->
size()) {
972 sentry.completedSuccessfully();
978 input_->closeFile(
fb_.get(), cleaningUpAfterException);
979 sentry.completedSuccessfully();
981 FDEBUG(1) <<
"\tcloseInputFile\n";
989 FDEBUG(1) <<
"\topenOutputFiles\n";
996 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1002 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
1006 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1014 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1024 FDEBUG(1) <<
"\tstartingNewLoop\n";
1030 looper_->setModuleChanger(&changer);
1032 looper_->setModuleChanger(
nullptr);
1038 FDEBUG(1) <<
"\tendOfLoop\n";
1045 FDEBUG(1) <<
"\trewind\n";
1050 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1054 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1057 if (subProcess.shouldWeCloseOutput()) {
1063 return schedule_->shouldWeCloseOutput();
1067 FDEBUG(1) <<
"\tdoErrorStuff\n";
1068 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n" 1069 <<
"and went to the error state\n" 1070 <<
"Will attempt to terminate processing normally\n" 1071 <<
"(IF using the looper the next loop will be attempted)\n" 1072 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1083 beginGlobalTransitionAsync<Traits>(
1086 globalWaitTask.wait();
1087 beginProcessBlockSucceeded =
true;
1091 input_->fillProcessBlockHelper();
1093 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1100 beginGlobalTransitionAsync<Traits>(
1103 globalWaitTask.wait();
1107 writeWaitTask.wait();
1128 cleaningUpAfterException);
1129 globalWaitTask.wait();
1131 if (beginProcessBlockSucceeded) {
1134 writeWaitTask.wait();
1157 runStatus->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
1163 runStatus->setHolderOfTaskInProcessRuns(holder);
1180 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1187 status->endIOVWaitingTasks(),
1188 status->eventSetupImpls(),
1193 }) |
chain::then([
this,
status](std::exception_ptr
const* iException,
auto nextTask) {
1206 if (postRunQueueTask.taskHasFailed()) {
1207 status->resetBeginResources();
1215 *postRunQueueTask.group(), [
this, postSourceTask = postRunQueueTask,
status]()
mutable {
1219 if (postSourceTask.taskHasFailed()) {
1220 status->resetBeginResources();
1222 status->resumeGlobalRunQueue();
1232 sentry.completedSuccessfully();
1239 oneapi::tbb::task_group
group;
1244 }) |
then([
this, &es](
auto nextTask)
mutable {
1255 status->setStopBeforeProcessingRun(
true);
1256 nextTask.doneWaiting(std::current_exception());
1258 }) |
then([
this,
status, &es](
auto nextTask) {
1259 if (
status->stopBeforeProcessingRun()) {
1264 beginGlobalTransitionAsync<Traits>(
1267 if (
status->stopBeforeProcessingRun()) {
1270 status->globalBeginDidSucceed();
1272 if (
status->stopBeforeProcessingRun()) {
1278 if (
status->stopBeforeProcessingRun()) {
1283 }) |
then([
this,
status](std::exception_ptr
const* iException,
auto holder)
mutable {
1284 bool precedingTasksSucceeded =
true;
1286 precedingTasksSucceeded =
false;
1291 if (
status->stopBeforeProcessingRun()) {
1293 status->resetBeginResources();
1295 status->resumeGlobalRunQueue();
1301 auto globalEndRunTask =
1304 status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1309 status->resetBeginResources();
1311 status->resumeGlobalRunQueue();
1312 holder.doneWaiting(std::current_exception());
1327 *holder.group(), [
this,
status, precedingTasksSucceeded, holder]()
mutable {
1332 [
this,
i,
status, precedingTasksSucceeded, holder]()
mutable {
1337 if (
status->streamFinishedBeginRun()) {
1340 status->resetBeginResources();
1350 status->resetBeginResources();
1357 status->resetBeginResources();
1359 status->resumeGlobalRunQueue();
1360 postSourceTask.doneWaiting(std::current_exception());
1364 status->resetBeginResources();
1366 status->resumeGlobalRunQueue();
1367 postRunQueueTask.doneWaiting(std::current_exception());
1371 status->resetBeginResources();
1373 nextTask.doneWaiting(std::current_exception());
1379 std::shared_ptr<RunProcessingStatus>
status,
1380 bool precedingTasksSucceeded,
1389 chain::first([
this, iStream, precedingTasksSucceeded](
auto nextTask) {
1390 if (precedingTasksSucceeded) {
1395 beginStreamTransitionAsync<Traits>(
1398 }) |
then([
this, iStream](std::exception_ptr
const* exceptionFromBeginStreamRun,
auto nextTask) {
1399 if (exceptionFromBeginStreamRun) {
1400 nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1412 if (
status->streamFinishedBeginRun()) {
1413 status->resetBeginResources();
1420 RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1433 iRunStatus->endIOVWaitingTasksEndRun(),
1434 iRunStatus->eventSetupImplsEndRun(),
1438 }) |
chain::then([
this, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
1440 iRunStatus->setEndingEventSetupSucceeded(
false);
1447 streamQueues_[
i].push(*nextTask.group(), [
this,
i, nextTask]()
mutable {
1474 tmp.doneWaiting(iException);
1479 auto& runPrincipal = *(iRunStatus->runPrincipal());
1480 bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1481 bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1482 EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(
esp_->subProcessIndex());
1483 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1484 bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1488 chain::first([
this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1490 if (endingEventSetupSucceeded) {
1493 endGlobalTransitionAsync<Traits>(
1498 [
this, &runPrincipal, &es](
auto nextTask) {
1502 [
this, &runPrincipal, &es](
auto nextTask) {
1506 ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1507 [
this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](
auto nextTask) {
1509 writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1513 didGlobalBeginSucceed,
1514 mergeableRunProductMetadata,
1515 endingEventSetupSucceeded](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1516 if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1517 mergeableRunProductMetadata->postWriteRun();
1524 std::exception_ptr ptr;
1531 ptr = std::current_exception();
1535 status->resumeGlobalRunQueue();
1539 ptr = std::current_exception();
1543 status->resetEndResources();
1547 ptr = std::current_exception();
1551 if (ptr && !iException) {
1577 if (runStatus->streamFinishedRun()) {
1578 runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1589 if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1591 auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1592 bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1594 auto& runPrincipal = *runStatus->runPrincipal();
1597 endStreamTransitionAsync<Traits>(
std::move(runDoneTaskHolder),
1603 cleaningUpAfterException);
1615 runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1617 runStatus->setHolderOfTaskInProcessRuns(holder);
1625 std::shared_ptr<RunProcessingStatus> iRunStatus,
1627 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1633 status->endIOVWaitingTasks(),
1634 status->eventSetupImpls(),
1638 }) |
chain::then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
1650 if (postLumiQueueTask.taskHasFailed()) {
1651 status->resetResources();
1660 *postLumiQueueTask.group(),
1661 [
this, postSourceTask = postLumiQueueTask,
status, iRunStatus]()
mutable {
1665 if (postSourceTask.taskHasFailed()) {
1666 status->resetResources();
1678 sentry.completedSuccessfully();
1684 rng->preBeginLumi(lb);
1693 }) |
then([
this,
status, &es, &lumiPrincipal](
auto nextTask) {
1696 beginGlobalTransitionAsync<Traits>(
1702 status->globalBeginDidSucceed();
1705 }) |
then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto holder)
mutable {
1707 status->resetResources();
1714 status->globalBeginDidSucceed();
1717 status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1731 auto eventSetupImpls = &
status->eventSetupImpls();
1732 auto lp =
status->lumiPrincipal().get();
1735 event.setLuminosityBlockPrincipal(lp);
1739 beginStreamTransitionAsync<Traits>(
std::move(nextTask),
1746 then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
1748 if (exceptionFromBeginStreamLumi) {
1750 copyHolder.
doneWaiting(*exceptionFromBeginStreamLumi);
1761 status->resetResources();
1769 status->resetResources();
1777 status->resetResources();
1793 status->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
1799 unsigned int streamIndex = 0;
1800 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1804 nextTask.group()->run(
1814 tmp.doneWaiting(iException);
1819 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1822 auto& lp = *(iLumiStatus->lumiPrincipal());
1823 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1824 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1826 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1829 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1834 endGlobalTransitionAsync<Traits>(
1836 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1838 if (didGlobalBeginSucceed) {
1847 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1853 std::exception_ptr ptr;
1861 ptr = std::current_exception();
1867 ptr = std::current_exception();
1872 status->resetResources();
1873 status->globalEndRunHolderDoneWaiting();
1877 ptr = std::current_exception();
1881 if (ptr && !iException) {
1900 if (
status->streamFinishedLumi()) {
1910 lumiStatus->setEndTime();
1913 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1914 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1916 if (lumiStatus->didGlobalBeginSucceed()) {
1917 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1920 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1926 cleaningUpAfterException);
1942 globalWaitTask.wait();
1949 input_->readProcessBlock(processBlockPrincipal);
1950 sentry.completedSuccessfully();
1956 rp->setAux(*
input_->runAuxiliary());
1960 sentry.completedSuccessfully();
1962 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1974 input_->readAndMergeRun(runPrincipal);
1975 sentry.completedSuccessfully();
1982 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1986 sentry.completedSuccessfully();
1994 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1995 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1996 input_->processHistoryRegistry().reducedProcessHistoryID(
1997 input_->luminosityBlockAuxiliary()->processHistoryID()));
1998 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
2000 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
2004 sentry.completedSuccessfully();
2017 s.writeProcessBlockAsync(nextTask, processBlockType);
2033 s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2058 s.writeLumiAsync(nextTask, lumiPrincipal);
2068 iStatus.
lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2081 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2085 status->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
2086 if (
status->holderOfTaskInProcessRuns().taskHasFailed()) {
2087 status->setStopBeforeProcessingRun(
true);
2094 status->setStopBeforeProcessingRun(
true);
2095 holder.doneWaiting(std::current_exception());
2108 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2112 iLumiStatus->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
2117 holder.doneWaiting(std::current_exception());
2125 iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2130 input_->luminosityBlockAuxiliary()->beginTime()),
2146 unsigned int iStreamIndex,
2178 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2193 iStatus.
lumiPrincipal()->runPrincipal().reducedProcessHistoryID() !=
input_->reducedProcessHistoryID()))) {
2212 auto recursionTask =
2213 make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iEventException)
mutable {
2214 if (iEventException) {
2228 if (not
status->haveStartedNextLumiOrEndedRun()) {
2229 status->startNextLumiOrEndRun();
2233 input_->luminosityBlockAuxiliary()->beginTime()),
2252 if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2253 runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2275 sentry.completedSuccessfully();
2277 FDEBUG(1) <<
"\treadEvent\n";
2291 rng->postEventRead(
ev);
2296 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
2301 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2303 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
2307 }) |
then([pep](
auto nextTask) {
2308 FDEBUG(1) <<
"\tprocessEvent\n";
2309 pep->clearEventPrincipal();
2314 bool randomAccess =
input_->randomAccess();
2323 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2341 FDEBUG(1) <<
"\tshouldWeStop\n";
2346 if (subProcess.terminate()) {
2362 bool expected =
false;
2372 ex <<
"The framework is configured to use at least two streams, but the following modules\n" 2373 <<
"require synchronizing on LuminosityBlock boundaries:";
2375 for (
auto worker :
schedule_->allWorkers()) {
2376 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2378 ex <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2382 ex <<
"\n\nThe situation can be fixed by either\n" 2383 <<
" * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n" 2384 <<
" * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2390 std::unique_ptr<LogSystem>
s;
2391 for (
auto worker :
schedule_->allWorkers()) {
2392 if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2394 s = std::make_unique<LogSystem>(
"ModulesSynchingOnRuns");
2395 (*s) <<
"The following modules require synchronizing on Run boundaries:";
2397 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2403 std::unique_ptr<LogSystem>
s;
2404 for (
auto worker :
schedule_->allWorkers()) {
2407 s = std::make_unique<LogSystem>(
"LegacyModules");
2408 (*s) <<
"The following legacy modules are configured. Support for legacy modules\n" 2409 "is going to end soon. These modules need to be converted to have type\n" 2410 "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2412 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
LuminosityBlockNumber_t luminosityBlock() const
std::atomic< bool > exceptionMessageLumis_
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
ProcessContext processContext_
Log< level::System, false > LogSystem
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void clearRunPrincipal(RunProcessingStatus &)
void setExceptionMessageRuns()
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
int totalEventsFailed() const
InputSource::ItemType nextTransitionType()
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::shared_ptr< RunPrincipal > readRun()
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
EventProcessingState eventProcessingState() const
SerialTaskQueue streamQueuesInserter_
void endUnfinishedRun(bool cleaningUpAfterException)
void setExceptionMessageFiles(std::string &message)
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
std::shared_ptr< EDLooperBase const > looper() const
void ensureAvailableAccelerators(edm::ParameterSet const ¶meterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
InputSource::ItemType lastTransitionType() const
std::shared_ptr< RunPrincipal > & runPrincipal()
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
StreamID streamID() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
constexpr auto then(O &&iO)
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
std::atomic< bool > exceptionMessageRuns_
void mergeAuxiliary(RunAuxiliary const &aux)
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
ProcessBlockPrincipal & processBlockPrincipal() const
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
bool firstItemAfterLumiMerge_
oneapi::tbb::task_group * group() const noexcept
std::multimap< std::string, std::string > referencesToBranches_
unsigned int numberOfThreads() const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
bool taskHasFailed() const noexcept
std::vector< std::string > modulesToIgnoreForDeleteEarly_
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
ShouldWriteRun shouldWriteRun() const
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
int merge(int argc, char *argv[])
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
static ServiceRegistry & instance()
void clear()
Not thread safe.
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
bool lastOperationSucceeded() const
unsigned int numberOfRuns() const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void releaseBeginRunResources(unsigned int iStream)
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
Log< level::Info, false > LogInfo
void readAndMergeRun(RunProcessingStatus &)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void reportShutdownSignal()
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
unsigned int numberOfLuminosityBlocks() const
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
EventSetupImpl const & eventSetupImpl(unsigned subProcessIndex) const
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemType processRuns()
MergeableRunProductMetadata * mergeableRunProductMetadata()
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
bool forceESCacheClearOnNewRun_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
std::vector< std::shared_ptr< const EventSetupImpl > > & eventSetupImpls()
std::atomic< unsigned int > streamRunActive_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
void closeInputFile(bool cleaningUpAfterException)
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void readProcessBlock(ProcessBlockPrincipal &)
bool adjustToNewProductRegistry(ProductRegistry const ®)
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
auto lastTask(edm::WaitingTaskHolder iTask)
T const & get(Event const &event, InputTag const &tag) noexcept(false)
void endUnfinishedLumi(bool cleaningUpAfterException)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void continueLumiAsync(WaitingTaskHolder)
Transition requestedTransition() const
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
Log< level::System, true > LogAbsolute
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
void setExceptionMessageLumis()
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void setEventProcessingState(EventProcessingState val)
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void inputProcessBlocks()
bool insertMapped(value_type const &v)
static Registry * instance()
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)