72 #include "boost/range/adaptor/reversed.hpp" 96 class SendSourceTerminationSignalIfException {
100 ~SendSourceTerminationSignalIfException() {
105 void completedSuccessfully() {
117 std::unique_ptr<InputSource>
120 std::shared_ptr<ProductRegistry> preg,
121 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
122 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
123 std::shared_ptr<ActivityRegistry> areg,
124 std::shared_ptr<ProcessConfiguration const> processConfiguration,
127 if(main_input ==
nullptr) {
129 <<
"There must be exactly one source in the configuration.\n" 130 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
135 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
138 filler->fill(descriptions);
142 descriptions.validate(*main_input,
std::string(
"source"));
146 std::ostringstream ost;
147 ost <<
"Validating configuration of input source of type " << modtype;
163 processConfiguration.get(),
164 ModuleDescription::getUniqueID());
170 areg->preSourceConstructionSignal_(md);
171 std::unique_ptr<InputSource>
input;
174 std::shared_ptr<int> sentry(
nullptr,[areg,&md](
void*){areg->postSourceConstructionSignal_(md);});
176 input = std::unique_ptr<InputSource>(
InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
177 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
178 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
182 std::ostringstream ost;
183 ost <<
"Constructing input source of type " << modtype;
191 std::shared_ptr<EDLooperBase>
195 std::shared_ptr<EDLooperBase> vLooper;
197 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
199 if(loopers.empty()) {
203 assert(1 == loopers.size());
205 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
222 std::vector<std::string>
const& defaultServices,
223 std::vector<std::string>
const& forcedServices) :
226 branchIDListHelper_(),
229 espController_(new eventsetup::EventSetupsController),
232 processConfiguration_(),
238 deferredExceptionPtrIsSet_(
false),
242 beginJobCalled_(
false),
243 shouldWeStop_(
false),
244 fileModeNoMerge_(
false),
245 exceptionMessageFiles_(),
246 exceptionMessageRuns_(),
247 exceptionMessageLumis_(),
248 forceLooperToEnd_(
false),
249 looperBeginJobRun_(
false),
250 forceESCacheClearOnNewRun_(
false),
251 eventSetupDataToExcludeFromPrefetching_() {
253 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
254 processDesc->addServices(defaultServices, forcedServices);
255 init(processDesc, iToken, iLegacy);
259 std::vector<std::string>
const& defaultServices,
260 std::vector<std::string>
const& forcedServices) :
292 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
293 processDesc->addServices(defaultServices, forcedServices);
330 init(processDesc, token, legacy);
367 auto processDesc = std::make_shared<ProcessDesc>(
parameterSet);
371 auto processDesc = std::make_shared<ProcessDesc>(
config);
389 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
393 bool const hasSubProcesses = !subProcessVParameterSet.empty();
401 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet(
"options"));
403 if (fileMode !=
"NOMERGE" and fileMode !=
"FULLMERGE") {
406 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
413 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
417 assert(nThreads != 0);
419 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
423 if (nThreads > 1
or nStreams > 1) {
424 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads "<<nThreads<<
"\nsetting # streams "<<nStreams;
426 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
427 if (nConcurrentRuns != 1) {
429 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
431 unsigned int nConcurrentLumis = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
432 if (nConcurrentLumis == 0) {
433 nConcurrentLumis = nConcurrentRuns;
456 auto& serviceSets = processDesc->getServicesPSets();
457 ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy,
true);
465 handler->willBeUsingThreads();
469 std::shared_ptr<CommonParams>
common(items.initMisc(*parameterSet));
477 looper_->setActionTable(items.act_table_.get());
478 looper_->attachTo(*items.actReg_);
488 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
496 items.branchIDListHelper(),
497 items.thinnedAssociationsHelper(),
499 items.processConfiguration(),
508 preg_ = items.preg();
516 FDEBUG(2) << parameterSet << std::endl;
534 for(
auto& subProcessPSet : subProcessVParameterSet) {
582 actReg_->preallocateSignal_(bounds);
610 ex.
addContext(
"Calling beginJob for the source");
616 actReg_->postBeginJobSignal_();
627 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
636 c.
call([&subProcess,
i](){ subProcess.doEndStream(
i); } );
640 c.
call([actReg](){actReg->preEndJobSignal_();});
649 c.
call([actReg](){actReg->postEndJobSignal_();});
660 std::vector<ModuleDescription const*>
662 return schedule_->getAllModuleDescriptions();
701 #include "TransitionProcessors.icc" 706 bool returnValue =
false;
723 SendSourceTerminationSignalIfException sentry(
actReg_.get());
727 itemType =
input_->nextItemType();
731 sentry.completedSuccessfully();
743 std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
745 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
750 return input_->luminosityBlock();
769 bool firstTime =
true;
779 auto trans = fp.processFiles(*
this);
791 <<
"Unexpected transition change " 826 FDEBUG(1) <<
" \treadFile\n";
828 SendSourceTerminationSignalIfException sentry(
actReg_.get());
833 if(size < preg_->
size()) {
841 sentry.completedSuccessfully();
845 if (
fb_.get() !=
nullptr) {
846 SendSourceTerminationSignalIfException sentry(
actReg_.get());
847 input_->closeFile(
fb_.get(), cleaningUpAfterException);
848 sentry.completedSuccessfully();
850 FDEBUG(1) <<
"\tcloseInputFile\n";
854 if (
fb_.get() !=
nullptr) {
858 FDEBUG(1) <<
"\topenOutputFiles\n";
862 if (
fb_.get() !=
nullptr) {
866 FDEBUG(1) <<
"\tcloseOutputFiles\n";
871 if (
fb_.get() !=
nullptr) {
875 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
879 if (
fb_.get() !=
nullptr) {
883 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
893 FDEBUG(1) <<
"\tstartingNewLoop\n";
899 looper_->setModuleChanger(&changer);
901 looper_->setModuleChanger(
nullptr);
905 FDEBUG(1) <<
"\tendOfLoop\n";
912 FDEBUG(1) <<
"\trewind\n";
917 FDEBUG(1) <<
"\tprepareForNextLoop\n";
921 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
924 if(subProcess.shouldWeCloseOutput()) {
934 FDEBUG(1) <<
"\tdoErrorStuff\n";
936 <<
"The EventProcessor state machine encountered an unexpected event\n" 937 <<
"and went to the error state\n" 938 <<
"Will attempt to terminate processing normally\n" 939 <<
"(IF using the looper the next loop will be attempted)\n" 940 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
944 bool& eventSetupForInstanceSucceeded) {
945 globalBeginSucceeded =
false;
948 SendSourceTerminationSignalIfException sentry(
actReg_.get());
951 sentry.completedSuccessfully();
960 SendSourceTerminationSignalIfException sentry(
actReg_.get());
962 eventSetupForInstanceSucceeded =
true;
963 sentry.completedSuccessfully();
975 globalWaitTask->increment_ref_count();
983 globalWaitTask->wait_for_all();
984 if(globalWaitTask->exceptionPtr() !=
nullptr) {
985 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
988 globalBeginSucceeded =
true;
989 FDEBUG(1) <<
"\tbeginRun " << run <<
"\n";
996 streamLoopWaitTask->increment_ref_count();
1000 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1009 streamLoopWaitTask->wait_for_all();
1010 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1011 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1014 FDEBUG(1) <<
"\tstreamBeginRun " << run <<
"\n";
1021 bool globalBeginSucceeded,
bool cleaningUpAfterException,
1022 bool eventSetupForInstanceSucceeded) {
1023 if (eventSetupForInstanceSucceeded) {
1025 endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1027 if(globalBeginSucceeded) {
1029 t->increment_ref_count();
1036 if(
t->exceptionPtr()) {
1037 std::rethrow_exception(*
t->exceptionPtr());
1051 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1053 sentry.completedSuccessfully();
1056 if(globalBeginSucceeded){
1059 streamLoopWaitTask->increment_ref_count();
1071 cleaningUpAfterException);
1073 streamLoopWaitTask->wait_for_all();
1074 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1075 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1078 FDEBUG(1) <<
"\tstreamEndRun " << run <<
"\n";
1084 globalWaitTask->increment_ref_count();
1094 cleaningUpAfterException);
1095 globalWaitTask->wait_for_all();
1096 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1097 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1100 FDEBUG(1) <<
"\tendRun " << run <<
"\n";
1109 waitTask->increment_ref_count();
1116 input_->luminosityBlockAuxiliary()->beginTime()),
1120 waitTask->wait_for_all();
1122 if(waitTask->exceptionPtr() !=
nullptr) {
1123 std::rethrow_exception(* (waitTask->exceptionPtr()) );
1149 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1152 sentry.completedSuccessfully();
1158 rng->preBeginLumi(lb);
1165 ,[
this, holder = iHolder,
status, ts] (std::exception_ptr
const* iPtr)
mutable {
1167 holder.doneWaiting(*iPtr);
1170 status->globalBeginDidSucceed();
1178 holder.doneWaiting(std::current_exception());
1189 [
this,
i,
h = holder](std::exception_ptr
const* iPtr)
mutable 1192 h.doneWaiting(*iPtr);
1200 auto lp =
status->lumiPrincipal();
1201 event.setLuminosityBlockPrincipal(lp.get());
1215 beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1217 *(
status->lumiPrincipal()),
1238 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1240 sentry.completedSuccessfully();
1256 status->needToContinueLumi();
1257 status->startProcessingEvents();
1260 unsigned int streamIndex = 0;
1263 [
this,streamIndex,
h = iHolder](){
1276 std::exception_ptr ptr;
1289 auto& lp = *(
status->lumiPrincipal());
1295 ptr = std::current_exception();
1304 status->resumeGlobalLumiQueue();
1307 ptr = std::current_exception();
1314 ptr = std::current_exception();
1318 items.second.doneWaiting(ptr);
1323 task.doneWaiting(*iExcept);
1326 if(
status->didGlobalBeginSucceed()) {
1331 auto& lp = *(iLumiStatus->lumiPrincipal());
1347 iLumiStatus->cleaningUpAfterException());
1351 unsigned int iStreamIndex,
1352 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1354 auto t =
edm::make_waiting_task(tbb::task::allocate_root(), [
this, iStreamIndex, iTask](std::exception_ptr
const* iPtr)
mutable {
1355 std::exception_ptr ptr;
1366 if(
status->streamFinishedLumi()) {
1374 iLumiStatus->setEndTime();
1376 if(iLumiStatus->didGlobalBeginSucceed()) {
1377 auto & lumiPrincipal = *iLumiStatus->lumiPrincipal();
1379 lumiPrincipal.endTime());
1382 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1385 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1387 lumiPrincipal,ts,es,
1397 globalWaitTask->increment_ref_count();
1406 globalWaitTask->wait_for_all();
1407 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1408 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1416 <<
"EventProcessor::readRun\n" 1417 <<
"Illegal attempt to insert run into cache\n" 1418 <<
"Contact a Framework Developer\n";
1420 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
preg(),
1424 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1426 sentry.completedSuccessfully();
1428 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1430 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1437 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1438 input_->readAndMergeRun(*runPrincipal);
1439 sentry.completedSuccessfully();
1441 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1442 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1448 <<
"EventProcessor::readLuminosityBlock\n" 1449 <<
"Illegal attempt to insert lumi into cache\n" 1450 <<
"Run is invalid\n" 1451 <<
"Contact a Framework Developer\n";
1455 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1457 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1459 sentry.completedSuccessfully();
1467 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1468 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
input_->processHistoryRegistry().reducedProcessHistoryID(
input_->luminosityBlockAuxiliary()->processHistoryID()));
1469 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1471 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1473 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1475 sentry.completedSuccessfully();
1477 return input_->luminosityBlock();
1482 auto subsT =
edm::make_waiting_task(tbb::task::allocate_root(), [
this,phid,run,task,mergeableRunProductMetadata]
1483 (std::exception_ptr
const* iExcept)
mutable {
1489 s.writeRunAsync(task,phid,run,mergeableRunProductMetadata);
1500 for_all(
subProcesses_, [run,phid](
auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1501 FDEBUG(1) <<
"\tdeleteRunFromCache " << run <<
"\n";
1505 auto subsT =
edm::make_waiting_task(tbb::task::allocate_root(), [
this,task, iStatus](std::exception_ptr
const* iExcept)
mutable {
1511 s.writeLumiAsync(task,*(iStatus->lumiPrincipal()));
1517 std::shared_ptr<LuminosityBlockPrincipal>
const& lumiPrincipal = iStatus->lumiPrincipal();
1518 lumiPrincipal->runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal->luminosityBlock());
1548 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1561 input_->luminosityBlockAuxiliary()->beginTime()));
1579 bool expected =
false;
1589 unsigned int iStreamIndex)
1596 auto recursionTask =
make_waiting_task(tbb::task::allocate_root(), [
this,iTask,iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1598 bool expected =
false;
1612 if(
status->isLumiEnding()) {
1623 bool expected =
false;
1625 auto e =std::current_exception();
1638 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1642 sentry.completedSuccessfully();
1644 FDEBUG(1) <<
"\treadEvent\n";
1648 unsigned int iStreamIndex) {
1655 unsigned int iStreamIndex) {
1662 rng->postEventRead(ev);
1666 tbb::task::allocate_root(),
1667 [
this,pep,iHolder](std::exception_ptr
const* iPtr)
mutable 1676 FDEBUG(1) <<
"\tprocessEvent\n";
1677 pep->clearEventPrincipal();
1688 afterProcessTask =
std::move(finalizeEventTask);
1694 [
this,pep,finalizeEventTask] (std::exception_ptr
const* iPtr)
mutable 1701 subProcess.doEventAsync(finalizeEventTask,*pep);
1716 bool randomAccess =
input_->randomAccess();
1725 status =
looper_->doDuringLoop(iPrincipal,
esp_->eventSetup(), pc, &streamContext);
1745 FDEBUG(1) <<
"\tshouldWeStop\n";
1749 if(subProcess.terminate()) {
1771 bool expected =
false;
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
bool wasEventProcessingStopped() const
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
unsigned int numberOfThreads() const
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void setExceptionMessageLumis(std::string &message)
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
bool taskHasFailed() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
bool continuingLumi() const
void addAdditionalInfo(std::string const &info)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
void writeLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::shared_ptr< edm::ParameterSet > parameterSet() const
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
MergeableRunProductMetadata * mergeableRunProductMetadata()
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
void stopProcessingEvents()
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
bool forceESCacheClearOnNewRun_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void closeInputFile(bool cleaningUpAfterException)
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
void setNextSyncValue(IOVSyncValue iValue)
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
void processEventWithLooper(EventPrincipal &)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_