CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (edm::WaitingTaskHolder iHolder)
 
void deleteLumiFromCache (LuminosityBlockProcessingStatus &)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
bool endOfLoop ()
 
bool endPathsEnabled () const
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
void endUnfinishedLumi ()
 
void endUnfinishedRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
 
 EventProcessor (EventProcessor const &)=delete
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
void handleEndLumiExceptions (std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
 
void inputProcessBlocks ()
 
InputSource::ItemType lastTransitionType () const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processLumis (std::shared_ptr< void > const &iRunResource)
 
int readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun ()
 
void readFile ()
 
void readLuminosityBlock (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamEndLumiAsync (edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedData > ExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
void warnAboutModulesRequiringLuminosityBLockSynchronization () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemType lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueue > streamQueues_
 
std::vector< SubProcess > subProcesses_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 64 of file EventProcessor.h.

Member Typedef Documentation

◆ ExcludedData

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 362 of file EventProcessor.h.

◆ ExcludedDataMap

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 363 of file EventProcessor.h.

◆ ProcessBlockType

Definition at line 247 of file EventProcessor.h.

Member Enumeration Documentation

◆ StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 74 of file EventProcessor.h.

74  {
75  epSuccess = 0,
76  epException = 1,
77  epOther = 2,
78  epSignal = 3,
79  epInputComplete = 4,
80  epTimedOut = 5,
81  epCountComplete = 6
82  };

Constructor & Destructor Documentation

◆ EventProcessor() [1/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet > parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 218 of file EventProcessor.cc.

223  : actReg_(),
224  preg_(),
226  serviceToken_(),
227  input_(),
228  espController_(new eventsetup::EventSetupsController),
229  esp_(),
230  act_table_(),
232  schedule_(),
233  subProcesses_(),
234  historyAppender_(new HistoryAppender),
235  fb_(),
236  looper_(),
238  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
239  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
240  principalCache_(),
241  beginJobCalled_(false),
242  shouldWeStop_(false),
243  fileModeNoMerge_(false),
246  exceptionMessageLumis_(false),
247  forceLooperToEnd_(false),
248  looperBeginJobRun_(false),
251  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
252  processDesc->addServices(defaultServices, forcedServices);
253  init(processDesc, iToken, iLegacy);
254  }

References init(), eostools::move(), and edm::parameterSet().

◆ EventProcessor() [2/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet > parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 256 of file EventProcessor.cc.

259  : actReg_(),
260  preg_(),
262  serviceToken_(),
263  input_(),
264  espController_(new eventsetup::EventSetupsController),
265  esp_(),
266  act_table_(),
268  schedule_(),
269  subProcesses_(),
270  historyAppender_(new HistoryAppender),
271  fb_(),
272  looper_(),
274  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
275  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
276  principalCache_(),
277  beginJobCalled_(false),
278  shouldWeStop_(false),
279  fileModeNoMerge_(false),
282  exceptionMessageLumis_(false),
283  forceLooperToEnd_(false),
284  looperBeginJobRun_(false),
288  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
289  processDesc->addServices(defaultServices, forcedServices);
291  }

References init(), edm::serviceregistry::kOverlapIsError, eostools::move(), and edm::parameterSet().

◆ EventProcessor() [3/4]

edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc > processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 293 of file EventProcessor.cc.

296  : actReg_(),
297  preg_(),
299  serviceToken_(),
300  input_(),
301  espController_(new eventsetup::EventSetupsController),
302  esp_(),
303  act_table_(),
305  schedule_(),
306  subProcesses_(),
307  historyAppender_(new HistoryAppender),
308  fb_(),
309  looper_(),
311  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
312  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
313  principalCache_(),
314  beginJobCalled_(false),
315  shouldWeStop_(false),
316  fileModeNoMerge_(false),
319  exceptionMessageLumis_(false),
320  forceLooperToEnd_(false),
321  looperBeginJobRun_(false),
325  init(processDesc, token, legacy);
326  }

References init(), and unpackBuffers-CaloStage2::token.

◆ ~EventProcessor()

edm::EventProcessor::~EventProcessor ( )

Definition at line 515 of file EventProcessor.cc.

515  {
516  // Make the services available while everything is being deleted.
519 
520  // manually destroy all these thing that may need the services around
521  // propagate_const<T> has no reset() function
522  espController_ = nullptr;
523  esp_ = nullptr;
524  schedule_ = nullptr;
525  input_ = nullptr;
526  looper_ = nullptr;
527  actReg_ = nullptr;
528 
531  }

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and unpackBuffers-CaloStage2::token.

◆ EventProcessor() [4/4]

edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

◆ beginJob()

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 535 of file EventProcessor.cc.

535  {
536  if (beginJobCalled_)
537  return;
538  beginJobCalled_ = true;
539  bk::beginJob();
540 
541  // StateSentry toerror(this); // should we add this ?
542  //make the services available
544 
545  service::SystemBounds bounds(preallocations_.numberOfStreams(),
549  actReg_->preallocateSignal_(bounds);
550  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
552 
553  //NOTE: this may throw
555  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
556 
559  }
560  //NOTE: This implementation assumes 'Job' means one call
561  // the EventProcessor::run
562  // If it really means once per 'application' then this code will
563  // have to be changed.
564  // Also have to deal with case where have 'run' then new Module
565  // added and do 'run'
566  // again. In that case the newly added Module needs its 'beginJob'
567  // to be called.
568 
569  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
570  // For now we delay calling beginOfJob until first beginOfRun
571  //if(looper_) {
572  // looper_->beginOfJob(es);
573  //}
574  try {
575  convertException::wrap([&]() { input_->doBeginJob(); });
576  } catch (cms::Exception& ex) {
577  ex.addContext("Calling beginJob for the source");
578  throw;
579  }
580  espController_->finishConfiguration();
581  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices());
582  // toerror.succeeded(); // should we add this?
583  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
584  actReg_->postBeginJobSignal_();
585 
586  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
587  schedule_->beginStream(i);
588  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
589  }
590  }

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), esp_, espController_, edm::for_all(), mps_fire::i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processConfiguration_, processContext_, schedule_, serviceToken_, subProcesses_, warnAboutModulesRequiringLuminosityBLockSynchronization(), and edm::convertException::wrap().

Referenced by runToCompletion().

◆ beginLumiAsync()

void edm::EventProcessor::beginLumiAsync ( edm::IOVSyncValue const &  iSyncValue,
std::shared_ptr< void > const &  iRunResource,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1155 of file EventProcessor.cc.

1157  {
1158  if (iHolder.taskHasFailed()) {
1159  return;
1160  }
1161 
1162  // We must be careful with the status object here and in code this function calls. IF we want
1163  // endRun to be called, then we must call resetResources before the things waiting on
1164  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1165  // endRun to be called much later than it should be, because status is holding iRunResource).
1166  // Note that this must be done explicitly. Relying on the destructor does not work well
1167  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1168  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1169  // destroyed inside this function and lumiWork.
1170  auto status =
1171  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1172 
1173  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1174  if (iHolder.taskHasFailed()) {
1175  status->resetResources();
1176  return;
1177  }
1178 
1179  status->setResumer(std::move(iResumer));
1180 
1181  sourceResourcesAcquirer_.serialQueueChain().push([this, iHolder, status = std::move(status)]() mutable {
1182  //make the services available
1184  // Caught exception is propagated via WaitingTaskHolder
1185  CMS_SA_ALLOW try {
1187 
1188  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1189  {
1190  SendSourceTerminationSignalIfException sentry(actReg_.get());
1191 
1192  input_->doBeginLumi(lumiPrincipal, &processContext_);
1193  sentry.completedSuccessfully();
1194  }
1195 
1197  if (rng.isAvailable()) {
1198  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1199  rng->preBeginLumi(lb);
1200  }
1201 
1202  //Task to start the stream beginLumis
1203  auto beginStreamsTask = make_waiting_task(
1204  tbb::task::allocate_root(), [this, holder = iHolder, status](std::exception_ptr const* iPtr) mutable {
1205  if (iPtr) {
1206  status->resetResources();
1207  holder.doneWaiting(*iPtr);
1208  } else {
1209  status->globalBeginDidSucceed();
1210  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1211 
1212  if (looper_) {
1213  // Caught exception is propagated via WaitingTaskHolder
1214  CMS_SA_ALLOW try {
1215  //make the services available
1216  ServiceRegistry::Operate operateLooper(serviceToken_);
1217  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1218  } catch (...) {
1219  status->resetResources();
1220  holder.doneWaiting(std::current_exception());
1221  return;
1222  }
1223  }
1224  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1225 
1226  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1227  streamQueues_[i].push([this, i, status, holder, &es]() mutable {
1228  streamQueues_[i].pause();
1229 
1230  auto eventTask =
1231  edm::make_waiting_task(tbb::task::allocate_root(),
1232  [this, i, h = std::move(holder)](
1233  std::exception_ptr const* exceptionFromBeginStreamLumi) mutable {
1234  if (exceptionFromBeginStreamLumi) {
1236  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1238  } else {
1240  }
1241  });
1242  auto& event = principalCache_.eventPrincipal(i);
1243  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1244  // held by the container as this lambda may not finish executing before all the tasks it
1245  // spawns have already started to run.
1246  auto eventSetupImpls = &status->eventSetupImpls();
1247  auto lp = status->lumiPrincipal().get();
1250  event.setLuminosityBlockPrincipal(lp);
1251  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1252  beginStreamTransitionAsync<Traits>(
1253  WaitingTaskHolder{eventTask}, *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1254  });
1255  }
1256  }
1257  }); // beginStreamTask
1258 
1259  //task to start the global begin lumi
1260  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1261 
1262  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1263  {
1264  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1265  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1266  beginGlobalTransitionAsync<Traits>(
1267  beginStreamsHolder, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1268  }
1269  } catch (...) {
1270  status->resetResources();
1271  iHolder.doneWaiting(std::current_exception());
1272  }
1273  }); // task in sourceResourcesAcquirer
1274  }; // end lumiWork
1275 
1276  auto queueLumiWorkTask = make_waiting_task(
1277  tbb::task::allocate_root(),
1278  [this, lumiWorkLambda = std::move(lumiWork), iHolder](std::exception_ptr const* iPtr) mutable {
1279  if (iPtr) {
1280  iHolder.doneWaiting(*iPtr);
1281  }
1282  lumiQueue_->pushAndPause(std::move(lumiWorkLambda));
1283  });
1284 
1285  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1286  // We only get here inside this block if there is an EventSetup
1287  // module not able to handle concurrent IOVs (usually an ESSource)
1288  // and the new sync value is outside the current IOV of that module.
1289 
1290  WaitingTaskHolder queueLumiWorkTaskHolder{queueLumiWorkTask};
1291 
1292  queueWhichWaitsForIOVsToFinish_.push([this, queueLumiWorkTaskHolder, iSync, status]() mutable {
1293  // Caught exception is propagated via WaitingTaskHolder
1294  CMS_SA_ALLOW try {
1295  SendSourceTerminationSignalIfException sentry(actReg_.get());
1296  // Pass in iSync to let the EventSetup system know which run and lumi
1297  // need to be processed and prepare IOVs for it.
1298  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1299  // lumi is done and no longer needs its EventSetup IOVs.
1300  espController_->eventSetupForInstance(
1301  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1302  sentry.completedSuccessfully();
1303  } catch (...) {
1304  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1305  }
1307  });
1308 
1309  } else {
1311 
1312  // This holder will be used to wait until the EventSetup IOVs are ready
1313  WaitingTaskHolder queueLumiWorkTaskHolder{queueLumiWorkTask};
1314  // Caught exception is propagated via WaitingTaskHolder
1315  CMS_SA_ALLOW try {
1316  SendSourceTerminationSignalIfException sentry(actReg_.get());
1317 
1318  // Pass in iSync to let the EventSetup system know which run and lumi
1319  // need to be processed and prepare IOVs for it.
1320  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1321  // lumi is done and no longer needs its EventSetup IOVs.
1322  espController_->eventSetupForInstance(
1323  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1324  sentry.completedSuccessfully();
1325 
1326  } catch (...) {
1327  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1328  }
1329  }
1330  }

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), handleNextEventForStreamAsync(), mps_fire::i, input_, edm::Service< T >::isAvailable(), looper_, lumiQueue_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), edm::SerialTaskQueue::pause(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readLuminosityBlock(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by handleNextEventForStreamAsync(), and processLumis().

◆ beginProcessBlock()

void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 876 of file EventProcessor.cc.

876  {
877  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
878  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
879 
880  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
881  auto globalWaitTask = make_empty_waiting_task();
882  globalWaitTask->increment_ref_count();
883 
884  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
885  beginGlobalTransitionAsync<Traits>(
886  WaitingTaskHolder(globalWaitTask.get()), *schedule_, transitionInfo, serviceToken_, subProcesses_);
887 
888  globalWaitTask->wait_for_all();
889  if (globalWaitTask->exceptionPtr() != nullptr) {
890  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
891  }
892  beginProcessBlockSucceeded = true;
893  }

References edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), edm::make_empty_waiting_task(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, and subProcesses_.

◆ beginRun()

void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded,
bool &  eventSetupForInstanceSucceeded 
)

Definition at line 970 of file EventProcessor.cc.

973  {
974  globalBeginSucceeded = false;
975  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
976  {
977  SendSourceTerminationSignalIfException sentry(actReg_.get());
978 
979  input_->doBeginRun(runPrincipal, &processContext_);
980  sentry.completedSuccessfully();
981  }
982 
983  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
985  espController_->forceCacheClear();
986  }
987  {
988  SendSourceTerminationSignalIfException sentry(actReg_.get());
989  espController_->eventSetupForInstance(ts);
990  eventSetupForInstanceSucceeded = true;
991  sentry.completedSuccessfully();
992  }
993  auto const& es = esp_->eventSetupImpl();
994  if (looper_ && looperBeginJobRun_ == false) {
995  looper_->copyInfo(ScheduleInfo(schedule_.get()));
996  looper_->beginOfJob(es);
997  looperBeginJobRun_ = true;
998  looper_->doStartingNewLoop();
999  }
1000  {
1001  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1002  auto globalWaitTask = make_empty_waiting_task();
1003  globalWaitTask->increment_ref_count();
1004  RunTransitionInfo transitionInfo(runPrincipal, es);
1005  beginGlobalTransitionAsync<Traits>(
1006  WaitingTaskHolder(globalWaitTask.get()), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1007  globalWaitTask->wait_for_all();
1008  if (globalWaitTask->exceptionPtr() != nullptr) {
1009  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1010  }
1011  }
1012  globalBeginSucceeded = true;
1013  FDEBUG(1) << "\tbeginRun " << run << "\n";
1014  if (looper_) {
1015  looper_->doBeginRun(runPrincipal, es, &processContext_);
1016  }
1017  {
1018  //To wait, the ref count has to be 1+#streams
1019  auto streamLoopWaitTask = make_empty_waiting_task();
1020  streamLoopWaitTask->increment_ref_count();
1021 
1022  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1023 
1024  RunTransitionInfo transitionInfo(runPrincipal, es);
1025  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1026  *schedule_,
1028  transitionInfo,
1029  serviceToken_,
1030  subProcesses_);
1031 
1032  streamLoopWaitTask->wait_for_all();
1033  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
1034  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
1035  }
1036  }
1037  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1038  if (looper_) {
1039  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1040  }
1041  }

References actReg_, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, and subProcesses_.

◆ branchIDListHelper() [1/2]

std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inline private

Definition at line 294 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

◆ branchIDListHelper() [2/2]

std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inline private

Definition at line 291 of file EventProcessor.h.

291  {
293  }

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

◆ checkForAsyncStopRequest()

bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode & returnCode)
private

Definition at line 645 of file EventProcessor.cc.

645  {
646  bool returnValue = false;
647 
648  // Look for a shutdown signal
649  if (shutdown_flag.load(std::memory_order_acquire)) {
650  returnValue = true;
652  }
653  return returnValue;
654  }

References epSignal, runEdmFileComparison::returnCode, and edm::shutdown_flag.

Referenced by nextTransitionType().

◆ clearCounters()

void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 639 of file EventProcessor.cc.

639 { schedule_->clearCounters(); }

References schedule_.

◆ closeInputFile()

void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 775 of file EventProcessor.cc.

775  {
776  if (fb_.get() != nullptr) {
777  SendSourceTerminationSignalIfException sentry(actReg_.get());
778  input_->closeFile(fb_.get(), cleaningUpAfterException);
779  sentry.completedSuccessfully();
780  }
781  FDEBUG(1) << "\tcloseInputFile\n";
782  }

References actReg_, fb_, FDEBUG, and input_.

◆ closeOutputFiles()

void edm::EventProcessor::closeOutputFiles ( )

Definition at line 792 of file EventProcessor.cc.

792  {
793  if (fb_.get() != nullptr) {
794  schedule_->closeOutputFiles();
795  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
796  }
797  FDEBUG(1) << "\tcloseOutputFiles\n";
798  }

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

◆ continueLumiAsync()

void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1332 of file EventProcessor.cc.

1332  {
1333  {
1334  //all streams are sharing the same status at the moment
1335  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1336  status->needToContinueLumi();
1337  status->startProcessingEvents();
1338  }
1339 
1340  unsigned int streamIndex = 0;
1341  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1342  tbb::task::enqueue(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = iHolder]() {
1343  handleNextEventForStreamAsync(h, streamIndex);
1344  }));
1345  }
1346  tbb::task::spawn(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = std::move(iHolder)]() {
1347  handleNextEventForStreamAsync(h, streamIndex);
1348  }));
1349  }

References handleNextEventForStreamAsync(), edm::make_functor_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, mps_update::status, and streamLumiStatus_.

Referenced by processLumis().

◆ deleteLumiFromCache()

void edm::EventProcessor::deleteLumiFromCache ( LuminosityBlockProcessingStatus &  iStatus)

Definition at line 1653 of file EventProcessor.cc.

1653  {
1654  for (auto& s : subProcesses_) {
1655  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1656  }
1657  iStatus.lumiPrincipal()->clearPrincipal();
1658  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1659  }

References edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

◆ deleteRunFromCache()

void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1628 of file EventProcessor.cc.

1628  {
1629  principalCache_.deleteRun(phid, run);
1630  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1631  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1632  }

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, run(), and subProcesses_.

Referenced by endUnfinishedRun().

◆ doErrorStuff()

void edm::EventProcessor::doErrorStuff ( )

Definition at line 867 of file EventProcessor.cc.

867  {
868  FDEBUG(1) << "\tdoErrorStuff\n";
869  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
870  << "and went to the error state\n"
871  << "Will attempt to terminate processing normally\n"
872  << "(IF using the looper the next loop will be attempted)\n"
873  << "This likely indicates a bug in an input module or corrupted input or both\n";
874  }

References FDEBUG.

Referenced by runToCompletion().

◆ enableEndPaths()

void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 635 of file EventProcessor.cc.

635 { schedule_->enableEndPaths(active); }

References schedule_.

◆ endJob()

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 592 of file EventProcessor.cc.

592  {
593  // Collects exceptions, so we don't throw before all operations are performed.
594  ExceptionCollector c(
595  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
596 
597  //make the services available
599 
600  //NOTE: this really should go elsewhere in the future
601  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
602  c.call([this, i]() { this->schedule_->endStream(i); });
603  for (auto& subProcess : subProcesses_) {
604  c.call([&subProcess, i]() { subProcess.doEndStream(i); });
605  }
606  }
607  auto actReg = actReg_.get();
608  c.call([actReg]() { actReg->preEndJobSignal_(); });
609  schedule_->endJob(c);
610  for (auto& subProcess : subProcesses_) {
611  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
612  }
613  c.call(std::bind(&InputSource::doEndJob, input_.get()));
614  if (looper_) {
615  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
616  }
617  c.call([actReg]() { actReg->postEndJobSignal_(); });
618  if (c.hasThrown()) {
619  c.rethrow();
620  }
621  }

References actReg_, HltBtagPostValidation_cff::c, edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

◆ endOfLoop()

bool edm::EventProcessor::endOfLoop ( )

Definition at line 828 of file EventProcessor.cc.

828  {
829  if (looper_) {
830  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
831  looper_->setModuleChanger(&changer);
832  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
833  looper_->setModuleChanger(nullptr);
834  if (status != EDLooperBase::kContinue || forceLooperToEnd_)
835  return true;
836  else
837  return false;
838  }
839  FDEBUG(1) << "\tendOfLoop\n";
840  return true;
841  }

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

◆ endPathsEnabled()

bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 637 of file EventProcessor.cc.

637 { return schedule_->endPathsEnabled(); }

References schedule_.

◆ endProcessBlock()

void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 934 of file EventProcessor.cc.

934  {
935  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
936 
937  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
938  auto globalWaitTask = make_empty_waiting_task();
939  globalWaitTask->increment_ref_count();
940 
941  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
942  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
943  *schedule_,
944  transitionInfo,
947  cleaningUpAfterException);
948 
949  globalWaitTask->wait_for_all();
950  if (globalWaitTask->exceptionPtr() != nullptr) {
951  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
952  }
953 
954  if (beginProcessBlockSucceeded) {
955  auto writeWaitTask = edm::make_empty_waiting_task();
956  writeWaitTask->increment_ref_count();
958  writeWaitTask->wait_for_all();
959  if (writeWaitTask->exceptionPtr()) {
960  std::rethrow_exception(*writeWaitTask->exceptionPtr());
961  }
962  }
963 
964  processBlockPrincipal.clearPrincipal();
965  for (auto& s : subProcesses_) {
966  s.clearProcessBlockPrincipal(ProcessBlockType::New);
967  }
968  }

References edm::Principal::clearPrincipal(), edm::make_empty_waiting_task(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and writeProcessBlockAsync().

◆ endRun()

void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 1069 of file EventProcessor.cc.

1072  {
1073  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1074  runPrincipal.setEndTime(input_->timestamp());
1075 
1076  IOVSyncValue ts(
1077  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1078  runPrincipal.endTime());
1079  {
1080  SendSourceTerminationSignalIfException sentry(actReg_.get());
1081  espController_->eventSetupForInstance(ts);
1082  sentry.completedSuccessfully();
1083  }
1084  auto const& es = esp_->eventSetupImpl();
1085  if (globalBeginSucceeded) {
1086  //To wait, the ref count has to be 1+#streams
1087  auto streamLoopWaitTask = make_empty_waiting_task();
1088  streamLoopWaitTask->increment_ref_count();
1089 
1090  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1091 
1092  RunTransitionInfo transitionInfo(runPrincipal, es);
1093  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
1094  *schedule_,
1096  transitionInfo,
1097  serviceToken_,
1098  subProcesses_,
1099  cleaningUpAfterException);
1100 
1101  streamLoopWaitTask->wait_for_all();
1102  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
1103  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
1104  }
1105  }
1106  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1107  if (looper_) {
1108  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1109  }
1110  {
1111  auto globalWaitTask = make_empty_waiting_task();
1112  globalWaitTask->increment_ref_count();
1113 
1114  RunTransitionInfo transitionInfo(runPrincipal, es);
1115  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1116  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1117  *schedule_,
1118  transitionInfo,
1119  serviceToken_,
1120  subProcesses_,
1121  cleaningUpAfterException);
1122  globalWaitTask->wait_for_all();
1123  if (globalWaitTask->exceptionPtr() != nullptr) {
1124  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1125  }
1126  }
1127  FDEBUG(1) << "\tendRun " << run << "\n";
1128  if (looper_) {
1129  looper_->doEndRun(runPrincipal, es, &processContext_);
1130  }
1131  }

References actReg_, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, input_, looper_, edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, edm::RunPrincipal::setEndTime(), and subProcesses_.

Referenced by endUnfinishedRun().

◆ endUnfinishedLumi()

void edm::EventProcessor::endUnfinishedLumi ( )

Definition at line 1495 of file EventProcessor.cc.

1495  {
1496  if (streamLumiActive_.load() > 0) {
1497  auto globalWaitTask = make_empty_waiting_task();
1498  globalWaitTask->increment_ref_count();
1499  {
1500  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1501  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1502  if (streamLumiStatus_[i]) {
1503  streamEndLumiAsync(globalTaskHolder, i);
1504  }
1505  }
1506  }
1507  globalWaitTask->wait_for_all();
1508  if (globalWaitTask->exceptionPtr() != nullptr) {
1509  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1510  }
1511  }
1512  }

References mps_fire::i, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, and streamLumiStatus_.

◆ endUnfinishedRun()

void edm::EventProcessor::endUnfinishedRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException,
bool  eventSetupForInstanceSucceeded 
)

Definition at line 1043 of file EventProcessor.cc.

1047  {
1048  if (eventSetupForInstanceSucceeded) {
1049  //If we skip empty runs, this would be called conditionally
1050  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1051 
1052  if (globalBeginSucceeded) {
1053  auto t = edm::make_empty_waiting_task();
1054  t->increment_ref_count();
1055  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1056  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1057  mergeableRunProductMetadata->preWriteRun();
1058  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run, mergeableRunProductMetadata);
1059  t->wait_for_all();
1060  mergeableRunProductMetadata->postWriteRun();
1061  if (t->exceptionPtr()) {
1062  std::rethrow_exception(*t->exceptionPtr());
1063  }
1064  }
1065  }
1066  deleteRunFromCache(phid, run);
1067  }

References deleteRunFromCache(), endRun(), edm::make_empty_waiting_task(), edm::RunPrincipal::mergeableRunProductMetadata(), edm::MergeableRunProductMetadata::postWriteRun(), edm::MergeableRunProductMetadata::preWriteRun(), principalCache_, run(), edm::PrincipalCache::runPrincipal(), submitPVValidationJobs::t, and writeRunAsync().

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 625 of file EventProcessor.cc.

625  {
626  return schedule_->getAllModuleDescriptions();
627  }

References schedule_.

◆ getToken()

ServiceToken edm::EventProcessor::getToken ( )

Definition at line 623 of file EventProcessor.cc.

623 { return serviceToken_; }

References serviceToken_.

Referenced by ~EventProcessor().

◆ globalEndLumiAsync()

void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus >  iLumiStatus 
)

Definition at line 1360 of file EventProcessor.cc.

1361  {
1362  // Get some needed info out of the status object before moving
1363  // it into finalTaskForThisLumi.
1364  auto& lp = *(iLumiStatus->lumiPrincipal());
1365  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1366  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1367  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1368  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1369 
1370  auto finalTaskForThisLumi = edm::make_waiting_task(
1371  tbb::task::allocate_root(),
1372  [status = std::move(iLumiStatus), iTask = std::move(iTask), this](std::exception_ptr const* iPtr) mutable {
1373  std::exception_ptr ptr;
1374  if (iPtr) {
1375  handleEndLumiExceptions(iPtr, iTask);
1376  } else {
1377  // Caught exception is passed to handleEndLumiExceptions()
1378  CMS_SA_ALLOW try {
1380  if (looper_) {
1381  auto& lumiPrincipal = *(status->lumiPrincipal());
1382  EventSetupImpl const& eventSetupImpl = status->eventSetupImpl(esp_->subProcessIndex());
1383  looper_->doEndLuminosityBlock(lumiPrincipal, eventSetupImpl, &processContext_);
1384  }
1385  } catch (...) {
1386  ptr = std::current_exception();
1387  }
1388  }
1390 
1391  // Try hard to clean up resources so the
1392  // process can terminate in a controlled
1393  // fashion even after exceptions have occurred.
1394  // Caught exception is passed to handleEndLumiExceptions()
1395  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1396  if (not ptr) {
1397  ptr = std::current_exception();
1398  }
1399  }
1400  // Caught exception is passed to handleEndLumiExceptions()
1401  CMS_SA_ALLOW try {
1402  status->resumeGlobalLumiQueue();
1404  } catch (...) {
1405  if (not ptr) {
1406  ptr = std::current_exception();
1407  }
1408  }
1409  // Caught exception is passed to handleEndLumiExceptions()
1410  CMS_SA_ALLOW try {
1411  // This call to status.resetResources() must occur before iTask is destroyed.
1412  // Otherwise there will be a data race which could result in endRun
1413  // being delayed until it is too late to successfully call it.
1414  status->resetResources();
1415  status.reset();
1416  } catch (...) {
1417  if (not ptr) {
1418  ptr = std::current_exception();
1419  }
1420  }
1421 
1422  if (ptr) {
1423  handleEndLumiExceptions(&ptr, iTask);
1424  }
1425  });
1426 
1427  auto writeT = edm::make_waiting_task(
1428  tbb::task::allocate_root(),
1429  [this, didGlobalBeginSucceed, &lumiPrincipal = lp, task = WaitingTaskHolder(finalTaskForThisLumi)](
1430  std::exception_ptr const* iExcept) mutable {
1431  if (iExcept) {
1432  task.doneWaiting(*iExcept);
1433  } else {
1434  //Only call writeLumi if beginLumi succeeded
1435  if (didGlobalBeginSucceed) {
1436  writeLumiAsync(std::move(task), lumiPrincipal);
1437  }
1438  }
1439  });
1440 
1441  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1442 
1443  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1444  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1445  endGlobalTransitionAsync<Traits>(
1446  WaitingTaskHolder(writeT), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1447  }

References CMS_SA_ALLOW, deleteLumiFromCache(), esp_, handleEndLumiExceptions(), looper_, edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), schedule_, serviceToken_, mps_update::status, subProcesses_, TrackValidation_cff::task, and writeLumiAsync().

Referenced by streamEndLumiAsync().

◆ handleEndLumiExceptions()

void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr const *  iPtr,
WaitingTaskHolder &  holder 
)

Definition at line 1351 of file EventProcessor.cc.

1351  {
1352  if (setDeferredException(*iPtr)) {
1353  WaitingTaskHolder tmp(holder);
1354  tmp.doneWaiting(*iPtr);
1355  } else {
1357  }
1358  }

References setDeferredException(), setExceptionMessageLumis(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

◆ handleNextEventForStreamAsync()

void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 1723 of file EventProcessor.cc.

1723  {
1724  sourceResourcesAcquirer_.serialQueueChain().push([this, iTask, iStreamIndex]() mutable {
1726  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1727  // as streamEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1728  auto status = streamLumiStatus_[iStreamIndex].get();
1729  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1730  CMS_SA_ALLOW try {
1731  if (readNextEventForStream(iStreamIndex, *status)) {
1732  auto recursionTask = make_waiting_task(
1733  tbb::task::allocate_root(), [this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1734  if (iPtr) {
1735  // Try to end the stream properly even if an exception was
1736  // thrown on an event.
1737  bool expected = false;
1738  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1739  // This is the case where the exception in iPtr is the primary
1740  // exception and we want to see its message.
1741  deferredExceptionPtr_ = *iPtr;
1742  WaitingTaskHolder tempHolder(iTask);
1743  tempHolder.doneWaiting(*iPtr);
1744  }
1745  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1746  //the stream will stop now
1747  return;
1748  }
1749  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1750  });
1751 
1752  processEventAsync(WaitingTaskHolder(recursionTask), iStreamIndex);
1753  } else {
1754  //the stream will stop now
1755  if (status->isLumiEnding()) {
1756  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1757  status->startNextLumi();
1758  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1759  }
1760  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1761  } else {
1762  iTask.doneWaiting(std::exception_ptr{});
1763  }
1764  }
1765  } catch (...) {
1766  // It is unlikely we will ever get in here ...
1767  // But if we do try to clean up and propagate the exception
1768  if (streamLumiStatus_[iStreamIndex]) {
1769  streamEndLumiAsync(iTask, iStreamIndex);
1770  }
1771  bool expected = false;
1772  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1773  auto e = std::current_exception();
1775  iTask.doneWaiting(e);
1776  }
1777  }
1778  });
1779  }

References beginLumiAsync(), CMS_SA_ALLOW, deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::InputSource::IsLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), and streamLumiStatus_.

Referenced by beginLumiAsync(), and continueLumiAsync().

◆ init()

void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 328 of file EventProcessor.cc.

330  {
331  //std::cerr << processDesc->dump() << std::endl;
332 
333  // register the empty parentage vector, once and for all
335 
336  // register the empty parameter set, once and for all.
337  ParameterSet().registerIt();
338 
339  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
340 
341  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
342  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
343  bool const hasSubProcesses = !subProcessVParameterSet.empty();
344 
345  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
346  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
347  // set in here if the parameters were not explicitly set.
349 
350  // Now set some parameters specific to the main process.
351  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
352  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
353  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
354  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
355  << fileMode << ".\n"
356  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
357  } else {
358  fileModeNoMerge_ = (fileMode == "NOMERGE");
359  }
360  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
361 
362  //threading
363  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
364 
365  // Even if numberOfThreads was set to zero in the Python configuration, the code
366  // in cmsRun.cpp should have reset it to something else.
367  assert(nThreads != 0);
368 
369  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
370  if (nStreams == 0) {
371  nStreams = nThreads;
372  }
373  if (nThreads > 1 or nStreams > 1) {
374  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
375  }
376  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
377  if (nConcurrentRuns != 1) {
378  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
379  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
380  }
381  unsigned int nConcurrentLumis =
382  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
383  if (nConcurrentLumis == 0) {
384  nConcurrentLumis = nConcurrentRuns;
385  }
386 
387  //Check that relationships between threading parameters make sense
388  /*
389  if(nThreads<nStreams) {
390  //bad
391  }
392  if(nConcurrentRuns>nStreams) {
393  //bad
394  }
395  if(nConcurrentRuns>nConcurrentLumis) {
396  //bad
397  }
398  */
399  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
400 
401  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
402 
403  // Now do general initialization
404  ScheduleItems items;
405 
406  //initialize the services
407  auto& serviceSets = processDesc->getServicesPSets();
408  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
409  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
410 
411  //make the services available
413 
414  if (nStreams > 1) {
416  handler->willBeUsingThreads();
417  }
418 
419  // initialize miscellaneous items
420  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
421 
422  // initialize the event setup provider
423  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
424  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get(), &eventSetupPset);
425 
426  // initialize the looper, if any
428  if (looper_) {
429  looper_->setActionTable(items.act_table_.get());
430  looper_->attachTo(*items.actReg_);
431 
432  //For now loopers make us run only 1 transition at a time
433  nStreams = 1;
434  nConcurrentLumis = 1;
435  nConcurrentRuns = 1;
436  }
437  espController_->setMaxConcurrentIOVs(nStreams, nConcurrentLumis);
438 
439  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
440 
441  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
442  streamQueues_.resize(nStreams);
443  streamLumiStatus_.resize(nStreams);
444 
445  // initialize the input source
447  *common,
448  items.preg(),
449  items.branchIDListHelper(),
450  items.thinnedAssociationsHelper(),
451  items.actReg_,
452  items.processConfiguration(),
454 
455  // initialize the Schedule
456  schedule_ = items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_);
457 
458  // set the data members
459  act_table_ = std::move(items.act_table_);
460  actReg_ = items.actReg_;
461  preg_ = items.preg();
463  branchIDListHelper_ = items.branchIDListHelper();
464  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
465  processConfiguration_ = items.processConfiguration();
467  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
468 
469  FDEBUG(2) << parameterSet << std::endl;
470 
472  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
473  // Reusable event principal
474  auto ep = std::make_shared<EventPrincipal>(preg(),
478  historyAppender_.get(),
479  index);
481  }
482 
483  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
484  auto lp =
485  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
487  }
488 
489  {
490  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
492 
493  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
495  }
496 
497  // fill the subprocesses, if there are any
498  subProcesses_.reserve(subProcessVParameterSet.size());
499  for (auto& subProcessPSet : subProcessVParameterSet) {
500  subProcesses_.emplace_back(subProcessPSet,
501  *parameterSet,
502  preg(),
505  SubProcessParentageHelper(),
507  *actReg_,
508  token,
511  &processContext_);
512  }
513  }

References act_table_, actReg_, cms::cuda::assert(), branchIDListHelper(), branchIDListHelper_, trackingPlots::common, edm::errors::Configuration, SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, processOptions_cff::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameterSet(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, eostools::move(), runTheMatrix::nStreams, runTheMatrix::nThreads, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

◆ inputProcessBlocks()

void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 895 of file EventProcessor.cc.

895  {
896  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
897  // For now the input source always returns false from readProcessBlock,
898  // so this does nothing at all.
899  // Eventually the ProcessBlockPrincipal needs to be properly filled
900  // and cleared. The delayed reader needs to be set. The correct process name
901  // needs to be supplied.
902  while (input_->readProcessBlock()) {
903  DelayedReader* reader = nullptr;
904  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName(), reader);
905 
906  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
907  auto globalWaitTask = make_empty_waiting_task();
908  globalWaitTask->increment_ref_count();
909 
910  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
911  beginGlobalTransitionAsync<Traits>(
912  WaitingTaskHolder(globalWaitTask.get()), *schedule_, transitionInfo, serviceToken_, subProcesses_);
913 
914  globalWaitTask->wait_for_all();
915  if (globalWaitTask->exceptionPtr() != nullptr) {
916  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
917  }
918 
919  auto writeWaitTask = edm::make_empty_waiting_task();
920  writeWaitTask->increment_ref_count();
922  writeWaitTask->wait_for_all();
923  if (writeWaitTask->exceptionPtr()) {
924  std::rethrow_exception(*writeWaitTask->exceptionPtr());
925  }
926 
927  processBlockPrincipal.clearPrincipal();
928  for (auto& s : subProcesses_) {
929  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
930  }
931  }
932  }

References edm::Principal::clearPrincipal(), edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), edm::make_empty_waiting_task(), principalCache_, processConfiguration_, DQM::reader, alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and writeProcessBlockAsync().

◆ lastTransitionType()

InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline

Definition at line 192 of file EventProcessor.h.

192  {
193  if (deferredExceptionPtrIsSet_.load()) {
194  return InputSource::IsStop;
195  }
196  return lastSourceTransition_;
197  }

References deferredExceptionPtrIsSet_, edm::InputSource::IsStop, and lastSourceTransition_.

Referenced by handleNextEventForStreamAsync(), and processLumis().

◆ looper() [1/2]

std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 302 of file EventProcessor.h.

302 { return get_underlying_safe(looper_); }

References edm::get_underlying_safe(), and looper_.

◆ looper() [2/2]

std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 301 of file EventProcessor.h.

301 { return get_underlying_safe(looper_); }

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

◆ nextLuminosityBlockID()

edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 686 of file EventProcessor.cc.

686 { return input_->luminosityBlock(); }

References input_.

Referenced by readNextEventForStream().

◆ nextRunID()

std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 682 of file EventProcessor.cc.

682  {
683  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
684  }

References input_.

◆ nextTransitionType()

InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 656 of file EventProcessor.cc.

656  {
657  if (deferredExceptionPtrIsSet_.load()) {
659  return InputSource::IsStop;
660  }
661 
662  SendSourceTerminationSignalIfException sentry(actReg_.get());
663  InputSource::ItemType itemType;
664  //For now, do nothing with InputSource::IsSynchronize
665  do {
666  itemType = input_->nextItemType();
667  } while (itemType == InputSource::IsSynchronize);
668 
669  lastSourceTransition_ = itemType;
670  sentry.completedSuccessfully();
671 
673 
675  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
677  }
678 
679  return lastSourceTransition_;
680  }

References actReg_, checkForAsyncStopRequest(), deferredExceptionPtrIsSet_, epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by readNextEventForStream().

◆ openOutputFiles()

void edm::EventProcessor::openOutputFiles ( )

Definition at line 784 of file EventProcessor.cc.

784  {
785  if (fb_.get() != nullptr) {
786  schedule_->openOutputFiles(*fb_);
787  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
788  }
789  FDEBUG(1) << "\topenOutputFiles\n";
790  }

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

◆ operator=()

EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete

◆ preg() [1/2]

std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 290 of file EventProcessor.h.

290 { return get_underlying_safe(preg_); }

References edm::get_underlying_safe(), and preg_.

◆ preg() [2/2]

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 289 of file EventProcessor.h.

289 { return get_underlying_safe(preg_); }

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), and readRun().

◆ prepareForNextLoop()

void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 849 of file EventProcessor.cc.

849  {
850  looper_->prepareForNextLoop(esp_.get());
851  FDEBUG(1) << "\tprepareForNextLoop\n";
852  }

References esp_, FDEBUG, and looper_.

Referenced by runToCompletion().

◆ processConfiguration()

ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 138 of file EventProcessor.h.

138 { return *processConfiguration_; }

References processConfiguration_.

◆ processEventAsync()

void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1795 of file EventProcessor.cc.

1795  {
1796  tbb::task::spawn(
1797  *make_functor_task(tbb::task::allocate_root(), [=]() { processEventAsyncImpl(iHolder, iStreamIndex); }));
1798  }

References edm::make_functor_task(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

◆ processEventAsyncImpl()

void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1800 of file EventProcessor.cc.

1800  {
1801  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1802 
1805  if (rng.isAvailable()) {
1806  Event ev(*pep, ModuleDescription(), nullptr);
1807  rng->postEventRead(ev);
1808  }
1809 
1810  WaitingTaskHolder finalizeEventTask(make_waiting_task(
1811  tbb::task::allocate_root(), [this, pep, iHolder, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1812  //NOTE: If we have a looper we only have one Stream
1813  if (looper_) {
1814  ServiceRegistry::Operate operateLooper(serviceToken_);
1815  processEventWithLooper(*pep, iStreamIndex);
1816  }
1817 
1818  FDEBUG(1) << "\tprocessEvent\n";
1819  pep->clearEventPrincipal();
1820  if (iPtr) {
1821  iHolder.doneWaiting(*iPtr);
1822  } else {
1823  iHolder.doneWaiting(std::exception_ptr());
1824  }
1825  }));
1826  WaitingTaskHolder afterProcessTask;
1827  if (subProcesses_.empty()) {
1828  afterProcessTask = std::move(finalizeEventTask);
1829  } else {
1830  //Need to run SubProcesses after schedule has finished
1831  // with the event
1832  afterProcessTask = WaitingTaskHolder(make_waiting_task(
1833  tbb::task::allocate_root(),
1834  [this, pep, finalizeEventTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1835  if (not iPtr) {
1836  //when run with 1 thread, we want the order to be what
1837  // it was before. This requires reversing the order since
1838  // tasks are run last one in first one out
1839  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1840  subProcess.doEventAsync(finalizeEventTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1841  }
1842  } else {
1843  finalizeEventTask.doneWaiting(*iPtr);
1844  }
1845  }));
1846  }
1847 
1848  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1849  EventTransitionInfo info(*pep, es);
1850  schedule_->processOneEventAsync(std::move(afterProcessTask), iStreamIndex, info, serviceToken_);
1851  }

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, info(), edm::Service< T >::isAvailable(), looper_, edm::make_waiting_task(), eostools::move(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, streamLumiStatus_, and subProcesses_.

Referenced by processEventAsync().

◆ processEventWithLooper()

void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 1853 of file EventProcessor.cc.

1853  {
1854  bool randomAccess = input_->randomAccess();
1855  ProcessingController::ForwardState forwardState = input_->forwardState();
1856  ProcessingController::ReverseState reverseState = input_->reverseState();
1857  ProcessingController pc(forwardState, reverseState, randomAccess);
1858 
1860  do {
1861  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1862  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1863  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1864 
1865  bool succeeded = true;
1866  if (randomAccess) {
1867  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1868  input_->skipEvents(-2);
1869  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1870  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1871  }
1872  }
1873  pc.setLastOperationSucceeded(succeeded);
1874  } while (!pc.lastOperationSucceeded());
1876  shouldWeStop_ = true;
1878  }
1879  }

References esp_, input_, edm::InputSource::IsStop, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), lastSourceTransition_, looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

◆ processLumis()

InputSource::ItemType edm::EventProcessor::processLumis ( std::shared_ptr< void > const &  iRunResource)

Definition at line 1133 of file EventProcessor.cc.

1133  {
1134  auto waitTask = make_empty_waiting_task();
1135  waitTask->increment_ref_count();
1136 
1137  if (streamLumiActive_ > 0) {
1139  // Continue after opening a new input file
1140  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1141  } else {
1142  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1143  input_->luminosityBlockAuxiliary()->beginTime()),
1144  iRunResource,
1145  WaitingTaskHolder{waitTask.get()});
1146  }
1147  waitTask->wait_for_all();
1148 
1149  if (waitTask->exceptionPtr() != nullptr) {
1150  std::rethrow_exception(*(waitTask->exceptionPtr()));
1151  }
1152  return lastTransitionType();
1153  }

References cms::cuda::assert(), beginLumiAsync(), continueLumiAsync(), input_, lastTransitionType(), edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, and streamLumiActive_.

◆ readAndMergeLumi()

int edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1568 of file EventProcessor.cc.

1568  {
1569  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1570  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1571  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1572  input_->processHistoryRegistry().reducedProcessHistoryID(
1573  input_->luminosityBlockAuxiliary()->processHistoryID()));
1574  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1575  assert(lumiOK);
1576  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1577  {
1578  SendSourceTerminationSignalIfException sentry(actReg_.get());
1579  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1580  sentry.completedSuccessfully();
1581  }
1582  return input_->luminosityBlock();
1583  }

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by readNextEventForStream().

◆ readAndMergeRun()

std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1537 of file EventProcessor.cc.

1537  {
1538  principalCache_.merge(input_->runAuxiliary(), preg());
1539  auto runPrincipal = principalCache_.runPrincipalPtr();
1540  {
1541  SendSourceTerminationSignalIfException sentry(actReg_.get());
1542  input_->readAndMergeRun(*runPrincipal);
1543  sentry.completedSuccessfully();
1544  }
1545  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1546  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1547  }

References actReg_, cms::cuda::assert(), input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

◆ readEvent()

void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1781 of file EventProcessor.cc.

1781  {
1782  //TODO this will have to become per stream
1783  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1784  StreamContext streamContext(event.streamID(), &processContext_);
1785 
1786  SendSourceTerminationSignalIfException sentry(actReg_.get());
1787  input_->readEvent(event, streamContext);
1788 
1789  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1790  sentry.completedSuccessfully();
1791 
1792  FDEBUG(1) << "\treadEvent\n";
1793  }

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, and streamLumiStatus_.

Referenced by readNextEventForStream().

◆ readFile()

void edm::EventProcessor::readFile ( )

◆ readLuminosityBlock()

void edm::EventProcessor::readLuminosityBlock ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1549 of file EventProcessor.cc.

1549  {
1551  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1552  << "Illegal attempt to insert lumi into cache\n"
1553  << "Run is invalid\n"
1554  << "Contact a Framework Developer\n";
1555  }
1557  assert(lbp);
1558  lbp->setAux(*input_->luminosityBlockAuxiliary());
1559  {
1560  SendSourceTerminationSignalIfException sentry(actReg_.get());
1561  input_->readLuminosityBlock(*lbp, *historyAppender_);
1562  sentry.completedSuccessfully();
1563  }
1564  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1565  iStatus.lumiPrincipal() = std::move(lbp);
1566  }

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::getAvailableLumiPrincipalPtr(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), eostools::move(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

Referenced by beginLumiAsync().

◆ readNextEventForStream()

bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iLumiStatus 
)
private

Definition at line 1661 of file EventProcessor.cc.

1661  {
1662  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1663  iStatus.endLumi();
1664  return false;
1665  }
1666 
1667  if (iStatus.wasEventProcessingStopped()) {
1668  return false;
1669  }
1670 
1671  if (shouldWeStop()) {
1673  iStatus.stopProcessingEvents();
1674  iStatus.endLumi();
1675  return false;
1676  }
1677 
1679  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1680  CMS_SA_ALLOW try {
1681  //need to use lock in addition to the serial task queue because
1682  // of delayed provenance reading and reading data in response to
1683  // edm::Refs etc
1684  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1685 
1686  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1687  if (InputSource::IsLumi == itemType) {
1688  iStatus.haveContinuedLumi();
1689  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1690  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1691  readAndMergeLumi(iStatus);
1692  itemType = nextTransitionType();
1693  }
1694  if (InputSource::IsLumi == itemType) {
1695  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1696  input_->luminosityBlockAuxiliary()->beginTime()));
1697  }
1698  }
1699  if (InputSource::IsEvent != itemType) {
1700  iStatus.stopProcessingEvents();
1701 
1702  //IsFile may continue processing the lumi and
1703  // looper_ can cause the input source to declare a new IsRun which is actually
1704  // just a continuation of the previous run
1705  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1706  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1707  iStatus.endLumi();
1708  }
1709  return false;
1710  }
1711  readEvent(iStreamIndex);
1712  } catch (...) {
1713  bool expected = false;
1714  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1715  deferredExceptionPtr_ = std::current_exception();
1716  iStatus.endLumi();
1717  }
1718  return false;
1719  }
1720  return true;
1721  }

References CMS_SA_ALLOW, edm::LuminosityBlockProcessingStatus::continuingLumi(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::LuminosityBlockProcessingStatus::endLumi(), edm::LuminosityBlockProcessingStatus::haveContinuedLumi(), input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, lastSourceTransition_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextLuminosityBlockID(), nextTransitionType(), or, readAndMergeLumi(), readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setNextSyncValue(), shouldWeStop(), sourceMutex_, edm::LuminosityBlockProcessingStatus::stopProcessingEvents(), and edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped().

Referenced by handleNextEventForStreamAsync().

◆ readRun()

std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1514 of file EventProcessor.cc.

1514  {
1516  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1517  << "Illegal attempt to insert run into cache\n"
1518  << "Contact a Framework Developer\n";
1519  }
1520  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1521  preg(),
1523  historyAppender_.get(),
1524  0,
1525  true,
1527  {
1528  SendSourceTerminationSignalIfException sentry(actReg_.get());
1529  input_->readRun(*rp, *historyAppender_);
1530  sentry.completedSuccessfully();
1531  }
1532  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1533  principalCache_.insert(rp);
1534  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1535  }

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, mergeableRunProductProcesses_, preg(), principalCache_, and processConfiguration_.

◆ respondToCloseInputFile()

void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 810 of file EventProcessor.cc.

810  {
811  if (fb_.get() != nullptr) {
812  schedule_->respondToCloseInputFile(*fb_);
813  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
814  }
815  FDEBUG(1) << "\trespondToCloseInputFile\n";
816  }

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

◆ respondToOpenInputFile()

void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 800 of file EventProcessor.cc.

800  {
802  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
803  if (fb_.get() != nullptr) {
804  schedule_->respondToOpenInputFile(*fb_);
805  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
806  }
807  FDEBUG(1) << "\trespondToOpenInputFile\n";
808  }

References branchIDListHelper_, fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

◆ rewindInput()

void edm::EventProcessor::rewindInput ( )

Definition at line 843 of file EventProcessor.cc.

843  {
844  input_->repeat();
845  input_->rewind();
846  FDEBUG(1) << "\trewind\n";
847  }

References FDEBUG, and input_.

Referenced by runToCompletion().

◆ run()

EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

◆ runToCompletion()

EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 688 of file EventProcessor.cc.

688  {
691  {
692  beginJob(); //make sure this was called
693 
694  // make the services available
696 
698  try {
699  FilesProcessor fp(fileModeNoMerge_);
700 
701  convertException::wrap([&]() {
702  bool firstTime = true;
703  do {
704  if (not firstTime) {
706  rewindInput();
707  } else {
708  firstTime = false;
709  }
710  startingNewLoop();
711 
712  auto trans = fp.processFiles(*this);
713 
714  fp.normalEnd();
715 
716  if (deferredExceptionPtrIsSet_.load()) {
717  std::rethrow_exception(deferredExceptionPtr_);
718  }
719  if (trans != InputSource::IsStop) {
720  //problem with the source
721  doErrorStuff();
722 
723  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
724  }
725  } while (not endOfLoop());
726  }); // convertException::wrap
727 
728  } // Try block
729  catch (cms::Exception& e) {
731  std::string message(
732  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
733  e.addAdditionalInfo(message);
734  if (e.alreadyPrinted()) {
735  LogAbsolute("Additional Exceptions") << message;
736  }
737  }
738  if (!exceptionMessageRuns_.empty()) {
739  e.addAdditionalInfo(exceptionMessageRuns_);
740  if (e.alreadyPrinted()) {
741  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
742  }
743  }
744  if (!exceptionMessageFiles_.empty()) {
745  e.addAdditionalInfo(exceptionMessageFiles_);
746  if (e.alreadyPrinted()) {
747  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
748  }
749  }
750  throw;
751  }
752  }
753 
754  return returnCode;
755  }

References asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

◆ setDeferredException()

bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 1902 of file EventProcessor.cc.

1902  {
1903  bool expected = false;
1904  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1905  deferredExceptionPtr_ = iException;
1906  return true;
1907  }
1908  return false;
1909  }

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

Referenced by handleEndLumiExceptions().

◆ setExceptionMessageFiles()

void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 1896 of file EventProcessor.cc.

1896 { exceptionMessageFiles_ = message; }

References exceptionMessageFiles_.

◆ setExceptionMessageLumis()

void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 1900 of file EventProcessor.cc.

1900 { exceptionMessageLumis_ = true; }

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

◆ setExceptionMessageRuns()

void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 1898 of file EventProcessor.cc.

1898 { exceptionMessageRuns_ = message; }

References exceptionMessageRuns_.

◆ shouldWeCloseOutput()

bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 854 of file EventProcessor.cc.

854  {
855  FDEBUG(1) << "\tshouldWeCloseOutput\n";
856  if (!subProcesses_.empty()) {
857  for (auto const& subProcess : subProcesses_) {
858  if (subProcess.shouldWeCloseOutput()) {
859  return true;
860  }
861  }
862  return false;
863  }
864  return schedule_->shouldWeCloseOutput();
865  }

References FDEBUG, schedule_, and subProcesses_.

◆ shouldWeStop()

bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 1881 of file EventProcessor.cc.

1881  {
1882  FDEBUG(1) << "\tshouldWeStop\n";
1883  if (shouldWeStop_)
1884  return true;
1885  if (!subProcesses_.empty()) {
1886  for (auto const& subProcess : subProcesses_) {
1887  if (subProcess.terminate()) {
1888  return true;
1889  }
1890  }
1891  return false;
1892  }
1893  return schedule_->terminate();
1894  }

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

◆ startingNewLoop()

void edm::EventProcessor::startingNewLoop ( )

Definition at line 818 of file EventProcessor.cc.

818  {
819  shouldWeStop_ = false;
820  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
821  // until after we've called beginOfJob
822  if (looper_ && looperBeginJobRun_) {
823  looper_->doStartingNewLoop();
824  }
825  FDEBUG(1) << "\tstartingNewLoop\n";
826  }

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

◆ streamEndLumiAsync()

void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1449 of file EventProcessor.cc.

1449  {
1450  auto t = edm::make_waiting_task(tbb::task::allocate_root(),
1451  [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1452  if (iPtr) {
1453  handleEndLumiExceptions(iPtr, iTask);
1454  }
1455  auto status = streamLumiStatus_[iStreamIndex];
1456  //reset status before releasing queue else get race condition
1457  streamLumiStatus_[iStreamIndex].reset();
1459  streamQueues_[iStreamIndex].resume();
1460 
1461  //are we the last one?
1462  if (status->streamFinishedLumi()) {
1464  }
1465  });
1466 
1467  edm::WaitingTaskHolder lumiDoneTask{t};
1468 
1469  //Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1470  // therefore we do not want to hold the shared_ptr
1471  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1472  lumiStatus->setEndTime();
1473 
1474  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1475 
1476  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1477  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1478 
1479  if (lumiStatus->didGlobalBeginSucceed()) {
1480  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1481  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1482  lumiPrincipal.endTime());
1483  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1484  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1485  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1486  *schedule_,
1487  iStreamIndex,
1488  transitionInfo,
1489  serviceToken_,
1490  subProcesses_,
1491  cleaningUpAfterException);
1492  }
1493  }

References esp_, globalEndLumiAsync(), handleEndLumiExceptions(), edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and submitPVValidationJobs::t.

Referenced by beginLumiAsync(), endUnfinishedLumi(), and handleNextEventForStreamAsync().

◆ taskCleanup()

void edm::EventProcessor::taskCleanup ( )

Definition at line 533 of file EventProcessor.cc.

533 { espController_->endIOVs(); }

References espController_.

◆ thinnedAssociationsHelper() [1/2]

std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 298 of file EventProcessor.h.

298  {
300  }

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

◆ thinnedAssociationsHelper() [2/2]

std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 295 of file EventProcessor.h.

295  {
297  }

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

◆ totalEvents()

int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 629 of file EventProcessor.cc.

629 { return schedule_->totalEvents(); }

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

◆ totalEventsFailed()

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 633 of file EventProcessor.cc.

633 { return schedule_->totalEventsFailed(); }

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

◆ totalEventsPassed()

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 631 of file EventProcessor.cc.

631 { return schedule_->totalEventsPassed(); }

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

◆ warnAboutModulesRequiringLuminosityBLockSynchronization()

void edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization ( ) const
private

Definition at line 1911 of file EventProcessor.cc.

1911  {
1912  std::unique_ptr<LogSystem> s;
1913  for (auto worker : schedule_->allWorkers()) {
1914  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1915  if (not s) {
1916  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1917  (*s) << "The following modules require synchronizing on LuminosityBlock boundaries:";
1918  }
1919  (*s) << "\n " << worker->description().moduleName() << " " << worker->description().moduleLabel();
1920  }
1921  }
1922  }

References alignCSCRings::s, and schedule_.

Referenced by beginJob().

◆ writeLumiAsync()

void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 1634 of file EventProcessor.cc.

1634  {
1635  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(),
1636  [this, task, &lumiPrincipal](std::exception_ptr const* iExcept) mutable {
1637  if (iExcept) {
1638  task.doneWaiting(*iExcept);
1639  } else {
1641  for (auto& s : subProcesses_) {
1642  s.writeLumiAsync(task, lumiPrincipal);
1643  }
1644  }
1645  });
1647 
1648  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1649 
1650  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, lumiPrincipal, &processContext_, actReg_.get());
1651  }

References actReg_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::make_waiting_task(), edm::RunPrincipal::mergeableRunProductMetadata(), processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, TrackValidation_cff::task, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

◆ writeProcessBlockAsync()

void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 1585 of file EventProcessor.cc.

1585  {
1586  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(),
1587  [this, task, processBlockType](std::exception_ptr const* iExcept) mutable {
1588  if (iExcept) {
1589  task.doneWaiting(*iExcept);
1590  } else {
1592  for (auto& s : subProcesses_) {
1593  s.writeProcessBlockAsync(task, processBlockType);
1594  }
1595  }
1596  });
1598  schedule_->writeProcessBlockAsync(WaitingTaskHolder(subsT),
1599  principalCache_.processBlockPrincipal(processBlockType),
1600  &processContext_,
1601  actReg_.get());
1602  }

References actReg_, edm::make_waiting_task(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endProcessBlock(), and inputProcessBlocks().

◆ writeRunAsync()

void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
ProcessHistoryID const &  phid,
RunNumber_t  run,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 1604 of file EventProcessor.cc.

1607  {
1608  auto subsT = edm::make_waiting_task(
1609  tbb::task::allocate_root(),
1610  [this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
1611  if (iExcept) {
1612  task.doneWaiting(*iExcept);
1613  } else {
1615  for (auto& s : subProcesses_) {
1616  s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1617  }
1618  }
1619  });
1621  schedule_->writeRunAsync(WaitingTaskHolder(subsT),
1623  &processContext_,
1624  actReg_.get(),
1625  mergeableRunProductMetadata);
1626  }

References actReg_, edm::make_waiting_task(), principalCache_, processContext_, run(), edm::PrincipalCache::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endUnfinishedRun().

Member Data Documentation

◆ act_table_

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 322 of file EventProcessor.h.

Referenced by init().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

◆ asyncStopRequestedWhileProcessingEvents_

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 358 of file EventProcessor.h.

Referenced by runToCompletion().

◆ asyncStopStatusCodeFromProcessingEvents_

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 359 of file EventProcessor.h.

Referenced by runToCompletion().

◆ beginJobCalled_

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 346 of file EventProcessor.h.

Referenced by beginJob().

◆ branchIDListHelper_

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 314 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

◆ deferredExceptionPtr_

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

◆ deferredExceptionPtrIsSet_

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

◆ esp_

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

◆ espController_

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

◆ eventSetupDataToExcludeFromPrefetching_

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 364 of file EventProcessor.h.

◆ exceptionMessageFiles_

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 349 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

◆ exceptionMessageLumis_

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 351 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

◆ exceptionMessageRuns_

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 350 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

◆ fb_

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private

◆ fileModeNoMerge_

bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 348 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

◆ firstEventInBlock_

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 360 of file EventProcessor.h.

◆ forceESCacheClearOnNewRun_

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 354 of file EventProcessor.h.

Referenced by beginRun(), and init().

◆ forceLooperToEnd_

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 352 of file EventProcessor.h.

Referenced by endOfLoop().

◆ historyAppender_

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 334 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

◆ input_

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

◆ lastSourceTransition_

InputSource::ItemType edm::EventProcessor::lastSourceTransition_
private

◆ looper_

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private

◆ looperBeginJobRun_

bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 353 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

◆ lumiQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 329 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

◆ mergeableRunProductProcesses_

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 326 of file EventProcessor.h.

Referenced by init(), and readRun().

◆ pathsAndConsumesOfModules_

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 325 of file EventProcessor.h.

Referenced by beginJob().

◆ preallocations_

PreallocationConfiguration edm::EventProcessor::preallocations_
private

◆ preg_

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 313 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), and readFile().

◆ principalCache_

PrincipalCache edm::EventProcessor::principalCache_
private

◆ printDependencies_

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 366 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ processConfiguration_

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

◆ processContext_

ProcessContext edm::EventProcessor::processContext_
private

◆ queueWhichWaitsForIOVsToFinish_

edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

Definition at line 321 of file EventProcessor.h.

Referenced by beginLumiAsync(), and globalEndLumiAsync().

◆ schedule_

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private

◆ serviceToken_

ServiceToken edm::EventProcessor::serviceToken_
private

◆ shouldWeStop_

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 347 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

◆ sourceMutex_

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 344 of file EventProcessor.h.

Referenced by readNextEventForStream().

◆ sourceResourcesAcquirer_

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 343 of file EventProcessor.h.

Referenced by beginLumiAsync(), and handleNextEventForStreamAsync().

◆ streamLumiActive_

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private

◆ streamLumiStatus_

std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private

◆ streamQueues_

std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

Definition at line 328 of file EventProcessor.h.

Referenced by beginLumiAsync(), init(), and streamEndLumiAsync().

◆ subProcesses_

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private

◆ thinnedAssociationsHelper_

edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 315 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().

edm::EventProcessor::looperBeginJobRun_
bool looperBeginJobRun_
Definition: EventProcessor.h:353
edm::SharedResourcesAcquirer::serialQueueChain
SerialTaskQueueChain & serialQueueChain() const
Definition: SharedResourcesAcquirer.h:54
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::deleteRunFromCache
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
Definition: EventProcessor.cc:1628
edm::EDLooperBase::Status
Status
Definition: EDLooperBase.h:79
edm::EventProcessor::historyAppender_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
Definition: EventProcessor.h:334
bk::beginJob
void beginJob()
Definition: Breakpoints.cc:14
edm::EventProcessor::thinnedAssociationsHelper
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: EventProcessor.h:295
processOptions_cff.fileMode
fileMode
Definition: processOptions_cff.py:5
edm::EventProcessor::rewindInput
void rewindInput()
Definition: EventProcessor.cc:843
edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Definition: EventProcessor.h:364
edm::PrincipalCache::ProcessBlockType::New
mps_fire.i
i
Definition: mps_fire.py:428
edm::PreallocationConfiguration::numberOfRuns
unsigned int numberOfRuns() const
Definition: PreallocationConfiguration.h:37
edm::EventProcessor::preg_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: EventProcessor.h:313
edm::popSubProcessVParameterSet
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:713
edm::EventProcessor::writeRunAsync
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
Definition: EventProcessor.cc:1604
edm::EventProcessor::StatusCode
StatusCode
Definition: EventProcessor.h:74
edm::EventProcessor::endOfLoop
bool endOfLoop()
Definition: EventProcessor.cc:828
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::EventProcessor::input_
edm::propagate_const< std::unique_ptr< InputSource > > input_
Definition: EventProcessor.h:317
edm::EventProcessor::getToken
ServiceToken getToken()
Definition: EventProcessor.cc:623
edm::EventProcessor::startingNewLoop
void startingNewLoop()
Definition: EventProcessor.cc:818
edm::ParameterSet::getUntrackedParameterSet
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: ParameterSet.cc:2136
edm::errors::LogicError
Definition: EDMException.h:37
edm::validateTopLevelParameterSets
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
Definition: validateTopLevelParameterSets.cc:89
mps_update.status
status
Definition: mps_update.py:69
edm::PreallocationConfiguration::numberOfThreads
unsigned int numberOfThreads() const
Definition: PreallocationConfiguration.h:34
edm::EventProcessor::deleteLumiFromCache
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1653
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:35
edm::PrincipalCache::setProcessHistoryRegistry
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
Definition: PrincipalCache.h:87
edm::TerminationOrigin::ExternalSignal
edm::EventProcessor::globalEndLumiAsync
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
Definition: EventProcessor.cc:1360
edm::ParentageRegistry::instance
static ParentageRegistry * instance()
Definition: ParentageRegistry.cc:4
edm::EventProcessor::runToCompletion
StatusCode runToCompletion()
Definition: EventProcessor.cc:688
edm::EventProcessor::esp_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: EventProcessor.h:320
edm::EventProcessor::shouldWeStop_
bool shouldWeStop_
Definition: EventProcessor.h:347
edm::EventProcessor::readEvent
void readEvent(unsigned int iStreamIndex)
Definition: EventProcessor.cc:1781
edm::EventProcessor::epTimedOut
Definition: EventProcessor.h:80
edm::make_functor_task
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
cms::cuda::assert
assert(be >=bs)
edm::ParentageRegistry::clear
void clear()
Not thread safe.
Definition: ParentageRegistry.cc:28
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:222
edm::EventProcessor::deferredExceptionPtr_
std::exception_ptr deferredExceptionPtr_
Definition: EventProcessor.h:341
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
edm::fillLooper
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
Definition: EventProcessor.cc:195
personalPlayback.fp
fp
Definition: personalPlayback.py:523
edm::EventProcessor::lumiQueue_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
Definition: EventProcessor.h:329
edm::get_underlying_safe
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Definition: get_underlying_safe.h:41
edm::EventProcessor::processContext_
ProcessContext processContext_
Definition: EventProcessor.h:324
edm::EventProcessor::lastSourceTransition_
InputSource::ItemType lastSourceTransition_
Definition: EventProcessor.h:318
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:75
edm::EventProcessor::pathsAndConsumesOfModules_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: EventProcessor.h:325
createJobs.tmp
tmp
align.sh
Definition: createJobs.py:716
edm::EventProcessor::exceptionMessageFiles_
std::string exceptionMessageFiles_
Definition: EventProcessor.h:349
edm::makeInput
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Definition: EventProcessor.cc:120
edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
bool asyncStopRequestedWhileProcessingEvents_
Definition: EventProcessor.h:358
edm::PrincipalCache::runPrincipalPtr
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:28
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::PathsAndConsumesOfModules::initialize
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
Definition: PathsAndConsumesOfModules.cc:14
runTheMatrix.nStreams
nStreams
Definition: runTheMatrix.py:362
mps_monitormerge.items
list items
Definition: mps_monitormerge.py:29
groupFilesInBlocks.reverse
reverse
Definition: groupFilesInBlocks.py:131
edm::InputSource::IsRun
Definition: InputSource.h:78
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::ProcessingController::kToPreviousEvent
Definition: ProcessingController.h:58
edm::EventProcessor::processEventAsyncImpl
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1800
edm::first
T first(std::pair< T, U > const &p)
Definition: ParameterSet.cc:217
edm::PreallocationConfiguration::numberOfLuminosityBlocks
unsigned int numberOfLuminosityBlocks() const
Definition: PreallocationConfiguration.h:36
edm::PrincipalCache::runPrincipal
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:21
edm::pset::Registry::clear
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::ProcessContext::setProcessConfiguration
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: ProcessContext.cc:19
alignCSCRings.s
s
Definition: alignCSCRings.py:92
edm::EventProcessor::exceptionMessageLumis_
std::atomic< bool > exceptionMessageLumis_
Definition: EventProcessor.h:351
edm::ProcessingController::ForwardState
ForwardState
Definition: ProcessingController.h:31
edm::EventProcessor::readAndMergeLumi
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1568
edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
Definition: EventProcessor.h:321
edm::EventProcessor::epSignal
Definition: EventProcessor.h:78
edm::SerialTaskQueue::push
void push(const T &iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:187
DQM.reader
reader
Definition: DQM.py:105
edm::WaitingTaskHolder::taskHasFailed
bool taskHasFailed() const
Definition: WaitingTaskHolder.h:58
edm::SerialTaskQueueChain::push
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:86
edm::PrincipalCache::merge
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:54
edm::EventProcessor::streamQueues_
std::vector< edm::SerialTaskQueue > streamQueues_
Definition: EventProcessor.h:328
edm::EventProcessor::epInputComplete
Definition: EventProcessor.h:79
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
ScheduleInfo
edm::EventProcessor::readLuminosityBlock
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1549
edm::InputSource::IsSynchronize
Definition: InputSource.h:78
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::EventProcessor::sourceMutex_
std::shared_ptr< std::recursive_mutex > sourceMutex_
Definition: EventProcessor.h:344
edm::RunPrincipal::setEndTime
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
edm::EventProcessor::beginJobCalled_
bool beginJobCalled_
Definition: EventProcessor.h:346
edm::ProcessingController::kToSpecifiedEvent
Definition: ProcessingController.h:58
Service
edm::checkForModuleDependencyCorrectness
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
Definition: PathsAndConsumesOfModules.cc:130
LuminosityBlock
WaitingTaskHolder
edm::EventProcessor::fileModeNoMerge_
bool fileModeNoMerge_
Definition: EventProcessor.h:348
Event
IOVSyncValue
edm::EventProcessor::writeProcessBlockAsync
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
Definition: EventProcessor.cc:1585
edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
Definition: EventProcessor.cc:1911
h
runTheMatrix.nThreads
nThreads
Definition: runTheMatrix.py:361
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::IllegalParameters::setThrowAnException
static void setThrowAnException(bool v)
Definition: IllegalParameters.h:16
edm::EventProcessor::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: EventProcessor.h:312
edm::EventID::maxEventNumber
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::EventProcessor::exceptionMessageRuns_
std::string exceptionMessageRuns_
Definition: EventProcessor.h:350
edm::PrincipalCache::preReadFile
void preReadFile()
Definition: PrincipalCache.cc:149
edm::SharedResourcesRegistry::instance
static SharedResourcesRegistry * instance()
Definition: SharedResourcesRegistry.cc:25
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::EventProcessor::printDependencies_
bool printDependencies_
Definition: EventProcessor.h:366
edm::EventProcessor::preallocations_
PreallocationConfiguration preallocations_
Definition: EventProcessor.h:356
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:99
edm::SubProcess::doEndJob
void doEndJob()
Definition: SubProcess.cc:225
summarizeEdmComparisonLogfiles.succeeded
succeeded
Definition: summarizeEdmComparisonLogfiles.py:101
edm::EventProcessor::act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: EventProcessor.h:322
edm::serviceregistry::kConfigurationOverrides
Definition: ServiceLegacy.h:29
edm::ServiceRegistry::Operate
friend class Operate
Definition: ServiceRegistry.h:54
edm::EventProcessor::beginLumiAsync
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1155
edm::EventProcessor::espController_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
Definition: EventProcessor.h:319
edm::EventProcessor::streamLumiStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
Definition: EventProcessor.h:330
ParameterSet
Definition: Functions.h:16
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:30
FDEBUG
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::InputSource::IsLumi
Definition: InputSource.h:78
edm::EventProcessor::streamEndLumiAsync
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1449
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
edm::EventProcessor::epException
Definition: EventProcessor.h:76
edm::EventProcessor::serviceToken_
ServiceToken serviceToken_
Definition: EventProcessor.h:316
edm::serviceregistry::kOverlapIsError
Definition: ServiceLegacy.h:29
edm::PrincipalCache::deleteRun
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
Definition: PrincipalCache.cc:110
edm::EventProcessor::lastTransitionType
InputSource::ItemType lastTransitionType() const
Definition: EventProcessor.h:192
edm::Service
Definition: Service.h:30
edm::InputSource::doEndJob
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:207
edm::EventProcessor::thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: EventProcessor.h:315
edm::EventProcessor::readNextEventForStream
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
Definition: EventProcessor.cc:1661
runEdmFileComparison.returnCode
returnCode
Definition: runEdmFileComparison.py:263
edm::LogAbsolute
Log< level::System, true > LogAbsolute
Definition: MessageLogger.h:134
edm::EventProcessor::forceLooperToEnd_
bool forceLooperToEnd_
Definition: EventProcessor.h:352
edm::EventProcessor::epCountComplete
Definition: EventProcessor.h:81
edm::PrincipalCache::insertForInput
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
Definition: PrincipalCache.cc:98
edm::InputSource::IsStop
Definition: InputSource.h:78
edm::PrincipalCache::adjustEventsToNewProductRegistry
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:127
edm::EventProcessor::fb_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
Definition: EventProcessor.h:336
edm::EDLooperBase::kContinue
Definition: EDLooperBase.h:79
edm::EventProcessor::epOther
Definition: EventProcessor.h:77
HltBtagPostValidation_cff.c
c
Definition: HltBtagPostValidation_cff.py:31
edm::EDLooperBase::endOfJob
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::LogError
Log< level::Error, false > LogError
Definition: MessageLogger.h:123
ServiceToken
edm::EventProcessor::streamLumiActive_
std::atomic< unsigned int > streamLumiActive_
Definition: EventProcessor.h:331
edm::EventProcessor::mergeableRunProductProcesses_
MergeableRunProductProcesses mergeableRunProductProcesses_
Definition: EventProcessor.h:326
edm::make_empty_waiting_task
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
Definition: WaitingTaskList.h:96
edm::PrincipalCache::inputProcessBlockPrincipal
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
Definition: PrincipalCache.h:55
edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
StatusCode asyncStopStatusCodeFromProcessingEvents_
Definition: EventProcessor.h:359
edm::LimitedTaskQueue::Resumer
Definition: LimitedTaskQueue.h:68
edm::EventProcessor::looper_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
Definition: EventProcessor.h:337
edm::ProcessBlockPrincipal::fillProcessBlockPrincipal
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
Definition: ProcessBlockPrincipal.cc:15
edm::EventProcessor::sourceResourcesAcquirer_
SharedResourcesAcquirer sourceResourcesAcquirer_
Definition: EventProcessor.h:343
edm::EventProcessor::setDeferredException
bool setDeferredException(std::exception_ptr)
Definition: EventProcessor.cc:1902
edm::InputSource::ItemType
ItemType
Definition: InputSource.h:78
edm::EventProcessor::continueLumiAsync
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1332
edm::EventProcessor::doErrorStuff
void doErrorStuff()
Definition: EventProcessor.cc:867
ModuleChanger
edm::PrincipalCache::ProcessBlockType::Input
edm::ParentageRegistry::insertMapped
bool insertMapped(value_type const &v)
Definition: ParentageRegistry.cc:24
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::RunPrincipal::mergeableRunProductMetadata
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:79
edm::EventProcessor::principalCache_
PrincipalCache principalCache_
Definition: EventProcessor.h:345
ProcessingController
edm::EventProcessor::branchIDListHelper
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: EventProcessor.h:291
edm::PrincipalCache::getAvailableLumiPrincipalPtr
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
Definition: PrincipalCache.cc:50
edm::EventProcessor::deferredExceptionPtrIsSet_
std::atomic< bool > deferredExceptionPtrIsSet_
Definition: EventProcessor.h:340
edm::PrincipalCache::hasRunPrincipal
bool hasRunPrincipal() const
Definition: PrincipalCache.h:66
edm::EventProcessor::epSuccess
Definition: EventProcessor.h:75
ev
bool ev
Definition: Hydjet2Hadronizer.cc:95
edm::EventProcessor::processConfiguration_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: EventProcessor.h:323
Exception
Definition: hltDiff.cc:246
edm::EventProcessor::run
StatusCode run()
Definition: EventProcessor.h:371
edm::MergeableRunProductMetadata::preWriteRun
void preWriteRun()
Definition: MergeableRunProductMetadata.cc:125
edm::parameterSet
ParameterSet const & parameterSet(Provenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::EventProcessor::handleEndLumiExceptions
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
Definition: EventProcessor.cc:1351
or
The Signals That Services Can Subscribe To. This is based on ActivityRegistry and is current per […]. Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application. Each possible callback has some defined arguments, which we here list in angle brackets, e.g. <void, edm::EventID const &, edm::Timestamp const &>. We also list in braces which AR_WATCH_USING_METHOD_ is used for those […] or […]
Definition: Activities.doc:12
edm::EventProcessor::processEventWithLooper
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1853
edm::PreallocationConfiguration::numberOfStreams
unsigned int numberOfStreams() const
Definition: PreallocationConfiguration.h:35
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::FileBlock::ParallelProcesses
Definition: FileBlock.h:28
edm::PrincipalCache::processBlockPrincipal
ProcessBlockPrincipal & processBlockPrincipal() const
Definition: PrincipalCache.h:54
edm::EventProcessor::endRun
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
Definition: EventProcessor.cc:1069
edm::EventProcessor::setExceptionMessageLumis
void setExceptionMessageLumis()
Definition: EventProcessor.cc:1900
edm::EventProcessor::looper
std::shared_ptr< EDLooperBase const > looper() const
Definition: EventProcessor.h:301
cms::Exception
Definition: Exception.h:70
edm::EventProcessor::nextLuminosityBlockID
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
Definition: EventProcessor.cc:686
edm::InputSource::IsEvent
Definition: InputSource.h:78
edm::EventProcessor::subProcesses_
std::vector< SubProcess > subProcesses_
Definition: EventProcessor.h:333
edm::EventProcessor::nextTransitionType
InputSource::ItemType nextTransitionType()
Definition: EventProcessor.cc:656
edm::EventProcessor::prepareForNextLoop
void prepareForNextLoop()
Definition: EventProcessor.cc:849
edm::PrincipalCache::eventPrincipal
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: PrincipalCache.h:70
edm::EventProcessor::init
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
Definition: EventProcessor.cc:328
edm::PrincipalCache::insert
void insert(std::unique_ptr< ProcessBlockPrincipal >)
Definition: PrincipalCache.cc:96
edm::EventProcessor::processEventAsync
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1795
event
Definition: event.py:1
edm::ProcessingController::ReverseState
ReverseState
Definition: ProcessingController.h:38
edm::EventProcessor::shouldWeStop
bool shouldWeStop() const
Definition: EventProcessor.cc:1881
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::errors::Configuration
Definition: EDMException.h:36
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
edm::EventProcessor::preg
std::shared_ptr< ProductRegistry const > preg() const
Definition: EventProcessor.h:289
edm::EventProcessor::schedule_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: EventProcessor.h:327
edm::EventProcessor::writeLumiAsync
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
Definition: EventProcessor.cc:1634
edm::EventProcessor::handleNextEventForStreamAsync
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1723
edm::EventProcessor::branchIDListHelper_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: EventProcessor.h:314
trackingPlots.common
common
Definition: trackingPlots.py:206
edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition
void adjustIndexesAfterProductRegistryAddition()
Definition: PrincipalCache.cc:137
edm::LuminosityBlockID::maxLuminosityBlockNumber
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Definition: LuminosityBlockID.h:84
edm::EventProcessor::checkForAsyncStopRequest
bool checkForAsyncStopRequest(StatusCode &)
Definition: EventProcessor.cc:645
common
Definition: common.py:1
edm::EventProcessor::forceESCacheClearOnNewRun_
bool forceESCacheClearOnNewRun_
Definition: EventProcessor.h:354
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
edm::EventProcessor::beginJob
void beginJob()
Definition: EventProcessor.cc:535
edm::PrincipalCache::setNumberOfConcurrentPrincipals
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
Definition: PrincipalCache.cc:17
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:318
edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
Definition: MergeableRunProductProcesses.cc:18