79 #include "boost/range/adaptor/reversed.hpp" 91 #include "oneapi/tbb/task.h" 103 class SendSourceTerminationSignalIfException {
106 ~SendSourceTerminationSignalIfException() {
111 void completedSuccessfully() { reg_ =
nullptr; }
120 namespace chain = waiting_task::chain;
125 std::shared_ptr<ProductRegistry> preg,
126 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
127 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
128 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
129 std::shared_ptr<ActivityRegistry> areg,
130 std::shared_ptr<ProcessConfiguration const> processConfiguration,
133 if (main_input ==
nullptr) {
135 <<
"There must be exactly one source in the configuration.\n" 136 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
141 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
144 filler->fill(descriptions);
149 std::ostringstream ost;
150 ost <<
"Validating configuration of input source of type " << modtype;
166 processConfiguration.get(),
173 thinnedAssociationsHelper,
177 common.maxSecondsUntilRampdown_,
180 areg->preSourceConstructionSignal_(md);
181 std::unique_ptr<InputSource>
input;
184 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
187 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
188 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
191 std::ostringstream ost;
192 ost <<
"Constructing input source of type " << modtype;
205 filler->fill(descriptions);
218 std::vector<std::string>
const& loopers) {
219 std::shared_ptr<EDLooperBase> vLooper;
221 assert(1 == loopers.size());
223 for (
auto const& looperName : loopers) {
236 std::vector<std::string>
const& defaultServices,
237 std::vector<std::string>
const& forcedServices)
240 branchIDListHelper_(),
243 espController_(new eventsetup::EventSetupsController),
246 processConfiguration_(),
252 deferredExceptionPtrIsSet_(
false),
256 beginJobCalled_(
false),
257 shouldWeStop_(
false),
258 fileModeNoMerge_(
false),
259 exceptionMessageFiles_(),
260 exceptionMessageRuns_(),
261 exceptionMessageLumis_(
false),
262 forceLooperToEnd_(
false),
263 looperBeginJobRun_(
false),
264 forceESCacheClearOnNewRun_(
false),
265 eventSetupDataToExcludeFromPrefetching_() {
267 processDesc->addServices(defaultServices, forcedServices);
268 init(processDesc, iToken, iLegacy);
272 std::vector<std::string>
const& defaultServices,
273 std::vector<std::string>
const& forcedServices)
276 branchIDListHelper_(),
279 espController_(new eventsetup::EventSetupsController),
282 processConfiguration_(),
288 deferredExceptionPtrIsSet_(
false),
292 beginJobCalled_(
false),
293 shouldWeStop_(
false),
294 fileModeNoMerge_(
false),
295 exceptionMessageFiles_(),
296 exceptionMessageRuns_(),
297 exceptionMessageLumis_(
false),
298 forceLooperToEnd_(
false),
299 looperBeginJobRun_(
false),
300 forceESCacheClearOnNewRun_(
false),
301 asyncStopRequestedWhileProcessingEvents_(
false),
302 eventSetupDataToExcludeFromPrefetching_() {
304 processDesc->addServices(defaultServices, forcedServices);
313 branchIDListHelper_(),
316 espController_(new eventsetup::EventSetupsController),
319 processConfiguration_(),
325 deferredExceptionPtrIsSet_(
false),
329 beginJobCalled_(
false),
330 shouldWeStop_(
false),
331 fileModeNoMerge_(
false),
332 exceptionMessageFiles_(),
333 exceptionMessageRuns_(),
334 exceptionMessageLumis_(
false),
335 forceLooperToEnd_(
false),
336 looperBeginJobRun_(
false),
337 forceESCacheClearOnNewRun_(
false),
338 asyncStopRequestedWhileProcessingEvents_(
false),
339 eventSetupDataToExcludeFromPrefetching_() {
354 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
358 bool const hasSubProcesses = !subProcessVParameterSet.empty();
371 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
379 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
385 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
389 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
390 if (nConcurrentRuns != 1) {
392 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
394 unsigned int nConcurrentLumis =
395 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
396 if (nConcurrentLumis == 0) {
397 nConcurrentLumis = 2;
399 if (nConcurrentLumis > nStreams) {
400 nConcurrentLumis = nStreams;
403 if (!loopers.empty()) {
405 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
406 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number " 407 "of concurrent runs, and the number of concurrent lumis " 408 "are all being reset to 1. Loopers cannot currently support " 409 "values greater than 1.";
411 nConcurrentLumis = 1;
415 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
419 if (nThreads > 1
or nStreams > 1) {
420 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
425 unsigned int maxConcurrentIOVs = nConcurrentLumis;
443 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
446 if (not hasSubProcesses) {
454 auto& serviceSets = processDesc->getServicesPSets();
464 handler->willBeUsingThreads();
471 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
473 *
parameterSet,
items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
476 if (!loopers.empty()) {
487 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
497 items.branchIDListHelper(),
499 items.thinnedAssociationsHelper(),
501 items.processConfiguration(),
524 auto ep = std::make_shared<EventPrincipal>(
preg(),
551 for (
auto& subProcessPSet : subProcessVParameterSet) {
622 std::vector<ModuleProcessName> consumedBySubProcesses;
625 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
626 if (consumedBySubProcesses.empty()) {
628 }
else if (not
c.empty()) {
629 std::vector<ModuleProcessName>
tmp;
630 tmp.reserve(consumedBySubProcesses.size() +
c.size());
632 consumedBySubProcesses.end(),
635 std::back_inserter(
tmp));
644 not unusedModules.empty()) {
647 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
648 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 650 "therefore they are deleted before beginJob transition.";
692 ex.
addContext(
"Calling beginJob for the source");
698 constexpr
bool mustPrefetchMayGet =
true;
700 auto const runLookup =
preg_->productLookup(
InRun);
701 auto const lumiLookup =
preg_->productLookup(
InLumi);
704 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
705 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
707 looper_->updateLookup(
esp_->recordsToProxyIndices());
711 actReg_->postBeginJobSignal_();
714 oneapi::tbb::task_group
group;
716 first([
this](
auto nextTask) {
718 first([
i,
this](
auto nextTask) {
728 if (
last.exceptionPtr()) {
729 std::rethrow_exception(*
last.exceptionPtr());
736 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
744 oneapi::tbb::task_group
group;
751 first([
this,
i, &
c, &collectorMutex](
auto nextTask) {
752 std::exception_ptr
ep;
757 ep = std::current_exception();
760 std::lock_guard<std::mutex>
l(collectorMutex);
761 c.call([&
ep]() { std::rethrow_exception(
ep); });
763 }) |
then([
this,
i, &
c, &collectorMutex](
auto nextTask) {
765 first([
this,
i, &
c, &collectorMutex, &subProcess](
auto nextTask) {
766 std::exception_ptr
ep;
769 subProcess.doEndStream(
i);
771 ep = std::current_exception();
774 std::lock_guard<std::mutex>
l(collectorMutex);
775 c.call([&
ep]() { std::rethrow_exception(
ep); });
785 c.call([actReg]() { actReg->preEndJobSignal_(); });
794 c.call([actReg]() { actReg->postEndJobSignal_(); });
803 return schedule_->getAllModuleDescriptions();
815 #include "TransitionProcessors.icc" 819 bool returnValue =
false;
835 SendSourceTerminationSignalIfException sentry(
actReg_.get());
839 itemType =
input_->nextItemType();
843 sentry.completedSuccessfully();
856 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
875 bool firstTime =
true;
885 auto trans =
fp.processFiles(*
this);
896 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
905 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
906 e.addAdditionalInfo(message);
907 if (
e.alreadyPrinted()) {
913 if (
e.alreadyPrinted()) {
919 if (
e.alreadyPrinted()) {
931 FDEBUG(1) <<
" \treadFile\n";
933 SendSourceTerminationSignalIfException sentry(
actReg_.get());
938 if (size < preg_->
size()) {
945 sentry.completedSuccessfully();
950 SendSourceTerminationSignalIfException sentry(
actReg_.get());
951 input_->closeFile(
fb_.get(), cleaningUpAfterException);
952 sentry.completedSuccessfully();
954 FDEBUG(1) <<
"\tcloseInputFile\n";
962 FDEBUG(1) <<
"\topenOutputFiles\n";
969 FDEBUG(1) <<
"\tcloseOutputFiles\n";
975 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
979 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
987 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
997 FDEBUG(1) <<
"\tstartingNewLoop\n";
1003 looper_->setModuleChanger(&changer);
1005 looper_->setModuleChanger(
nullptr);
1011 FDEBUG(1) <<
"\tendOfLoop\n";
1018 FDEBUG(1) <<
"\trewind\n";
1023 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1027 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1030 if (subProcess.shouldWeCloseOutput()) {
1036 return schedule_->shouldWeCloseOutput();
1040 FDEBUG(1) <<
"\tdoErrorStuff\n";
1041 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n" 1042 <<
"and went to the error state\n" 1043 <<
"Will attempt to terminate processing normally\n" 1044 <<
"(IF using the looper the next loop will be attempted)\n" 1045 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1056 beginGlobalTransitionAsync<Traits>(
1061 }
while (not globalWaitTask.
done());
1064 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1066 beginProcessBlockSucceeded =
true;
1070 input_->fillProcessBlockHelper();
1072 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1079 beginGlobalTransitionAsync<Traits>(
1084 }
while (not globalWaitTask.
done());
1086 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1093 }
while (not writeWaitTask.
done());
1117 cleaningUpAfterException);
1120 }
while (not globalWaitTask.
done());
1122 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1125 if (beginProcessBlockSucceeded) {
1130 }
while (not writeWaitTask.
done());
1144 bool& globalBeginSucceeded,
1145 bool& eventSetupForInstanceSucceeded) {
1146 globalBeginSucceeded =
false;
1149 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1152 sentry.completedSuccessfully();
1160 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1161 actReg_->preESSyncIOVSignal_.emit(ts);
1163 actReg_->postESSyncIOVSignal_.emit(ts);
1164 eventSetupForInstanceSucceeded =
true;
1165 sentry.completedSuccessfully();
1167 auto const& es =
esp_->eventSetupImpl();
1175 }) |
then([
this, &es](
auto nextTask)
mutable {
1183 }
while (not waitTask.
done());
1193 chain::first([&runPrincipal, &es,
this](
auto waitTask) {
1195 beginGlobalTransitionAsync<Traits>(
1197 }) |
then([&globalBeginSucceeded,
run](
auto waitTask)
mutable {
1198 globalBeginSucceeded =
true;
1199 FDEBUG(1) <<
"\tbeginRun " <<
run <<
"\n";
1200 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto waitTask) {
1202 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto waitTask) {
1208 }
while (not globalWaitTask.
done());
1210 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1228 }
while (not streamLoopWaitTask.
done());
1230 std::rethrow_exception(*(streamLoopWaitTask.
exceptionPtr()));
1233 FDEBUG(1) <<
"\tstreamBeginRun " <<
run <<
"\n";
1241 bool globalBeginSucceeded,
1242 bool cleaningUpAfterException,
1243 bool eventSetupForInstanceSucceeded) {
1244 if (eventSetupForInstanceSucceeded) {
1246 endRun(phid,
run, globalBeginSucceeded, cleaningUpAfterException);
1248 if (globalBeginSucceeded) {
1257 }
while (not
t.done());
1259 if (
t.exceptionPtr()) {
1260 std::rethrow_exception(*
t.exceptionPtr());
1270 bool globalBeginSucceeded,
1271 bool cleaningUpAfterException) {
1279 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1280 actReg_->preESSyncIOVSignal_.emit(ts);
1282 actReg_->postESSyncIOVSignal_.emit(ts);
1283 sentry.completedSuccessfully();
1285 auto const& es =
esp_->eventSetupImpl();
1286 if (globalBeginSucceeded) {
1299 cleaningUpAfterException);
1302 }
while (not streamLoopWaitTask.
done());
1304 std::rethrow_exception(*(streamLoopWaitTask.
exceptionPtr()));
1307 FDEBUG(1) <<
"\tstreamEndRun " <<
run <<
"\n";
1315 chain::first([
this, &runPrincipal, &es, cleaningUpAfterException](
auto nextTask) {
1318 endGlobalTransitionAsync<Traits>(
1320 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto nextTask) {
1322 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto nextTask) {
1328 }
while (not globalWaitTask.
done());
1330 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1333 FDEBUG(1) <<
"\tendRun " <<
run <<
"\n";
1344 input_->luminosityBlockAuxiliary()->beginTime()),
1350 }
while (not waitTask.
done());
1359 std::shared_ptr<void>
const& iRunResource,
1365 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1378 auto* espController,
1385 SendSourceTerminationSignalIfException sentry(actReg);
1391 espController->eventSetupForInstanceAsync(
1393 sentry.completedSuccessfully();
1395 task.doneWaiting(std::current_exception());
1402 auto group = nextTask.group();
1412 }) |
chain::then([
this,
status, iSync](std::exception_ptr
const* iPtr,
auto nextTask) {
1413 actReg_->postESSyncIOVSignal_.emit(iSync);
1415 auto copyTask = nextTask;
1417 nextTask.doneWaiting(*iPtr);
1419 auto group = copyTask.group();
1422 if (
task.taskHasFailed()) {
1423 status->resetResources();
1440 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1443 sentry.completedSuccessfully();
1449 rng->preBeginLumi(lb);
1460 beginGlobalTransitionAsync<Traits>(
1467 status->globalBeginDidSucceed();
1471 }) |
then([
this,
status](std::exception_ptr
const* iPtr,
auto holder)
mutable {
1473 status->resetResources();
1474 holder.doneWaiting(*iPtr);
1477 status->globalBeginDidSucceed();
1490 auto eventSetupImpls = &
status->eventSetupImpls();
1491 auto lp =
status->lumiPrincipal().get();
1494 event.setLuminosityBlockPrincipal(lp);
1498 beginStreamTransitionAsync<Traits>(
1500 }) |
then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
auto nextTask) {
1501 if (exceptionFromBeginStreamLumi) {
1503 tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1515 status->resetResources();
1516 postQueueTask.doneWaiting(std::current_exception());
1527 status->needToContinueLumi();
1528 status->startProcessingEvents();
1531 unsigned int streamIndex = 0;
1532 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1536 iHolder.
group()->run(
1543 tmp.doneWaiting(*iPtr);
1550 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1553 auto& lp = *(iLumiStatus->lumiPrincipal());
1554 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1555 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1557 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1560 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1565 endGlobalTransitionAsync<Traits>(
1567 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1569 if (didGlobalBeginSucceed) {
1578 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iPtr,
auto nextTask)
mutable {
1579 std::exception_ptr ptr;
1591 ptr = std::current_exception();
1596 status->resumeGlobalLumiQueue();
1600 ptr = std::current_exception();
1608 status->resetResources();
1612 ptr = std::current_exception();
1634 if (
status->streamFinishedLumi()) {
1644 lumiStatus->setEndTime();
1648 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1649 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1651 if (lumiStatus->didGlobalBeginSucceed()) {
1652 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1654 lumiPrincipal.endTime());
1657 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1663 cleaningUpAfterException);
1680 }
while (not globalWaitTask.
done());
1682 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1688 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1689 input_->readProcessBlock(processBlockPrincipal);
1690 sentry.completedSuccessfully();
1696 <<
"Illegal attempt to insert run into cache\n" 1697 <<
"Contact a Framework Developer\n";
1699 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
1707 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1709 sentry.completedSuccessfully();
1711 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1713 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1720 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1721 input_->readAndMergeRun(*runPrincipal);
1722 sentry.completedSuccessfully();
1724 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1725 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1731 <<
"Illegal attempt to insert lumi into cache\n" 1732 <<
"Run is invalid\n" 1733 <<
"Contact a Framework Developer\n";
1737 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1739 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1741 sentry.completedSuccessfully();
1749 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1750 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1751 input_->processHistoryRegistry().reducedProcessHistoryID(
1752 input_->luminosityBlockAuxiliary()->processHistoryID()));
1753 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1755 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1757 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1759 sentry.completedSuccessfully();
1761 return input_->luminosityBlock();
1773 s.writeProcessBlockAsync(nextTask, processBlockType);
1789 mergeableRunProductMetadata);
1793 s.writeRunAsync(nextTask, phid,
run, mergeableRunProductMetadata);
1801 FDEBUG(1) <<
"\tdeleteRunFromCache " <<
run <<
"\n";
1815 s.writeLumiAsync(nextTask, lumiPrincipal);
1853 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1865 input_->luminosityBlockAuxiliary()->beginTime()));
1882 bool expected =
false;
1901 auto recursionTask =
make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1905 bool expected =
false;
1923 if (
status->isLumiEnding()) {
1939 bool expected =
false;
1941 auto e = std::current_exception();
1954 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1958 sentry.completedSuccessfully();
1960 FDEBUG(1) <<
"\treadEvent\n";
1974 rng->postEventRead(
ev);
1979 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
1984 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1986 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
1990 }) |
then([pep](
auto nextTask) {
1991 FDEBUG(1) <<
"\tprocessEvent\n";
1992 pep->clearEventPrincipal();
1997 bool randomAccess =
input_->randomAccess();
2006 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2025 FDEBUG(1) <<
"\tshouldWeStop\n";
2030 if (subProcess.terminate()) {
2046 bool expected =
false;
2056 ex <<
"The framework is configured to use at least two streams, but the following modules\n" 2057 <<
"require synchronizing on LuminosityBlock boundaries:";
2059 for (
auto worker :
schedule_->allWorkers()) {
2060 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2062 ex <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2066 ex <<
"\n\nThe situation can be fixed by either\n" 2067 <<
" * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n" 2068 <<
" * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2074 std::unique_ptr<LogSystem>
s;
2075 for (
auto worker :
schedule_->allWorkers()) {
2078 s = std::make_unique<LogSystem>(
"LegacyModules");
2079 (*s) <<
"The following legacy modules are configured. Support for legacy modules\n" 2080 "is going to end soon. These modules need to be converted to have type\n" 2081 "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2083 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
LuminosityBlockNumber_t luminosityBlock() const
std::atomic< bool > exceptionMessageLumis_
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
int totalEventsFailed() const
InputSource::ItemType nextTransitionType()
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
bool continuingLumi() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
PreESSyncIOV preESSyncIOVSignal_
static PFTauRenderPlugin instance
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
void setExceptionMessageFiles(std::string &message)
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
std::shared_ptr< EDLooperBase const > looper() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void ensureAvailableAccelerators(edm::ParameterSet const ¶meterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
InputSource::ItemType lastTransitionType() const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
bool hasRunPrincipal() const
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
StreamID streamID() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
constexpr auto then(O &&iO)
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
void swap(Association< C > &lhs, Association< C > &rhs)
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
ProcessBlockPrincipal & processBlockPrincipal() const
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
oneapi::tbb::task_group * group() const noexcept
void emit(Args &&... args) const
unsigned int numberOfThreads() const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
Timestamp const & beginTime() const
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
void setExceptionMessageRuns(std::string &message)
void validateLooper(ParameterSet &pset)
bool taskHasFailed() const noexcept
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
ShouldWriteRun shouldWriteRun() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
void clear()
Not thread safe.
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
bool lastOperationSucceeded() const
unsigned int numberOfRuns() const
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
Log< level::Info, false > LogInfo
bool wasEventProcessingStopped() const
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
unsigned int numberOfLuminosityBlocks() const
MergeableRunProductMetadata * mergeableRunProductMetadata()
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
oneapi::tbb::task_group taskGroup_
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
void stopProcessingEvents()
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
bool forceESCacheClearOnNewRun_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
void closeInputFile(bool cleaningUpAfterException)
void readProcessBlock(ProcessBlockPrincipal &)
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
auto lastTask(edm::WaitingTaskHolder iTask)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
Transition requestedTransition() const
Log< level::System, true > LogAbsolute
void setNextSyncValue(IOVSyncValue const &iValue)
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
void setExceptionMessageLumis()
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
void inputProcessBlocks()
bool insertMapped(value_type const &v)
static Registry * instance()
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
def merge(dictlist, TELL=False)