73 #include "boost/range/adaptor/reversed.hpp" 97 class SendSourceTerminationSignalIfException {
100 ~SendSourceTerminationSignalIfException() {
105 void completedSuccessfully() { reg_ =
nullptr; }
118 std::shared_ptr<ProductRegistry> preg,
119 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
120 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
121 std::shared_ptr<ActivityRegistry> areg,
122 std::shared_ptr<ProcessConfiguration const> processConfiguration,
125 if (main_input ==
nullptr) {
127 <<
"There must be exactly one source in the configuration.\n" 128 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
133 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
136 filler->fill(descriptions);
141 std::ostringstream ost;
142 ost <<
"Validating configuration of input source of type " << modtype;
158 processConfiguration.get(),
159 ModuleDescription::getUniqueID());
164 thinnedAssociationsHelper,
171 areg->preSourceConstructionSignal_(md);
172 std::unique_ptr<InputSource>
input;
175 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
177 input = std::unique_ptr<InputSource>(
InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
178 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
179 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
182 std::ostringstream ost;
183 ost <<
"Constructing input source of type " << modtype;
194 std::shared_ptr<EDLooperBase> vLooper;
196 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string>>(
"@all_loopers");
198 if (loopers.empty()) {
202 assert(1 == loopers.size());
204 for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd;
217 std::vector<std::string>
const& defaultServices,
218 std::vector<std::string>
const& forcedServices)
221 branchIDListHelper_(),
224 espController_(new eventsetup::EventSetupsController),
227 processConfiguration_(),
233 deferredExceptionPtrIsSet_(
false),
237 beginJobCalled_(
false),
238 shouldWeStop_(
false),
239 fileModeNoMerge_(
false),
240 exceptionMessageFiles_(),
241 exceptionMessageRuns_(),
242 exceptionMessageLumis_(
false),
243 forceLooperToEnd_(
false),
244 looperBeginJobRun_(
false),
245 forceESCacheClearOnNewRun_(
false),
246 eventSetupDataToExcludeFromPrefetching_() {
247 auto processDesc = std::make_shared<ProcessDesc>(
std::move(parameterSet));
248 processDesc->addServices(defaultServices, forcedServices);
249 init(processDesc, iToken, iLegacy);
253 std::vector<std::string>
const& defaultServices,
254 std::vector<std::string>
const& forcedServices)
284 auto processDesc = std::make_shared<ProcessDesc>(
std::move(parameterSet));
285 processDesc->addServices(defaultServices, forcedServices);
321 init(processDesc, token, legacy);
335 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
339 bool const hasSubProcesses = !subProcessVParameterSet.empty();
347 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet(
"options"));
349 if (fileMode !=
"NOMERGE" and fileMode !=
"FULLMERGE") {
352 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
359 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
363 assert(nThreads != 0);
365 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
369 if (nThreads > 1
or nStreams > 1) {
370 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
372 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
373 if (nConcurrentRuns != 1) {
375 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
377 unsigned int nConcurrentLumis =
378 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
379 if (nConcurrentLumis == 0) {
380 nConcurrentLumis = nConcurrentRuns;
403 auto& serviceSets = processDesc->getServicesPSets();
404 ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy,
true);
412 handler->willBeUsingThreads();
416 std::shared_ptr<CommonParams>
common(items.initMisc(*parameterSet));
419 ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet(
"eventSetup"));
420 esp_ =
espController_->makeProvider(*parameterSet, items.actReg_.get(), &eventSetupPset);
425 looper_->setActionTable(items.act_table_.get());
426 looper_->attachTo(*items.actReg_);
430 nConcurrentLumis = 1;
437 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
445 items.branchIDListHelper(),
446 items.thinnedAssociationsHelper(),
448 items.processConfiguration(),
457 preg_ = items.preg();
465 FDEBUG(2) << parameterSet << std::endl;
470 auto ep = std::make_shared<EventPrincipal>(
preg(),
487 for (
auto& subProcessPSet : subProcessVParameterSet) {
535 actReg_->preallocateSignal_(bounds);
563 ex.
addContext(
"Calling beginJob for the source");
570 actReg_->postBeginJobSignal_();
581 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
590 c.
call([&subProcess,
i]() { subProcess.doEndStream(
i); });
594 c.
call([actReg]() { actReg->preEndJobSignal_(); });
603 c.
call([actReg]() { actReg->postEndJobSignal_(); });
612 return schedule_->getAllModuleDescriptions();
630 #include "TransitionProcessors.icc" 634 bool returnValue =
false;
650 SendSourceTerminationSignalIfException sentry(
actReg_.get());
654 itemType =
input_->nextItemType();
658 sentry.completedSuccessfully();
671 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
690 bool firstTime =
true;
700 auto trans = fp.processFiles(*
this);
711 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
720 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
746 FDEBUG(1) <<
" \treadFile\n";
748 SendSourceTerminationSignalIfException sentry(
actReg_.get());
753 if (size < preg_->
size()) {
760 sentry.completedSuccessfully();
764 if (
fb_.get() !=
nullptr) {
765 SendSourceTerminationSignalIfException sentry(
actReg_.get());
766 input_->closeFile(
fb_.get(), cleaningUpAfterException);
767 sentry.completedSuccessfully();
769 FDEBUG(1) <<
"\tcloseInputFile\n";
773 if (
fb_.get() !=
nullptr) {
777 FDEBUG(1) <<
"\topenOutputFiles\n";
781 if (
fb_.get() !=
nullptr) {
785 FDEBUG(1) <<
"\tcloseOutputFiles\n";
790 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
791 if (
fb_.get() !=
nullptr) {
795 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
799 if (
fb_.get() !=
nullptr) {
803 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
813 FDEBUG(1) <<
"\tstartingNewLoop\n";
819 looper_->setModuleChanger(&changer);
821 looper_->setModuleChanger(
nullptr);
827 FDEBUG(1) <<
"\tendOfLoop\n";
834 FDEBUG(1) <<
"\trewind\n";
839 FDEBUG(1) <<
"\tprepareForNextLoop\n";
843 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
846 if (subProcess.shouldWeCloseOutput()) {
856 FDEBUG(1) <<
"\tdoErrorStuff\n";
857 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n" 858 <<
"and went to the error state\n" 859 <<
"Will attempt to terminate processing normally\n" 860 <<
"(IF using the looper the next loop will be attempted)\n" 861 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
866 bool& globalBeginSucceeded,
867 bool& eventSetupForInstanceSucceeded) {
868 globalBeginSucceeded =
false;
871 SendSourceTerminationSignalIfException sentry(
actReg_.get());
874 sentry.completedSuccessfully();
882 SendSourceTerminationSignalIfException sentry(
actReg_.get());
884 eventSetupForInstanceSucceeded =
true;
885 sentry.completedSuccessfully();
887 auto const& es =
esp_->eventSetupImpl();
897 globalWaitTask->increment_ref_count();
906 globalWaitTask->wait_for_all();
907 if (globalWaitTask->exceptionPtr() !=
nullptr) {
908 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
911 globalBeginSucceeded =
true;
912 FDEBUG(1) <<
"\tbeginRun " << run <<
"\n";
919 streamLoopWaitTask->increment_ref_count();
923 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
933 streamLoopWaitTask->wait_for_all();
934 if (streamLoopWaitTask->exceptionPtr() !=
nullptr) {
935 std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
938 FDEBUG(1) <<
"\tstreamBeginRun " << run <<
"\n";
946 bool globalBeginSucceeded,
947 bool cleaningUpAfterException,
948 bool eventSetupForInstanceSucceeded) {
949 if (eventSetupForInstanceSucceeded) {
951 endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
953 if (globalBeginSucceeded) {
955 t->increment_ref_count();
962 if (
t->exceptionPtr()) {
963 std::rethrow_exception(*
t->exceptionPtr());
972 bool globalBeginSucceeded,
973 bool cleaningUpAfterException) {
981 SendSourceTerminationSignalIfException sentry(
actReg_.get());
983 sentry.completedSuccessfully();
985 auto const& es =
esp_->eventSetupImpl();
986 if (globalBeginSucceeded) {
989 streamLoopWaitTask->increment_ref_count();
1002 cleaningUpAfterException);
1004 streamLoopWaitTask->wait_for_all();
1005 if (streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1006 std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
1009 FDEBUG(1) <<
"\tstreamEndRun " << run <<
"\n";
1015 globalWaitTask->increment_ref_count();
1026 cleaningUpAfterException);
1027 globalWaitTask->wait_for_all();
1028 if (globalWaitTask->exceptionPtr() !=
nullptr) {
1029 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1032 FDEBUG(1) <<
"\tendRun " << run <<
"\n";
1040 waitTask->increment_ref_count();
1048 input_->luminosityBlockAuxiliary()->beginTime()),
1052 waitTask->wait_for_all();
1054 if (waitTask->exceptionPtr() !=
nullptr) {
1055 std::rethrow_exception(*(waitTask->exceptionPtr()));
1061 std::shared_ptr<void>
const& iRunResource,
1080 status->resetResources();
1095 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1098 sentry.completedSuccessfully();
1104 rng->preBeginLumi(lb);
1111 tbb::task::allocate_root(), [
this, holder = iHolder,
status, ts](std::exception_ptr
const* iPtr)
mutable {
1113 status->resetResources();
1114 holder.doneWaiting(*iPtr);
1116 status->globalBeginDidSucceed();
1125 status->resetResources();
1126 holder.doneWaiting(std::current_exception());
1137 tbb::task::allocate_root(),
1138 [
this,
i,
h = holder](std::exception_ptr
const* exceptionFromBeginStreamLumi)
mutable {
1139 if (exceptionFromBeginStreamLumi) {
1150 auto lp =
status->lumiPrincipal();
1151 event.setLuminosityBlockPrincipal(lp.get());
1158 &
status->eventSetupImpls(),
1172 beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1177 &
status->eventSetupImpls(),
1182 status->resetResources();
1189 tbb::task::allocate_root(),
1190 [
this, lumiWorkLambda =
std::move(lumiWork), iHolder](std::exception_ptr
const* iPtr)
mutable {
1206 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1212 iSync, queueLumiWorkTaskHolder,
status->endIOVWaitingTasks(),
status->eventSetupImpls());
1213 sentry.completedSuccessfully();
1215 queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1227 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1234 iSync, queueLumiWorkTaskHolder,
status->endIOVWaitingTasks(),
status->eventSetupImpls());
1235 sentry.completedSuccessfully();
1238 queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1247 status->needToContinueLumi();
1248 status->startProcessingEvents();
1251 unsigned int streamIndex = 0;
1272 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1275 auto& lp = *(iLumiStatus->lumiPrincipal());
1276 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1277 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1279 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1282 tbb::task::allocate_root(),
1284 std::exception_ptr ptr;
1291 auto& lumiPrincipal = *(
status->lumiPrincipal());
1296 ptr = std::current_exception();
1309 ptr = std::current_exception();
1314 status->resumeGlobalLumiQueue();
1318 ptr = std::current_exception();
1326 status->resetResources();
1330 ptr = std::current_exception();
1340 tbb::task::allocate_root(),
1342 std::exception_ptr
const* iExcept)
mutable {
1344 task.doneWaiting(*iExcept);
1347 if (didGlobalBeginSucceed) {
1365 cleaningUpAfterException);
1369 unsigned int iStreamIndex,
1370 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1372 [
this, iStreamIndex, iTask](std::exception_ptr
const* iPtr)
mutable {
1383 if (
status->streamFinishedLumi()) {
1390 iLumiStatus->setEndTime();
1392 if (iLumiStatus->didGlobalBeginSucceed()) {
1393 auto& lumiPrincipal = *iLumiStatus->lumiPrincipal();
1395 lumiPrincipal.endTime());
1398 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1401 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1407 &iLumiStatus->eventSetupImpls(),
1410 cleaningUpAfterException);
1417 globalWaitTask->increment_ref_count();
1426 globalWaitTask->wait_for_all();
1427 if (globalWaitTask->exceptionPtr() !=
nullptr) {
1428 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1436 <<
"Illegal attempt to insert run into cache\n" 1437 <<
"Contact a Framework Developer\n";
1439 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
1447 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1449 sentry.completedSuccessfully();
1451 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1453 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1460 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1461 input_->readAndMergeRun(*runPrincipal);
1462 sentry.completedSuccessfully();
1464 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1465 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1471 <<
"Illegal attempt to insert lumi into cache\n" 1472 <<
"Run is invalid\n" 1473 <<
"Contact a Framework Developer\n";
1477 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1479 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1481 sentry.completedSuccessfully();
1489 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1490 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1491 input_->processHistoryRegistry().reducedProcessHistoryID(
1492 input_->luminosityBlockAuxiliary()->processHistoryID()));
1493 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1495 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1497 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1499 sentry.completedSuccessfully();
1501 return input_->luminosityBlock();
1509 tbb::task::allocate_root(),
1510 [
this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr
const* iExcept)
mutable {
1516 s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1525 mergeableRunProductMetadata);
1530 for_all(
subProcesses_, [run, phid](
auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1531 FDEBUG(1) <<
"\tdeleteRunFromCache " << run <<
"\n";
1536 [
this, task, &lumiPrincipal](std::exception_ptr
const* iExcept)
mutable {
1542 s.writeLumiAsync(task, lumiPrincipal);
1583 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1595 input_->luminosityBlockAuxiliary()->beginTime()));
1612 bool expected =
false;
1629 tbb::task::allocate_root(), [
this, iTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1633 bool expected =
false;
1651 if (
status->isLumiEnding()) {
1667 bool expected =
false;
1669 auto e = std::current_exception();
1682 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1686 sentry.completedSuccessfully();
1688 FDEBUG(1) <<
"\treadEvent\n";
1703 rng->postEventRead(ev);
1707 tbb::task::allocate_root(), [
this, pep, iHolder, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1714 FDEBUG(1) <<
"\tprocessEvent\n";
1715 pep->clearEventPrincipal();
1724 afterProcessTask =
std::move(finalizeEventTask);
1729 tbb::task::allocate_root(),
1730 [
this, pep, finalizeEventTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1736 subProcess.doEventAsync(finalizeEventTask, *pep, &
streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1749 bool randomAccess =
input_->randomAccess();
1758 status =
looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1777 FDEBUG(1) <<
"\tshouldWeStop\n";
1782 if (subProcess.terminate()) {
1798 bool expected =
false;
1807 std::unique_ptr<LogSystem>
s;
1808 for (
auto worker :
schedule_->allWorkers()) {
1809 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1811 s = std::make_unique<LogSystem>(
"ModulesSynchingOnLumis");
1812 (*s) <<
"The following modules require synchronizing on LuminosityBlock boundaries:";
1814 (*s) <<
"\n " << worker->description().moduleName() <<
" " << worker->description().moduleLabel();
std::atomic< bool > exceptionMessageLumis_
RunPrincipal const & runPrincipal() const
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
bool wasEventProcessingStopped() const
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
unsigned int numberOfThreads() const
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
bool alreadyPrinted() const
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
bool taskHasFailed() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
bool continuingLumi() const
void addAdditionalInfo(std::string const &info)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
MergeableRunProductMetadata * mergeableRunProductMetadata()
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
void stopProcessingEvents()
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
bool forceESCacheClearOnNewRun_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void closeInputFile(bool cleaningUpAfterException)
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
void setNextSyncValue(IOVSyncValue iValue)
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
void setExceptionMessageLumis()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_