84 #include "boost/range/adaptor/reversed.hpp" 96 #include "oneapi/tbb/task.h" 104 class PauseQueueSentry {
107 ~PauseQueueSentry() { queue_.resume(); }
116 namespace chain = waiting_task::chain;
119 std::unique_ptr<InputSource>
makeInput(
unsigned int moduleIndex,
122 std::shared_ptr<ProductRegistry> preg,
123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
125 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
126 std::shared_ptr<ActivityRegistry> areg,
127 std::shared_ptr<ProcessConfiguration const> processConfiguration,
130 if (main_input ==
nullptr) {
132 <<
"There must be exactly one source in the configuration.\n" 133 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
138 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
141 filler->fill(descriptions);
146 std::ostringstream ost;
147 ost <<
"Validating configuration of input source of type " << modtype;
163 processConfiguration.get(),
170 thinnedAssociationsHelper,
174 common.maxSecondsUntilRampdown_,
177 areg->preSourceConstructionSignal_(md);
178 std::unique_ptr<InputSource>
input;
181 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
184 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
188 std::ostringstream ost;
189 ost <<
"Constructing input source of type " << modtype;
200 std::vector<std::string>
const& loopers) {
201 std::shared_ptr<EDLooperBase> vLooper;
203 assert(1 == loopers.size());
205 for (
auto const& looperName : loopers) {
217 std::vector<std::string>
const& defaultServices,
218 std::vector<std::string>
const& forcedServices)
221 branchIDListHelper_(),
225 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
228 processConfiguration_(),
234 deferredExceptionPtrIsSet_(
false),
238 beginJobCalled_(
false),
239 shouldWeStop_(
false),
240 fileModeNoMerge_(
false),
241 exceptionMessageFiles_(),
242 exceptionMessageRuns_(
false),
243 exceptionMessageLumis_(
false),
244 forceLooperToEnd_(
false),
245 looperBeginJobRun_(
false),
246 forceESCacheClearOnNewRun_(
false),
247 eventSetupDataToExcludeFromPrefetching_() {
249 processDesc->addServices(defaultServices, forcedServices);
250 init(processDesc, iToken, iLegacy);
254 std::vector<std::string>
const& defaultServices,
255 std::vector<std::string>
const& forcedServices)
258 branchIDListHelper_(),
262 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
265 processConfiguration_(),
271 deferredExceptionPtrIsSet_(
false),
275 beginJobCalled_(
false),
276 shouldWeStop_(
false),
277 fileModeNoMerge_(
false),
278 exceptionMessageFiles_(),
279 exceptionMessageRuns_(
false),
280 exceptionMessageLumis_(
false),
281 forceLooperToEnd_(
false),
282 looperBeginJobRun_(
false),
283 forceESCacheClearOnNewRun_(
false),
284 eventSetupDataToExcludeFromPrefetching_() {
286 processDesc->addServices(defaultServices, forcedServices);
295 branchIDListHelper_(),
299 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
302 processConfiguration_(),
308 deferredExceptionPtrIsSet_(
false),
312 beginJobCalled_(
false),
313 shouldWeStop_(
false),
314 fileModeNoMerge_(
false),
315 exceptionMessageFiles_(),
316 exceptionMessageRuns_(
false),
317 exceptionMessageLumis_(
false),
318 forceLooperToEnd_(
false),
319 looperBeginJobRun_(
false),
320 forceESCacheClearOnNewRun_(
false),
321 eventSetupDataToExcludeFromPrefetching_() {
336 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
340 bool const hasSubProcesses = !subProcessVParameterSet.empty();
353 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
361 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
367 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
371 unsigned int nConcurrentLumis =
372 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
373 if (nConcurrentLumis == 0) {
374 nConcurrentLumis = 2;
376 if (nConcurrentLumis > nStreams) {
377 nConcurrentLumis = nStreams;
379 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
380 if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381 nConcurrentRuns = nConcurrentLumis;
384 if (!loopers.empty()) {
386 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number " 388 "of concurrent runs, and the number of concurrent lumis " 389 "are all being reset to 1. Loopers cannot currently support " 390 "values greater than 1.";
392 nConcurrentLumis = 1;
396 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
400 if (nThreads > 1
or nStreams > 1) {
401 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
410 unsigned int maxConcurrentIOVs =
411 3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
417 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
420 if (not hasSubProcesses) {
424 auto referencePSets =
425 optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>(
"holdsReferencesToDeleteEarly");
426 for (
auto const&
pset : referencePSets) {
428 auto references =
pset.getParameter<std::vector<std::string>>(
"references");
429 for (
auto const& ref : references) {
434 optionsPset.getUntrackedParameter<std::vector<std::string>>(
"modulesToIgnoreForDeleteEarly");
441 auto& serviceSets = processDesc->getServicesPSets();
451 handler->willBeUsingThreads();
458 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
463 if (!loopers.empty()) {
474 runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
483 std::optional<ScheduleItems::MadeModules> madeModules;
486 tbb::task_group
group;
489 auto tempReg = std::make_shared<ProductRegistry>();
492 group.run([&,
this]() {
500 group.run([&,
this, tempReg]() {
506 items.branchIDListHelper(),
508 items.thinnedAssociationsHelper(),
510 items.processConfiguration(),
515 items.preg()->addFromInput(*tempReg);
545 auto ep = std::make_shared<EventPrincipal>(
preg(),
557 auto rp = std::make_unique<RunPrincipal>(
578 for (
auto& subProcessPSet : subProcessVParameterSet) {
650 std::vector<ModuleProcessName> consumedBySubProcesses;
653 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654 if (consumedBySubProcesses.empty()) {
656 }
else if (not
c.empty()) {
657 std::vector<ModuleProcessName>
tmp;
658 tmp.reserve(consumedBySubProcesses.size() +
c.size());
660 consumedBySubProcesses.end(),
663 std::back_inserter(
tmp));
672 not unusedModules.empty()) {
675 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
676 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 678 "therefore they are deleted before beginJob transition.";
695 schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *
preg_);
725 ex.
addContext(
"Calling beginJob for the source");
731 constexpr bool mustPrefetchMayGet =
true;
733 auto const runLookup =
preg_->productLookup(
InRun);
734 auto const lumiLookup =
preg_->productLookup(
InLumi);
737 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
738 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
740 looper_->updateLookup(
esp_->recordsToResolverIndices());
744 actReg_->postBeginJobSignal_();
746 oneapi::tbb::task_group
group;
749 first([
this](
auto nextTask) {
751 first([
i,
this](
auto nextTask) {
766 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
773 oneapi::tbb::task_group
group;
781 first([
this,
i, &
c, &collectorMutex](
auto nextTask) {
782 std::exception_ptr
ep;
787 ep = std::current_exception();
790 std::lock_guard<std::mutex>
l(collectorMutex);
791 c.call([&
ep]() { std::rethrow_exception(
ep); });
793 }) |
then([
this,
i, &
c, &collectorMutex](
auto nextTask) {
795 first([
this,
i, &
c, &collectorMutex, &subProcess](
auto nextTask) {
796 std::exception_ptr
ep;
799 subProcess.doEndStream(
i);
801 ep = std::current_exception();
804 std::lock_guard<std::mutex>
l(collectorMutex);
805 c.call([&
ep]() { std::rethrow_exception(
ep); });
812 waitTask.waitNoThrow();
815 c.call([actReg]() { actReg->preEndJobSignal_(); });
824 c.call([actReg]() { actReg->postEndJobSignal_(); });
833 return schedule_->getAllModuleDescriptions();
845 #include "TransitionProcessors.icc" 849 bool returnValue =
false;
854 edm::LogSystem(
"ShutdownSignal") <<
"an external signal was sent to shutdown the job early.";
863 struct SourceNextGuard {
865 ~SourceNextGuard() { act_.postSourceNextTransitionSignal_.emit(); }
874 SourceNextGuard guard(*
actReg_.get());
877 itemTypeInfo =
input_->nextItemType();
881 sentry.completedSuccessfully();
900 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
903 runStatus->runPrincipal()->run() ==
input_->run() &&
904 runStatus->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
906 <<
"InputSource claimed previous Run Entry was last to be merged in this file,\n" 907 <<
"but the next entry has the same run number and reduced ProcessHistoryID.\n" 908 <<
"This is probably a bug in the InputSource. Please report to the Core group.\n";
911 nextHolder.doneWaiting(std::current_exception());
921 actReg_->beginProcessingSignal_();
922 auto endSignal = [](
ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
923 std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(
actReg_.get(), endSignal);
928 bool firstTime =
true;
938 auto trans =
fp.processFiles(*
this);
949 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " <<
static_cast<int>(trans);
958 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
959 e.addAdditionalInfo(message);
960 if (
e.alreadyPrinted()) {
966 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
967 e.addAdditionalInfo(message);
968 if (
e.alreadyPrinted()) {
974 if (
e.alreadyPrinted()) {
984 FDEBUG(1) <<
" \treadFile\n";
985 size_t size =
preg_->size();
990 streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
994 streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
998 if (size < preg_->size()) {
1005 sentry.completedSuccessfully();
1011 input_->closeFile(
fb_.get(), cleaningUpAfterException);
1012 sentry.completedSuccessfully();
1014 FDEBUG(1) <<
"\tcloseInputFile\n";
1022 FDEBUG(1) <<
"\topenOutputFiles\n";
1029 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1035 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
1039 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1047 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1057 FDEBUG(1) <<
"\tstartingNewLoop\n";
1063 looper_->setModuleChanger(&changer);
1065 looper_->setModuleChanger(
nullptr);
1071 FDEBUG(1) <<
"\tendOfLoop\n";
1078 FDEBUG(1) <<
"\trewind\n";
1083 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1087 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1090 if (subProcess.shouldWeCloseOutput()) {
1096 return schedule_->shouldWeCloseOutput();
1100 FDEBUG(1) <<
"\tdoErrorStuff\n";
1101 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n" 1102 <<
"and went to the error state\n" 1103 <<
"Will attempt to terminate processing normally\n" 1104 <<
"(IF using the looper the next loop will be attempted)\n" 1105 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1116 beginGlobalTransitionAsync<Traits>(
1119 globalWaitTask.wait();
1120 beginProcessBlockSucceeded =
true;
1124 input_->fillProcessBlockHelper();
1126 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1133 beginGlobalTransitionAsync<Traits>(
1136 globalWaitTask.wait();
1140 writeWaitTask.wait();
1161 cleaningUpAfterException);
1162 globalWaitTask.wait();
1164 if (beginProcessBlockSucceeded) {
1167 writeWaitTask.wait();
1190 runStatus->runPrincipal()->run() ==
input_->run() and
1191 runStatus->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
1199 runStatus->setHolderOfTaskInProcessRuns(holder);
1216 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1223 status->endIOVWaitingTasks(),
1224 status->eventSetupImpls(),
1229 }) |
chain::then([
this,
status](std::exception_ptr
const* iException,
auto nextTask) {
1242 if (postRunQueueTask.taskHasFailed()) {
1243 status->resetBeginResources();
1251 *postRunQueueTask.group(), [
this, postSourceTask = postRunQueueTask,
status]()
mutable {
1255 if (postSourceTask.taskHasFailed()) {
1256 status->resetBeginResources();
1258 status->resumeGlobalRunQueue();
1268 sentry.completedSuccessfully();
1275 oneapi::tbb::task_group
group;
1280 }) |
then([
this, &es](
auto nextTask)
mutable {
1297 status->setStopBeforeProcessingRun(
true);
1298 nextTask.doneWaiting(std::current_exception());
1300 }) |
then([
this,
status, &es](
auto nextTask) {
1301 if (
status->stopBeforeProcessingRun()) {
1306 beginGlobalTransitionAsync<Traits>(
1309 if (
status->stopBeforeProcessingRun()) {
1315 if (
status->stopBeforeProcessingRun()) {
1320 }) |
then([
this,
status](std::exception_ptr
const* iException,
auto holder)
mutable {
1325 status->globalBeginDidSucceed();
1328 if (
status->stopBeforeProcessingRun()) {
1330 status->resetBeginResources();
1332 status->resumeGlobalRunQueue();
1338 auto globalEndRunTask =
1341 status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1346 status->resetBeginResources();
1348 status->resumeGlobalRunQueue();
1349 holder.doneWaiting(std::current_exception());
1370 if (
status->streamFinishedBeginRun()) {
1373 status->resetBeginResources();
1383 status->resetBeginResources();
1390 status->resetBeginResources();
1392 status->resumeGlobalRunQueue();
1393 postSourceTask.doneWaiting(std::current_exception());
1397 status->resetBeginResources();
1399 status->resumeGlobalRunQueue();
1400 postRunQueueTask.doneWaiting(std::current_exception());
1404 status->resetBeginResources();
1406 nextTask.doneWaiting(std::current_exception());
1412 std::shared_ptr<RunProcessingStatus>
status,
1427 beginStreamTransitionAsync<Traits>(
1430 }) |
then([
this, iStream](std::exception_ptr
const* exceptionFromBeginStreamRun,
auto nextTask) {
1431 if (exceptionFromBeginStreamRun) {
1432 nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1438 iHolder.doneWaiting(std::current_exception());
1444 if (
status->streamFinishedBeginRun()) {
1445 status->resetBeginResources();
1452 RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1465 iRunStatus->endIOVWaitingTasksEndRun(),
1466 iRunStatus->eventSetupImplsEndRun(),
1470 }) |
chain::then([
this, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
1472 iRunStatus->setEndingEventSetupSucceeded(
false);
1479 streamQueues_[
i].push(*nextTask.group(), [
this,
i, nextTask]()
mutable {
1506 tmp.doneWaiting(iException);
1511 auto& runPrincipal = *(iRunStatus->runPrincipal());
1512 bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1513 bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1514 EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(
esp_->subProcessIndex());
1515 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1516 bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1520 chain::first([
this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1522 if (endingEventSetupSucceeded) {
1525 endGlobalTransitionAsync<Traits>(
1530 [
this, &runPrincipal, &es](
auto nextTask) {
1534 [
this, &runPrincipal, &es](
auto nextTask) {
1538 ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1539 [
this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](
auto nextTask) {
1541 writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1545 didGlobalBeginSucceed,
1546 mergeableRunProductMetadata,
1547 endingEventSetupSucceeded](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1548 if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1549 mergeableRunProductMetadata->postWriteRun();
1556 std::exception_ptr ptr;
1563 ptr = std::current_exception();
1567 status->resumeGlobalRunQueue();
1571 ptr = std::current_exception();
1575 status->resetEndResources();
1579 ptr = std::current_exception();
1583 if (ptr && !iException) {
1609 if (runStatus->streamFinishedRun()) {
1610 runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1621 if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1623 auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1624 bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1626 auto& runPrincipal = *runStatus->runPrincipal();
1629 endStreamTransitionAsync<Traits>(
std::move(runDoneTaskHolder),
1635 cleaningUpAfterException);
1647 runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1649 runStatus->setHolderOfTaskInProcessRuns(holder);
1657 std::shared_ptr<RunProcessingStatus> iRunStatus,
1659 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1661 auto status = std::make_shared<LuminosityBlockProcessingStatus>();
1665 status->endIOVWaitingTasks(),
1666 status->eventSetupImpls(),
1670 }) |
chain::then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
1682 if (postLumiQueueTask.taskHasFailed()) {
1683 status->resetResources();
1692 *postLumiQueueTask.group(),
1693 [
this, postSourceTask = postLumiQueueTask,
status, iRunStatus]()
mutable {
1697 if (postSourceTask.taskHasFailed()) {
1698 status->resetResources();
1710 sentry.completedSuccessfully();
1716 rng->preBeginLumi(lb);
1728 }) |
then([
this,
status, &es, &lumiPrincipal](
auto nextTask) {
1731 beginGlobalTransitionAsync<Traits>(
1739 }) |
then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto holder)
mutable {
1740 status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1748 status->globalBeginDidSucceed();
1756 if (!
status->shouldStreamStartLumi()) {
1762 auto eventSetupImpls = &
status->eventSetupImpls();
1763 auto lp =
status->lumiPrincipal().get();
1766 event.setLuminosityBlockPrincipal(lp);
1770 beginStreamTransitionAsync<Traits>(
std::move(nextTask),
1777 then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
1779 if (exceptionFromBeginStreamLumi) {
1781 copyHolder.
doneWaiting(*exceptionFromBeginStreamLumi);
1792 status->resetResources();
1800 status->resetResources();
1808 status->resetResources();
1824 status->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
1829 unsigned int streamIndex = 0;
1830 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1834 nextTask.group()->run(
1844 tmp.doneWaiting(iException);
1849 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1852 auto& lp = *(iLumiStatus->lumiPrincipal());
1853 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1854 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1856 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1859 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1864 endGlobalTransitionAsync<Traits>(
1866 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1868 if (didGlobalBeginSucceed) {
1877 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1883 std::exception_ptr ptr;
1891 ptr = std::current_exception();
1897 ptr = std::current_exception();
1902 status->resetResources();
1903 status->globalEndRunHolderDoneWaiting();
1907 ptr = std::current_exception();
1911 if (ptr && !iException) {
1930 if (
status->streamFinishedLumi()) {
1940 lumiStatus->setEndTime();
1943 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1944 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1946 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1949 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1955 cleaningUpAfterException);
1971 globalWaitTask.wait();
1978 input_->readProcessBlock(processBlockPrincipal);
1979 sentry.completedSuccessfully();
1985 rp->setAux(*
input_->runAuxiliary());
1989 sentry.completedSuccessfully();
1991 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
2003 input_->readAndMergeRun(runPrincipal);
2004 sentry.completedSuccessfully();
2011 lbp->setAux(*
input_->luminosityBlockAuxiliary());
2015 sentry.completedSuccessfully();
2023 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 2024 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2025 input_->processHistoryRegistry().reducedProcessHistoryID(
2026 input_->luminosityBlockAuxiliary()->processHistoryID()));
2027 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
2029 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
2033 sentry.completedSuccessfully();
2046 s.writeProcessBlockAsync(nextTask, processBlockType);
2062 s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2087 s.writeLumiAsync(nextTask, lumiPrincipal);
2097 iStatus.
lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2110 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2117 status->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
2118 if (
status->holderOfTaskInProcessRuns().taskHasFailed()) {
2119 status->setStopBeforeProcessingRun(
true);
2130 status->setStopBeforeProcessingRun(
true);
2131 holder.doneWaiting(std::current_exception());
2144 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2150 iLumiStatus->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
2159 holder.doneWaiting(std::current_exception());
2166 chain::first([
this, iRunStatus](
auto nextTask)
mutable {
2170 }) |
chain::then([
this, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
2177 iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2183 input_->luminosityBlockAuxiliary()->beginTime()),
2199 unsigned int iStreamIndex,
2231 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2246 iStatus.
lumiPrincipal()->runPrincipal().reducedProcessHistoryID() !=
input_->reducedProcessHistoryID()))) {
2250 <<
"InputSource claimed previous Lumi Entry was last to be merged in this file,\n" 2251 <<
"but the next lumi entry has the same lumi number.\n" 2252 <<
"This is probably a bug in the InputSource. Please report to the Core group.\n";
2276 auto recursionTask =
2277 make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iEventException)
mutable {
2278 if (iEventException) {
2292 if (not
status->haveStartedNextLumiOrEndedRun()) {
2293 status->noMoreEventsInLumi();
2294 status->startNextLumiOrEndRun();
2298 input_->luminosityBlockAuxiliary()->beginTime()),
2317 if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2318 runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2340 sentry.completedSuccessfully();
2342 FDEBUG(1) <<
"\treadEvent\n";
2350 struct ClearEventGuard {
2352 : act_(iReg), context_(iContext) {
2355 ~ClearEventGuard() { act_.postClearEventSignal_.emit(context_); }
2368 rng->postEventRead(
ev);
2373 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
2378 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2380 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
2384 }) |
then([
this, pep](
auto nextTask) {
2385 FDEBUG(1) <<
"\tprocessEvent\n";
2389 pep->runPrincipal().index(),
2390 pep->luminosityBlockPrincipal().index(),
2393 ClearEventGuard guard(*this->
actReg_.get(), streamContext);
2394 pep->clearEventPrincipal();
2399 bool randomAccess =
input_->randomAccess();
2408 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2426 FDEBUG(1) <<
"\tshouldWeStop\n";
2431 if (subProcess.terminate()) {
2447 bool expected =
false;
2457 ex <<
"The framework is configured to use at least two streams, but the following modules\n" 2458 <<
"require synchronizing on LuminosityBlock boundaries:";
2460 for (
auto worker :
schedule_->allWorkers()) {
2461 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2463 ex <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2467 ex <<
"\n\nThe situation can be fixed by either\n" 2468 <<
" * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n" 2469 <<
" * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2475 std::unique_ptr<LogSystem>
s;
2476 for (
auto worker :
schedule_->allWorkers()) {
2477 if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2479 s = std::make_unique<LogSystem>(
"ModulesSynchingOnRuns");
2480 (*s) <<
"The following modules require synchronizing on Run boundaries:";
2482 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
LuminosityBlockNumber_t luminosityBlock() const
PreClearEvent preClearEventSignal_
signal is emitted before the data products in the Event are cleared
std::atomic< bool > exceptionMessageLumis_
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
ProcessContext processContext_
Log< level::System, false > LogSystem
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void clearRunPrincipal(RunProcessingStatus &)
void setExceptionMessageRuns()
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
int totalEventsFailed() const
std::shared_ptr< ProductRegistry const > preg() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
InputSource::ItemTypeInfo lastSourceTransition_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::shared_ptr< RunPrincipal > readRun()
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
int merge(int argc, char *argv[])
static PFTauRenderPlugin instance
EventProcessingState eventProcessingState() const
SerialTaskQueue streamQueuesInserter_
void endUnfinishedRun(bool cleaningUpAfterException)
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder) noexcept
void setExceptionMessageFiles(std::string &message)
InputSource::ItemTypeInfo nextTransitionType()
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool needToCallNext() const
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
std::shared_ptr< EDLooperBase const > looper() const
void ensureAvailableAccelerators(edm::ParameterSet const ¶meterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
std::shared_ptr< RunPrincipal > & runPrincipal()
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
StreamID streamID() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
constexpr auto then(O &&iO)
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
void swap(Association< C > &lhs, Association< C > &rhs)
bool didGlobalBeginSucceed() const
std::atomic< bool > exceptionMessageRuns_
void mergeAuxiliary(RunAuxiliary const &aux)
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
ProcessBlockPrincipal & processBlockPrincipal() const
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
oneapi::tbb::task_group * group() const noexcept
void emit(Args &&... args) const
std::multimap< std::string, std::string > referencesToBranches_
unsigned int numberOfThreads() const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
void nextTransitionTypeAsync(std::shared_ptr< RunProcessingStatus > iRunStatus, WaitingTaskHolder nextTask)
bool taskHasFailed() const noexcept
std::vector< std::string > modulesToIgnoreForDeleteEarly_
ShouldWriteRun shouldWriteRun() const
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
static ServiceRegistry & instance()
void clear()
Not thread safe.
void setNeedToCallNext(bool val)
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
PreSourceNextTransition preSourceNextTransitionSignal_
bool lastOperationSucceeded() const
unsigned int numberOfRuns() const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void doneWaiting(std::exception_ptr iExcept) noexcept
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void releaseBeginRunResources(unsigned int iStream)
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void respondToCloseInputFile()
Log< level::Info, false > LogInfo
void readAndMergeRun(RunProcessingStatus &)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
void reportShutdownSignal()
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
unsigned int numberOfLuminosityBlocks() const
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
EventSetupImpl const & eventSetupImpl(unsigned subProcessIndex) const
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
InputSource::ItemType processRuns()
MergeableRunProductMetadata * mergeableRunProductMetadata()
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
bool forceESCacheClearOnNewRun_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
std::vector< std::shared_ptr< const EventSetupImpl > > & eventSetupImpls()
std::atomic< unsigned int > streamRunActive_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
void closeInputFile(bool cleaningUpAfterException)
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void readProcessBlock(ProcessBlockPrincipal &)
bool adjustToNewProductRegistry(ProductRegistry const &reg)
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
auto lastTask(edm::WaitingTaskHolder iTask)
T const & get(Event const &event, InputTag const &tag) noexcept(false)
void endUnfinishedLumi(bool cleaningUpAfterException)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void continueLumiAsync(WaitingTaskHolder)
Transition requestedTransition() const
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
Log< level::System, true > LogAbsolute
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
void setExceptionMessageLumis()
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void setEventProcessingState(EventProcessingState val)
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void inputProcessBlocks()
bool insertMapped(value_type const &v)
static Registry * instance()
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)