71 #include "boost/range/adaptor/reversed.hpp" 95 class SendSourceTerminationSignalIfException {
99 ~SendSourceTerminationSignalIfException() {
104 void completedSuccessfully() {
116 std::unique_ptr<InputSource>
119 std::shared_ptr<ProductRegistry> preg,
120 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
121 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
122 std::shared_ptr<ActivityRegistry> areg,
123 std::shared_ptr<ProcessConfiguration const> processConfiguration,
126 if(main_input ==
nullptr) {
128 <<
"There must be exactly one source in the configuration.\n" 129 <<
"It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
134 std::unique_ptr<ParameterSetDescriptionFillerBase>
filler(
137 filler->fill(descriptions);
141 descriptions.validate(*main_input,
std::string(
"source"));
145 std::ostringstream ost;
146 ost <<
"Validating configuration of input source of type " << modtype;
162 processConfiguration.get(),
163 ModuleDescription::getUniqueID());
169 areg->preSourceConstructionSignal_(md);
170 std::unique_ptr<InputSource>
input;
173 std::shared_ptr<int> sentry(
nullptr,[areg,&md](
void*){areg->postSourceConstructionSignal_(md);});
175 input = std::unique_ptr<InputSource>(
InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
176 input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
177 input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
181 std::ostringstream ost;
182 ost <<
"Constructing input source of type " << modtype;
190 std::shared_ptr<EDLooperBase>
194 std::shared_ptr<EDLooperBase> vLooper;
196 std::vector<std::string> loopers = params.
getParameter<std::vector<std::string> >(
"@all_loopers");
198 if(loopers.empty()) {
202 assert(1 == loopers.size());
204 for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
218 EventProcessor::EventProcessor(std::unique_ptr<ParameterSet>
parameterSet,
221 std::vector<std::string>
const& defaultServices,
222 std::vector<std::string>
const& forcedServices) :
225 branchIDListHelper_(),
228 espController_(new eventsetup::EventSetupsController),
231 processConfiguration_(),
237 deferredExceptionPtrIsSet_(
false),
241 beginJobCalled_(
false),
242 shouldWeStop_(
false),
243 fileModeNoMerge_(
false),
244 exceptionMessageFiles_(),
245 exceptionMessageRuns_(),
246 exceptionMessageLumis_(),
247 forceLooperToEnd_(
false),
248 looperBeginJobRun_(
false),
249 forceESCacheClearOnNewRun_(
false),
250 eventSetupDataToExcludeFromPrefetching_() {
251 auto processDesc = std::make_shared<ProcessDesc>(
std::move(parameterSet));
252 processDesc->addServices(defaultServices, forcedServices);
253 init(processDesc, iToken, iLegacy);
257 std::vector<std::string>
const& defaultServices,
258 std::vector<std::string>
const& forcedServices) :
289 auto processDesc = std::make_shared<ProcessDesc>(
std::move(parameterSet));
290 processDesc->addServices(defaultServices, forcedServices);
327 init(processDesc, token, legacy);
343 std::shared_ptr<ParameterSet>
parameterSet = processDesc->getProcessPSet();
347 bool const hasSubProcesses = !subProcessVParameterSet.empty();
355 ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet(
"options"));
357 if (fileMode !=
"NOMERGE" and fileMode !=
"FULLMERGE") {
360 <<
"Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
367 unsigned int nThreads = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfThreads");
371 assert(nThreads != 0);
373 unsigned int nStreams = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfStreams");
377 if (nThreads > 1
or nStreams > 1) {
378 edm::LogInfo(
"ThreadStreamSetup") <<
"setting # threads "<<nThreads<<
"\nsetting # streams "<<nStreams;
380 unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentRuns");
381 if (nConcurrentRuns != 1) {
383 <<
"Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
385 unsigned int nConcurrentLumis = optionsPset.getUntrackedParameter<
unsigned int>(
"numberOfConcurrentLuminosityBlocks");
386 if (nConcurrentLumis == 0) {
387 nConcurrentLumis = nConcurrentRuns;
410 auto& serviceSets = processDesc->getServicesPSets();
411 ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy,
true);
419 handler->willBeUsingThreads();
423 std::shared_ptr<CommonParams>
common(items.initMisc(*parameterSet));
431 looper_->setActionTable(items.act_table_.get());
432 looper_->attachTo(*items.actReg_);
442 lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
450 items.branchIDListHelper(),
451 items.thinnedAssociationsHelper(),
453 items.processConfiguration(),
462 preg_ = items.preg();
470 FDEBUG(2) << parameterSet << std::endl;
488 for(
auto& subProcessPSet : subProcessVParameterSet) {
536 actReg_->preallocateSignal_(bounds);
567 ex.
addContext(
"Calling beginJob for the source");
573 actReg_->postBeginJobSignal_();
584 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
593 c.
call([&subProcess,
i](){ subProcess.doEndStream(
i); } );
597 c.
call([actReg](){actReg->preEndJobSignal_();});
606 c.
call([actReg](){actReg->postEndJobSignal_();});
617 std::vector<ModuleDescription const*>
619 return schedule_->getAllModuleDescriptions();
658 #include "TransitionProcessors.icc" 663 bool returnValue =
false;
680 SendSourceTerminationSignalIfException sentry(
actReg_.get());
684 itemType =
input_->nextItemType();
688 sentry.completedSuccessfully();
700 std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
702 return std::make_pair(
input_->reducedProcessHistoryID(),
input_->run());
707 return input_->luminosityBlock();
726 bool firstTime =
true;
736 auto trans = fp.processFiles(*
this);
748 <<
"Unexpected transition change " 783 FDEBUG(1) <<
" \treadFile\n";
785 SendSourceTerminationSignalIfException sentry(
actReg_.get());
790 if(size < preg_->
size()) {
798 sentry.completedSuccessfully();
802 if (
fb_.get() !=
nullptr) {
803 SendSourceTerminationSignalIfException sentry(
actReg_.get());
804 input_->closeFile(
fb_.get(), cleaningUpAfterException);
805 sentry.completedSuccessfully();
807 FDEBUG(1) <<
"\tcloseInputFile\n";
811 if (
fb_.get() !=
nullptr) {
815 FDEBUG(1) <<
"\topenOutputFiles\n";
819 if (
fb_.get() !=
nullptr) {
823 FDEBUG(1) <<
"\tcloseOutputFiles\n";
828 if (
fb_.get() !=
nullptr) {
832 FDEBUG(1) <<
"\trespondToOpenInputFile\n";
836 if (
fb_.get() !=
nullptr) {
840 FDEBUG(1) <<
"\trespondToCloseInputFile\n";
850 FDEBUG(1) <<
"\tstartingNewLoop\n";
856 looper_->setModuleChanger(&changer);
858 looper_->setModuleChanger(
nullptr);
862 FDEBUG(1) <<
"\tendOfLoop\n";
869 FDEBUG(1) <<
"\trewind\n";
874 FDEBUG(1) <<
"\tprepareForNextLoop\n";
878 FDEBUG(1) <<
"\tshouldWeCloseOutput\n";
881 if(subProcess.shouldWeCloseOutput()) {
891 FDEBUG(1) <<
"\tdoErrorStuff\n";
893 <<
"The EventProcessor state machine encountered an unexpected event\n" 894 <<
"and went to the error state\n" 895 <<
"Will attempt to terminate processing normally\n" 896 <<
"(IF using the looper the next loop will be attempted)\n" 897 <<
"This likely indicates a bug in an input module or corrupted input or both\n";
901 bool& eventSetupForInstanceSucceeded) {
902 globalBeginSucceeded =
false;
905 SendSourceTerminationSignalIfException sentry(
actReg_.get());
908 sentry.completedSuccessfully();
917 SendSourceTerminationSignalIfException sentry(
actReg_.get());
919 eventSetupForInstanceSucceeded =
true;
920 sentry.completedSuccessfully();
932 globalWaitTask->increment_ref_count();
940 globalWaitTask->wait_for_all();
941 if(globalWaitTask->exceptionPtr() !=
nullptr) {
942 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
945 globalBeginSucceeded =
true;
946 FDEBUG(1) <<
"\tbeginRun " << run <<
"\n";
953 streamLoopWaitTask->increment_ref_count();
957 beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
966 streamLoopWaitTask->wait_for_all();
967 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
968 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
971 FDEBUG(1) <<
"\tstreamBeginRun " << run <<
"\n";
978 bool globalBeginSucceeded,
bool cleaningUpAfterException,
979 bool eventSetupForInstanceSucceeded) {
980 if (eventSetupForInstanceSucceeded) {
982 endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
984 if(globalBeginSucceeded) {
986 t->increment_ref_count();
993 if(
t->exceptionPtr()) {
994 std::rethrow_exception(*
t->exceptionPtr());
1008 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1010 sentry.completedSuccessfully();
1013 if(globalBeginSucceeded){
1016 streamLoopWaitTask->increment_ref_count();
1028 cleaningUpAfterException);
1030 streamLoopWaitTask->wait_for_all();
1031 if(streamLoopWaitTask->exceptionPtr() !=
nullptr) {
1032 std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1035 FDEBUG(1) <<
"\tstreamEndRun " << run <<
"\n";
1041 globalWaitTask->increment_ref_count();
1051 cleaningUpAfterException);
1052 globalWaitTask->wait_for_all();
1053 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1054 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1057 FDEBUG(1) <<
"\tendRun " << run <<
"\n";
1066 waitTask->increment_ref_count();
1073 input_->luminosityBlockAuxiliary()->beginTime()),
1077 waitTask->wait_for_all();
1079 if(waitTask->exceptionPtr() !=
nullptr) {
1080 std::rethrow_exception(* (waitTask->exceptionPtr()) );
1106 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1109 sentry.completedSuccessfully();
1115 rng->preBeginLumi(lb);
1122 ,[
this, holder = iHolder,
status, ts] (std::exception_ptr
const* iPtr)
mutable {
1124 holder.doneWaiting(*iPtr);
1127 status->globalBeginDidSucceed();
1135 holder.doneWaiting(std::current_exception());
1146 [
this,
i,
h = holder](std::exception_ptr
const* iPtr)
mutable 1149 h.doneWaiting(*iPtr);
1157 auto lp =
status->lumiPrincipal();
1158 event.setLuminosityBlockPrincipal(lp.get());
1172 beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1174 *(
status->lumiPrincipal()),
1195 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1197 sentry.completedSuccessfully();
1213 status->needToContinueLumi();
1214 status->startProcessingEvents();
1217 unsigned int streamIndex = 0;
1220 [
this,streamIndex,
h = iHolder](){
1233 std::exception_ptr ptr;
1246 auto& lp = *(
status->lumiPrincipal());
1252 ptr = std::current_exception();
1261 status->resumeGlobalLumiQueue();
1264 ptr = std::current_exception();
1271 ptr = std::current_exception();
1275 items.second.doneWaiting(ptr);
1280 task.doneWaiting(*iExcept);
1283 if(
status->didGlobalBeginSucceed()) {
1288 auto& lp = *(iLumiStatus->lumiPrincipal());
1304 iLumiStatus->cleaningUpAfterException());
1308 unsigned int iStreamIndex,
1309 std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1311 auto t =
edm::make_waiting_task(tbb::task::allocate_root(), [
this, iStreamIndex, iTask](std::exception_ptr
const* iPtr)
mutable {
1312 std::exception_ptr ptr;
1323 if(
status->streamFinishedLumi()) {
1331 iLumiStatus->setEndTime();
1333 if(iLumiStatus->didGlobalBeginSucceed()) {
1334 auto & lumiPrincipal = *iLumiStatus->lumiPrincipal();
1336 lumiPrincipal.endTime());
1339 bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1342 endStreamTransitionAsync<Traits>(
std::move(lumiDoneTask),
1344 lumiPrincipal,ts,es,
1354 globalWaitTask->increment_ref_count();
1363 globalWaitTask->wait_for_all();
1364 if(globalWaitTask->exceptionPtr() !=
nullptr) {
1365 std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1373 <<
"EventProcessor::readRun\n" 1374 <<
"Illegal attempt to insert run into cache\n" 1375 <<
"Contact a Framework Developer\n";
1377 auto rp = std::make_shared<RunPrincipal>(
input_->runAuxiliary(),
preg(),
1381 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1383 sentry.completedSuccessfully();
1385 assert(
input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1387 return std::make_pair(rp->reducedProcessHistoryID(),
input_->run());
1394 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1395 input_->readAndMergeRun(*runPrincipal);
1396 sentry.completedSuccessfully();
1398 assert(
input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1399 return std::make_pair(runPrincipal->reducedProcessHistoryID(),
input_->run());
1405 <<
"EventProcessor::readLuminosityBlock\n" 1406 <<
"Illegal attempt to insert lumi into cache\n" 1407 <<
"Run is invalid\n" 1408 <<
"Contact a Framework Developer\n";
1412 lbp->setAux(*
input_->luminosityBlockAuxiliary());
1414 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1416 sentry.completedSuccessfully();
1424 assert(lumiPrincipal.aux().sameIdentity(*
input_->luminosityBlockAuxiliary())
or 1425 input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
input_->processHistoryRegistry().reducedProcessHistoryID(
input_->luminosityBlockAuxiliary()->processHistoryID()));
1426 bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*
preg());
1428 lumiPrincipal.mergeAuxiliary(*
input_->luminosityBlockAuxiliary());
1430 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1432 sentry.completedSuccessfully();
1434 return input_->luminosityBlock();
1439 auto subsT =
edm::make_waiting_task(tbb::task::allocate_root(), [
this,phid,run,task,mergeableRunProductMetadata]
1440 (std::exception_ptr
const* iExcept)
mutable {
1446 s.writeRunAsync(task,phid,run,mergeableRunProductMetadata);
1457 for_all(
subProcesses_, [run,phid](
auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1458 FDEBUG(1) <<
"\tdeleteRunFromCache " << run <<
"\n";
1462 auto subsT =
edm::make_waiting_task(tbb::task::allocate_root(), [
this,task, iStatus](std::exception_ptr
const* iExcept)
mutable {
1468 s.writeLumiAsync(task,*(iStatus->lumiPrincipal()));
1474 std::shared_ptr<LuminosityBlockPrincipal>
const& lumiPrincipal = iStatus->lumiPrincipal();
1475 lumiPrincipal->runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal->luminosityBlock());
1505 std::lock_guard<std::recursive_mutex> guard(*(
sourceMutex_.get()));
1518 input_->luminosityBlockAuxiliary()->beginTime()));
1536 bool expected =
false;
1546 unsigned int iStreamIndex)
1553 auto recursionTask =
make_waiting_task(tbb::task::allocate_root(), [
this,iTask,iStreamIndex](std::exception_ptr
const* iPtr)
mutable {
1555 bool expected =
false;
1569 if(
status->isLumiEnding()) {
1580 bool expected =
false;
1582 auto e =std::current_exception();
1595 SendSourceTerminationSignalIfException sentry(
actReg_.get());
1599 sentry.completedSuccessfully();
1601 FDEBUG(1) <<
"\treadEvent\n";
1605 unsigned int iStreamIndex) {
1612 unsigned int iStreamIndex) {
1619 rng->postEventRead(ev);
1623 tbb::task::allocate_root(),
1624 [
this,pep,iHolder](std::exception_ptr
const* iPtr)
mutable 1633 FDEBUG(1) <<
"\tprocessEvent\n";
1634 pep->clearEventPrincipal();
1645 afterProcessTask =
std::move(finalizeEventTask);
1651 [
this,pep,finalizeEventTask] (std::exception_ptr
const* iPtr)
mutable 1658 subProcess.doEventAsync(finalizeEventTask,*pep);
1673 bool randomAccess =
input_->randomAccess();
1682 status =
looper_->doDuringLoop(iPrincipal,
esp_->eventSetup(), pc, &streamContext);
1702 FDEBUG(1) <<
"\tshouldWeStop\n";
1706 if(subProcess.terminate()) {
1728 bool expected =
false;
1737 std::unique_ptr<LogSystem>
s;
1738 for(
auto worker:
schedule_->allWorkers()) {
1739 if( worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1741 s = std::make_unique<LogSystem>(
"ModulesSynchingOnLumis");
1742 (*s) <<
"The following modules require synchronizing on LuminosityBlock boundaries:";
1744 (*s)<<
"\n "<<worker->description().moduleName()<<
" "<<worker->description().moduleLabel();
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
bool wasEventProcessingStopped() const
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
unsigned int numberOfRuns() const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool lastOperationSucceeded() const
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
unsigned int numberOfThreads() const
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
bool alreadyPrinted() const
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
std::unique_ptr< InputSource > makeInput(ParameterSet ¶ms, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void setExceptionMessageLumis(std::string &message)
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
bool taskHasFailed() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
void clear()
Not thread safe.
Timestamp const & endTime() const
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
bool continuingLumi() const
void addAdditionalInfo(std::string const &info)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
unsigned int numberOfLuminosityBlocks() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
void writeLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet ¶ms)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void respondToCloseInputFile()
InputSource::ItemType lastSourceTransition_
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void respondToOpenInputFile()
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
MergeableRunProductMetadata * mergeableRunProductMetadata()
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
void stopProcessingEvents()
static EventNumber_t maxEventNumber()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
bool forceESCacheClearOnNewRun_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void closeInputFile(bool cleaningUpAfterException)
unsigned int numberOfStreams() const
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
void setNextSyncValue(IOVSyncValue iValue)
void prepareForNextLoop()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
void processEventWithLooper(EventPrincipal &)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Transition requestedTransition() const
T get(const Candidate &c)
static Registry * instance()
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_