71 #include "boost/range/adaptor/reversed.hpp" 95 class SendSourceTerminationSignalIfException {
99 ~SendSourceTerminationSignalIfException() {
104 void completedSuccessfully() {
116 std::unique_ptr<InputSource>
119 std::shared_ptr<ProductRegistry> preg,
120 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
121 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
122 std::shared_ptr<ActivityRegistry> areg,
123 std::shared_ptr<ProcessConfiguration const> processConfiguration,
126 if(main_input ==
nullptr) {
128 <<
"There must be exactly one source in the configuration.\n" 129 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
134 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
137 filler->fill(descriptions);
141 descriptions.validate(*main_input,
std::string(
"source"));
145 std::ostringstream ost;
146 ost <<
"Validating configuration of input source of type " << modtype;
162 processConfiguration.get(),
163 ModuleDescription::getUniqueID());
169 areg->preSourceConstructionSignal_(md);
170 std::unique_ptr<InputSource>
input;
173 std::shared_ptr<int> sentry(
nullptr,[areg,&md](
void*){areg->postSourceConstructionSignal_(md);});
175 input = std::unique_ptr<InputSource>(
InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
176 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
177 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
181 std::ostringstream ost;
182 ost <<
"Constructing input source of type " << modtype;
190 std::shared_ptr<EDLooperBase>
194 std::shared_ptr<EDLooperBase> vLooper;
196 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
198 if(loopers.empty()) {
202 assert(1 == loopers.size());
204 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
218 EventProcessor::EventProcessor(std::unique_ptr<ParameterSet>
parameterSet,
221 std::vector<std::string>
const& defaultServices,
222 std::vector<std::string>
const& forcedServices) :
225 branchIDListHelper_(),
228 espController_(new eventsetup::EventSetupsController),
231 processConfiguration_(),
237 deferredExceptionPtrIsSet_(
false),
241 beginJobCalled_(
false),
242 shouldWeStop_(
false),
243 fileModeNoMerge_(
false),
244 exceptionMessageFiles_(),
245 exceptionMessageRuns_(),
246 exceptionMessageLumis_(),
247 forceLooperToEnd_(
false),
248 looperBeginJobRun_(
false),
249 forceESCacheClearOnNewRun_(
false),
250 eventSetupDataToExcludeFromPrefetching_() {
251 auto processDesc = std::make_shared<ProcessDesc>(
std::move(parameterSet));
252 processDesc->addServices(defaultServices, forcedServices);
253 init(processDesc, iToken, iLegacy);
257 std::vector<std::string>
const& defaultServices,
258 std::vector<std::string>
const& forcedServices) :
289 auto processDesc = std::make_shared<ProcessDesc>(
std::move(parameterSet));
290 processDesc->addServices(defaultServices, forcedServices);
327 init(processDesc, token, legacy);
343 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
347 bool const hasSubProcesses = !subProcessVParameterSet.empty();
355 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet(
"options"));
357 if (fileMode !=
"NOMERGE" and fileMode !=
"FULLMERGE") {
360 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
367 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
371 assert(nThreads != 0);
373 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
377 if (nThreads > 1
or nStreams > 1) {
378 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads "<<nThreads<<
"\nsetting # streams "<<nStreams;
380 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
381 if (nConcurrentRuns != 1) {
383 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
385 unsigned int nConcurrentLumis = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
386 if (nConcurrentLumis == 0) {
387 nConcurrentLumis = nConcurrentRuns;
410 auto& serviceSets = processDesc->getServicesPSets();
411 ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy,
true);
419 handler->willBeUsingThreads();
423 std::shared_ptr<CommonParams>
common(items.initMisc(*parameterSet));
431 looper_->setActionTable(items.act_table_.get());
432 looper_->attachTo(*items.actReg_);
442 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
450 items.branchIDListHelper(),
451 items.thinnedAssociationsHelper(),
453 items.processConfiguration(),
462 preg_ = items.preg();
470 FDEBUG(2) << parameterSet << std::endl;
488 for(
auto& subProcessPSet : subProcessVParameterSet) {
536 actReg_->preallocateSignal_(bounds);
564 ex.
addContext(
"Calling beginJob for the source");
570 actReg_->postBeginJobSignal_();
581 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
590 c.
call([&subProcess,
i](){ subProcess.doEndStream(
i); } );
594 c.
call([actReg](){actReg->preEndJobSignal_();});
603 c.
call([actReg](){actReg->postEndJobSignal_();});
614 std::vector<ModuleDescription const*>
616 return schedule_->getAllModuleDescriptions();
655 #include "TransitionProcessors.icc" 660 bool returnValue =
false;
677 SendSourceTerminationSignalIfException sentry(
actReg_.get());
681 itemType =
input_->nextItemType();
685 sentry.completedSuccessfully();
697 std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
699 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
704 return input_->luminosityBlock();
723 bool firstTime =
true;
733 auto trans = fp.processFiles(*
this);
745 <<
"Unexpected transition change " 780 FDEBUG(1) <<
" \treadFile\n";
782 SendSourceTerminationSignalIfException sentry(
actReg_.get());
787 if(size < preg_->
size()) {
795 sentry.completedSuccessfully();
799 if (
fb_.get() !=
nullptr) {
800 SendSourceTerminationSignalIfException sentry(
actReg_.get());
801 input_->closeFile(
fb_.get(), cleaningUpAfterException);
802 sentry.completedSuccessfully();
804 FDEBUG(1) <<
"\tcloseInputFile\n";
808 if (
fb_.get() !=
nullptr) {
812 FDEBUG(1) <<
"\topenOutputFiles\n";
816 if (
fb_.get() !=
nullptr) {
820 FDEBUG(1) <<
"\tcloseOutputFiles\n";
825 if (
fb_.get() !=
nullptr) {
829 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
833 if (
fb_.get() !=
nullptr) {
837 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
847 FDEBUG(1) <<
"\tstartingNewLoop\n";
853 looper_->setModuleChanger(&changer);
855 looper_->setModuleChanger(
nullptr);
859 FDEBUG(1) <<
"\tendOfLoop\n";
866 FDEBUG(1) <<
"\trewind\n";
871 FDEBUG(1) <<
"\tprepareForNextLoop\n";
875 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
878 if(subProcess.shouldWeCloseOutput()) {
888 FDEBUG(1) <<
"\tdoErrorStuff\n";
890 <<
"The EventProcessor state machine encountered an unexpected event\n" 891 <<
"and went to the error state\n" 892 <<
"Will attempt to terminate processing normally\n" 893 <<
"(IF using the looper the next loop will be attempted)\n" 894 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
898 bool& eventSetupForInstanceSucceeded) {
899 globalBeginSucceeded =
false;
902 SendSourceTerminationSignalIfException sentry(
actReg_.get());
905 sentry.completedSuccessfully();
914 SendSourceTerminationSignalIfException sentry(
actReg_.get());
916 eventSetupForInstanceSucceeded =
true;
917 sentry.completedSuccessfully();
929 globalWaitTask->increment_ref_count();
937 globalWaitTask->wait_for_all();
938 if(globalWaitTask->exceptionPtr() !=
nullptr) {
939 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
942 globalBeginSucceeded =
true;
943 FDEBUG(1) <<
"\tbeginRun " << run <<
"\n";
950 streamLoopWaitTask->increment_ref_count();
954 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
963 streamLoopWaitTask->wait_for_all();
964 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
965 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
968 FDEBUG(1) <<
"\tstreamBeginRun " << run <<
"\n";
975 bool globalBeginSucceeded,
bool cleaningUpAfterException,
976 bool eventSetupForInstanceSucceeded) {
977 if (eventSetupForInstanceSucceeded) {
979 endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
981 if(globalBeginSucceeded) {
983 t->increment_ref_count();
990 if(
t->exceptionPtr()) {
991 std::rethrow_exception(*
t->exceptionPtr());
1005 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1007 sentry.completedSuccessfully();
1010 if(globalBeginSucceeded){
1013 streamLoopWaitTask->increment_ref_count();
1025 cleaningUpAfterException);
1027 streamLoopWaitTask->wait_for_all();
1028 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1029 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1032 FDEBUG(1) <<
"\tstreamEndRun " << run <<
"\n";
1038 globalWaitTask->increment_ref_count();
1048 cleaningUpAfterException);
1049 globalWaitTask->wait_for_all();
1050 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1051 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1054 FDEBUG(1) <<
"\tendRun " << run <<
"\n";
1063 waitTask->increment_ref_count();
1070 input_->luminosityBlockAuxiliary()->beginTime()),
1074 waitTask->wait_for_all();
1076 if(waitTask->exceptionPtr() !=
nullptr) {
1077 std::rethrow_exception(* (waitTask->exceptionPtr()) );
1103 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1106 sentry.completedSuccessfully();
1112 rng->preBeginLumi(lb);
1119 ,[
this, holder = iHolder,
status, ts] (std::exception_ptr
const* iPtr)
mutable {
1121 holder.doneWaiting(*iPtr);
1124 status->globalBeginDidSucceed();
1132 holder.doneWaiting(std::current_exception());
1143 [
this,
i,
h = holder](std::exception_ptr
const* iPtr)
mutable 1146 h.doneWaiting(*iPtr);
1154 auto lp =
status->lumiPrincipal();
1155 event.setLuminosityBlockPrincipal(lp.get());
1169 beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1171 *(
status->lumiPrincipal()),
1192 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1194 sentry.completedSuccessfully();
1210 status->needToContinueLumi();
1211 status->startProcessingEvents();
1214 unsigned int streamIndex = 0;
1217 [
this,streamIndex,
h = iHolder](){
1230 std::exception_ptr ptr;
1243 auto& lp = *(
status->lumiPrincipal());
1249 ptr = std::current_exception();
1258 status->resumeGlobalLumiQueue();
1261 ptr = std::current_exception();
1268 ptr = std::current_exception();
1272 items.second.doneWaiting(ptr);
1277 task.doneWaiting(*iExcept);
1280 if(
status->didGlobalBeginSucceed()) {
1285 auto& lp = *(iLumiStatus->lumiPrincipal());
1301 iLumiStatus->cleaningUpAfterException());
1305 unsigned int iStreamIndex,
1306 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1308 auto t =
edm::make_waiting_task(tbb::task::allocate_root(), [
this, iStreamIndex, iTask](std::exception_ptr
const* iPtr)
mutable {
1309 std::exception_ptr ptr;
1320 if(
status->streamFinishedLumi()) {
1328 iLumiStatus->setEndTime();
1330 if(iLumiStatus->didGlobalBeginSucceed()) {
1331 auto & lumiPrincipal = *iLumiStatus->lumiPrincipal();
1333 lumiPrincipal.endTime());
1336 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1339 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1341 lumiPrincipal,ts,es,
1351 globalWaitTask->increment_ref_count();
1360 globalWaitTask->wait_for_all();
1361 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1362 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1370 <<
"EventProcessor::readRun\n" 1371 <<
"Illegal attempt to insert run into cache\n" 1372 <<
"Contact a Framework Developer\n";
1374 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
preg(),
1378 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1380 sentry.completedSuccessfully();
1382 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1384 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1391 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1392 input_->readAndMergeRun(*runPrincipal);
1393 sentry.completedSuccessfully();
1395 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1396 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1402 <<
"EventProcessor::readLuminosityBlock\n" 1403 <<
"Illegal attempt to insert lumi into cache\n" 1404 <<
"Run is invalid\n" 1405 <<
"Contact a Framework Developer\n";
1409 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1411 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1413 sentry.completedSuccessfully();
1421 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1422 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
input_->processHistoryRegistry().reducedProcessHistoryID(
input_->luminosityBlockAuxiliary()->processHistoryID()));
1423 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1425 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1427 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1429 sentry.completedSuccessfully();
1431 return input_->luminosityBlock();
1436 auto subsT =
edm::make_waiting_task(tbb::task::allocate_root(), [
this,phid,run,task,mergeableRunProductMetadata]
1437 (std::exception_ptr
const* iExcept)
mutable {
1443 s.writeRunAsync(task,phid,run,mergeableRunProductMetadata);
1454 for_all(
subProcesses_, [run,phid](
auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1455 FDEBUG(1) <<
"\tdeleteRunFromCache " << run <<
"\n";
1459 auto subsT =
edm::make_waiting_task(tbb::task::allocate_root(), [
this,task, iStatus](std::exception_ptr
const* iExcept)
mutable {
1465 s.writeLumiAsync(task,*(iStatus->lumiPrincipal()));
1471 std::shared_ptr<LuminosityBlockPrincipal>
const& lumiPrincipal = iStatus->lumiPrincipal();
1472 lumiPrincipal->runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal->luminosityBlock());
1502 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1515 input_->luminosityBlockAuxiliary()->beginTime()));
1533 bool expected =
false;
1543 unsigned int iStreamIndex)
1550 auto recursionTask =
make_waiting_task(tbb::task::allocate_root(), [
this,iTask,iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1552 bool expected =
false;
1566 if(
status->isLumiEnding()) {
1577 bool expected =
false;
1579 auto e =std::current_exception();
1592 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1596 sentry.completedSuccessfully();
1598 FDEBUG(1) <<
"\treadEvent\n";
1602 unsigned int iStreamIndex) {
1609 unsigned int iStreamIndex) {
1616 rng->postEventRead(ev);
1620 tbb::task::allocate_root(),
1621 [
this,pep,iHolder](std::exception_ptr
const* iPtr)
mutable 1630 FDEBUG(1) <<
"\tprocessEvent\n";
1631 pep->clearEventPrincipal();
1642 afterProcessTask =
std::move(finalizeEventTask);
1648 [
this,pep,finalizeEventTask] (std::exception_ptr
const* iPtr)
mutable 1655 subProcess.doEventAsync(finalizeEventTask,*pep);
1670 bool randomAccess =
input_->randomAccess();
1679 status =
looper_->doDuringLoop(iPrincipal,
esp_->eventSetup(), pc, &streamContext);
1699 FDEBUG(1) <<
"\tshouldWeStop\n";
1703 if(subProcess.terminate()) {
1725 bool expected =
false;
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
bool wasEventProcessingStopped() const
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
unsigned int numberOfThreads() const
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
bool alreadyPrinted() const
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void setExceptionMessageLumis(std::string &message)
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
bool taskHasFailed() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
bool continuingLumi() const
void addAdditionalInfo(std::string const &info)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
void writeLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
MergeableRunProductMetadata * mergeableRunProductMetadata()
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
void stopProcessingEvents()
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
bool forceESCacheClearOnNewRun_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void closeInputFile(bool cleaningUpAfterException)
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
void setNextSyncValue(IOVSyncValue iValue)
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
void processEventWithLooper(EventPrincipal &)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_