71 #include "boost/range/adaptor/reversed.hpp" 95 class SendSourceTerminationSignalIfException {
99 ~SendSourceTerminationSignalIfException() {
104 void completedSuccessfully() {
116 std::unique_ptr<InputSource>
119 std::shared_ptr<ProductRegistry> preg,
120 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
121 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
122 std::shared_ptr<ActivityRegistry> areg,
123 std::shared_ptr<ProcessConfiguration const> processConfiguration,
126 if(main_input ==
nullptr) {
128 <<
"There must be exactly one source in the configuration.\n" 129 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
134 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
137 filler->fill(descriptions);
141 descriptions.validate(*main_input,
std::string(
"source"));
145 std::ostringstream ost;
146 ost <<
"Validating configuration of input source of type " << modtype;
162 processConfiguration.get(),
163 ModuleDescription::getUniqueID());
169 areg->preSourceConstructionSignal_(md);
170 std::unique_ptr<InputSource>
input;
173 std::shared_ptr<int> sentry(
nullptr,[areg,&md](
void*){areg->postSourceConstructionSignal_(md);});
175 input = std::unique_ptr<InputSource>(
InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
176 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
177 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
181 std::ostringstream ost;
182 ost <<
"Constructing input source of type " << modtype;
190 std::shared_ptr<EDLooperBase>
194 std::shared_ptr<EDLooperBase> vLooper;
196 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
198 if(loopers.empty()) {
202 assert(1 == loopers.size());
204 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
221 std::vector<std::string>
const& defaultServices,
222 std::vector<std::string>
const& forcedServices) :
225 branchIDListHelper_(),
228 espController_(new eventsetup::EventSetupsController),
231 processConfiguration_(),
237 deferredExceptionPtrIsSet_(
false),
241 beginJobCalled_(
false),
242 shouldWeStop_(
false),
243 fileModeNoMerge_(
false),
244 exceptionMessageFiles_(),
245 exceptionMessageRuns_(),
246 exceptionMessageLumis_(),
247 forceLooperToEnd_(
false),
248 looperBeginJobRun_(
false),
249 forceESCacheClearOnNewRun_(
false),
250 eventSetupDataToExcludeFromPrefetching_() {
252 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
253 processDesc->addServices(defaultServices, forcedServices);
254 init(processDesc, iToken, iLegacy);
258 std::vector<std::string>
const& defaultServices,
259 std::vector<std::string>
const& forcedServices) :
291 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
292 processDesc->addServices(defaultServices, forcedServices);
329 init(processDesc, token, legacy);
366 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
370 auto processDesc = std::make_shared<ProcessDesc>(
config);
388 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
392 bool const hasSubProcesses = !subProcessVParameterSet.empty();
400 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet(
"options"));
402 if (fileMode !=
"NOMERGE" and fileMode !=
"FULLMERGE") {
405 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
412 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
416 assert(nThreads != 0);
418 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
423 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads "<<nThreads<<
"\nsetting # streams "<<nStreams;
425 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
426 if (nConcurrentRuns != 1) {
428 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
430 unsigned int nConcurrentLumis = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
431 if (nConcurrentLumis == 0) {
432 nConcurrentLumis = nConcurrentRuns;
455 auto& serviceSets = processDesc->getServicesPSets();
456 ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy,
true);
464 handler->willBeUsingThreads();
468 std::shared_ptr<CommonParams>
common(items.initMisc(*parameterSet));
476 looper_->setActionTable(items.act_table_.get());
477 looper_->attachTo(*items.actReg_);
487 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
495 items.branchIDListHelper(),
496 items.thinnedAssociationsHelper(),
498 items.processConfiguration(),
507 preg_ = items.preg();
514 FDEBUG(2) << parameterSet << std::endl;
532 for(
auto& subProcessPSet : subProcessVParameterSet) {
580 actReg_->preallocateSignal_(bounds);
608 ex.
addContext(
"Calling beginJob for the source");
614 actReg_->postBeginJobSignal_();
625 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
634 c.
call([&subProcess,
i](){ subProcess.doEndStream(
i); } );
638 c.
call([actReg](){actReg->preEndJobSignal_();});
647 c.
call([actReg](){actReg->postEndJobSignal_();});
658 std::vector<ModuleDescription const*>
660 return schedule_->getAllModuleDescriptions();
699 #include "TransitionProcessors.icc" 704 bool returnValue =
false;
721 SendSourceTerminationSignalIfException sentry(
actReg_.get());
725 itemType =
input_->nextItemType();
729 sentry.completedSuccessfully();
741 std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
743 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
748 return input_->luminosityBlock();
767 bool firstTime =
true;
777 auto trans = fp.processFiles(*
this);
789 <<
"Unexpected transition change " 824 FDEBUG(1) <<
" \treadFile\n";
826 SendSourceTerminationSignalIfException sentry(
actReg_.get());
829 if(size < preg_->
size()) {
837 sentry.completedSuccessfully();
841 if (
fb_.get() !=
nullptr) {
842 SendSourceTerminationSignalIfException sentry(
actReg_.get());
843 input_->closeFile(
fb_.get(), cleaningUpAfterException);
844 sentry.completedSuccessfully();
846 FDEBUG(1) <<
"\tcloseInputFile\n";
850 if (
fb_.get() !=
nullptr) {
854 FDEBUG(1) <<
"\topenOutputFiles\n";
858 if (
fb_.get() !=
nullptr) {
862 FDEBUG(1) <<
"\tcloseOutputFiles\n";
867 if (
fb_.get() !=
nullptr) {
871 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
875 if (
fb_.get() !=
nullptr) {
879 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
889 FDEBUG(1) <<
"\tstartingNewLoop\n";
895 looper_->setModuleChanger(&changer);
897 looper_->setModuleChanger(
nullptr);
901 FDEBUG(1) <<
"\tendOfLoop\n";
908 FDEBUG(1) <<
"\trewind\n";
913 FDEBUG(1) <<
"\tprepareForNextLoop\n";
917 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
920 if(subProcess.shouldWeCloseOutput()) {
930 FDEBUG(1) <<
"\tdoErrorStuff\n";
932 <<
"The EventProcessor state machine encountered an unexpected event\n" 933 <<
"and went to the error state\n" 934 <<
"Will attempt to terminate processing normally\n" 935 <<
"(IF using the looper the next loop will be attempted)\n" 936 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
940 globalBeginSucceeded =
false;
943 SendSourceTerminationSignalIfException sentry(
actReg_.get());
946 sentry.completedSuccessfully();
955 SendSourceTerminationSignalIfException sentry(
actReg_.get());
957 sentry.completedSuccessfully();
969 globalWaitTask->increment_ref_count();
977 globalWaitTask->wait_for_all();
978 if(globalWaitTask->exceptionPtr() !=
nullptr) {
979 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
982 globalBeginSucceeded =
true;
983 FDEBUG(1) <<
"\tbeginRun " << run <<
"\n";
990 streamLoopWaitTask->increment_ref_count();
994 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1003 streamLoopWaitTask->wait_for_all();
1004 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1005 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1008 FDEBUG(1) <<
"\tstreamBeginRun " << run <<
"\n";
1016 endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1018 if(globalBeginSucceeded) {
1020 t->increment_ref_count();
1023 if(
t->exceptionPtr()) {
1024 std::rethrow_exception(*
t->exceptionPtr());
1037 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1039 sentry.completedSuccessfully();
1042 if(globalBeginSucceeded){
1045 streamLoopWaitTask->increment_ref_count();
1057 cleaningUpAfterException);
1059 streamLoopWaitTask->wait_for_all();
1060 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1061 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1064 FDEBUG(1) <<
"\tstreamEndRun " << run <<
"\n";
1070 globalWaitTask->increment_ref_count();
1080 cleaningUpAfterException);
1081 globalWaitTask->wait_for_all();
1082 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1083 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1086 FDEBUG(1) <<
"\tendRun " << run <<
"\n";
1095 waitTask->increment_ref_count();
1102 input_->luminosityBlockAuxiliary()->beginTime()),
1106 waitTask->wait_for_all();
1108 if(waitTask->exceptionPtr() !=
nullptr) {
1109 std::rethrow_exception(* (waitTask->exceptionPtr()) );
1135 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1138 sentry.completedSuccessfully();
1144 rng->preBeginLumi(lb);
1151 ,[
this, holder = iHolder,
status, ts] (std::exception_ptr
const* iPtr)
mutable {
1153 holder.doneWaiting(*iPtr);
1156 status->globalBeginDidSucceed();
1164 holder.doneWaiting(std::current_exception());
1175 [
this,
i,
h = holder](std::exception_ptr
const* iPtr)
mutable 1178 h.doneWaiting(*iPtr);
1186 auto lp =
status->lumiPrincipal();
1187 event.setLuminosityBlockPrincipal(lp.get());
1201 beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1203 *(
status->lumiPrincipal()),
1224 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1226 sentry.completedSuccessfully();
1242 status->needToContinueLumi();
1243 status->startProcessingEvents();
1246 unsigned int streamIndex = 0;
1249 [
this,streamIndex,
h = iHolder](){
1261 std::exception_ptr ptr;
1272 auto& lp = *(
status->lumiPrincipal());
1278 ptr = std::current_exception();
1285 status->resumeGlobalLumiQueue();
1290 ptr = std::current_exception();
1299 task.doneWaiting(*iExcept);
1302 if(
status->didGlobalBeginSucceed()) {
1307 auto& lp = *(iLumiStatus->lumiPrincipal());
1323 iLumiStatus->cleaningUpAfterException());
1327 unsigned int iStreamIndex,
1328 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1330 auto t =
edm::make_waiting_task(tbb::task::allocate_root(), [
this, iStreamIndex, iTask](std::exception_ptr
const* iPtr)
mutable {
1331 std::exception_ptr ptr;
1342 if(
status->streamFinishedLumi()) {
1350 iLumiStatus->setEndTime();
1352 if(iLumiStatus->didGlobalBeginSucceed()) {
1353 auto & lumiPrincipal = *iLumiStatus->lumiPrincipal();
1355 lumiPrincipal.endTime());
1358 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1361 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1363 lumiPrincipal,ts,es,
1373 globalWaitTask->increment_ref_count();
1382 globalWaitTask->wait_for_all();
1383 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1384 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1392 <<
"EventProcessor::readRun\n" 1393 <<
"Illegal attempt to insert run into cache\n" 1394 <<
"Contact a Framework Developer\n";
1398 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1400 sentry.completedSuccessfully();
1402 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1404 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1411 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1412 input_->readAndMergeRun(*runPrincipal);
1413 sentry.completedSuccessfully();
1415 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1416 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1422 <<
"EventProcessor::readLuminosityBlock\n" 1423 <<
"Illegal attempt to insert lumi into cache\n" 1424 <<
"Run is invalid\n" 1425 <<
"Contact a Framework Developer\n";
1429 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1431 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1433 sentry.completedSuccessfully();
1441 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1442 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
input_->processHistoryRegistry().reducedProcessHistoryID(
input_->luminosityBlockAuxiliary()->processHistoryID()));
1443 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1445 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1447 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1449 sentry.completedSuccessfully();
1451 return input_->luminosityBlock();
1455 auto subsT =
edm::make_waiting_task(tbb::task::allocate_root(), [
this,phid,run,task](std::exception_ptr
const* iExcept)
mutable {
1461 s.writeRunAsync(task,phid,run);
1471 for_all(
subProcesses_, [run,phid](
auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1472 FDEBUG(1) <<
"\tdeleteRunFromCache " << run <<
"\n";
1476 auto subsT =
edm::make_waiting_task(tbb::task::allocate_root(), [
this,task, iStatus](std::exception_ptr
const* iExcept)
mutable {
1482 s.writeLumiAsync(task,*(iStatus->lumiPrincipal()));
1516 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1529 input_->luminosityBlockAuxiliary()->beginTime()));
1547 bool expected =
false;
1557 unsigned int iStreamIndex)
1564 auto recursionTask =
make_waiting_task(tbb::task::allocate_root(), [
this,iTask,iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1566 bool expected =
false;
1580 if(
status->isLumiEnding()) {
1591 bool expected =
false;
1593 auto e =std::current_exception();
1606 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1610 sentry.completedSuccessfully();
1612 FDEBUG(1) <<
"\treadEvent\n";
1616 unsigned int iStreamIndex) {
1623 unsigned int iStreamIndex) {
1630 rng->postEventRead(ev);
1634 tbb::task::allocate_root(),
1635 [
this,pep,iHolder](std::exception_ptr
const* iPtr)
mutable 1644 FDEBUG(1) <<
"\tprocessEvent\n";
1645 pep->clearEventPrincipal();
1656 afterProcessTask =
std::move(finalizeEventTask);
1662 [
this,pep,finalizeEventTask] (std::exception_ptr
const* iPtr)
mutable 1669 subProcess.doEventAsync(finalizeEventTask,*pep);
1684 bool randomAccess =
input_->randomAccess();
1693 status =
looper_->doDuringLoop(iPrincipal,
esp_->eventSetup(), pc, &streamContext);
1713 FDEBUG(1) <<
"\tshouldWeStop\n";
1717 if(subProcess.terminate()) {
1739 bool expected =
false;
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void clear()
Not thread safe.
bool wasEventProcessingStopped() const
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
unsigned int numberOfThreads() const
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
ServiceToken serviceToken_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run)
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void setExceptionMessageLumis(std::string &message)
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
bool taskHasFailed() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
bool continuingLumi() const
void addAdditionalInfo(std::string const &info)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
void writeLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::shared_ptr< edm::ParameterSet > parameterSet() const
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
void stopProcessingEvents()
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
bool forceESCacheClearOnNewRun_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void closeInputFile(bool cleaningUpAfterException)
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
void setNextSyncValue(IOVSyncValue iValue)
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
void processEventWithLooper(EventPrincipal &)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_