78 #include "boost/range/adaptor/reversed.hpp"
102 class SendSourceTerminationSignalIfException {
105 ~SendSourceTerminationSignalIfException() {
110 void completedSuccessfully() { reg_ =
nullptr; }
119 namespace chain = waiting_task::chain;
124 std::shared_ptr<ProductRegistry>
preg,
125 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
126 std::shared_ptr<ProcessBlockHelper>
const& processBlockHelper,
127 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
128 std::shared_ptr<ActivityRegistry>
areg,
132 if (main_input ==
nullptr) {
134 <<
"There must be exactly one source in the configuration.\n"
135 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
140 std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
143 filler->fill(descriptions);
148 std::ostringstream ost;
149 ost <<
"Validating configuration of input source of type " << modtype;
165 processConfiguration.get(),
172 thinnedAssociationsHelper,
179 areg->preSourceConstructionSignal_(md);
180 std::unique_ptr<InputSource>
input;
183 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
186 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
187 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
190 std::ostringstream ost;
191 ost <<
"Constructing input source of type " << modtype;
204 filler->fill(descriptions);
209 fmt::format(
"Validating configuration of EDLooper of type {} with label: '{}'", modtype, moduleLabel));
217 std::vector<std::string>
const& loopers) {
218 std::shared_ptr<EDLooperBase> vLooper;
220 assert(1 == loopers.size());
222 for (
auto const& looperName : loopers) {
235 std::vector<std::string>
const& defaultServices,
236 std::vector<std::string>
const& forcedServices)
239 branchIDListHelper_(),
242 espController_(new eventsetup::EventSetupsController),
245 processConfiguration_(),
251 deferredExceptionPtrIsSet_(
false),
255 beginJobCalled_(
false),
256 shouldWeStop_(
false),
257 fileModeNoMerge_(
false),
258 exceptionMessageFiles_(),
259 exceptionMessageRuns_(),
260 exceptionMessageLumis_(
false),
261 forceLooperToEnd_(
false),
262 looperBeginJobRun_(
false),
263 forceESCacheClearOnNewRun_(
false),
264 eventSetupDataToExcludeFromPrefetching_() {
265 auto processDesc = std::make_shared<ProcessDesc>(
std::move(parameterSet));
266 processDesc->addServices(defaultServices, forcedServices);
267 init(processDesc, iToken, iLegacy);
271 std::vector<std::string>
const& defaultServices,
272 std::vector<std::string>
const& forcedServices)
275 branchIDListHelper_(),
278 espController_(new eventsetup::EventSetupsController),
281 processConfiguration_(),
287 deferredExceptionPtrIsSet_(
false),
291 beginJobCalled_(
false),
292 shouldWeStop_(
false),
293 fileModeNoMerge_(
false),
294 exceptionMessageFiles_(),
295 exceptionMessageRuns_(),
296 exceptionMessageLumis_(
false),
297 forceLooperToEnd_(
false),
298 looperBeginJobRun_(
false),
299 forceESCacheClearOnNewRun_(
false),
300 asyncStopRequestedWhileProcessingEvents_(
false),
301 eventSetupDataToExcludeFromPrefetching_() {
302 auto processDesc = std::make_shared<ProcessDesc>(
std::move(parameterSet));
303 processDesc->addServices(defaultServices, forcedServices);
312 branchIDListHelper_(),
315 espController_(new eventsetup::EventSetupsController),
318 processConfiguration_(),
324 deferredExceptionPtrIsSet_(
false),
328 beginJobCalled_(
false),
329 shouldWeStop_(
false),
330 fileModeNoMerge_(
false),
331 exceptionMessageFiles_(),
332 exceptionMessageRuns_(),
333 exceptionMessageLumis_(
false),
334 forceLooperToEnd_(
false),
335 looperBeginJobRun_(
false),
336 forceESCacheClearOnNewRun_(
false),
337 asyncStopRequestedWhileProcessingEvents_(
false),
338 eventSetupDataToExcludeFromPrefetching_() {
339 init(processDesc, token, legacy);
353 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
357 bool const hasSubProcesses = !subProcessVParameterSet.empty();
365 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet(
"options"));
367 if (fileMode !=
"NOMERGE" and fileMode !=
"FULLMERGE") {
370 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
377 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
383 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
387 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
388 if (nConcurrentRuns != 1) {
390 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
392 unsigned int nConcurrentLumis =
393 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
394 if (nConcurrentLumis == 0) {
395 nConcurrentLumis = 2;
397 if (nConcurrentLumis > nStreams) {
398 nConcurrentLumis = nStreams;
400 std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>(
"@all_loopers");
401 if (!loopers.empty()) {
403 if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
404 edm::LogWarning(
"ThreadStreamSetup") <<
"There is a looper, so the number of streams, the number "
405 "of concurrent runs, and the number of concurrent lumis "
406 "are all being reset to 1. Loopers cannot currently support "
407 "values greater than 1.";
409 nConcurrentLumis = 1;
413 bool dumpOptions = optionsPset.getUntrackedParameter<
bool>(
"dumpOptions");
417 if (nThreads > 1
or nStreams > 1) {
418 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
423 unsigned int maxConcurrentIOVs = nConcurrentLumis;
441 optionsPset.getUntrackedParameter<
bool>(
"deleteNonConsumedUnscheduledModules");
447 auto& serviceSets = processDesc->getServicesPSets();
448 ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy,
true);
456 handler->willBeUsingThreads();
460 std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
463 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
465 *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
468 if (!loopers.empty()) {
470 looper_->setActionTable(items.act_table_.get());
471 looper_->attachTo(*items.actReg_);
474 deleteNonConsumedUnscheduledModules_ =
false;
479 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
489 items.branchIDListHelper(),
491 items.thinnedAssociationsHelper(),
493 items.processConfiguration(),
503 preg_ = items.preg();
511 FDEBUG(2) << parameterSet << std::endl;
516 auto ep = std::make_shared<EventPrincipal>(
preg(),
543 for (
auto& subProcessPSet : subProcessVParameterSet) {
599 actReg_->preallocateSignal_(bounds);
603 std::vector<ModuleProcessName> consumedBySubProcesses;
606 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
607 if (consumedBySubProcesses.empty()) {
609 }
else if (not
c.empty()) {
610 std::vector<ModuleProcessName>
tmp;
611 tmp.reserve(consumedBySubProcesses.size() +
c.size());
613 consumedBySubProcesses.end(),
616 std::back_inserter(tmp));
625 not unusedModules.empty()) {
628 edm::LogInfo(
"DeleteModules").log([&unusedModules](
auto&
l) {
629 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
631 "therefore they are deleted before beginJob transition.";
666 ex.
addContext(
"Calling beginJob for the source");
672 constexpr
bool mustPrefetchMayGet =
true;
674 auto const runLookup =
preg_->productLookup(
InRun);
675 auto const lumiLookup =
preg_->productLookup(
InLumi);
678 looper_->updateLookup(
InRun, *runLookup, mustPrefetchMayGet);
679 looper_->updateLookup(
InLumi, *lumiLookup, mustPrefetchMayGet);
681 looper_->updateLookup(
esp_->recordsToProxyIndices());
685 actReg_->postBeginJobSignal_();
696 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
701 using namespace edm::waiting_task::chain;
704 tbb::task_group
group;
711 first([
this,
i, &c, &collectorMutex](
auto nextTask) {
712 std::exception_ptr ep;
717 ep = std::current_exception();
720 std::lock_guard<std::mutex>
l(collectorMutex);
721 c.
call([&ep]() { std::rethrow_exception(ep); });
723 }) |
then([
this,
i, &c, &collectorMutex](
auto nextTask) {
725 first([
this,
i, &c, &collectorMutex, &subProcess](
auto nextTask) {
726 std::exception_ptr ep;
729 subProcess.doEndStream(
i);
731 ep = std::current_exception();
734 std::lock_guard<std::mutex>
l(collectorMutex);
735 c.
call([&ep]() { std::rethrow_exception(ep); });
745 c.
call([actReg]() { actReg->preEndJobSignal_(); });
754 c.
call([actReg]() { actReg->postEndJobSignal_(); });
763 return schedule_->getAllModuleDescriptions();
775 #include "TransitionProcessors.icc"
779 bool returnValue =
false;
795 SendSourceTerminationSignalIfException sentry(
actReg_.get());
799 itemType =
input_->nextItemType();
803 sentry.completedSuccessfully();
816 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
835 bool firstTime =
true;
845 auto trans = fp.processFiles(*
this);
856 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
865 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
891 FDEBUG(1) <<
" \treadFile\n";
893 SendSourceTerminationSignalIfException sentry(
actReg_.get());
898 if (size < preg_->
size()) {
905 sentry.completedSuccessfully();
910 SendSourceTerminationSignalIfException sentry(
actReg_.get());
911 input_->closeFile(
fb_.get(), cleaningUpAfterException);
912 sentry.completedSuccessfully();
914 FDEBUG(1) <<
"\tcloseInputFile\n";
922 FDEBUG(1) <<
"\topenOutputFiles\n";
929 FDEBUG(1) <<
"\tcloseOutputFiles\n";
935 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
939 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
947 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
957 FDEBUG(1) <<
"\tstartingNewLoop\n";
963 looper_->setModuleChanger(&changer);
965 looper_->setModuleChanger(
nullptr);
971 FDEBUG(1) <<
"\tendOfLoop\n";
978 FDEBUG(1) <<
"\trewind\n";
983 FDEBUG(1) <<
"\tprepareForNextLoop\n";
987 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
990 if (subProcess.shouldWeCloseOutput()) {
1000 FDEBUG(1) <<
"\tdoErrorStuff\n";
1001 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n"
1002 <<
"and went to the error state\n"
1003 <<
"Will attempt to terminate processing normally\n"
1004 <<
"(IF using the looper the next loop will be attempted)\n"
1005 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
1016 beginGlobalTransitionAsync<Traits>(
1021 }
while (not globalWaitTask.
done());
1024 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1026 beginProcessBlockSucceeded =
true;
1030 input_->fillProcessBlockHelper();
1032 while (
input_->nextProcessBlock(processBlockPrincipal)) {
1039 beginGlobalTransitionAsync<Traits>(
1044 }
while (not globalWaitTask.
done());
1046 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1053 }
while (not writeWaitTask.
done());
1077 cleaningUpAfterException);
1080 }
while (not globalWaitTask.
done());
1082 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1085 if (beginProcessBlockSucceeded) {
1090 }
while (not writeWaitTask.
done());
1097 for (
auto&
s : subProcesses_) {
1104 bool& globalBeginSucceeded,
1105 bool& eventSetupForInstanceSucceeded) {
1106 globalBeginSucceeded =
false;
1109 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1112 sentry.completedSuccessfully();
1120 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1122 eventSetupForInstanceSucceeded =
true;
1123 sentry.completedSuccessfully();
1125 auto const& es =
esp_->eventSetupImpl();
1130 using namespace edm::waiting_task::chain;
1133 }) |
then([
this, &es](
auto nextTask)
mutable {
1141 }
while (not waitTask.
done());
1150 using namespace edm::waiting_task::chain;
1151 chain::first([&runPrincipal, &es,
this](
auto waitTask) {
1153 beginGlobalTransitionAsync<Traits>(
1155 }) |
then([&globalBeginSucceeded, run](
auto waitTask)
mutable {
1156 globalBeginSucceeded =
true;
1157 FDEBUG(1) <<
"\tbeginRun " << run <<
"\n";
1158 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto waitTask) {
1160 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto waitTask) {
1166 }
while (not globalWaitTask.
done());
1168 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1186 }
while (not streamLoopWaitTask.
done());
1188 std::rethrow_exception(*(streamLoopWaitTask.
exceptionPtr()));
1191 FDEBUG(1) <<
"\tstreamBeginRun " << run <<
"\n";
1199 bool globalBeginSucceeded,
1200 bool cleaningUpAfterException,
1201 bool eventSetupForInstanceSucceeded) {
1202 if (eventSetupForInstanceSucceeded) {
1204 endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1206 if (globalBeginSucceeded) {
1214 }
while (not t.
done());
1226 bool globalBeginSucceeded,
1227 bool cleaningUpAfterException) {
1235 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1237 sentry.completedSuccessfully();
1239 auto const& es =
esp_->eventSetupImpl();
1240 if (globalBeginSucceeded) {
1253 cleaningUpAfterException);
1256 }
while (not streamLoopWaitTask.
done());
1258 std::rethrow_exception(*(streamLoopWaitTask.
exceptionPtr()));
1261 FDEBUG(1) <<
"\tstreamEndRun " << run <<
"\n";
1268 using namespace edm::waiting_task::chain;
1269 chain::first([
this, &runPrincipal, &es, cleaningUpAfterException](
auto nextTask) {
1272 endGlobalTransitionAsync<Traits>(
1274 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto nextTask) {
1276 }) |
ifThen(
looper_, [
this, &runPrincipal, &es](
auto nextTask) {
1282 }
while (not globalWaitTask.
done());
1284 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1287 FDEBUG(1) <<
"\tendRun " << run <<
"\n";
1298 input_->luminosityBlockAuxiliary()->beginTime()),
1304 }
while (not waitTask.
done());
1313 std::shared_ptr<void>
const& iRunResource,
1331 auto* espController,
1338 SendSourceTerminationSignalIfException sentry(actReg);
1343 espController->eventSetupForInstanceAsync(
1344 iSync, task, status->endIOVWaitingTasks(), status->eventSetupImpls());
1345 sentry.completedSuccessfully();
1347 task.doneWaiting(std::current_exception());
1354 auto group = nextTask.group();
1356 *
group, [
this, task =
std::move(nextTask), iSync, status, asyncEventSetup]()
mutable {
1364 }) |
chain::then([
this,
status](std::exception_ptr
const* iPtr,
auto nextTask) {
1366 auto copyTask = nextTask;
1368 nextTask.doneWaiting(*iPtr);
1370 auto group = copyTask.group();
1373 if (task.taskHasFailed()) {
1374 status->resetResources();
1380 auto group = task.group();
1391 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1394 sentry.completedSuccessfully();
1400 rng->preBeginLumi(lb);
1405 using namespace edm::waiting_task::chain;
1411 beginGlobalTransitionAsync<Traits>(
1418 status->globalBeginDidSucceed();
1422 }) |
then([
this,
status](std::exception_ptr
const* iPtr,
auto holder)
mutable {
1424 status->resetResources();
1425 holder.doneWaiting(*iPtr);
1428 status->globalBeginDidSucceed();
1441 auto eventSetupImpls = &status->eventSetupImpls();
1442 auto lp = status->lumiPrincipal().get();
1445 event.setLuminosityBlockPrincipal(lp);
1447 using namespace edm::waiting_task::chain;
1449 beginStreamTransitionAsync<Traits>(
1451 }) |
then([
this,
i](std::exception_ptr
const* exceptionFromBeginStreamLumi,
auto nextTask) {
1452 if (exceptionFromBeginStreamLumi) {
1466 status->resetResources();
1467 postQueueTask.doneWaiting(std::current_exception());
1478 status->needToContinueLumi();
1479 status->startProcessingEvents();
1482 unsigned int streamIndex = 0;
1483 tbb::task_arena arena{tbb::task_arena::attach()};
1487 iHolder.
group()->run(
1501 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1504 auto& lp = *(iLumiStatus->lumiPrincipal());
1505 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1506 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1508 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1510 using namespace edm::waiting_task::chain;
1511 chain::first([
this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](
auto nextTask) {
1516 endGlobalTransitionAsync<Traits>(
1518 }) |
then([
this, didGlobalBeginSucceed, &lumiPrincipal = lp](
auto nextTask) {
1520 if (didGlobalBeginSucceed) {
1529 }) |
then([
status =
std::move(iLumiStatus),
this](std::exception_ptr
const* iPtr,
auto nextTask)
mutable {
1530 std::exception_ptr ptr;
1542 ptr = std::current_exception();
1547 status->resumeGlobalLumiQueue();
1551 ptr = std::current_exception();
1559 status->resetResources();
1563 ptr = std::current_exception();
1585 if (
status->streamFinishedLumi()) {
1595 lumiStatus->setEndTime();
1599 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1600 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1602 if (lumiStatus->didGlobalBeginSucceed()) {
1603 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1605 lumiPrincipal.endTime());
1608 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1614 cleaningUpAfterException);
1631 }
while (not globalWaitTask.
done());
1633 std::rethrow_exception(*(globalWaitTask.
exceptionPtr()));
1639 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1640 input_->readProcessBlock(processBlockPrincipal);
1641 sentry.completedSuccessfully();
1647 <<
"Illegal attempt to insert run into cache\n"
1648 <<
"Contact a Framework Developer\n";
1650 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
1658 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1660 sentry.completedSuccessfully();
1662 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1664 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1671 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1672 input_->readAndMergeRun(*runPrincipal);
1673 sentry.completedSuccessfully();
1675 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1676 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1682 <<
"Illegal attempt to insert lumi into cache\n"
1683 <<
"Run is invalid\n"
1684 <<
"Contact a Framework Developer\n";
1688 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1690 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1692 sentry.completedSuccessfully();
1700 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or
1701 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1702 input_->processHistoryRegistry().reducedProcessHistoryID(
1703 input_->luminosityBlockAuxiliary()->processHistoryID()));
1704 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1706 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1708 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1710 sentry.completedSuccessfully();
1712 return input_->luminosityBlock();
1716 using namespace edm::waiting_task;
1724 s.writeProcessBlockAsync(nextTask, processBlockType);
1733 using namespace edm::waiting_task;
1740 mergeableRunProductMetadata);
1744 s.writeRunAsync(nextTask, phid, run, mergeableRunProductMetadata);
1751 for_all(
subProcesses_, [run, phid](
auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1752 FDEBUG(1) <<
"\tdeleteRunFromCache " << run <<
"\n";
1756 using namespace edm::waiting_task;
1766 s.writeLumiAsync(nextTask, lumiPrincipal);
1803 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1815 input_->luminosityBlockAuxiliary()->beginTime()));
1832 bool expected =
false;
1851 auto recursionTask =
make_waiting_task([
this, iTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1855 bool expected =
false;
1873 if (
status->isLumiEnding()) {
1889 bool expected =
false;
1891 auto e = std::current_exception();
1904 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1908 sentry.completedSuccessfully();
1910 FDEBUG(1) <<
"\treadEvent\n";
1924 rng->postEventRead(ev);
1928 using namespace edm::waiting_task::chain;
1929 chain::first([
this, &es, pep, iStreamIndex](
auto nextTask) {
1933 for (
auto& subProcess : boost::adaptors::reverse(
subProcesses_)) {
1934 subProcess.doEventAsync(nextTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1936 }) |
ifThen(
looper_, [
this, iStreamIndex, pep](
auto nextTask) {
1940 }) |
then([pep](
auto nextTask) {
1941 FDEBUG(1) <<
"\tprocessEvent\n";
1942 pep->clearEventPrincipal();
1947 bool randomAccess =
input_->randomAccess();
1956 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1975 FDEBUG(1) <<
"\tshouldWeStop\n";
1980 if (subProcess.terminate()) {
1996 bool expected =
false;
2006 ex <<
"The framework is configured to use at least two streams, but the following modules\n"
2007 <<
"require synchronizing on LuminosityBlock boundaries:";
2009 for (
auto worker :
schedule_->allWorkers()) {
2010 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2012 ex <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
2016 ex <<
"\n\nThe situation can be fixed by either\n"
2017 <<
" * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2018 <<
" * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2024 std::unique_ptr<LogSystem>
s;
2025 for (
auto worker :
schedule_->allWorkers()) {
2028 s = std::make_unique<LogSystem>(
"LegacyModules");
2029 (*s) <<
"The following legacy modules are configured. Support for legacy modules\n"
2030 "is going to end soon. These modules need to be converted to have type\n"
2031 "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2033 (*s) <<
"\n " << worker->description()->moduleName() <<
" " << worker->description()->moduleLabel();
std::atomic< bool > exceptionMessageLumis_
RunPrincipal const & runPrincipal() const
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
bool wasEventProcessingStopped() const
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
const edm::EventSetup & c
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
tbb::task_group taskGroup_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
edm::propagate_const< std::unique_ptr< InputSource > > input_
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
static PFTauRenderPlugin instance
ParameterSetID id() const
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
bool willBeContinued() const
The source is replaying overlapping LuminosityBlocks and this is not the last part for this Lumiosity...
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
unsigned int numberOfThreads() const
bool hasRunPrincipal() const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
constexpr auto then(O &&iO)
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
bool alreadyPrinted() const
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void push(tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms, std::vector< std::string > const &loopers)
void warnAboutLegacyModules() const
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
auto runLast(edm::WaitingTaskHolder iTask)
void setExceptionMessageRuns(std::string &message)
void validateLooper(ParameterSet &pset)
bool taskHasFailed() const noexcept
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
Timestamp const & beginTime() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
bool continuingLumi() const
void addAdditionalInfo(std::string const &info)
FunctorWaitingTask< F > * make_waiting_task(F f)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
unsigned int numberOfLuminosityBlocks() const
ProcessBlockPrincipal & processBlockPrincipal() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
Log< level::Info, false > LogInfo
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
bool shouldWeCloseOutput() const
MergeableRunProductMetadata * mergeableRunProductMetadata()
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
T getParameter(std::string const &) const
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
void stopProcessingEvents()
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
tbb::task_group * group() const noexcept
constexpr element_type const * get() const
bool forceESCacheClearOnNewRun_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
void removeModules(std::vector< ModuleDescription const * > const &modules)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
void closeInputFile(bool cleaningUpAfterException)
void readProcessBlock(ProcessBlockPrincipal &)
static ComponentFactory< T > const * get()
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
auto lastTask(edm::WaitingTaskHolder iTask)
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
Log< level::System, true > LogAbsolute
void setNextSyncValue(IOVSyncValue const &iValue)
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void beginProcessBlock(bool &beginProcessBlockSucceeded)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
void setExceptionMessageLumis()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
ParameterSet const & registerIt()
tuple size
Write out results.
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
void inputProcessBlocks()
bool insertMapped(value_type const &v)
Transition requestedTransition() const
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_