84 #include "boost/range/adaptor/reversed.hpp" 96 #include "oneapi/tbb/task.h" 104 class PauseQueueSentry {
107 ~PauseQueueSentry() { queue_.resume(); }
116 namespace chain = waiting_task::chain;
119 std::unique_ptr<InputSource>
makeInput(
unsigned int moduleIndex,
122 std::shared_ptr<ProductRegistry> preg,
123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
125 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
126 std::shared_ptr<ActivityRegistry> areg,
127 std::shared_ptr<ProcessConfiguration const> processConfiguration,
130 if (main_input ==
nullptr) {
132 <<
"There must be exactly one source in the configuration.\n" 133 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
138 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
141 filler->fill(descriptions);
146 std::ostringstream ost;
147 ost <<
"Validating configuration of input source of type " << modtype;
163 processConfiguration.get(),
170 thinnedAssociationsHelper,
174 common.maxSecondsUntilRampdown_,
177 areg->preSourceConstructionSignal_(md);
178 std::unique_ptr<InputSource>
input;
181 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
184 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
188 std::ostringstream ost;
189 ost <<
"Constructing input source of type " << modtype;
200 std::vector<std::string>
const& loopers) {
201 std::shared_ptr<EDLooperBase> vLooper;
203 assert(1 == loopers.size());
205 for (
auto const& looperName : loopers) {
217 std::vector<std::string>
const& defaultServices,
218 std::vector<std::string>
const& forcedServices)
221 branchIDListHelper_(),
225 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
228 processConfiguration_(),
234 deferredExceptionPtrIsSet_(
false),
238 beginJobCalled_(
false),
239 shouldWeStop_(
false),
240 fileModeNoMerge_(
false),
241 exceptionMessageFiles_(),
242 exceptionMessageRuns_(
false),
243 exceptionMessageLumis_(
false),
244 forceLooperToEnd_(
false),
245 looperBeginJobRun_(
false),
246 forceESCacheClearOnNewRun_(
false),
247 eventSetupDataToExcludeFromPrefetching_() {
249 processDesc->addServices(defaultServices, forcedServices);
250 init(processDesc, iToken, iLegacy);
254 std::vector<std::string>
const& defaultServices,
255 std::vector<std::string>
const& forcedServices)
258 branchIDListHelper_(),
262 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
265 processConfiguration_(),
271 deferredExceptionPtrIsSet_(
false),
275 beginJobCalled_(
false),
276 shouldWeStop_(
false),
277 fileModeNoMerge_(
false),
278 exceptionMessageFiles_(),
279 exceptionMessageRuns_(
false),
280 exceptionMessageLumis_(
false),
281 forceLooperToEnd_(
false),
282 looperBeginJobRun_(
false),
283 forceESCacheClearOnNewRun_(
false),
284 eventSetupDataToExcludeFromPrefetching_() {
286 processDesc->addServices(defaultServices, forcedServices);
295 branchIDListHelper_(),
299 espController_(
std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.
get())),
302 processConfiguration_(),
308 deferredExceptionPtrIsSet_(
false),
312 beginJobCalled_(
false),
313 shouldWeStop_(
false),
314 fileModeNoMerge_(
false),
315 exceptionMessageFiles_(),
316 exceptionMessageRuns_(
false),
317 exceptionMessageLumis_(
false),
318 forceLooperToEnd_(
false),
319 looperBeginJobRun_(
false),
320 forceESCacheClearOnNewRun_(
false),
321 eventSetupDataToExcludeFromPrefetching_() {
336 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
340 bool const hasSubProcesses = !subProcessVParameterSet.empty();
353 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
361 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
367 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
371 unsigned int nConcurrentLumis =
372 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
373 if (nConcurrentLumis == 0) {
374 nConcurrentLumis = 2;
376 if (nConcurrentLumis > nStreams) {
377 nConcurrentLumis = nStreams;
379 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
380 if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381 nConcurrentRuns = nConcurrentLumis;
384 if (!loopers.empty()) {
386 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number " 388 "of concurrent runs, and the number of concurrent lumis " 389 "are all being reset to 1. Loopers cannot currently support " 390 "values greater than 1.";
392 nConcurrentLumis = 1;
396 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
400 if (nThreads > 1
or nStreams > 1) {
401 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
410 unsigned int maxConcurrentIOVs =
411 3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
417 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
420 if (not hasSubProcesses) {
424 auto referencePSets =
425 optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>(
"holdsReferencesToDeleteEarly");
426 for (
auto const&
pset : referencePSets) {
428 auto references =
pset.getParameter<std::vector<std::string>>(
"references");
429 for (
auto const& ref : references) {
434 optionsPset.getUntrackedParameter<std::vector<std::string>>(
"modulesToIgnoreForDeleteEarly");
441 auto& serviceSets = processDesc->getServicesPSets();
451 handler->willBeUsingThreads();
458 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
463 if (!loopers.empty()) {
474 runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
483 std::optional<ScheduleItems::MadeModules> madeModules;
486 tbb::task_group
group;
489 auto tempReg = std::make_shared<ProductRegistry>();
492 group.run([&,
this]() {
500 group.run([&,
this, tempReg]() {
506 items.branchIDListHelper(),
508 items.thinnedAssociationsHelper(),
510 items.processConfiguration(),
515 items.preg()->addFromInput(*tempReg);
545 auto ep = std::make_shared<EventPrincipal>(
preg(),
557 auto rp = std::make_unique<RunPrincipal>(
578 for (
auto& subProcessPSet : subProcessVParameterSet) {
650 std::vector<ModuleProcessName> consumedBySubProcesses;
653 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654 if (consumedBySubProcesses.empty()) {
656 }
else if (not
c.empty()) {
657 std::vector<ModuleProcessName>
tmp;
658 tmp.reserve(consumedBySubProcesses.size() +
c.size());
660 consumedBySubProcesses.end(),
663 std::back_inserter(
tmp));
672 not unusedModules.empty()) {
675 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
676 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 678 "therefore they are deleted before beginJob transition.";
695 schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *
preg_);
726 ex.
addContext(
"Calling beginJob for the source");
732 constexpr
bool mustPrefetchMayGet =
true;
734 auto const runLookup =
preg_->productLookup(
InRun);
735 auto const lumiLookup =
preg_->productLookup(
InLumi);
738 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
739 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
741 looper_->updateLookup(
esp_->recordsToResolverIndices());
745 actReg_->postBeginJobSignal_();
747 oneapi::tbb::task_group
group;
750 first([
this](
auto nextTask) {
752 first([
i,
this](
auto nextTask) {
767 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
774 oneapi::tbb::task_group
group;
782 first([
this,
i, &
c, &collectorMutex](
auto nextTask) {
783 std::exception_ptr
ep;
788 ep = std::current_exception();
791 std::lock_guard<std::mutex>
l(collectorMutex);
792 c.call([&
ep]() { std::rethrow_exception(
ep); });
794 }) |
then([
this,
i, &
c, &collectorMutex](
auto nextTask) {
796 first([
this,
i, &
c, &collectorMutex, &subProcess](
auto nextTask) {
797 std::exception_ptr
ep;
800 subProcess.doEndStream(
i);
802 ep = std::current_exception();
805 std::lock_guard<std::mutex>
l(collectorMutex);
806 c.call([&
ep]() { std::rethrow_exception(
ep); });
813 waitTask.waitNoThrow();
816 c.call([actReg]() { actReg->preEndJobSignal_(); });
825 c.call([actReg]() { actReg->postEndJobSignal_(); });
834 return schedule_->getAllModuleDescriptions();
846 #include "TransitionProcessors.icc" 850 bool returnValue =
false;
855 edm::LogSystem(
"ShutdownSignal") <<
"an external signal was sent to shutdown the job early.";
868 itemType =
input_->nextItemType();
872 sentry.completedSuccessfully();
889 actReg_->beginProcessingSignal_();
890 auto endSignal = [](
ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
891 std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(
actReg_.get(), endSignal);
896 bool firstTime =
true;
906 auto trans =
fp.processFiles(*
this);
917 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
926 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
927 e.addAdditionalInfo(message);
928 if (
e.alreadyPrinted()) {
934 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
935 e.addAdditionalInfo(message);
936 if (
e.alreadyPrinted()) {
942 if (
e.alreadyPrinted()) {
952 FDEBUG(1) <<
" \treadFile\n";
953 size_t size =
preg_->size();
958 streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
962 streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
966 if (size < preg_->size()) {
973 sentry.completedSuccessfully();
979 input_->closeFile(
fb_.get(), cleaningUpAfterException);
980 sentry.completedSuccessfully();
982 FDEBUG(1) <<
"\tcloseInputFile\n";
990 FDEBUG(1) <<
"\topenOutputFiles\n";
997 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1003 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
1007 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1015 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1025 FDEBUG(1) <<
"\tstartingNewLoop\n";
1031 looper_->setModuleChanger(&changer);
1033 looper_->setModuleChanger(
nullptr);
1039 FDEBUG(1) <<
"\tendOfLoop\n";
1046 FDEBUG(1) <<
"\trewind\n";
1051 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1055 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1058 if (subProcess.shouldWeCloseOutput()) {
1064 return schedule_->shouldWeCloseOutput();
1068 FDEBUG(1) <<
"\tdoErrorStuff\n";
1069 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n" 1070 <<
"and went to the error state\n" 1071 <<
"Will attempt to terminate processing normally\n" 1072 <<
"(IF using the looper the next loop will be attempted)\n" 1073 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1084 beginGlobalTransitionAsync<Traits>(
1087 globalWaitTask.wait();
1088 beginProcessBlockSucceeded =
true;
1092 input_->fillProcessBlockHelper();
1094 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1101 beginGlobalTransitionAsync<Traits>(
1104 globalWaitTask.wait();
1108 writeWaitTask.wait();
1129 cleaningUpAfterException);
1130 globalWaitTask.wait();
1132 if (beginProcessBlockSucceeded) {
1135 writeWaitTask.wait();
1158 runStatus->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
1164 runStatus->setHolderOfTaskInProcessRuns(holder);
1181 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1188 status->endIOVWaitingTasks(),
1189 status->eventSetupImpls(),
1194 }) |
chain::then([
this,
status](std::exception_ptr
const* iException,
auto nextTask) {
1207 if (postRunQueueTask.taskHasFailed()) {
1208 status->resetBeginResources();
1216 *postRunQueueTask.group(), [
this, postSourceTask = postRunQueueTask,
status]()
mutable {
1220 if (postSourceTask.taskHasFailed()) {
1221 status->resetBeginResources();
1223 status->resumeGlobalRunQueue();
1233 sentry.completedSuccessfully();
1240 oneapi::tbb::task_group
group;
1245 }) |
then([
this, &es](
auto nextTask)
mutable {
1256 status->setStopBeforeProcessingRun(
true);
1257 nextTask.doneWaiting(std::current_exception());
1259 }) |
then([
this,
status, &es](
auto nextTask) {
1260 if (
status->stopBeforeProcessingRun()) {
1265 beginGlobalTransitionAsync<Traits>(
1268 if (
status->stopBeforeProcessingRun()) {
1271 status->globalBeginDidSucceed();
1273 if (
status->stopBeforeProcessingRun()) {
1279 if (
status->stopBeforeProcessingRun()) {
1284 }) |
then([
this,
status](std::exception_ptr
const* iException,
auto holder)
mutable {
1285 bool precedingTasksSucceeded =
true;
1287 precedingTasksSucceeded =
false;
1292 if (
status->stopBeforeProcessingRun()) {
1294 status->resetBeginResources();
1296 status->resumeGlobalRunQueue();
1302 auto globalEndRunTask =
1305 status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1310 status->resetBeginResources();
1312 status->resumeGlobalRunQueue();
1313 holder.doneWaiting(std::current_exception());
1328 *holder.group(), [
this,
status, precedingTasksSucceeded, holder]()
mutable {
1333 [
this,
i,
status, precedingTasksSucceeded, holder]()
mutable {
1338 if (
status->streamFinishedBeginRun()) {
1341 status->resetBeginResources();
1351 status->resetBeginResources();
1358 status->resetBeginResources();
1360 status->resumeGlobalRunQueue();
1361 postSourceTask.doneWaiting(std::current_exception());
1365 status->resetBeginResources();
1367 status->resumeGlobalRunQueue();
1368 postRunQueueTask.doneWaiting(std::current_exception());
1372 status->resetBeginResources();
1374 nextTask.doneWaiting(std::current_exception());
1380 std::shared_ptr<RunProcessingStatus>
status,
1381 bool precedingTasksSucceeded,
1390 chain::first([
this, iStream, precedingTasksSucceeded](
auto nextTask) {
1391 if (precedingTasksSucceeded) {
1396 beginStreamTransitionAsync<Traits>(
1399 }) |
then([
this, iStream](std::exception_ptr
const* exceptionFromBeginStreamRun,
auto nextTask) {
1400 if (exceptionFromBeginStreamRun) {
1401 nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1413 if (
status->streamFinishedBeginRun()) {
1414 status->resetBeginResources();
1421 RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1434 iRunStatus->endIOVWaitingTasksEndRun(),
1435 iRunStatus->eventSetupImplsEndRun(),
1439 }) |
chain::then([
this, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
1441 iRunStatus->setEndingEventSetupSucceeded(
false);
1448 streamQueues_[
i].push(*nextTask.group(), [
this,
i, nextTask]()
mutable {
1475 tmp.doneWaiting(iException);
1480 auto& runPrincipal = *(iRunStatus->runPrincipal());
1481 bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1482 bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1483 EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(
esp_->subProcessIndex());
1484 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1485 bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1489 chain::first([
this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1491 if (endingEventSetupSucceeded) {
1494 endGlobalTransitionAsync<Traits>(
1499 [
this, &runPrincipal, &es](
auto nextTask) {
1503 [
this, &runPrincipal, &es](
auto nextTask) {
1507 ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1508 [
this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](
auto nextTask) {
1510 writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1514 didGlobalBeginSucceed,
1515 mergeableRunProductMetadata,
1516 endingEventSetupSucceeded](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1517 if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1518 mergeableRunProductMetadata->postWriteRun();
1525 std::exception_ptr ptr;
1532 ptr = std::current_exception();
1536 status->resumeGlobalRunQueue();
1540 ptr = std::current_exception();
1544 status->resetEndResources();
1548 ptr = std::current_exception();
1552 if (ptr && !iException) {
1578 if (runStatus->streamFinishedRun()) {
1579 runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1590 if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1592 auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1593 bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1595 auto& runPrincipal = *runStatus->runPrincipal();
1598 endStreamTransitionAsync<Traits>(
std::move(runDoneTaskHolder),
1604 cleaningUpAfterException);
1616 runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1618 runStatus->setHolderOfTaskInProcessRuns(holder);
1626 std::shared_ptr<RunProcessingStatus> iRunStatus,
1628 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1634 status->endIOVWaitingTasks(),
1635 status->eventSetupImpls(),
1639 }) |
chain::then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto nextTask) {
1651 if (postLumiQueueTask.taskHasFailed()) {
1652 status->resetResources();
1661 *postLumiQueueTask.group(),
1662 [
this, postSourceTask = postLumiQueueTask,
status, iRunStatus]()
mutable {
1666 if (postSourceTask.taskHasFailed()) {
1667 status->resetResources();
1679 sentry.completedSuccessfully();
1685 rng->preBeginLumi(lb);
1694 }) |
then([
this,
status, &es, &lumiPrincipal](
auto nextTask) {
1697 beginGlobalTransitionAsync<Traits>(
1703 status->globalBeginDidSucceed();
1706 }) |
then([
this,
status, iRunStatus](std::exception_ptr
const* iException,
auto holder)
mutable {
1708 status->resetResources();
1715 status->globalBeginDidSucceed();
1718 status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1732 auto eventSetupImpls = &
status->eventSetupImpls();
1733 auto lp =
status->lumiPrincipal().get();
1736 event.setLuminosityBlockPrincipal(lp);
1740 beginStreamTransitionAsync<Traits>(
std::move(nextTask),
1747 then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
1749 if (exceptionFromBeginStreamLumi) {
1751 copyHolder.
doneWaiting(*exceptionFromBeginStreamLumi);
1762 status->resetResources();
1770 status->resetResources();
1778 status->resetResources();
1794 status->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
1800 unsigned int streamIndex = 0;
1801 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1805 nextTask.group()->run(
1815 tmp.doneWaiting(iException);
1820 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1823 auto& lp = *(iLumiStatus->lumiPrincipal());
1824 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1825 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1827 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1830 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1835 endGlobalTransitionAsync<Traits>(
1837 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1839 if (didGlobalBeginSucceed) {
1848 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iException,
auto nextTask)
mutable {
1854 std::exception_ptr ptr;
1862 ptr = std::current_exception();
1868 ptr = std::current_exception();
1873 status->resetResources();
1874 status->globalEndRunHolderDoneWaiting();
1878 ptr = std::current_exception();
1882 if (ptr && !iException) {
1901 if (
status->streamFinishedLumi()) {
1911 lumiStatus->setEndTime();
1914 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1915 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.
taskHasFailed();
1917 if (lumiStatus->didGlobalBeginSucceed()) {
1918 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1921 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1927 cleaningUpAfterException);
1943 globalWaitTask.wait();
1950 input_->readProcessBlock(processBlockPrincipal);
1951 sentry.completedSuccessfully();
1957 rp->setAux(*
input_->runAuxiliary());
1961 sentry.completedSuccessfully();
1963 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1975 input_->readAndMergeRun(runPrincipal);
1976 sentry.completedSuccessfully();
1983 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1987 sentry.completedSuccessfully();
1995 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1996 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1997 input_->processHistoryRegistry().reducedProcessHistoryID(
1998 input_->luminosityBlockAuxiliary()->processHistoryID()));
1999 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
2001 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
2005 sentry.completedSuccessfully();
2018 s.writeProcessBlockAsync(nextTask, processBlockType);
2034 s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2059 s.writeLumiAsync(nextTask, lumiPrincipal);
2069 iStatus.
lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2082 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2086 status->runPrincipal()->reducedProcessHistoryID() ==
input_->reducedProcessHistoryID()) {
2087 if (
status->holderOfTaskInProcessRuns().taskHasFailed()) {
2088 status->setStopBeforeProcessingRun(
true);
2095 status->setStopBeforeProcessingRun(
true);
2096 holder.doneWaiting(std::current_exception());
2109 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2113 iLumiStatus->lumiPrincipal()->luminosityBlock() ==
input_->luminosityBlock()) {
2118 holder.doneWaiting(std::current_exception());
2126 iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2131 input_->luminosityBlockAuxiliary()->beginTime()),
2147 unsigned int iStreamIndex,
2179 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
2194 iStatus.
lumiPrincipal()->runPrincipal().reducedProcessHistoryID() !=
input_->reducedProcessHistoryID()))) {
2213 auto recursionTask =
2214 make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iEventException)
mutable {
2215 if (iEventException) {
2229 if (not
status->haveStartedNextLumiOrEndedRun()) {
2230 status->startNextLumiOrEndRun();
2234 input_->luminosityBlockAuxiliary()->beginTime()),
2253 if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2254 runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2276 sentry.completedSuccessfully();
2278 FDEBUG(1) <<
"\treadEvent\n";
2292 rng->postEventRead(
ev);
2297 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
2302 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2304 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
2308 }) |
then([pep](
auto nextTask) {
2309 FDEBUG(1) <<
"\tprocessEvent\n";
2310 pep->clearEventPrincipal();
2315 bool randomAccess =
input_->randomAccess();
2324 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2342 FDEBUG(1) <<
"\tshouldWeStop\n";
2347 if (subProcess.terminate()) {
2363 bool expected =
false;
2373 ex <<
"The framework is configured to use at least two streams, but the following modules\n" 2374 <<
"require synchronizing on LuminosityBlock boundaries:";
2376 for (
auto worker :
schedule_->allWorkers()) {
2377 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2379 ex <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2383 ex <<
"\n\nThe situation can be fixed by either\n" 2384 <<
" * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n" 2385 <<
" * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2391 std::unique_ptr<LogSystem>
s;
2392 for (
auto worker :
schedule_->allWorkers()) {
2393 if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2395 s = std::make_unique<LogSystem>(
"ModulesSynchingOnRuns");
2396 (*s) <<
"The following modules require synchronizing on Run boundaries:";
2398 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2404 std::unique_ptr<LogSystem>
s;
2405 for (
auto worker :
schedule_->allWorkers()) {
2408 s = std::make_unique<LogSystem>(
"LegacyModules");
2409 (*s) <<
"The following legacy modules are configured. Support for legacy modules\n" 2410 "is going to end soon. These modules need to be converted to have type\n" 2411 "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2413 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
LuminosityBlockNumber_t luminosityBlock() const
std::atomic< bool > exceptionMessageLumis_
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
ProcessContext processContext_
Log< level::System, false > LogSystem
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void clearRunPrincipal(RunProcessingStatus &)
void setExceptionMessageRuns()
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
int totalEventsFailed() const
InputSource::ItemType nextTransitionType()
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::shared_ptr< RunPrincipal > readRun()
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
EventProcessingState eventProcessingState() const
SerialTaskQueue streamQueuesInserter_
void endUnfinishedRun(bool cleaningUpAfterException)
void setExceptionMessageFiles(std::string &message)
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
std::shared_ptr< EDLooperBase const > looper() const
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
InputSource::ItemType lastTransitionType() const
std::shared_ptr< RunPrincipal > & runPrincipal()
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
StreamID streamID() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
constexpr auto then(O &&iO)
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
void swap(Association< C > &lhs, Association< C > &rhs)
std::atomic< bool > exceptionMessageRuns_
void mergeAuxiliary(RunAuxiliary const &aux)
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
ProcessBlockPrincipal & processBlockPrincipal() const
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
bool firstItemAfterLumiMerge_
oneapi::tbb::task_group * group() const noexcept
std::multimap< std::string, std::string > referencesToBranches_
unsigned int numberOfThreads() const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
bool taskHasFailed() const noexcept
std::vector< std::string > modulesToIgnoreForDeleteEarly_
ShouldWriteRun shouldWriteRun() const
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
int merge(int argc, char *argv[])
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
static ServiceRegistry & instance()
void clear()
Not thread safe.
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
bool lastOperationSucceeded() const
unsigned int numberOfRuns() const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void releaseBeginRunResources(unsigned int iStream)
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
Log< level::Info, false > LogInfo
void readAndMergeRun(RunProcessingStatus &)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
void reportShutdownSignal()
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
unsigned int numberOfLuminosityBlocks() const
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
EventSetupImpl const & eventSetupImpl(unsigned subProcessIndex) const
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemType processRuns()
MergeableRunProductMetadata * mergeableRunProductMetadata()
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
bool forceESCacheClearOnNewRun_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
std::vector< std::shared_ptr< const EventSetupImpl > > & eventSetupImpls()
std::atomic< unsigned int > streamRunActive_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
void closeInputFile(bool cleaningUpAfterException)
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void readProcessBlock(ProcessBlockPrincipal &)
bool adjustToNewProductRegistry(ProductRegistry const &reg)
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
auto lastTask(edm::WaitingTaskHolder iTask)
T const & get(Event const &event, InputTag const &tag) noexcept(false)
void endUnfinishedLumi(bool cleaningUpAfterException)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void continueLumiAsync(WaitingTaskHolder)
Transition requestedTransition() const
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
Log< level::System, true > LogAbsolute
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To. This is based on ActivityRegistry.h.
Helper function to determine trigger accepts.
T first(std::pair< T, U > const &p)
static ParentageRegistry * instance()
void setExceptionMessageLumis()
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void setEventProcessingState(EventProcessingState val)
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void inputProcessBlocks()
bool insertMapped(value_type const &v)
static Registry * instance()
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)