79 #include "boost/range/adaptor/reversed.hpp"
91 #include "oneapi/tbb/task.h"
103 class SendSourceTerminationSignalIfException {
106 ~SendSourceTerminationSignalIfException() {
111 void completedSuccessfully() { reg_ =
nullptr; }
120 namespace chain = waiting_task::chain;
125 std::shared_ptr<ProductRegistry>
preg,
126 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
127 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
128 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
129 std::shared_ptr<ActivityRegistry>
areg,
133 if (main_input ==
nullptr) {
135 <<
"There must be exactly one source in the configuration.\n"
136 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
141 std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
144 filler->fill(descriptions);
149 std::ostringstream ost;
150 ost <<
"Validating configuration of input source of type " << modtype;
166 processConfiguration.get(),
173 thinnedAssociationsHelper,
180 areg->preSourceConstructionSignal_(md);
181 std::unique_ptr<InputSource>
input;
184 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
187 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
188 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
191 std::ostringstream ost;
192 ost <<
"Constructing input source of type " << modtype;
205 filler->fill(descriptions);
210 fmt::format(
"Validating configuration of EDLooper of type {} with label: '{}'", modtype, moduleLabel));
218 std::vector<std::string>
const& loopers) {
219 std::shared_ptr<EDLooperBase> vLooper;
221 assert(1 == loopers.size());
223 for (
auto const& looperName : loopers) {
236 std::vector<std::string>
const& defaultServices,
237 std::vector<std::string>
const& forcedServices)
240 branchIDListHelper_(),
243 espController_(new eventsetup::EventSetupsController),
246 processConfiguration_(),
252 deferredExceptionPtrIsSet_(
false),
256 beginJobCalled_(
false),
257 shouldWeStop_(
false),
258 fileModeNoMerge_(
false),
259 exceptionMessageFiles_(),
260 exceptionMessageRuns_(),
261 exceptionMessageLumis_(
false),
262 forceLooperToEnd_(
false),
263 looperBeginJobRun_(
false),
264 forceESCacheClearOnNewRun_(
false),
265 eventSetupDataToExcludeFromPrefetching_() {
266 auto processDesc = std::make_shared<ProcessDesc>(
std::move(parameterSet));
267 processDesc->addServices(defaultServices, forcedServices);
268 init(processDesc, iToken, iLegacy);
272 std::vector<std::string>
const& defaultServices,
273 std::vector<std::string>
const& forcedServices)
276 branchIDListHelper_(),
279 espController_(new eventsetup::EventSetupsController),
282 processConfiguration_(),
288 deferredExceptionPtrIsSet_(
false),
292 beginJobCalled_(
false),
293 shouldWeStop_(
false),
294 fileModeNoMerge_(
false),
295 exceptionMessageFiles_(),
296 exceptionMessageRuns_(),
297 exceptionMessageLumis_(
false),
298 forceLooperToEnd_(
false),
299 looperBeginJobRun_(
false),
300 forceESCacheClearOnNewRun_(
false),
301 asyncStopRequestedWhileProcessingEvents_(
false),
302 eventSetupDataToExcludeFromPrefetching_() {
303 auto processDesc = std::make_shared<ProcessDesc>(
std::move(parameterSet));
304 processDesc->addServices(defaultServices, forcedServices);
313 branchIDListHelper_(),
316 espController_(new eventsetup::EventSetupsController),
319 processConfiguration_(),
325 deferredExceptionPtrIsSet_(
false),
329 beginJobCalled_(
false),
330 shouldWeStop_(
false),
331 fileModeNoMerge_(
false),
332 exceptionMessageFiles_(),
333 exceptionMessageRuns_(),
334 exceptionMessageLumis_(
false),
335 forceLooperToEnd_(
false),
336 looperBeginJobRun_(
false),
337 forceESCacheClearOnNewRun_(
false),
338 asyncStopRequestedWhileProcessingEvents_(
false),
339 eventSetupDataToExcludeFromPrefetching_() {
340 init(processDesc, token, legacy);
354 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
358 bool const hasSubProcesses = !subProcessVParameterSet.empty();
366 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet(
"options"));
368 if (fileMode !=
"NOMERGE" and fileMode !=
"FULLMERGE") {
371 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
379 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
385 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
389 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
390 if (nConcurrentRuns != 1) {
392 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
394 unsigned int nConcurrentLumis =
395 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
396 if (nConcurrentLumis == 0) {
397 nConcurrentLumis = 2;
399 if (nConcurrentLumis > nStreams) {
400 nConcurrentLumis = nStreams;
402 std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>(
"@all_loopers");
403 if (!loopers.empty()) {
405 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
406 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number "
407 "of concurrent runs, and the number of concurrent lumis "
408 "are all being reset to 1. Loopers cannot currently support "
409 "values greater than 1.";
411 nConcurrentLumis = 1;
415 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
419 if (nThreads > 1
or nStreams > 1) {
420 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
425 unsigned int maxConcurrentIOVs = nConcurrentLumis;
443 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
446 if (not hasSubProcesses) {
454 auto& serviceSets = processDesc->getServicesPSets();
463 handler->willBeUsingThreads();
467 std::shared_ptr<CommonParams> common(items.
initMisc(*parameterSet));
470 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
472 *parameterSet, items.
actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
475 if (!loopers.empty()) {
481 deleteNonConsumedUnscheduledModules_ =
false;
486 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
518 FDEBUG(2) << parameterSet << std::endl;
523 auto ep = std::make_shared<EventPrincipal>(
preg(),
550 for (
auto& subProcessPSet : subProcessVParameterSet) {
606 actReg_->preallocateSignal_(bounds);
610 std::vector<ModuleProcessName> consumedBySubProcesses;
613 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
614 if (consumedBySubProcesses.empty()) {
616 }
else if (not
c.empty()) {
617 std::vector<ModuleProcessName>
tmp;
618 tmp.reserve(consumedBySubProcesses.size() +
c.size());
620 consumedBySubProcesses.end(),
623 std::back_inserter(tmp));
632 not unusedModules.empty()) {
635 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
636 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
638 "therefore they are deleted before beginJob transition.";
680 ex.
addContext(
"Calling beginJob for the source");
686 constexpr
bool mustPrefetchMayGet =
true;
688 auto const runLookup =
preg_->productLookup(
InRun);
689 auto const lumiLookup =
preg_->productLookup(
InLumi);
692 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
693 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
695 looper_->updateLookup(
esp_->recordsToProxyIndices());
699 actReg_->postBeginJobSignal_();
702 oneapi::tbb::task_group
group;
703 using namespace edm::waiting_task::chain;
704 first([
this](
auto nextTask) {
706 first([
i,
this](
auto nextTask) {
724 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
729 using namespace edm::waiting_task::chain;
732 oneapi::tbb::task_group
group;
739 first([
this,
i, &c, &collectorMutex](
auto nextTask) {
740 std::exception_ptr ep;
745 ep = std::current_exception();
748 std::lock_guard<std::mutex>
l(collectorMutex);
749 c.
call([&ep]() { std::rethrow_exception(ep); });
751 }) |
then([
this,
i, &c, &collectorMutex](
auto nextTask) {
753 first([
this,
i, &c, &collectorMutex, &subProcess](
auto nextTask) {
754 std::exception_ptr ep;
757 subProcess.doEndStream(
i);
759 ep = std::current_exception();
762 std::lock_guard<std::mutex>
l(collectorMutex);
763 c.
call([&ep]() { std::rethrow_exception(ep); });
773 c.
call([actReg]() { actReg->preEndJobSignal_(); });
782 c.
call([actReg]() { actReg->postEndJobSignal_(); });
791 return schedule_->getAllModuleDescriptions();
803 #include "TransitionProcessors.icc"
807 bool returnValue =
false;
823 SendSourceTerminationSignalIfException sentry(
actReg_.get());
827 itemType =
input_->nextItemType();
831 sentry.completedSuccessfully();
844 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
863 bool firstTime =
true;
873 auto trans = fp.processFiles(*
this);
884 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
893 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
919 FDEBUG(1) <<
" \treadFile\n";
921 SendSourceTerminationSignalIfException sentry(
actReg_.get());
926 if (size < preg_->
size()) {
933 sentry.completedSuccessfully();
938 SendSourceTerminationSignalIfException sentry(
actReg_.get());
939 input_->closeFile(
fb_.get(), cleaningUpAfterException);
940 sentry.completedSuccessfully();
942 FDEBUG(1) <<
"\tcloseInputFile\n";
950 FDEBUG(1) <<
"\topenOutputFiles\n";
957 FDEBUG(1) <<
"\tcloseOutputFiles\n";
963 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
967 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
975 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
985 FDEBUG(1) <<
"\tstartingNewLoop\n";
991 looper_->setModuleChanger(&changer);
993 looper_->setModuleChanger(
nullptr);
999 FDEBUG(1) <<
"\tendOfLoop\n";
1006 FDEBUG(1) <<
"\trewind\n";
1011 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1015 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1018 if (subProcess.shouldWeCloseOutput()) {
1024 return schedule_->shouldWeCloseOutput();
1028 FDEBUG(1) <<
"\tdoErrorStuff\n";
1029 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n"
1030 <<
"and went to the error state\n"
1031 <<
"Will attempt to terminate processing normally\n"
1032 <<
"(IF using the looper the next loop will be attempted)\n"
1033 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1044 beginGlobalTransitionAsync<Traits>(
1049 }
while (not globalWaitTask.
done());
1052 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1054 beginProcessBlockSucceeded =
true;
1058 input_->fillProcessBlockHelper();
1060 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1067 beginGlobalTransitionAsync<Traits>(
1072 }
while (not globalWaitTask.
done());
1074 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1081 }
while (not writeWaitTask.
done());
1105 cleaningUpAfterException);
1108 }
while (not globalWaitTask.
done());
1110 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1113 if (beginProcessBlockSucceeded) {
1118 }
while (not writeWaitTask.
done());
1125 for (
auto&
s : subProcesses_) {
1132 bool& globalBeginSucceeded,
1133 bool& eventSetupForInstanceSucceeded) {
1134 globalBeginSucceeded =
false;
1137 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1140 sentry.completedSuccessfully();
1148 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1150 eventSetupForInstanceSucceeded =
true;
1151 sentry.completedSuccessfully();
1153 auto const& es =
esp_->eventSetupImpl();
1158 using namespace edm::waiting_task::chain;
1161 }) |
then([
this, &es](
auto nextTask)
mutable {
1169 }
while (not waitTask.
done());
1178 using namespace edm::waiting_task::chain;
1179 chain::first([&runPrincipal, &es,
this](
auto waitTask) {
1181 beginGlobalTransitionAsync<Traits>(
1183 }) |
then([&globalBeginSucceeded, run](
auto waitTask)
mutable {
1184 globalBeginSucceeded =
true;
1185 FDEBUG(1) <<
"\tbeginRun " << run <<
"\n";
1186 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto waitTask) {
1188 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto waitTask) {
1194 }
while (not globalWaitTask.
done());
1196 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1214 }
while (not streamLoopWaitTask.
done());
1216 std::rethrow_exception(*(streamLoopWaitTask.
exceptionPtr()));
1219 FDEBUG(1) <<
"\tstreamBeginRun " << run <<
"\n";
1227 bool globalBeginSucceeded,
1228 bool cleaningUpAfterException,
1229 bool eventSetupForInstanceSucceeded) {
1230 if (eventSetupForInstanceSucceeded) {
1232 endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1234 if (globalBeginSucceeded) {
1242 }
while (not t.
done());
1254 bool globalBeginSucceeded,
1255 bool cleaningUpAfterException) {
1263 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1265 sentry.completedSuccessfully();
1267 auto const& es =
esp_->eventSetupImpl();
1268 if (globalBeginSucceeded) {
1281 cleaningUpAfterException);
1284 }
while (not streamLoopWaitTask.
done());
1286 std::rethrow_exception(*(streamLoopWaitTask.
exceptionPtr()));
1289 FDEBUG(1) <<
"\tstreamEndRun " << run <<
"\n";
1296 using namespace edm::waiting_task::chain;
1297 chain::first([
this, &runPrincipal, &es, cleaningUpAfterException](
auto nextTask) {
1300 endGlobalTransitionAsync<Traits>(
1302 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto nextTask) {
1304 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto nextTask) {
1310 }
while (not globalWaitTask.
done());
1312 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1315 FDEBUG(1) <<
"\tendRun " << run <<
"\n";
1326 input_->luminosityBlockAuxiliary()->beginTime()),
1332 }
while (not waitTask.
done());
1341 std::shared_ptr<void>
const& iRunResource,
1359 auto* espController,
1366 SendSourceTerminationSignalIfException sentry(actReg);
1371 espController->eventSetupForInstanceAsync(
1372 iSync, task, status->endIOVWaitingTasks(), status->eventSetupImpls());
1373 sentry.completedSuccessfully();
1375 task.doneWaiting(std::current_exception());
1382 auto group = nextTask.group();
1384 *
group, [
this, task =
std::move(nextTask), iSync, status, asyncEventSetup]()
mutable {
1392 }) |
chain::then([
this,
status](std::exception_ptr
const* iPtr,
auto nextTask) {
1394 auto copyTask = nextTask;
1396 nextTask.doneWaiting(*iPtr);
1398 auto group = copyTask.group();
1401 if (task.taskHasFailed()) {
1402 status->resetResources();
1408 auto group = task.group();
1419 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1422 sentry.completedSuccessfully();
1428 rng->preBeginLumi(lb);
1433 using namespace edm::waiting_task::chain;
1439 beginGlobalTransitionAsync<Traits>(
1446 status->globalBeginDidSucceed();
1450 }) |
then([
this,
status](std::exception_ptr
const* iPtr,
auto holder)
mutable {
1452 status->resetResources();
1453 holder.doneWaiting(*iPtr);
1456 status->globalBeginDidSucceed();
1469 auto eventSetupImpls = &status->eventSetupImpls();
1470 auto lp = status->lumiPrincipal().get();
1473 event.setLuminosityBlockPrincipal(lp);
1475 using namespace edm::waiting_task::chain;
1477 beginStreamTransitionAsync<Traits>(
1479 }) |
then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
auto nextTask) {
1480 if (exceptionFromBeginStreamLumi) {
1494 status->resetResources();
1495 postQueueTask.doneWaiting(std::current_exception());
1506 status->needToContinueLumi();
1507 status->startProcessingEvents();
1510 unsigned int streamIndex = 0;
1511 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1515 iHolder.
group()->run(
1529 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1532 auto& lp = *(iLumiStatus->lumiPrincipal());
1533 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1534 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1536 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1538 using namespace edm::waiting_task::chain;
1539 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1544 endGlobalTransitionAsync<Traits>(
1546 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1548 if (didGlobalBeginSucceed) {
1557 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iPtr,
auto nextTask)
mutable {
1558 std::exception_ptr ptr;
1570 ptr = std::current_exception();
1575 status->resumeGlobalLumiQueue();
1579 ptr = std::current_exception();
1587 status->resetResources();
1591 ptr = std::current_exception();
1613 if (
status->streamFinishedLumi()) {
1623 lumiStatus->setEndTime();
1627 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1628 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1630 if (lumiStatus->didGlobalBeginSucceed()) {
1631 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1633 lumiPrincipal.endTime());
1636 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1642 cleaningUpAfterException);
1659 }
while (not globalWaitTask.
done());
1661 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1667 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1668 input_->readProcessBlock(processBlockPrincipal);
1669 sentry.completedSuccessfully();
1675 <<
"Illegal attempt to insert run into cache\n"
1676 <<
"Contact a Framework Developer\n";
1678 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
1686 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1688 sentry.completedSuccessfully();
1690 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1692 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1699 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1700 input_->readAndMergeRun(*runPrincipal);
1701 sentry.completedSuccessfully();
1703 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1704 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1710 <<
"Illegal attempt to insert lumi into cache\n"
1711 <<
"Run is invalid\n"
1712 <<
"Contact a Framework Developer\n";
1716 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1718 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1720 sentry.completedSuccessfully();
1728 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or
1729 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1730 input_->processHistoryRegistry().reducedProcessHistoryID(
1731 input_->luminosityBlockAuxiliary()->processHistoryID()));
1732 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1734 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1736 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1738 sentry.completedSuccessfully();
1740 return input_->luminosityBlock();
1744 using namespace edm::waiting_task;
1752 s.writeProcessBlockAsync(nextTask, processBlockType);
1761 using namespace edm::waiting_task;
1768 mergeableRunProductMetadata);
1772 s.writeRunAsync(nextTask, phid, run, mergeableRunProductMetadata);
1779 for_all(
subProcesses_, [run, phid](
auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1780 FDEBUG(1) <<
"\tdeleteRunFromCache " << run <<
"\n";
1784 using namespace edm::waiting_task;
1794 s.writeLumiAsync(nextTask, lumiPrincipal);
1831 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1843 input_->luminosityBlockAuxiliary()->beginTime()));
1860 bool expected =
false;
1879 auto recursionTask =
make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1883 bool expected =
false;
1901 if (
status->isLumiEnding()) {
1917 bool expected =
false;
1919 auto e = std::current_exception();
1932 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1936 sentry.completedSuccessfully();
1938 FDEBUG(1) <<
"\treadEvent\n";
1952 rng->postEventRead(ev);
1956 using namespace edm::waiting_task::chain;
1957 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
1961 for (
auto& subProcess : boost::adaptors::reverse(
subProcesses_)) {
1962 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1964 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
1968 }) |
then([pep](
auto nextTask) {
1969 FDEBUG(1) <<
"\tprocessEvent\n";
1970 pep->clearEventPrincipal();
1975 bool randomAccess =
input_->randomAccess();
1984 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2003 FDEBUG(1) <<
"\tshouldWeStop\n";
2008 if (subProcess.terminate()) {
2024 bool expected =
false;
2034 ex <<
"The framework is configured to use at least two streams, but the following modules\n"
2035 <<
"require synchronizing on LuminosityBlock boundaries:";
2037 for (
auto worker :
schedule_->allWorkers()) {
2038 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2040 ex <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2044 ex <<
"\n\nThe situation can be fixed by either\n"
2045 <<
" * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2046 <<
" * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2052 std::unique_ptr<LogSystem>
s;
2053 for (
auto worker :
schedule_->allWorkers()) {
2056 s = std::make_unique<LogSystem>(
"LegacyModules");
2057 (*s) <<
"The following legacy modules are configured. Support for legacy modules\n"
2058 "is going to end soon. These modules need to be converted to have type\n"
2059 "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2061 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
std::atomic< bool > exceptionMessageLumis_
RunPrincipal const & runPrincipal() const
std::shared_ptr< ActivityRegistry > actReg_
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
std::unique_ptr< Schedule > initSchedule(ParameterSet ¶meterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *, ProcessBlockHelperBase &processBlockHelper)
void readEvent(unsigned int iStreamIndex)
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
bool wasEventProcessingStopped() const
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
const edm::EventSetup & c
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< InputSource > > input_
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
static PFTauRenderPlugin instance
ParameterSetID id() const
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
bool willBeContinued() const
The source is replaying overlapping LuminosityBlocks and this is not the last part for this Lumiosity...
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
void ensureAvailableAccelerators(edm::ParameterSet const ¶meterSet)
unsigned int numberOfThreads() const
bool hasRunPrincipal() const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
constexpr auto then(O &&iO)
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
bool alreadyPrinted() const
void swap(Association< C > &lhs, Association< C > &rhs)
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
oneapi::tbb::task_group * group() const noexcept
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms, std::vector< std::string > const &loopers)
void warnAboutLegacyModules() const
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
auto runLast(edm::WaitingTaskHolder iTask)
void setExceptionMessageRuns(std::string &message)
void validateLooper(ParameterSet &pset)
std::shared_ptr< CommonParams > initMisc(ParameterSet ¶meterSet)
bool taskHasFailed() const noexcept
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
Timestamp const & beginTime() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
bool continuingLumi() const
void addAdditionalInfo(std::string const &info)
FunctorWaitingTask< F > * make_waiting_task(F f)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
unsigned int numberOfLuminosityBlocks() const
ProcessBlockPrincipal & processBlockPrincipal() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
Log< level::Info, false > LogInfo
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
bool shouldWeCloseOutput() const
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
MergeableRunProductMetadata * mergeableRunProductMetadata()
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
oneapi::tbb::task_group taskGroup_
T getParameter(std::string const &) const
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
void stopProcessingEvents()
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
constexpr element_type const * get() const
bool forceESCacheClearOnNewRun_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
void removeModules(std::vector< ModuleDescription const * > const &modules)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
void closeInputFile(bool cleaningUpAfterException)
void readProcessBlock(ProcessBlockPrincipal &)
static ComponentFactory< T > const * get()
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
auto lastTask(edm::WaitingTaskHolder iTask)
std::shared_ptr< SignallingProductRegistry const > preg() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
Log< level::System, true > LogAbsolute
void setNextSyncValue(IOVSyncValue const &iValue)
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Log< level::Warning, false > LogWarning
void beginProcessBlock(bool &beginProcessBlockSucceeded)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
The Signals That Services Can Subscribe To. This is based on ActivityRegistry.h
Helper function to determine trigger accepts.
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
void setExceptionMessageLumis()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
ParameterSet const & registerIt()
tuple size
Write out results.
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
void inputProcessBlocks()
bool insertMapped(value_type const &v)
Transition requestedTransition() const
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_