71 #include "boost/range/adaptor/reversed.hpp" 95 class SendSourceTerminationSignalIfException {
99 ~SendSourceTerminationSignalIfException() {
104 void completedSuccessfully() {
116 std::unique_ptr<InputSource>
119 std::shared_ptr<ProductRegistry> preg,
120 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
121 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
122 std::shared_ptr<ActivityRegistry> areg,
123 std::shared_ptr<ProcessConfiguration const> processConfiguration,
126 if(main_input ==
nullptr) {
128 <<
"There must be exactly one source in the configuration.\n" 129 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
134 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
137 filler->fill(descriptions);
141 descriptions.validate(*main_input,
std::string(
"source"));
145 std::ostringstream ost;
146 ost <<
"Validating configuration of input source of type " << modtype;
162 processConfiguration.get(),
163 ModuleDescription::getUniqueID());
169 areg->preSourceConstructionSignal_(md);
170 std::unique_ptr<InputSource>
input;
173 std::shared_ptr<int> sentry(
nullptr,[areg,&md](
void*){areg->postSourceConstructionSignal_(md);});
175 input = std::unique_ptr<InputSource>(
InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
176 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
177 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
181 std::ostringstream ost;
182 ost <<
"Constructing input source of type " << modtype;
190 std::shared_ptr<EDLooperBase>
194 std::shared_ptr<EDLooperBase> vLooper;
196 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
198 if(loopers.empty()) {
202 assert(1 == loopers.size());
204 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
221 std::vector<std::string>
const& defaultServices,
222 std::vector<std::string>
const& forcedServices) :
225 branchIDListHelper_(),
228 espController_(new eventsetup::EventSetupsController),
231 processConfiguration_(),
237 deferredExceptionPtrIsSet_(
false),
241 beginJobCalled_(
false),
242 shouldWeStop_(
false),
243 fileModeNoMerge_(
false),
244 exceptionMessageFiles_(),
245 exceptionMessageRuns_(),
246 exceptionMessageLumis_(),
247 forceLooperToEnd_(
false),
248 looperBeginJobRun_(
false),
249 forceESCacheClearOnNewRun_(
false),
250 eventSetupDataToExcludeFromPrefetching_() {
252 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
253 processDesc->addServices(defaultServices, forcedServices);
254 init(processDesc, iToken, iLegacy);
258 std::vector<std::string>
const& defaultServices,
259 std::vector<std::string>
const& forcedServices) :
291 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
292 processDesc->addServices(defaultServices, forcedServices);
329 init(processDesc, token, legacy);
366 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
370 auto processDesc = std::make_shared<ProcessDesc>(
config);
388 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
392 bool const hasSubProcesses = !subProcessVParameterSet.empty();
400 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet(
"options"));
402 if (fileMode !=
"NOMERGE" and fileMode !=
"FULLMERGE") {
405 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
412 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
416 assert(nThreads != 0);
418 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
423 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads "<<nThreads<<
"\nsetting # streams "<<nStreams;
425 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
426 if (nConcurrentRuns != 1) {
428 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
430 unsigned int nConcurrentLumis = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
431 if (nConcurrentLumis == 0) {
432 nConcurrentLumis = nConcurrentRuns;
455 auto& serviceSets = processDesc->getServicesPSets();
456 ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy,
true);
464 handler->willBeUsingThreads();
468 std::shared_ptr<CommonParams>
common(items.initMisc(*parameterSet));
476 looper_->setActionTable(items.act_table_.get());
477 looper_->attachTo(*items.actReg_);
487 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
495 items.branchIDListHelper(),
496 items.thinnedAssociationsHelper(),
498 items.processConfiguration(),
507 preg_ = items.preg();
514 FDEBUG(2) << parameterSet << std::endl;
532 for(
auto& subProcessPSet : subProcessVParameterSet) {
580 actReg_->preallocateSignal_(bounds);
608 ex.
addContext(
"Calling beginJob for the source");
614 actReg_->postBeginJobSignal_();
625 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
634 c.
call([&subProcess,
i](){ subProcess.doEndStream(
i); } );
638 c.
call([actReg](){actReg->preEndJobSignal_();});
647 c.
call([actReg](){actReg->postEndJobSignal_();});
658 std::vector<ModuleDescription const*>
660 return schedule_->getAllModuleDescriptions();
699 #include "TransitionProcessors.icc" 704 bool returnValue =
false;
721 SendSourceTerminationSignalIfException sentry(
actReg_.get());
725 itemType =
input_->nextItemType();
729 sentry.completedSuccessfully();
741 std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
743 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
748 return input_->luminosityBlock();
767 bool firstTime =
true;
777 auto trans = fp.processFiles(*
this);
789 <<
"Unexpected transition change " 824 FDEBUG(1) <<
" \treadFile\n";
826 SendSourceTerminationSignalIfException sentry(
actReg_.get());
829 if(size < preg_->
size()) {
837 sentry.completedSuccessfully();
841 if (
fb_.get() !=
nullptr) {
842 SendSourceTerminationSignalIfException sentry(
actReg_.get());
843 input_->closeFile(
fb_.get(), cleaningUpAfterException);
844 sentry.completedSuccessfully();
846 FDEBUG(1) <<
"\tcloseInputFile\n";
850 if (
fb_.get() !=
nullptr) {
854 FDEBUG(1) <<
"\topenOutputFiles\n";
858 if (
fb_.get() !=
nullptr) {
862 FDEBUG(1) <<
"\tcloseOutputFiles\n";
867 if (
fb_.get() !=
nullptr) {
871 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
875 if (
fb_.get() !=
nullptr) {
879 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
889 FDEBUG(1) <<
"\tstartingNewLoop\n";
895 looper_->setModuleChanger(&changer);
897 looper_->setModuleChanger(
nullptr);
901 FDEBUG(1) <<
"\tendOfLoop\n";
908 FDEBUG(1) <<
"\trewind\n";
913 FDEBUG(1) <<
"\tprepareForNextLoop\n";
917 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
920 if(subProcess.shouldWeCloseOutput()) {
930 FDEBUG(1) <<
"\tdoErrorStuff\n";
932 <<
"The EventProcessor state machine encountered an unexpected event\n" 933 <<
"and went to the error state\n" 934 <<
"Will attempt to terminate processing normally\n" 935 <<
"(IF using the looper the next loop will be attempted)\n" 936 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
940 globalBeginSucceeded =
false;
943 SendSourceTerminationSignalIfException sentry(
actReg_.get());
946 sentry.completedSuccessfully();
955 SendSourceTerminationSignalIfException sentry(
actReg_.get());
957 sentry.completedSuccessfully();
969 globalWaitTask->increment_ref_count();
977 globalWaitTask->wait_for_all();
978 if(globalWaitTask->exceptionPtr() !=
nullptr) {
979 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
982 globalBeginSucceeded =
true;
983 FDEBUG(1) <<
"\tbeginRun " << run <<
"\n";
990 streamLoopWaitTask->increment_ref_count();
994 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1003 streamLoopWaitTask->wait_for_all();
1004 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1005 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1008 FDEBUG(1) <<
"\tstreamBeginRun " << run <<
"\n";
1016 endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1018 if(globalBeginSucceeded) {
1020 t->increment_ref_count();
1023 if(
t->exceptionPtr()) {
1024 std::rethrow_exception(*
t->exceptionPtr());
1037 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1039 sentry.completedSuccessfully();
1042 if(globalBeginSucceeded){
1045 streamLoopWaitTask->increment_ref_count();
1057 cleaningUpAfterException);
1059 streamLoopWaitTask->wait_for_all();
1060 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1061 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1064 FDEBUG(1) <<
"\tstreamEndRun " << run <<
"\n";
1070 globalWaitTask->increment_ref_count();
1080 cleaningUpAfterException);
1081 globalWaitTask->wait_for_all();
1082 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1083 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1086 FDEBUG(1) <<
"\tendRun " << run <<
"\n";
1095 waitTask->increment_ref_count();
1102 input_->luminosityBlockAuxiliary()->beginTime()),
1106 waitTask->wait_for_all();
1108 if(waitTask->exceptionPtr() !=
nullptr) {
1109 std::rethrow_exception(* (waitTask->exceptionPtr()) );
1135 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1138 sentry.completedSuccessfully();
1144 rng->preBeginLumi(lb);
1151 ,[
this, holder = iHolder,
status, ts] (std::exception_ptr
const* iPtr)
mutable {
1153 holder.doneWaiting(*iPtr);
1156 status->globalBeginDidSucceed();
1164 holder.doneWaiting(std::current_exception());
1175 [
this,
i,
h = holder](std::exception_ptr
const* iPtr)
mutable 1178 h.doneWaiting(*iPtr);
1186 auto lp =
status->lumiPrincipal();
1187 event.setLuminosityBlockPrincipal(lp.get());
1201 beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1203 *(
status->lumiPrincipal()),
1224 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1226 sentry.completedSuccessfully();
1242 status->needToContinueLumi();
1243 status->startProcessingEvents();
1246 unsigned int streamIndex = 0;
1249 [
this,streamIndex,
h = iHolder](){
1262 std::exception_ptr ptr;
1275 auto& lp = *(
status->lumiPrincipal());
1281 ptr = std::current_exception();
1290 status->resumeGlobalLumiQueue();
1293 ptr = std::current_exception();
1300 ptr = std::current_exception();
1304 items.second.doneWaiting(ptr);
1309 task.doneWaiting(*iExcept);
1312 if(
status->didGlobalBeginSucceed()) {
1317 auto& lp = *(iLumiStatus->lumiPrincipal());
1333 iLumiStatus->cleaningUpAfterException());
1337 unsigned int iStreamIndex,
1338 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1340 auto t =
edm::make_waiting_task(tbb::task::allocate_root(), [
this, iStreamIndex, iTask](std::exception_ptr
const* iPtr)
mutable {
1341 std::exception_ptr ptr;
1352 if(
status->streamFinishedLumi()) {
1360 iLumiStatus->setEndTime();
1362 if(iLumiStatus->didGlobalBeginSucceed()) {
1363 auto & lumiPrincipal = *iLumiStatus->lumiPrincipal();
1365 lumiPrincipal.endTime());
1368 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1371 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1373 lumiPrincipal,ts,es,
1383 globalWaitTask->increment_ref_count();
1392 globalWaitTask->wait_for_all();
1393 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1394 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1402 <<
"EventProcessor::readRun\n" 1403 <<
"Illegal attempt to insert run into cache\n" 1404 <<
"Contact a Framework Developer\n";
1408 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1410 sentry.completedSuccessfully();
1412 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1414 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1421 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1422 input_->readAndMergeRun(*runPrincipal);
1423 sentry.completedSuccessfully();
1425 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1426 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1432 <<
"EventProcessor::readLuminosityBlock\n" 1433 <<
"Illegal attempt to insert lumi into cache\n" 1434 <<
"Run is invalid\n" 1435 <<
"Contact a Framework Developer\n";
1439 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1441 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1443 sentry.completedSuccessfully();
1451 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1452 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
input_->processHistoryRegistry().reducedProcessHistoryID(
input_->luminosityBlockAuxiliary()->processHistoryID()));
1453 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1455 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1457 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1459 sentry.completedSuccessfully();
1461 return input_->luminosityBlock();
1465 auto subsT =
edm::make_waiting_task(tbb::task::allocate_root(), [
this,phid,run,task](std::exception_ptr
const* iExcept)
mutable {
1471 s.writeRunAsync(task,phid,run);
1481 for_all(
subProcesses_, [run,phid](
auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1482 FDEBUG(1) <<
"\tdeleteRunFromCache " << run <<
"\n";
1486 auto subsT =
edm::make_waiting_task(tbb::task::allocate_root(), [
this,task, iStatus](std::exception_ptr
const* iExcept)
mutable {
1492 s.writeLumiAsync(task,*(iStatus->lumiPrincipal()));
1526 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1539 input_->luminosityBlockAuxiliary()->beginTime()));
1557 bool expected =
false;
1567 unsigned int iStreamIndex)
1574 auto recursionTask =
make_waiting_task(tbb::task::allocate_root(), [
this,iTask,iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1576 bool expected =
false;
1590 if(
status->isLumiEnding()) {
1601 bool expected =
false;
1603 auto e =std::current_exception();
1616 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1620 sentry.completedSuccessfully();
1622 FDEBUG(1) <<
"\treadEvent\n";
1626 unsigned int iStreamIndex) {
1633 unsigned int iStreamIndex) {
1640 rng->postEventRead(ev);
1644 tbb::task::allocate_root(),
1645 [
this,pep,iHolder](std::exception_ptr
const* iPtr)
mutable 1654 FDEBUG(1) <<
"\tprocessEvent\n";
1655 pep->clearEventPrincipal();
1666 afterProcessTask =
std::move(finalizeEventTask);
1672 [
this,pep,finalizeEventTask] (std::exception_ptr
const* iPtr)
mutable 1679 subProcess.doEventAsync(finalizeEventTask,*pep);
1694 bool randomAccess =
input_->randomAccess();
1703 status =
looper_->doDuringLoop(iPrincipal,
esp_->eventSetup(), pc, &streamContext);
1723 FDEBUG(1) <<
"\tshouldWeStop\n";
1727 if(subProcess.terminate()) {
1749 bool expected =
false;
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void clear()
Not thread safe.
bool wasEventProcessingStopped() const
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore/Framework/interface/EventSetupRecordImplementation.h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
unsigned int numberOfThreads() const
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
ServiceToken serviceToken_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run)
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void setExceptionMessageLumis(std::string &message)
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
bool taskHasFailed() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
bool continuingLumi() const
void addAdditionalInfo(std::string const &info)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
void writeLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::shared_ptr< edm::ParameterSet > parameterSet() const
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
void stopProcessingEvents()
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
bool forceESCacheClearOnNewRun_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void closeInputFile(bool cleaningUpAfterException)
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
void setNextSyncValue(IOVSyncValue iValue)
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
void processEventWithLooper(EventPrincipal &)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_