72 #include "boost/range/adaptor/reversed.hpp" 96 class SendSourceTerminationSignalIfException {
99 ~SendSourceTerminationSignalIfException() {
104 void completedSuccessfully() { reg_ =
nullptr; }
117 std::shared_ptr<ProductRegistry> preg,
118 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
119 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
120 std::shared_ptr<ActivityRegistry> areg,
121 std::shared_ptr<ProcessConfiguration const> processConfiguration,
124 if (main_input ==
nullptr) {
126 <<
"There must be exactly one source in the configuration.\n" 127 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
132 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
135 filler->fill(descriptions);
140 std::ostringstream ost;
141 ost <<
"Validating configuration of input source of type " << modtype;
157 processConfiguration.get(),
158 ModuleDescription::getUniqueID());
163 thinnedAssociationsHelper,
170 areg->preSourceConstructionSignal_(md);
171 std::unique_ptr<InputSource>
input;
174 std::shared_ptr<int> sentry(
nullptr, [areg, &md](
void*) { areg->postSourceConstructionSignal_(md); });
176 input = std::unique_ptr<InputSource>(
InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
177 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
178 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
181 std::ostringstream ost;
182 ost <<
"Constructing input source of type " << modtype;
193 std::shared_ptr<EDLooperBase> vLooper;
195 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
197 if (loopers.empty()) {
201 assert(1 == loopers.size());
203 for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd;
213 EventProcessor::EventProcessor(std::unique_ptr<ParameterSet>
parameterSet,
216 std::vector<std::string>
const& defaultServices,
217 std::vector<std::string>
const& forcedServices)
220 branchIDListHelper_(),
223 espController_(new eventsetup::EventSetupsController),
226 processConfiguration_(),
232 deferredExceptionPtrIsSet_(
false),
236 beginJobCalled_(
false),
237 shouldWeStop_(
false),
238 fileModeNoMerge_(
false),
239 exceptionMessageFiles_(),
240 exceptionMessageRuns_(),
241 exceptionMessageLumis_(
false),
242 forceLooperToEnd_(
false),
243 looperBeginJobRun_(
false),
244 forceESCacheClearOnNewRun_(
false),
245 eventSetupDataToExcludeFromPrefetching_() {
246 auto processDesc = std::make_shared<ProcessDesc>(
std::move(parameterSet));
247 processDesc->addServices(defaultServices, forcedServices);
248 init(processDesc, iToken, iLegacy);
252 std::vector<std::string>
const& defaultServices,
253 std::vector<std::string>
const& forcedServices)
283 auto processDesc = std::make_shared<ProcessDesc>(
std::move(parameterSet));
284 processDesc->addServices(defaultServices, forcedServices);
320 init(processDesc, token, legacy);
334 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
338 bool const hasSubProcesses = !subProcessVParameterSet.empty();
346 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet(
"options"));
348 if (fileMode !=
"NOMERGE" and fileMode !=
"FULLMERGE") {
351 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
358 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
362 assert(nThreads != 0);
364 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
368 if (nThreads > 1
or nStreams > 1) {
369 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads " << nThreads <<
"\nsetting # streams " << nStreams;
371 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
372 if (nConcurrentRuns != 1) {
374 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
376 unsigned int nConcurrentLumis =
377 optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
378 if (nConcurrentLumis == 0) {
379 nConcurrentLumis = nConcurrentRuns;
402 auto& serviceSets = processDesc->getServicesPSets();
403 ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy,
true);
411 handler->willBeUsingThreads();
415 std::shared_ptr<CommonParams>
common(items.initMisc(*parameterSet));
423 looper_->setActionTable(items.act_table_.get());
424 looper_->attachTo(*items.actReg_);
428 nConcurrentLumis = 1;
434 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
442 items.branchIDListHelper(),
443 items.thinnedAssociationsHelper(),
445 items.processConfiguration(),
454 preg_ = items.preg();
462 FDEBUG(2) << parameterSet << std::endl;
467 auto ep = std::make_shared<EventPrincipal>(
preg(),
484 for (
auto& subProcessPSet : subProcessVParameterSet) {
532 actReg_->preallocateSignal_(bounds);
560 ex.
addContext(
"Calling beginJob for the source");
567 actReg_->postBeginJobSignal_();
578 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
587 c.
call([&subProcess,
i]() { subProcess.doEndStream(
i); });
591 c.
call([actReg]() { actReg->preEndJobSignal_(); });
600 c.
call([actReg]() { actReg->postEndJobSignal_(); });
609 return schedule_->getAllModuleDescriptions();
627 #include "TransitionProcessors.icc" 631 bool returnValue =
false;
647 SendSourceTerminationSignalIfException sentry(
actReg_.get());
651 itemType =
input_->nextItemType();
655 sentry.completedSuccessfully();
668 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
687 bool firstTime =
true;
697 auto trans = fp.processFiles(*
this);
708 throw cms::Exception(
"BadTransition") <<
"Unexpected transition change " << trans;
717 "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
743 FDEBUG(1) <<
" \treadFile\n";
745 SendSourceTerminationSignalIfException sentry(
actReg_.get());
750 if (size < preg_->
size()) {
757 sentry.completedSuccessfully();
761 if (
fb_.get() !=
nullptr) {
762 SendSourceTerminationSignalIfException sentry(
actReg_.get());
763 input_->closeFile(
fb_.get(), cleaningUpAfterException);
764 sentry.completedSuccessfully();
766 FDEBUG(1) <<
"\tcloseInputFile\n";
770 if (
fb_.get() !=
nullptr) {
774 FDEBUG(1) <<
"\topenOutputFiles\n";
778 if (
fb_.get() !=
nullptr) {
782 FDEBUG(1) <<
"\tcloseOutputFiles\n";
787 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
788 if (
fb_.get() !=
nullptr) {
792 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
796 if (
fb_.get() !=
nullptr) {
800 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
810 FDEBUG(1) <<
"\tstartingNewLoop\n";
816 looper_->setModuleChanger(&changer);
818 looper_->setModuleChanger(
nullptr);
824 FDEBUG(1) <<
"\tendOfLoop\n";
831 FDEBUG(1) <<
"\trewind\n";
836 FDEBUG(1) <<
"\tprepareForNextLoop\n";
840 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
843 if (subProcess.shouldWeCloseOutput()) {
853 FDEBUG(1) <<
"\tdoErrorStuff\n";
854 LogError(
"StateMachine") <<
"The EventProcessor state machine encountered an unexpected event\n" 855 <<
"and went to the error state\n" 856 <<
"Will attempt to terminate processing normally\n" 857 <<
"(IF using the looper the next loop will be attempted)\n" 858 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
863 bool& globalBeginSucceeded,
864 bool& eventSetupForInstanceSucceeded) {
865 globalBeginSucceeded =
false;
868 SendSourceTerminationSignalIfException sentry(
actReg_.get());
871 sentry.completedSuccessfully();
879 SendSourceTerminationSignalIfException sentry(
actReg_.get());
881 eventSetupForInstanceSucceeded =
true;
882 sentry.completedSuccessfully();
884 auto const& es =
esp_->eventSetup();
894 globalWaitTask->increment_ref_count();
895 beginGlobalTransitionAsync<Traits>(
897 globalWaitTask->wait_for_all();
898 if (globalWaitTask->exceptionPtr() !=
nullptr) {
899 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
902 globalBeginSucceeded =
true;
903 FDEBUG(1) <<
"\tbeginRun " << run <<
"\n";
910 streamLoopWaitTask->increment_ref_count();
914 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
923 streamLoopWaitTask->wait_for_all();
924 if (streamLoopWaitTask->exceptionPtr() !=
nullptr) {
925 std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
928 FDEBUG(1) <<
"\tstreamBeginRun " << run <<
"\n";
936 bool globalBeginSucceeded,
937 bool cleaningUpAfterException,
938 bool eventSetupForInstanceSucceeded) {
939 if (eventSetupForInstanceSucceeded) {
941 endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
943 if (globalBeginSucceeded) {
945 t->increment_ref_count();
952 if (
t->exceptionPtr()) {
953 std::rethrow_exception(*
t->exceptionPtr());
962 bool globalBeginSucceeded,
963 bool cleaningUpAfterException) {
971 SendSourceTerminationSignalIfException sentry(
actReg_.get());
973 sentry.completedSuccessfully();
975 auto const& es =
esp_->eventSetup();
976 if (globalBeginSucceeded) {
979 streamLoopWaitTask->increment_ref_count();
991 cleaningUpAfterException);
993 streamLoopWaitTask->wait_for_all();
994 if (streamLoopWaitTask->exceptionPtr() !=
nullptr) {
995 std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
998 FDEBUG(1) <<
"\tstreamEndRun " << run <<
"\n";
1004 globalWaitTask->increment_ref_count();
1014 cleaningUpAfterException);
1015 globalWaitTask->wait_for_all();
1016 if (globalWaitTask->exceptionPtr() !=
nullptr) {
1017 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1020 FDEBUG(1) <<
"\tendRun " << run <<
"\n";
1028 waitTask->increment_ref_count();
1035 input_->luminosityBlockAuxiliary()->beginTime()),
1039 waitTask->wait_for_all();
1041 if (waitTask->exceptionPtr() !=
nullptr) {
1042 std::rethrow_exception(*(waitTask->exceptionPtr()));
1048 std::shared_ptr<void>
const& iRunResource,
1067 status->setResumer(
std::move(iResumer));
1078 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1081 sentry.completedSuccessfully();
1087 rng->preBeginLumi(lb);
1094 tbb::task::allocate_root(),
1095 [
this, holder = iHolder, status =
std::move(status), ts](std::exception_ptr
const* iPtr)
mutable {
1098 holder.doneWaiting(*iPtr);
1100 status->globalBeginDidSucceed();
1101 auto const& es =
esp_->eventSetup();
1109 holder.doneWaiting(std::current_exception());
1120 tbb::task::allocate_root(), [
this,
i,
h = holder](std::exception_ptr
const* iPtr)
mutable {
1132 auto lp = status->lumiPrincipal();
1133 event.setLuminosityBlockPrincipal(lp.get());
1134 beginStreamTransitionAsync<Traits>(
1145 auto const& es =
esp_->eventSetup();
1148 beginGlobalTransitionAsync<Traits>(
1167 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1169 sentry.completedSuccessfully();
1184 status->needToContinueLumi();
1185 status->startProcessingEvents();
1188 unsigned int streamIndex = 0;
1209 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1212 auto& lp = *(iLumiStatus->lumiPrincipal());
1213 bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1214 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1217 tbb::task::allocate_root(),
1219 std::exception_ptr ptr;
1226 auto& lp = *(
status->lumiPrincipal());
1227 auto const& es =
esp_->eventSetup();
1231 ptr = std::current_exception();
1244 ptr = std::current_exception();
1253 ptr = std::current_exception();
1258 status->resumeGlobalLumiQueue();
1261 ptr = std::current_exception();
1272 ptr = std::current_exception();
1282 tbb::task::allocate_root(),
1284 std::exception_ptr
const* iExcept)
mutable {
1286 task.doneWaiting(*iExcept);
1289 if (didGlobalBeginSucceed) {
1298 auto const& es =
esp_->eventSetup();
1300 endGlobalTransitionAsync<Traits>(
1305 unsigned int iStreamIndex,
1306 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1308 [
this, iStreamIndex, iTask](std::exception_ptr
const* iPtr)
mutable {
1319 if (
status->streamFinishedLumi()) {
1328 iLumiStatus->setEndTime();
1330 if (iLumiStatus->didGlobalBeginSucceed()) {
1331 auto& lumiPrincipal = *iLumiStatus->lumiPrincipal();
1333 lumiPrincipal.endTime());
1334 auto const& es =
esp_->eventSetup();
1336 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1339 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1347 cleaningUpAfterException);
1349 iLumiStatus.reset();
1355 globalWaitTask->increment_ref_count();
1364 globalWaitTask->wait_for_all();
1365 if (globalWaitTask->exceptionPtr() !=
nullptr) {
1366 std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1374 <<
"Illegal attempt to insert run into cache\n" 1375 <<
"Contact a Framework Developer\n";
1377 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
1385 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1387 sentry.completedSuccessfully();
1389 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1391 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1398 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1399 input_->readAndMergeRun(*runPrincipal);
1400 sentry.completedSuccessfully();
1402 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1403 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1409 <<
"Illegal attempt to insert lumi into cache\n" 1410 <<
"Run is invalid\n" 1411 <<
"Contact a Framework Developer\n";
1415 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1417 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1419 sentry.completedSuccessfully();
1427 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1428 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1429 input_->processHistoryRegistry().reducedProcessHistoryID(
1430 input_->luminosityBlockAuxiliary()->processHistoryID()));
1431 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1433 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1435 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1437 sentry.completedSuccessfully();
1439 return input_->luminosityBlock();
1447 tbb::task::allocate_root(),
1448 [
this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr
const* iExcept)
mutable {
1454 s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1463 mergeableRunProductMetadata);
1468 for_all(
subProcesses_, [run, phid](
auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1469 FDEBUG(1) <<
"\tdeleteRunFromCache " << run <<
"\n";
1474 [
this, task, &lumiPrincipal](std::exception_ptr
const* iExcept)
mutable {
1480 s.writeLumiAsync(task, lumiPrincipal);
1521 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1533 input_->luminosityBlockAuxiliary()->beginTime()));
1550 bool expected =
false;
1567 tbb::task::allocate_root(), [
this, iTask, iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1571 bool expected =
false;
1589 if (
status->isLumiEnding()) {
1605 bool expected =
false;
1607 auto e = std::current_exception();
1620 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1624 sentry.completedSuccessfully();
1626 FDEBUG(1) <<
"\treadEvent\n";
1641 rng->postEventRead(ev);
1645 make_waiting_task(tbb::task::allocate_root(), [
this, pep, iHolder](std::exception_ptr
const* iPtr)
mutable {
1652 FDEBUG(1) <<
"\tprocessEvent\n";
1653 pep->clearEventPrincipal();
1662 afterProcessTask =
std::move(finalizeEventTask);
1667 tbb::task::allocate_root(), [
this, pep, finalizeEventTask](std::exception_ptr
const* iPtr)
mutable {
1673 subProcess.doEventAsync(finalizeEventTask, *pep);
1685 bool randomAccess =
input_->randomAccess();
1693 status =
looper_->doDuringLoop(iPrincipal,
esp_->eventSetup(), pc, &streamContext);
1712 FDEBUG(1) <<
"\tshouldWeStop\n";
1717 if (subProcess.terminate()) {
1733 bool expected =
false;
1742 std::unique_ptr<LogSystem>
s;
1743 for (
auto worker :
schedule_->allWorkers()) {
1744 if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1746 s = std::make_unique<LogSystem>(
"ModulesSynchingOnLumis");
1747 (*s) <<
"The following modules require synchronizing on LuminosityBlock boundaries:";
1749 (*s) <<
"\n " << worker->description().moduleName() <<
" " << worker->description().moduleLabel();
std::atomic< bool > exceptionMessageLumis_
RunPrincipal const & runPrincipal() const
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
bool wasEventProcessingStopped() const
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
unsigned int numberOfThreads() const
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
bool alreadyPrinted() const
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
bool taskHasFailed() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
bool continuingLumi() const
void addAdditionalInfo(std::string const &info)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
MergeableRunProductMetadata * mergeableRunProductMetadata()
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
void stopProcessingEvents()
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
bool forceESCacheClearOnNewRun_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void closeInputFile(bool cleaningUpAfterException)
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
void setNextSyncValue(IOVSyncValue iValue)
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
void processEventWithLooper(EventPrincipal &)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
void setExceptionMessageLumis()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_