77 #include "boost/range/adaptor/reversed.hpp"
101 class SendSourceTerminationSignalIfException {
104 ~SendSourceTerminationSignalIfException() {
109 void completedSuccessfully() { reg_ =
nullptr; }
122 std::shared_ptr<ProductRegistry> preg,
123 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
125 std::shared_ptr<ActivityRegistry> areg,
126 std::shared_ptr<ProcessConfiguration const> processConfiguration,
129 if (main_input ==
nullptr) {
131 <<
"There must be exactly one source in the configuration.\n"
132 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
137 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
140 filler->fill(descriptions);
145 std::ostringstream ost;
146 ost <<
"Validating configuration of input source of type " << modtype;
162 processConfiguration.get(),
168 thinnedAssociationsHelper,
172 common.maxSecondsUntilRampdown_,
175 areg->preSourceConstructionSignal_(md);
176 std::unique_ptr<InputSource>
input;
179 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
182 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
183 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
186 std::ostringstream ost;
187 ost <<
"Constructing input source of type " << modtype;
198 std::shared_ptr<EDLooperBase> vLooper;
200 std::vector<std::string> loopers =
params.getParameter<std::vector<std::string>>(
"@all_loopers");
202 if (loopers.empty()) {
206 assert(1 == loopers.size());
208 for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd;
221 std::vector<std::string>
const& defaultServices,
222 std::vector<std::string>
const& forcedServices)
225 branchIDListHelper_(),
228 espController_(new eventsetup::EventSetupsController),
231 processConfiguration_(),
237 deferredExceptionPtrIsSet_(
false),
241 beginJobCalled_(
false),
242 shouldWeStop_(
false),
243 fileModeNoMerge_(
false),
244 exceptionMessageFiles_(),
245 exceptionMessageRuns_(),
246 exceptionMessageLumis_(
false),
247 forceLooperToEnd_(
false),
248 looperBeginJobRun_(
false),
249 forceESCacheClearOnNewRun_(
false),
250 eventSetupDataToExcludeFromPrefetching_() {
252 processDesc->addServices(defaultServices, forcedServices);
253 init(processDesc, iToken, iLegacy);
257 std::vector<std::string>
const& defaultServices,
258 std::vector<std::string>
const& forcedServices)
261 branchIDListHelper_(),
264 espController_(new eventsetup::EventSetupsController),
267 processConfiguration_(),
273 deferredExceptionPtrIsSet_(
false),
277 beginJobCalled_(
false),
278 shouldWeStop_(
false),
279 fileModeNoMerge_(
false),
280 exceptionMessageFiles_(),
281 exceptionMessageRuns_(),
282 exceptionMessageLumis_(
false),
283 forceLooperToEnd_(
false),
284 looperBeginJobRun_(
false),
285 forceESCacheClearOnNewRun_(
false),
286 asyncStopRequestedWhileProcessingEvents_(
false),
287 eventSetupDataToExcludeFromPrefetching_() {
289 processDesc->addServices(defaultServices, forcedServices);
298 branchIDListHelper_(),
301 espController_(new eventsetup::EventSetupsController),
304 processConfiguration_(),
310 deferredExceptionPtrIsSet_(
false),
314 beginJobCalled_(
false),
315 shouldWeStop_(
false),
316 fileModeNoMerge_(
false),
317 exceptionMessageFiles_(),
318 exceptionMessageRuns_(),
319 exceptionMessageLumis_(
false),
320 forceLooperToEnd_(
false),
321 looperBeginJobRun_(
false),
322 forceESCacheClearOnNewRun_(
false),
323 asyncStopRequestedWhileProcessingEvents_(
false),
324 eventSetupDataToExcludeFromPrefetching_() {
339 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
343 bool const hasSubProcesses = !subProcessVParameterSet.empty();
356 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
363 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
369 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
376 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
377 if (nConcurrentRuns != 1) {
379 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
381 unsigned int nConcurrentLumis =
382 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
383 if (nConcurrentLumis == 0) {
384 nConcurrentLumis = nConcurrentRuns;
407 auto& serviceSets = processDesc->getServicesPSets();
416 handler->willBeUsingThreads();
423 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
434 nConcurrentLumis = 1;
441 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
449 items.branchIDListHelper(),
450 items.thinnedAssociationsHelper(),
452 items.processConfiguration(),
474 auto ep = std::make_shared<EventPrincipal>(
preg(),
499 for (
auto& subProcessPSet : subProcessVParameterSet) {
549 actReg_->preallocateSignal_(bounds);
577 ex.
addContext(
"Calling beginJob for the source");
584 actReg_->postBeginJobSignal_();
595 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
604 c.call([&subProcess,
i]() { subProcess.doEndStream(
i); });
608 c.call([actReg]() { actReg->preEndJobSignal_(); });
617 c.call([actReg]() { actReg->postEndJobSignal_(); });
626 return schedule_->getAllModuleDescriptions();
642 #include "TransitionProcessors.icc"
646 bool returnValue =
false;
662 SendSourceTerminationSignalIfException sentry(
actReg_.get());
666 itemType =
input_->nextItemType();
670 sentry.completedSuccessfully();
683 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
702 bool firstTime =
true;
712 auto trans =
fp.processFiles(*
this);
723 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
732 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
733 e.addAdditionalInfo(message);
734 if (
e.alreadyPrinted()) {
740 if (
e.alreadyPrinted()) {
746 if (
e.alreadyPrinted()) {
758 FDEBUG(1) <<
" \treadFile\n";
760 SendSourceTerminationSignalIfException sentry(
actReg_.get());
765 if (size < preg_->
size()) {
772 sentry.completedSuccessfully();
776 if (
fb_.get() !=
nullptr) {
777 SendSourceTerminationSignalIfException sentry(
actReg_.get());
778 input_->closeFile(
fb_.get(), cleaningUpAfterException);
779 sentry.completedSuccessfully();
781 FDEBUG(1) <<
"\tcloseInputFile\n";
785 if (
fb_.get() !=
nullptr) {
789 FDEBUG(1) <<
"\topenOutputFiles\n";
793 if (
fb_.get() !=
nullptr) {
797 FDEBUG(1) <<
"\tcloseOutputFiles\n";
802 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
803 if (
fb_.get() !=
nullptr) {
807 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
811 if (
fb_.get() !=
nullptr) {
815 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
825 FDEBUG(1) <<
"\tstartingNewLoop\n";
831 looper_->setModuleChanger(&changer);
833 looper_->setModuleChanger(
nullptr);
839 FDEBUG(1) <<
"\tendOfLoop\n";
846 FDEBUG(1) <<
"\trewind\n";
851 FDEBUG(1) <<
"\tprepareForNextLoop\n";
855 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
858 if (subProcess.shouldWeCloseOutput()) {
868 FDEBUG(1) <<
"\tdoErrorStuff\n";
869 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n"
870 <<
"and went to the error state\n"
871 <<
"Will attempt to terminate processing normally\n"
872 <<
"(IF using the looper the next loop will be attempted)\n"
873 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
882 globalWaitTask->increment_ref_count();
885 beginGlobalTransitionAsync<Traits>(
888 globalWaitTask->wait_for_all();
889 if (globalWaitTask->exceptionPtr() !=
nullptr) {
890 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
892 beginProcessBlockSucceeded =
true;
902 while (
input_->readProcessBlock()) {
908 globalWaitTask->increment_ref_count();
911 beginGlobalTransitionAsync<Traits>(
914 globalWaitTask->wait_for_all();
915 if (globalWaitTask->exceptionPtr() !=
nullptr) {
916 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
920 writeWaitTask->increment_ref_count();
922 writeWaitTask->wait_for_all();
923 if (writeWaitTask->exceptionPtr()) {
924 std::rethrow_exception(*writeWaitTask->exceptionPtr());
939 globalWaitTask->increment_ref_count();
947 cleaningUpAfterException);
949 globalWaitTask->wait_for_all();
950 if (globalWaitTask->exceptionPtr() !=
nullptr) {
951 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
954 if (beginProcessBlockSucceeded) {
956 writeWaitTask->increment_ref_count();
958 writeWaitTask->wait_for_all();
959 if (writeWaitTask->exceptionPtr()) {
960 std::rethrow_exception(*writeWaitTask->exceptionPtr());
972 bool& globalBeginSucceeded,
973 bool& eventSetupForInstanceSucceeded) {
974 globalBeginSucceeded =
false;
977 SendSourceTerminationSignalIfException sentry(
actReg_.get());
980 sentry.completedSuccessfully();
988 SendSourceTerminationSignalIfException sentry(
actReg_.get());
990 eventSetupForInstanceSucceeded =
true;
991 sentry.completedSuccessfully();
993 auto const& es =
esp_->eventSetupImpl();
1003 globalWaitTask->increment_ref_count();
1005 beginGlobalTransitionAsync<Traits>(
1007 globalWaitTask->wait_for_all();
1008 if (globalWaitTask->exceptionPtr() !=
nullptr) {
1009 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1012 globalBeginSucceeded =
true;
1013 FDEBUG(1) <<
"\tbeginRun " <<
run <<
"\n";
1020 streamLoopWaitTask->increment_ref_count();
1025 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1032 streamLoopWaitTask->wait_for_all();
1033 if (streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1034 std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
1037 FDEBUG(1) <<
"\tstreamBeginRun " <<
run <<
"\n";
1045 bool globalBeginSucceeded,
1046 bool cleaningUpAfterException,
1047 bool eventSetupForInstanceSucceeded) {
1048 if (eventSetupForInstanceSucceeded) {
1050 endRun(phid,
run, globalBeginSucceeded, cleaningUpAfterException);
1052 if (globalBeginSucceeded) {
1054 t->increment_ref_count();
1061 if (
t->exceptionPtr()) {
1062 std::rethrow_exception(*
t->exceptionPtr());
1071 bool globalBeginSucceeded,
1072 bool cleaningUpAfterException) {
1080 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1082 sentry.completedSuccessfully();
1084 auto const& es =
esp_->eventSetupImpl();
1085 if (globalBeginSucceeded) {
1088 streamLoopWaitTask->increment_ref_count();
1099 cleaningUpAfterException);
1101 streamLoopWaitTask->wait_for_all();
1102 if (streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1103 std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
1106 FDEBUG(1) <<
"\tstreamEndRun " <<
run <<
"\n";
1112 globalWaitTask->increment_ref_count();
1121 cleaningUpAfterException);
1122 globalWaitTask->wait_for_all();
1123 if (globalWaitTask->exceptionPtr() !=
nullptr) {
1124 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1127 FDEBUG(1) <<
"\tendRun " <<
run <<
"\n";
1135 waitTask->increment_ref_count();
1143 input_->luminosityBlockAuxiliary()->beginTime()),
1147 waitTask->wait_for_all();
1149 if (waitTask->exceptionPtr() !=
nullptr) {
1150 std::rethrow_exception(*(waitTask->exceptionPtr()));
1156 std::shared_ptr<void>
const& iRunResource,
1175 status->resetResources();
1190 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1193 sentry.completedSuccessfully();
1199 rng->preBeginLumi(lb);
1204 tbb::task::allocate_root(), [
this, holder = iHolder,
status](std::exception_ptr
const* iPtr)
mutable {
1206 status->resetResources();
1207 holder.doneWaiting(*iPtr);
1209 status->globalBeginDidSucceed();
1219 status->resetResources();
1220 holder.doneWaiting(std::current_exception());
1233 std::exception_ptr
const* exceptionFromBeginStreamLumi)
mutable {
1234 if (exceptionFromBeginStreamLumi) {
1236 tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1246 auto eventSetupImpls = &
status->eventSetupImpls();
1247 auto lp =
status->lumiPrincipal().get();
1250 event.setLuminosityBlockPrincipal(lp);
1252 beginStreamTransitionAsync<Traits>(
1266 beginGlobalTransitionAsync<Traits>(
1270 status->resetResources();
1277 tbb::task::allocate_root(),
1278 [
this, lumiWorkLambda =
std::move(lumiWork), iHolder](std::exception_ptr
const* iPtr)
mutable {
1295 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1301 iSync, queueLumiWorkTaskHolder,
status->endIOVWaitingTasks(),
status->eventSetupImpls());
1302 sentry.completedSuccessfully();
1304 queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1316 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1323 iSync, queueLumiWorkTaskHolder,
status->endIOVWaitingTasks(),
status->eventSetupImpls());
1324 sentry.completedSuccessfully();
1327 queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1336 status->needToContinueLumi();
1337 status->startProcessingEvents();
1340 unsigned int streamIndex = 0;
1354 tmp.doneWaiting(*iPtr);
1361 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1364 auto& lp = *(iLumiStatus->lumiPrincipal());
1365 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1366 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1368 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1371 tbb::task::allocate_root(),
1373 std::exception_ptr ptr;
1381 auto& lumiPrincipal = *(
status->lumiPrincipal());
1386 ptr = std::current_exception();
1397 ptr = std::current_exception();
1402 status->resumeGlobalLumiQueue();
1406 ptr = std::current_exception();
1414 status->resetResources();
1418 ptr = std::current_exception();
1428 tbb::task::allocate_root(),
1430 std::exception_ptr
const* iExcept)
mutable {
1432 task.doneWaiting(*iExcept);
1435 if (didGlobalBeginSucceed) {
1445 endGlobalTransitionAsync<Traits>(
1451 [
this, iStreamIndex, iTask](std::exception_ptr
const* iPtr)
mutable {
1462 if (
status->streamFinishedLumi()) {
1472 lumiStatus->setEndTime();
1476 bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1477 auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1479 if (lumiStatus->didGlobalBeginSucceed()) {
1480 auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1482 lumiPrincipal.endTime());
1485 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1491 cleaningUpAfterException);
1498 globalWaitTask->increment_ref_count();
1507 globalWaitTask->wait_for_all();
1508 if (globalWaitTask->exceptionPtr() !=
nullptr) {
1509 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1517 <<
"Illegal attempt to insert run into cache\n"
1518 <<
"Contact a Framework Developer\n";
1520 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
1528 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1530 sentry.completedSuccessfully();
1532 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1534 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1541 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1542 input_->readAndMergeRun(*runPrincipal);
1543 sentry.completedSuccessfully();
1545 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1546 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1552 <<
"Illegal attempt to insert lumi into cache\n"
1553 <<
"Run is invalid\n"
1554 <<
"Contact a Framework Developer\n";
1558 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1560 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1562 sentry.completedSuccessfully();
1570 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or
1571 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1572 input_->processHistoryRegistry().reducedProcessHistoryID(
1573 input_->luminosityBlockAuxiliary()->processHistoryID()));
1574 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1576 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1578 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1580 sentry.completedSuccessfully();
1582 return input_->luminosityBlock();
1587 [
this,
task, processBlockType](std::exception_ptr
const* iExcept)
mutable {
1589 task.doneWaiting(*iExcept);
1593 s.writeProcessBlockAsync(
task, processBlockType);
1609 tbb::task::allocate_root(),
1610 [
this, phid,
run,
task, mergeableRunProductMetadata](std::exception_ptr
const* iExcept)
mutable {
1612 task.doneWaiting(*iExcept);
1616 s.writeRunAsync(
task, phid,
run, mergeableRunProductMetadata);
1625 mergeableRunProductMetadata);
1631 FDEBUG(1) <<
"\tdeleteRunFromCache " <<
run <<
"\n";
1636 [
this,
task, &lumiPrincipal](std::exception_ptr
const* iExcept)
mutable {
1638 task.doneWaiting(*iExcept);
1642 s.writeLumiAsync(
task, lumiPrincipal);
1684 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1696 input_->luminosityBlockAuxiliary()->beginTime()));
1713 bool expected =
false;
1733 tbb::task::allocate_root(), [
this, iTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1737 bool expected =
false;
1755 if (
status->isLumiEnding()) {
1771 bool expected =
false;
1773 auto e = std::current_exception();
1786 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1790 sentry.completedSuccessfully();
1792 FDEBUG(1) <<
"\treadEvent\n";
1807 rng->postEventRead(
ev);
1811 tbb::task::allocate_root(), [
this, pep, iHolder, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1818 FDEBUG(1) <<
"\tprocessEvent\n";
1819 pep->clearEventPrincipal();
1828 afterProcessTask =
std::move(finalizeEventTask);
1833 tbb::task::allocate_root(),
1834 [
this, pep, finalizeEventTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1840 subProcess.doEventAsync(finalizeEventTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1854 bool randomAccess =
input_->randomAccess();
1863 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1882 FDEBUG(1) <<
"\tshouldWeStop\n";
1887 if (subProcess.terminate()) {
1903 bool expected =
false;
1912 std::unique_ptr<LogSystem>
s;
1913 for (
auto worker :
schedule_->allWorkers()) {
1914 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1916 s = std::make_unique<LogSystem>(
"ModulesSynchingOnLumis");
1917 (*s) <<
"The following modules require synchronizing on LuminosityBlock boundaries:";
1919 (*s) <<
"\n " << worker->description().moduleName() <<
" " << worker->description().moduleLabel();