81 #include "boost/range/adaptor/reversed.hpp" 93 #include "oneapi/tbb/task.h" 105 class SendSourceTerminationSignalIfException {
108 ~SendSourceTerminationSignalIfException() {
113 void completedSuccessfully() { reg_ =
nullptr; }
122 namespace chain = waiting_task::chain;
125 std::unique_ptr<InputSource>
makeInput(
unsigned int moduleIndex,
128 std::shared_ptr<ProductRegistry> preg,
129 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
130 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
131 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
132 std::shared_ptr<ActivityRegistry> areg,
133 std::shared_ptr<ProcessConfiguration const> processConfiguration,
136 if (main_input ==
nullptr) {
138 <<
"There must be exactly one source in the configuration.\n" 139 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
144 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
147 filler->fill(descriptions);
152 std::ostringstream ost;
153 ost <<
"Validating configuration of input source of type " << modtype;
169 processConfiguration.get(),
176 thinnedAssociationsHelper,
180 common.maxSecondsUntilRampdown_,
183 areg->preSourceConstructionSignal_(md);
184 std::unique_ptr<InputSource>
input;
187 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
190 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
191 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
194 std::ostringstream ost;
195 ost <<
"Constructing input source of type " << modtype;
208 filler->fill(descriptions);
221 std::vector<std::string>
const& loopers) {
222 std::shared_ptr<EDLooperBase> vLooper;
224 assert(1 == loopers.size());
226 for (
auto const& looperName : loopers) {
239 std::vector<std::string>
const& defaultServices,
240 std::vector<std::string>
const& forcedServices)
243 branchIDListHelper_(),
246 espController_(new eventsetup::EventSetupsController),
249 processConfiguration_(),
255 deferredExceptionPtrIsSet_(
false),
259 beginJobCalled_(
false),
260 shouldWeStop_(
false),
261 fileModeNoMerge_(
false),
262 exceptionMessageFiles_(),
263 exceptionMessageRuns_(
false),
264 exceptionMessageLumis_(
false),
265 forceLooperToEnd_(
false),
266 looperBeginJobRun_(
false),
267 forceESCacheClearOnNewRun_(
false),
268 eventSetupDataToExcludeFromPrefetching_() {
270 processDesc->addServices(defaultServices, forcedServices);
271 init(processDesc, iToken, iLegacy);
275 std::vector<std::string>
const& defaultServices,
276 std::vector<std::string>
const& forcedServices)
279 branchIDListHelper_(),
282 espController_(new eventsetup::EventSetupsController),
285 processConfiguration_(),
291 deferredExceptionPtrIsSet_(
false),
295 beginJobCalled_(
false),
296 shouldWeStop_(
false),
297 fileModeNoMerge_(
false),
298 exceptionMessageFiles_(),
299 exceptionMessageRuns_(
false),
300 exceptionMessageLumis_(
false),
301 forceLooperToEnd_(
false),
302 looperBeginJobRun_(
false),
303 forceESCacheClearOnNewRun_(
false),
304 eventSetupDataToExcludeFromPrefetching_() {
306 processDesc->addServices(defaultServices, forcedServices);
315 branchIDListHelper_(),
318 espController_(new eventsetup::EventSetupsController),
321 processConfiguration_(),
327 deferredExceptionPtrIsSet_(
false),
331 beginJobCalled_(
false),
332 shouldWeStop_(
false),
333 fileModeNoMerge_(
false),
334 exceptionMessageFiles_(),
335 exceptionMessageRuns_(
false),
336 exceptionMessageLumis_(
false),
337 forceLooperToEnd_(
false),
338 looperBeginJobRun_(
false),
339 forceESCacheClearOnNewRun_(
false),
340 eventSetupDataToExcludeFromPrefetching_() {
355 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
359 bool const hasSubProcesses = !subProcessVParameterSet.empty();
372 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
380 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
386 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
390 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
391 if (nConcurrentRuns != 1) {
393 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
395 unsigned int nConcurrentLumis =
396 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
397 if (nConcurrentLumis == 0) {
398 nConcurrentLumis = 2;
400 if (nConcurrentLumis > nStreams) {
401 nConcurrentLumis = nStreams;
404 if (!loopers.empty()) {
406 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
407 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number " 408 "of concurrent runs, and the number of concurrent lumis " 409 "are all being reset to 1. Loopers cannot currently support " 410 "values greater than 1.";
412 nConcurrentLumis = 1;
416 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
420 if (nThreads > 1
or nStreams > 1) {
421 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
426 unsigned int maxConcurrentIOVs = nConcurrentLumis;
444 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
447 if (not hasSubProcesses) {
455 auto& serviceSets = processDesc->getServicesPSets();
465 handler->willBeUsingThreads();
472 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
477 if (!loopers.empty()) {
488 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
495 std::optional<ScheduleItems::MadeModules> madeModules;
498 tbb::task_group
group;
501 auto tempReg = std::make_shared<ProductRegistry>();
504 group.run([&,
this]() {
511 group.run([&,
this, tempReg]() {
517 items.branchIDListHelper(),
519 items.thinnedAssociationsHelper(),
521 items.processConfiguration(),
526 items.preg()->addFromInput(*tempReg);
557 auto ep = std::make_shared<EventPrincipal>(
preg(),
584 for (
auto& subProcessPSet : subProcessVParameterSet) {
655 std::vector<ModuleProcessName> consumedBySubProcesses;
658 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
659 if (consumedBySubProcesses.empty()) {
661 }
else if (not
c.empty()) {
662 std::vector<ModuleProcessName>
tmp;
663 tmp.reserve(consumedBySubProcesses.size() +
c.size());
665 consumedBySubProcesses.end(),
668 std::back_inserter(
tmp));
677 not unusedModules.empty()) {
680 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
681 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 683 "therefore they are deleted before beginJob transition.";
725 ex.
addContext(
"Calling beginJob for the source");
731 constexpr
bool mustPrefetchMayGet =
true;
733 auto const runLookup =
preg_->productLookup(
InRun);
734 auto const lumiLookup =
preg_->productLookup(
InLumi);
737 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
738 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
740 looper_->updateLookup(
esp_->recordsToProxyIndices());
744 actReg_->postBeginJobSignal_();
746 oneapi::tbb::task_group
group;
749 first([
this](
auto nextTask) {
751 first([
i,
this](
auto nextTask) {
766 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
773 oneapi::tbb::task_group
group;
781 first([
this,
i, &
c, &collectorMutex](
auto nextTask) {
782 std::exception_ptr
ep;
787 ep = std::current_exception();
790 std::lock_guard<std::mutex>
l(collectorMutex);
791 c.call([&
ep]() { std::rethrow_exception(
ep); });
793 }) |
then([
this,
i, &
c, &collectorMutex](
auto nextTask) {
795 first([
this,
i, &
c, &collectorMutex, &subProcess](
auto nextTask) {
796 std::exception_ptr
ep;
799 subProcess.doEndStream(
i);
801 ep = std::current_exception();
804 std::lock_guard<std::mutex>
l(collectorMutex);
805 c.call([&
ep]() { std::rethrow_exception(
ep); });
812 waitTask.waitNoThrow();
815 c.call([actReg]() { actReg->preEndJobSignal_(); });
824 c.call([actReg]() { actReg->postEndJobSignal_(); });
833 return schedule_->getAllModuleDescriptions();
845 #include "TransitionProcessors.icc" 849 bool returnValue =
false;
865 SendSourceTerminationSignalIfException sentry(
actReg_.get());
869 itemType =
input_->nextItemType();
873 sentry.completedSuccessfully();
886 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
901 bool firstTime =
true;
911 auto trans =
fp.processFiles(*
this);
922 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
931 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
932 e.addAdditionalInfo(message);
933 if (
e.alreadyPrinted()) {
939 "Another exception was caught while trying to clean up runs after the primary fatal exception.");
940 e.addAdditionalInfo(message);
941 if (
e.alreadyPrinted()) {
947 if (
e.alreadyPrinted()) {
957 FDEBUG(1) <<
" \treadFile\n";
959 SendSourceTerminationSignalIfException sentry(
actReg_.get());
964 if (size < preg_->
size()) {
971 sentry.completedSuccessfully();
976 SendSourceTerminationSignalIfException sentry(
actReg_.get());
977 input_->closeFile(
fb_.get(), cleaningUpAfterException);
978 sentry.completedSuccessfully();
980 FDEBUG(1) <<
"\tcloseInputFile\n";
988 FDEBUG(1) <<
"\topenOutputFiles\n";
995 FDEBUG(1) <<
"\tcloseOutputFiles\n";
1001 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
1005 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
1013 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
1023 FDEBUG(1) <<
"\tstartingNewLoop\n";
1029 looper_->setModuleChanger(&changer);
1031 looper_->setModuleChanger(
nullptr);
1037 FDEBUG(1) <<
"\tendOfLoop\n";
1044 FDEBUG(1) <<
"\trewind\n";
1049 FDEBUG(1) <<
"\tprepareForNextLoop\n";
1053 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
1056 if (subProcess.shouldWeCloseOutput()) {
1062 return schedule_->shouldWeCloseOutput();
1066 FDEBUG(1) <<
"\tdoErrorStuff\n";
1067 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n" 1068 <<
"and went to the error state\n" 1069 <<
"Will attempt to terminate processing normally\n" 1070 <<
"(IF using the looper the next loop will be attempted)\n" 1071 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1082 beginGlobalTransitionAsync<Traits>(
1085 globalWaitTask.wait();
1086 beginProcessBlockSucceeded =
true;
1090 input_->fillProcessBlockHelper();
1092 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1099 beginGlobalTransitionAsync<Traits>(
1102 globalWaitTask.wait();
1106 writeWaitTask.wait();
1127 cleaningUpAfterException);
1128 globalWaitTask.wait();
1130 if (beginProcessBlockSucceeded) {
1133 writeWaitTask.wait();
1144 bool& globalBeginSucceeded,
1145 bool& eventSetupForInstanceSucceeded) {
1146 globalBeginSucceeded =
false;
1149 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1152 sentry.completedSuccessfully();
1160 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1161 actReg_->preESSyncIOVSignal_.emit(ts);
1163 actReg_->postESSyncIOVSignal_.emit(ts);
1164 eventSetupForInstanceSucceeded =
true;
1165 sentry.completedSuccessfully();
1167 auto const& es =
esp_->eventSetupImpl();
1175 }) |
then([
this, &es](
auto nextTask)
mutable {
1188 chain::first([&runPrincipal, &es,
this](
auto waitTask) {
1190 beginGlobalTransitionAsync<Traits>(
1192 }) |
then([&globalBeginSucceeded,
run](
auto waitTask)
mutable {
1193 globalBeginSucceeded =
true;
1194 FDEBUG(1) <<
"\tbeginRun " <<
run <<
"\n";
1195 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto waitTask) {
1197 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto waitTask) {
1201 globalWaitTask.wait();
1216 streamLoopWaitTask.wait();
1218 FDEBUG(1) <<
"\tstreamBeginRun " <<
run <<
"\n";
1226 bool globalBeginSucceeded,
1227 bool cleaningUpAfterException,
1228 bool eventSetupForInstanceSucceeded) {
1229 if (eventSetupForInstanceSucceeded) {
1231 endRun(phid,
run, globalBeginSucceeded, cleaningUpAfterException);
1233 if (globalBeginSucceeded) {
1240 auto exceptn =
t.waitNoThrow();
1243 std::rethrow_exception(exceptn);
1253 bool globalBeginSucceeded,
1254 bool cleaningUpAfterException) {
1262 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1263 actReg_->preESSyncIOVSignal_.emit(ts);
1265 actReg_->postESSyncIOVSignal_.emit(ts);
1266 sentry.completedSuccessfully();
1268 auto const& es =
esp_->eventSetupImpl();
1269 if (globalBeginSucceeded) {
1282 cleaningUpAfterException);
1283 streamLoopWaitTask.wait();
1285 FDEBUG(1) <<
"\tstreamEndRun " <<
run <<
"\n";
1293 chain::first([
this, &runPrincipal, &es, cleaningUpAfterException](
auto nextTask) {
1296 endGlobalTransitionAsync<Traits>(
1298 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto nextTask) {
1300 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto nextTask) {
1304 globalWaitTask.wait();
1306 FDEBUG(1) <<
"\tendRun " <<
run <<
"\n";
1317 input_->luminosityBlockAuxiliary()->beginTime()),
1326 std::shared_ptr<void>
const& iRunResource,
1332 actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1345 auto* espController,
1352 SendSourceTerminationSignalIfException sentry(actReg);
1358 espController->eventSetupForInstanceAsync(
1360 sentry.completedSuccessfully();
1362 task.doneWaiting(std::current_exception());
1369 auto group = nextTask.group();
1379 }) |
chain::then([
this,
status, iSync](std::exception_ptr
const* iPtr,
auto nextTask) {
1380 actReg_->postESSyncIOVSignal_.emit(iSync);
1382 auto copyTask = nextTask;
1384 nextTask.doneWaiting(*iPtr);
1386 auto group = copyTask.group();
1389 if (
task.taskHasFailed()) {
1390 status->resetResources();
1407 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1410 sentry.completedSuccessfully();
1416 rng->preBeginLumi(lb);
1427 beginGlobalTransitionAsync<Traits>(
1434 status->globalBeginDidSucceed();
1438 }) |
then([
this,
status](std::exception_ptr
const* iPtr,
auto holder)
mutable {
1440 status->resetResources();
1441 holder.doneWaiting(*iPtr);
1444 status->globalBeginDidSucceed();
1457 auto eventSetupImpls = &
status->eventSetupImpls();
1458 auto lp =
status->lumiPrincipal().get();
1461 event.setLuminosityBlockPrincipal(lp);
1465 beginStreamTransitionAsync<Traits>(
1467 }) |
then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
auto nextTask) {
1468 if (exceptionFromBeginStreamLumi) {
1470 tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1482 status->resetResources();
1483 postQueueTask.doneWaiting(std::current_exception());
1494 status->needToContinueLumi();
1495 status->startProcessingEvents();
1498 unsigned int streamIndex = 0;
1499 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1503 iHolder.
group()->run(
1510 tmp.doneWaiting(*iPtr);
1517 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1520 auto& lp = *(iLumiStatus->lumiPrincipal());
1521 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1522 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1524 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1527 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1532 endGlobalTransitionAsync<Traits>(
1534 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1536 if (didGlobalBeginSucceed) {
1545 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iPtr,
auto nextTask)
mutable {
1546 std::exception_ptr ptr;
1558 ptr = std::current_exception();
1563 status->resumeGlobalLumiQueue();
1567 ptr = std::current_exception();
1575 status->resetResources();
1579 ptr = std::current_exception();
1601 if (
status->streamFinishedLumi()) {
1611 lumiStatus->setEndTime();
1615 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1616 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1618 if (lumiStatus->didGlobalBeginSucceed()) {
1619 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1621 lumiPrincipal.endTime());
1624 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1630 cleaningUpAfterException);
1645 globalWaitTask.wait();
1650 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1651 input_->readProcessBlock(processBlockPrincipal);
1652 sentry.completedSuccessfully();
1658 <<
"Illegal attempt to insert run into cache\n" 1659 <<
"Contact a Framework Developer\n";
1661 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
1669 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1671 sentry.completedSuccessfully();
1673 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1675 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1682 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1683 input_->readAndMergeRun(*runPrincipal);
1684 sentry.completedSuccessfully();
1686 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1687 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1693 <<
"Illegal attempt to insert lumi into cache\n" 1694 <<
"Run is invalid\n" 1695 <<
"Contact a Framework Developer\n";
1699 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1701 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1703 sentry.completedSuccessfully();
1711 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1712 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1713 input_->processHistoryRegistry().reducedProcessHistoryID(
1714 input_->luminosityBlockAuxiliary()->processHistoryID()));
1715 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1717 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1719 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1721 sentry.completedSuccessfully();
1723 return input_->luminosityBlock();
1735 s.writeProcessBlockAsync(nextTask, processBlockType);
1751 mergeableRunProductMetadata);
1755 s.writeRunAsync(nextTask, phid,
run, mergeableRunProductMetadata);
1763 FDEBUG(1) <<
"\tdeleteRunFromCache " <<
run <<
"\n";
1777 s.writeLumiAsync(nextTask, lumiPrincipal);
1815 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1827 input_->luminosityBlockAuxiliary()->beginTime()));
1844 bool expected =
false;
1863 auto recursionTask =
make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1867 bool expected =
false;
1885 if (
status->isLumiEnding()) {
1901 bool expected =
false;
1903 auto e = std::current_exception();
1916 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1920 sentry.completedSuccessfully();
1922 FDEBUG(1) <<
"\treadEvent\n";
1936 rng->postEventRead(
ev);
1941 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
1946 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1948 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
1952 }) |
then([pep](
auto nextTask) {
1953 FDEBUG(1) <<
"\tprocessEvent\n";
1954 pep->clearEventPrincipal();
1959 bool randomAccess =
input_->randomAccess();
1968 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1987 FDEBUG(1) <<
"\tshouldWeStop\n";
1992 if (subProcess.terminate()) {
2008 bool expected =
false;
2018 ex <<
"The framework is configured to use at least two streams, but the following modules\n" 2019 <<
"require synchronizing on LuminosityBlock boundaries:";
2021 for (
auto worker :
schedule_->allWorkers()) {
2022 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2024 ex <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2028 ex <<
"\n\nThe situation can be fixed by either\n" 2029 <<
" * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n" 2030 <<
" * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2036 std::unique_ptr<LogSystem>
s;
2037 for (
auto worker :
schedule_->allWorkers()) {
2040 s = std::make_unique<LogSystem>(
"LegacyModules");
2041 (*s) <<
"The following legacy modules are configured. Support for legacy modules\n" 2042 "is going to end soon. These modules need to be converted to have type\n" 2043 "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2045 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
LuminosityBlockNumber_t luminosityBlock() const
std::atomic< bool > exceptionMessageLumis_
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void setExceptionMessageRuns()
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
int totalEventsFailed() const
InputSource::ItemType nextTransitionType()
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
bool continuingLumi() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
PreESSyncIOV preESSyncIOVSignal_
static PFTauRenderPlugin instance
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void setExceptionMessageFiles(std::string &message)
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
std::shared_ptr< EDLooperBase const > looper() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void ensureAvailableAccelerators(edm::ParameterSet const ¶meterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
InputSource::ItemType lastTransitionType() const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
bool hasRunPrincipal() const
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
StreamID streamID() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
constexpr auto then(O &&iO)
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
void swap(Association< C > &lhs, Association< C > &rhs)
std::atomic< bool > exceptionMessageRuns_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
ProcessBlockPrincipal & processBlockPrincipal() const
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
oneapi::tbb::task_group * group() const noexcept
void emit(Args &&... args) const
unsigned int numberOfThreads() const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
Timestamp const & beginTime() const
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
void validateLooper(ParameterSet &pset)
bool taskHasFailed() const noexcept
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
ShouldWriteRun shouldWriteRun() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
static ServiceRegistry & instance()
void clear()
Not thread safe.
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
bool lastOperationSucceeded() const
unsigned int numberOfRuns() const
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
Log< level::Info, false > LogInfo
bool wasEventProcessingStopped() const
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
unsigned int numberOfLuminosityBlocks() const
MergeableRunProductMetadata * mergeableRunProductMetadata()
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
oneapi::tbb::task_group taskGroup_
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
void stopProcessingEvents()
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
bool forceESCacheClearOnNewRun_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
void closeInputFile(bool cleaningUpAfterException)
void readProcessBlock(ProcessBlockPrincipal &)
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
auto lastTask(edm::WaitingTaskHolder iTask)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
Transition requestedTransition() const
Log< level::System, true > LogAbsolute
void setNextSyncValue(IOVSyncValue const &iValue)
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
void setExceptionMessageLumis()
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
void inputProcessBlocks()
bool insertMapped(value_type const &v)
static Registry * instance()
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
def merge(dictlist, TELL=False)