84 #include "boost/range/adaptor/reversed.hpp" 96 #include "oneapi/tbb/task.h" 104 class PauseQueueSentry {
107 ~PauseQueueSentry() { queue_.resume(); }
116 namespace chain = waiting_task::chain;
119 std::unique_ptr<InputSource>
makeInput(
unsigned int moduleIndex,
122 std::shared_ptr<ProductRegistry> preg,
123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
125 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
126 std::shared_ptr<ActivityRegistry> areg,
127 std::shared_ptr<ProcessConfiguration const> processConfiguration,
130 if (main_input ==
nullptr) {
132 <<
"There must be exactly one source in the configuration.\n" 133 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
138 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
141 filler->fill(descriptions);
146 std::ostringstream ost;
147 ost <<
"Validating configuration of input source of type " << modtype;
163 processConfiguration.get(),
170 thinnedAssociationsHelper,
174 common.maxSecondsUntilRampdown_,
177 areg->preSourceConstructionSignal_(md);
178 std::unique_ptr<InputSource>
input;
181 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
184 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
188 std::ostringstream ost;
189 ost <<
"Constructing input source of type " << modtype;
200 std::vector<std::string>
const& loopers) {
201 std::shared_ptr<EDLooperBase> vLooper;
203 assert(1 == loopers.size());
205 for (
auto const& looperName : loopers) {
217 std::vector<std::string>
const& defaultServices,
218 std::vector<std::string>
const& forcedServices)
221 branchIDListHelper_(),
225 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
228 processConfiguration_(),
234 deferredExceptionPtrIsSet_(
false),
238 beginJobCalled_(
false),
239 shouldWeStop_(
false),
240 fileModeNoMerge_(
false),
241 exceptionMessageFiles_(),
242 exceptionMessageRuns_(
false),
243 exceptionMessageLumis_(
false),
244 forceLooperToEnd_(
false),
245 looperBeginJobRun_(
false),
246 forceESCacheClearOnNewRun_(
false),
247 eventSetupDataToExcludeFromPrefetching_() {
249 processDesc->addServices(defaultServices, forcedServices);
250 init(processDesc, iToken, iLegacy);
254 std::vector<std::string>
const& defaultServices,
255 std::vector<std::string>
const& forcedServices)
258 branchIDListHelper_(),
262 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
265 processConfiguration_(),
271 deferredExceptionPtrIsSet_(
false),
275 beginJobCalled_(
false),
276 shouldWeStop_(
false),
277 fileModeNoMerge_(
false),
278 exceptionMessageFiles_(),
279 exceptionMessageRuns_(
false),
280 exceptionMessageLumis_(
false),
281 forceLooperToEnd_(
false),
282 looperBeginJobRun_(
false),
283 forceESCacheClearOnNewRun_(
false),
284 eventSetupDataToExcludeFromPrefetching_() {
286 processDesc->addServices(defaultServices, forcedServices);
295 branchIDListHelper_(),
299 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
302 processConfiguration_(),
308 deferredExceptionPtrIsSet_(
false),
312 beginJobCalled_(
false),
313 shouldWeStop_(
false),
314 fileModeNoMerge_(
false),
315 exceptionMessageFiles_(),
316 exceptionMessageRuns_(
false),
317 exceptionMessageLumis_(
false),
318 forceLooperToEnd_(
false),
319 looperBeginJobRun_(
false),
320 forceESCacheClearOnNewRun_(
false),
321 eventSetupDataToExcludeFromPrefetching_() {
336 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
340 bool const hasSubProcesses = !subProcessVParameterSet.empty();
353 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
361 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
367 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
371 unsigned int nConcurrentLumis =
372 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
373 if (nConcurrentLumis == 0) {
374 nConcurrentLumis = 2;
376 if (nConcurrentLumis > nStreams) {
377 nConcurrentLumis = nStreams;
379 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
380 if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381 nConcurrentRuns = nConcurrentLumis;
384 if (!loopers.empty()) {
386 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number " 388 "of concurrent runs, and the number of concurrent lumis " 389 "are all being reset to 1. Loopers cannot currently support " 390 "values greater than 1.";
392 nConcurrentLumis = 1;
396 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
400 if (nThreads > 1
or nStreams > 1) {
401 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
410 unsigned int maxConcurrentIOVs =
411 3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
417 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
420 if (not hasSubProcesses) {
424 auto referencePSets =
425 optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>(
"holdsReferencesToDeleteEarly");
426 for (
auto const&
pset : referencePSets) {
428 auto references =
pset.getParameter<std::vector<std::string>>(
"references");
429 for (
auto const& ref : references) {
434 optionsPset.getUntrackedParameter<std::vector<std::string>>(
"modulesToIgnoreForDeleteEarly");
441 auto& serviceSets = processDesc->getServicesPSets();
451 handler->willBeUsingThreads();
458 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
463 if (!loopers.empty()) {
474 runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
483 std::optional<ScheduleItems::MadeModules> madeModules;
486 tbb::task_group
group;
489 auto tempReg = std::make_shared<ProductRegistry>();
492 group.run([&,
this]() {
500 group.run([&,
this, tempReg]() {
506 items.branchIDListHelper(),
508 items.thinnedAssociationsHelper(),
510 items.processConfiguration(),
515 items.preg()->addFromInput(*tempReg);
545 auto ep = std::make_shared<EventPrincipal>(
preg(),
557 auto rp = std::make_unique<RunPrincipal>(
578 for (
auto& subProcessPSet : subProcessVParameterSet) {
650 std::vector<ModuleProcessName> consumedBySubProcesses;
653 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654 if (consumedBySubProcesses.empty()) {
656 }
else if (not
c.empty()) {
657 std::vector<ModuleProcessName>
tmp;
658 tmp.reserve(consumedBySubProcesses.size() +
c.size());
660 consumedBySubProcesses.end(),
663 std::back_inserter(
tmp));
672 not unusedModules.empty()) {
675 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
676 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 678 "therefore they are deleted before beginJob transition.";
695 schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *
preg_);
725 ex.
addContext(
"Calling beginJob for the source");
731 constexpr bool mustPrefetchMayGet =
true;
733 auto const runLookup =
preg_->productLookup(
InRun);
734 auto const lumiLookup =
preg_->productLookup(
InLumi);
737 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
738 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
740 looper_->updateLookup(
esp_->recordsToResolverIndices());
744 actReg_->postBeginJobSignal_();
746 oneapi::tbb::task_group
group;
749 first([
this](
auto nextTask) {
751 first([
i,
this](
auto nextTask) {
766 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
773 oneapi::tbb::task_group
group;
781 first([
this,
i, &
c, &collectorMutex](
auto nextTask) {
782 std::exception_ptr
ep;
787 ep = std::current_exception();
790 std::lock_guard<std::mutex>
l(collectorMutex);
791 c.call([&
ep]() { std::rethrow_exception(
ep); });
793 }) |
then([
this,
i, &
c, &collectorMutex](
auto nextTask) {
795 first([
this,
i, &
c, &collectorMutex, &subProcess](
auto nextTask) {
796 std::exception_ptr
ep;
799 subProcess.doEndStream(
i);
801 ep = std::current_exception();
804 std::lock_guard<std::mutex>
l(collectorMutex);
805 c.call([&
ep]() { std::rethrow_exception(
ep); });
812 waitTask.waitNoThrow();
815 c.call([actReg]() { actReg->preEndJobSignal_(); });
824 c.call([actReg]() { actReg->postEndJobSignal_(); });
833 return schedule_->getAllModuleDescriptions();
845 #include "TransitionProcessors.icc" 849 bool returnValue =
false;
854 edm::LogSystem(
"ShutdownSignal") <<
"an external signal was sent to shutdown the job early.";
863 struct SourceNextGuard {
865 ~SourceNextGuard() { act_.postSourceNextTransitionSignal_.emit(); }
874 SourceNextGuard guard(*
actReg_.get());
877 itemTypeInfo =
input_->nextItemType();
881 sentry.completedSuccessfully();
900 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
903 runStatus->runPrincipal()->run() ==
input_->run() &&
904 runStatus->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
906 <<
"InputSource claimed previous Run Entry was last to be merged in this file,\n" 907 <<
"but the next entry has the same run number and reduced ProcessHistoryID.\n" 908 <<
"This is probably a bug in the InputSource. Please report to the Core group.\n";
911 nextHolder.doneWaiting(std::current_exception());
921 actReg_->beginProcessingSignal_();
922 auto endSignal = [](
ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
923 std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(
actReg_.get(), endSignal);
928 bool firstTime =
true;
938 auto trans =
fp.processFiles(*
this);
949 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " <<
static_cast<int>(trans);
958 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
959 e.addAdditionalInfo(message);
960 if (
e.alreadyPrinted()) {
966 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
967 e.addAdditionalInfo(message);
968 if (
e.alreadyPrinted()) {
974 if (
e.alreadyPrinted()) {
984 FDEBUG(1) <<
" \treadFile\n";
985 size_t size =
preg_->size();
990 streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
994 streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
998 if (size < preg_->size()) {
1005 sentry.completedSuccessfully();
1011 input_->closeFile(
fb_.get(), cleaningUpAfterException);
1012 sentry.completedSuccessfully();
1014 FDEBUG(1) <<
"\tcloseInputFile\n";
1022 FDEBUG(1) <<
"\topenOutputFiles\n";
1029 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1035 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
1039 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1047 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1057 FDEBUG(1) <<
"\tstartingNewLoop\n";
1063 looper_->setModuleChanger(&changer);
1065 looper_->setModuleChanger(
nullptr);
1071 FDEBUG(1) <<
"\tendOfLoop\n";
1078 FDEBUG(1) <<
"\trewind\n";
1083 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1087 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1090 if (subProcess.shouldWeCloseOutput()) {
1096 return schedule_->shouldWeCloseOutput();
1100 FDEBUG(1) <<
"\tdoErrorStuff\n";
1101 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n" 1102 <<
"and went to the error state\n" 1103 <<
"Will attempt to terminate processing normally\n" 1104 <<
"(IF using the looper the next loop will be attempted)\n" 1105 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1116 beginGlobalTransitionAsync<Traits>(
1119 globalWaitTask.wait();
1120 beginProcessBlockSucceeded =
true;
1124 input_->fillProcessBlockHelper();
1126 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1133 beginGlobalTransitionAsync<Traits>(
1136 globalWaitTask.wait();
1140 writeWaitTask.wait();
1161 cleaningUpAfterException);
1162 globalWaitTask.wait();
1164 if (beginProcessBlockSucceeded) {
1167 writeWaitTask.wait();
1190 runStatus->runPrincipal()->run() ==
input_->run() and
1191 runStatus->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
1199 runStatus->setHolderOfTaskInProcessRuns(holder);
1216 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1223 status->endIOVWaitingTasks(),
1224 status->eventSetupImpls(),
1229 }) |
chain::then([
this,
status](std::exception_ptr
const* iException,
auto nextTask) {
1242 if (postRunQueueTask.taskHasFailed()) {
1243 status->resetBeginResources();
1251 *postRunQueueTask.group(), [
this, postSourceTask = postRunQueueTask,
status]()
mutable {
1255 if (postSourceTask.taskHasFailed()) {
1256 status->resetBeginResources();
1258 status->resumeGlobalRunQueue();
1268 sentry.completedSuccessfully();
1275 oneapi::tbb::task_group
group;
1280 }) |
then([
this, &es](
auto nextTask)
mutable {
1297 status->setStopBeforeProcessingRun(
true);
1298 nextTask.doneWaiting(std::current_exception());
1300 }) |
then([
this,
status, &es](
auto nextTask) {
1301 if (
status->stopBeforeProcessingRun()) {
1306 beginGlobalTransitionAsync<Traits>(
1309 if (
status->stopBeforeProcessingRun()) {
1312 status->globalBeginDidSucceed();
1314 if (
status->stopBeforeProcessingRun()) {
1320 if (
status->stopBeforeProcessingRun()) {
1325 }) |
then([
this,
status](std::exception_ptr
const* iException,
auto holder)
mutable {
1326 bool precedingTasksSucceeded =
true;
1328 precedingTasksSucceeded =
false;
1333 if (
status->stopBeforeProcessingRun()) {
1335 status->resetBeginResources();
1337 status->resumeGlobalRunQueue();
1343 auto globalEndRunTask =
1346 status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1351 status->resetBeginResources();
1353 status->resumeGlobalRunQueue();
1354 holder.doneWaiting(std::current_exception());
1369 *holder.group(), [
this,
status, precedingTasksSucceeded, holder]()
mutable {
1374 [
this,
i,
status, precedingTasksSucceeded, holder]()
mutable {
1379 if (
status->streamFinishedBeginRun()) {
1382 status->resetBeginResources();
1392 status->resetBeginResources();
1399 status->resetBeginResources();
1401 status->resumeGlobalRunQueue();
1402 postSourceTask.doneWaiting(std::current_exception());
1406 status->resetBeginResources();
1408 status->resumeGlobalRunQueue();
1409 postRunQueueTask.doneWaiting(std::current_exception());
1413 status->resetBeginResources();
1415 nextTask.doneWaiting(std::current_exception());
1421 std::shared_ptr<RunProcessingStatus>
status,
1422 bool precedingTasksSucceeded,
1431 chain::first([
this, iStream, precedingTasksSucceeded](
auto nextTask) {
1432 if (precedingTasksSucceeded) {
1437 beginStreamTransitionAsync<Traits>(
1440 }) |
then([
this, iStream](std::exception_ptr
const* exceptionFromBeginStreamRun,
auto nextTask) {
1441 if (exceptionFromBeginStreamRun) {
1442 nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1454 if (
status->streamFinishedBeginRun()) {
1455 status->resetBeginResources();
1462 RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1475 iRunStatus->endIOVWaitingTasksEndRun(),
1476 iRunStatus->eventSetupImplsEndRun(),
1480 }) |
chain::then([
this, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
1482 iRunStatus->setEndingEventSetupSucceeded(
false);
1489 streamQueues_[
i].push(*nextTask.group(), [
this,
i, nextTask]()
mutable {
1516 tmp.doneWaiting(iException);
1521 auto& runPrincipal = *(iRunStatus->runPrincipal());
1522 bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1523 bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1524 EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(
esp_->subProcessIndex());
1525 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1526 bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1530 chain::first([
this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1532 if (endingEventSetupSucceeded) {
1535 endGlobalTransitionAsync<Traits>(
1540 [
this, &runPrincipal, &es](
auto nextTask) {
1544 [
this, &runPrincipal, &es](
auto nextTask) {
1548 ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1549 [
this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](
auto nextTask) {
1551 writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1555 didGlobalBeginSucceed,
1556 mergeableRunProductMetadata,
1557 endingEventSetupSucceeded](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1558 if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1559 mergeableRunProductMetadata->postWriteRun();
1566 std::exception_ptr ptr;
1573 ptr = std::current_exception();
1577 status->resumeGlobalRunQueue();
1581 ptr = std::current_exception();
1585 status->resetEndResources();
1589 ptr = std::current_exception();
1593 if (ptr && !iException) {
1619 if (runStatus->streamFinishedRun()) {
1620 runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1631 if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1633 auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1634 bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1636 auto& runPrincipal = *runStatus->runPrincipal();
1639 endStreamTransitionAsync<Traits>(
std::move(runDoneTaskHolder),
1645 cleaningUpAfterException);
1657 runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1659 runStatus->setHolderOfTaskInProcessRuns(holder);
1667 std::shared_ptr<RunProcessingStatus> iRunStatus,
1669 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1671 auto status = std::make_shared<LuminosityBlockProcessingStatus>();
1675 status->endIOVWaitingTasks(),
1676 status->eventSetupImpls(),
1680 }) |
chain::then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
1692 if (postLumiQueueTask.taskHasFailed()) {
1693 status->resetResources();
1702 *postLumiQueueTask.group(),
1703 [
this, postSourceTask = postLumiQueueTask,
status, iRunStatus]()
mutable {
1707 if (postSourceTask.taskHasFailed()) {
1708 status->resetResources();
1720 sentry.completedSuccessfully();
1726 rng->preBeginLumi(lb);
1738 }) |
then([
this,
status, &es, &lumiPrincipal](
auto nextTask) {
1741 beginGlobalTransitionAsync<Traits>(
1747 status->globalBeginDidSucceed();
1750 }) |
then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto holder)
mutable {
1752 status->resetResources();
1759 status->globalBeginDidSucceed();
1762 status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1770 if (!
status->shouldStreamStartLumi()) {
1776 auto eventSetupImpls = &
status->eventSetupImpls();
1777 auto lp =
status->lumiPrincipal().get();
1780 event.setLuminosityBlockPrincipal(lp);
1784 beginStreamTransitionAsync<Traits>(
std::move(nextTask),
1791 then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
1793 if (exceptionFromBeginStreamLumi) {
1795 copyHolder.
doneWaiting(*exceptionFromBeginStreamLumi);
1806 status->resetResources();
1814 status->resetResources();
1822 status->resetResources();
1838 status->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
1843 unsigned int streamIndex = 0;
1844 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1848 nextTask.group()->run(
1858 tmp.doneWaiting(iException);
1863 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1866 auto& lp = *(iLumiStatus->lumiPrincipal());
1867 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1868 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1870 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1873 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1878 endGlobalTransitionAsync<Traits>(
1880 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1882 if (didGlobalBeginSucceed) {
1891 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1897 std::exception_ptr ptr;
1905 ptr = std::current_exception();
1911 ptr = std::current_exception();
1916 status->resetResources();
1917 status->globalEndRunHolderDoneWaiting();
1921 ptr = std::current_exception();
1925 if (ptr && !iException) {
1944 if (
status->streamFinishedLumi()) {
1954 lumiStatus->setEndTime();
1957 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1958 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1960 if (lumiStatus->didGlobalBeginSucceed()) {
1961 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1964 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1970 cleaningUpAfterException);
1987 globalWaitTask.wait();
1994 input_->readProcessBlock(processBlockPrincipal);
1995 sentry.completedSuccessfully();
2001 rp->setAux(*
input_->runAuxiliary());
2005 sentry.completedSuccessfully();
2007 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
2019 input_->readAndMergeRun(runPrincipal);
2020 sentry.completedSuccessfully();
2027 lbp->setAux(*
input_->luminosityBlockAuxiliary());
2031 sentry.completedSuccessfully();
2039 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 2040 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2041 input_->processHistoryRegistry().reducedProcessHistoryID(
2042 input_->luminosityBlockAuxiliary()->processHistoryID()));
2043 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
2045 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
2049 sentry.completedSuccessfully();
2062 s.writeProcessBlockAsync(nextTask, processBlockType);
2078 s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2103 s.writeLumiAsync(nextTask, lumiPrincipal);
2113 iStatus.
lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2126 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2133 status->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
2134 if (
status->holderOfTaskInProcessRuns().taskHasFailed()) {
2135 status->setStopBeforeProcessingRun(
true);
2146 status->setStopBeforeProcessingRun(
true);
2147 holder.doneWaiting(std::current_exception());
2160 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2166 iLumiStatus->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
2175 holder.doneWaiting(std::current_exception());
2182 chain::first([
this, iRunStatus](
auto nextTask)
mutable {
2186 }) |
chain::then([
this, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
2193 iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2199 input_->luminosityBlockAuxiliary()->beginTime()),
2215 unsigned int iStreamIndex,
2247 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2262 iStatus.
lumiPrincipal()->runPrincipal().reducedProcessHistoryID() !=
input_->reducedProcessHistoryID()))) {
2266 <<
"InputSource claimed previous Lumi Entry was last to be merged in this file,\n" 2267 <<
"but the next lumi entry has the same lumi number.\n" 2268 <<
"This is probably a bug in the InputSource. Please report to the Core group.\n";
2292 auto recursionTask =
2293 make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iEventException)
mutable {
2294 if (iEventException) {
2308 if (not
status->haveStartedNextLumiOrEndedRun()) {
2309 status->noMoreEventsInLumi();
2310 status->startNextLumiOrEndRun();
2314 input_->luminosityBlockAuxiliary()->beginTime()),
2333 if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2334 runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2356 sentry.completedSuccessfully();
2358 FDEBUG(1) <<
"\treadEvent\n";
2366 struct ClearEventGuard {
2368 : act_(iReg), context_(iContext) {
2371 ~ClearEventGuard() { act_.postClearEventSignal_.emit(context_); }
2384 rng->postEventRead(
ev);
2389 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
2394 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2396 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
2400 }) |
then([
this, pep](
auto nextTask) {
2401 FDEBUG(1) <<
"\tprocessEvent\n";
2405 pep->runPrincipal().index(),
2406 pep->luminosityBlockPrincipal().index(),
2409 ClearEventGuard guard(*this->
actReg_.get(), streamContext);
2410 pep->clearEventPrincipal();
2415 bool randomAccess =
input_->randomAccess();
2424 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2442 FDEBUG(1) <<
"\tshouldWeStop\n";
2447 if (subProcess.terminate()) {
2463 bool expected =
false;
2473 ex <<
"The framework is configured to use at least two streams, but the following modules\n" 2474 <<
"require synchronizing on LuminosityBlock boundaries:";
2476 for (
auto worker :
schedule_->allWorkers()) {
2477 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2479 ex <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2483 ex <<
"\n\nThe situation can be fixed by either\n" 2484 <<
" * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n" 2485 <<
" * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2491 std::unique_ptr<LogSystem>
s;
2492 for (
auto worker :
schedule_->allWorkers()) {
2493 if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2495 s = std::make_unique<LogSystem>(
"ModulesSynchingOnRuns");
2496 (*s) <<
"The following modules require synchronizing on Run boundaries:";
2498 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
LuminosityBlockNumber_t luminosityBlock() const
PreClearEvent preClearEventSignal_
signal is emitted before the data products in the Event are cleared
std::atomic< bool > exceptionMessageLumis_
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
ProcessContext processContext_
Log< level::System, false > LogSystem
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void clearRunPrincipal(RunProcessingStatus &)
void setExceptionMessageRuns()
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
int totalEventsFailed() const
std::shared_ptr< ProductRegistry const > preg() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
InputSource::ItemTypeInfo lastSourceTransition_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::shared_ptr< RunPrincipal > readRun()
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
int merge(int argc, char *argv[])
static PFTauRenderPlugin instance
EventProcessingState eventProcessingState() const
SerialTaskQueue streamQueuesInserter_
void endUnfinishedRun(bool cleaningUpAfterException)
void setExceptionMessageFiles(std::string &message)
InputSource::ItemTypeInfo nextTransitionType()
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool needToCallNext() const
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
std::shared_ptr< EDLooperBase const > looper() const
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
std::shared_ptr< RunPrincipal > & runPrincipal()
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
StreamID streamID() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
constexpr auto then(O &&iO)
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
void swap(Association< C > &lhs, Association< C > &rhs)
std::atomic< bool > exceptionMessageRuns_
void mergeAuxiliary(RunAuxiliary const &aux)
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
ProcessBlockPrincipal & processBlockPrincipal() const
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
oneapi::tbb::task_group * group() const noexcept
void emit(Args &&... args) const
std::multimap< std::string, std::string > referencesToBranches_
unsigned int numberOfThreads() const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
void nextTransitionTypeAsync(std::shared_ptr< RunProcessingStatus > iRunStatus, WaitingTaskHolder nextTask)
bool taskHasFailed() const noexcept
std::vector< std::string > modulesToIgnoreForDeleteEarly_
ShouldWriteRun shouldWriteRun() const
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
static ServiceRegistry & instance()
void clear()
Not thread safe.
void setNeedToCallNext(bool val)
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
PreSourceNextTransition preSourceNextTransitionSignal_
bool lastOperationSucceeded() const
unsigned int numberOfRuns() const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void releaseBeginRunResources(unsigned int iStream)
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void respondToCloseInputFile()
Log< level::Info, false > LogInfo
void readAndMergeRun(RunProcessingStatus &)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
void reportShutdownSignal()
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
unsigned int numberOfLuminosityBlocks() const
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
EventSetupImpl const & eventSetupImpl(unsigned subProcessIndex) const
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
InputSource::ItemType processRuns()
MergeableRunProductMetadata * mergeableRunProductMetadata()
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
bool forceESCacheClearOnNewRun_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
std::vector< std::shared_ptr< const EventSetupImpl > > & eventSetupImpls()
std::atomic< unsigned int > streamRunActive_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
void closeInputFile(bool cleaningUpAfterException)
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void readProcessBlock(ProcessBlockPrincipal &)
bool adjustToNewProductRegistry(ProductRegistry const &reg)
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
auto lastTask(edm::WaitingTaskHolder iTask)
T const & get(Event const &event, InputTag const &tag) noexcept(false)
void endUnfinishedLumi(bool cleaningUpAfterException)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void continueLumiAsync(WaitingTaskHolder)
Transition requestedTransition() const
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
Log< level::System, true > LogAbsolute
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To. This is based on ActivityRegistry.h
Helper function to determine trigger accepts.
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
void setExceptionMessageLumis()
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void setEventProcessingState(EventProcessingState val)
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void inputProcessBlocks()
bool insertMapped(value_type const &v)
static Registry * instance()
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)