84 #include "boost/range/adaptor/reversed.hpp" 96 #include "oneapi/tbb/task.h" 104 class PauseQueueSentry {
107 ~PauseQueueSentry() { queue_.resume(); }
116 namespace chain = waiting_task::chain;
119 std::unique_ptr<InputSource>
makeInput(
unsigned int moduleIndex,
122 std::shared_ptr<ProductRegistry> preg,
123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
125 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
126 std::shared_ptr<ActivityRegistry> areg,
127 std::shared_ptr<ProcessConfiguration const> processConfiguration,
130 if (main_input ==
nullptr) {
132 <<
"There must be exactly one source in the configuration.\n" 133 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
138 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
141 filler->fill(descriptions);
146 std::ostringstream ost;
147 ost <<
"Validating configuration of input source of type " << modtype;
163 processConfiguration.get(),
170 thinnedAssociationsHelper,
174 common.maxSecondsUntilRampdown_,
177 areg->preSourceConstructionSignal_(md);
178 std::unique_ptr<InputSource>
input;
181 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
184 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
188 std::ostringstream ost;
189 ost <<
"Constructing input source of type " << modtype;
200 std::vector<std::string>
const& loopers) {
201 std::shared_ptr<EDLooperBase> vLooper;
203 assert(1 == loopers.size());
205 for (
auto const& looperName : loopers) {
217 std::vector<std::string>
const& defaultServices,
218 std::vector<std::string>
const& forcedServices)
221 branchIDListHelper_(),
225 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
228 processConfiguration_(),
234 deferredExceptionPtrIsSet_(
false),
238 beginJobCalled_(
false),
239 shouldWeStop_(
false),
240 fileModeNoMerge_(
false),
241 exceptionMessageFiles_(),
242 exceptionMessageRuns_(
false),
243 exceptionMessageLumis_(
false),
244 forceLooperToEnd_(
false),
245 looperBeginJobRun_(
false),
246 forceESCacheClearOnNewRun_(
false),
247 eventSetupDataToExcludeFromPrefetching_() {
249 processDesc->addServices(defaultServices, forcedServices);
250 init(processDesc, iToken, iLegacy);
254 std::vector<std::string>
const& defaultServices,
255 std::vector<std::string>
const& forcedServices)
258 branchIDListHelper_(),
262 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
265 processConfiguration_(),
271 deferredExceptionPtrIsSet_(
false),
275 beginJobCalled_(
false),
276 shouldWeStop_(
false),
277 fileModeNoMerge_(
false),
278 exceptionMessageFiles_(),
279 exceptionMessageRuns_(
false),
280 exceptionMessageLumis_(
false),
281 forceLooperToEnd_(
false),
282 looperBeginJobRun_(
false),
283 forceESCacheClearOnNewRun_(
false),
284 eventSetupDataToExcludeFromPrefetching_() {
286 processDesc->addServices(defaultServices, forcedServices);
295 branchIDListHelper_(),
299 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
302 processConfiguration_(),
308 deferredExceptionPtrIsSet_(
false),
312 beginJobCalled_(
false),
313 shouldWeStop_(
false),
314 fileModeNoMerge_(
false),
315 exceptionMessageFiles_(),
316 exceptionMessageRuns_(
false),
317 exceptionMessageLumis_(
false),
318 forceLooperToEnd_(
false),
319 looperBeginJobRun_(
false),
320 forceESCacheClearOnNewRun_(
false),
321 eventSetupDataToExcludeFromPrefetching_() {
336 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
340 bool const hasSubProcesses = !subProcessVParameterSet.empty();
353 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
361 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
367 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
371 unsigned int nConcurrentLumis =
372 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
373 if (nConcurrentLumis == 0) {
374 nConcurrentLumis = 2;
376 if (nConcurrentLumis > nStreams) {
377 nConcurrentLumis = nStreams;
379 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
380 if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381 nConcurrentRuns = nConcurrentLumis;
384 if (!loopers.empty()) {
386 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number " 388 "of concurrent runs, and the number of concurrent lumis " 389 "are all being reset to 1. Loopers cannot currently support " 390 "values greater than 1.";
392 nConcurrentLumis = 1;
396 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
400 if (nThreads > 1
or nStreams > 1) {
401 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
410 unsigned int maxConcurrentIOVs =
411 3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
417 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
420 if (not hasSubProcesses) {
424 auto referencePSets =
425 optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>(
"holdsReferencesToDeleteEarly");
426 for (
auto const&
pset : referencePSets) {
428 auto references =
pset.getParameter<std::vector<std::string>>(
"references");
429 for (
auto const& ref : references) {
434 optionsPset.getUntrackedParameter<std::vector<std::string>>(
"modulesToIgnoreForDeleteEarly");
441 auto& serviceSets = processDesc->getServicesPSets();
451 handler->willBeUsingThreads();
458 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
463 if (!loopers.empty()) {
474 runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
483 std::optional<ScheduleItems::MadeModules> madeModules;
486 tbb::task_group
group;
489 auto tempReg = std::make_shared<ProductRegistry>();
492 group.run([&,
this]() {
500 group.run([&,
this, tempReg]() {
506 items.branchIDListHelper(),
508 items.thinnedAssociationsHelper(),
510 items.processConfiguration(),
515 items.preg()->addFromInput(*tempReg);
545 auto ep = std::make_shared<EventPrincipal>(
preg(),
557 auto rp = std::make_unique<RunPrincipal>(
578 for (
auto& subProcessPSet : subProcessVParameterSet) {
648 std::vector<ModuleProcessName> consumedBySubProcesses;
651 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
652 if (consumedBySubProcesses.empty()) {
654 }
else if (not
c.empty()) {
655 std::vector<ModuleProcessName>
tmp;
656 tmp.reserve(consumedBySubProcesses.size() +
c.size());
658 consumedBySubProcesses.end(),
661 std::back_inserter(
tmp));
670 not unusedModules.empty()) {
673 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
674 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 676 "therefore they are deleted before beginJob transition.";
693 schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *
preg_);
722 ex.
addContext(
"Calling beginJob for the source");
731 std::exception_ptr firstException;
736 firstException = std::current_exception();
738 if (
looper_ && !firstException) {
740 constexpr bool mustPrefetchMayGet =
true;
742 auto const runLookup =
preg_->productLookup(
InRun);
743 auto const lumiLookup =
preg_->productLookup(
InLumi);
746 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
747 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
749 looper_->updateLookup(
esp_->recordsToResolverIndices());
751 firstException = std::current_exception();
755 CMS_SA_ALLOW try { subProcess.doBeginJob(); }
catch (...) {
756 if (!firstException) {
757 firstException = std::current_exception();
761 if (firstException) {
762 std::rethrow_exception(firstException);
772 oneapi::tbb::task_group
group;
778 first([
this,
i](
auto nextTask) {
779 std::exception_ptr exceptionPtr;
783 exceptionPtr = std::current_exception();
786 CMS_SA_ALLOW try { subProcess.doBeginStream(
i); }
catch (...) {
788 exceptionPtr = std::current_exception();
793 nextTask.doneWaiting(exceptionPtr);
797 finalWaitingTask.wait();
805 oneapi::tbb::task_group
group;
811 first([
this,
i, &collector, &collectorMutex](
auto nextTask) {
814 schedule_->endStream(
i, collector, collectorMutex);
816 subProcess.doEndStream(
i, collector, collectorMutex);
822 finalWaitingTask.waitNoThrow();
828 "Multiple exceptions were thrown while executing endStream and endJob. An exception message follows for " 841 subProcess.doEndJob(
c);
856 return schedule_->getAllModuleDescriptions();
868 #include "TransitionProcessors.icc" 872 bool returnValue =
false;
877 edm::LogSystem(
"ShutdownSignal") <<
"an external signal was sent to shutdown the job early.";
886 struct SourceNextGuard {
888 ~SourceNextGuard() { act_.postSourceNextTransitionSignal_.emit(); }
897 SourceNextGuard guard(*
actReg_.get());
900 itemTypeInfo =
input_->nextItemType();
904 sentry.completedSuccessfully();
923 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
926 runStatus->runPrincipal()->run() ==
input_->run() &&
927 runStatus->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
929 <<
"InputSource claimed previous Run Entry was last to be merged in this file,\n" 930 <<
"but the next entry has the same run number and reduced ProcessHistoryID.\n" 931 <<
"This is probably a bug in the InputSource. Please report to the Core group.\n";
934 nextHolder.doneWaiting(std::current_exception());
944 actReg_->beginProcessingSignal_();
945 auto endSignal = [](
ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
946 std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(
actReg_.get(), endSignal);
951 bool firstTime =
true;
961 auto trans =
fp.processFiles(*
this);
972 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " <<
static_cast<int>(trans);
981 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
982 e.addAdditionalInfo(message);
983 if (
e.alreadyPrinted()) {
989 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
990 e.addAdditionalInfo(message);
991 if (
e.alreadyPrinted()) {
997 if (
e.alreadyPrinted()) {
1007 FDEBUG(1) <<
" \treadFile\n";
1008 size_t size =
preg_->size();
1013 streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
1017 streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
1021 if (size < preg_->size()) {
1028 sentry.completedSuccessfully();
1034 input_->closeFile(
fb_.get(), cleaningUpAfterException);
1035 sentry.completedSuccessfully();
1037 FDEBUG(1) <<
"\tcloseInputFile\n";
1045 FDEBUG(1) <<
"\topenOutputFiles\n";
1052 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1058 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
1062 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1070 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1080 FDEBUG(1) <<
"\tstartingNewLoop\n";
1086 looper_->setModuleChanger(&changer);
1088 looper_->setModuleChanger(
nullptr);
1094 FDEBUG(1) <<
"\tendOfLoop\n";
1101 FDEBUG(1) <<
"\trewind\n";
1106 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1110 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1113 if (subProcess.shouldWeCloseOutput()) {
1119 return schedule_->shouldWeCloseOutput();
1123 FDEBUG(1) <<
"\tdoErrorStuff\n";
1124 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n" 1125 <<
"and went to the error state\n" 1126 <<
"Will attempt to terminate processing normally\n" 1127 <<
"(IF using the looper the next loop will be attempted)\n" 1128 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1139 beginGlobalTransitionAsync<Traits>(
1142 globalWaitTask.wait();
1143 beginProcessBlockSucceeded =
true;
1147 input_->fillProcessBlockHelper();
1149 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1156 beginGlobalTransitionAsync<Traits>(
1159 globalWaitTask.wait();
1163 writeWaitTask.wait();
1184 cleaningUpAfterException);
1185 globalWaitTask.wait();
1187 if (beginProcessBlockSucceeded) {
1190 writeWaitTask.wait();
1213 runStatus->runPrincipal()->run() ==
input_->run() and
1214 runStatus->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
1222 runStatus->setHolderOfTaskInProcessRuns(holder);
1239 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1246 status->endIOVWaitingTasks(),
1247 status->eventSetupImpls(),
1252 }) |
chain::then([
this,
status](std::exception_ptr
const* iException,
auto nextTask) {
1265 if (postRunQueueTask.taskHasFailed()) {
1266 status->resetBeginResources();
1274 *postRunQueueTask.group(), [
this, postSourceTask = postRunQueueTask,
status]()
mutable {
1278 if (postSourceTask.taskHasFailed()) {
1279 status->resetBeginResources();
1281 status->resumeGlobalRunQueue();
1291 sentry.completedSuccessfully();
1298 oneapi::tbb::task_group
group;
1303 }) |
then([
this, &es](
auto nextTask)
mutable {
1320 status->setStopBeforeProcessingRun(
true);
1321 nextTask.doneWaiting(std::current_exception());
1323 }) |
then([
this,
status, &es](
auto nextTask) {
1324 if (
status->stopBeforeProcessingRun()) {
1329 beginGlobalTransitionAsync<Traits>(
1332 if (
status->stopBeforeProcessingRun()) {
1338 if (
status->stopBeforeProcessingRun()) {
1343 }) |
then([
this,
status](std::exception_ptr
const* iException,
auto holder)
mutable {
1348 status->globalBeginDidSucceed();
1351 if (
status->stopBeforeProcessingRun()) {
1353 status->resetBeginResources();
1355 status->resumeGlobalRunQueue();
1361 auto globalEndRunTask =
1364 status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1369 status->resetBeginResources();
1371 status->resumeGlobalRunQueue();
1372 holder.doneWaiting(std::current_exception());
1393 if (
status->streamFinishedBeginRun()) {
1396 status->resetBeginResources();
1406 status->resetBeginResources();
1413 status->resetBeginResources();
1415 status->resumeGlobalRunQueue();
1416 postSourceTask.doneWaiting(std::current_exception());
1420 status->resetBeginResources();
1422 status->resumeGlobalRunQueue();
1423 postRunQueueTask.doneWaiting(std::current_exception());
1427 status->resetBeginResources();
1429 nextTask.doneWaiting(std::current_exception());
1435 std::shared_ptr<RunProcessingStatus>
status,
1450 beginStreamTransitionAsync<Traits>(
1453 }) |
then([
this, iStream](std::exception_ptr
const* exceptionFromBeginStreamRun,
auto nextTask) {
1454 if (exceptionFromBeginStreamRun) {
1455 nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1461 iHolder.doneWaiting(std::current_exception());
1467 if (
status->streamFinishedBeginRun()) {
1468 status->resetBeginResources();
1475 RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1488 iRunStatus->endIOVWaitingTasksEndRun(),
1489 iRunStatus->eventSetupImplsEndRun(),
1493 }) |
chain::then([
this, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
1495 iRunStatus->setEndingEventSetupSucceeded(
false);
1502 streamQueues_[
i].push(*nextTask.group(), [
this,
i, nextTask]()
mutable {
1529 tmp.doneWaiting(iException);
1534 auto& runPrincipal = *(iRunStatus->runPrincipal());
1535 bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1536 bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1537 EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(
esp_->subProcessIndex());
1538 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1539 bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1543 chain::first([
this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1545 if (endingEventSetupSucceeded) {
1548 endGlobalTransitionAsync<Traits>(
1553 [
this, &runPrincipal, &es](
auto nextTask) {
1557 [
this, &runPrincipal, &es](
auto nextTask) {
1561 ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1562 [
this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](
auto nextTask) {
1564 writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1568 didGlobalBeginSucceed,
1569 mergeableRunProductMetadata,
1570 endingEventSetupSucceeded](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1571 if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1572 mergeableRunProductMetadata->postWriteRun();
1579 std::exception_ptr ptr;
1586 ptr = std::current_exception();
1590 status->resumeGlobalRunQueue();
1594 ptr = std::current_exception();
1598 status->resetEndResources();
1602 ptr = std::current_exception();
1606 if (ptr && !iException) {
1632 if (runStatus->streamFinishedRun()) {
1633 runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1644 if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1646 auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1647 bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1649 auto& runPrincipal = *runStatus->runPrincipal();
1652 endStreamTransitionAsync<Traits>(
std::move(runDoneTaskHolder),
1658 cleaningUpAfterException);
1670 runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1672 runStatus->setHolderOfTaskInProcessRuns(holder);
1680 std::shared_ptr<RunProcessingStatus> iRunStatus,
1682 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1684 auto status = std::make_shared<LuminosityBlockProcessingStatus>();
1688 status->endIOVWaitingTasks(),
1689 status->eventSetupImpls(),
1693 }) |
chain::then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
1705 if (postLumiQueueTask.taskHasFailed()) {
1706 status->resetResources();
1715 *postLumiQueueTask.group(),
1716 [
this, postSourceTask = postLumiQueueTask,
status, iRunStatus]()
mutable {
1720 if (postSourceTask.taskHasFailed()) {
1721 status->resetResources();
1733 sentry.completedSuccessfully();
1739 rng->preBeginLumi(lb);
1751 }) |
then([
this,
status, &es, &lumiPrincipal](
auto nextTask) {
1754 beginGlobalTransitionAsync<Traits>(
1762 }) |
then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto holder)
mutable {
1763 status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1771 status->globalBeginDidSucceed();
1779 if (!
status->shouldStreamStartLumi()) {
1785 auto eventSetupImpls = &
status->eventSetupImpls();
1786 auto lp =
status->lumiPrincipal().get();
1789 event.setLuminosityBlockPrincipal(lp);
1793 beginStreamTransitionAsync<Traits>(
std::move(nextTask),
1800 then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
1802 if (exceptionFromBeginStreamLumi) {
1804 copyHolder.
doneWaiting(*exceptionFromBeginStreamLumi);
1815 status->resetResources();
1823 status->resetResources();
1831 status->resetResources();
1847 status->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
1852 unsigned int streamIndex = 0;
1853 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1857 nextTask.group()->run(
1867 tmp.doneWaiting(iException);
1872 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1875 auto& lp = *(iLumiStatus->lumiPrincipal());
1876 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1877 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1879 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1882 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1887 endGlobalTransitionAsync<Traits>(
1889 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1891 if (didGlobalBeginSucceed) {
1900 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1906 std::exception_ptr ptr;
1914 ptr = std::current_exception();
1920 ptr = std::current_exception();
1925 status->resetResources();
1926 status->globalEndRunHolderDoneWaiting();
1930 ptr = std::current_exception();
1934 if (ptr && !iException) {
1953 if (
status->streamFinishedLumi()) {
1963 lumiStatus->setEndTime();
1966 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1967 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1969 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1972 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1978 cleaningUpAfterException);
1994 globalWaitTask.wait();
2001 input_->readProcessBlock(processBlockPrincipal);
2002 sentry.completedSuccessfully();
2008 rp->setAux(*
input_->runAuxiliary());
2012 sentry.completedSuccessfully();
2014 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
2026 input_->readAndMergeRun(runPrincipal);
2027 sentry.completedSuccessfully();
2034 lbp->setAux(*
input_->luminosityBlockAuxiliary());
2038 sentry.completedSuccessfully();
2046 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 2047 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2048 input_->processHistoryRegistry().reducedProcessHistoryID(
2049 input_->luminosityBlockAuxiliary()->processHistoryID()));
2050 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
2052 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
2056 sentry.completedSuccessfully();
2069 s.writeProcessBlockAsync(nextTask, processBlockType);
2085 s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2110 s.writeLumiAsync(nextTask, lumiPrincipal);
2120 iStatus.
lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2133 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2140 status->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
2141 if (
status->holderOfTaskInProcessRuns().taskHasFailed()) {
2142 status->setStopBeforeProcessingRun(
true);
2153 status->setStopBeforeProcessingRun(
true);
2154 holder.doneWaiting(std::current_exception());
2167 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2173 iLumiStatus->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
2182 holder.doneWaiting(std::current_exception());
2189 chain::first([
this, iRunStatus](
auto nextTask)
mutable {
2193 }) |
chain::then([
this, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
2200 iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2206 input_->luminosityBlockAuxiliary()->beginTime()),
2222 unsigned int iStreamIndex,
2254 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2269 iStatus.
lumiPrincipal()->runPrincipal().reducedProcessHistoryID() !=
input_->reducedProcessHistoryID()))) {
2273 <<
"InputSource claimed previous Lumi Entry was last to be merged in this file,\n" 2274 <<
"but the next lumi entry has the same lumi number.\n" 2275 <<
"This is probably a bug in the InputSource. Please report to the Core group.\n";
2299 auto recursionTask =
2300 make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iEventException)
mutable {
2301 if (iEventException) {
2315 if (not
status->haveStartedNextLumiOrEndedRun()) {
2316 status->noMoreEventsInLumi();
2317 status->startNextLumiOrEndRun();
2321 input_->luminosityBlockAuxiliary()->beginTime()),
2340 if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2341 runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2363 sentry.completedSuccessfully();
2365 FDEBUG(1) <<
"\treadEvent\n";
2373 struct ClearEventGuard {
2375 : act_(iReg), context_(iContext) {
2378 ~ClearEventGuard() { act_.postClearEventSignal_.emit(context_); }
2391 rng->postEventRead(
ev);
2396 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
2401 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2403 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
2407 }) |
then([
this, pep](
auto nextTask) {
2408 FDEBUG(1) <<
"\tprocessEvent\n";
2412 pep->runPrincipal().index(),
2413 pep->luminosityBlockPrincipal().index(),
2416 ClearEventGuard guard(*this->
actReg_.get(), streamContext);
2417 pep->clearEventPrincipal();
2422 bool randomAccess =
input_->randomAccess();
2431 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2449 FDEBUG(1) <<
"\tshouldWeStop\n";
2454 if (subProcess.terminate()) {
2470 bool expected =
false;
2480 ex <<
"The framework is configured to use at least two streams, but the following modules\n" 2481 <<
"require synchronizing on LuminosityBlock boundaries:";
2483 for (
auto worker :
schedule_->allWorkers()) {
2484 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2486 ex <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2490 ex <<
"\n\nThe situation can be fixed by either\n" 2491 <<
" * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n" 2492 <<
" * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2498 std::unique_ptr<LogSystem>
s;
2499 for (
auto worker :
schedule_->allWorkers()) {
2500 if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2502 s = std::make_unique<LogSystem>(
"ModulesSynchingOnRuns");
2503 (*s) <<
"The following modules require synchronizing on Run boundaries:";
2505 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
LuminosityBlockNumber_t luminosityBlock() const
PreClearEvent preClearEventSignal_
signal is emitted before the data products in the Event are cleared
std::atomic< bool > exceptionMessageLumis_
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
ProcessContext processContext_
Log< level::System, false > LogSystem
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void clearRunPrincipal(RunProcessingStatus &)
void setExceptionMessageRuns()
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
int totalEventsFailed() const
std::shared_ptr< ProductRegistry const > preg() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
InputSource::ItemTypeInfo lastSourceTransition_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::shared_ptr< RunPrincipal > readRun()
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
int merge(int argc, char *argv[])
static PFTauRenderPlugin instance
EventProcessingState eventProcessingState() const
SerialTaskQueue streamQueuesInserter_
void endUnfinishedRun(bool cleaningUpAfterException)
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder) noexcept
void setExceptionMessageFiles(std::string &message)
InputSource::ItemTypeInfo nextTransitionType()
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool needToCallNext() const
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
std::shared_ptr< EDLooperBase const > looper() const
void ensureAvailableAccelerators(edm::ParameterSet const ¶meterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
std::shared_ptr< RunPrincipal > & runPrincipal()
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
StreamID streamID() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
constexpr auto then(O &&iO)
void endStreams(ExceptionCollector &) noexcept
bool beginJobStartedModules_
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
void swap(Association< C > &lhs, Association< C > &rhs)
bool didGlobalBeginSucceed() const
std::atomic< bool > exceptionMessageRuns_
void mergeAuxiliary(RunAuxiliary const &aux)
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
ProcessBlockPrincipal & processBlockPrincipal() const
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
oneapi::tbb::task_group * group() const noexcept
void emit(Args &&... args) const
std::multimap< std::string, std::string > referencesToBranches_
unsigned int numberOfThreads() const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
void nextTransitionTypeAsync(std::shared_ptr< RunProcessingStatus > iRunStatus, WaitingTaskHolder nextTask)
bool taskHasFailed() const noexcept
std::vector< std::string > modulesToIgnoreForDeleteEarly_
ShouldWriteRun shouldWriteRun() const
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
static ServiceRegistry & instance()
void clear()
Not thread safe.
void setNeedToCallNext(bool val)
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
PreSourceNextTransition preSourceNextTransitionSignal_
bool lastOperationSucceeded() const
unsigned int numberOfRuns() const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void doneWaiting(std::exception_ptr iExcept) noexcept
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void releaseBeginRunResources(unsigned int iStream)
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void respondToCloseInputFile()
Log< level::Info, false > LogInfo
void readAndMergeRun(RunProcessingStatus &)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void reportShutdownSignal()
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
unsigned int numberOfLuminosityBlocks() const
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
EventSetupImpl const & eventSetupImpl(unsigned subProcessIndex) const
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
InputSource::ItemType processRuns()
MergeableRunProductMetadata * mergeableRunProductMetadata()
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
bool forceESCacheClearOnNewRun_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
std::vector< std::shared_ptr< const EventSetupImpl > > & eventSetupImpls()
std::atomic< unsigned int > streamRunActive_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
void closeInputFile(bool cleaningUpAfterException)
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void readProcessBlock(ProcessBlockPrincipal &)
bool adjustToNewProductRegistry(ProductRegistry const ®)
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
auto lastTask(edm::WaitingTaskHolder iTask)
T const & get(Event const &event, InputTag const &tag) noexcept(false)
void endUnfinishedLumi(bool cleaningUpAfterException)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void continueLumiAsync(WaitingTaskHolder)
Transition requestedTransition() const
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
Log< level::System, true > LogAbsolute
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
void setExceptionMessageLumis()
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void setEventProcessingState(EventProcessingState val)
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void inputProcessBlocks()
bool insertMapped(value_type const &v)
static Registry * instance()
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)