84 #include "boost/range/adaptor/reversed.hpp" 96 #include "oneapi/tbb/task.h" 104 class PauseQueueSentry {
107 ~PauseQueueSentry() { queue_.resume(); }
116 namespace chain = waiting_task::chain;
119 std::unique_ptr<InputSource>
makeInput(
unsigned int moduleIndex,
122 std::shared_ptr<ProductRegistry> preg,
123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
125 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
126 std::shared_ptr<ActivityRegistry> areg,
127 std::shared_ptr<ProcessConfiguration const> processConfiguration,
130 if (main_input ==
nullptr) {
132 <<
"There must be exactly one source in the configuration.\n" 133 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
138 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
141 filler->fill(descriptions);
146 std::ostringstream ost;
147 ost <<
"Validating configuration of input source of type " << modtype;
163 processConfiguration.get(),
170 thinnedAssociationsHelper,
174 common.maxSecondsUntilRampdown_,
177 areg->preSourceConstructionSignal_(md);
178 std::unique_ptr<InputSource>
input;
181 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
184 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
188 std::ostringstream ost;
189 ost <<
"Constructing input source of type " << modtype;
200 std::vector<std::string>
const& loopers) {
201 std::shared_ptr<EDLooperBase> vLooper;
203 assert(1 == loopers.size());
205 for (
auto const& looperName : loopers) {
217 std::vector<std::string>
const& defaultServices,
218 std::vector<std::string>
const& forcedServices)
221 branchIDListHelper_(),
225 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
228 processConfiguration_(),
234 deferredExceptionPtrIsSet_(
false),
238 beginJobCalled_(
false),
239 shouldWeStop_(
false),
240 fileModeNoMerge_(
false),
241 exceptionMessageFiles_(),
242 exceptionMessageRuns_(
false),
243 exceptionMessageLumis_(
false),
244 forceLooperToEnd_(
false),
245 looperBeginJobRun_(
false),
246 forceESCacheClearOnNewRun_(
false),
247 eventSetupDataToExcludeFromPrefetching_() {
249 processDesc->addServices(defaultServices, forcedServices);
250 init(processDesc, iToken, iLegacy);
254 std::vector<std::string>
const& defaultServices,
255 std::vector<std::string>
const& forcedServices)
258 branchIDListHelper_(),
262 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
265 processConfiguration_(),
271 deferredExceptionPtrIsSet_(
false),
275 beginJobCalled_(
false),
276 shouldWeStop_(
false),
277 fileModeNoMerge_(
false),
278 exceptionMessageFiles_(),
279 exceptionMessageRuns_(
false),
280 exceptionMessageLumis_(
false),
281 forceLooperToEnd_(
false),
282 looperBeginJobRun_(
false),
283 forceESCacheClearOnNewRun_(
false),
284 eventSetupDataToExcludeFromPrefetching_() {
286 processDesc->addServices(defaultServices, forcedServices);
295 branchIDListHelper_(),
299 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
302 processConfiguration_(),
308 deferredExceptionPtrIsSet_(
false),
312 beginJobCalled_(
false),
313 shouldWeStop_(
false),
314 fileModeNoMerge_(
false),
315 exceptionMessageFiles_(),
316 exceptionMessageRuns_(
false),
317 exceptionMessageLumis_(
false),
318 forceLooperToEnd_(
false),
319 looperBeginJobRun_(
false),
320 forceESCacheClearOnNewRun_(
false),
321 eventSetupDataToExcludeFromPrefetching_() {
336 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
340 bool const hasSubProcesses = !subProcessVParameterSet.empty();
353 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
361 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
367 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
371 unsigned int nConcurrentLumis =
372 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
373 if (nConcurrentLumis == 0) {
374 nConcurrentLumis = 2;
376 if (nConcurrentLumis > nStreams) {
377 nConcurrentLumis = nStreams;
379 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
380 if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381 nConcurrentRuns = nConcurrentLumis;
384 if (!loopers.empty()) {
386 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number " 388 "of concurrent runs, and the number of concurrent lumis " 389 "are all being reset to 1. Loopers cannot currently support " 390 "values greater than 1.";
392 nConcurrentLumis = 1;
396 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
400 if (nThreads > 1
or nStreams > 1) {
401 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
410 unsigned int maxConcurrentIOVs =
411 3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
417 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
420 if (not hasSubProcesses) {
424 auto referencePSets =
425 optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>(
"holdsReferencesToDeleteEarly");
426 for (
auto const&
pset : referencePSets) {
428 auto references =
pset.getParameter<std::vector<std::string>>(
"references");
429 for (
auto const& ref : references) {
434 optionsPset.getUntrackedParameter<std::vector<std::string>>(
"modulesToIgnoreForDeleteEarly");
441 auto& serviceSets = processDesc->getServicesPSets();
451 handler->willBeUsingThreads();
458 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
463 if (!loopers.empty()) {
474 runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
483 std::optional<ScheduleItems::MadeModules> madeModules;
486 tbb::task_group
group;
489 auto tempReg = std::make_shared<ProductRegistry>();
492 group.run([&,
this]() {
500 group.run([&,
this, tempReg]() {
506 items.branchIDListHelper(),
508 items.thinnedAssociationsHelper(),
510 items.processConfiguration(),
515 items.preg()->addFromInput(*tempReg);
545 auto ep = std::make_shared<EventPrincipal>(
preg(),
557 auto rp = std::make_unique<RunPrincipal>(
578 for (
auto& subProcessPSet : subProcessVParameterSet) {
650 std::vector<ModuleProcessName> consumedBySubProcesses;
653 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654 if (consumedBySubProcesses.empty()) {
656 }
else if (not
c.empty()) {
657 std::vector<ModuleProcessName>
tmp;
658 tmp.reserve(consumedBySubProcesses.size() +
c.size());
660 consumedBySubProcesses.end(),
663 std::back_inserter(
tmp));
672 not unusedModules.empty()) {
675 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
676 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 678 "therefore they are deleted before beginJob transition.";
695 schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *
preg_);
726 ex.
addContext(
"Calling beginJob for the source");
732 constexpr bool mustPrefetchMayGet =
true;
734 auto const runLookup =
preg_->productLookup(
InRun);
735 auto const lumiLookup =
preg_->productLookup(
InLumi);
738 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
739 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
741 looper_->updateLookup(
esp_->recordsToResolverIndices());
745 actReg_->postBeginJobSignal_();
747 oneapi::tbb::task_group
group;
750 first([
this](
auto nextTask) {
752 first([
i,
this](
auto nextTask) {
767 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
774 oneapi::tbb::task_group
group;
782 first([
this,
i, &
c, &collectorMutex](
auto nextTask) {
783 std::exception_ptr
ep;
788 ep = std::current_exception();
791 std::lock_guard<std::mutex>
l(collectorMutex);
792 c.call([&
ep]() { std::rethrow_exception(
ep); });
794 }) |
then([
this,
i, &
c, &collectorMutex](
auto nextTask) {
796 first([
this,
i, &
c, &collectorMutex, &subProcess](
auto nextTask) {
797 std::exception_ptr
ep;
800 subProcess.doEndStream(
i);
802 ep = std::current_exception();
805 std::lock_guard<std::mutex>
l(collectorMutex);
806 c.call([&
ep]() { std::rethrow_exception(
ep); });
813 waitTask.waitNoThrow();
816 c.call([actReg]() { actReg->preEndJobSignal_(); });
825 c.call([actReg]() { actReg->postEndJobSignal_(); });
834 return schedule_->getAllModuleDescriptions();
846 #include "TransitionProcessors.icc" 850 bool returnValue =
false;
855 edm::LogSystem(
"ShutdownSignal") <<
"an external signal was sent to shutdown the job early.";
864 struct SourceNextGuard {
866 ~SourceNextGuard() { act_.postSourceNextTransitionSignal_.emit(); }
875 SourceNextGuard guard(*
actReg_.get());
878 itemTypeInfo =
input_->nextItemType();
882 sentry.completedSuccessfully();
901 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
904 runStatus->runPrincipal()->run() ==
input_->run() &&
905 runStatus->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
907 <<
"InputSource claimed previous Run Entry was last to be merged in this file,\n" 908 <<
"but the next entry has the same run number and reduced ProcessHistoryID.\n" 909 <<
"This is probably a bug in the InputSource. Please report to the Core group.\n";
912 nextHolder.doneWaiting(std::current_exception());
922 actReg_->beginProcessingSignal_();
923 auto endSignal = [](
ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
924 std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(
actReg_.get(), endSignal);
929 bool firstTime =
true;
939 auto trans =
fp.processFiles(*
this);
950 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " <<
static_cast<int>(trans);
959 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
960 e.addAdditionalInfo(message);
961 if (
e.alreadyPrinted()) {
967 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
968 e.addAdditionalInfo(message);
969 if (
e.alreadyPrinted()) {
975 if (
e.alreadyPrinted()) {
985 FDEBUG(1) <<
" \treadFile\n";
986 size_t size =
preg_->size();
991 streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
995 streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
999 if (size < preg_->size()) {
1006 sentry.completedSuccessfully();
1012 input_->closeFile(
fb_.get(), cleaningUpAfterException);
1013 sentry.completedSuccessfully();
1015 FDEBUG(1) <<
"\tcloseInputFile\n";
1023 FDEBUG(1) <<
"\topenOutputFiles\n";
1030 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1036 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
1040 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1048 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1058 FDEBUG(1) <<
"\tstartingNewLoop\n";
1064 looper_->setModuleChanger(&changer);
1066 looper_->setModuleChanger(
nullptr);
1072 FDEBUG(1) <<
"\tendOfLoop\n";
1079 FDEBUG(1) <<
"\trewind\n";
1084 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1088 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1091 if (subProcess.shouldWeCloseOutput()) {
1097 return schedule_->shouldWeCloseOutput();
1101 FDEBUG(1) <<
"\tdoErrorStuff\n";
1102 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n" 1103 <<
"and went to the error state\n" 1104 <<
"Will attempt to terminate processing normally\n" 1105 <<
"(IF using the looper the next loop will be attempted)\n" 1106 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1117 beginGlobalTransitionAsync<Traits>(
1120 globalWaitTask.wait();
1121 beginProcessBlockSucceeded =
true;
1125 input_->fillProcessBlockHelper();
1127 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1134 beginGlobalTransitionAsync<Traits>(
1137 globalWaitTask.wait();
1141 writeWaitTask.wait();
1162 cleaningUpAfterException);
1163 globalWaitTask.wait();
1165 if (beginProcessBlockSucceeded) {
1168 writeWaitTask.wait();
1191 runStatus->runPrincipal()->run() ==
input_->run() and
1192 runStatus->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
1200 runStatus->setHolderOfTaskInProcessRuns(holder);
1217 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1224 status->endIOVWaitingTasks(),
1225 status->eventSetupImpls(),
1230 }) |
chain::then([
this,
status](std::exception_ptr
const* iException,
auto nextTask) {
1243 if (postRunQueueTask.taskHasFailed()) {
1244 status->resetBeginResources();
1252 *postRunQueueTask.group(), [
this, postSourceTask = postRunQueueTask,
status]()
mutable {
1256 if (postSourceTask.taskHasFailed()) {
1257 status->resetBeginResources();
1259 status->resumeGlobalRunQueue();
1269 sentry.completedSuccessfully();
1276 oneapi::tbb::task_group
group;
1281 }) |
then([
this, &es](
auto nextTask)
mutable {
1298 status->setStopBeforeProcessingRun(
true);
1299 nextTask.doneWaiting(std::current_exception());
1301 }) |
then([
this,
status, &es](
auto nextTask) {
1302 if (
status->stopBeforeProcessingRun()) {
1307 beginGlobalTransitionAsync<Traits>(
1310 if (
status->stopBeforeProcessingRun()) {
1313 status->globalBeginDidSucceed();
1315 if (
status->stopBeforeProcessingRun()) {
1321 if (
status->stopBeforeProcessingRun()) {
1326 }) |
then([
this,
status](std::exception_ptr
const* iException,
auto holder)
mutable {
1327 bool precedingTasksSucceeded =
true;
1329 precedingTasksSucceeded =
false;
1334 if (
status->stopBeforeProcessingRun()) {
1336 status->resetBeginResources();
1338 status->resumeGlobalRunQueue();
1344 auto globalEndRunTask =
1347 status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1352 status->resetBeginResources();
1354 status->resumeGlobalRunQueue();
1355 holder.doneWaiting(std::current_exception());
1370 *holder.group(), [
this,
status, precedingTasksSucceeded, holder]()
mutable {
1375 [
this,
i,
status, precedingTasksSucceeded, holder]()
mutable {
1380 if (
status->streamFinishedBeginRun()) {
1383 status->resetBeginResources();
1393 status->resetBeginResources();
1400 status->resetBeginResources();
1402 status->resumeGlobalRunQueue();
1403 postSourceTask.doneWaiting(std::current_exception());
1407 status->resetBeginResources();
1409 status->resumeGlobalRunQueue();
1410 postRunQueueTask.doneWaiting(std::current_exception());
1414 status->resetBeginResources();
1416 nextTask.doneWaiting(std::current_exception());
1422 std::shared_ptr<RunProcessingStatus>
status,
1423 bool precedingTasksSucceeded,
1432 chain::first([
this, iStream, precedingTasksSucceeded](
auto nextTask) {
1433 if (precedingTasksSucceeded) {
1438 beginStreamTransitionAsync<Traits>(
1441 }) |
then([
this, iStream](std::exception_ptr
const* exceptionFromBeginStreamRun,
auto nextTask) {
1442 if (exceptionFromBeginStreamRun) {
1443 nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1455 if (
status->streamFinishedBeginRun()) {
1456 status->resetBeginResources();
1463 RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1476 iRunStatus->endIOVWaitingTasksEndRun(),
1477 iRunStatus->eventSetupImplsEndRun(),
1481 }) |
chain::then([
this, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
1483 iRunStatus->setEndingEventSetupSucceeded(
false);
1490 streamQueues_[
i].push(*nextTask.group(), [
this,
i, nextTask]()
mutable {
1517 tmp.doneWaiting(iException);
1522 auto& runPrincipal = *(iRunStatus->runPrincipal());
1523 bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1524 bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1525 EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(
esp_->subProcessIndex());
1526 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1527 bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1531 chain::first([
this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1533 if (endingEventSetupSucceeded) {
1536 endGlobalTransitionAsync<Traits>(
1541 [
this, &runPrincipal, &es](
auto nextTask) {
1545 [
this, &runPrincipal, &es](
auto nextTask) {
1549 ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1550 [
this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](
auto nextTask) {
1552 writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1556 didGlobalBeginSucceed,
1557 mergeableRunProductMetadata,
1558 endingEventSetupSucceeded](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1559 if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1560 mergeableRunProductMetadata->postWriteRun();
1567 std::exception_ptr ptr;
1574 ptr = std::current_exception();
1578 status->resumeGlobalRunQueue();
1582 ptr = std::current_exception();
1586 status->resetEndResources();
1590 ptr = std::current_exception();
1594 if (ptr && !iException) {
1620 if (runStatus->streamFinishedRun()) {
1621 runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1632 if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1634 auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1635 bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1637 auto& runPrincipal = *runStatus->runPrincipal();
1640 endStreamTransitionAsync<Traits>(
std::move(runDoneTaskHolder),
1646 cleaningUpAfterException);
1658 runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1660 runStatus->setHolderOfTaskInProcessRuns(holder);
1668 std::shared_ptr<RunProcessingStatus> iRunStatus,
1670 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1676 status->endIOVWaitingTasks(),
1677 status->eventSetupImpls(),
1681 }) |
chain::then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
1693 if (postLumiQueueTask.taskHasFailed()) {
1694 status->resetResources();
1703 *postLumiQueueTask.group(),
1704 [
this, postSourceTask = postLumiQueueTask,
status, iRunStatus]()
mutable {
1708 if (postSourceTask.taskHasFailed()) {
1709 status->resetResources();
1721 sentry.completedSuccessfully();
1727 rng->preBeginLumi(lb);
1739 }) |
then([
this,
status, &es, &lumiPrincipal](
auto nextTask) {
1742 beginGlobalTransitionAsync<Traits>(
1748 status->globalBeginDidSucceed();
1751 }) |
then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto holder)
mutable {
1753 status->resetResources();
1760 status->globalBeginDidSucceed();
1763 status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1777 auto eventSetupImpls = &
status->eventSetupImpls();
1778 auto lp =
status->lumiPrincipal().get();
1781 event.setLuminosityBlockPrincipal(lp);
1785 beginStreamTransitionAsync<Traits>(
std::move(nextTask),
1792 then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
1794 if (exceptionFromBeginStreamLumi) {
1796 copyHolder.
doneWaiting(*exceptionFromBeginStreamLumi);
1807 status->resetResources();
1815 status->resetResources();
1823 status->resetResources();
1839 status->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
1844 unsigned int streamIndex = 0;
1845 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1849 nextTask.group()->run(
1859 tmp.doneWaiting(iException);
1864 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1867 auto& lp = *(iLumiStatus->lumiPrincipal());
1868 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1869 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1871 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1874 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1879 endGlobalTransitionAsync<Traits>(
1881 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1883 if (didGlobalBeginSucceed) {
1892 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1898 std::exception_ptr ptr;
1906 ptr = std::current_exception();
1912 ptr = std::current_exception();
1917 status->resetResources();
1918 status->globalEndRunHolderDoneWaiting();
1922 ptr = std::current_exception();
1926 if (ptr && !iException) {
1945 if (
status->streamFinishedLumi()) {
1955 lumiStatus->setEndTime();
1958 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1959 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1961 if (lumiStatus->didGlobalBeginSucceed()) {
1962 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1965 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1971 cleaningUpAfterException);
1987 globalWaitTask.wait();
1994 input_->readProcessBlock(processBlockPrincipal);
1995 sentry.completedSuccessfully();
2001 rp->setAux(*
input_->runAuxiliary());
2005 sentry.completedSuccessfully();
2007 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
2019 input_->readAndMergeRun(runPrincipal);
2020 sentry.completedSuccessfully();
2027 lbp->setAux(*
input_->luminosityBlockAuxiliary());
2031 sentry.completedSuccessfully();
2039 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 2040 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2041 input_->processHistoryRegistry().reducedProcessHistoryID(
2042 input_->luminosityBlockAuxiliary()->processHistoryID()));
2043 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
2045 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
2049 sentry.completedSuccessfully();
2062 s.writeProcessBlockAsync(nextTask, processBlockType);
2078 s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2103 s.writeLumiAsync(nextTask, lumiPrincipal);
2113 iStatus.
lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2126 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2133 status->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
2134 if (
status->holderOfTaskInProcessRuns().taskHasFailed()) {
2135 status->setStopBeforeProcessingRun(
true);
2146 status->setStopBeforeProcessingRun(
true);
2147 holder.doneWaiting(std::current_exception());
2160 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2166 iLumiStatus->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
2175 holder.doneWaiting(std::current_exception());
2182 chain::first([
this, iRunStatus](
auto nextTask)
mutable {
2186 }) |
chain::then([
this, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
2193 iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2199 input_->luminosityBlockAuxiliary()->beginTime()),
2215 unsigned int iStreamIndex,
2247 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2262 iStatus.
lumiPrincipal()->runPrincipal().reducedProcessHistoryID() !=
input_->reducedProcessHistoryID()))) {
2266 <<
"InputSource claimed previous Lumi Entry was last to be merged in this file,\n" 2267 <<
"but the next lumi entry has the same lumi number.\n" 2268 <<
"This is probably a bug in the InputSource. Please report to the Core group.\n";
2292 auto recursionTask =
2293 make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iEventException)
mutable {
2294 if (iEventException) {
2308 if (not
status->haveStartedNextLumiOrEndedRun()) {
2309 status->startNextLumiOrEndRun();
2313 input_->luminosityBlockAuxiliary()->beginTime()),
2332 if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2333 runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2355 sentry.completedSuccessfully();
2357 FDEBUG(1) <<
"\treadEvent\n";
2365 struct ClearEventGuard {
2367 : act_(iReg), context_(iContext) {
2370 ~ClearEventGuard() { act_.postClearEventSignal_.emit(context_); }
2383 rng->postEventRead(
ev);
2388 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
2393 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2395 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
2399 }) |
then([
this, pep](
auto nextTask) {
2400 FDEBUG(1) <<
"\tprocessEvent\n";
2404 pep->runPrincipal().index(),
2405 pep->luminosityBlockPrincipal().index(),
2408 ClearEventGuard guard(*this->
actReg_.get(), streamContext);
2409 pep->clearEventPrincipal();
2414 bool randomAccess =
input_->randomAccess();
2423 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2441 FDEBUG(1) <<
"\tshouldWeStop\n";
2446 if (subProcess.terminate()) {
2462 bool expected =
false;
2472 ex <<
"The framework is configured to use at least two streams, but the following modules\n" 2473 <<
"require synchronizing on LuminosityBlock boundaries:";
2475 for (
auto worker :
schedule_->allWorkers()) {
2476 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2478 ex <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2482 ex <<
"\n\nThe situation can be fixed by either\n" 2483 <<
" * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n" 2484 <<
" * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2490 std::unique_ptr<LogSystem>
s;
2491 for (
auto worker :
schedule_->allWorkers()) {
2492 if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2494 s = std::make_unique<LogSystem>(
"ModulesSynchingOnRuns");
2495 (*s) <<
"The following modules require synchronizing on Run boundaries:";
2497 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2503 std::unique_ptr<LogSystem>
s;
2504 for (
auto worker :
schedule_->allWorkers()) {
2507 s = std::make_unique<LogSystem>(
"LegacyModules");
2508 (*s) <<
"The following legacy modules are configured. Support for legacy modules\n" 2509 "is going to end soon. These modules need to be converted to have type\n" 2510 "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2512 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
LuminosityBlockNumber_t luminosityBlock() const
PreClearEvent preClearEventSignal_
signal is emitted before the data products in the Event are cleared
std::atomic< bool > exceptionMessageLumis_
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
ProcessContext processContext_
Log< level::System, false > LogSystem
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void clearRunPrincipal(RunProcessingStatus &)
void setExceptionMessageRuns()
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
int totalEventsFailed() const
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
InputSource::ItemTypeInfo lastSourceTransition_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::shared_ptr< RunPrincipal > readRun()
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
int merge(int argc, char *argv[])
static PFTauRenderPlugin instance
EventProcessingState eventProcessingState() const
SerialTaskQueue streamQueuesInserter_
void endUnfinishedRun(bool cleaningUpAfterException)
void setExceptionMessageFiles(std::string &message)
InputSource::ItemTypeInfo nextTransitionType()
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool needToCallNext() const
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
std::shared_ptr< EDLooperBase const > looper() const
void ensureAvailableAccelerators(edm::ParameterSet const ¶meterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
std::shared_ptr< RunPrincipal > & runPrincipal()
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
StreamID streamID() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
constexpr auto then(O &&iO)
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
void swap(Association< C > &lhs, Association< C > &rhs)
std::atomic< bool > exceptionMessageRuns_
void mergeAuxiliary(RunAuxiliary const &aux)
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
ProcessBlockPrincipal & processBlockPrincipal() const
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
oneapi::tbb::task_group * group() const noexcept
void emit(Args &&... args) const
std::multimap< std::string, std::string > referencesToBranches_
unsigned int numberOfThreads() const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
void nextTransitionTypeAsync(std::shared_ptr< RunProcessingStatus > iRunStatus, WaitingTaskHolder nextTask)
bool taskHasFailed() const noexcept
std::vector< std::string > modulesToIgnoreForDeleteEarly_
ShouldWriteRun shouldWriteRun() const
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
static ServiceRegistry & instance()
void clear()
Not thread safe.
void setNeedToCallNext(bool val)
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
PreSourceNextTransition preSourceNextTransitionSignal_
bool lastOperationSucceeded() const
unsigned int numberOfRuns() const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void releaseBeginRunResources(unsigned int iStream)
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void respondToCloseInputFile()
Log< level::Info, false > LogInfo
void readAndMergeRun(RunProcessingStatus &)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void reportShutdownSignal()
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
unsigned int numberOfLuminosityBlocks() const
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
EventSetupImpl const & eventSetupImpl(unsigned subProcessIndex) const
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
InputSource::ItemType processRuns()
MergeableRunProductMetadata * mergeableRunProductMetadata()
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
bool forceESCacheClearOnNewRun_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
std::vector< std::shared_ptr< const EventSetupImpl > > & eventSetupImpls()
std::atomic< unsigned int > streamRunActive_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
void closeInputFile(bool cleaningUpAfterException)
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void readProcessBlock(ProcessBlockPrincipal &)
bool adjustToNewProductRegistry(ProductRegistry const ®)
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
auto lastTask(edm::WaitingTaskHolder iTask)
T const & get(Event const &event, InputTag const &tag) noexcept(false)
void endUnfinishedLumi(bool cleaningUpAfterException)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void continueLumiAsync(WaitingTaskHolder)
Transition requestedTransition() const
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
Log< level::System, true > LogAbsolute
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
void setExceptionMessageLumis()
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void setEventProcessingState(EventProcessingState val)
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void inputProcessBlocks()
bool insertMapped(value_type const &v)
static Registry * instance()
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)