69 #include "boost/range/adaptor/reversed.hpp" 92 class SendSourceTerminationSignalIfException {
96 ~SendSourceTerminationSignalIfException() {
101 void completedSuccessfully() {
113 std::unique_ptr<InputSource>
116 std::shared_ptr<ProductRegistry> preg,
117 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
118 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
119 std::shared_ptr<ActivityRegistry> areg,
120 std::shared_ptr<ProcessConfiguration const> processConfiguration,
123 if(main_input ==
nullptr) {
125 <<
"There must be exactly one source in the configuration.\n" 126 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
131 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
134 filler->fill(descriptions);
138 descriptions.validate(*main_input,
std::string(
"source"));
142 std::ostringstream ost;
143 ost <<
"Validating configuration of input source of type " << modtype;
159 processConfiguration.get(),
160 ModuleDescription::getUniqueID());
166 areg->preSourceConstructionSignal_(md);
167 std::unique_ptr<InputSource>
input;
170 std::shared_ptr<int> sentry(
nullptr,[areg,&md](
void*){areg->postSourceConstructionSignal_(md);});
172 input = std::unique_ptr<InputSource>(
InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
173 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
174 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
178 std::ostringstream ost;
179 ost <<
"Constructing input source of type " << modtype;
187 std::shared_ptr<EDLooperBase>
191 std::shared_ptr<EDLooperBase> vLooper;
193 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
195 if(loopers.empty()) {
199 assert(1 == loopers.size());
201 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
218 std::vector<std::string>
const& defaultServices,
219 std::vector<std::string>
const& forcedServices) :
222 branchIDListHelper_(),
225 espController_(new eventsetup::EventSetupsController),
228 processConfiguration_(),
234 deferredExceptionPtrIsSet_(
false),
238 beginJobCalled_(
false),
239 shouldWeStop_(
false),
240 fileModeNoMerge_(
false),
241 exceptionMessageFiles_(),
242 exceptionMessageRuns_(),
243 exceptionMessageLumis_(),
244 forceLooperToEnd_(
false),
245 looperBeginJobRun_(
false),
246 forceESCacheClearOnNewRun_(
false),
247 eventSetupDataToExcludeFromPrefetching_() {
249 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
250 processDesc->addServices(defaultServices, forcedServices);
251 init(processDesc, iToken, iLegacy);
255 std::vector<std::string>
const& defaultServices,
256 std::vector<std::string>
const& forcedServices) :
289 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
290 processDesc->addServices(defaultServices, forcedServices);
328 init(processDesc, token, legacy);
366 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
370 auto processDesc = std::make_shared<ProcessDesc>(
config);
388 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
392 bool const hasSubProcesses = !subProcessVParameterSet.empty();
397 if(fileMode !=
"NOMERGE" and fileMode !=
"FULLMERGE") {
400 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
406 unsigned int nThreads=1;
407 if(optionsPset.existsAs<
unsigned int>(
"numberOfThreads",
false)) {
408 nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
416 unsigned int nStreams =1;
417 if(optionsPset.existsAs<
unsigned int>(
"numberOfStreams",
false)) {
418 nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
424 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads "<<nThreads<<
"\nsetting # streams "<<nStreams;
430 unsigned int nConcurrentRuns =1;
436 unsigned int nConcurrentLumis =1;
464 auto& serviceSets = processDesc->getServicesPSets();
473 handler->willBeUsingThreads();
477 std::shared_ptr<CommonParams>
common(items.
initMisc(*parameterSet));
519 FDEBUG(2) << parameterSet << std::endl;
531 for(
auto& subProcessPSet : subProcessVParameterSet) {
579 actReg_->preallocateSignal_(bounds);
607 ex.
addContext(
"Calling beginJob for the source");
613 actReg_->postBeginJobSignal_();
624 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
633 c.
call([&subProcess,
i](){ subProcess.doEndStream(
i); } );
637 c.
call([actReg](){actReg->preEndJobSignal_();});
646 c.
call([actReg](){actReg->postEndJobSignal_();});
657 std::vector<ModuleDescription const*>
659 return schedule_->getAllModuleDescriptions();
698 #include "TransitionProcessors.icc" 703 bool returnValue =
false;
715 SendSourceTerminationSignalIfException sentry(
actReg_.get());
719 itemType =
input_->nextItemType();
722 sentry.completedSuccessfully();
734 std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
736 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
741 return input_->luminosityBlock();
762 bool firstTime =
true;
772 auto trans = fp.processFiles(*
this);
784 <<
"Unexpected transition change " 819 FDEBUG(1) <<
" \treadFile\n";
821 SendSourceTerminationSignalIfException sentry(
actReg_.get());
824 if(size < preg_->
size()) {
832 sentry.completedSuccessfully();
836 if (
fb_.get() !=
nullptr) {
837 SendSourceTerminationSignalIfException sentry(
actReg_.get());
838 input_->closeFile(
fb_.get(), cleaningUpAfterException);
839 sentry.completedSuccessfully();
841 FDEBUG(1) <<
"\tcloseInputFile\n";
845 if (
fb_.get() !=
nullptr) {
849 FDEBUG(1) <<
"\topenOutputFiles\n";
853 if (
fb_.get() !=
nullptr) {
857 FDEBUG(1) <<
"\tcloseOutputFiles\n";
862 if (
fb_.get() !=
nullptr) {
866 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
870 if (
fb_.get() !=
nullptr) {
874 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
884 FDEBUG(1) <<
"\tstartingNewLoop\n";
890 looper_->setModuleChanger(&changer);
892 looper_->setModuleChanger(
nullptr);
896 FDEBUG(1) <<
"\tendOfLoop\n";
903 FDEBUG(1) <<
"\trewind\n";
908 FDEBUG(1) <<
"\tprepareForNextLoop\n";
912 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
915 if(subProcess.shouldWeCloseOutput()) {
925 FDEBUG(1) <<
"\tdoErrorStuff\n";
927 <<
"The EventProcessor state machine encountered an unexpected event\n" 928 <<
"and went to the error state\n" 929 <<
"Will attempt to terminate processing normally\n" 930 <<
"(IF using the looper the next loop will be attempted)\n" 931 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
937 SendSourceTerminationSignalIfException sentry(
actReg_.get());
940 sentry.completedSuccessfully();
949 SendSourceTerminationSignalIfException sentry(
actReg_.get());
951 sentry.completedSuccessfully();
963 globalWaitTask->increment_ref_count();
970 globalWaitTask->wait_for_all();
971 if(globalWaitTask->exceptionPtr() !=
nullptr) {
972 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
975 FDEBUG(1) <<
"\tbeginRun " << run <<
"\n";
982 streamLoopWaitTask->increment_ref_count();
986 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
994 streamLoopWaitTask->wait_for_all();
995 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
996 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
999 FDEBUG(1) <<
"\tstreamBeginRun " << run <<
"\n";
1013 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1018 sentry.completedSuccessfully();
1024 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1026 sentry.completedSuccessfully();
1032 streamLoopWaitTask->increment_ref_count();
1036 endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1043 cleaningUpAfterException);
1045 streamLoopWaitTask->wait_for_all();
1046 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1047 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1050 FDEBUG(1) <<
"\tstreamEndRun " << run <<
"\n";
1056 globalWaitTask->increment_ref_count();
1065 cleaningUpAfterException);
1066 globalWaitTask->wait_for_all();
1067 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1068 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1071 FDEBUG(1) <<
"\tendRun " << run <<
"\n";
1080 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1083 sentry.completedSuccessfully();
1089 rng->preBeginLumi(lb);
1096 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1098 sentry.completedSuccessfully();
1104 globalWaitTask->increment_ref_count();
1111 globalWaitTask->wait_for_all();
1112 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1113 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1116 FDEBUG(1) <<
"\tbeginLumi " << run <<
"/" << lumi <<
"\n";
1123 streamLoopWaitTask->increment_ref_count();
1127 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1134 streamLoopWaitTask->wait_for_all();
1135 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1136 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1140 FDEBUG(1) <<
"\tstreamBeginLumi " << run <<
"/" << lumi <<
"\n";
1154 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1159 sentry.completedSuccessfully();
1166 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1168 sentry.completedSuccessfully();
1174 streamLoopWaitTask->increment_ref_count();
1178 endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1185 cleaningUpAfterException);
1186 streamLoopWaitTask->wait_for_all();
1187 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1188 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1191 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1197 globalWaitTask->increment_ref_count();
1206 cleaningUpAfterException);
1207 globalWaitTask->wait_for_all();
1208 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1209 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1212 FDEBUG(1) <<
"\tendLumi " << run <<
"/" << lumi <<
"\n";
1221 <<
"EventProcessor::readRun\n" 1222 <<
"Illegal attempt to insert run into cache\n" 1223 <<
"Contact a Framework Developer\n";
1227 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1229 sentry.completedSuccessfully();
1231 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1233 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1240 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1241 input_->readAndMergeRun(*runPrincipal);
1242 sentry.completedSuccessfully();
1244 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1245 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1251 <<
"EventProcessor::readRun\n" 1252 <<
"Illegal attempt to insert lumi into cache\n" 1253 <<
"Contact a Framework Developer\n";
1257 <<
"EventProcessor::readRun\n" 1258 <<
"Illegal attempt to insert lumi into cache\n" 1259 <<
"Run is invalid\n" 1260 <<
"Contact a Framework Developer\n";
1264 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1266 sentry.completedSuccessfully();
1270 return input_->luminosityBlock();
1276 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1278 sentry.completedSuccessfully();
1280 return input_->luminosityBlock();
1286 FDEBUG(1) <<
"\twriteRun " << run <<
"\n";
1291 for_all(
subProcesses_, [run,phid](
auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1292 FDEBUG(1) <<
"\tdeleteRunFromCache " << run <<
"\n";
1297 for_all(
subProcesses_, [&phid, run, lumi](
auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1298 FDEBUG(1) <<
"\twriteLumi " << run <<
"/" << lumi <<
"\n";
1303 for_all(
subProcesses_, [&phid, run, lumi](
auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1304 FDEBUG(1) <<
"\tdeleteLumiFromCache " << run <<
"/" << lumi <<
"\n";
1308 std::atomic<bool>* finishedProcessingEvents) {
1317 if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1326 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1334 finishedProcessingEvents->store(
true,std::memory_order_release);
1344 firstEventInBlock_ =
false;
1348 bool expected =
false;
1359 unsigned int iStreamIndex,
1360 std::atomic<bool>* finishedProcessingEvents)
1362 auto recursionTask =
make_waiting_task(tbb::task::allocate_root(), [
this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr
const* iPtr) {
1364 bool expected =
false;
1373 iTask->decrement_ref_count();
1389 iTask->decrement_ref_count();
1402 std::atomic<bool> finishedProcessingEvents{
false};
1403 auto finishedProcessingEventsPtr = &finishedProcessingEvents;
1411 auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
1412 eventLoopWaitTask->increment_ref_count();
1415 unsigned int iStreamIndex = 0;
1416 for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1417 eventLoopWaitTask->increment_ref_count();
1418 tbb::task::enqueue( *
make_waiting_task(tbb::task::allocate_root(),[
this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr
const*){
1422 eventLoopWaitTask->increment_ref_count();
1423 eventLoopWaitTask->spawn_and_wait_for_all( *
make_waiting_task(tbb::task::allocate_root(),[
this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr
const*){
1439 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1441 sentry.completedSuccessfully();
1443 FDEBUG(1) <<
"\treadEvent\n";
1447 unsigned int iStreamIndex) {
1454 unsigned int iStreamIndex) {
1462 rng->postEventRead(ev);
1464 assert(pep->luminosityBlockPrincipalPtrValid());
1469 tbb::task::allocate_root(),
1470 [
this,pep,iHolder](std::exception_ptr
const* iPtr)
mutable 1479 FDEBUG(1) <<
"\tprocessEvent\n";
1480 pep->clearEventPrincipal();
1491 afterProcessTask =
std::move(finalizeEventTask);
1497 [
this,pep,finalizeEventTask] (std::exception_ptr
const* iPtr)
mutable 1506 subProcess.doEventAsync(finalizeEventTask,*pep);
1509 finalizeEventTask.doneWaiting(*iPtr);
1516 iStreamIndex,*pep,
esp_->eventSetup());
1521 bool randomAccess =
input_->randomAccess();
1530 status =
looper_->doDuringLoop(iPrincipal,
esp_->eventSetup(), pc, &streamContext);
1547 FDEBUG(1) <<
"\tshouldWeStop\n";
1551 if(subProcess.terminate()) {
1573 bool expected =
false;
std::shared_ptr< ActivityRegistry > actReg_
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
static PFTauRenderPlugin instance
void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void setExceptionMessageFiles(std::string &message)
void push(const T &iAction)
asynchronously pushes functor iAction into queue
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Timestamp const & endTime() const
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void writeRun(ProcessHistoryID const &phid, RunNumber_t run)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
unsigned int numberOfThreads() const
void setAtEndTransition(bool iAtEnd)
bool hasRunPrincipal() const
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void setEndTime(Timestamp const &time)
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool cleaningUpAfterException)
U second(std::pair< T, U > const &p)
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void beginRun(ProcessHistoryID const &phid, RunNumber_t run)
void setExceptionMessageLumis(std::string &message)
void setExceptionMessageRuns(std::string &message)
std::shared_ptr< CommonParams > initMisc(ParameterSet ¶meterSet)
void setEndTime(Timestamp const &time)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
StatusCode runToCompletion()
void addAdditionalInfo(std::string const &info)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
int readLuminosityBlock()
void respondToCloseInputFile()
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
StatusCode asyncStopStatusCodeFromProcessingEvents_
bool hasLumiPrincipal() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
InputSource::ItemType readAndProcessEvents()
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void resetFailedFromThisProcess()
ServiceToken addCPRandTNS(ParameterSet const ¶meterSet, ServiceToken const &token)
void addContext(std::string const &context)
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
bool forceESCacheClearOnNewRun_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
void closeInputFile(bool cleaningUpAfterException)
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
std::shared_ptr< SignallingProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
void prepareForNextLoop()
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void processEventWithLooper(EventPrincipal &)
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::unique_ptr< Schedule > initSchedule(ParameterSet ¶meterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_