CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (edm::WaitingTaskHolder iHolder)
 
void deleteLumiFromCache (LuminosityBlockProcessingStatus &)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
bool endOfLoop ()
 
bool endPathsEnabled () const
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
void endUnfinishedLumi ()
 
void endUnfinishedRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
 
 EventProcessor (EventProcessor const &)=delete
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
void globalEndLumiAsync (edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
void handleEndLumiExceptions (std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
 
InputSource::ItemType lastTransitionType () const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processLumis (std::shared_ptr< void > const &iRunResource)
 
int readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun ()
 
void readFile ()
 
void readLuminosityBlock (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamEndLumiAsync (edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
 
void writeRunAsync (WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedData > ExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
void warnAboutModulesRequiringLuminosityBLockSynchronization () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemType lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueue > streamQueues_
 
std::vector< SubProcess > subProcesses_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 64 of file EventProcessor.h.

Member Typedef Documentation

◆ ExcludedData

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 358 of file EventProcessor.h.

◆ ExcludedDataMap

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 359 of file EventProcessor.h.

Member Enumeration Documentation

◆ StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 74 of file EventProcessor.h.

74  {
75  epSuccess = 0,
76  epException = 1,
77  epOther = 2,
78  epSignal = 3,
79  epInputComplete = 4,
80  epTimedOut = 5,
81  epCountComplete = 6
82  };

Constructor & Destructor Documentation

◆ EventProcessor() [1/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet >  parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 215 of file EventProcessor.cc.

220  : actReg_(),
221  preg_(),
223  serviceToken_(),
224  input_(),
225  espController_(new eventsetup::EventSetupsController),
226  esp_(),
227  act_table_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
243  exceptionMessageLumis_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
248  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
249  processDesc->addServices(defaultServices, forcedServices);
250  init(processDesc, iToken, iLegacy);
251  }

References init(), eostools::move(), and edm::parameterSet().

◆ EventProcessor() [2/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet >  parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 253 of file EventProcessor.cc.

256  : actReg_(),
257  preg_(),
259  serviceToken_(),
260  input_(),
261  espController_(new eventsetup::EventSetupsController),
262  esp_(),
263  act_table_(),
265  schedule_(),
266  subProcesses_(),
267  historyAppender_(new HistoryAppender),
268  fb_(),
269  looper_(),
271  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
272  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
273  principalCache_(),
274  beginJobCalled_(false),
275  shouldWeStop_(false),
276  fileModeNoMerge_(false),
279  exceptionMessageLumis_(false),
280  forceLooperToEnd_(false),
281  looperBeginJobRun_(false),
285  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
286  processDesc->addServices(defaultServices, forcedServices);
288  }

References init(), edm::serviceregistry::kOverlapIsError, eostools::move(), and edm::parameterSet().

◆ EventProcessor() [3/4]

edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc >  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 290 of file EventProcessor.cc.

293  : actReg_(),
294  preg_(),
296  serviceToken_(),
297  input_(),
298  espController_(new eventsetup::EventSetupsController),
299  esp_(),
300  act_table_(),
302  schedule_(),
303  subProcesses_(),
304  historyAppender_(new HistoryAppender),
305  fb_(),
306  looper_(),
308  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
309  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
310  principalCache_(),
311  beginJobCalled_(false),
312  shouldWeStop_(false),
313  fileModeNoMerge_(false),
316  exceptionMessageLumis_(false),
317  forceLooperToEnd_(false),
318  looperBeginJobRun_(false),
322  init(processDesc, token, legacy);
323  }

References init(), and unpackBuffers-CaloStage2::token.

◆ ~EventProcessor()

edm::EventProcessor::~EventProcessor ( )

Definition at line 504 of file EventProcessor.cc.

504  {
505  // Make the services available while everything is being deleted.
508 
509  // manually destroy all these thing that may need the services around
510  // propagate_const<T> has no reset() function
511  espController_ = nullptr;
512  esp_ = nullptr;
513  schedule_ = nullptr;
514  input_ = nullptr;
515  looper_ = nullptr;
516  actReg_ = nullptr;
517 
520  }

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and unpackBuffers-CaloStage2::token.

◆ EventProcessor() [4/4]

edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

◆ beginJob()

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 522 of file EventProcessor.cc.

522  {
523  if (beginJobCalled_)
524  return;
525  beginJobCalled_ = true;
526  bk::beginJob();
527 
528  // StateSentry toerror(this); // should we add this ?
529  //make the services available
531 
532  service::SystemBounds bounds(preallocations_.numberOfStreams(),
536  actReg_->preallocateSignal_(bounds);
537  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
539 
540  //NOTE: this may throw
542  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
543 
546  }
547  //NOTE: This implementation assumes 'Job' means one call
548  // the EventProcessor::run
549  // If it really means once per 'application' then this code will
550  // have to be changed.
551  // Also have to deal with case where have 'run' then new Module
552  // added and do 'run'
553  // again. In that case the newly added Module needs its 'beginJob'
554  // to be called.
555 
556  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
557  // For now we delay calling beginOfJob until first beginOfRun
558  //if(looper_) {
559  // looper_->beginOfJob(es);
560  //}
561  try {
562  convertException::wrap([&]() { input_->doBeginJob(); });
563  } catch (cms::Exception& ex) {
564  ex.addContext("Calling beginJob for the source");
565  throw;
566  }
567  espController_->finishConfiguration();
568  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices());
569  // toerror.succeeded(); // should we add this?
570  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
571  actReg_->postBeginJobSignal_();
572 
573  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
574  schedule_->beginStream(i);
575  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
576  }
577  }

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), esp_, espController_, edm::for_all(), mps_fire::i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processConfiguration_, processContext_, schedule_, serviceToken_, subProcesses_, warnAboutModulesRequiringLuminosityBLockSynchronization(), and edm::convertException::wrap().

Referenced by runToCompletion().

◆ beginLumiAsync()

void edm::EventProcessor::beginLumiAsync ( edm::IOVSyncValue const &  iSyncValue,
std::shared_ptr< void > const &  iRunResource,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1061 of file EventProcessor.cc.

1063  {
1064  if (iHolder.taskHasFailed()) {
1065  return;
1066  }
1067 
1068  // We must be careful with the status object here and in code this function calls. IF we want
1069  // endRun to be called, then we must call resetResources before the things waiting on
1070  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1071  // endRun to be called much later than it should be, because status is holding iRunResource).
1072  // Note that this must be done explicitly. Relying on the destructor does not work well
1073  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1074  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1075  // destroyed inside this function and lumiWork.
1076  auto status =
1077  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1078 
1079  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1080  if (iHolder.taskHasFailed()) {
1081  status->resetResources();
1082  return;
1083  }
1084 
1085  status->setResumer(std::move(iResumer));
1086 
1087  sourceResourcesAcquirer_.serialQueueChain().push([this, iHolder, status = std::move(status)]() mutable {
1088  //make the services available
1090  // Caught exception is propagated via WaitingTaskHolder
1091  CMS_SA_ALLOW try {
1093 
1094  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1095  {
1096  SendSourceTerminationSignalIfException sentry(actReg_.get());
1097 
1098  input_->doBeginLumi(lumiPrincipal, &processContext_);
1099  sentry.completedSuccessfully();
1100  }
1101 
1103  if (rng.isAvailable()) {
1104  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1105  rng->preBeginLumi(lb);
1106  }
1107 
1108  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1109 
1110  //Task to start the stream beginLumis
1111  auto beginStreamsTask = make_waiting_task(
1112  tbb::task::allocate_root(), [this, holder = iHolder, status, ts](std::exception_ptr const* iPtr) mutable {
1113  if (iPtr) {
1114  status->resetResources();
1115  holder.doneWaiting(*iPtr);
1116  } else {
1117  status->globalBeginDidSucceed();
1118  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1119 
1120  if (looper_) {
1121  // Caught exception is propagated via WaitingTaskHolder
1122  CMS_SA_ALLOW try {
1123  //make the services available
1124  ServiceRegistry::Operate operateLooper(serviceToken_);
1125  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1126  } catch (...) {
1127  status->resetResources();
1128  holder.doneWaiting(std::current_exception());
1129  return;
1130  }
1131  }
1132  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1133 
1134  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1135  streamQueues_[i].push([this, i, status, holder, ts, &es]() mutable {
1136  streamQueues_[i].pause();
1137 
1138  auto eventTask =
1139  edm::make_waiting_task(tbb::task::allocate_root(),
1140  [this, i, h = std::move(holder)](
1141  std::exception_ptr const* exceptionFromBeginStreamLumi) mutable {
1142  if (exceptionFromBeginStreamLumi) {
1144  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1146  } else {
1148  }
1149  });
1150  auto& event = principalCache_.eventPrincipal(i);
1151  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1152  // held by the container as this lambda may not finish executing before all the tasks it
1153  // spawns have already started to run.
1154  auto eventSetupImpls = &status->eventSetupImpls();
1155  auto lp = status->lumiPrincipal().get();
1158  event.setLuminosityBlockPrincipal(lp);
1159  beginStreamTransitionAsync<Traits>(WaitingTaskHolder{eventTask},
1160  *schedule_,
1161  i,
1162  *lp,
1163  ts,
1164  es,
1165  eventSetupImpls,
1166  serviceToken_,
1167  subProcesses_);
1168  });
1169  }
1170  }
1171  }); // beginStreamTask
1172 
1173  //task to start the global begin lumi
1174  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1175 
1176  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1177  {
1178  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1179  beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1180  *schedule_,
1181  lumiPrincipal,
1182  ts,
1183  es,
1184  &status->eventSetupImpls(),
1185  serviceToken_,
1186  subProcesses_);
1187  }
1188  } catch (...) {
1189  status->resetResources();
1190  iHolder.doneWaiting(std::current_exception());
1191  }
1192  }); // task in sourceResourcesAcquirer
1193  }; // end lumiWork
1194 
1195  auto queueLumiWorkTask = make_waiting_task(
1196  tbb::task::allocate_root(),
1197  [this, lumiWorkLambda = std::move(lumiWork), iHolder](std::exception_ptr const* iPtr) mutable {
1198  if (iPtr) {
1199  iHolder.doneWaiting(*iPtr);
1200  }
1201  lumiQueue_->pushAndPause(std::move(lumiWorkLambda));
1202  });
1203 
1204  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1205  // We only get here inside this block if there is an EventSetup
1206  // module not able to handle concurrent IOVs (usually an ESSource)
1207  // and the new sync value is outside the current IOV of that module.
1208 
1209  WaitingTaskHolder queueLumiWorkTaskHolder{queueLumiWorkTask};
1210 
1211  queueWhichWaitsForIOVsToFinish_.push([this, queueLumiWorkTaskHolder, iSync, status]() mutable {
1212  // Caught exception is propagated via WaitingTaskHolder
1213  CMS_SA_ALLOW try {
1214  SendSourceTerminationSignalIfException sentry(actReg_.get());
1215  // Pass in iSync to let the EventSetup system know which run and lumi
1216  // need to be processed and prepare IOVs for it.
1217  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1218  // lumi is done and no longer needs its EventSetup IOVs.
1219  espController_->eventSetupForInstance(
1220  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1221  sentry.completedSuccessfully();
1222  } catch (...) {
1223  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1224  }
1226  });
1227 
1228  } else {
1230 
1231  // This holder will be used to wait until the EventSetup IOVs are ready
1232  WaitingTaskHolder queueLumiWorkTaskHolder{queueLumiWorkTask};
1233  // Caught exception is propagated via WaitingTaskHolder
1234  CMS_SA_ALLOW try {
1235  SendSourceTerminationSignalIfException sentry(actReg_.get());
1236 
1237  // Pass in iSync to let the EventSetup system know which run and lumi
1238  // need to be processed and prepare IOVs for it.
1239  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1240  // lumi is done and no longer needs its EventSetup IOVs.
1241  espController_->eventSetupForInstance(
1242  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1243  sentry.completedSuccessfully();
1244 
1245  } catch (...) {
1246  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1247  }
1248  }
1249  }

References actReg_, edm::LuminosityBlockPrincipal::beginTime(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), handleNextEventForStreamAsync(), mps_fire::i, input_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), lumiQueue_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), edm::SerialTaskQueue::pause(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readLuminosityBlock(), edm::LuminosityBlockPrincipal::run(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by handleNextEventForStreamAsync(), and processLumis().

◆ beginRun()

void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded,
bool &  eventSetupForInstanceSucceeded 
)

Definition at line 865 of file EventProcessor.cc.

868  {
869  globalBeginSucceeded = false;
870  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
871  {
872  SendSourceTerminationSignalIfException sentry(actReg_.get());
873 
874  input_->doBeginRun(runPrincipal, &processContext_);
875  sentry.completedSuccessfully();
876  }
877 
878  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
880  espController_->forceCacheClear();
881  }
882  {
883  SendSourceTerminationSignalIfException sentry(actReg_.get());
884  espController_->eventSetupForInstance(ts);
885  eventSetupForInstanceSucceeded = true;
886  sentry.completedSuccessfully();
887  }
888  auto const& es = esp_->eventSetupImpl();
889  if (looper_ && looperBeginJobRun_ == false) {
890  looper_->copyInfo(ScheduleInfo(schedule_.get()));
891  looper_->beginOfJob(es);
892  looperBeginJobRun_ = true;
893  looper_->doStartingNewLoop();
894  }
895  {
896  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
897  auto globalWaitTask = make_empty_waiting_task();
898  globalWaitTask->increment_ref_count();
899  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
900  *schedule_,
901  runPrincipal,
902  ts,
903  es,
904  nullptr,
906  subProcesses_);
907  globalWaitTask->wait_for_all();
908  if (globalWaitTask->exceptionPtr() != nullptr) {
909  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
910  }
911  }
912  globalBeginSucceeded = true;
913  FDEBUG(1) << "\tbeginRun " << run << "\n";
914  if (looper_) {
915  looper_->doBeginRun(runPrincipal, es, &processContext_);
916  }
917  {
918  //To wait, the ref count has to be 1+#streams
919  auto streamLoopWaitTask = make_empty_waiting_task();
920  streamLoopWaitTask->increment_ref_count();
921 
922  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
923 
924  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
925  *schedule_,
927  runPrincipal,
928  ts,
929  es,
930  nullptr,
932  subProcesses_);
933 
934  streamLoopWaitTask->wait_for_all();
935  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
936  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
937  }
938  }
939  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
940  if (looper_) {
941  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
942  }
943  }

References actReg_, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, and subProcesses_.

◆ branchIDListHelper() [1/2]

std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inline private

Definition at line 290 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

◆ branchIDListHelper() [2/2]

std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inline private

Definition at line 287 of file EventProcessor.h.

287  {
289  }

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

◆ checkForAsyncStopRequest()

bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode &  returnCode)
private

Definition at line 634 of file EventProcessor.cc.

634  {
635  bool returnValue = false;
636 
637  // Look for a shutdown signal
638  if (shutdown_flag.load(std::memory_order_acquire)) {
639  returnValue = true;
641  }
642  return returnValue;
643  }

References epSignal, runEdmFileComparison::returnCode, and edm::shutdown_flag.

Referenced by nextTransitionType().

◆ clearCounters()

void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 628 of file EventProcessor.cc.

628 { schedule_->clearCounters(); }

References schedule_.

◆ closeInputFile()

void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 764 of file EventProcessor.cc.

764  {
765  if (fb_.get() != nullptr) {
766  SendSourceTerminationSignalIfException sentry(actReg_.get());
767  input_->closeFile(fb_.get(), cleaningUpAfterException);
768  sentry.completedSuccessfully();
769  }
770  FDEBUG(1) << "\tcloseInputFile\n";
771  }

References actReg_, fb_, FDEBUG, and input_.

◆ closeOutputFiles()

void edm::EventProcessor::closeOutputFiles ( )

Definition at line 781 of file EventProcessor.cc.

781  {
782  if (fb_.get() != nullptr) {
783  schedule_->closeOutputFiles();
784  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
785  }
786  FDEBUG(1) << "\tcloseOutputFiles\n";
787  }

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

◆ continueLumiAsync()

void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1251 of file EventProcessor.cc.

1251  {
1252  {
1253  //all streams are sharing the same status at the moment
1254  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1255  status->needToContinueLumi();
1256  status->startProcessingEvents();
1257  }
1258 
1259  unsigned int streamIndex = 0;
1260  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1261  tbb::task::enqueue(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = iHolder]() {
1262  handleNextEventForStreamAsync(std::move(h), streamIndex);
1263  }));
1264  }
1265  tbb::task::spawn(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = std::move(iHolder)]() {
1266  handleNextEventForStreamAsync(std::move(h), streamIndex);
1267  }));
1268  }

References handleNextEventForStreamAsync(), edm::make_functor_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, mps_update::status, and streamLumiStatus_.

Referenced by processLumis().

◆ deleteLumiFromCache()

void edm::EventProcessor::deleteLumiFromCache ( LuminosityBlockProcessingStatus &  iStatus)

Definition at line 1562 of file EventProcessor.cc.

1562  {
1563  for (auto& s : subProcesses_) {
1564  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1565  }
1566  iStatus.lumiPrincipal()->clearPrincipal();
1567  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1568  }

References edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

◆ deleteRunFromCache()

void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1537 of file EventProcessor.cc.

1537  {
1538  principalCache_.deleteRun(phid, run);
1539  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1540  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1541  }

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, run(), and subProcesses_.

Referenced by endUnfinishedRun().

◆ doErrorStuff()

void edm::EventProcessor::doErrorStuff ( )

Definition at line 856 of file EventProcessor.cc.

856  {
857  FDEBUG(1) << "\tdoErrorStuff\n";
858  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
859  << "and went to the error state\n"
860  << "Will attempt to terminate processing normally\n"
861  << "(IF using the looper the next loop will be attempted)\n"
862  << "This likely indicates a bug in an input module or corrupted input or both\n";
863  }

References FDEBUG.

Referenced by runToCompletion().

◆ enableEndPaths()

void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 622 of file EventProcessor.cc.

622 { schedule_->enableEndPaths(active); }

References schedule_.

◆ endJob()

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 579 of file EventProcessor.cc.

579  {
580  // Collects exceptions, so we don't throw before all operations are performed.
581  ExceptionCollector c(
582  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
583 
584  //make the services available
586 
587  //NOTE: this really should go elsewhere in the future
588  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
589  c.call([this, i]() { this->schedule_->endStream(i); });
590  for (auto& subProcess : subProcesses_) {
591  c.call([&subProcess, i]() { subProcess.doEndStream(i); });
592  }
593  }
594  auto actReg = actReg_.get();
595  c.call([actReg]() { actReg->preEndJobSignal_(); });
596  schedule_->endJob(c);
597  for (auto& subProcess : subProcesses_) {
598  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
599  }
600  c.call(std::bind(&InputSource::doEndJob, input_.get()));
601  if (looper_) {
602  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
603  }
604  c.call([actReg]() { actReg->postEndJobSignal_(); });
605  if (c.hasThrown()) {
606  c.rethrow();
607  }
608  }

References actReg_, HltBtagPostValidation_cff::c, edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

◆ endOfLoop()

bool edm::EventProcessor::endOfLoop ( )

Definition at line 817 of file EventProcessor.cc.

817  {
818  if (looper_) {
819  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
820  looper_->setModuleChanger(&changer);
821  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
822  looper_->setModuleChanger(nullptr);
824  return true;
825  else
826  return false;
827  }
828  FDEBUG(1) << "\tendOfLoop\n";
829  return true;
830  }

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

◆ endPathsEnabled()

bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 624 of file EventProcessor.cc.

624 { return schedule_->endPathsEnabled(); }

References schedule_.

◆ endRun()

void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 971 of file EventProcessor.cc.

974  {
975  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
976  runPrincipal.setEndTime(input_->timestamp());
977 
978  IOVSyncValue ts(
980  runPrincipal.endTime());
981  {
982  SendSourceTerminationSignalIfException sentry(actReg_.get());
983  espController_->eventSetupForInstance(ts);
984  sentry.completedSuccessfully();
985  }
986  auto const& es = esp_->eventSetupImpl();
987  if (globalBeginSucceeded) {
988  //To wait, the ref count has to be 1+#streams
989  auto streamLoopWaitTask = make_empty_waiting_task();
990  streamLoopWaitTask->increment_ref_count();
991 
992  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
993 
994  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
995  *schedule_,
997  runPrincipal,
998  ts,
999  es,
1000  nullptr,
1001  serviceToken_,
1002  subProcesses_,
1003  cleaningUpAfterException);
1004 
1005  streamLoopWaitTask->wait_for_all();
1006  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
1007  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
1008  }
1009  }
1010  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1011  if (looper_) {
1012  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1013  }
1014  {
1015  auto globalWaitTask = make_empty_waiting_task();
1016  globalWaitTask->increment_ref_count();
1017 
1018  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1019  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1020  *schedule_,
1021  runPrincipal,
1022  ts,
1023  es,
1024  nullptr,
1025  serviceToken_,
1026  subProcesses_,
1027  cleaningUpAfterException);
1028  globalWaitTask->wait_for_all();
1029  if (globalWaitTask->exceptionPtr() != nullptr) {
1030  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1031  }
1032  }
1033  FDEBUG(1) << "\tendRun " << run << "\n";
1034  if (looper_) {
1035  looper_->doEndRun(runPrincipal, es, &processContext_);
1036  }
1037  }

References actReg_, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, input_, looper_, edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, edm::RunPrincipal::setEndTime(), and subProcesses_.

Referenced by endUnfinishedRun().

◆ endUnfinishedLumi()

void edm::EventProcessor::endUnfinishedLumi ( )

Definition at line 1423 of file EventProcessor.cc.

1423  {
1424  if (streamLumiActive_.load() > 0) {
1425  auto globalWaitTask = make_empty_waiting_task();
1426  globalWaitTask->increment_ref_count();
1427  {
1428  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1429  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1430  if (streamLumiStatus_[i]) {
1431  streamEndLumiAsync(globalTaskHolder, i);
1432  }
1433  }
1434  }
1435  globalWaitTask->wait_for_all();
1436  if (globalWaitTask->exceptionPtr() != nullptr) {
1437  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1438  }
1439  }
1440  }

References mps_fire::i, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, and streamLumiStatus_.

◆ endUnfinishedRun()

void edm::EventProcessor::endUnfinishedRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException,
bool  eventSetupForInstanceSucceeded 
)

Definition at line 945 of file EventProcessor.cc.

949  {
950  if (eventSetupForInstanceSucceeded) {
951  //If we skip empty runs, this would be called conditionally
952  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
953 
954  if (globalBeginSucceeded) {
956  t->increment_ref_count();
957  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
958  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
959  mergeableRunProductMetadata->preWriteRun();
960  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run, mergeableRunProductMetadata);
961  t->wait_for_all();
962  mergeableRunProductMetadata->postWriteRun();
963  if (t->exceptionPtr()) {
964  std::rethrow_exception(*t->exceptionPtr());
965  }
966  }
967  }
968  deleteRunFromCache(phid, run);
969  }

References deleteRunFromCache(), endRun(), edm::make_empty_waiting_task(), edm::RunPrincipal::mergeableRunProductMetadata(), edm::MergeableRunProductMetadata::postWriteRun(), edm::MergeableRunProductMetadata::preWriteRun(), principalCache_, run(), edm::PrincipalCache::runPrincipal(), OrderedSet::t, and writeRunAsync().

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 612 of file EventProcessor.cc.

612  {
613  return schedule_->getAllModuleDescriptions();
614  }

References schedule_.

◆ getToken()

ServiceToken edm::EventProcessor::getToken ( )

Definition at line 610 of file EventProcessor.cc.

610 { return serviceToken_; }

References serviceToken_.

Referenced by ~EventProcessor().

◆ getTriggerReport()

void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 626 of file EventProcessor.cc.

626 { schedule_->getTriggerReport(rep); }

References cuy::rep, and schedule_.

◆ globalEndLumiAsync()

void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1279 of file EventProcessor.cc.

1280  {
1281  // Get some needed info out of the status object before moving
1282  // it into finalTaskForThisLumi.
1283  auto& lp = *(iLumiStatus->lumiPrincipal());
1284  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1285  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1286  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1287  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1288 
1289  auto finalTaskForThisLumi = edm::make_waiting_task(
1290  tbb::task::allocate_root(),
1291  [status = std::move(iLumiStatus), iTask = std::move(iTask), this](std::exception_ptr const* iPtr) mutable {
1292  std::exception_ptr ptr;
1293  if (iPtr) {
1294  handleEndLumiExceptions(iPtr, iTask);
1295  } else {
1296  // Caught exception is passed to handleEndLumiExceptions()
1297  CMS_SA_ALLOW try {
1299  if (looper_) {
1300  auto& lumiPrincipal = *(status->lumiPrincipal());
1301  EventSetupImpl const& eventSetupImpl = status->eventSetupImpl(esp_->subProcessIndex());
1302  looper_->doEndLuminosityBlock(lumiPrincipal, eventSetupImpl, &processContext_);
1303  }
1304  } catch (...) {
1305  ptr = std::current_exception();
1306  }
1307  }
1309 
1310  // Try hard to clean up resources so the
1311  // process can terminate in a controlled
1312  // fashion even after exceptions have occurred.
1313  // Caught exception is passed to handleEndLumiExceptions()
1314  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1315  if (not ptr) {
1316  ptr = std::current_exception();
1317  }
1318  }
1319  // Caught exception is passed to handleEndLumiExceptions()
1320  CMS_SA_ALLOW try {
1321  status->resumeGlobalLumiQueue();
1323  } catch (...) {
1324  if (not ptr) {
1325  ptr = std::current_exception();
1326  }
1327  }
1328  // Caught exception is passed to handleEndLumiExceptions()
1329  CMS_SA_ALLOW try {
1330  // This call to status.resetResources() must occur before iTask is destroyed.
1331  // Otherwise there will be a data race which could result in endRun
1332  // being delayed until it is too late to successfully call it.
1333  status->resetResources();
1334  status.reset();
1335  } catch (...) {
1336  if (not ptr) {
1337  ptr = std::current_exception();
1338  }
1339  }
1340 
1341  if (ptr) {
1342  handleEndLumiExceptions(&ptr, iTask);
1343  }
1344  });
1345 
1346  auto writeT = edm::make_waiting_task(
1347  tbb::task::allocate_root(),
1348  [this, didGlobalBeginSucceed, &lumiPrincipal = lp, task = WaitingTaskHolder(finalTaskForThisLumi)](
1349  std::exception_ptr const* iExcept) mutable {
1350  if (iExcept) {
1351  task.doneWaiting(*iExcept);
1352  } else {
1353  //Only call writeLumi if beginLumi succeeded
1354  if (didGlobalBeginSucceed) {
1355  writeLumiAsync(std::move(task), lumiPrincipal);
1356  }
1357  }
1358  });
1359 
1360  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1361 
1362  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1363 
1364  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(writeT),
1365  *schedule_,
1366  lp,
1367  ts,
1368  es,
1369  eventSetupImpls,
1370  serviceToken_,
1371  subProcesses_,
1372  cleaningUpAfterException);
1373  }

References CMS_SA_ALLOW, deleteLumiFromCache(), esp_, handleEndLumiExceptions(), looper_, edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), schedule_, serviceToken_, mps_update::status, subProcesses_, TrackValidation_cff::task, and writeLumiAsync().

Referenced by streamEndLumiAsync().

◆ handleEndLumiExceptions()

void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr const *  iPtr,
WaitingTaskHolder holder 
)

Definition at line 1270 of file EventProcessor.cc.

1270  {
1271  if (setDeferredException(*iPtr)) {
1272  WaitingTaskHolder tmp(holder);
1273  tmp.doneWaiting(*iPtr);
1274  } else {
1276  }
1277  }

References setDeferredException(), setExceptionMessageLumis(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

◆ handleNextEventForStreamAsync()

void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 1632 of file EventProcessor.cc.

1632  {
1633  sourceResourcesAcquirer_.serialQueueChain().push([this, iTask, iStreamIndex]() mutable {
1635  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1636  // as streamEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1637  auto status = streamLumiStatus_[iStreamIndex].get();
1638  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1639  CMS_SA_ALLOW try {
1640  if (readNextEventForStream(iStreamIndex, *status)) {
1641  auto recursionTask = make_waiting_task(
1642  tbb::task::allocate_root(), [this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1643  if (iPtr) {
1644  // Try to end the stream properly even if an exception was
1645  // thrown on an event.
1646  bool expected = false;
1647  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1648  // This is the case where the exception in iPtr is the primary
1649  // exception and we want to see its message.
1650  deferredExceptionPtr_ = *iPtr;
1651  WaitingTaskHolder tempHolder(iTask);
1652  tempHolder.doneWaiting(*iPtr);
1653  }
1654  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1655  //the stream will stop now
1656  return;
1657  }
1658  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1659  });
1660 
1661  processEventAsync(WaitingTaskHolder(recursionTask), iStreamIndex);
1662  } else {
1663  //the stream will stop now
1664  if (status->isLumiEnding()) {
1665  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1666  status->startNextLumi();
1667  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1668  }
1669  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1670  } else {
1671  iTask.doneWaiting(std::exception_ptr{});
1672  }
1673  }
1674  } catch (...) {
1675  // It is unlikely we will ever get in here ...
1676  // But if we do try to clean up and propagate the exception
1677  if (streamLumiStatus_[iStreamIndex]) {
1678  streamEndLumiAsync(iTask, iStreamIndex);
1679  }
1680  bool expected = false;
1681  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1682  auto e = std::current_exception();
1684  iTask.doneWaiting(e);
1685  }
1686  }
1687  });
1688  }

References beginLumiAsync(), CMS_SA_ALLOW, deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::InputSource::IsLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), and streamLumiStatus_.

Referenced by beginLumiAsync(), and continueLumiAsync().

◆ init()

void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 325 of file EventProcessor.cc.

327  {
328  //std::cerr << processDesc->dump() << std::endl;
329 
330  // register the empty parentage vector, once and for all
332 
333  // register the empty parameter set, once and for all.
334  ParameterSet().registerIt();
335 
336  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
337 
338  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
339  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
340  bool const hasSubProcesses = !subProcessVParameterSet.empty();
341 
342  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
343  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
344  // set in here if the parameters were not explicitly set.
346 
347  // Now set some parameters specific to the main process.
348  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
349  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
350  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
351  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
352  << fileMode << ".\n"
353  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
354  } else {
355  fileModeNoMerge_ = (fileMode == "NOMERGE");
356  }
357  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
358 
359  //threading
360  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
361 
362  // Even if numberOfThreads was set to zero in the Python configuration, the code
363  // in cmsRun.cpp should have reset it to something else.
364  assert(nThreads != 0);
365 
366  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
367  if (nStreams == 0) {
368  nStreams = nThreads;
369  }
370  if (nThreads > 1 or nStreams > 1) {
371  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
372  }
373  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
374  if (nConcurrentRuns != 1) {
375  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
376  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
377  }
378  unsigned int nConcurrentLumis =
379  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
380  if (nConcurrentLumis == 0) {
381  nConcurrentLumis = nConcurrentRuns;
382  }
383 
384  //Check that relationships between threading parameters make sense
385  /*
386  if(nThreads<nStreams) {
387  //bad
388  }
389  if(nConcurrentRuns>nStreams) {
390  //bad
391  }
392  if(nConcurrentRuns>nConcurrentLumis) {
393  //bad
394  }
395  */
396  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
397 
398  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
399 
400  // Now do general initialization
401  ScheduleItems items;
402 
403  //initialize the services
404  auto& serviceSets = processDesc->getServicesPSets();
405  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
406  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
407 
408  //make the services available
410 
411  if (nStreams > 1) {
413  handler->willBeUsingThreads();
414  }
415 
416  // initialize miscellaneous items
417  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
418 
419  // initialize the event setup provider
420  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
421  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get(), &eventSetupPset);
422 
423  // initialize the looper, if any
425  if (looper_) {
426  looper_->setActionTable(items.act_table_.get());
427  looper_->attachTo(*items.actReg_);
428 
429  //For now loopers make us run only 1 transition at a time
430  nStreams = 1;
431  nConcurrentLumis = 1;
432  nConcurrentRuns = 1;
433  }
434  espController_->setMaxConcurrentIOVs(nStreams, nConcurrentLumis);
435 
436  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
437 
438  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
439  streamQueues_.resize(nStreams);
440  streamLumiStatus_.resize(nStreams);
441 
442  // initialize the input source
444  *common,
445  items.preg(),
446  items.branchIDListHelper(),
447  items.thinnedAssociationsHelper(),
448  items.actReg_,
449  items.processConfiguration(),
451 
452  // initialize the Schedule
453  schedule_ = items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_);
454 
455  // set the data members
456  act_table_ = std::move(items.act_table_);
457  actReg_ = items.actReg_;
458  preg_ = items.preg();
460  branchIDListHelper_ = items.branchIDListHelper();
461  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
462  processConfiguration_ = items.processConfiguration();
464  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
465 
466  FDEBUG(2) << parameterSet << std::endl;
467 
469  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
470  // Reusable event principal
471  auto ep = std::make_shared<EventPrincipal>(preg(),
475  historyAppender_.get(),
476  index);
478  }
479 
480  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
481  auto lp =
482  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
484  }
485 
486  // fill the subprocesses, if there are any
487  subProcesses_.reserve(subProcessVParameterSet.size());
488  for (auto& subProcessPSet : subProcessVParameterSet) {
489  subProcesses_.emplace_back(subProcessPSet,
490  *parameterSet,
491  preg(),
494  SubProcessParentageHelper(),
496  *actReg_,
497  token,
500  &processContext_);
501  }
502  }

References act_table_, actReg_, cms::cuda::assert(), branchIDListHelper(), branchIDListHelper_, trackingPlots::common, edm::errors::Configuration, SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, processOptions_cff::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameterSet(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, eostools::move(), runTheMatrix::nThreads, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

◆ lastTransitionType()

InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline

Definition at line 194 of file EventProcessor.h.

194  {
196  return InputSource::IsStop;
197  }
198  return lastSourceTransition_;
199  }

References deferredExceptionPtrIsSet_, edm::InputSource::IsStop, and lastSourceTransition_.

Referenced by handleNextEventForStreamAsync(), and processLumis().

◆ looper() [1/2]

std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 298 of file EventProcessor.h.

298 { return get_underlying_safe(looper_); }

References edm::get_underlying_safe(), and looper_.

◆ looper() [2/2]

std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 297 of file EventProcessor.h.

297 { return get_underlying_safe(looper_); }

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

◆ nextLuminosityBlockID()

edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 675 of file EventProcessor.cc.

675 { return input_->luminosityBlock(); }

References input_.

Referenced by readNextEventForStream().

◆ nextRunID()

std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 671 of file EventProcessor.cc.

671  {
672  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
673  }

References input_.

◆ nextTransitionType()

InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 645 of file EventProcessor.cc.

645  {
646  if (deferredExceptionPtrIsSet_.load()) {
648  return InputSource::IsStop;
649  }
650 
651  SendSourceTerminationSignalIfException sentry(actReg_.get());
652  InputSource::ItemType itemType;
653  //For now, do nothing with InputSource::IsSynchronize
654  do {
655  itemType = input_->nextItemType();
656  } while (itemType == InputSource::IsSynchronize);
657 
658  lastSourceTransition_ = itemType;
659  sentry.completedSuccessfully();
660 
662 
664  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
666  }
667 
668  return lastSourceTransition_;
669  }

References actReg_, checkForAsyncStopRequest(), deferredExceptionPtrIsSet_, epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by readNextEventForStream().

◆ openOutputFiles()

void edm::EventProcessor::openOutputFiles ( )

Definition at line 773 of file EventProcessor.cc.

773  {
774  if (fb_.get() != nullptr) {
775  schedule_->openOutputFiles(*fb_);
776  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
777  }
778  FDEBUG(1) << "\topenOutputFiles\n";
779  }

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

◆ operator=()

EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete

◆ preg() [1/2]

std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 286 of file EventProcessor.h.

286 { return get_underlying_safe(preg_); }

References edm::get_underlying_safe(), and preg_.

◆ preg() [2/2]

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 285 of file EventProcessor.h.

285 { return get_underlying_safe(preg_); }

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), and readRun().

◆ prepareForNextLoop()

void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 838 of file EventProcessor.cc.

838  {
839  looper_->prepareForNextLoop(esp_.get());
840  FDEBUG(1) << "\tprepareForNextLoop\n";
841  }

References esp_, FDEBUG, and looper_.

Referenced by runToCompletion().

◆ processConfiguration()

ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 136 of file EventProcessor.h.

136 { return *processConfiguration_; }

References processConfiguration_.

◆ processEventAsync()

void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1704 of file EventProcessor.cc.

1704  {
1705  tbb::task::spawn(
1706  *make_functor_task(tbb::task::allocate_root(), [=]() { processEventAsyncImpl(iHolder, iStreamIndex); }));
1707  }

References edm::make_functor_task(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

◆ processEventAsyncImpl()

void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1709 of file EventProcessor.cc.

1709  {
1710  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1711 
1714  if (rng.isAvailable()) {
1715  Event ev(*pep, ModuleDescription(), nullptr);
1716  rng->postEventRead(ev);
1717  }
1718 
1719  WaitingTaskHolder finalizeEventTask(make_waiting_task(
1720  tbb::task::allocate_root(), [this, pep, iHolder, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1721  //NOTE: If we have a looper we only have one Stream
1722  if (looper_) {
1723  ServiceRegistry::Operate operateLooper(serviceToken_);
1724  processEventWithLooper(*pep, iStreamIndex);
1725  }
1726 
1727  FDEBUG(1) << "\tprocessEvent\n";
1728  pep->clearEventPrincipal();
1729  if (iPtr) {
1730  iHolder.doneWaiting(*iPtr);
1731  } else {
1732  iHolder.doneWaiting(std::exception_ptr());
1733  }
1734  }));
1735  WaitingTaskHolder afterProcessTask;
1736  if (subProcesses_.empty()) {
1737  afterProcessTask = std::move(finalizeEventTask);
1738  } else {
1739  //Need to run SubProcesses after schedule has finished
1740  // with the event
1741  afterProcessTask = WaitingTaskHolder(make_waiting_task(
1742  tbb::task::allocate_root(),
1743  [this, pep, finalizeEventTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1744  if (not iPtr) {
1745  //when run with 1 thread, we want the order to be what
1746  // it was before. This requires reversing the order since
1747  // tasks are run last one in first one out
1748  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1749  subProcess.doEventAsync(finalizeEventTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1750  }
1751  } else {
1752  finalizeEventTask.doneWaiting(*iPtr);
1753  }
1754  }));
1755  }
1756 
1757  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1758  schedule_->processOneEventAsync(std::move(afterProcessTask), iStreamIndex, *pep, es, serviceToken_);
1759  }

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, edm::Service< T >::isAvailable(), looper_, edm::make_waiting_task(), eostools::move(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, streamLumiStatus_, and subProcesses_.

Referenced by processEventAsync().

◆ processEventWithLooper()

void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 1761 of file EventProcessor.cc.

1761  {
1762  bool randomAccess = input_->randomAccess();
1763  ProcessingController::ForwardState forwardState = input_->forwardState();
1764  ProcessingController::ReverseState reverseState = input_->reverseState();
1765  ProcessingController pc(forwardState, reverseState, randomAccess);
1766 
1768  do {
1769  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1770  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1771  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1772 
1773  bool succeeded = true;
1774  if (randomAccess) {
1775  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1776  input_->skipEvents(-2);
1777  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1778  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1779  }
1780  }
1781  pc.setLastOperationSucceeded(succeeded);
1782  } while (!pc.lastOperationSucceeded());
1784  shouldWeStop_ = true;
1786  }
1787  }

References esp_, input_, edm::InputSource::IsStop, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), lastSourceTransition_, looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

◆ processLumis()

InputSource::ItemType edm::EventProcessor::processLumis ( std::shared_ptr< void > const &  iRunResource)

Definition at line 1039 of file EventProcessor.cc.

1039  {
1040  auto waitTask = make_empty_waiting_task();
1041  waitTask->increment_ref_count();
1042 
1043  if (streamLumiActive_ > 0) {
1045  // Continue after opening a new input file
1046  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1047  } else {
1048  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1049  input_->luminosityBlockAuxiliary()->beginTime()),
1050  iRunResource,
1051  WaitingTaskHolder{waitTask.get()});
1052  }
1053  waitTask->wait_for_all();
1054 
1055  if (waitTask->exceptionPtr() != nullptr) {
1056  std::rethrow_exception(*(waitTask->exceptionPtr()));
1057  }
1058  return lastTransitionType();
1059  }

References cms::cuda::assert(), beginLumiAsync(), continueLumiAsync(), input_, lastTransitionType(), edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, and streamLumiActive_.

◆ readAndMergeLumi()

int edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1496 of file EventProcessor.cc.

1496  {
1497  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1498  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1499  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1500  input_->processHistoryRegistry().reducedProcessHistoryID(
1501  input_->luminosityBlockAuxiliary()->processHistoryID()));
1502  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1503  assert(lumiOK);
1504  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1505  {
1506  SendSourceTerminationSignalIfException sentry(actReg_.get());
1507  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1508  sentry.completedSuccessfully();
1509  }
1510  return input_->luminosityBlock();
1511  }

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by readNextEventForStream().

◆ readAndMergeRun()

std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1465 of file EventProcessor.cc.

1465  {
1466  principalCache_.merge(input_->runAuxiliary(), preg());
1467  auto runPrincipal = principalCache_.runPrincipalPtr();
1468  {
1469  SendSourceTerminationSignalIfException sentry(actReg_.get());
1470  input_->readAndMergeRun(*runPrincipal);
1471  sentry.completedSuccessfully();
1472  }
1473  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1474  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1475  }

References actReg_, cms::cuda::assert(), input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

◆ readEvent()

void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1690 of file EventProcessor.cc.

1690  {
1691  //TODO this will have to become per stream
1692  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1693  StreamContext streamContext(event.streamID(), &processContext_);
1694 
1695  SendSourceTerminationSignalIfException sentry(actReg_.get());
1696  input_->readEvent(event, streamContext);
1697 
1698  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1699  sentry.completedSuccessfully();
1700 
1701  FDEBUG(1) << "\treadEvent\n";
1702  }

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, and streamLumiStatus_.

Referenced by readNextEventForStream().

◆ readFile()

void edm::EventProcessor::readFile ( )

◆ readLuminosityBlock()

void edm::EventProcessor::readLuminosityBlock ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1477 of file EventProcessor.cc.

1477  {
1479  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1480  << "Illegal attempt to insert lumi into cache\n"
1481  << "Run is invalid\n"
1482  << "Contact a Framework Developer\n";
1483  }
1485  assert(lbp);
1486  lbp->setAux(*input_->luminosityBlockAuxiliary());
1487  {
1488  SendSourceTerminationSignalIfException sentry(actReg_.get());
1489  input_->readLuminosityBlock(*lbp, *historyAppender_);
1490  sentry.completedSuccessfully();
1491  }
1492  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1493  iStatus.lumiPrincipal() = std::move(lbp);
1494  }

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::getAvailableLumiPrincipalPtr(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), eostools::move(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

Referenced by beginLumiAsync().

◆ readNextEventForStream()

bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iLumiStatus 
)
private

Definition at line 1570 of file EventProcessor.cc.

1570  {
1571  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1572  iStatus.endLumi();
1573  return false;
1574  }
1575 
1576  if (iStatus.wasEventProcessingStopped()) {
1577  return false;
1578  }
1579 
1580  if (shouldWeStop()) {
1582  iStatus.stopProcessingEvents();
1583  iStatus.endLumi();
1584  return false;
1585  }
1586 
1588  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1589  CMS_SA_ALLOW try {
1590  //need to use lock in addition to the serial task queue because
1591  // of delayed provenance reading and reading data in response to
1592  // edm::Refs etc
1593  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1594 
1595  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1596  if (InputSource::IsLumi == itemType) {
1597  iStatus.haveContinuedLumi();
1598  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1599  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1600  readAndMergeLumi(iStatus);
1601  itemType = nextTransitionType();
1602  }
1603  if (InputSource::IsLumi == itemType) {
1604  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1605  input_->luminosityBlockAuxiliary()->beginTime()));
1606  }
1607  }
1608  if (InputSource::IsEvent != itemType) {
1609  iStatus.stopProcessingEvents();
1610 
1611  //IsFile may continue processing the lumi and
1612  // looper_ can cause the input source to declare a new IsRun which is actually
1613  // just a continuation of the previous run
1614  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1615  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1616  iStatus.endLumi();
1617  }
1618  return false;
1619  }
1620  readEvent(iStreamIndex);
1621  } catch (...) {
1622  bool expected = false;
1623  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1624  deferredExceptionPtr_ = std::current_exception();
1625  iStatus.endLumi();
1626  }
1627  return false;
1628  }
1629  return true;
1630  }

References CMS_SA_ALLOW, edm::LuminosityBlockProcessingStatus::continuingLumi(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::LuminosityBlockProcessingStatus::endLumi(), edm::LuminosityBlockProcessingStatus::haveContinuedLumi(), input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, lastSourceTransition_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextLuminosityBlockID(), nextTransitionType(), or, readAndMergeLumi(), readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setNextSyncValue(), shouldWeStop(), sourceMutex_, edm::LuminosityBlockProcessingStatus::stopProcessingEvents(), and edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped().

Referenced by handleNextEventForStreamAsync().

◆ readRun()

std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1442 of file EventProcessor.cc.

1442  {
1444  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1445  << "Illegal attempt to insert run into cache\n"
1446  << "Contact a Framework Developer\n";
1447  }
1448  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1449  preg(),
1451  historyAppender_.get(),
1452  0,
1453  true,
1455  {
1456  SendSourceTerminationSignalIfException sentry(actReg_.get());
1457  input_->readRun(*rp, *historyAppender_);
1458  sentry.completedSuccessfully();
1459  }
1460  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1461  principalCache_.insert(rp);
1462  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1463  }

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, mergeableRunProductProcesses_, preg(), principalCache_, and processConfiguration_.

◆ respondToCloseInputFile()

void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 799 of file EventProcessor.cc.

799  {
800  if (fb_.get() != nullptr) {
801  schedule_->respondToCloseInputFile(*fb_);
802  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
803  }
804  FDEBUG(1) << "\trespondToCloseInputFile\n";
805  }

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

◆ respondToOpenInputFile()

void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 789 of file EventProcessor.cc.

789  {
791  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
792  if (fb_.get() != nullptr) {
793  schedule_->respondToOpenInputFile(*fb_);
794  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
795  }
796  FDEBUG(1) << "\trespondToOpenInputFile\n";
797  }

References branchIDListHelper_, fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

◆ rewindInput()

void edm::EventProcessor::rewindInput ( )

Definition at line 832 of file EventProcessor.cc.

832  {
833  input_->repeat();
834  input_->rewind();
835  FDEBUG(1) << "\trewind\n";
836  }

References FDEBUG, and input_.

Referenced by runToCompletion().

◆ run()

EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

◆ runToCompletion()

EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 677 of file EventProcessor.cc.

677  {
680  {
681  beginJob(); //make sure this was called
682 
683  // make the services available
685 
687  try {
688  FilesProcessor fp(fileModeNoMerge_);
689 
690  convertException::wrap([&]() {
691  bool firstTime = true;
692  do {
693  if (not firstTime) {
695  rewindInput();
696  } else {
697  firstTime = false;
698  }
699  startingNewLoop();
700 
701  auto trans = fp.processFiles(*this);
702 
703  fp.normalEnd();
704 
705  if (deferredExceptionPtrIsSet_.load()) {
706  std::rethrow_exception(deferredExceptionPtr_);
707  }
708  if (trans != InputSource::IsStop) {
709  //problem with the source
710  doErrorStuff();
711 
712  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
713  }
714  } while (not endOfLoop());
715  }); // convertException::wrap
716 
717  } // Try block
718  catch (cms::Exception& e) {
720  std::string message(
721  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
722  e.addAdditionalInfo(message);
723  if (e.alreadyPrinted()) {
724  LogAbsolute("Additional Exceptions") << message;
725  }
726  }
727  if (!exceptionMessageRuns_.empty()) {
728  e.addAdditionalInfo(exceptionMessageRuns_);
729  if (e.alreadyPrinted()) {
730  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
731  }
732  }
733  if (!exceptionMessageFiles_.empty()) {
734  e.addAdditionalInfo(exceptionMessageFiles_);
735  if (e.alreadyPrinted()) {
736  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
737  }
738  }
739  throw;
740  }
741  }
742 
743  return returnCode;
744  }

References asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

◆ setDeferredException()

bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 1810 of file EventProcessor.cc.

1810  {
1811  bool expected = false;
1812  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1813  deferredExceptionPtr_ = iException;
1814  return true;
1815  }
1816  return false;
1817  }

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

Referenced by handleEndLumiExceptions().

◆ setExceptionMessageFiles()

void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 1804 of file EventProcessor.cc.

1804 { exceptionMessageFiles_ = message; }

References exceptionMessageFiles_.

◆ setExceptionMessageLumis()

void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 1808 of file EventProcessor.cc.

1808 { exceptionMessageLumis_ = true; }

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

◆ setExceptionMessageRuns()

void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 1806 of file EventProcessor.cc.

1806 { exceptionMessageRuns_ = message; }

References exceptionMessageRuns_.

◆ shouldWeCloseOutput()

bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 843 of file EventProcessor.cc.

843  {
844  FDEBUG(1) << "\tshouldWeCloseOutput\n";
845  if (!subProcesses_.empty()) {
846  for (auto const& subProcess : subProcesses_) {
847  if (subProcess.shouldWeCloseOutput()) {
848  return true;
849  }
850  }
851  return false;
852  }
853  return schedule_->shouldWeCloseOutput();
854  }

References FDEBUG, schedule_, and subProcesses_.

◆ shouldWeStop()

bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 1789 of file EventProcessor.cc.

1789  {
1790  FDEBUG(1) << "\tshouldWeStop\n";
1791  if (shouldWeStop_)
1792  return true;
1793  if (!subProcesses_.empty()) {
1794  for (auto const& subProcess : subProcesses_) {
1795  if (subProcess.terminate()) {
1796  return true;
1797  }
1798  }
1799  return false;
1800  }
1801  return schedule_->terminate();
1802  }

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

◆ startingNewLoop()

void edm::EventProcessor::startingNewLoop ( )

Definition at line 807 of file EventProcessor.cc.

807  {
808  shouldWeStop_ = false;
809  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
810  // until after we've called beginOfJob
811  if (looper_ && looperBeginJobRun_) {
812  looper_->doStartingNewLoop();
813  }
814  FDEBUG(1) << "\tstartingNewLoop\n";
815  }

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

◆ streamEndLumiAsync()

void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1375 of file EventProcessor.cc.

1375  {
1376  auto t = edm::make_waiting_task(tbb::task::allocate_root(),
1377  [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1378  if (iPtr) {
1379  handleEndLumiExceptions(iPtr, iTask);
1380  }
1381  auto status = streamLumiStatus_[iStreamIndex];
1382  //reset status before releasing queue else get race condition
1383  streamLumiStatus_[iStreamIndex].reset();
1385  streamQueues_[iStreamIndex].resume();
1386 
1387  //are we the last one?
1388  if (status->streamFinishedLumi()) {
1390  }
1391  });
1392 
1393  edm::WaitingTaskHolder lumiDoneTask{t};
1394 
1395  //Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1396  // therefore we do not want to hold the shared_ptr
1397  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1398  lumiStatus->setEndTime();
1399 
1400  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1401 
1402  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1403  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1404 
1405  if (lumiStatus->didGlobalBeginSucceed()) {
1406  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1407  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1408  lumiPrincipal.endTime());
1409  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1410  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1411  *schedule_,
1412  iStreamIndex,
1413  lumiPrincipal,
1414  ts,
1415  es,
1416  eventSetupImpls,
1417  serviceToken_,
1418  subProcesses_,
1419  cleaningUpAfterException);
1420  }
1421  }

References esp_, globalEndLumiAsync(), handleEndLumiExceptions(), edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and OrderedSet::t.

Referenced by beginLumiAsync(), endUnfinishedLumi(), and handleNextEventForStreamAsync().

◆ thinnedAssociationsHelper() [1/2]

std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 294 of file EventProcessor.h.

294  {
296  }

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

◆ thinnedAssociationsHelper() [2/2]

std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 291 of file EventProcessor.h.

291  {
293  }

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

◆ totalEvents()

int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 616 of file EventProcessor.cc.

616 { return schedule_->totalEvents(); }

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

◆ totalEventsFailed()

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 620 of file EventProcessor.cc.

620 { return schedule_->totalEventsFailed(); }

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

◆ totalEventsPassed()

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 618 of file EventProcessor.cc.

618 { return schedule_->totalEventsPassed(); }

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

◆ warnAboutModulesRequiringLuminosityBLockSynchronization()

void edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization ( ) const
private

Definition at line 1819 of file EventProcessor.cc.

1819  {
1820  std::unique_ptr<LogSystem> s;
1821  for (auto worker : schedule_->allWorkers()) {
1822  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1823  if (not s) {
1824  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1825  (*s) << "The following modules require synchronizing on LuminosityBlock boundaries:";
1826  }
1827  (*s) << "\n " << worker->description().moduleName() << " " << worker->description().moduleLabel();
1828  }
1829  }
1830  }

References alignCSCRings::s, and schedule_.

Referenced by beginJob().

◆ writeLumiAsync()

void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 1543 of file EventProcessor.cc.

1543  {
1544  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(),
1545  [this, task, &lumiPrincipal](std::exception_ptr const* iExcept) mutable {
1546  if (iExcept) {
1547  task.doneWaiting(*iExcept);
1548  } else {
1550  for (auto& s : subProcesses_) {
1551  s.writeLumiAsync(task, lumiPrincipal);
1552  }
1553  }
1554  });
1556 
1557  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1558 
1559  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, lumiPrincipal, &processContext_, actReg_.get());
1560  }

References actReg_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::make_waiting_task(), edm::RunPrincipal::mergeableRunProductMetadata(), processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, TrackValidation_cff::task, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

◆ writeRunAsync()

void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
ProcessHistoryID const &  phid,
RunNumber_t  run,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 1513 of file EventProcessor.cc.

1516  {
1517  auto subsT = edm::make_waiting_task(
1518  tbb::task::allocate_root(),
1519  [this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
1520  if (iExcept) {
1521  task.doneWaiting(*iExcept);
1522  } else {
1524  for (auto& s : subProcesses_) {
1525  s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1526  }
1527  }
1528  });
1530  schedule_->writeRunAsync(WaitingTaskHolder(subsT),
1532  &processContext_,
1533  actReg_.get(),
1534  mergeableRunProductMetadata);
1535  }

References actReg_, edm::make_waiting_task(), principalCache_, processContext_, run(), edm::PrincipalCache::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endUnfinishedRun().

Member Data Documentation

◆ act_table_

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 318 of file EventProcessor.h.

Referenced by init().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

◆ asyncStopRequestedWhileProcessingEvents_

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 354 of file EventProcessor.h.

Referenced by runToCompletion().

◆ asyncStopStatusCodeFromProcessingEvents_

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 355 of file EventProcessor.h.

Referenced by runToCompletion().

◆ beginJobCalled_

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 342 of file EventProcessor.h.

Referenced by beginJob().

◆ branchIDListHelper_

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 310 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

◆ deferredExceptionPtr_

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

◆ deferredExceptionPtrIsSet_

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

◆ esp_

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

◆ espController_

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 315 of file EventProcessor.h.

Referenced by beginJob(), beginLumiAsync(), beginRun(), endRun(), init(), and ~EventProcessor().

◆ eventSetupDataToExcludeFromPrefetching_

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 360 of file EventProcessor.h.

◆ exceptionMessageFiles_

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 345 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

◆ exceptionMessageLumis_

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 347 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

◆ exceptionMessageRuns_

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 346 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

◆ fb_

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private

◆ fileModeNoMerge_

bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 344 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

◆ firstEventInBlock_

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 356 of file EventProcessor.h.

◆ forceESCacheClearOnNewRun_

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 350 of file EventProcessor.h.

Referenced by beginRun(), and init().

◆ forceLooperToEnd_

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 348 of file EventProcessor.h.

Referenced by endOfLoop().

◆ historyAppender_

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 330 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

◆ input_

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

◆ lastSourceTransition_

InputSource::ItemType edm::EventProcessor::lastSourceTransition_
private

◆ looper_

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private

◆ looperBeginJobRun_

bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 349 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

◆ lumiQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 325 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

◆ mergeableRunProductProcesses_

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 322 of file EventProcessor.h.

Referenced by init(), and readRun().

◆ pathsAndConsumesOfModules_

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 321 of file EventProcessor.h.

Referenced by beginJob().

◆ preallocations_

PreallocationConfiguration edm::EventProcessor::preallocations_
private

◆ preg_

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 309 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), and readFile().

◆ principalCache_

PrincipalCache edm::EventProcessor::principalCache_
private

◆ printDependencies_

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 362 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ processConfiguration_

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 319 of file EventProcessor.h.

Referenced by beginJob(), init(), processConfiguration(), and readRun().

◆ processContext_

ProcessContext edm::EventProcessor::processContext_
private

◆ queueWhichWaitsForIOVsToFinish_

edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

Definition at line 317 of file EventProcessor.h.

Referenced by beginLumiAsync(), and globalEndLumiAsync().

◆ schedule_

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private

◆ serviceToken_

ServiceToken edm::EventProcessor::serviceToken_
private

◆ shouldWeStop_

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 343 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

◆ sourceMutex_

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 340 of file EventProcessor.h.

Referenced by readNextEventForStream().

◆ sourceResourcesAcquirer_

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 339 of file EventProcessor.h.

Referenced by beginLumiAsync(), and handleNextEventForStreamAsync().

◆ streamLumiActive_

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private

◆ streamLumiStatus_

std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private

◆ streamQueues_

std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

Definition at line 324 of file EventProcessor.h.

Referenced by beginLumiAsync(), init(), and streamEndLumiAsync().

◆ subProcesses_

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private

◆ thinnedAssociationsHelper_

edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 311 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().

edm::EventProcessor::looperBeginJobRun_
bool looperBeginJobRun_
Definition: EventProcessor.h:349
edm::SharedResourcesAcquirer::serialQueueChain
SerialTaskQueueChain & serialQueueChain() const
Definition: SharedResourcesAcquirer.h:54
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::deleteRunFromCache
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
Definition: EventProcessor.cc:1537
edm::EDLooperBase::Status
Status
Definition: EDLooperBase.h:79
edm::EventProcessor::historyAppender_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
Definition: EventProcessor.h:330
bk::beginJob
void beginJob()
Definition: Breakpoints.cc:14
edm::EventProcessor::thinnedAssociationsHelper
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: EventProcessor.h:291
processOptions_cff.fileMode
fileMode
Definition: processOptions_cff.py:5
edm::EventProcessor::rewindInput
void rewindInput()
Definition: EventProcessor.cc:832
edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Definition: EventProcessor.h:360
mps_fire.i
i
Definition: mps_fire.py:355
edm::PreallocationConfiguration::numberOfRuns
unsigned int numberOfRuns() const
Definition: PreallocationConfiguration.h:37
edm::EventProcessor::preg_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: EventProcessor.h:309
edm::popSubProcessVParameterSet
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:693
edm::EventProcessor::writeRunAsync
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
Definition: EventProcessor.cc:1513
edm::EventProcessor::StatusCode
StatusCode
Definition: EventProcessor.h:74
edm::EventProcessor::endOfLoop
bool endOfLoop()
Definition: EventProcessor.cc:817
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::EventProcessor::input_
edm::propagate_const< std::unique_ptr< InputSource > > input_
Definition: EventProcessor.h:313
edm::EventProcessor::getToken
ServiceToken getToken()
Definition: EventProcessor.cc:610
edm::EventProcessor::startingNewLoop
void startingNewLoop()
Definition: EventProcessor.cc:807
edm::ParameterSet::getUntrackedParameterSet
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: ParameterSet.cc:2129
edm::errors::LogicError
Definition: EDMException.h:37
edm::validateTopLevelParameterSets
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
Definition: validateTopLevelParameterSets.cc:88
mps_update.status
status
Definition: mps_update.py:69
edm::PreallocationConfiguration::numberOfThreads
unsigned int numberOfThreads() const
Definition: PreallocationConfiguration.h:34
edm::EventProcessor::deleteLumiFromCache
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1562
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:35
edm::PrincipalCache::setProcessHistoryRegistry
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
Definition: PrincipalCache.h:76
edm::TerminationOrigin::ExternalSignal
edm::EventProcessor::globalEndLumiAsync
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
Definition: EventProcessor.cc:1279
edm::ParentageRegistry::instance
static ParentageRegistry * instance()
Definition: ParentageRegistry.cc:4
edm::EventProcessor::runToCompletion
StatusCode runToCompletion()
Definition: EventProcessor.cc:677
edm::LogInfo
Definition: MessageLogger.h:254
edm::EventProcessor::esp_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: EventProcessor.h:316
edm::EventProcessor::shouldWeStop_
bool shouldWeStop_
Definition: EventProcessor.h:343
edm::EventProcessor::readEvent
void readEvent(unsigned int iStreamIndex)
Definition: EventProcessor.cc:1690
edm::EventProcessor::epTimedOut
Definition: EventProcessor.h:80
edm::make_functor_task
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
cms::cuda::assert
assert(be >=bs)
edm::ParentageRegistry::clear
void clear()
Not thread safe.
Definition: ParentageRegistry.cc:28
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:215
edm::EventProcessor::deferredExceptionPtr_
std::exception_ptr deferredExceptionPtr_
Definition: EventProcessor.h:337
edm::fillLooper
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
Definition: EventProcessor.cc:192
personalPlayback.fp
fp
Definition: personalPlayback.py:523
edm::EventProcessor::lumiQueue_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
Definition: EventProcessor.h:325
edm::EventProcessor::processContext_
ProcessContext processContext_
Definition: EventProcessor.h:320
edm::EventProcessor::lastSourceTransition_
InputSource::ItemType lastSourceTransition_
Definition: EventProcessor.h:314
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:75
edm::EventProcessor::pathsAndConsumesOfModules_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: EventProcessor.h:321
createJobs.tmp
tmp
align.sh
Definition: createJobs.py:716
edm::EventProcessor::exceptionMessageFiles_
std::string exceptionMessageFiles_
Definition: EventProcessor.h:345
edm::makeInput
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Definition: EventProcessor.cc:117
edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
bool asyncStopRequestedWhileProcessingEvents_
Definition: EventProcessor.h:354
edm::PrincipalCache::runPrincipalPtr
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:27
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::PathsAndConsumesOfModules::initialize
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
Definition: PathsAndConsumesOfModules.cc:14
mps_monitormerge.items
list items
Definition: mps_monitormerge.py:29
groupFilesInBlocks.reverse
reverse
Definition: groupFilesInBlocks.py:131
edm::InputSource::IsRun
Definition: InputSource.h:78
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::ProcessingController::kToPreviousEvent
Definition: ProcessingController.h:57
edm::EventProcessor::processEventAsyncImpl
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1709
edm::first
T first(std::pair< T, U > const &p)
Definition: ParameterSet.cc:210
edm::PreallocationConfiguration::numberOfLuminosityBlocks
unsigned int numberOfLuminosityBlocks() const
Definition: PreallocationConfiguration.h:36
edm::PrincipalCache::runPrincipal
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:20
edm::pset::Registry::clear
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::ProcessContext::setProcessConfiguration
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: ProcessContext.cc:19
alignCSCRings.s
s
Definition: alignCSCRings.py:92
edm::EventProcessor::exceptionMessageLumis_
std::atomic< bool > exceptionMessageLumis_
Definition: EventProcessor.h:347
edm::ProcessingController::ForwardState
ForwardState
Definition: ProcessingController.h:31
edm::EventProcessor::readAndMergeLumi
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1496
edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
Definition: EventProcessor.h:317
edm::EventProcessor::epSignal
Definition: EventProcessor.h:78
edm::SerialTaskQueue::push
void push(const T &iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:187
edm::WaitingTaskHolder::taskHasFailed
bool taskHasFailed() const
Definition: WaitingTaskHolder.h:58
edm::SerialTaskQueueChain::push
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:86
edm::PrincipalCache::merge
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:53
edm::EventProcessor::streamQueues_
std::vector< edm::SerialTaskQueue > streamQueues_
Definition: EventProcessor.h:324
edm::EventProcessor::epInputComplete
Definition: EventProcessor.h:79
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
ScheduleInfo
edm::EventProcessor::readLuminosityBlock
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1477
edm::InputSource::IsSynchronize
Definition: InputSource.h:78
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::EventProcessor::sourceMutex_
std::shared_ptr< std::recursive_mutex > sourceMutex_
Definition: EventProcessor.h:340
edm::RunPrincipal::setEndTime
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
edm::EventProcessor::beginJobCalled_
bool beginJobCalled_
Definition: EventProcessor.h:342
edm::ProcessingController::kToSpecifiedEvent
Definition: ProcessingController.h:57
Service
edm::checkForModuleDependencyCorrectness
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
Definition: PathsAndConsumesOfModules.cc:130
LuminosityBlock
WaitingTaskHolder
edm::EventProcessor::fileModeNoMerge_
bool fileModeNoMerge_
Definition: EventProcessor.h:344
Event
IOVSyncValue
edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
Definition: EventProcessor.cc:1819
h
runTheMatrix.nThreads
nThreads
Definition: runTheMatrix.py:344
OrderedSet.t
t
Definition: OrderedSet.py:90
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::IllegalParameters::setThrowAnException
static void setThrowAnException(bool v)
Definition: IllegalParameters.h:16
edm::EventProcessor::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: EventProcessor.h:308
edm::EventID::maxEventNumber
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::EventProcessor::exceptionMessageRuns_
std::string exceptionMessageRuns_
Definition: EventProcessor.h:346
edm::PrincipalCache::preReadFile
void preReadFile()
Definition: PrincipalCache.cc:142
edm::SharedResourcesRegistry::instance
static SharedResourcesRegistry * instance()
Definition: SharedResourcesRegistry.cc:25
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::EventProcessor::printDependencies_
bool printDependencies_
Definition: EventProcessor.h:362
edm::EventProcessor::preallocations_
PreallocationConfiguration preallocations_
Definition: EventProcessor.h:352
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:96
edm::SubProcess::doEndJob
void doEndJob()
Definition: SubProcess.cc:215
edm::get_underlying_safe
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Definition: get_underlying_safe.h:40
summarizeEdmComparisonLogfiles.succeeded
succeeded
Definition: summarizeEdmComparisonLogfiles.py:101
edm::EventProcessor::act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: EventProcessor.h:318
edm::serviceregistry::kConfigurationOverrides
Definition: ServiceLegacy.h:29
edm::ServiceRegistry::Operate
friend class Operate
Definition: ServiceRegistry.h:54
edm::EventProcessor::beginLumiAsync
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1061
edm::EventProcessor::espController_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
Definition: EventProcessor.h:315
edm::EventProcessor::streamLumiStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
Definition: EventProcessor.h:326
ParameterSet
Definition: Functions.h:16
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:30
FDEBUG
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::InputSource::IsLumi
Definition: InputSource.h:78
edm::EventProcessor::streamEndLumiAsync
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1375
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
edm::EventProcessor::epException
Definition: EventProcessor.h:76
edm::EventProcessor::serviceToken_
ServiceToken serviceToken_
Definition: EventProcessor.h:312
edm::serviceregistry::kOverlapIsError
Definition: ServiceLegacy.h:29
edm::PrincipalCache::deleteRun
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
Definition: PrincipalCache.cc:103
edm::EventProcessor::lastTransitionType
InputSource::ItemType lastTransitionType() const
Definition: EventProcessor.h:194
edm::Service
Definition: Service.h:30
edm::InputSource::doEndJob
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:207
edm::EventProcessor::thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: EventProcessor.h:311
edm::EventProcessor::readNextEventForStream
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
Definition: EventProcessor.cc:1570
runEdmFileComparison.returnCode
returnCode
Definition: runEdmFileComparison.py:263
cuy.rep
rep
Definition: cuy.py:1190
edm::EventProcessor::forceLooperToEnd_
bool forceLooperToEnd_
Definition: EventProcessor.h:348
edm::EventProcessor::epCountComplete
Definition: EventProcessor.h:81
edm::InputSource::IsStop
Definition: InputSource.h:78
edm::PrincipalCache::adjustEventsToNewProductRegistry
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:120
edm::EventProcessor::fb_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
Definition: EventProcessor.h:332
edm::EDLooperBase::kContinue
Definition: EDLooperBase.h:79
edm::EventProcessor::epOther
Definition: EventProcessor.h:77
HltBtagPostValidation_cff.c
c
Definition: HltBtagPostValidation_cff.py:31
edm::EDLooperBase::endOfJob
virtual void endOfJob()
Definition: EDLooperBase.cc:90
ServiceToken
edm::EventProcessor::streamLumiActive_
std::atomic< unsigned int > streamLumiActive_
Definition: EventProcessor.h:327
edm::EventProcessor::mergeableRunProductProcesses_
MergeableRunProductProcesses mergeableRunProductProcesses_
Definition: EventProcessor.h:322
edm::make_empty_waiting_task
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
Definition: WaitingTaskList.h:96
edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
StatusCode asyncStopStatusCodeFromProcessingEvents_
Definition: EventProcessor.h:355
edm::LimitedTaskQueue::Resumer
Definition: LimitedTaskQueue.h:66
edm::EventProcessor::looper_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
Definition: EventProcessor.h:333
edm::EventProcessor::sourceResourcesAcquirer_
SharedResourcesAcquirer sourceResourcesAcquirer_
Definition: EventProcessor.h:339
edm::EventProcessor::setDeferredException
bool setDeferredException(std::exception_ptr)
Definition: EventProcessor.cc:1810
edm::InputSource::ItemType
ItemType
Definition: InputSource.h:78
edm::EventProcessor::continueLumiAsync
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1251
edm::EventProcessor::doErrorStuff
void doErrorStuff()
Definition: EventProcessor.cc:856
ModuleChanger
edm::ParentageRegistry::insertMapped
bool insertMapped(value_type const &v)
Definition: ParentageRegistry.cc:24
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::RunPrincipal::mergeableRunProductMetadata
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:79
edm::EventProcessor::principalCache_
PrincipalCache principalCache_
Definition: EventProcessor.h:341
ProcessingController
edm::EventProcessor::branchIDListHelper
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: EventProcessor.h:287
edm::PrincipalCache::getAvailableLumiPrincipalPtr
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
Definition: PrincipalCache.cc:49
edm::EventProcessor::deferredExceptionPtrIsSet_
std::atomic< bool > deferredExceptionPtrIsSet_
Definition: EventProcessor.h:336
edm::PrincipalCache::hasRunPrincipal
bool hasRunPrincipal() const
Definition: PrincipalCache.h:57
edm::EventProcessor::epSuccess
Definition: EventProcessor.h:75
ev
bool ev
Definition: Hydjet2Hadronizer.cc:95
edm::EventProcessor::processConfiguration_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: EventProcessor.h:319
Exception
Definition: hltDiff.cc:246
edm::EventProcessor::run
StatusCode run()
Definition: EventProcessor.h:367
edm::MergeableRunProductMetadata::preWriteRun
void preWriteRun()
Definition: MergeableRunProductMetadata.cc:125
edm::parameterSet
ParameterSet const & parameterSet(Provenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::EventProcessor::handleEndLumiExceptions
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
Definition: EventProcessor.cc:1270
or
The Signals That Services Can Subscribe To. This is based on ActivityRegistry and is current per (…). Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application. Each possible callback has some defined arguments, which we here list in angle brackets, e.g. < void, edm::EventID const &, edm::Timestamp const & >. We also list in braces which AR_WATCH_USING_METHOD_ is used for those or (…)
Definition: Activities.doc:12
edm::EventProcessor::processEventWithLooper
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1761
edm::PreallocationConfiguration::numberOfStreams
unsigned int numberOfStreams() const
Definition: PreallocationConfiguration.h:35
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::FileBlock::ParallelProcesses
Definition: FileBlock.h:28
edm::EventProcessor::endRun
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
Definition: EventProcessor.cc:971
edm::EventProcessor::setExceptionMessageLumis
void setExceptionMessageLumis()
Definition: EventProcessor.cc:1808
edm::EventProcessor::looper
std::shared_ptr< EDLooperBase const > looper() const
Definition: EventProcessor.h:297
cms::Exception
Definition: Exception.h:70
edm::EventProcessor::nextLuminosityBlockID
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
Definition: EventProcessor.cc:675
edm::InputSource::IsEvent
Definition: InputSource.h:78
edm::EventProcessor::subProcesses_
std::vector< SubProcess > subProcesses_
Definition: EventProcessor.h:329
edm::EventProcessor::nextTransitionType
InputSource::ItemType nextTransitionType()
Definition: EventProcessor.cc:645
edm::EventProcessor::prepareForNextLoop
void prepareForNextLoop()
Definition: EventProcessor.cc:838
edm::PrincipalCache::eventPrincipal
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: PrincipalCache.h:61
edm::EventProcessor::init
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
Definition: EventProcessor.cc:325
edm::EventProcessor::processEventAsync
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1704
event
Definition: event.py:1
edm::ProcessingController::ReverseState
ReverseState
Definition: ProcessingController.h:38
edm::EventProcessor::shouldWeStop
bool shouldWeStop() const
Definition: EventProcessor.cc:1789
edm::PrincipalCache::insert
void insert(std::shared_ptr< RunPrincipal > rp)
Definition: PrincipalCache.cc:81
edm::errors::Configuration
Definition: EDMException.h:36
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
edm::EventProcessor::preg
std::shared_ptr< ProductRegistry const > preg() const
Definition: EventProcessor.h:285
edm::EventProcessor::schedule_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: EventProcessor.h:323
edm::EventProcessor::writeLumiAsync
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
Definition: EventProcessor.cc:1543
edm::EventProcessor::handleNextEventForStreamAsync
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1632
edm::EventProcessor::branchIDListHelper_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: EventProcessor.h:310
trackingPlots.common
common
Definition: trackingPlots.py:205
edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition
void adjustIndexesAfterProductRegistryAddition()
Definition: PrincipalCache.cc:130
edm::LuminosityBlockID::maxLuminosityBlockNumber
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Definition: LuminosityBlockID.h:84
edm::EventProcessor::checkForAsyncStopRequest
bool checkForAsyncStopRequest(StatusCode &)
Definition: EventProcessor.cc:634
common
Definition: common.py:1
edm::EventProcessor::forceESCacheClearOnNewRun_
bool forceESCacheClearOnNewRun_
Definition: EventProcessor.h:350
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
edm::EventProcessor::beginJob
void beginJob()
Definition: EventProcessor.cc:522
edm::PrincipalCache::setNumberOfConcurrentPrincipals
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
Definition: PrincipalCache.cc:16
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316
edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
Definition: MergeableRunProductProcesses.cc:18