CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (edm::WaitingTaskHolder iHolder)
 
void deleteLumiFromCache (LuminosityBlockProcessingStatus &)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
bool endOfLoop ()
 
bool endPathsEnabled () const
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
void endUnfinishedLumi ()
 
void endUnfinishedRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
 
 EventProcessor (EventProcessor const &)=delete
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
bool fileBlockValid ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
void handleEndLumiExceptions (std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
 
void inputProcessBlocks ()
 
InputSource::ItemType lastTransitionType () const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair< edm::ProcessHistoryID, edm::RunNumber_tnextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processLumis (std::shared_ptr< void > const &iRunResource)
 
int readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_treadAndMergeRun ()
 
void readFile ()
 
void readLuminosityBlock (LuminosityBlockProcessingStatus &)
 
void readProcessBlock (ProcessBlockPrincipal &)
 
std::pair< ProcessHistoryID, RunNumber_treadRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamEndLumiAsync (edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedDataExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
void warnAboutLegacyModules () const
 
void warnAboutModulesRequiringLuminosityBLockSynchronization () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
bool deleteNonConsumedUnscheduledModules_ = true
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemType lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueuelumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueuestreamQueues_
 
std::vector< SubProcesssubProcesses_
 
tbb::task_group taskGroup_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 66 of file EventProcessor.h.

Member Typedef Documentation

◆ ExcludedData

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 371 of file EventProcessor.h.

◆ ExcludedDataMap

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 372 of file EventProcessor.h.

◆ ProcessBlockType

Definition at line 251 of file EventProcessor.h.

Member Enumeration Documentation

◆ StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 76 of file EventProcessor.h.

76  {
77  epSuccess = 0,
78  epException = 1,
79  epOther = 2,
80  epSignal = 3,
81  epInputComplete = 4,
82  epTimedOut = 5,
83  epCountComplete = 6
84  };

Constructor & Destructor Documentation

◆ EventProcessor() [1/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 232 of file EventProcessor.cc.

237  : actReg_(),
238  preg_(),
240  serviceToken_(),
241  input_(),
242  espController_(new eventsetup::EventSetupsController),
243  esp_(),
244  act_table_(),
246  schedule_(),
247  subProcesses_(),
248  historyAppender_(new HistoryAppender),
249  fb_(),
250  looper_(),
252  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
253  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
254  principalCache_(),
255  beginJobCalled_(false),
256  shouldWeStop_(false),
257  fileModeNoMerge_(false),
260  exceptionMessageLumis_(false),
261  forceLooperToEnd_(false),
262  looperBeginJobRun_(false),
265  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
266  processDesc->addServices(defaultServices, forcedServices);
267  init(processDesc, iToken, iLegacy);
268  }

References init(), eostools::move(), and edm::parameterSet().

◆ EventProcessor() [2/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 270 of file EventProcessor.cc.

273  : actReg_(),
274  preg_(),
276  serviceToken_(),
277  input_(),
278  espController_(new eventsetup::EventSetupsController),
279  esp_(),
280  act_table_(),
282  schedule_(),
283  subProcesses_(),
284  historyAppender_(new HistoryAppender),
285  fb_(),
286  looper_(),
288  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
289  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
290  principalCache_(),
291  beginJobCalled_(false),
292  shouldWeStop_(false),
293  fileModeNoMerge_(false),
296  exceptionMessageLumis_(false),
297  forceLooperToEnd_(false),
298  looperBeginJobRun_(false),
302  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
303  processDesc->addServices(defaultServices, forcedServices);
305  }

References init(), edm::serviceregistry::kOverlapIsError, eostools::move(), and edm::parameterSet().

◆ EventProcessor() [3/4]

edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 307 of file EventProcessor.cc.

310  : actReg_(),
311  preg_(),
313  serviceToken_(),
314  input_(),
315  espController_(new eventsetup::EventSetupsController),
316  esp_(),
317  act_table_(),
319  schedule_(),
320  subProcesses_(),
321  historyAppender_(new HistoryAppender),
322  fb_(),
323  looper_(),
325  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
326  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
327  principalCache_(),
328  beginJobCalled_(false),
329  shouldWeStop_(false),
330  fileModeNoMerge_(false),
333  exceptionMessageLumis_(false),
334  forceLooperToEnd_(false),
335  looperBeginJobRun_(false),
339  init(processDesc, token, legacy);
340  }

References init(), and unpackBuffers-CaloStage2::token.

◆ ~EventProcessor()

edm::EventProcessor::~EventProcessor ( )

Definition at line 560 of file EventProcessor.cc.

560  {
561  // Make the services available while everything is being deleted.
564 
565  // manually destroy all these thing that may need the services around
566  // propagate_const<T> has no reset() function
567  espController_ = nullptr;
568  esp_ = nullptr;
569  schedule_ = nullptr;
570  input_ = nullptr;
571  looper_ = nullptr;
572  actReg_ = nullptr;
573 
576  }

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and unpackBuffers-CaloStage2::token.

◆ EventProcessor() [4/4]

edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

◆ beginJob()

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 585 of file EventProcessor.cc.

585  {
586  if (beginJobCalled_)
587  return;
588  beginJobCalled_ = true;
589  bk::beginJob();
590 
591  // StateSentry toerror(this); // should we add this ?
592  //make the services available
594 
595  service::SystemBounds bounds(preallocations_.numberOfStreams(),
599  actReg_->preallocateSignal_(bounds);
600  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
602 
603  std::vector<ModuleProcessName> consumedBySubProcesses;
605  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
606  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
607  if (consumedBySubProcesses.empty()) {
608  consumedBySubProcesses = std::move(c);
609  } else if (not c.empty()) {
610  std::vector<ModuleProcessName> tmp;
611  tmp.reserve(consumedBySubProcesses.size() + c.size());
612  std::merge(consumedBySubProcesses.begin(),
613  consumedBySubProcesses.end(),
614  c.begin(),
615  c.end(),
616  std::back_inserter(tmp));
617  std::swap(consumedBySubProcesses, tmp);
618  }
619  });
620 
621  // Note: all these may throw
624  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
625  not unusedModules.empty()) {
627 
628  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
629  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
630  "and "
631  "therefore they are deleted before beginJob transition.";
632  for (auto const& description : unusedModules) {
633  l << "\n " << description->moduleLabel();
634  }
635  });
636  for (auto const& description : unusedModules) {
637  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
638  }
639  }
640  }
641 
642  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
643 
646  }
648 
649  //NOTE: This implementation assumes 'Job' means one call
650  // the EventProcessor::run
651  // If it really means once per 'application' then this code will
652  // have to be changed.
653  // Also have to deal with case where have 'run' then new Module
654  // added and do 'run'
655  // again. In that case the newly added Module needs its 'beginJob'
656  // to be called.
657 
658  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
659  // For now we delay calling beginOfJob until first beginOfRun
660  //if(looper_) {
661  // looper_->beginOfJob(es);
662  //}
663  try {
664  convertException::wrap([&]() { input_->doBeginJob(); });
665  } catch (cms::Exception& ex) {
666  ex.addContext("Calling beginJob for the source");
667  throw;
668  }
669  espController_->finishConfiguration();
670  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
671  if (looper_) {
672  constexpr bool mustPrefetchMayGet = true;
673  auto const processBlockLookup = preg_->productLookup(InProcess);
674  auto const runLookup = preg_->productLookup(InRun);
675  auto const lumiLookup = preg_->productLookup(InLumi);
676  auto const eventLookup = preg_->productLookup(InEvent);
677  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
678  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
679  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
680  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
681  looper_->updateLookup(esp_->recordsToProxyIndices());
682  }
683  // toerror.succeeded(); // should we add this?
684  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
685  actReg_->postBeginJobSignal_();
686 
687  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
688  schedule_->beginStream(i);
689  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
690  }
691  }

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, c, edm::checkForModuleDependencyCorrectness(), deleteNonConsumedUnscheduledModules_, edmLumisInFiles::description, esp_, espController_, edm::for_all(), mps_fire::i, edm::InEvent, edm::PathsAndConsumesOfModules::initialize(), edm::InLumi, edm::InProcess, input_, edm::InRun, cmsLHEtoEOSManager::l, looper_, MatrixUtil::merge(), eostools::move(), edm::nonConsumedUnscheduledModules(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, edm::PathsAndConsumesOfModules::removeModules(), schedule_, serviceToken_, subProcesses_, std::swap(), createJobs::tmp, warnAboutLegacyModules(), warnAboutModulesRequiringLuminosityBLockSynchronization(), and edm::convertException::wrap().

Referenced by runToCompletion().

◆ beginLumiAsync()

void edm::EventProcessor::beginLumiAsync ( edm::IOVSyncValue const &  iSyncValue,
std::shared_ptr< void > const &  iRunResource,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1280 of file EventProcessor.cc.

1282  {
1283  if (iHolder.taskHasFailed()) {
1284  return;
1285  }
1286 
1287  // We must be careful with the status object here and in code this function calls. IF we want
1288  // endRun to be called, then we must call resetResources before the things waiting on
1289  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1290  // endRun to be called much later than it should be, because status is holding iRunResource).
1291  // Note that this must be done explicitly. Relying on the destructor does not work well
1292  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1293  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1294  // destroyed inside this function and lumiWork.
1295  auto status =
1296  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1297 
1298  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1299  if (iHolder.taskHasFailed()) {
1300  status->resetResources();
1301  return;
1302  }
1303 
1304  status->setResumer(std::move(iResumer));
1305 
1307  *iHolder.group(), [this, iHolder, status = std::move(status)]() mutable {
1308  //make the services available
1310  // Caught exception is propagated via WaitingTaskHolder
1311  CMS_SA_ALLOW try {
1313 
1314  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1315  {
1316  SendSourceTerminationSignalIfException sentry(actReg_.get());
1317 
1318  input_->doBeginLumi(lumiPrincipal, &processContext_);
1319  sentry.completedSuccessfully();
1320  }
1321 
1322  Service<RandomNumberGenerator> rng;
1323  if (rng.isAvailable()) {
1324  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1325  rng->preBeginLumi(lb);
1326  }
1327 
1328  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1329 
1330  using namespace edm::waiting_task::chain;
1331  chain::first([this, status, &lumiPrincipal](auto nextTask) {
1332  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1333  {
1334  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1335  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1336  beginGlobalTransitionAsync<Traits>(
1337  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1338  }
1339  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1340  looper_->prefetchAsync(
1341  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1342  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1343  status->globalBeginDidSucceed();
1344  //make the services available
1345  ServiceRegistry::Operate operateLooper(serviceToken_);
1346  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1347  }) | then([this, status](std::exception_ptr const* iPtr, auto holder) mutable {
1348  if (iPtr) {
1349  status->resetResources();
1350  holder.doneWaiting(*iPtr);
1351  } else {
1352  if (not looper_) {
1353  status->globalBeginDidSucceed();
1354  }
1355  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1356  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1357 
1358  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1359  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1360  streamQueues_[i].pause();
1361 
1362  auto& event = principalCache_.eventPrincipal(i);
1363  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1364  // held by the container as this lambda may not finish executing before all the tasks it
1365  // spawns have already started to run.
1366  auto eventSetupImpls = &status->eventSetupImpls();
1367  auto lp = status->lumiPrincipal().get();
1370  event.setLuminosityBlockPrincipal(lp);
1371  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1372  using namespace edm::waiting_task::chain;
1373  chain::first([this, i, &transitionInfo](auto nextTask) {
1374  beginStreamTransitionAsync<Traits>(
1375  std::move(nextTask), *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1376  }) | then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi, auto nextTask) {
1377  if (exceptionFromBeginStreamLumi) {
1378  WaitingTaskHolder tmp(nextTask);
1379  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1380  streamEndLumiAsync(nextTask, i);
1381  } else {
1383  }
1384  }) | runLast(holder);
1385  });
1386  }
1387  }
1388  }) | runLast(iHolder);
1389 
1390  } catch (...) {
1391  status->resetResources();
1392  iHolder.doneWaiting(std::current_exception());
1393  }
1394  }); // task in sourceResourcesAcquirer
1395  }; // end lumiWork
1396 
1397  auto queueLumiWorkTask = make_waiting_task(
1398  [this, lumiWorkLambda = std::move(lumiWork), iHolder](std::exception_ptr const* iPtr) mutable {
1399  if (iPtr) {
1400  iHolder.doneWaiting(*iPtr);
1401  }
1402  lumiQueue_->pushAndPause(*iHolder.group(), std::move(lumiWorkLambda));
1403  });
1404 
1405  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1406  // We only get here inside this block if there is an EventSetup
1407  // module not able to handle concurrent IOVs (usually an ESSource)
1408  // and the new sync value is outside the current IOV of that module.
1409 
1410  WaitingTaskHolder queueLumiWorkTaskHolder{*iHolder.group(), queueLumiWorkTask};
1411 
1412  queueWhichWaitsForIOVsToFinish_.push(*iHolder.group(), [this, queueLumiWorkTaskHolder, iSync, status]() mutable {
1413  // Caught exception is propagated via WaitingTaskHolder
1414  CMS_SA_ALLOW try {
1415  SendSourceTerminationSignalIfException sentry(actReg_.get());
1416  // Pass in iSync to let the EventSetup system know which run and lumi
1417  // need to be processed and prepare IOVs for it.
1418  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1419  // lumi is done and no longer needs its EventSetup IOVs.
1420  espController_->eventSetupForInstanceAsync(
1421  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1422  sentry.completedSuccessfully();
1423  } catch (...) {
1424  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1425  }
1427  });
1428 
1429  } else {
1431 
1432  // This holder will be used to wait until the EventSetup IOVs are ready
1433  WaitingTaskHolder queueLumiWorkTaskHolder{*iHolder.group(), queueLumiWorkTask};
1434  // Caught exception is propagated via WaitingTaskHolder
1435  CMS_SA_ALLOW try {
1436  SendSourceTerminationSignalIfException sentry(actReg_.get());
1437 
1438  // Pass in iSync to let the EventSetup system know which run and lumi
1439  // need to be processed and prepare IOVs for it.
1440  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1441  // lumi is done and no longer needs its EventSetup IOVs.
1442  espController_->eventSetupForInstanceAsync(
1443  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1444  sentry.completedSuccessfully();
1445 
1446  } catch (...) {
1447  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1448  }
1449  }
1450  }

References actReg_, edm::BeginLuminosityBlock, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), first, edm::WaitingTaskHolder::group(), handleNextEventForStreamAsync(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::Service< T >::isAvailable(), looper_, lumiQueue_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), edm::SerialTaskQueue::pause(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readLuminosityBlock(), edm::waiting_task::chain::runLast(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and createJobs::tmp.

Referenced by handleNextEventForStreamAsync(), and processLumis().

◆ beginProcessBlock()

void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 976 of file EventProcessor.cc.

976  {
977  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
978  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
979 
980  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
981  FinalWaitingTask globalWaitTask;
982 
983  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
984  beginGlobalTransitionAsync<Traits>(
985  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
986 
987  do {
988  taskGroup_.wait();
989  } while (not globalWaitTask.done());
990 
991  if (globalWaitTask.exceptionPtr() != nullptr) {
992  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
993  }
994  beginProcessBlockSucceeded = true;
995  }

References edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, subProcesses_, and taskGroup_.

◆ beginRun()

void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded,
bool &  eventSetupForInstanceSucceeded 
)

Definition at line 1070 of file EventProcessor.cc.

1073  {
1074  globalBeginSucceeded = false;
1075  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1076  {
1077  SendSourceTerminationSignalIfException sentry(actReg_.get());
1078 
1079  input_->doBeginRun(runPrincipal, &processContext_);
1080  sentry.completedSuccessfully();
1081  }
1082 
1083  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1085  espController_->forceCacheClear();
1086  }
1087  {
1088  SendSourceTerminationSignalIfException sentry(actReg_.get());
1090  eventSetupForInstanceSucceeded = true;
1091  sentry.completedSuccessfully();
1092  }
1093  auto const& es = esp_->eventSetupImpl();
1094  if (looper_ && looperBeginJobRun_ == false) {
1095  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1096 
1097  FinalWaitingTask waitTask;
1098  using namespace edm::waiting_task::chain;
1099  chain::first([this, &es](auto nextTask) {
1100  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1101  }) | then([this, &es](auto nextTask) mutable {
1102  looper_->beginOfJob(es);
1103  looperBeginJobRun_ = true;
1104  looper_->doStartingNewLoop();
1105  }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask));
1106 
1107  do {
1108  taskGroup_.wait();
1109  } while (not waitTask.done());
1110  if (waitTask.exceptionPtr() != nullptr) {
1111  std::rethrow_exception(*(waitTask.exceptionPtr()));
1112  }
1113  }
1114  {
1115  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1116  FinalWaitingTask globalWaitTask;
1117 
1118  using namespace edm::waiting_task::chain;
1119  chain::first([&runPrincipal, &es, this](auto waitTask) {
1120  RunTransitionInfo transitionInfo(runPrincipal, es);
1121  beginGlobalTransitionAsync<Traits>(
1122  std::move(waitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1123  }) | then([&globalBeginSucceeded, run](auto waitTask) mutable {
1124  globalBeginSucceeded = true;
1125  FDEBUG(1) << "\tbeginRun " << run << "\n";
1126  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1127  looper_->prefetchAsync(waitTask, serviceToken_, Transition::BeginRun, runPrincipal, es);
1128  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1129  looper_->doBeginRun(runPrincipal, es, &processContext_);
1130  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1131 
1132  do {
1133  taskGroup_.wait();
1134  } while (not globalWaitTask.done());
1135  if (globalWaitTask.exceptionPtr() != nullptr) {
1136  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1137  }
1138  }
1139  {
1140  //To wait, the ref count has to be 1+#streams
1141  FinalWaitingTask streamLoopWaitTask;
1142 
1143  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1144 
1145  RunTransitionInfo transitionInfo(runPrincipal, es);
1146  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1147  *schedule_,
1149  transitionInfo,
1150  serviceToken_,
1151  subProcesses_);
1152  do {
1153  taskGroup_.wait();
1154  } while (not streamLoopWaitTask.done());
1155  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1156  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1157  }
1158  }
1159  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1160  if (looper_) {
1161  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1162  }
1163  }

References actReg_, edm::BeginRun, edm::RunPrincipal::beginTime(), edm::FinalWaitingTask::done(), esp_, espController_, edm::WaitingTask::exceptionPtr(), FDEBUG, first, forceESCacheClearOnNewRun_, edm::waiting_task::chain::ifThen(), input_, looper_, looperBeginJobRun_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, subProcesses_, edm::eventsetup::synchronousEventSetupForInstance(), taskGroup_, and edm::waiting_task::chain::then().

◆ branchIDListHelper() [1/2]

std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 298 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

◆ branchIDListHelper() [2/2]

std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 295 of file EventProcessor.h.

295  {
297  }

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

◆ checkForAsyncStopRequest()

bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 746 of file EventProcessor.cc.

746  {
747  bool returnValue = false;
748 
749  // Look for a shutdown signal
750  if (shutdown_flag.load(std::memory_order_acquire)) {
751  returnValue = true;
753  }
754  return returnValue;
755  }

References epSignal, runEdmFileComparison::returnCode, and edm::shutdown_flag.

Referenced by nextTransitionType().

◆ clearCounters()

void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 740 of file EventProcessor.cc.

740 { schedule_->clearCounters(); }

References schedule_.

◆ closeInputFile()

void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 876 of file EventProcessor.cc.

876  {
877  if (fileBlockValid()) {
878  SendSourceTerminationSignalIfException sentry(actReg_.get());
879  input_->closeFile(fb_.get(), cleaningUpAfterException);
880  sentry.completedSuccessfully();
881  }
882  FDEBUG(1) << "\tcloseInputFile\n";
883  }

References actReg_, fb_, FDEBUG, fileBlockValid(), and input_.

◆ closeOutputFiles()

void edm::EventProcessor::closeOutputFiles ( )

Definition at line 893 of file EventProcessor.cc.

893  {
894  schedule_->closeOutputFiles();
895  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
896  processBlockHelper_->clearAfterOutputFilesClose();
897  FDEBUG(1) << "\tcloseOutputFiles\n";
898  }

References FDEBUG, edm::for_all(), processBlockHelper_, schedule_, and subProcesses_.

◆ continueLumiAsync()

void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1452 of file EventProcessor.cc.

1452  {
1453  {
1454  //all streams are sharing the same status at the moment
1455  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1456  status->needToContinueLumi();
1457  status->startProcessingEvents();
1458  }
1459 
1460  unsigned int streamIndex = 0;
1461  tbb::task_arena arena{tbb::task_arena::attach()};
1462  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1463  arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1464  }
1465  iHolder.group()->run(
1466  [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1467  }

References edm::WaitingTaskHolder::group(), handleNextEventForStreamAsync(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, mps_update::status, and streamLumiStatus_.

Referenced by processLumis().

◆ deleteLumiFromCache()

void edm::EventProcessor::deleteLumiFromCache ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1758 of file EventProcessor.cc.

1758  {
1759  for (auto& s : subProcesses_) {
1760  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1761  }
1762  iStatus.lumiPrincipal()->clearPrincipal();
1763  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1764  }

References edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

◆ deleteRunFromCache()

void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1734 of file EventProcessor.cc.

1734  {
1735  principalCache_.deleteRun(phid, run);
1736  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1737  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1738  }

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, run(), and subProcesses_.

Referenced by endUnfinishedRun().

◆ doErrorStuff()

void edm::EventProcessor::doErrorStuff ( )

Definition at line 967 of file EventProcessor.cc.

967  {
968  FDEBUG(1) << "\tdoErrorStuff\n";
969  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
970  << "and went to the error state\n"
971  << "Will attempt to terminate processing normally\n"
972  << "(IF using the looper the next loop will be attempted)\n"
973  << "This likely indicates a bug in an input module or corrupted input or both\n";
974  }

References FDEBUG.

Referenced by runToCompletion().

◆ enableEndPaths()

void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 736 of file EventProcessor.cc.

736 { schedule_->enableEndPaths(active); }

References schedule_.

◆ endJob()

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 693 of file EventProcessor.cc.

693  {
694  // Collects exceptions, so we don't throw before all operations are performed.
695  ExceptionCollector c(
696  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
697 
698  //make the services available
700 
701  //NOTE: this really should go elsewhere in the future
702  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
703  c.call([this, i]() { this->schedule_->endStream(i); });
704  for (auto& subProcess : subProcesses_) {
705  c.call([&subProcess, i]() { subProcess.doEndStream(i); });
706  }
707  }
708  auto actReg = actReg_.get();
709  c.call([actReg]() { actReg->preEndJobSignal_(); });
710  schedule_->endJob(c);
711  for (auto& subProcess : subProcesses_) {
712  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
713  }
714  c.call(std::bind(&InputSource::doEndJob, input_.get()));
715  if (looper_) {
716  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
717  }
718  c.call([actReg]() { actReg->postEndJobSignal_(); });
719  if (c.hasThrown()) {
720  c.rethrow();
721  }
722  }

References actReg_, c, edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

◆ endOfLoop()

bool edm::EventProcessor::endOfLoop ( )

Definition at line 928 of file EventProcessor.cc.

928  {
929  if (looper_) {
930  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
931  looper_->setModuleChanger(&changer);
932  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
933  looper_->setModuleChanger(nullptr);
935  return true;
936  else
937  return false;
938  }
939  FDEBUG(1) << "\tendOfLoop\n";
940  return true;
941  }

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

◆ endPathsEnabled()

bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 738 of file EventProcessor.cc.

738 { return schedule_->endPathsEnabled(); }

References schedule_.

◆ endProcessBlock()

void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 1033 of file EventProcessor.cc.

1033  {
1034  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1035 
1036  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1037  FinalWaitingTask globalWaitTask;
1038 
1039  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1040  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1041  *schedule_,
1042  transitionInfo,
1043  serviceToken_,
1044  subProcesses_,
1045  cleaningUpAfterException);
1046  do {
1047  taskGroup_.wait();
1048  } while (not globalWaitTask.done());
1049  if (globalWaitTask.exceptionPtr() != nullptr) {
1050  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1051  }
1052 
1053  if (beginProcessBlockSucceeded) {
1054  FinalWaitingTask writeWaitTask;
1056  do {
1057  taskGroup_.wait();
1058  } while (not writeWaitTask.done());
1059  if (writeWaitTask.exceptionPtr()) {
1060  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1061  }
1062  }
1063 
1064  processBlockPrincipal.clearPrincipal();
1065  for (auto& s : subProcesses_) {
1066  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1067  }
1068  }

References edm::Principal::clearPrincipal(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

◆ endRun()

void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 1192 of file EventProcessor.cc.

1195  {
1196  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1197  runPrincipal.setEndTime(input_->timestamp());
1198 
1199  IOVSyncValue ts(
1200  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1201  runPrincipal.endTime());
1202  {
1203  SendSourceTerminationSignalIfException sentry(actReg_.get());
1205  sentry.completedSuccessfully();
1206  }
1207  auto const& es = esp_->eventSetupImpl();
1208  if (globalBeginSucceeded) {
1209  //To wait, the ref count has to be 1+#streams
1210  FinalWaitingTask streamLoopWaitTask;
1211 
1212  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1213 
1214  RunTransitionInfo transitionInfo(runPrincipal, es);
1215  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1216  *schedule_,
1218  transitionInfo,
1219  serviceToken_,
1220  subProcesses_,
1221  cleaningUpAfterException);
1222  do {
1223  taskGroup_.wait();
1224  } while (not streamLoopWaitTask.done());
1225  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1226  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1227  }
1228  }
1229  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1230  if (looper_) {
1231  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1232  }
1233  {
1234  FinalWaitingTask globalWaitTask;
1235 
1236  using namespace edm::waiting_task::chain;
1237  chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) {
1238  RunTransitionInfo transitionInfo(runPrincipal, es);
1239  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1240  endGlobalTransitionAsync<Traits>(
1241  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1242  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1243  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1244  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1245  looper_->doEndRun(runPrincipal, es, &processContext_);
1246  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1247 
1248  do {
1249  taskGroup_.wait();
1250  } while (not globalWaitTask.done());
1251  if (globalWaitTask.exceptionPtr() != nullptr) {
1252  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1253  }
1254  }
1255  FDEBUG(1) << "\tendRun " << run << "\n";
1256  }

References actReg_, edm::FinalWaitingTask::done(), edm::EndRun, edm::RunPrincipal::endTime(), esp_, espController_, edm::WaitingTask::exceptionPtr(), FDEBUG, first, edm::waiting_task::chain::ifThen(), input_, looper_, edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, edm::RunPrincipal::setEndTime(), subProcesses_, edm::eventsetup::synchronousEventSetupForInstance(), and taskGroup_.

Referenced by endUnfinishedRun().

◆ endUnfinishedLumi()

void edm::EventProcessor::endUnfinishedLumi ( )

Definition at line 1596 of file EventProcessor.cc.

1596  {
1597  if (streamLumiActive_.load() > 0) {
1598  FinalWaitingTask globalWaitTask;
1599  {
1600  WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1601  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1602  if (streamLumiStatus_[i]) {
1603  streamEndLumiAsync(globalTaskHolder, i);
1604  }
1605  }
1606  }
1607  do {
1608  taskGroup_.wait();
1609  } while (not globalWaitTask.done());
1610  if (globalWaitTask.exceptionPtr() != nullptr) {
1611  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1612  }
1613  }
1614  }

References edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), mps_fire::i, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, and taskGroup_.

◆ endUnfinishedRun()

void edm::EventProcessor::endUnfinishedRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException,
bool  eventSetupForInstanceSucceeded 
)

Definition at line 1165 of file EventProcessor.cc.

1169  {
1170  if (eventSetupForInstanceSucceeded) {
1171  //If we skip empty runs, this would be called conditionally
1172  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1173 
1174  if (globalBeginSucceeded) {
1175  FinalWaitingTask t;
1176  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1177  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1178  mergeableRunProductMetadata->preWriteRun();
1179  writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1180  do {
1181  taskGroup_.wait();
1182  } while (not t.done());
1183  mergeableRunProductMetadata->postWriteRun();
1184  if (t.exceptionPtr()) {
1185  std::rethrow_exception(*t.exceptionPtr());
1186  }
1187  }
1188  }
1189  deleteRunFromCache(phid, run);
1190  }

References deleteRunFromCache(), endRun(), edm::RunPrincipal::mergeableRunProductMetadata(), edm::MergeableRunProductMetadata::postWriteRun(), edm::MergeableRunProductMetadata::preWriteRun(), principalCache_, run(), edm::PrincipalCache::runPrincipal(), submitPVValidationJobs::t, taskGroup_, and writeRunAsync().

◆ fileBlockValid()

bool edm::EventProcessor::fileBlockValid ( )
inline

Definition at line 204 of file EventProcessor.h.

204 { return fb_.get() != nullptr; }

References fb_.

Referenced by closeInputFile(), openOutputFiles(), respondToCloseInputFile(), and respondToOpenInputFile().

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 726 of file EventProcessor.cc.

726  {
727  return schedule_->getAllModuleDescriptions();
728  }

References schedule_.

◆ getToken()

ServiceToken edm::EventProcessor::getToken ( )

Definition at line 724 of file EventProcessor.cc.

724 { return serviceToken_; }

References serviceToken_.

Referenced by ~EventProcessor().

◆ globalEndLumiAsync()

void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1478 of file EventProcessor.cc.

1479  {
1480  // Get some needed info out of the status object before moving
1481  // it into finalTaskForThisLumi.
1482  auto& lp = *(iLumiStatus->lumiPrincipal());
1483  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1484  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1485  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1486  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1487 
1488  using namespace edm::waiting_task::chain;
1489  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1490  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1491 
1492  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1493  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1494  endGlobalTransitionAsync<Traits>(
1495  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1496  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1497  //Only call writeLumi if beginLumi succeeded
1498  if (didGlobalBeginSucceed) {
1499  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1500  }
1501  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1502  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1503  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1504  //any thrown exception auto propagates to nextTask via the chain
1506  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1507  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iPtr, auto nextTask) mutable {
1508  std::exception_ptr ptr;
1509  if (iPtr) {
1510  ptr = *iPtr;
1511  }
1513 
1514  // Try hard to clean up resources so the
1515  // process can terminate in a controlled
1516  // fashion even after exceptions have occurred.
1517  // Caught exception is passed to handleEndLumiExceptions()
1518  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1519  if (not ptr) {
1520  ptr = std::current_exception();
1521  }
1522  }
1523  // Caught exception is passed to handleEndLumiExceptions()
1524  CMS_SA_ALLOW try {
1525  status->resumeGlobalLumiQueue();
1527  } catch (...) {
1528  if (not ptr) {
1529  ptr = std::current_exception();
1530  }
1531  }
1532  // Caught exception is passed to handleEndLumiExceptions()
1533  CMS_SA_ALLOW try {
1534  // This call to status.resetResources() must occur before iTask is destroyed.
1535  // Otherwise there will be a data race which could result in endRun
1536  // being delayed until it is too late to successfully call it.
1537  status->resetResources();
1538  status.reset();
1539  } catch (...) {
1540  if (not ptr) {
1541  ptr = std::current_exception();
1542  }
1543  }
1544 
1545  if (ptr) {
1546  handleEndLumiExceptions(&ptr, nextTask);
1547  }
1548  }) | runLast(std::move(iTask));
1549  }

References CMS_SA_ALLOW, deleteLumiFromCache(), edm::EndLuminosityBlock, esp_, first, handleEndLumiExceptions(), edm::waiting_task::chain::ifThen(), looper_, edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::waiting_task::chain::then(), and writeLumiAsync().

Referenced by streamEndLumiAsync().

◆ handleEndLumiExceptions()

void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr const *  iPtr,
WaitingTaskHolder holder 
)

Definition at line 1469 of file EventProcessor.cc.

1469  {
1470  if (setDeferredException(*iPtr)) {
1471  WaitingTaskHolder tmp(holder);
1472  tmp.doneWaiting(*iPtr);
1473  } else {
1475  }
1476  }

References setDeferredException(), setExceptionMessageLumis(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

◆ handleNextEventForStreamAsync()

void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 1828 of file EventProcessor.cc.

1828  {
1829  sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1831  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1832  // as steramEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1833  auto status = streamLumiStatus_[iStreamIndex].get();
1834  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1835  CMS_SA_ALLOW try {
1836  if (readNextEventForStream(iStreamIndex, *status)) {
1837  auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1838  if (iPtr) {
1839  // Try to end the stream properly even if an exception was
1840  // thrown on an event.
1841  bool expected = false;
1842  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1843  // This is the case where the exception in iPtr is the primary
1844  // exception and we want to see its message.
1845  deferredExceptionPtr_ = *iPtr;
1846  WaitingTaskHolder tempHolder(iTask);
1847  tempHolder.doneWaiting(*iPtr);
1848  }
1849  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1850  //the stream will stop now
1851  return;
1852  }
1853  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1854  });
1855 
1856  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1857  } else {
1858  //the stream will stop now
1859  if (status->isLumiEnding()) {
1860  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1861  status->startNextLumi();
1862  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1863  }
1864  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1865  } else {
1866  iTask.doneWaiting(std::exception_ptr{});
1867  }
1868  }
1869  } catch (...) {
1870  // It is unlikely we will ever get in here ...
1871  // But if we do try to clean up and propagate the exception
1872  if (streamLumiStatus_[iStreamIndex]) {
1873  streamEndLumiAsync(iTask, iStreamIndex);
1874  }
1875  bool expected = false;
1876  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1877  auto e = std::current_exception();
1879  iTask.doneWaiting(e);
1880  }
1881  }
1882  });
1883  }

References beginLumiAsync(), CMS_SA_ALLOW, deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::WaitingTaskHolder::group(), edm::InputSource::IsLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), and streamLumiStatus_.

Referenced by beginLumiAsync(), and continueLumiAsync().

◆ init()

void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 342 of file EventProcessor.cc.

344  {
345  //std::cerr << processDesc->dump() << std::endl;
346 
347  // register the empty parentage vector , once and for all
349 
350  // register the empty parameter set, once and for all.
351  ParameterSet().registerIt();
352 
353  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
354 
355  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
356  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
357  bool const hasSubProcesses = !subProcessVParameterSet.empty();
358 
359  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
360  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
361  // set in here if the parameters were not explicitly set.
363 
364  // Now set some parameters specific to the main process.
365  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
366  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
367  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
368  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
369  << fileMode << ".\n"
370  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
371  } else {
372  fileModeNoMerge_ = (fileMode == "NOMERGE");
373  }
374  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
375 
376  //threading
377  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
378 
379  // Even if numberOfThreads was set to zero in the Python configuration, the code
380  // in cmsRun.cpp should have reset it to something else.
381  assert(nThreads != 0);
382 
383  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
384  if (nStreams == 0) {
385  nStreams = nThreads;
386  }
387  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
388  if (nConcurrentRuns != 1) {
389  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
390  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
391  }
392  unsigned int nConcurrentLumis =
393  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
394  if (nConcurrentLumis == 0) {
395  nConcurrentLumis = 2;
396  }
397  if (nConcurrentLumis > nStreams) {
398  nConcurrentLumis = nStreams;
399  }
400  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
401  if (!loopers.empty()) {
402  //For now loopers make us run only 1 transition at a time
403  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
404  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
405  "of concurrent runs, and the number of concurrent lumis "
406  "are all being reset to 1. Loopers cannot currently support "
407  "values greater than 1.";
408  nStreams = 1;
409  nConcurrentLumis = 1;
410  nConcurrentRuns = 1;
411  }
412  }
413  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
414  if (dumpOptions) {
415  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
416  } else {
417  if (nThreads > 1 or nStreams > 1) {
418  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
419  }
420  }
421  // The number of concurrent IOVs is configured individually for each record in
422  // the class NumberOfConcurrentIOVs to values less than or equal to this.
423  unsigned int maxConcurrentIOVs = nConcurrentLumis;
424 
425  //Check that relationships between threading parameters makes sense
426  /*
427  if(nThreads<nStreams) {
428  //bad
429  }
430  if(nConcurrentRuns>nStreams) {
431  //bad
432  }
433  if(nConcurrentRuns>nConcurrentLumis) {
434  //bad
435  }
436  */
437  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
438 
439  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
441  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
442 
443  // Now do general initialization
444  ScheduleItems items;
445 
446  //initialize the services
447  auto& serviceSets = processDesc->getServicesPSets();
448  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
449  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
450 
451  //make the services available
453 
454  if (nStreams > 1) {
456  handler->willBeUsingThreads();
457  }
458 
459  // intialize miscellaneous items
460  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
461 
462  // intialize the event setup provider
463  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
464  esp_ = espController_->makeProvider(
465  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
466 
467  // initialize the looper, if any
468  if (!loopers.empty()) {
470  looper_->setActionTable(items.act_table_.get());
471  looper_->attachTo(*items.actReg_);
472 
473  // in presence of looper do not delete modules
475  }
476 
477  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
478 
479  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
480  streamQueues_.resize(nStreams);
481  streamLumiStatus_.resize(nStreams);
482 
483  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
484 
485  // initialize the input source
487  *common,
488  items.preg(),
489  items.branchIDListHelper(),
491  items.thinnedAssociationsHelper(),
492  items.actReg_,
493  items.processConfiguration(),
495 
496  // initialize the Schedule
497  schedule_ =
498  items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_, *processBlockHelper_);
499 
500  // set the data members
501  act_table_ = std::move(items.act_table_);
502  actReg_ = items.actReg_;
503  preg_ = items.preg();
505  branchIDListHelper_ = items.branchIDListHelper();
506  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
507  processConfiguration_ = items.processConfiguration();
509  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
510 
511  FDEBUG(2) << parameterSet << std::endl;
512 
514  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
515  // Reusable event principal
516  auto ep = std::make_shared<EventPrincipal>(preg(),
520  historyAppender_.get(),
521  index,
522  true /*primary process*/,
525  }
526 
527  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
528  auto lp =
529  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
531  }
532 
533  {
534  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
536 
537  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
539  }
540 
541  // fill the subprocesses, if there are any
542  subProcesses_.reserve(subProcessVParameterSet.size());
543  for (auto& subProcessPSet : subProcessVParameterSet) {
544  subProcesses_.emplace_back(subProcessPSet,
545  *parameterSet,
546  preg(),
550  SubProcessParentageHelper(),
552  *actReg_,
553  token,
556  &processContext_);
557  }
558  }

References act_table_, actReg_, cms::cuda::assert(), branchIDListHelper(), branchIDListHelper_, trackingPlots::common, edm::errors::Configuration, deleteNonConsumedUnscheduledModules_, edm::dumpOptionsToLogFile(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, processOptions_cff::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::get_underlying_safe(), edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameterSet(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, eostools::move(), runTheMatrix::nStreams, runTheMatrix::nThreads, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

◆ inputProcessBlocks()

void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 997 of file EventProcessor.cc.

997  {
998  input_->fillProcessBlockHelper();
999  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1000  while (input_->nextProcessBlock(processBlockPrincipal)) {
1001  readProcessBlock(processBlockPrincipal);
1002 
1003  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1004  FinalWaitingTask globalWaitTask;
1005 
1006  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1007  beginGlobalTransitionAsync<Traits>(
1008  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1009 
1010  do {
1011  taskGroup_.wait();
1012  } while (not globalWaitTask.done());
1013  if (globalWaitTask.exceptionPtr() != nullptr) {
1014  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1015  }
1016 
1017  FinalWaitingTask writeWaitTask;
1019  do {
1020  taskGroup_.wait();
1021  } while (not writeWaitTask.done());
1022  if (writeWaitTask.exceptionPtr()) {
1023  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1024  }
1025 
1026  processBlockPrincipal.clearPrincipal();
1027  for (auto& s : subProcesses_) {
1028  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1029  }
1030  }
1031  }

References edm::Principal::clearPrincipal(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), principalCache_, readProcessBlock(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

◆ lastTransitionType()

InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline

Definition at line 194 of file EventProcessor.h.

194  {
196  return InputSource::IsStop;
197  }
198  return lastSourceTransition_;
199  }

References deferredExceptionPtrIsSet_, edm::InputSource::IsStop, and lastSourceTransition_.

Referenced by handleNextEventForStreamAsync(), and processLumis().

◆ looper() [1/2]

std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 306 of file EventProcessor.h.

306 { return get_underlying_safe(looper_); }

References edm::get_underlying_safe(), and looper_.

◆ looper() [2/2]

std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 305 of file EventProcessor.h.

305 { return get_underlying_safe(looper_); }

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

◆ nextLuminosityBlockID()

edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 787 of file EventProcessor.cc.

787 { return input_->luminosityBlock(); }

References input_.

Referenced by readNextEventForStream().

◆ nextRunID()

std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 783 of file EventProcessor.cc.

783  {
784  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
785  }

References input_.

◆ nextTransitionType()

InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 757 of file EventProcessor.cc.

757  {
758  if (deferredExceptionPtrIsSet_.load()) {
760  return InputSource::IsStop;
761  }
762 
763  SendSourceTerminationSignalIfException sentry(actReg_.get());
764  InputSource::ItemType itemType;
765  //For now, do nothing with InputSource::IsSynchronize
766  do {
767  itemType = input_->nextItemType();
768  } while (itemType == InputSource::IsSynchronize);
769 
770  lastSourceTransition_ = itemType;
771  sentry.completedSuccessfully();
772 
774 
776  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
778  }
779 
780  return lastSourceTransition_;
781  }

References actReg_, checkForAsyncStopRequest(), deferredExceptionPtrIsSet_, epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by readNextEventForStream().

◆ openOutputFiles()

void edm::EventProcessor::openOutputFiles ( )

Definition at line 885 of file EventProcessor.cc.

885  {
886  if (fileBlockValid()) {
887  schedule_->openOutputFiles(*fb_);
888  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
889  }
890  FDEBUG(1) << "\topenOutputFiles\n";
891  }

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

◆ operator=()

EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete

◆ preg() [1/2]

std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 294 of file EventProcessor.h.

294 { return get_underlying_safe(preg_); }

References edm::get_underlying_safe(), and preg_.

◆ preg() [2/2]

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 293 of file EventProcessor.h.

293 { return get_underlying_safe(preg_); }

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), and readRun().

◆ prepareForNextLoop()

void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 949 of file EventProcessor.cc.

949  {
950  looper_->prepareForNextLoop(esp_.get());
951  FDEBUG(1) << "\tprepareForNextLoop\n";
952  }

References esp_, FDEBUG, and looper_.

Referenced by runToCompletion().

◆ processConfiguration()

ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 140 of file EventProcessor.h.

140 { return *processConfiguration_; }

References processConfiguration_.

◆ processEventAsync()

void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1899 of file EventProcessor.cc.

1899  {
1900  iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1901  }

References edm::WaitingTaskHolder::group(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

◆ processEventAsyncImpl()

void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1903 of file EventProcessor.cc.

1903  {
1904  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1905 
1907  Service<RandomNumberGenerator> rng;
1908  if (rng.isAvailable()) {
1909  Event ev(*pep, ModuleDescription(), nullptr);
1910  rng->postEventRead(ev);
1911  }
1912 
1913  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1914  using namespace edm::waiting_task::chain;
1915  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
1916  EventTransitionInfo info(*pep, es);
1917  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
1918  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
1919  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1920  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1921  }
1922  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
1923  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
1924  ServiceRegistry::Operate operateLooper(serviceToken_);
1925  processEventWithLooper(*pep, iStreamIndex);
1926  }) | then([pep](auto nextTask) {
1927  FDEBUG(1) << "\tprocessEvent\n";
1928  pep->clearEventPrincipal();
1929  }) | runLast(iHolder);
1930  }

References esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, first, edm::waiting_task::chain::ifThen(), info(), edm::Service< T >::isAvailable(), looper_, eostools::move(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, edm::waiting_task::chain::runLast(), schedule_, serviceToken_, streamLumiStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by processEventAsync().

◆ processEventWithLooper()

void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 1932 of file EventProcessor.cc.

1932  {
1933  bool randomAccess = input_->randomAccess();
1934  ProcessingController::ForwardState forwardState = input_->forwardState();
1935  ProcessingController::ReverseState reverseState = input_->reverseState();
1936  ProcessingController pc(forwardState, reverseState, randomAccess);
1937 
1939  do {
1940  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1941  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1942  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1943 
1944  bool succeeded = true;
1945  if (randomAccess) {
1946  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1947  input_->skipEvents(-2);
1948  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1949  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1950  }
1951  }
1952  pc.setLastOperationSucceeded(succeeded);
1953  } while (!pc.lastOperationSucceeded());
1955  shouldWeStop_ = true;
1957  }
1958  }

References esp_, input_, edm::InputSource::IsStop, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), lastSourceTransition_, looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

◆ processLumis()

InputSource::ItemType edm::EventProcessor::processLumis ( std::shared_ptr< void > const &  iRunResource)

Definition at line 1258 of file EventProcessor.cc.

1258  {
1259  FinalWaitingTask waitTask;
1260  if (streamLumiActive_ > 0) {
1262  // Continue after opening a new input file
1264  } else {
1265  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1266  input_->luminosityBlockAuxiliary()->beginTime()),
1267  iRunResource,
1268  WaitingTaskHolder{taskGroup_, &waitTask});
1269  }
1270  do {
1271  taskGroup_.wait();
1272  } while (not waitTask.done());
1273 
1274  if (waitTask.exceptionPtr() != nullptr) {
1275  std::rethrow_exception(*(waitTask.exceptionPtr()));
1276  }
1277  return lastTransitionType();
1278  }

References cms::cuda::assert(), beginLumiAsync(), continueLumiAsync(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), input_, lastTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamLumiActive_, and taskGroup_.

◆ readAndMergeLumi()

int edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1676 of file EventProcessor.cc.

1676  {
1677  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1678  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1679  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1680  input_->processHistoryRegistry().reducedProcessHistoryID(
1681  input_->luminosityBlockAuxiliary()->processHistoryID()));
1682  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1683  assert(lumiOK);
1684  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1685  {
1686  SendSourceTerminationSignalIfException sentry(actReg_.get());
1687  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1688  sentry.completedSuccessfully();
1689  }
1690  return input_->luminosityBlock();
1691  }

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by readNextEventForStream().

◆ readAndMergeRun()

std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1645 of file EventProcessor.cc.

1645  {
1646  principalCache_.merge(input_->runAuxiliary(), preg());
1647  auto runPrincipal = principalCache_.runPrincipalPtr();
1648  {
1649  SendSourceTerminationSignalIfException sentry(actReg_.get());
1650  input_->readAndMergeRun(*runPrincipal);
1651  sentry.completedSuccessfully();
1652  }
1653  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1654  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1655  }

References actReg_, cms::cuda::assert(), input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

◆ readEvent()

void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1885 of file EventProcessor.cc.

1885  {
1886  //TODO this will have to become per stream
1887  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1888  StreamContext streamContext(event.streamID(), &processContext_);
1889 
1890  SendSourceTerminationSignalIfException sentry(actReg_.get());
1891  input_->readEvent(event, streamContext);
1892 
1893  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1894  sentry.completedSuccessfully();
1895 
1896  FDEBUG(1) << "\treadEvent\n";
1897  }

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, and streamLumiStatus_.

Referenced by readNextEventForStream().

◆ readFile()

void edm::EventProcessor::readFile ( )

◆ readLuminosityBlock()

void edm::EventProcessor::readLuminosityBlock ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1657 of file EventProcessor.cc.

1657  {
1659  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1660  << "Illegal attempt to insert lumi into cache\n"
1661  << "Run is invalid\n"
1662  << "Contact a Framework Developer\n";
1663  }
1665  assert(lbp);
1666  lbp->setAux(*input_->luminosityBlockAuxiliary());
1667  {
1668  SendSourceTerminationSignalIfException sentry(actReg_.get());
1669  input_->readLuminosityBlock(*lbp, *historyAppender_);
1670  sentry.completedSuccessfully();
1671  }
1672  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1673  iStatus.lumiPrincipal() = std::move(lbp);
1674  }

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::getAvailableLumiPrincipalPtr(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), eostools::move(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

Referenced by beginLumiAsync().

◆ readNextEventForStream()

bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iLumiStatus 
)
private

Definition at line 1766 of file EventProcessor.cc.

1766  {
1767  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1768  iStatus.endLumi();
1769  return false;
1770  }
1771 
1772  if (iStatus.wasEventProcessingStopped()) {
1773  return false;
1774  }
1775 
1776  if (shouldWeStop()) {
1778  iStatus.stopProcessingEvents();
1779  iStatus.endLumi();
1780  return false;
1781  }
1782 
1784  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1785  CMS_SA_ALLOW try {
1786  //need to use lock in addition to the serial task queue because
1787  // of delayed provenance reading and reading data in response to
1788  // edm::Refs etc
1789  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1790 
1791  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1792  if (InputSource::IsLumi == itemType) {
1793  iStatus.haveContinuedLumi();
1794  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1795  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1796  readAndMergeLumi(iStatus);
1797  itemType = nextTransitionType();
1798  }
1799  if (InputSource::IsLumi == itemType) {
1800  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1801  input_->luminosityBlockAuxiliary()->beginTime()));
1802  }
1803  }
1804  if (InputSource::IsEvent != itemType) {
1805  iStatus.stopProcessingEvents();
1806 
1807  //IsFile may continue processing the lumi and
1808  // looper_ can cause the input source to declare a new IsRun which is actually
1809  // just a continuation of the previous run
1810  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1811  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1812  iStatus.endLumi();
1813  }
1814  return false;
1815  }
1816  readEvent(iStreamIndex);
1817  } catch (...) {
1818  bool expected = false;
1819  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1820  deferredExceptionPtr_ = std::current_exception();
1821  iStatus.endLumi();
1822  }
1823  return false;
1824  }
1825  return true;
1826  }

References CMS_SA_ALLOW, edm::LuminosityBlockProcessingStatus::continuingLumi(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::LuminosityBlockProcessingStatus::endLumi(), edm::LuminosityBlockProcessingStatus::haveContinuedLumi(), input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, lastSourceTransition_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextLuminosityBlockID(), nextTransitionType(), or, readAndMergeLumi(), readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setNextSyncValue(), shouldWeStop(), sourceMutex_, edm::LuminosityBlockProcessingStatus::stopProcessingEvents(), and edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped().

Referenced by handleNextEventForStreamAsync().

◆ readProcessBlock()

void edm::EventProcessor::readProcessBlock ( ProcessBlockPrincipal processBlockPrincipal)

Definition at line 1616 of file EventProcessor.cc.

1616  {
1617  SendSourceTerminationSignalIfException sentry(actReg_.get());
1618  input_->readProcessBlock(processBlockPrincipal);
1619  sentry.completedSuccessfully();
1620  }

References actReg_, and input_.

Referenced by inputProcessBlocks().

◆ readRun()

std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1622 of file EventProcessor.cc.

1622  {
1624  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1625  << "Illegal attempt to insert run into cache\n"
1626  << "Contact a Framework Developer\n";
1627  }
1628  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1629  preg(),
1631  historyAppender_.get(),
1632  0,
1633  true,
1635  {
1636  SendSourceTerminationSignalIfException sentry(actReg_.get());
1637  input_->readRun(*rp, *historyAppender_);
1638  sentry.completedSuccessfully();
1639  }
1640  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1641  principalCache_.insert(rp);
1642  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1643  }

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, mergeableRunProductProcesses_, preg(), principalCache_, and processConfiguration_.

◆ respondToCloseInputFile()

void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 910 of file EventProcessor.cc.

910  {
911  if (fileBlockValid()) {
912  schedule_->respondToCloseInputFile(*fb_);
913  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
914  }
915  FDEBUG(1) << "\trespondToCloseInputFile\n";
916  }

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

◆ respondToOpenInputFile()

void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 900 of file EventProcessor.cc.

900  {
901  if (fileBlockValid()) {
903  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
904  schedule_->respondToOpenInputFile(*fb_);
905  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
906  }
907  FDEBUG(1) << "\trespondToOpenInputFile\n";
908  }

References branchIDListHelper_, fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

◆ rewindInput()

void edm::EventProcessor::rewindInput ( )

Definition at line 943 of file EventProcessor.cc.

943  {
944  input_->repeat();
945  input_->rewind();
946  FDEBUG(1) << "\trewind\n";
947  }

References FDEBUG, and input_.

Referenced by runToCompletion().

◆ run()

EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

◆ runToCompletion()

EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 789 of file EventProcessor.cc.

789  {
792  {
793  beginJob(); //make sure this was called
794 
795  // make the services available
797 
799  try {
800  FilesProcessor fp(fileModeNoMerge_);
801 
802  convertException::wrap([&]() {
803  bool firstTime = true;
804  do {
805  if (not firstTime) {
807  rewindInput();
808  } else {
809  firstTime = false;
810  }
811  startingNewLoop();
812 
813  auto trans = fp.processFiles(*this);
814 
815  fp.normalEnd();
816 
817  if (deferredExceptionPtrIsSet_.load()) {
818  std::rethrow_exception(deferredExceptionPtr_);
819  }
820  if (trans != InputSource::IsStop) {
821  //problem with the source
822  doErrorStuff();
823 
824  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
825  }
826  } while (not endOfLoop());
827  }); // convertException::wrap
828 
829  } // Try block
830  catch (cms::Exception& e) {
832  std::string message(
833  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
834  e.addAdditionalInfo(message);
835  if (e.alreadyPrinted()) {
836  LogAbsolute("Additional Exceptions") << message;
837  }
838  }
839  if (!exceptionMessageRuns_.empty()) {
840  e.addAdditionalInfo(exceptionMessageRuns_);
841  if (e.alreadyPrinted()) {
842  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
843  }
844  }
845  if (!exceptionMessageFiles_.empty()) {
846  e.addAdditionalInfo(exceptionMessageFiles_);
847  if (e.alreadyPrinted()) {
848  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
849  }
850  }
851  throw;
852  }
853  }
854 
855  return returnCode;
856  }

References asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

◆ setDeferredException()

bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 1981 of file EventProcessor.cc.

1981  {
1982  bool expected = false;
1983  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1984  deferredExceptionPtr_ = iException;
1985  return true;
1986  }
1987  return false;
1988  }

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

Referenced by handleEndLumiExceptions().

◆ setExceptionMessageFiles()

void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 1975 of file EventProcessor.cc.

1975 { exceptionMessageFiles_ = message; }

References exceptionMessageFiles_.

◆ setExceptionMessageLumis()

void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 1979 of file EventProcessor.cc.

1979 { exceptionMessageLumis_ = true; }

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

◆ setExceptionMessageRuns()

void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 1977 of file EventProcessor.cc.

1977 { exceptionMessageRuns_ = message; }

References exceptionMessageRuns_.

◆ shouldWeCloseOutput()

bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 954 of file EventProcessor.cc.

954  {
955  FDEBUG(1) << "\tshouldWeCloseOutput\n";
956  if (!subProcesses_.empty()) {
957  for (auto const& subProcess : subProcesses_) {
958  if (subProcess.shouldWeCloseOutput()) {
959  return true;
960  }
961  }
962  return false;
963  }
964  return schedule_->shouldWeCloseOutput();
965  }

References FDEBUG, schedule_, and subProcesses_.

◆ shouldWeStop()

bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 1960 of file EventProcessor.cc.

1960  {
1961  FDEBUG(1) << "\tshouldWeStop\n";
1962  if (shouldWeStop_)
1963  return true;
1964  if (!subProcesses_.empty()) {
1965  for (auto const& subProcess : subProcesses_) {
1966  if (subProcess.terminate()) {
1967  return true;
1968  }
1969  }
1970  return false;
1971  }
1972  return schedule_->terminate();
1973  }

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

◆ startingNewLoop()

void edm::EventProcessor::startingNewLoop ( )

Definition at line 918 of file EventProcessor.cc.

918  {
919  shouldWeStop_ = false;
920  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
921  // until after we've called beginOfJob
922  if (looper_ && looperBeginJobRun_) {
923  looper_->doStartingNewLoop();
924  }
925  FDEBUG(1) << "\tstartingNewLoop\n";
926  }

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

◆ streamEndLumiAsync()

void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1551 of file EventProcessor.cc.

1551  {
1552  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1553  if (iPtr) {
1554  handleEndLumiExceptions(iPtr, iTask);
1555  }
1556  auto status = streamLumiStatus_[iStreamIndex];
1557  //reset status before releasing queue else get race condtion
1558  streamLumiStatus_[iStreamIndex].reset();
1560  streamQueues_[iStreamIndex].resume();
1561 
1562  //are we the last one?
1563  if (status->streamFinishedLumi()) {
1565  }
1566  });
1567 
1568  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1569 
1570  //Need to be sure the lumi status is released before lumiDoneTask can every be called.
1571  // therefore we do not want to hold the shared_ptr
1572  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1573  lumiStatus->setEndTime();
1574 
1575  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1576 
1577  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1578  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1579 
1580  if (lumiStatus->didGlobalBeginSucceed()) {
1581  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1582  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1583  lumiPrincipal.endTime());
1584  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1585  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1586  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1587  *schedule_,
1588  iStreamIndex,
1589  transitionInfo,
1590  serviceToken_,
1591  subProcesses_,
1592  cleaningUpAfterException);
1593  }
1594  }

References esp_, globalEndLumiAsync(), edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and submitPVValidationJobs::t.

Referenced by beginLumiAsync(), endUnfinishedLumi(), and handleNextEventForStreamAsync().

◆ taskCleanup()

void edm::EventProcessor::taskCleanup ( )

Definition at line 578 of file EventProcessor.cc.

578  {
581  taskGroup_.wait();
582  assert(task.done());
583  }

References cms::cuda::assert(), espController_, TrackValidation_cff::task, and taskGroup_.

◆ thinnedAssociationsHelper() [1/2]

std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 302 of file EventProcessor.h.

302  {
304  }

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

◆ thinnedAssociationsHelper() [2/2]

std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 299 of file EventProcessor.h.

299  {
301  }

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

◆ totalEvents()

int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 730 of file EventProcessor.cc.

730 { return schedule_->totalEvents(); }

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

◆ totalEventsFailed()

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 734 of file EventProcessor.cc.

734 { return schedule_->totalEventsFailed(); }

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

◆ totalEventsPassed()

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 732 of file EventProcessor.cc.

732 { return schedule_->totalEventsPassed(); }

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

◆ warnAboutLegacyModules()

void edm::EventProcessor::warnAboutLegacyModules ( ) const
private

Definition at line 2003 of file EventProcessor.cc.

2003  {
2004  std::unique_ptr<LogSystem> s;
2005  for (auto worker : schedule_->allWorkers()) {
2006  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2007  if (not s) {
2008  s = std::make_unique<LogSystem>("LegacyModules");
2009  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2010  "is going to end soon. These modules need to be converted to have type\n"
2011  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2012  }
2013  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2014  }
2015  }
2016  }

References edm::Worker::kLegacy, alignCSCRings::s, and schedule_.

Referenced by beginJob().

◆ warnAboutModulesRequiringLuminosityBLockSynchronization()

void edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization ( ) const
private

Definition at line 1990 of file EventProcessor.cc.

1990  {
1991  std::unique_ptr<LogSystem> s;
1992  for (auto worker : schedule_->allWorkers()) {
1993  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1994  if (not s) {
1995  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1996  (*s) << "The following modules require synchronizing on LuminosityBlock boundaries:";
1997  }
1998  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
1999  }
2000  }
2001  }

References alignCSCRings::s, and schedule_.

Referenced by beginJob().

◆ writeLumiAsync()

void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 1740 of file EventProcessor.cc.

1740  {
1741  auto subsT = edm::make_waiting_task([this, task, &lumiPrincipal](std::exception_ptr const* iExcept) mutable {
1742  if (iExcept) {
1743  task.doneWaiting(*iExcept);
1744  } else {
1746  for (auto& s : subProcesses_) {
1747  s.writeLumiAsync(task, lumiPrincipal);
1748  }
1749  }
1750  });
1752 
1753  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1754 
1755  schedule_->writeLumiAsync(WaitingTaskHolder{*task.group(), subsT}, lumiPrincipal, &processContext_, actReg_.get());
1756  }

References actReg_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::make_waiting_task(), edm::RunPrincipal::mergeableRunProductMetadata(), processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, TrackValidation_cff::task, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

◆ writeProcessBlockAsync()

void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 1693 of file EventProcessor.cc.

1693  {
1694  auto subsT = edm::make_waiting_task([this, task, processBlockType](std::exception_ptr const* iExcept) mutable {
1695  if (iExcept) {
1696  task.doneWaiting(*iExcept);
1697  } else {
1699  for (auto& s : subProcesses_) {
1700  s.writeProcessBlockAsync(task, processBlockType);
1701  }
1702  }
1703  });
1705  schedule_->writeProcessBlockAsync(WaitingTaskHolder(*task.group(), subsT),
1706  principalCache_.processBlockPrincipal(processBlockType),
1707  &processContext_,
1708  actReg_.get());
1709  }

References actReg_, edm::make_waiting_task(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endProcessBlock(), and inputProcessBlocks().

◆ writeRunAsync()

void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
ProcessHistoryID const &  phid,
RunNumber_t  run,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 1711 of file EventProcessor.cc.

1714  {
1715  auto subsT = edm::make_waiting_task(
1716  [this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
1717  if (iExcept) {
1718  task.doneWaiting(*iExcept);
1719  } else {
1721  for (auto& s : subProcesses_) {
1722  s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1723  }
1724  }
1725  });
1727  schedule_->writeRunAsync(WaitingTaskHolder(*task.group(), subsT),
1729  &processContext_,
1730  actReg_.get(),
1731  mergeableRunProductMetadata);
1732  }

References actReg_, edm::make_waiting_task(), principalCache_, processContext_, run(), edm::PrincipalCache::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endUnfinishedRun().

Member Data Documentation

◆ act_table_

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 331 of file EventProcessor.h.

Referenced by init().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

◆ asyncStopRequestedWhileProcessingEvents_

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 367 of file EventProcessor.h.

Referenced by runToCompletion().

◆ asyncStopStatusCodeFromProcessingEvents_

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 368 of file EventProcessor.h.

Referenced by runToCompletion().

◆ beginJobCalled_

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 355 of file EventProcessor.h.

Referenced by beginJob().

◆ branchIDListHelper_

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 322 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

◆ deferredExceptionPtr_

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

◆ deferredExceptionPtrIsSet_

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

◆ deleteNonConsumedUnscheduledModules_

bool edm::EventProcessor::deleteNonConsumedUnscheduledModules_ = true
private

Definition at line 376 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ esp_

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

◆ espController_

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

◆ eventSetupDataToExcludeFromPrefetching_

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 373 of file EventProcessor.h.

◆ exceptionMessageFiles_

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 358 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

◆ exceptionMessageLumis_

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 360 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

◆ exceptionMessageRuns_

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 359 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

◆ fb_

edm::propagate_const<std::shared_ptr<FileBlock> > edm::EventProcessor::fb_
private

◆ fileModeNoMerge_

bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 357 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

◆ firstEventInBlock_

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 369 of file EventProcessor.h.

◆ forceESCacheClearOnNewRun_

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 363 of file EventProcessor.h.

Referenced by beginRun(), and init().

◆ forceLooperToEnd_

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 361 of file EventProcessor.h.

Referenced by endOfLoop().

◆ historyAppender_

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 343 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

◆ input_

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

◆ lastSourceTransition_

InputSource::ItemType edm::EventProcessor::lastSourceTransition_
private

◆ looper_

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private

◆ looperBeginJobRun_

bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 362 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

◆ lumiQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 338 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

◆ mergeableRunProductProcesses_

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 335 of file EventProcessor.h.

Referenced by init(), and readRun().

◆ pathsAndConsumesOfModules_

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 334 of file EventProcessor.h.

Referenced by beginJob().

◆ preallocations_

PreallocationConfiguration edm::EventProcessor::preallocations_
private

◆ preg_

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 321 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), and readFile().

◆ principalCache_

PrincipalCache edm::EventProcessor::principalCache_
private

◆ printDependencies_

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 375 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ processBlockHelper_

edm::propagate_const<std::shared_ptr<ProcessBlockHelper> > edm::EventProcessor::processBlockHelper_
private

Definition at line 323 of file EventProcessor.h.

Referenced by beginJob(), closeOutputFiles(), and init().

◆ processConfiguration_

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 332 of file EventProcessor.h.

Referenced by beginJob(), beginProcessBlock(), init(), processConfiguration(), and readRun().

◆ processContext_

ProcessContext edm::EventProcessor::processContext_
private

◆ queueWhichWaitsForIOVsToFinish_

edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginLumiAsync(), and globalEndLumiAsync().

◆ schedule_

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private

◆ serviceToken_

ServiceToken edm::EventProcessor::serviceToken_
private

◆ shouldWeStop_

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 356 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

◆ sourceMutex_

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 353 of file EventProcessor.h.

Referenced by readNextEventForStream().

◆ sourceResourcesAcquirer_

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 352 of file EventProcessor.h.

Referenced by beginLumiAsync(), and handleNextEventForStreamAsync().

◆ streamLumiActive_

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private

◆ streamLumiStatus_

std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private

◆ streamQueues_

std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

Definition at line 337 of file EventProcessor.h.

Referenced by beginLumiAsync(), init(), and streamEndLumiAsync().

◆ subProcesses_

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private

◆ taskGroup_

tbb::task_group edm::EventProcessor::taskGroup_
private

◆ thinnedAssociationsHelper_

edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 324 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().

edm::EventProcessor::looperBeginJobRun_
bool looperBeginJobRun_
Definition: EventProcessor.h:362
edm::SharedResourcesAcquirer::serialQueueChain
SerialTaskQueueChain & serialQueueChain() const
Definition: SharedResourcesAcquirer.h:54
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::deleteRunFromCache
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
Definition: EventProcessor.cc:1734
edm::EDLooperBase::Status
Status
Definition: EDLooperBase.h:82
edm::EventProcessor::historyAppender_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
Definition: EventProcessor.h:343
bk::beginJob
void beginJob()
Definition: Breakpoints.cc:14
edm::EventProcessor::readProcessBlock
void readProcessBlock(ProcessBlockPrincipal &)
Definition: EventProcessor.cc:1616
edm::EventProcessor::thinnedAssociationsHelper
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: EventProcessor.h:299
processOptions_cff.fileMode
fileMode
Definition: processOptions_cff.py:5
edm::EventProcessor::rewindInput
void rewindInput()
Definition: EventProcessor.cc:943
edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Definition: EventProcessor.h:373
edm::PrincipalCache::ProcessBlockType::New
mps_fire.i
i
Definition: mps_fire.py:428
edm::PreallocationConfiguration::numberOfRuns
unsigned int numberOfRuns() const
Definition: PreallocationConfiguration.h:37
edm::EventProcessor::preg_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: EventProcessor.h:321
edm::popSubProcessVParameterSet
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:835
edm::EventProcessor::writeRunAsync
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
Definition: EventProcessor.cc:1711
edm::EventProcessor::StatusCode
StatusCode
Definition: EventProcessor.h:76
edm::EventProcessor::processBlockHelper_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
Definition: EventProcessor.h:323
edm::EventProcessor::endOfLoop
bool endOfLoop()
Definition: EventProcessor.cc:928
edm::EventProcessor::fb_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
Definition: EventProcessor.h:345
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::EventProcessor::input_
edm::propagate_const< std::unique_ptr< InputSource > > input_
Definition: EventProcessor.h:326
edm::EventProcessor::getToken
ServiceToken getToken()
Definition: EventProcessor.cc:724
edm::EventProcessor::startingNewLoop
void startingNewLoop()
Definition: EventProcessor.cc:918
edm::ParameterSet::getUntrackedParameterSet
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: ParameterSet.cc:2136
edm::errors::LogicError
Definition: EDMException.h:37
edm::validateTopLevelParameterSets
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
Definition: validateTopLevelParameterSets.cc:115
mps_update.status
status
Definition: mps_update.py:68
edm::PreallocationConfiguration::numberOfThreads
unsigned int numberOfThreads() const
Definition: PreallocationConfiguration.h:34
edm::EventProcessor::deleteLumiFromCache
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1758
edm::Transition::Event
edm::EventProcessor::warnAboutLegacyModules
void warnAboutLegacyModules() const
Definition: EventProcessor.cc:2003
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:58
edmLumisInFiles.description
description
Definition: edmLumisInFiles.py:11
edm::PrincipalCache::setProcessHistoryRegistry
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
Definition: PrincipalCache.h:87
edm::eventsetup::synchronousEventSetupForInstance
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
Definition: EventSetupsController.cc:412
edm::TerminationOrigin::ExternalSignal
edm::EventProcessor::globalEndLumiAsync
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
Definition: EventProcessor.cc:1478
edm::ParentageRegistry::instance
static ParentageRegistry * instance()
Definition: ParentageRegistry.cc:5
edm::EventProcessor::runToCompletion
StatusCode runToCompletion()
Definition: EventProcessor.cc:789
edm::EventProcessor::esp_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: EventProcessor.h:329
edm::EventProcessor::shouldWeStop_
bool shouldWeStop_
Definition: EventProcessor.h:356
edm::EventProcessor::readEvent
void readEvent(unsigned int iStreamIndex)
Definition: EventProcessor.cc:1885
edm::EventProcessor::epTimedOut
Definition: EventProcessor.h:82
cms::cuda::assert
assert(be >=bs)
edm::ParentageRegistry::clear
void clear()
Not thread safe.
Definition: ParentageRegistry.cc:29
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:222
edm::EventProcessor::deferredExceptionPtr_
std::exception_ptr deferredExceptionPtr_
Definition: EventProcessor.h:350
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
personalPlayback.fp
fp
Definition: personalPlayback.py:523
edm::dumpOptionsToLogFile
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)
Definition: validateTopLevelParameterSets.cc:156
edm::EventProcessor::lumiQueue_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
Definition: EventProcessor.h:338
edm::get_underlying_safe
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Definition: get_underlying_safe.h:41
edm::EventProcessor::processContext_
ProcessContext processContext_
Definition: EventProcessor.h:333
edm::EventProcessor::lastSourceTransition_
InputSource::ItemType lastSourceTransition_
Definition: EventProcessor.h:327
edm::SerialTaskQueue::push
void push(tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:167
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:93
edm::EventProcessor::pathsAndConsumesOfModules_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: EventProcessor.h:334
createJobs.tmp
tmp
align.sh
Definition: createJobs.py:716
edm::EventProcessor::exceptionMessageFiles_
std::string exceptionMessageFiles_
Definition: EventProcessor.h:358
edm::Worker::kLegacy
Definition: Worker.h:95
edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
bool asyncStopRequestedWhileProcessingEvents_
Definition: EventProcessor.h:367
edm::PrincipalCache::runPrincipalPtr
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:28
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
edm::InRun
Definition: BranchType.h:11
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::PathsAndConsumesOfModules::initialize
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
Definition: PathsAndConsumesOfModules.cc:16
runTheMatrix.nStreams
nStreams
Definition: runTheMatrix.py:372
edm::waiting_task::chain::runLast
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
mps_monitormerge.items
list items
Definition: mps_monitormerge.py:29
edm::WaitingTaskHolder::taskHasFailed
bool taskHasFailed() const noexcept
Definition: WaitingTaskHolder.h:71
groupFilesInBlocks.reverse
reverse
Definition: groupFilesInBlocks.py:131
edm::InputSource::IsRun
Definition: InputSource.h:54
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::ProcessingController::kToPreviousEvent
Definition: ProcessingController.h:58
edm::parameterSet
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::EventProcessor::processEventAsyncImpl
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1903
edm::first
T first(std::pair< T, U > const &p)
Definition: ParameterSet.cc:217
edm::PreallocationConfiguration::numberOfLuminosityBlocks
unsigned int numberOfLuminosityBlocks() const
Definition: PreallocationConfiguration.h:36
edm::InProcess
Definition: BranchType.h:11
edm::PrincipalCache::runPrincipal
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:21
edm::pset::Registry::clear
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::ProcessContext::setProcessConfiguration
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: ProcessContext.cc:19
alignCSCRings.s
s
Definition: alignCSCRings.py:92
edm::EventProcessor::exceptionMessageLumis_
std::atomic< bool > exceptionMessageLumis_
Definition: EventProcessor.h:360
edm::ProcessingController::ForwardState
ForwardState
Definition: ProcessingController.h:31
edm::waiting_task::chain::then
constexpr auto then(O &&iO)
Definition: chain_first.h:277
edm::EventProcessor::readAndMergeLumi
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1676
h
edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
Definition: EventProcessor.h:330
edm::EventProcessor::epSignal
Definition: EventProcessor.h:80
std::swap
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
Definition: DataFrameContainer.h:209
edm::PrincipalCache::merge
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:54
edm::EventProcessor::streamQueues_
std::vector< edm::SerialTaskQueue > streamQueues_
Definition: EventProcessor.h:337
edm::EventProcessor::epInputComplete
Definition: EventProcessor.h:81
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
ScheduleInfo
edm::EventProcessor::readLuminosityBlock
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1657
edm::InputSource::IsSynchronize
Definition: InputSource.h:54
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:253
edm::EventProcessor::sourceMutex_
std::shared_ptr< std::recursive_mutex > sourceMutex_
Definition: EventProcessor.h:353
edm::RunPrincipal::setEndTime
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
edm::EventProcessor::beginJobCalled_
bool beginJobCalled_
Definition: EventProcessor.h:355
edm::ProcessingController::kToSpecifiedEvent
Definition: ProcessingController.h:58
edm::checkForModuleDependencyCorrectness
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
Definition: PathsAndConsumesOfModules.cc:269
LuminosityBlock
WaitingTaskHolder
edm::EventProcessor::fileModeNoMerge_
bool fileModeNoMerge_
Definition: EventProcessor.h:357
IOVSyncValue
edm::EventProcessor::writeProcessBlockAsync
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
Definition: EventProcessor.cc:1693
edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
Definition: EventProcessor.cc:1990
edm::InEvent
Definition: BranchType.h:11
edm::Transition::BeginLuminosityBlock
runTheMatrix.nThreads
nThreads
Definition: runTheMatrix.py:371
edm::IllegalParameters::setThrowAnException
static void setThrowAnException(bool v)
Definition: IllegalParameters.h:16
first
auto first
Definition: CAHitNtupletGeneratorKernelsImpl.h:125
edm::EventProcessor::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: EventProcessor.h:320
edm::EventID::maxEventNumber
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::EventProcessor::exceptionMessageRuns_
std::string exceptionMessageRuns_
Definition: EventProcessor.h:359
edm::PrincipalCache::preReadFile
void preReadFile()
Definition: PrincipalCache.cc:149
edm::SharedResourcesRegistry::instance
static SharedResourcesRegistry * instance()
Definition: SharedResourcesRegistry.cc:25
edm::EventProcessor::printDependencies_
bool printDependencies_
Definition: EventProcessor.h:375
edm::EventProcessor::preallocations_
PreallocationConfiguration preallocations_
Definition: EventProcessor.h:365
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:99
edm::SubProcess::doEndJob
void doEndJob()
Definition: SubProcess.cc:288
edm::waiting_task::chain
Definition: chain_first.h:254
summarizeEdmComparisonLogfiles.succeeded
succeeded
Definition: summarizeEdmComparisonLogfiles.py:100
edm::EventProcessor::act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: EventProcessor.h:331
edm::EventProcessor::deleteNonConsumedUnscheduledModules_
bool deleteNonConsumedUnscheduledModules_
Definition: EventProcessor.h:376
edm::serviceregistry::kConfigurationOverrides
Definition: ServiceLegacy.h:29
edm::ServiceRegistry::Operate
friend class Operate
Definition: ServiceRegistry.h:54
edm::EventProcessor::beginLumiAsync
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1280
edm::EventProcessor::espController_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
Definition: EventProcessor.h:328
edm::EventProcessor::streamLumiStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
Definition: EventProcessor.h:339
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
ParameterSet
Definition: Functions.h:16
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:32
edm::InLumi
Definition: BranchType.h:11
FDEBUG
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::InputSource::IsLumi
Definition: InputSource.h:54
edm::EventProcessor::streamEndLumiAsync
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1551
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
edm::EventProcessor::epException
Definition: EventProcessor.h:78
edm::EventProcessor::serviceToken_
ServiceToken serviceToken_
Definition: EventProcessor.h:325
edm::serviceregistry::kOverlapIsError
Definition: ServiceLegacy.h:29
edm::PrincipalCache::deleteRun
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
Definition: PrincipalCache.cc:110
edm::EventProcessor::lastTransitionType
InputSource::ItemType lastTransitionType() const
Definition: EventProcessor.h:194
edm::Service
Definition: Service.h:30
edm::InputSource::doEndJob
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:209
edm::EventProcessor::thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: EventProcessor.h:324
edm::EventProcessor::readNextEventForStream
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
Definition: EventProcessor.cc:1766
runEdmFileComparison.returnCode
returnCode
Definition: runEdmFileComparison.py:262
edm::SerialTaskQueueChain::push
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:75
edm::LogAbsolute
Log< level::System, true > LogAbsolute
Definition: MessageLogger.h:134
edm::EventProcessor::forceLooperToEnd_
bool forceLooperToEnd_
Definition: EventProcessor.h:361
edm::EventProcessor::epCountComplete
Definition: EventProcessor.h:83
edm::PrincipalCache::insertForInput
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
Definition: PrincipalCache.cc:98
edm::InputSource::IsStop
Definition: InputSource.h:54
edm::PrincipalCache::adjustEventsToNewProductRegistry
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:127
edm::fillLooper
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
Definition: EventProcessor.cc:214
edm::EDLooperBase::kContinue
Definition: EDLooperBase.h:82
edm::EventProcessor::epOther
Definition: EventProcessor.h:79
edm::EDLooperBase::endOfJob
virtual void endOfJob()
Definition: EDLooperBase.cc:107
edm::LogError
Log< level::Error, false > LogError
Definition: MessageLogger.h:123
ServiceToken
edm::EventProcessor::streamLumiActive_
std::atomic< unsigned int > streamLumiActive_
Definition: EventProcessor.h:340
edm::EventProcessor::mergeableRunProductProcesses_
MergeableRunProductProcesses mergeableRunProductProcesses_
Definition: EventProcessor.h:335
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::FinalWaitingTask
Definition: WaitingTask.h:76
cmsLHEtoEOSManager.l
l
Definition: cmsLHEtoEOSManager.py:204
edm::PrincipalCache::inputProcessBlockPrincipal
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
Definition: PrincipalCache.h:55
edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
StatusCode asyncStopStatusCodeFromProcessingEvents_
Definition: EventProcessor.h:368
edm::LimitedTaskQueue::Resumer
Definition: LimitedTaskQueue.h:57
edm::EventProcessor::looper_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
Definition: EventProcessor.h:346
edm::ProcessBlockPrincipal::fillProcessBlockPrincipal
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
Definition: ProcessBlockPrincipal.cc:16
edm::EventProcessor::sourceResourcesAcquirer_
SharedResourcesAcquirer sourceResourcesAcquirer_
Definition: EventProcessor.h:352
edm::EventProcessor::setDeferredException
bool setDeferredException(std::exception_ptr)
Definition: EventProcessor.cc:1981
edm::InputSource::ItemType
ItemType
Definition: InputSource.h:54
edm::EventProcessor::continueLumiAsync
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1452
edm::EventProcessor::doErrorStuff
void doErrorStuff()
Definition: EventProcessor.cc:967
ModuleChanger
edm::PrincipalCache::ProcessBlockType::Input
edm::ParentageRegistry::insertMapped
bool insertMapped(value_type const &v)
Definition: ParentageRegistry.cc:25
edm::Transition::EndLuminosityBlock
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::RunPrincipal::mergeableRunProductMetadata
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:81
edm::nonConsumedUnscheduledModules
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
Definition: PathsAndConsumesOfModules.cc:165
edm::EventProcessor::taskGroup_
tbb::task_group taskGroup_
Definition: EventProcessor.h:318
edm::EventProcessor::principalCache_
PrincipalCache principalCache_
Definition: EventProcessor.h:354
ProcessingController
edm::PathsAndConsumesOfModules::removeModules
void removeModules(std::vector< ModuleDescription const * > const &modules)
Definition: PathsAndConsumesOfModules.cc:53
edm::EventProcessor::branchIDListHelper
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: EventProcessor.h:295
edm::PrincipalCache::getAvailableLumiPrincipalPtr
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
Definition: PrincipalCache.cc:50
edm::EventProcessor::deferredExceptionPtrIsSet_
std::atomic< bool > deferredExceptionPtrIsSet_
Definition: EventProcessor.h:349
edm::PrincipalCache::hasRunPrincipal
bool hasRunPrincipal() const
Definition: PrincipalCache.h:66
edm::Transition::BeginRun
edm::EventProcessor::epSuccess
Definition: EventProcessor.h:77
ev
bool ev
Definition: Hydjet2Hadronizer.cc:97
edm::EventProcessor::processConfiguration_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: EventProcessor.h:332
Exception
Definition: hltDiff.cc:245
edm::EventProcessor::run
StatusCode run()
Definition: EventProcessor.h:381
edm::MergeableRunProductMetadata::preWriteRun
void preWriteRun()
Definition: MergeableRunProductMetadata.cc:125
edm::EventProcessor::handleEndLumiExceptions
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
Definition: EventProcessor.cc:1469
edm::waiting_task::chain::ifThen
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false,...
Definition: chain_first.h:288
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::EventProcessor::processEventWithLooper
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1932
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
edm::PreallocationConfiguration::numberOfStreams
unsigned int numberOfStreams() const
Definition: PreallocationConfiguration.h:35
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::FileBlock::ParallelProcesses
Definition: FileBlock.h:30
edm::PrincipalCache::processBlockPrincipal
ProcessBlockPrincipal & processBlockPrincipal() const
Definition: PrincipalCache.h:54
edm::EventProcessor::fileBlockValid
bool fileBlockValid()
Definition: EventProcessor.h:204
edm::EventProcessor::endRun
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
Definition: EventProcessor.cc:1192
edm::EventProcessor::setExceptionMessageLumis
void setExceptionMessageLumis()
Definition: EventProcessor.cc:1979
edm::EventProcessor::looper
std::shared_ptr< EDLooperBase const > looper() const
Definition: EventProcessor.h:305
cms::Exception
Definition: Exception.h:70
edm::EventProcessor::nextLuminosityBlockID
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
Definition: EventProcessor.cc:787
edm::InputSource::IsEvent
Definition: InputSource.h:54
edm::EventProcessor::subProcesses_
std::vector< SubProcess > subProcesses_
Definition: EventProcessor.h:342
edm::EventProcessor::nextTransitionType
InputSource::ItemType nextTransitionType()
Definition: EventProcessor.cc:757
edm::EventProcessor::prepareForNextLoop
void prepareForNextLoop()
Definition: EventProcessor.cc:949
edm::PrincipalCache::eventPrincipal
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: PrincipalCache.h:70
edm::EventProcessor::init
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
Definition: EventProcessor.cc:342
c
auto & c
Definition: CAHitNtupletGeneratorKernelsImpl.h:56
edm::PrincipalCache::insert
void insert(std::unique_ptr< ProcessBlockPrincipal >)
Definition: PrincipalCache.cc:96
edm::EventProcessor::processEventAsync
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1899
event
Definition: event.py:1
edm::ProcessingController::ReverseState
ReverseState
Definition: ProcessingController.h:38
MatrixUtil.merge
def merge(dictlist, TELL=False)
Definition: MatrixUtil.py:201
edm::Transition::EndRun
edm::EventProcessor::shouldWeStop
bool shouldWeStop() const
Definition: EventProcessor.cc:1960
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::errors::Configuration
Definition: EDMException.h:36
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
edm::EventProcessor::preg
std::shared_ptr< ProductRegistry const > preg() const
Definition: EventProcessor.h:293
edm::EventProcessor::schedule_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: EventProcessor.h:336
edm::makeInput
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Definition: EventProcessor.cc:122
edm::EventProcessor::writeLumiAsync
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
Definition: EventProcessor.cc:1740
edm::EventProcessor::handleNextEventForStreamAsync
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1828
edm::WaitingTaskHolder::group
tbb::task_group * group() const noexcept
Definition: WaitingTaskHolder.h:77
edm::EventProcessor::branchIDListHelper_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: EventProcessor.h:322
trackingPlots.common
common
Definition: trackingPlots.py:205
edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition
void adjustIndexesAfterProductRegistryAddition()
Definition: PrincipalCache.cc:137
edm::LuminosityBlockID::maxLuminosityBlockNumber
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Definition: LuminosityBlockID.h:84
edm::EventProcessor::checkForAsyncStopRequest
bool checkForAsyncStopRequest(StatusCode &)
Definition: EventProcessor.cc:746
common
Definition: common.py:1
edm::EventProcessor::forceESCacheClearOnNewRun_
bool forceESCacheClearOnNewRun_
Definition: EventProcessor.h:363
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
edm::EventProcessor::beginJob
void beginJob()
Definition: EventProcessor.cc:585
edm::PrincipalCache::setNumberOfConcurrentPrincipals
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
Definition: PrincipalCache.cc:17
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316
edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
Definition: MergeableRunProductProcesses.cc:18