83 #include "boost/range/adaptor/reversed.hpp" 95 #include "oneapi/tbb/task.h" 103 class PauseQueueSentry {
106 ~PauseQueueSentry() { queue_.resume(); }
115 namespace chain = waiting_task::chain;
118 std::unique_ptr<InputSource>
makeInput(
unsigned int moduleIndex,
121 std::shared_ptr<ProductRegistry> preg,
122 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
123 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
124 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
125 std::shared_ptr<ActivityRegistry> areg,
126 std::shared_ptr<ProcessConfiguration const> processConfiguration,
129 if (main_input ==
nullptr) {
131 <<
"There must be exactly one source in the configuration.\n" 132 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
137 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
140 filler->fill(descriptions);
145 std::ostringstream ost;
146 ost <<
"Validating configuration of input source of type " << modtype;
162 processConfiguration.get(),
169 thinnedAssociationsHelper,
173 common.maxSecondsUntilRampdown_,
176 areg->preSourceConstructionSignal_(md);
177 std::unique_ptr<InputSource>
input;
180 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
183 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
184 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
187 std::ostringstream ost;
188 ost <<
"Constructing input source of type " << modtype;
199 std::vector<std::string>
const& loopers) {
200 std::shared_ptr<EDLooperBase> vLooper;
202 assert(1 == loopers.size());
204 for (
auto const& looperName : loopers) {
216 std::vector<std::string>
const& defaultServices,
217 std::vector<std::string>
const& forcedServices)
220 branchIDListHelper_(),
223 espController_(new eventsetup::EventSetupsController),
226 processConfiguration_(),
232 deferredExceptionPtrIsSet_(
false),
236 beginJobCalled_(
false),
237 shouldWeStop_(
false),
238 fileModeNoMerge_(
false),
239 exceptionMessageFiles_(),
240 exceptionMessageRuns_(
false),
241 exceptionMessageLumis_(
false),
242 forceLooperToEnd_(
false),
243 looperBeginJobRun_(
false),
244 forceESCacheClearOnNewRun_(
false),
245 eventSetupDataToExcludeFromPrefetching_() {
247 processDesc->addServices(defaultServices, forcedServices);
248 init(processDesc, iToken, iLegacy);
252 std::vector<std::string>
const& defaultServices,
253 std::vector<std::string>
const& forcedServices)
256 branchIDListHelper_(),
259 espController_(new eventsetup::EventSetupsController),
262 processConfiguration_(),
268 deferredExceptionPtrIsSet_(
false),
272 beginJobCalled_(
false),
273 shouldWeStop_(
false),
274 fileModeNoMerge_(
false),
275 exceptionMessageFiles_(),
276 exceptionMessageRuns_(
false),
277 exceptionMessageLumis_(
false),
278 forceLooperToEnd_(
false),
279 looperBeginJobRun_(
false),
280 forceESCacheClearOnNewRun_(
false),
281 eventSetupDataToExcludeFromPrefetching_() {
283 processDesc->addServices(defaultServices, forcedServices);
292 branchIDListHelper_(),
295 espController_(new eventsetup::EventSetupsController),
298 processConfiguration_(),
304 deferredExceptionPtrIsSet_(
false),
308 beginJobCalled_(
false),
309 shouldWeStop_(
false),
310 fileModeNoMerge_(
false),
311 exceptionMessageFiles_(),
312 exceptionMessageRuns_(
false),
313 exceptionMessageLumis_(
false),
314 forceLooperToEnd_(
false),
315 looperBeginJobRun_(
false),
316 forceESCacheClearOnNewRun_(
false),
317 eventSetupDataToExcludeFromPrefetching_() {
332 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
336 bool const hasSubProcesses = !subProcessVParameterSet.empty();
349 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
357 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
363 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
367 unsigned int nConcurrentLumis =
368 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
369 if (nConcurrentLumis == 0) {
370 nConcurrentLumis = 2;
372 if (nConcurrentLumis > nStreams) {
373 nConcurrentLumis = nStreams;
375 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
376 if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
377 nConcurrentRuns = nConcurrentLumis;
380 if (!loopers.empty()) {
382 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
383 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number " 384 "of concurrent runs, and the number of concurrent lumis " 385 "are all being reset to 1. Loopers cannot currently support " 386 "values greater than 1.";
388 nConcurrentLumis = 1;
392 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
396 if (nThreads > 1
or nStreams > 1) {
397 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
406 unsigned int maxConcurrentIOVs =
407 3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
413 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
416 if (not hasSubProcesses) {
420 auto referencePSets =
421 optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>(
"holdsReferencesToDeleteEarly");
422 for (
auto const&
pset : referencePSets) {
424 auto references =
pset.getParameter<std::vector<std::string>>(
"references");
425 for (
auto const& ref : references) {
430 optionsPset.getUntrackedParameter<std::vector<std::string>>(
"modulesToIgnoreForDeleteEarly");
437 auto& serviceSets = processDesc->getServicesPSets();
447 handler->willBeUsingThreads();
454 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
459 if (!loopers.empty()) {
470 runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
471 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
479 std::optional<ScheduleItems::MadeModules> madeModules;
482 tbb::task_group
group;
485 auto tempReg = std::make_shared<ProductRegistry>();
488 group.run([&,
this]() {
495 group.run([&,
this, tempReg]() {
501 items.branchIDListHelper(),
503 items.thinnedAssociationsHelper(),
505 items.processConfiguration(),
510 items.preg()->addFromInput(*tempReg);
540 auto ep = std::make_shared<EventPrincipal>(
preg(),
552 auto rp = std::make_unique<RunPrincipal>(
573 for (
auto& subProcessPSet : subProcessVParameterSet) {
644 std::vector<ModuleProcessName> consumedBySubProcesses;
647 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
648 if (consumedBySubProcesses.empty()) {
650 }
else if (not
c.empty()) {
651 std::vector<ModuleProcessName>
tmp;
652 tmp.reserve(consumedBySubProcesses.size() +
c.size());
654 consumedBySubProcesses.end(),
657 std::back_inserter(
tmp));
666 not unusedModules.empty()) {
669 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
670 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 672 "therefore they are deleted before beginJob transition.";
689 schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *
preg_);
719 ex.
addContext(
"Calling beginJob for the source");
725 constexpr
bool mustPrefetchMayGet =
true;
727 auto const runLookup =
preg_->productLookup(
InRun);
728 auto const lumiLookup =
preg_->productLookup(
InLumi);
731 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
732 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
734 looper_->updateLookup(
esp_->recordsToProxyIndices());
738 actReg_->postBeginJobSignal_();
740 oneapi::tbb::task_group
group;
743 first([
this](
auto nextTask) {
745 first([
i,
this](
auto nextTask) {
760 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
767 oneapi::tbb::task_group
group;
775 first([
this,
i, &
c, &collectorMutex](
auto nextTask) {
776 std::exception_ptr
ep;
781 ep = std::current_exception();
784 std::lock_guard<std::mutex>
l(collectorMutex);
785 c.call([&
ep]() { std::rethrow_exception(
ep); });
787 }) |
then([
this,
i, &
c, &collectorMutex](
auto nextTask) {
789 first([
this,
i, &
c, &collectorMutex, &subProcess](
auto nextTask) {
790 std::exception_ptr
ep;
793 subProcess.doEndStream(
i);
795 ep = std::current_exception();
798 std::lock_guard<std::mutex>
l(collectorMutex);
799 c.call([&
ep]() { std::rethrow_exception(
ep); });
806 waitTask.waitNoThrow();
809 c.call([actReg]() { actReg->preEndJobSignal_(); });
818 c.call([actReg]() { actReg->postEndJobSignal_(); });
827 return schedule_->getAllModuleDescriptions();
839 #include "TransitionProcessors.icc" 843 bool returnValue =
false;
858 itemType =
input_->nextItemType();
862 sentry.completedSuccessfully();
884 bool firstTime =
true;
894 auto trans =
fp.processFiles(*
this);
905 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
914 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
915 e.addAdditionalInfo(message);
916 if (
e.alreadyPrinted()) {
922 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
923 e.addAdditionalInfo(message);
924 if (
e.alreadyPrinted()) {
930 if (
e.alreadyPrinted()) {
940 FDEBUG(1) <<
" \treadFile\n";
946 streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
950 streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
954 if (size < preg_->
size()) {
961 sentry.completedSuccessfully();
967 input_->closeFile(
fb_.get(), cleaningUpAfterException);
968 sentry.completedSuccessfully();
970 FDEBUG(1) <<
"\tcloseInputFile\n";
978 FDEBUG(1) <<
"\topenOutputFiles\n";
985 FDEBUG(1) <<
"\tcloseOutputFiles\n";
991 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
995 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1003 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1013 FDEBUG(1) <<
"\tstartingNewLoop\n";
1019 looper_->setModuleChanger(&changer);
1021 looper_->setModuleChanger(
nullptr);
1027 FDEBUG(1) <<
"\tendOfLoop\n";
1034 FDEBUG(1) <<
"\trewind\n";
1039 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1043 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1046 if (subProcess.shouldWeCloseOutput()) {
1052 return schedule_->shouldWeCloseOutput();
1056 FDEBUG(1) <<
"\tdoErrorStuff\n";
1057 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n" 1058 <<
"and went to the error state\n" 1059 <<
"Will attempt to terminate processing normally\n" 1060 <<
"(IF using the looper the next loop will be attempted)\n" 1061 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1072 beginGlobalTransitionAsync<Traits>(
1075 globalWaitTask.wait();
1076 beginProcessBlockSucceeded =
true;
1080 input_->fillProcessBlockHelper();
1082 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1089 beginGlobalTransitionAsync<Traits>(
1092 globalWaitTask.wait();
1096 writeWaitTask.wait();
1117 cleaningUpAfterException);
1118 globalWaitTask.wait();
1120 if (beginProcessBlockSucceeded) {
1123 writeWaitTask.wait();
1146 runStatus->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
1152 runStatus->setHolderOfTaskInProcessRuns(holder);
1169 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1176 status->endIOVWaitingTasks(),
1177 status->eventSetupImpls(),
1182 }) |
chain::then([
this,
status, iSync](std::exception_ptr
const* iException,
auto nextTask) {
1190 actReg_->postESSyncIOVSignal_.emit(iSync);
1196 if (postRunQueueTask.taskHasFailed()) {
1197 status->resetBeginResources();
1205 *postRunQueueTask.group(), [
this, postSourceTask = postRunQueueTask,
status]()
mutable {
1209 if (postSourceTask.taskHasFailed()) {
1210 status->resetBeginResources();
1212 status->resumeGlobalRunQueue();
1222 sentry.completedSuccessfully();
1229 oneapi::tbb::task_group
group;
1234 }) |
then([
this, &es](
auto nextTask)
mutable {
1245 status->setStopBeforeProcessingRun(
true);
1246 nextTask.doneWaiting(std::current_exception());
1248 }) |
then([
this,
status, &es](
auto nextTask) {
1249 if (
status->stopBeforeProcessingRun()) {
1254 beginGlobalTransitionAsync<Traits>(
1257 if (
status->stopBeforeProcessingRun()) {
1260 status->globalBeginDidSucceed();
1262 if (
status->stopBeforeProcessingRun()) {
1268 if (
status->stopBeforeProcessingRun()) {
1273 }) |
then([
this,
status](std::exception_ptr
const* iException,
auto holder)
mutable {
1274 bool precedingTasksSucceeded =
true;
1276 precedingTasksSucceeded =
false;
1281 if (
status->stopBeforeProcessingRun()) {
1283 status->resetBeginResources();
1285 status->resumeGlobalRunQueue();
1291 auto globalEndRunTask =
1294 status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1299 status->resetBeginResources();
1301 status->resumeGlobalRunQueue();
1302 holder.doneWaiting(std::current_exception());
1317 *holder.group(), [
this,
status, precedingTasksSucceeded, holder]()
mutable {
1322 [
this,
i,
status, precedingTasksSucceeded, holder]()
mutable {
1327 if (
status->streamFinishedBeginRun()) {
1330 status->resetBeginResources();
1340 status->resetBeginResources();
1347 status->resetBeginResources();
1349 status->resumeGlobalRunQueue();
1350 postSourceTask.doneWaiting(std::current_exception());
1354 status->resetBeginResources();
1356 status->resumeGlobalRunQueue();
1357 postRunQueueTask.doneWaiting(std::current_exception());
1361 status->resetBeginResources();
1363 nextTask.doneWaiting(std::current_exception());
1369 std::shared_ptr<RunProcessingStatus>
status,
1370 bool precedingTasksSucceeded,
1379 chain::first([
this, iStream, precedingTasksSucceeded](
auto nextTask) {
1380 if (precedingTasksSucceeded) {
1385 beginStreamTransitionAsync<Traits>(
1388 }) |
then([
this, iStream](std::exception_ptr
const* exceptionFromBeginStreamRun,
auto nextTask) {
1389 if (exceptionFromBeginStreamRun) {
1390 nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1402 if (
status->streamFinishedBeginRun()) {
1403 status->resetBeginResources();
1410 RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1423 iRunStatus->endIOVWaitingTasksEndRun(),
1424 iRunStatus->eventSetupImplsEndRun(),
1428 }) |
chain::then([
this, iRunStatus, ts](std::exception_ptr
const* iException,
auto nextTask) {
1430 iRunStatus->setEndingEventSetupSucceeded(
false);
1442 streamQueues_[
i].push(*nextTask.group(), [
this,
i, nextTask]()
mutable {
1469 tmp.doneWaiting(iException);
1474 auto& runPrincipal = *(iRunStatus->runPrincipal());
1475 bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1476 bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1477 EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(
esp_->subProcessIndex());
1478 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1479 bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1483 chain::first([
this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1485 if (endingEventSetupSucceeded) {
1488 endGlobalTransitionAsync<Traits>(
1493 [
this, &runPrincipal, &es](
auto nextTask) {
1497 [
this, &runPrincipal, &es](
auto nextTask) {
1501 ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1502 [
this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](
auto nextTask) {
1504 writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1508 didGlobalBeginSucceed,
1509 mergeableRunProductMetadata,
1510 endingEventSetupSucceeded](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1511 if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1512 mergeableRunProductMetadata->postWriteRun();
1519 std::exception_ptr ptr;
1526 ptr = std::current_exception();
1530 status->resumeGlobalRunQueue();
1534 ptr = std::current_exception();
1538 status->resetEndResources();
1542 ptr = std::current_exception();
1546 if (ptr && !iException) {
1572 if (runStatus->streamFinishedRun()) {
1573 runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1584 if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1586 auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1587 bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1589 auto& runPrincipal = *runStatus->runPrincipal();
1592 endStreamTransitionAsync<Traits>(
std::move(runDoneTaskHolder),
1598 cleaningUpAfterException);
1610 runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1612 runStatus->setHolderOfTaskInProcessRuns(holder);
1620 std::shared_ptr<RunProcessingStatus> iRunStatus,
1622 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1628 status->endIOVWaitingTasks(),
1629 status->eventSetupImpls(),
1633 }) |
chain::then([
this,
status, iRunStatus, iSync](std::exception_ptr
const* iException,
auto nextTask) {
1642 actReg_->postESSyncIOVSignal_.emit(iSync);
1648 if (postLumiQueueTask.taskHasFailed()) {
1649 status->resetResources();
1658 *postLumiQueueTask.group(),
1659 [
this, postSourceTask = postLumiQueueTask,
status, iRunStatus]()
mutable {
1663 if (postSourceTask.taskHasFailed()) {
1664 status->resetResources();
1676 sentry.completedSuccessfully();
1682 rng->preBeginLumi(lb);
1691 }) |
then([
this,
status, &es, &lumiPrincipal](
auto nextTask) {
1694 beginGlobalTransitionAsync<Traits>(
1700 status->globalBeginDidSucceed();
1703 }) |
then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto holder)
mutable {
1705 status->resetResources();
1712 status->globalBeginDidSucceed();
1715 status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1729 auto eventSetupImpls = &
status->eventSetupImpls();
1730 auto lp =
status->lumiPrincipal().get();
1733 event.setLuminosityBlockPrincipal(lp);
1737 beginStreamTransitionAsync<Traits>(
std::move(nextTask),
1744 then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
1746 if (exceptionFromBeginStreamLumi) {
1748 copyHolder.
doneWaiting(*exceptionFromBeginStreamLumi);
1759 status->resetResources();
1767 status->resetResources();
1775 status->resetResources();
1791 status->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
1797 unsigned int streamIndex = 0;
1798 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1802 nextTask.group()->run(
1812 tmp.doneWaiting(iException);
1817 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1820 auto& lp = *(iLumiStatus->lumiPrincipal());
1821 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1822 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1824 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1827 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1832 endGlobalTransitionAsync<Traits>(
1834 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1836 if (didGlobalBeginSucceed) {
1845 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1851 std::exception_ptr ptr;
1859 ptr = std::current_exception();
1865 ptr = std::current_exception();
1870 status->resetResources();
1871 status->globalEndRunHolderDoneWaiting();
1875 ptr = std::current_exception();
1879 if (ptr && !iException) {
1898 if (
status->streamFinishedLumi()) {
1908 lumiStatus->setEndTime();
1911 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1912 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1914 if (lumiStatus->didGlobalBeginSucceed()) {
1915 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1918 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1924 cleaningUpAfterException);
1940 globalWaitTask.wait();
1947 input_->readProcessBlock(processBlockPrincipal);
1948 sentry.completedSuccessfully();
1954 rp->setAux(*
input_->runAuxiliary());
1958 sentry.completedSuccessfully();
1960 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1972 input_->readAndMergeRun(runPrincipal);
1973 sentry.completedSuccessfully();
1980 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1984 sentry.completedSuccessfully();
1992 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1993 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1994 input_->processHistoryRegistry().reducedProcessHistoryID(
1995 input_->luminosityBlockAuxiliary()->processHistoryID()));
1996 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1998 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
2002 sentry.completedSuccessfully();
2015 s.writeProcessBlockAsync(nextTask, processBlockType);
2031 s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2056 s.writeLumiAsync(nextTask, lumiPrincipal);
2066 iStatus.
lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2079 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2083 status->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
2084 if (
status->holderOfTaskInProcessRuns().taskHasFailed()) {
2085 status->setStopBeforeProcessingRun(
true);
2092 status->setStopBeforeProcessingRun(
true);
2093 holder.doneWaiting(std::current_exception());
2106 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2110 iLumiStatus->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
2115 holder.doneWaiting(std::current_exception());
2123 iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2128 input_->luminosityBlockAuxiliary()->beginTime()),
2144 unsigned int iStreamIndex,
2176 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2191 iStatus.
lumiPrincipal()->runPrincipal().reducedProcessHistoryID() !=
input_->reducedProcessHistoryID()))) {
2210 auto recursionTask =
2211 make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iEventException)
mutable {
2212 if (iEventException) {
2226 if (not
status->haveStartedNextLumiOrEndedRun()) {
2227 status->startNextLumiOrEndRun();
2231 input_->luminosityBlockAuxiliary()->beginTime()),
2250 if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2251 runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2273 sentry.completedSuccessfully();
2275 FDEBUG(1) <<
"\treadEvent\n";
2289 rng->postEventRead(
ev);
2294 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
2299 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2301 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
2305 }) |
then([pep](
auto nextTask) {
2306 FDEBUG(1) <<
"\tprocessEvent\n";
2307 pep->clearEventPrincipal();
2312 bool randomAccess =
input_->randomAccess();
2321 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2339 FDEBUG(1) <<
"\tshouldWeStop\n";
2344 if (subProcess.terminate()) {
2360 bool expected =
false;
2370 ex <<
"The framework is configured to use at least two streams, but the following modules\n" 2371 <<
"require synchronizing on LuminosityBlock boundaries:";
2373 for (
auto worker :
schedule_->allWorkers()) {
2374 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2376 ex <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2380 ex <<
"\n\nThe situation can be fixed by either\n" 2381 <<
" * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n" 2382 <<
" * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2388 std::unique_ptr<LogSystem>
s;
2389 for (
auto worker :
schedule_->allWorkers()) {
2390 if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2392 s = std::make_unique<LogSystem>(
"ModulesSynchingOnRuns");
2393 (*s) <<
"The following modules require synchronizing on Run boundaries:";
2395 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2401 std::unique_ptr<LogSystem>
s;
2402 for (
auto worker :
schedule_->allWorkers()) {
2405 s = std::make_unique<LogSystem>(
"LegacyModules");
2406 (*s) <<
"The following legacy modules are configured. Support for legacy modules\n" 2407 "is going to end soon. These modules need to be converted to have type\n" 2408 "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2410 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
LuminosityBlockNumber_t luminosityBlock() const
std::atomic< bool > exceptionMessageLumis_
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void clearRunPrincipal(RunProcessingStatus &)
void setExceptionMessageRuns()
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
int totalEventsFailed() const
InputSource::ItemType nextTransitionType()
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::shared_ptr< RunPrincipal > readRun()
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
EventProcessingState eventProcessingState() const
SerialTaskQueue streamQueuesInserter_
void endUnfinishedRun(bool cleaningUpAfterException)
void setExceptionMessageFiles(std::string &message)
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::shared_ptr< EDLooperBase const > looper() const
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
InputSource::ItemType lastTransitionType() const
std::shared_ptr< RunPrincipal > & runPrincipal()
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
StreamID streamID() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
constexpr auto then(O &&iO)
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
std::atomic< bool > exceptionMessageRuns_
void mergeAuxiliary(RunAuxiliary const &aux)
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
ProcessBlockPrincipal & processBlockPrincipal() const
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
bool firstItemAfterLumiMerge_
oneapi::tbb::task_group * group() const noexcept
std::multimap< std::string, std::string > referencesToBranches_
unsigned int numberOfThreads() const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
bool taskHasFailed() const noexcept
std::vector< std::string > modulesToIgnoreForDeleteEarly_
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
ShouldWriteRun shouldWriteRun() const
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
int merge(int argc, char *argv[])
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
static ServiceRegistry & instance()
void clear()
Not thread safe.
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
bool lastOperationSucceeded() const
unsigned int numberOfRuns() const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void releaseBeginRunResources(unsigned int iStream)
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
Log< level::Info, false > LogInfo
void readAndMergeRun(RunProcessingStatus &)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
unsigned int numberOfLuminosityBlocks() const
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
EventSetupImpl const & eventSetupImpl(unsigned subProcessIndex) const
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemType processRuns()
MergeableRunProductMetadata * mergeableRunProductMetadata()
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
bool forceESCacheClearOnNewRun_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
std::vector< std::shared_ptr< const EventSetupImpl > > & eventSetupImpls()
std::atomic< unsigned int > streamRunActive_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
void closeInputFile(bool cleaningUpAfterException)
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void readProcessBlock(ProcessBlockPrincipal &)
bool adjustToNewProductRegistry(ProductRegistry const &reg)
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
auto lastTask(edm::WaitingTaskHolder iTask)
void endUnfinishedLumi(bool cleaningUpAfterException)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void continueLumiAsync(WaitingTaskHolder)
Transition requestedTransition() const
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
Log< level::System, true > LogAbsolute
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To: This is based on ActivityRegistry.h
Helper function to determine trigger accepts.
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
void setExceptionMessageLumis()
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void setEventProcessingState(EventProcessingState val)
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void inputProcessBlocks()
bool insertMapped(value_type const &v)
static Registry * instance()
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)