CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (edm::WaitingTaskHolder iHolder)
 
void deleteLumiFromCache (LuminosityBlockProcessingStatus &)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
bool endOfLoop ()
 
bool endPathsEnabled () const
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
void endUnfinishedLumi ()
 
void endUnfinishedRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
 
 EventProcessor (EventProcessor const &)=delete
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
bool fileBlockValid ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
void handleEndLumiExceptions (std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
 
void inputProcessBlocks ()
 
InputSource::ItemType lastTransitionType () const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processLumis (std::shared_ptr< void > const &iRunResource)
 
int readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun ()
 
void readFile ()
 
void readLuminosityBlock (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamEndLumiAsync (edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedData > ExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
void warnAboutModulesRequiringLuminosityBLockSynchronization () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
bool deleteNonConsumedUnscheduledModules_ = true
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemType lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueue > streamQueues_
 
std::vector< SubProcess > subProcesses_
 
tbb::task_group taskGroup_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 64 of file EventProcessor.h.

Member Typedef Documentation

◆ ExcludedData

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 366 of file EventProcessor.h.

◆ ExcludedDataMap

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 367 of file EventProcessor.h.

◆ ProcessBlockType

Definition at line 248 of file EventProcessor.h.

Member Enumeration Documentation

◆ StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 74 of file EventProcessor.h.

74  {
75  epSuccess = 0,
76  epException = 1,
77  epOther = 2,
78  epSignal = 3,
79  epInputComplete = 4,
80  epTimedOut = 5,
81  epCountComplete = 6
82  };

Constructor & Destructor Documentation

◆ EventProcessor() [1/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet >  parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 234 of file EventProcessor.cc.

239  : actReg_(),
240  preg_(),
242  serviceToken_(),
243  input_(),
244  espController_(new eventsetup::EventSetupsController),
245  esp_(),
246  act_table_(),
248  schedule_(),
249  subProcesses_(),
250  historyAppender_(new HistoryAppender),
251  fb_(),
252  looper_(),
254  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
255  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
256  principalCache_(),
257  beginJobCalled_(false),
258  shouldWeStop_(false),
259  fileModeNoMerge_(false),
262  exceptionMessageLumis_(false),
263  forceLooperToEnd_(false),
264  looperBeginJobRun_(false),
267  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
268  processDesc->addServices(defaultServices, forcedServices);
269  init(processDesc, iToken, iLegacy);
270  }

References init(), eostools::move(), and edm::parameterSet().

◆ EventProcessor() [2/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet >  parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 272 of file EventProcessor.cc.

275  : actReg_(),
276  preg_(),
278  serviceToken_(),
279  input_(),
280  espController_(new eventsetup::EventSetupsController),
281  esp_(),
282  act_table_(),
284  schedule_(),
285  subProcesses_(),
286  historyAppender_(new HistoryAppender),
287  fb_(),
288  looper_(),
290  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
291  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
292  principalCache_(),
293  beginJobCalled_(false),
294  shouldWeStop_(false),
295  fileModeNoMerge_(false),
298  exceptionMessageLumis_(false),
299  forceLooperToEnd_(false),
300  looperBeginJobRun_(false),
304  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
305  processDesc->addServices(defaultServices, forcedServices);
307  }

References init(), edm::serviceregistry::kOverlapIsError, eostools::move(), and edm::parameterSet().

◆ EventProcessor() [3/4]

edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc >  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 309 of file EventProcessor.cc.

312  : actReg_(),
313  preg_(),
315  serviceToken_(),
316  input_(),
317  espController_(new eventsetup::EventSetupsController),
318  esp_(),
319  act_table_(),
321  schedule_(),
322  subProcesses_(),
323  historyAppender_(new HistoryAppender),
324  fb_(),
325  looper_(),
327  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
328  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
329  principalCache_(),
330  beginJobCalled_(false),
331  shouldWeStop_(false),
332  fileModeNoMerge_(false),
335  exceptionMessageLumis_(false),
336  forceLooperToEnd_(false),
337  looperBeginJobRun_(false),
341  init(processDesc, token, legacy);
342  }

References init(), and unpackBuffers-CaloStage2::token.

◆ ~EventProcessor()

edm::EventProcessor::~EventProcessor ( )

Definition at line 535 of file EventProcessor.cc.

535  {
536  // Make the services available while everything is being deleted.
539 
540  // manually destroy all these thing that may need the services around
541  // propagate_const<T> has no reset() function
542  espController_ = nullptr;
543  esp_ = nullptr;
544  schedule_ = nullptr;
545  input_ = nullptr;
546  looper_ = nullptr;
547  actReg_ = nullptr;
548 
551  }

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and unpackBuffers-CaloStage2::token.

◆ EventProcessor() [4/4]

edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

◆ beginJob()

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If it is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 560 of file EventProcessor.cc.

560  {
561  if (beginJobCalled_)
562  return;
563  beginJobCalled_ = true;
564  bk::beginJob();
565 
566  // StateSentry toerror(this); // should we add this ?
567  //make the services available
569 
570  service::SystemBounds bounds(preallocations_.numberOfStreams(),
574  actReg_->preallocateSignal_(bounds);
575  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
577 
578  std::vector<ModuleProcessName> consumedBySubProcesses;
580  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
581  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
582  if (consumedBySubProcesses.empty()) {
583  consumedBySubProcesses = std::move(c);
584  } else if (not c.empty()) {
585  std::vector<ModuleProcessName> tmp;
586  tmp.reserve(consumedBySubProcesses.size() + c.size());
587  std::merge(consumedBySubProcesses.begin(),
588  consumedBySubProcesses.end(),
589  c.begin(),
590  c.end(),
591  std::back_inserter(tmp));
592  std::swap(consumedBySubProcesses, tmp);
593  }
594  });
595 
596  // Note: all these may throw
599  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
600  not unusedModules.empty()) {
602 
603  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
604  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
605  "and "
606  "therefore they are deleted before beginJob transition.";
607  for (auto const& description : unusedModules) {
608  l << "\n " << description->moduleLabel();
609  }
610  });
611  for (auto const& description : unusedModules) {
612  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
613  }
614  }
615  }
616 
617  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
618 
621  }
622  //NOTE: This implementation assumes 'Job' means one call
623  // the EventProcessor::run
624  // If it really means once per 'application' then this code will
625  // have to be changed.
626  // Also have to deal with case where have 'run' then new Module
627  // added and do 'run'
628  // again. In that case the newly added Module needs its 'beginJob'
629  // to be called.
630 
631  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
632  // For now we delay calling beginOfJob until first beginOfRun
633  //if(looper_) {
634  // looper_->beginOfJob(es);
635  //}
636  try {
637  convertException::wrap([&]() { input_->doBeginJob(); });
638  } catch (cms::Exception& ex) {
639  ex.addContext("Calling beginJob for the source");
640  throw;
641  }
642  espController_->finishConfiguration();
643  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices());
644  if (looper_) {
645  constexpr bool mustPrefetchMayGet = true;
646  auto const processBlockLookup = preg_->productLookup(InProcess);
647  auto const runLookup = preg_->productLookup(InRun);
648  auto const lumiLookup = preg_->productLookup(InLumi);
649  auto const eventLookup = preg_->productLookup(InEvent);
650  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
651  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
652  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
653  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
654  looper_->updateLookup(esp_->recordsToProxyIndices());
655  }
656  // toerror.succeeded(); // should we add this?
657  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
658  actReg_->postBeginJobSignal_();
659 
660  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
661  schedule_->beginStream(i);
662  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
663  }
664  }

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, c, edm::checkForModuleDependencyCorrectness(), deleteNonConsumedUnscheduledModules_, edmLumisInFiles::description, esp_, espController_, edm::for_all(), mps_fire::i, edm::InEvent, edm::PathsAndConsumesOfModules::initialize(), edm::InLumi, edm::InProcess, input_, edm::InRun, cmsLHEtoEOSManager::l, looper_, MatrixUtil::merge(), eostools::move(), edm::nonConsumedUnscheduledModules(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processConfiguration_, processContext_, edm::PathsAndConsumesOfModules::removeModules(), schedule_, serviceToken_, subProcesses_, std::swap(), createJobs::tmp, warnAboutModulesRequiringLuminosityBLockSynchronization(), and edm::convertException::wrap().

Referenced by runToCompletion().

◆ beginLumiAsync()

void edm::EventProcessor::beginLumiAsync ( edm::IOVSyncValue const &  iSyncValue,
std::shared_ptr< void > const &  iRunResource,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1236 of file EventProcessor.cc.

1238  {
1239  if (iHolder.taskHasFailed()) {
1240  return;
1241  }
1242 
1243  // We must be careful with the status object here and in code this function calls. IF we want
1244  // endRun to be called, then we must call resetResources before the things waiting on
1245  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1246  // endRun to be called much later than it should be, because status is holding iRunResource).
1247  // Note that this must be done explicitly. Relying on the destructor does not work well
1248  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1249  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1250  // destroyed inside this function and lumiWork.
1251  auto status =
1252  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1253 
1254  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1255  if (iHolder.taskHasFailed()) {
1256  status->resetResources();
1257  return;
1258  }
1259 
1260  status->setResumer(std::move(iResumer));
1261 
1263  *iHolder.group(), [this, iHolder, status = std::move(status)]() mutable {
1264  //make the services available
1266  // Caught exception is propagated via WaitingTaskHolder
1267  CMS_SA_ALLOW try {
1269 
1270  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1271  {
1272  SendSourceTerminationSignalIfException sentry(actReg_.get());
1273 
1274  input_->doBeginLumi(lumiPrincipal, &processContext_);
1275  sentry.completedSuccessfully();
1276  }
1277 
1279  if (rng.isAvailable()) {
1280  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1281  rng->preBeginLumi(lb);
1282  }
1283 
1284  //Task to start the stream beginLumis
1285  auto beginStreamsTask =
1286  make_waiting_task([this, holder = iHolder, status](std::exception_ptr const* iPtr) mutable {
1287  if (iPtr) {
1288  status->resetResources();
1289  holder.doneWaiting(*iPtr);
1290  } else {
1291  status->globalBeginDidSucceed();
1292  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1293 
1294  if (looper_) {
1295  // Caught exception is propagated via WaitingTaskHolder
1296  CMS_SA_ALLOW try {
1297  //make the services available
1298  ServiceRegistry::Operate operateLooper(serviceToken_);
1299  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1300  } catch (...) {
1301  status->resetResources();
1302  holder.doneWaiting(std::current_exception());
1303  return;
1304  }
1305  }
1306  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1307 
1308  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1309  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1310  streamQueues_[i].pause();
1311 
1312  auto eventTask = edm::make_waiting_task(
1313  [this, i, h = std::move(holder)](
1314  std::exception_ptr const* exceptionFromBeginStreamLumi) mutable {
1315  if (exceptionFromBeginStreamLumi) {
1317  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1319  } else {
1321  }
1322  });
1323  auto& event = principalCache_.eventPrincipal(i);
1324  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1325  // held by the container as this lambda may not finish executing before all the tasks it
1326  // spawns have already started to run.
1327  auto eventSetupImpls = &status->eventSetupImpls();
1328  auto lp = status->lumiPrincipal().get();
1331  event.setLuminosityBlockPrincipal(lp);
1332  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1333  beginStreamTransitionAsync<Traits>(WaitingTaskHolder(*holder.group(), eventTask),
1334  *schedule_,
1335  i,
1336  transitionInfo,
1337  serviceToken_,
1338  subProcesses_);
1339  });
1340  }
1341  }
1342  }); // beginStreamTask
1343 
1344  //task to start the global begin lumi
1345  WaitingTaskHolder beginStreamsHolder{*iHolder.group(), beginStreamsTask};
1346 
1347  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1348  {
1349  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1350  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1351  beginGlobalTransitionAsync<Traits>(
1352  beginStreamsHolder, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1353  }
1354  } catch (...) {
1355  status->resetResources();
1356  iHolder.doneWaiting(std::current_exception());
1357  }
1358  }); // task in sourceResourcesAcquirer
1359  }; // end lumiWork
1360 
1361  auto queueLumiWorkTask = make_waiting_task(
1362  [this, lumiWorkLambda = std::move(lumiWork), iHolder](std::exception_ptr const* iPtr) mutable {
1363  if (iPtr) {
1364  iHolder.doneWaiting(*iPtr);
1365  }
1366  lumiQueue_->pushAndPause(*iHolder.group(), std::move(lumiWorkLambda));
1367  });
1368 
1369  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1370  // We only get here inside this block if there is an EventSetup
1371  // module not able to handle concurrent IOVs (usually an ESSource)
1372  // and the new sync value is outside the current IOV of that module.
1373 
1374  WaitingTaskHolder queueLumiWorkTaskHolder{*iHolder.group(), queueLumiWorkTask};
1375 
1376  queueWhichWaitsForIOVsToFinish_.push(*iHolder.group(), [this, queueLumiWorkTaskHolder, iSync, status]() mutable {
1377  // Caught exception is propagated via WaitingTaskHolder
1378  CMS_SA_ALLOW try {
1379  SendSourceTerminationSignalIfException sentry(actReg_.get());
1380  // Pass in iSync to let the EventSetup system know which run and lumi
1381  // need to be processed and prepare IOVs for it.
1382  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1383  // lumi is done and no longer needs its EventSetup IOVs.
1384  espController_->eventSetupForInstanceAsync(
1385  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1386  sentry.completedSuccessfully();
1387  } catch (...) {
1388  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1389  }
1391  });
1392 
1393  } else {
1395 
1396  // This holder will be used to wait until the EventSetup IOVs are ready
1397  WaitingTaskHolder queueLumiWorkTaskHolder{*iHolder.group(), queueLumiWorkTask};
1398  // Caught exception is propagated via WaitingTaskHolder
1399  CMS_SA_ALLOW try {
1400  SendSourceTerminationSignalIfException sentry(actReg_.get());
1401 
1402  // Pass in iSync to let the EventSetup system know which run and lumi
1403  // need to be processed and prepare IOVs for it.
1404  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1405  // lumi is done and no longer needs its EventSetup IOVs.
1406  espController_->eventSetupForInstanceAsync(
1407  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1408  sentry.completedSuccessfully();
1409 
1410  } catch (...) {
1411  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1412  }
1413  }
1414  }

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), edm::WaitingTaskHolder::group(), handleNextEventForStreamAsync(), mps_fire::i, input_, edm::Service< T >::isAvailable(), looper_, lumiQueue_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), edm::SerialTaskQueue::pause(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readLuminosityBlock(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by handleNextEventForStreamAsync(), and processLumis().

◆ beginProcessBlock()

void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 949 of file EventProcessor.cc.

949  {
950  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
951  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
952 
953  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
954  FinalWaitingTask globalWaitTask;
955 
956  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
957  beginGlobalTransitionAsync<Traits>(
958  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
959 
960  do {
961  taskGroup_.wait();
962  } while (not globalWaitTask.done());
963 
964  if (globalWaitTask.exceptionPtr() != nullptr) {
965  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
966  }
967  beginProcessBlockSucceeded = true;
968  }

References edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, subProcesses_, and taskGroup_.

◆ beginRun()

void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded,
bool &  eventSetupForInstanceSucceeded 
)

Definition at line 1048 of file EventProcessor.cc.

1051  {
1052  globalBeginSucceeded = false;
1053  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1054  {
1055  SendSourceTerminationSignalIfException sentry(actReg_.get());
1056 
1057  input_->doBeginRun(runPrincipal, &processContext_);
1058  sentry.completedSuccessfully();
1059  }
1060 
1061  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1063  espController_->forceCacheClear();
1064  }
1065  {
1066  SendSourceTerminationSignalIfException sentry(actReg_.get());
1068  eventSetupForInstanceSucceeded = true;
1069  sentry.completedSuccessfully();
1070  }
1071  auto const& es = esp_->eventSetupImpl();
1072  if (looper_ && looperBeginJobRun_ == false) {
1073  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1074  looper_->beginOfJob(es);
1075  looperBeginJobRun_ = true;
1076  looper_->doStartingNewLoop();
1077  }
1078  {
1079  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1080  FinalWaitingTask globalWaitTask;
1081  RunTransitionInfo transitionInfo(runPrincipal, es);
1082  beginGlobalTransitionAsync<Traits>(
1083  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1084  do {
1085  taskGroup_.wait();
1086  } while (not globalWaitTask.done());
1087  if (globalWaitTask.exceptionPtr() != nullptr) {
1088  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1089  }
1090  }
1091  globalBeginSucceeded = true;
1092  FDEBUG(1) << "\tbeginRun " << run << "\n";
1093  if (looper_) {
1094  looper_->doBeginRun(runPrincipal, es, &processContext_);
1095  }
1096  {
1097  //To wait, the ref count has to be 1+#streams
1098  FinalWaitingTask streamLoopWaitTask;
1099 
1100  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1101 
1102  RunTransitionInfo transitionInfo(runPrincipal, es);
1103  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1104  *schedule_,
1106  transitionInfo,
1107  serviceToken_,
1108  subProcesses_);
1109  do {
1110  taskGroup_.wait();
1111  } while (not streamLoopWaitTask.done());
1112  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1113  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1114  }
1115  }
1116  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1117  if (looper_) {
1118  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1119  }
1120  }

References actReg_, edm::RunPrincipal::beginTime(), edm::FinalWaitingTask::done(), esp_, espController_, edm::WaitingTask::exceptionPtr(), FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, subProcesses_, edm::eventsetup::synchronousEventSetupForInstance(), and taskGroup_.

◆ branchIDListHelper() [1/2]

std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inline private

Definition at line 295 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

◆ branchIDListHelper() [2/2]

std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inline private

Definition at line 292 of file EventProcessor.h.

292  {
294  }

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

◆ checkForAsyncStopRequest()

bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 719 of file EventProcessor.cc.

719  {
720  bool returnValue = false;
721 
722  // Look for a shutdown signal
723  if (shutdown_flag.load(std::memory_order_acquire)) {
724  returnValue = true;
726  }
727  return returnValue;
728  }

References epSignal, runEdmFileComparison::returnCode, and edm::shutdown_flag.

Referenced by nextTransitionType().

◆ clearCounters()

void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 713 of file EventProcessor.cc.

713 { schedule_->clearCounters(); }

References schedule_.

◆ closeInputFile()

void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 849 of file EventProcessor.cc.

849  {
850  if (fileBlockValid()) {
851  SendSourceTerminationSignalIfException sentry(actReg_.get());
852  input_->closeFile(fb_.get(), cleaningUpAfterException);
853  sentry.completedSuccessfully();
854  }
855  FDEBUG(1) << "\tcloseInputFile\n";
856  }

References actReg_, fb_, FDEBUG, fileBlockValid(), and input_.

◆ closeOutputFiles()

void edm::EventProcessor::closeOutputFiles ( )

Definition at line 866 of file EventProcessor.cc.

866  {
867  schedule_->closeOutputFiles();
868  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
869 
870  FDEBUG(1) << "\tcloseOutputFiles\n";
871  }

References FDEBUG, edm::for_all(), schedule_, and subProcesses_.

◆ continueLumiAsync()

void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1416 of file EventProcessor.cc.

1416  {
1417  {
1418  //all streams are sharing the same status at the moment
1419  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1420  status->needToContinueLumi();
1421  status->startProcessingEvents();
1422  }
1423 
1424  unsigned int streamIndex = 0;
1425  tbb::task_arena arena{tbb::task_arena::attach()};
1426  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1427  arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1428  }
1429  iHolder.group()->run(
1430  [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1431  }

References edm::WaitingTaskHolder::group(), handleNextEventForStreamAsync(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, mps_update::status, and streamLumiStatus_.

Referenced by processLumis().

◆ deleteLumiFromCache()

void edm::EventProcessor::deleteLumiFromCache ( LuminosityBlockProcessingStatus & iStatus)

Definition at line 1736 of file EventProcessor.cc.

1736  {
1737  for (auto& s : subProcesses_) {
1738  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1739  }
1740  iStatus.lumiPrincipal()->clearPrincipal();
1741  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1742  }

References edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

◆ deleteRunFromCache()

void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1712 of file EventProcessor.cc.

1712  {
1713  principalCache_.deleteRun(phid, run);
1714  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1715  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1716  }

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, run(), and subProcesses_.

Referenced by endUnfinishedRun().

◆ doErrorStuff()

void edm::EventProcessor::doErrorStuff ( )

Definition at line 940 of file EventProcessor.cc.

940  {
941  FDEBUG(1) << "\tdoErrorStuff\n";
942  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
943  << "and went to the error state\n"
944  << "Will attempt to terminate processing normally\n"
945  << "(IF using the looper the next loop will be attempted)\n"
946  << "This likely indicates a bug in an input module or corrupted input or both\n";
947  }

References FDEBUG.

Referenced by runToCompletion().

◆ enableEndPaths()

void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 709 of file EventProcessor.cc.

709 { schedule_->enableEndPaths(active); }

References schedule_.

◆ endJob()

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 666 of file EventProcessor.cc.

666  {
667  // Collects exceptions, so we don't throw before all operations are performed.
668  ExceptionCollector c(
669  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
670 
671  //make the services available
673 
674  //NOTE: this really should go elsewhere in the future
675  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
676  c.call([this, i]() { this->schedule_->endStream(i); });
677  for (auto& subProcess : subProcesses_) {
678  c.call([&subProcess, i]() { subProcess.doEndStream(i); });
679  }
680  }
681  auto actReg = actReg_.get();
682  c.call([actReg]() { actReg->preEndJobSignal_(); });
683  schedule_->endJob(c);
684  for (auto& subProcess : subProcesses_) {
685  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
686  }
687  c.call(std::bind(&InputSource::doEndJob, input_.get()));
688  if (looper_) {
689  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
690  }
691  c.call([actReg]() { actReg->postEndJobSignal_(); });
692  if (c.hasThrown()) {
693  c.rethrow();
694  }
695  }

References actReg_, c, edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

◆ endOfLoop()

bool edm::EventProcessor::endOfLoop ( )

Definition at line 901 of file EventProcessor.cc.

901  {
902  if (looper_) {
903  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
904  looper_->setModuleChanger(&changer);
905  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
906  looper_->setModuleChanger(nullptr);
908  return true;
909  else
910  return false;
911  }
912  FDEBUG(1) << "\tendOfLoop\n";
913  return true;
914  }

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

◆ endPathsEnabled()

bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 711 of file EventProcessor.cc.

711 { return schedule_->endPathsEnabled(); }

References schedule_.

◆ endProcessBlock()

void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 1011 of file EventProcessor.cc.

1011  {
1012  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1013 
1014  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1015  FinalWaitingTask globalWaitTask;
1016 
1017  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1018  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1019  *schedule_,
1020  transitionInfo,
1021  serviceToken_,
1022  subProcesses_,
1023  cleaningUpAfterException);
1024  do {
1025  taskGroup_.wait();
1026  } while (not globalWaitTask.done());
1027  if (globalWaitTask.exceptionPtr() != nullptr) {
1028  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1029  }
1030 
1031  if (beginProcessBlockSucceeded) {
1032  FinalWaitingTask writeWaitTask;
1034  do {
1035  taskGroup_.wait();
1036  } while (not writeWaitTask.done());
1037  if (writeWaitTask.exceptionPtr()) {
1038  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1039  }
1040  }
1041 
1042  processBlockPrincipal.clearPrincipal();
1043  for (auto& s : subProcesses_) {
1044  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1045  }
1046  }

References edm::Principal::clearPrincipal(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

◆ endRun()

void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 1149 of file EventProcessor.cc.

1152  {
1153  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1154  runPrincipal.setEndTime(input_->timestamp());
1155 
1156  IOVSyncValue ts(
1157  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1158  runPrincipal.endTime());
1159  {
1160  SendSourceTerminationSignalIfException sentry(actReg_.get());
1162  sentry.completedSuccessfully();
1163  }
1164  auto const& es = esp_->eventSetupImpl();
1165  if (globalBeginSucceeded) {
1166  //To wait, the ref count has to be 1+#streams
1167  FinalWaitingTask streamLoopWaitTask;
1168 
1169  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1170 
1171  RunTransitionInfo transitionInfo(runPrincipal, es);
1172  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1173  *schedule_,
1175  transitionInfo,
1176  serviceToken_,
1177  subProcesses_,
1178  cleaningUpAfterException);
1179  do {
1180  taskGroup_.wait();
1181  } while (not streamLoopWaitTask.done());
1182  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1183  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1184  }
1185  }
1186  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1187  if (looper_) {
1188  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1189  }
1190  {
1191  FinalWaitingTask globalWaitTask;
1192 
1193  RunTransitionInfo transitionInfo(runPrincipal, es);
1194  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1195  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1196  *schedule_,
1197  transitionInfo,
1198  serviceToken_,
1199  subProcesses_,
1200  cleaningUpAfterException);
1201  do {
1202  taskGroup_.wait();
1203  } while (not globalWaitTask.done());
1204  if (globalWaitTask.exceptionPtr() != nullptr) {
1205  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1206  }
1207  }
1208  FDEBUG(1) << "\tendRun " << run << "\n";
1209  if (looper_) {
1210  looper_->doEndRun(runPrincipal, es, &processContext_);
1211  }
1212  }

References actReg_, edm::FinalWaitingTask::done(), edm::RunPrincipal::endTime(), esp_, espController_, edm::WaitingTask::exceptionPtr(), FDEBUG, input_, looper_, edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, edm::RunPrincipal::setEndTime(), subProcesses_, edm::eventsetup::synchronousEventSetupForInstance(), and taskGroup_.

Referenced by endUnfinishedRun().

◆ endUnfinishedLumi()

void edm::EventProcessor::endUnfinishedLumi ( )

Definition at line 1580 of file EventProcessor.cc.

1580  {
1581  if (streamLumiActive_.load() > 0) {
1582  FinalWaitingTask globalWaitTask;
1583  {
1584  WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1585  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1586  if (streamLumiStatus_[i]) {
1587  streamEndLumiAsync(globalTaskHolder, i);
1588  }
1589  }
1590  }
1591  do {
1592  taskGroup_.wait();
1593  } while (not globalWaitTask.done());
1594  if (globalWaitTask.exceptionPtr() != nullptr) {
1595  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1596  }
1597  }
1598  }

References edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), mps_fire::i, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, and taskGroup_.

◆ endUnfinishedRun()

void edm::EventProcessor::endUnfinishedRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException,
bool  eventSetupForInstanceSucceeded 
)

Definition at line 1122 of file EventProcessor.cc.

1126  {
1127  if (eventSetupForInstanceSucceeded) {
1128  //If we skip empty runs, this would be called conditionally
1129  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1130 
1131  if (globalBeginSucceeded) {
1132  FinalWaitingTask t;
1133  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1134  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1135  mergeableRunProductMetadata->preWriteRun();
1136  writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1137  do {
1138  taskGroup_.wait();
1139  } while (not t.done());
1140  mergeableRunProductMetadata->postWriteRun();
1141  if (t.exceptionPtr()) {
1142  std::rethrow_exception(*t.exceptionPtr());
1143  }
1144  }
1145  }
1146  deleteRunFromCache(phid, run);
1147  }

References deleteRunFromCache(), endRun(), edm::RunPrincipal::mergeableRunProductMetadata(), edm::MergeableRunProductMetadata::postWriteRun(), edm::MergeableRunProductMetadata::preWriteRun(), principalCache_, run(), edm::PrincipalCache::runPrincipal(), submitPVValidationJobs::t, taskGroup_, and writeRunAsync().

◆ fileBlockValid()

bool edm::EventProcessor::fileBlockValid ( )
inline

Definition at line 202 of file EventProcessor.h.

202 { return fb_.get() != nullptr; }

References fb_.

Referenced by closeInputFile(), openOutputFiles(), respondToCloseInputFile(), and respondToOpenInputFile().

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 699 of file EventProcessor.cc.

699  {
700  return schedule_->getAllModuleDescriptions();
701  }

References schedule_.

◆ getToken()

ServiceToken edm::EventProcessor::getToken ( )

Definition at line 697 of file EventProcessor.cc.

697 { return serviceToken_; }

References serviceToken_.

Referenced by ~EventProcessor().

◆ globalEndLumiAsync()

void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus >  iLumiStatus 
)

Definition at line 1442 of file EventProcessor.cc.

1443  {
1444  // Get some needed info out of the status object before moving
1445  // it into finalTaskForThisLumi.
1446  auto& lp = *(iLumiStatus->lumiPrincipal());
1447  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1448  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1449  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1450  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1451 
1452  // group is used later in this function, and lives outside of iTask
1453  tbb::task_group& taskGroup = *iTask.group();
1454  auto finalTaskForThisLumi = edm::make_waiting_task(
1455  [status = std::move(iLumiStatus), iTask = std::move(iTask), this](std::exception_ptr const* iPtr) mutable {
1456  std::exception_ptr ptr;
1457  if (iPtr) {
1458  handleEndLumiExceptions(iPtr, iTask);
1459  } else {
1460  // Caught exception is passed to handleEndLumiExceptions()
1461  CMS_SA_ALLOW try {
1463  if (looper_) {
1464  auto& lumiPrincipal = *(status->lumiPrincipal());
1465  EventSetupImpl const& eventSetupImpl = status->eventSetupImpl(esp_->subProcessIndex());
1466  looper_->doEndLuminosityBlock(lumiPrincipal, eventSetupImpl, &processContext_);
1467  }
1468  } catch (...) {
1469  ptr = std::current_exception();
1470  }
1471  }
1473 
1474  // Try hard to clean up resources so the
1475  // process can terminate in a controlled
1476  // fashion even after exceptions have occurred.
1477  // Caught exception is passed to handleEndLumiExceptions()
1478  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1479  if (not ptr) {
1480  ptr = std::current_exception();
1481  }
1482  }
1483  // Caught exception is passed to handleEndLumiExceptions()
1484  CMS_SA_ALLOW try {
1485  status->resumeGlobalLumiQueue();
1487  } catch (...) {
1488  if (not ptr) {
1489  ptr = std::current_exception();
1490  }
1491  }
1492  // Caught exception is passed to handleEndLumiExceptions()
1493  CMS_SA_ALLOW try {
1494  // This call to status.resetResources() must occur before iTask is destroyed.
1495  // Otherwise there will be a data race which could result in endRun
1496  // being delayed until it is too late to successfully call it.
1497  status->resetResources();
1498  status.reset();
1499  } catch (...) {
1500  if (not ptr) {
1501  ptr = std::current_exception();
1502  }
1503  }
1504 
1505  if (ptr) {
1506  handleEndLumiExceptions(&ptr, iTask);
1507  }
1508  });
1509 
1510  auto writeT = edm::make_waiting_task(
1511  [this, didGlobalBeginSucceed, &lumiPrincipal = lp, task = WaitingTaskHolder(taskGroup, finalTaskForThisLumi)](
1512  std::exception_ptr const* iExcept) mutable {
1513  if (iExcept) {
1514  task.doneWaiting(*iExcept);
1515  } else {
1516  //Only call writeLumi if beginLumi succeeded
1517  if (didGlobalBeginSucceed) {
1518  writeLumiAsync(std::move(task), lumiPrincipal);
1519  }
1520  }
1521  });
1522 
1523  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1524 
1525  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1526  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1527  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup, writeT),
1528  *schedule_,
1529  transitionInfo,
1530  serviceToken_,
1531  subProcesses_,
1532  cleaningUpAfterException);
1533  }

References CMS_SA_ALLOW, deleteLumiFromCache(), esp_, edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), looper_, edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), schedule_, serviceToken_, mps_update::status, subProcesses_, TrackValidation_cff::task, and writeLumiAsync().

Referenced by streamEndLumiAsync().

◆ handleEndLumiExceptions()

void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr const *  iPtr,
WaitingTaskHolder holder 
)

Definition at line 1433 of file EventProcessor.cc.

1433  {
1434  if (setDeferredException(*iPtr)) {
1435  WaitingTaskHolder tmp(holder);
1436  tmp.doneWaiting(*iPtr);
1437  } else {
1439  }
1440  }

References setDeferredException(), setExceptionMessageLumis(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

◆ handleNextEventForStreamAsync()

void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 1806 of file EventProcessor.cc.

1806  {
1807  sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1809  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1810  // as steramEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1811  auto status = streamLumiStatus_[iStreamIndex].get();
1812  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1813  CMS_SA_ALLOW try {
1814  if (readNextEventForStream(iStreamIndex, *status)) {
1815  auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1816  if (iPtr) {
1817  // Try to end the stream properly even if an exception was
1818  // thrown on an event.
1819  bool expected = false;
1820  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1821  // This is the case where the exception in iPtr is the primary
1822  // exception and we want to see its message.
1823  deferredExceptionPtr_ = *iPtr;
1824  WaitingTaskHolder tempHolder(iTask);
1825  tempHolder.doneWaiting(*iPtr);
1826  }
1827  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1828  //the stream will stop now
1829  return;
1830  }
1831  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1832  });
1833 
1834  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1835  } else {
1836  //the stream will stop now
1837  if (status->isLumiEnding()) {
1838  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1839  status->startNextLumi();
1840  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1841  }
1842  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1843  } else {
1844  iTask.doneWaiting(std::exception_ptr{});
1845  }
1846  }
1847  } catch (...) {
1848  // It is unlikely we will ever get in here ...
1849  // But if we do try to clean up and propagate the exception
1850  if (streamLumiStatus_[iStreamIndex]) {
1851  streamEndLumiAsync(iTask, iStreamIndex);
1852  }
1853  bool expected = false;
1854  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1855  auto e = std::current_exception();
1857  iTask.doneWaiting(e);
1858  }
1859  }
1860  });
1861  }

References beginLumiAsync(), CMS_SA_ALLOW, deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::WaitingTaskHolder::group(), edm::InputSource::IsLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), and streamLumiStatus_.

Referenced by beginLumiAsync(), and continueLumiAsync().

◆ init()

void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 344 of file EventProcessor.cc.

346  {
347  //std::cerr << processDesc->dump() << std::endl;
348 
349  // register the empty parentage vector , once and for all
351 
352  // register the empty parameter set, once and for all.
353  ParameterSet().registerIt();
354 
355  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
356 
357  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
358  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
359  bool const hasSubProcesses = !subProcessVParameterSet.empty();
360 
361  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
362  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
363  // set in here if the parameters were not explicitly set.
365 
366  // Now set some parameters specific to the main process.
367  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
368  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
369  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
370  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
371  << fileMode << ".\n"
372  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
373  } else {
374  fileModeNoMerge_ = (fileMode == "NOMERGE");
375  }
376  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
377 
378  //threading
379  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
380 
381  // Even if numberOfThreads was set to zero in the Python configuration, the code
382  // in cmsRun.cpp should have reset it to something else.
383  assert(nThreads != 0);
384 
385  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
386  if (nStreams == 0) {
387  nStreams = nThreads;
388  }
389  if (nThreads > 1 or nStreams > 1) {
390  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
391  }
392  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
393  if (nConcurrentRuns != 1) {
394  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
395  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
396  }
397  unsigned int nConcurrentLumis =
398  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
399  if (nConcurrentLumis == 0) {
400  nConcurrentLumis = nConcurrentRuns;
401  }
402 
403  //Check that relationships between threading parameters makes sense
404  /*
405  if(nThreads<nStreams) {
406  //bad
407  }
408  if(nConcurrentRuns>nStreams) {
409  //bad
410  }
411  if(nConcurrentRuns>nConcurrentLumis) {
412  //bad
413  }
414  */
415  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
416 
417  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
419  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
420 
421  // Now do general initialization
422  ScheduleItems items;
423 
424  //initialize the services
425  auto& serviceSets = processDesc->getServicesPSets();
426  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
427  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
428 
429  //make the services available
431 
432  if (nStreams > 1) {
434  handler->willBeUsingThreads();
435  }
436 
437  // intialize miscellaneous items
438  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
439 
440  // intialize the event setup provider
441  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
442  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get(), &eventSetupPset);
443 
444  // initialize the looper, if any
446  if (looper_) {
447  looper_->setActionTable(items.act_table_.get());
448  looper_->attachTo(*items.actReg_);
449 
450  //For now loopers make us run only 1 transition at a time
451  nStreams = 1;
452  nConcurrentLumis = 1;
453  nConcurrentRuns = 1;
454  // in presence of looper do not delete modules
456  }
457  espController_->setMaxConcurrentIOVs(nStreams, nConcurrentLumis);
458 
459  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
460 
461  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
462  streamQueues_.resize(nStreams);
463  streamLumiStatus_.resize(nStreams);
464 
465  // initialize the input source
467  *common,
468  items.preg(),
469  items.branchIDListHelper(),
470  items.thinnedAssociationsHelper(),
471  items.actReg_,
472  items.processConfiguration(),
474 
475  // intialize the Schedule
476  schedule_ = items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_);
477 
478  // set the data members
479  act_table_ = std::move(items.act_table_);
480  actReg_ = items.actReg_;
481  preg_ = items.preg();
483  branchIDListHelper_ = items.branchIDListHelper();
484  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
485  processConfiguration_ = items.processConfiguration();
487  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
488 
489  FDEBUG(2) << parameterSet << std::endl;
490 
492  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
493  // Reusable event principal
494  auto ep = std::make_shared<EventPrincipal>(preg(),
498  historyAppender_.get(),
499  index);
501  }
502 
503  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
504  auto lp =
505  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
507  }
508 
509  {
510  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
512 
513  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
515  }
516 
517  // fill the subprocesses, if there are any
518  subProcesses_.reserve(subProcessVParameterSet.size());
519  for (auto& subProcessPSet : subProcessVParameterSet) {
520  subProcesses_.emplace_back(subProcessPSet,
521  *parameterSet,
522  preg(),
525  SubProcessParentageHelper(),
527  *actReg_,
528  token,
531  &processContext_);
532  }
533  }

References act_table_, actReg_, cms::cuda::assert(), branchIDListHelper(), branchIDListHelper_, trackingPlots::common, edm::errors::Configuration, deleteNonConsumedUnscheduledModules_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, processOptions_cff::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameterSet(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, eostools::move(), runTheMatrix::nStreams, runTheMatrix::nThreads, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

◆ inputProcessBlocks()

void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 970 of file EventProcessor.cc.

970  {
971  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
972  // For now the input source always returns false from readProcessBlock,
973  // so this does nothing at all.
974  // Eventually the ProcessBlockPrincipal needs to be properly filled
975  // and cleared. The delayed reader needs to be set. The correct process name
976  // needs to be supplied.
977  while (input_->readProcessBlock()) {
978  DelayedReader* reader = nullptr;
979  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName(), reader);
980 
981  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
982  FinalWaitingTask globalWaitTask;
983 
984  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
985  beginGlobalTransitionAsync<Traits>(
986  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
987 
988  do {
989  taskGroup_.wait();
990  } while (not globalWaitTask.done());
991  if (globalWaitTask.exceptionPtr() != nullptr) {
992  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
993  }
994 
995  FinalWaitingTask writeWaitTask;
997  do {
998  taskGroup_.wait();
999  } while (not writeWaitTask.done());
1000  if (writeWaitTask.exceptionPtr()) {
1001  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1002  }
1003 
1004  processBlockPrincipal.clearPrincipal();
1005  for (auto& s : subProcesses_) {
1006  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1007  }
1008  }
1009  }

References edm::Principal::clearPrincipal(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), principalCache_, processConfiguration_, DQM::reader, alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

◆ lastTransitionType()

InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline

Definition at line 192 of file EventProcessor.h.

192  {
194  return InputSource::IsStop;
195  }
196  return lastSourceTransition_;
197  }

References deferredExceptionPtrIsSet_, edm::InputSource::IsStop, and lastSourceTransition_.

Referenced by handleNextEventForStreamAsync(), and processLumis().

◆ looper() [1/2]

std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 303 of file EventProcessor.h.

303 { return get_underlying_safe(looper_); }

References edm::get_underlying_safe(), and looper_.

◆ looper() [2/2]

std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 302 of file EventProcessor.h.

302 { return get_underlying_safe(looper_); }

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

◆ nextLuminosityBlockID()

edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 760 of file EventProcessor.cc.

760 { return input_->luminosityBlock(); }

References input_.

Referenced by readNextEventForStream().

◆ nextRunID()

std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 756 of file EventProcessor.cc.

756  {
757  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
758  }

References input_.

◆ nextTransitionType()

InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 730 of file EventProcessor.cc.

730  {
731  if (deferredExceptionPtrIsSet_.load()) {
733  return InputSource::IsStop;
734  }
735 
736  SendSourceTerminationSignalIfException sentry(actReg_.get());
737  InputSource::ItemType itemType;
738  //For now, do nothing with InputSource::IsSynchronize
739  do {
740  itemType = input_->nextItemType();
741  } while (itemType == InputSource::IsSynchronize);
742 
743  lastSourceTransition_ = itemType;
744  sentry.completedSuccessfully();
745 
747 
749  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
751  }
752 
753  return lastSourceTransition_;
754  }

References actReg_, checkForAsyncStopRequest(), deferredExceptionPtrIsSet_, epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by readNextEventForStream().

◆ openOutputFiles()

void edm::EventProcessor::openOutputFiles ( )

Definition at line 858 of file EventProcessor.cc.

858  {
859  if (fileBlockValid()) {
860  schedule_->openOutputFiles(*fb_);
861  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
862  }
863  FDEBUG(1) << "\topenOutputFiles\n";
864  }

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

◆ operator=()

EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete

◆ preg() [1/2]

std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 291 of file EventProcessor.h.

291 { return get_underlying_safe(preg_); }

References edm::get_underlying_safe(), and preg_.

◆ preg() [2/2]

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 290 of file EventProcessor.h.

290 { return get_underlying_safe(preg_); }

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), and readRun().

◆ prepareForNextLoop()

void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 922 of file EventProcessor.cc.

922  {
923  looper_->prepareForNextLoop(esp_.get());
924  FDEBUG(1) << "\tprepareForNextLoop\n";
925  }

References esp_, FDEBUG, and looper_.

Referenced by runToCompletion().

◆ processConfiguration()

ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 138 of file EventProcessor.h.

138 { return *processConfiguration_; }

References processConfiguration_.

◆ processEventAsync()

void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1877 of file EventProcessor.cc.

1877  {
1878  iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1879  }

References edm::WaitingTaskHolder::group(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

◆ processEventAsyncImpl()

void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1881 of file EventProcessor.cc.

1881  {
1882  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1883 
1886  if (rng.isAvailable()) {
1887  Event ev(*pep, ModuleDescription(), nullptr);
1888  rng->postEventRead(ev);
1889  }
1890 
1891  WaitingTaskHolder finalizeEventTask(
1892  *iHolder.group(), make_waiting_task([this, pep, iHolder, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1893  //NOTE: If we have a looper we only have one Stream
1894  if (looper_) {
1895  ServiceRegistry::Operate operateLooper(serviceToken_);
1896  processEventWithLooper(*pep, iStreamIndex);
1897  }
1898 
1899  FDEBUG(1) << "\tprocessEvent\n";
1900  pep->clearEventPrincipal();
1901  if (iPtr) {
1902  iHolder.doneWaiting(*iPtr);
1903  } else {
1904  iHolder.doneWaiting(std::exception_ptr());
1905  }
1906  }));
1907  WaitingTaskHolder afterProcessTask;
1908  if (subProcesses_.empty()) {
1909  afterProcessTask = std::move(finalizeEventTask);
1910  } else {
1911  //Need to run SubProcesses after schedule has finished
1912  // with the event
1913  afterProcessTask = WaitingTaskHolder(
1914  *iHolder.group(),
1915  make_waiting_task([this, pep, finalizeEventTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1916  if (not iPtr) {
1917  //when run with 1 thread, we want the order to be what
1918  // it was before. This requires reversing the order since
1919  // tasks are run last one in first one out
1920  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1921  subProcess.doEventAsync(finalizeEventTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1922  }
1923  } else {
1924  finalizeEventTask.doneWaiting(*iPtr);
1925  }
1926  }));
1927  }
1928 
1929  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1930  EventTransitionInfo info(*pep, es);
1931  schedule_->processOneEventAsync(std::move(afterProcessTask), iStreamIndex, info, serviceToken_);
1932  }

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, edm::WaitingTaskHolder::group(), info(), edm::Service< T >::isAvailable(), looper_, edm::make_waiting_task(), eostools::move(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, streamLumiStatus_, and subProcesses_.

Referenced by processEventAsync().

◆ processEventWithLooper()

void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 1934 of file EventProcessor.cc.

1934  {
1935  bool randomAccess = input_->randomAccess();
1936  ProcessingController::ForwardState forwardState = input_->forwardState();
1937  ProcessingController::ReverseState reverseState = input_->reverseState();
1938  ProcessingController pc(forwardState, reverseState, randomAccess);
1939 
1941  do {
1942  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1943  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1944  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1945 
1946  bool succeeded = true;
1947  if (randomAccess) {
1948  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1949  input_->skipEvents(-2);
1950  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1951  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1952  }
1953  }
1954  pc.setLastOperationSucceeded(succeeded);
1955  } while (!pc.lastOperationSucceeded());
1957  shouldWeStop_ = true;
1959  }
1960  }

References esp_, input_, edm::InputSource::IsStop, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), lastSourceTransition_, looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

◆ processLumis()

InputSource::ItemType edm::EventProcessor::processLumis ( std::shared_ptr< void > const &  iRunResource)

Definition at line 1214 of file EventProcessor.cc.

1214  {
1215  FinalWaitingTask waitTask;
1216  if (streamLumiActive_ > 0) {
1218  // Continue after opening a new input file
1220  } else {
1221  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1222  input_->luminosityBlockAuxiliary()->beginTime()),
1223  iRunResource,
1224  WaitingTaskHolder{taskGroup_, &waitTask});
1225  }
1226  do {
1227  taskGroup_.wait();
1228  } while (not waitTask.done());
1229 
1230  if (waitTask.exceptionPtr() != nullptr) {
1231  std::rethrow_exception(*(waitTask.exceptionPtr()));
1232  }
1233  return lastTransitionType();
1234  }

References cms::cuda::assert(), beginLumiAsync(), continueLumiAsync(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), input_, lastTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamLumiActive_, and taskGroup_.

◆ readAndMergeLumi()

int edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1654 of file EventProcessor.cc.

1654  {
1655  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1656  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1657  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1658  input_->processHistoryRegistry().reducedProcessHistoryID(
1659  input_->luminosityBlockAuxiliary()->processHistoryID()));
1660  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1661  assert(lumiOK);
1662  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1663  {
1664  SendSourceTerminationSignalIfException sentry(actReg_.get());
1665  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1666  sentry.completedSuccessfully();
1667  }
1668  return input_->luminosityBlock();
1669  }

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by readNextEventForStream().

◆ readAndMergeRun()

std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1623 of file EventProcessor.cc.

1623  {
1624  principalCache_.merge(input_->runAuxiliary(), preg());
1625  auto runPrincipal = principalCache_.runPrincipalPtr();
1626  {
1627  SendSourceTerminationSignalIfException sentry(actReg_.get());
1628  input_->readAndMergeRun(*runPrincipal);
1629  sentry.completedSuccessfully();
1630  }
1631  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1632  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1633  }

References actReg_, cms::cuda::assert(), input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

◆ readEvent()

void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1863 of file EventProcessor.cc.

1863  {
1864  //TODO this will have to become per stream
1865  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1866  StreamContext streamContext(event.streamID(), &processContext_);
1867 
1868  SendSourceTerminationSignalIfException sentry(actReg_.get());
1869  input_->readEvent(event, streamContext);
1870 
1871  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1872  sentry.completedSuccessfully();
1873 
1874  FDEBUG(1) << "\treadEvent\n";
1875  }

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, and streamLumiStatus_.

Referenced by readNextEventForStream().

◆ readFile()

void edm::EventProcessor::readFile ( )

◆ readLuminosityBlock()

void edm::EventProcessor::readLuminosityBlock ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1635 of file EventProcessor.cc.

1635  {
1637  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1638  << "Illegal attempt to insert lumi into cache\n"
1639  << "Run is invalid\n"
1640  << "Contact a Framework Developer\n";
1641  }
1643  assert(lbp);
1644  lbp->setAux(*input_->luminosityBlockAuxiliary());
1645  {
1646  SendSourceTerminationSignalIfException sentry(actReg_.get());
1647  input_->readLuminosityBlock(*lbp, *historyAppender_);
1648  sentry.completedSuccessfully();
1649  }
1650  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1651  iStatus.lumiPrincipal() = std::move(lbp);
1652  }

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::getAvailableLumiPrincipalPtr(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), eostools::move(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

Referenced by beginLumiAsync().

◆ readNextEventForStream()

bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iLumiStatus 
)
private

Definition at line 1744 of file EventProcessor.cc.

1744  {
1745  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1746  iStatus.endLumi();
1747  return false;
1748  }
1749 
1750  if (iStatus.wasEventProcessingStopped()) {
1751  return false;
1752  }
1753 
1754  if (shouldWeStop()) {
1756  iStatus.stopProcessingEvents();
1757  iStatus.endLumi();
1758  return false;
1759  }
1760 
1762  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1763  CMS_SA_ALLOW try {
1764  //need to use lock in addition to the serial task queue because
1765  // of delayed provenance reading and reading data in response to
1766  // edm::Refs etc
1767  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1768 
1769  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1770  if (InputSource::IsLumi == itemType) {
1771  iStatus.haveContinuedLumi();
1772  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1773  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1774  readAndMergeLumi(iStatus);
1775  itemType = nextTransitionType();
1776  }
1777  if (InputSource::IsLumi == itemType) {
1778  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1779  input_->luminosityBlockAuxiliary()->beginTime()));
1780  }
1781  }
1782  if (InputSource::IsEvent != itemType) {
1783  iStatus.stopProcessingEvents();
1784 
1785  //IsFile may continue processing the lumi and
1786  // looper_ can cause the input source to declare a new IsRun which is actually
1787  // just a continuation of the previous run
1788  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1789  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1790  iStatus.endLumi();
1791  }
1792  return false;
1793  }
1794  readEvent(iStreamIndex);
1795  } catch (...) {
1796  bool expected = false;
1797  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1798  deferredExceptionPtr_ = std::current_exception();
1799  iStatus.endLumi();
1800  }
1801  return false;
1802  }
1803  return true;
1804  }

References CMS_SA_ALLOW, edm::LuminosityBlockProcessingStatus::continuingLumi(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::LuminosityBlockProcessingStatus::endLumi(), edm::LuminosityBlockProcessingStatus::haveContinuedLumi(), input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, lastSourceTransition_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextLuminosityBlockID(), nextTransitionType(), or, readAndMergeLumi(), readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setNextSyncValue(), shouldWeStop(), sourceMutex_, edm::LuminosityBlockProcessingStatus::stopProcessingEvents(), and edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped().

Referenced by handleNextEventForStreamAsync().

◆ readRun()

std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1600 of file EventProcessor.cc.

1600  {
1602  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1603  << "Illegal attempt to insert run into cache\n"
1604  << "Contact a Framework Developer\n";
1605  }
1606  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1607  preg(),
1609  historyAppender_.get(),
1610  0,
1611  true,
1613  {
1614  SendSourceTerminationSignalIfException sentry(actReg_.get());
1615  input_->readRun(*rp, *historyAppender_);
1616  sentry.completedSuccessfully();
1617  }
1618  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1619  principalCache_.insert(rp);
1620  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1621  }

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, mergeableRunProductProcesses_, preg(), principalCache_, and processConfiguration_.

◆ respondToCloseInputFile()

void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 883 of file EventProcessor.cc.

883  {
884  if (fileBlockValid()) {
885  schedule_->respondToCloseInputFile(*fb_);
886  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
887  }
888  FDEBUG(1) << "\trespondToCloseInputFile\n";
889  }

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

◆ respondToOpenInputFile()

void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 873 of file EventProcessor.cc.

873  {
874  if (fileBlockValid()) {
876  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
877  schedule_->respondToOpenInputFile(*fb_);
878  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
879  }
880  FDEBUG(1) << "\trespondToOpenInputFile\n";
881  }

References branchIDListHelper_, fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

◆ rewindInput()

void edm::EventProcessor::rewindInput ( )

Definition at line 916 of file EventProcessor.cc.

916  {
917  input_->repeat();
918  input_->rewind();
919  FDEBUG(1) << "\trewind\n";
920  }

References FDEBUG, and input_.

Referenced by runToCompletion().

◆ run()

EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

◆ runToCompletion()

EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 762 of file EventProcessor.cc.

762  {
765  {
766  beginJob(); //make sure this was called
767 
768  // make the services available
770 
772  try {
773  FilesProcessor fp(fileModeNoMerge_);
774 
775  convertException::wrap([&]() {
776  bool firstTime = true;
777  do {
778  if (not firstTime) {
780  rewindInput();
781  } else {
782  firstTime = false;
783  }
784  startingNewLoop();
785 
786  auto trans = fp.processFiles(*this);
787 
788  fp.normalEnd();
789 
790  if (deferredExceptionPtrIsSet_.load()) {
791  std::rethrow_exception(deferredExceptionPtr_);
792  }
793  if (trans != InputSource::IsStop) {
794  //problem with the source
795  doErrorStuff();
796 
797  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
798  }
799  } while (not endOfLoop());
800  }); // convertException::wrap
801 
802  } // Try block
803  catch (cms::Exception& e) {
805  std::string message(
806  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
807  e.addAdditionalInfo(message);
808  if (e.alreadyPrinted()) {
809  LogAbsolute("Additional Exceptions") << message;
810  }
811  }
812  if (!exceptionMessageRuns_.empty()) {
813  e.addAdditionalInfo(exceptionMessageRuns_);
814  if (e.alreadyPrinted()) {
815  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
816  }
817  }
818  if (!exceptionMessageFiles_.empty()) {
819  e.addAdditionalInfo(exceptionMessageFiles_);
820  if (e.alreadyPrinted()) {
821  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
822  }
823  }
824  throw;
825  }
826  }
827 
828  return returnCode;
829  }

References asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

◆ setDeferredException()

bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 1983 of file EventProcessor.cc.

1983  {
1984  bool expected = false;
1985  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1986  deferredExceptionPtr_ = iException;
1987  return true;
1988  }
1989  return false;
1990  }

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

Referenced by handleEndLumiExceptions().

◆ setExceptionMessageFiles()

void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 1977 of file EventProcessor.cc.

1977 { exceptionMessageFiles_ = message; }

References exceptionMessageFiles_.

◆ setExceptionMessageLumis()

void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 1981 of file EventProcessor.cc.

1981 { exceptionMessageLumis_ = true; }

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

◆ setExceptionMessageRuns()

void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 1979 of file EventProcessor.cc.

1979 { exceptionMessageRuns_ = message; }

References exceptionMessageRuns_.

◆ shouldWeCloseOutput()

bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 927 of file EventProcessor.cc.

927  {
928  FDEBUG(1) << "\tshouldWeCloseOutput\n";
929  if (!subProcesses_.empty()) {
930  for (auto const& subProcess : subProcesses_) {
931  if (subProcess.shouldWeCloseOutput()) {
932  return true;
933  }
934  }
935  return false;
936  }
937  return schedule_->shouldWeCloseOutput();
938  }

References FDEBUG, schedule_, and subProcesses_.

◆ shouldWeStop()

bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 1962 of file EventProcessor.cc.

1962  {
1963  FDEBUG(1) << "\tshouldWeStop\n";
1964  if (shouldWeStop_)
1965  return true;
1966  if (!subProcesses_.empty()) {
1967  for (auto const& subProcess : subProcesses_) {
1968  if (subProcess.terminate()) {
1969  return true;
1970  }
1971  }
1972  return false;
1973  }
1974  return schedule_->terminate();
1975  }

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

◆ startingNewLoop()

void edm::EventProcessor::startingNewLoop ( )

Definition at line 891 of file EventProcessor.cc.

891  {
892  shouldWeStop_ = false;
893  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
894  // until after we've called beginOfJob
895  if (looper_ && looperBeginJobRun_) {
896  looper_->doStartingNewLoop();
897  }
898  FDEBUG(1) << "\tstartingNewLoop\n";
899  }

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

◆ streamEndLumiAsync()

void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1535 of file EventProcessor.cc.

1535  {
1536  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1537  if (iPtr) {
1538  handleEndLumiExceptions(iPtr, iTask);
1539  }
1540  auto status = streamLumiStatus_[iStreamIndex];
1541  //reset status before releasing queue else get race condition
1542  streamLumiStatus_[iStreamIndex].reset();
1544  streamQueues_[iStreamIndex].resume();
1545 
1546  //are we the last one?
1547  if (status->streamFinishedLumi()) {
1549  }
1550  });
1551 
1552  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1553 
1554  //Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1555  // therefore we do not want to hold the shared_ptr
1556  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1557  lumiStatus->setEndTime();
1558 
1559  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1560 
1561  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1562  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1563 
1564  if (lumiStatus->didGlobalBeginSucceed()) {
1565  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1566  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1567  lumiPrincipal.endTime());
1568  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1569  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1570  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1571  *schedule_,
1572  iStreamIndex,
1573  transitionInfo,
1574  serviceToken_,
1575  subProcesses_,
1576  cleaningUpAfterException);
1577  }
1578  }

References esp_, globalEndLumiAsync(), edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and submitPVValidationJobs::t.

Referenced by beginLumiAsync(), endUnfinishedLumi(), and handleNextEventForStreamAsync().

◆ taskCleanup()

void edm::EventProcessor::taskCleanup ( )

Definition at line 553 of file EventProcessor.cc.

553  {
556  taskGroup_.wait();
557  assert(task.done());
558  }

References cms::cuda::assert(), espController_, TrackValidation_cff::task, and taskGroup_.

◆ thinnedAssociationsHelper() [1/2]

std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 299 of file EventProcessor.h.

299  {
301  }

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

◆ thinnedAssociationsHelper() [2/2]

std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 296 of file EventProcessor.h.

296  {
298  }

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

◆ totalEvents()

int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 703 of file EventProcessor.cc.

703 { return schedule_->totalEvents(); }

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

◆ totalEventsFailed()

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 707 of file EventProcessor.cc.

707 { return schedule_->totalEventsFailed(); }

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

◆ totalEventsPassed()

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 705 of file EventProcessor.cc.

705 { return schedule_->totalEventsPassed(); }

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

◆ warnAboutModulesRequiringLuminosityBLockSynchronization()

void edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization ( ) const
private

Definition at line 1992 of file EventProcessor.cc.

1992  {
1993  std::unique_ptr<LogSystem> s;
1994  for (auto worker : schedule_->allWorkers()) {
1995  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1996  if (not s) {
1997  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1998  (*s) << "The following modules require synchronizing on LuminosityBlock boundaries:";
1999  }
2000  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2001  }
2002  }
2003  }

References alignCSCRings::s, and schedule_.

Referenced by beginJob().

◆ writeLumiAsync()

void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 1718 of file EventProcessor.cc.

1718  {
1719  auto subsT = edm::make_waiting_task([this, task, &lumiPrincipal](std::exception_ptr const* iExcept) mutable {
1720  if (iExcept) {
1721  task.doneWaiting(*iExcept);
1722  } else {
1724  for (auto& s : subProcesses_) {
1725  s.writeLumiAsync(task, lumiPrincipal);
1726  }
1727  }
1728  });
1730 
1731  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1732 
1733  schedule_->writeLumiAsync(WaitingTaskHolder{*task.group(), subsT}, lumiPrincipal, &processContext_, actReg_.get());
1734  }

References actReg_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::make_waiting_task(), edm::RunPrincipal::mergeableRunProductMetadata(), processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, TrackValidation_cff::task, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

◆ writeProcessBlockAsync()

void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 1671 of file EventProcessor.cc.

1671  {
1672  auto subsT = edm::make_waiting_task([this, task, processBlockType](std::exception_ptr const* iExcept) mutable {
1673  if (iExcept) {
1674  task.doneWaiting(*iExcept);
1675  } else {
1677  for (auto& s : subProcesses_) {
1678  s.writeProcessBlockAsync(task, processBlockType);
1679  }
1680  }
1681  });
1683  schedule_->writeProcessBlockAsync(WaitingTaskHolder(*task.group(), subsT),
1684  principalCache_.processBlockPrincipal(processBlockType),
1685  &processContext_,
1686  actReg_.get());
1687  }

References actReg_, edm::make_waiting_task(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endProcessBlock(), and inputProcessBlocks().

◆ writeRunAsync()

void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
ProcessHistoryID const &  phid,
RunNumber_t  run,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 1689 of file EventProcessor.cc.

1692  {
1693  auto subsT = edm::make_waiting_task(
1694  [this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
1695  if (iExcept) {
1696  task.doneWaiting(*iExcept);
1697  } else {
1699  for (auto& s : subProcesses_) {
1700  s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1701  }
1702  }
1703  });
1705  schedule_->writeRunAsync(WaitingTaskHolder(*task.group(), subsT),
1707  &processContext_,
1708  actReg_.get(),
1709  mergeableRunProductMetadata);
1710  }

References actReg_, edm::make_waiting_task(), principalCache_, processContext_, run(), edm::PrincipalCache::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endUnfinishedRun().

Member Data Documentation

◆ act_table_

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 326 of file EventProcessor.h.

Referenced by init().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

◆ asyncStopRequestedWhileProcessingEvents_

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 362 of file EventProcessor.h.

Referenced by runToCompletion().

◆ asyncStopStatusCodeFromProcessingEvents_

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 363 of file EventProcessor.h.

Referenced by runToCompletion().

◆ beginJobCalled_

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 350 of file EventProcessor.h.

Referenced by beginJob().

◆ branchIDListHelper_

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 318 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

◆ deferredExceptionPtr_

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

◆ deferredExceptionPtrIsSet_

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

◆ deleteNonConsumedUnscheduledModules_

bool edm::EventProcessor::deleteNonConsumedUnscheduledModules_ = true
private

Definition at line 371 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ esp_

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

◆ espController_

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

◆ eventSetupDataToExcludeFromPrefetching_

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 368 of file EventProcessor.h.

◆ exceptionMessageFiles_

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 353 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

◆ exceptionMessageLumis_

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 355 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

◆ exceptionMessageRuns_

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 354 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

◆ fb_

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private

◆ fileModeNoMerge_

bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 352 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

◆ firstEventInBlock_

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 364 of file EventProcessor.h.

◆ forceESCacheClearOnNewRun_

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 358 of file EventProcessor.h.

Referenced by beginRun(), and init().

◆ forceLooperToEnd_

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 356 of file EventProcessor.h.

Referenced by endOfLoop().

◆ historyAppender_

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 338 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

◆ input_

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

◆ lastSourceTransition_

InputSource::ItemType edm::EventProcessor::lastSourceTransition_
private

◆ looper_

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private

◆ looperBeginJobRun_

bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 357 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

◆ lumiQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 333 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

◆ mergeableRunProductProcesses_

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 330 of file EventProcessor.h.

Referenced by init(), and readRun().

◆ pathsAndConsumesOfModules_

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 329 of file EventProcessor.h.

Referenced by beginJob().

◆ preallocations_

PreallocationConfiguration edm::EventProcessor::preallocations_
private

◆ preg_

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 317 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), and readFile().

◆ principalCache_

PrincipalCache edm::EventProcessor::principalCache_
private

◆ printDependencies_

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 370 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ processConfiguration_

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

◆ processContext_

ProcessContext edm::EventProcessor::processContext_
private

◆ queueWhichWaitsForIOVsToFinish_

edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

Definition at line 325 of file EventProcessor.h.

Referenced by beginLumiAsync(), and globalEndLumiAsync().

◆ schedule_

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private

◆ serviceToken_

ServiceToken edm::EventProcessor::serviceToken_
private

◆ shouldWeStop_

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 351 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

◆ sourceMutex_

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 348 of file EventProcessor.h.

Referenced by readNextEventForStream().

◆ sourceResourcesAcquirer_

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 347 of file EventProcessor.h.

Referenced by beginLumiAsync(), and handleNextEventForStreamAsync().

◆ streamLumiActive_

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private

◆ streamLumiStatus_

std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private

◆ streamQueues_

std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

Definition at line 332 of file EventProcessor.h.

Referenced by beginLumiAsync(), init(), and streamEndLumiAsync().

◆ subProcesses_

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private

◆ taskGroup_

tbb::task_group edm::EventProcessor::taskGroup_
private

◆ thinnedAssociationsHelper_

edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 319 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().

edm::EventProcessor::looperBeginJobRun_
bool looperBeginJobRun_
Definition: EventProcessor.h:357
edm::SharedResourcesAcquirer::serialQueueChain
SerialTaskQueueChain & serialQueueChain() const
Definition: SharedResourcesAcquirer.h:54
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::deleteRunFromCache
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
Definition: EventProcessor.cc:1712
edm::EDLooperBase::Status
Status
Definition: EDLooperBase.h:80
edm::EventProcessor::historyAppender_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
Definition: EventProcessor.h:338
bk::beginJob
void beginJob()
Definition: Breakpoints.cc:14
edm::EventProcessor::thinnedAssociationsHelper
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: EventProcessor.h:296
processOptions_cff.fileMode
fileMode
Definition: processOptions_cff.py:5
edm::EventProcessor::rewindInput
void rewindInput()
Definition: EventProcessor.cc:916
edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Definition: EventProcessor.h:368
edm::PrincipalCache::ProcessBlockType::New
mps_fire.i
i
Definition: mps_fire.py:428
edm::PreallocationConfiguration::numberOfRuns
unsigned int numberOfRuns() const
Definition: PreallocationConfiguration.h:37
edm::EventProcessor::preg_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: EventProcessor.h:317
edm::popSubProcessVParameterSet
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:761
edm::EventProcessor::writeRunAsync
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
Definition: EventProcessor.cc:1689
edm::EventProcessor::StatusCode
StatusCode
Definition: EventProcessor.h:74
edm::EventProcessor::endOfLoop
bool endOfLoop()
Definition: EventProcessor.cc:901
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::EventProcessor::input_
edm::propagate_const< std::unique_ptr< InputSource > > input_
Definition: EventProcessor.h:321
edm::EventProcessor::getToken
ServiceToken getToken()
Definition: EventProcessor.cc:697
edm::EventProcessor::startingNewLoop
void startingNewLoop()
Definition: EventProcessor.cc:891
edm::ParameterSet::getUntrackedParameterSet
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: ParameterSet.cc:2136
edm::errors::LogicError
Definition: EDMException.h:37
edm::validateTopLevelParameterSets
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
Definition: validateTopLevelParameterSets.cc:93
mps_update.status
status
Definition: mps_update.py:69
edm::PreallocationConfiguration::numberOfThreads
unsigned int numberOfThreads() const
Definition: PreallocationConfiguration.h:34
edm::EventProcessor::deleteLumiFromCache
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1736
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:58
edmLumisInFiles.description
description
Definition: edmLumisInFiles.py:11
edm::PrincipalCache::setProcessHistoryRegistry
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
Definition: PrincipalCache.h:87
edm::eventsetup::synchronousEventSetupForInstance
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
Definition: EventSetupsController.cc:414
edm::TerminationOrigin::ExternalSignal
edm::EventProcessor::globalEndLumiAsync
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
Definition: EventProcessor.cc:1442
edm::ParentageRegistry::instance
static ParentageRegistry * instance()
Definition: ParentageRegistry.cc:4
edm::EventProcessor::runToCompletion
StatusCode runToCompletion()
Definition: EventProcessor.cc:762
edm::EventProcessor::esp_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: EventProcessor.h:324
edm::EventProcessor::shouldWeStop_
bool shouldWeStop_
Definition: EventProcessor.h:351
edm::EventProcessor::readEvent
void readEvent(unsigned int iStreamIndex)
Definition: EventProcessor.cc:1863
edm::EventProcessor::epTimedOut
Definition: EventProcessor.h:80
cms::cuda::assert
assert(be >=bs)
edm::ParentageRegistry::clear
void clear()
Not thread safe.
Definition: ParentageRegistry.cc:28
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:222
edm::EventProcessor::deferredExceptionPtr_
std::exception_ptr deferredExceptionPtr_
Definition: EventProcessor.h:345
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
edm::fillLooper
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
Definition: EventProcessor.cc:210
personalPlayback.fp
fp
Definition: personalPlayback.py:523
edm::EventProcessor::lumiQueue_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
Definition: EventProcessor.h:333
edm::get_underlying_safe
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Definition: get_underlying_safe.h:41
edm::EventProcessor::processContext_
ProcessContext processContext_
Definition: EventProcessor.h:328
edm::EventProcessor::lastSourceTransition_
InputSource::ItemType lastSourceTransition_
Definition: EventProcessor.h:322
edm::SerialTaskQueue::push
void push(tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:167
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:93
edm::EventProcessor::pathsAndConsumesOfModules_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: EventProcessor.h:329
createJobs.tmp
tmp
align.sh
Definition: createJobs.py:716
edm::EventProcessor::exceptionMessageFiles_
std::string exceptionMessageFiles_
Definition: EventProcessor.h:353
edm::makeInput
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Definition: EventProcessor.cc:120
edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
bool asyncStopRequestedWhileProcessingEvents_
Definition: EventProcessor.h:362
edm::PrincipalCache::runPrincipalPtr
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:28
edm::InRun
Definition: BranchType.h:11
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::PathsAndConsumesOfModules::initialize
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
Definition: PathsAndConsumesOfModules.cc:16
runTheMatrix.nStreams
nStreams
Definition: runTheMatrix.py:371
mps_monitormerge.items
list items
Definition: mps_monitormerge.py:29
edm::WaitingTaskHolder::taskHasFailed
bool taskHasFailed() const noexcept
Definition: WaitingTaskHolder.h:71
groupFilesInBlocks.reverse
reverse
Definition: groupFilesInBlocks.py:131
edm::InputSource::IsRun
Definition: InputSource.h:78
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::ProcessingController::kToPreviousEvent
Definition: ProcessingController.h:58
edm::parameterSet
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::EventProcessor::processEventAsyncImpl
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1881
edm::first
T first(std::pair< T, U > const &p)
Definition: ParameterSet.cc:217
edm::PreallocationConfiguration::numberOfLuminosityBlocks
unsigned int numberOfLuminosityBlocks() const
Definition: PreallocationConfiguration.h:36
edm::InProcess
Definition: BranchType.h:11
edm::PrincipalCache::runPrincipal
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:21
edm::pset::Registry::clear
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::ProcessContext::setProcessConfiguration
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: ProcessContext.cc:19
alignCSCRings.s
s
Definition: alignCSCRings.py:92
edm::EventProcessor::exceptionMessageLumis_
std::atomic< bool > exceptionMessageLumis_
Definition: EventProcessor.h:355
edm::ProcessingController::ForwardState
ForwardState
Definition: ProcessingController.h:31
edm::EventProcessor::readAndMergeLumi
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1654
edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
Definition: EventProcessor.h:325
edm::EventProcessor::epSignal
Definition: EventProcessor.h:78
std::swap
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
Definition: DataFrameContainer.h:209
DQM.reader
reader
Definition: DQM.py:105
edm::PrincipalCache::merge
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:54
edm::EventProcessor::streamQueues_
std::vector< edm::SerialTaskQueue > streamQueues_
Definition: EventProcessor.h:332
edm::EventProcessor::epInputComplete
Definition: EventProcessor.h:79
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
ScheduleInfo
edm::EventProcessor::readLuminosityBlock
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1635
edm::InputSource::IsSynchronize
Definition: InputSource.h:78
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::EventProcessor::sourceMutex_
std::shared_ptr< std::recursive_mutex > sourceMutex_
Definition: EventProcessor.h:348
edm::RunPrincipal::setEndTime
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
edm::EventProcessor::beginJobCalled_
bool beginJobCalled_
Definition: EventProcessor.h:350
edm::ProcessingController::kToSpecifiedEvent
Definition: ProcessingController.h:58
Service
edm::checkForModuleDependencyCorrectness
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
Definition: PathsAndConsumesOfModules.cc:269
LuminosityBlock
WaitingTaskHolder
edm::EventProcessor::fileModeNoMerge_
bool fileModeNoMerge_
Definition: EventProcessor.h:352
Event
IOVSyncValue
edm::EventProcessor::writeProcessBlockAsync
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
Definition: EventProcessor.cc:1671
edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
Definition: EventProcessor.cc:1992
edm::InEvent
Definition: BranchType.h:11
h
runTheMatrix.nThreads
nThreads
Definition: runTheMatrix.py:370
edm::IllegalParameters::setThrowAnException
static void setThrowAnException(bool v)
Definition: IllegalParameters.h:16
edm::EventProcessor::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: EventProcessor.h:316
edm::EventID::maxEventNumber
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::EventProcessor::exceptionMessageRuns_
std::string exceptionMessageRuns_
Definition: EventProcessor.h:354
edm::PrincipalCache::preReadFile
void preReadFile()
Definition: PrincipalCache.cc:149
edm::SharedResourcesRegistry::instance
static SharedResourcesRegistry * instance()
Definition: SharedResourcesRegistry.cc:25
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::EventProcessor::printDependencies_
bool printDependencies_
Definition: EventProcessor.h:370
edm::EventProcessor::preallocations_
PreallocationConfiguration preallocations_
Definition: EventProcessor.h:360
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:99
edm::SubProcess::doEndJob
void doEndJob()
Definition: SubProcess.cc:280
summarizeEdmComparisonLogfiles.succeeded
succeeded
Definition: summarizeEdmComparisonLogfiles.py:101
edm::EventProcessor::act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: EventProcessor.h:326
edm::EventProcessor::deleteNonConsumedUnscheduledModules_
bool deleteNonConsumedUnscheduledModules_
Definition: EventProcessor.h:371
edm::serviceregistry::kConfigurationOverrides
Definition: ServiceLegacy.h:29
edm::ServiceRegistry::Operate
friend class Operate
Definition: ServiceRegistry.h:54
edm::EventProcessor::beginLumiAsync
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1236
edm::EventProcessor::espController_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
Definition: EventProcessor.h:323
edm::EventProcessor::streamLumiStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
Definition: EventProcessor.h:334
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
ParameterSet
Definition: Functions.h:16
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:32
edm::InLumi
Definition: BranchType.h:11
FDEBUG
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::InputSource::IsLumi
Definition: InputSource.h:78
edm::EventProcessor::streamEndLumiAsync
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1535
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
edm::EventProcessor::epException
Definition: EventProcessor.h:76
edm::EventProcessor::serviceToken_
ServiceToken serviceToken_
Definition: EventProcessor.h:320
edm::serviceregistry::kOverlapIsError
Definition: ServiceLegacy.h:29
edm::PrincipalCache::deleteRun
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
Definition: PrincipalCache.cc:110
edm::EventProcessor::lastTransitionType
InputSource::ItemType lastTransitionType() const
Definition: EventProcessor.h:192
edm::Service
Definition: Service.h:30
edm::InputSource::doEndJob
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:207
edm::EventProcessor::thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: EventProcessor.h:319
edm::EventProcessor::readNextEventForStream
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
Definition: EventProcessor.cc:1744
runEdmFileComparison.returnCode
returnCode
Definition: runEdmFileComparison.py:263
edm::SerialTaskQueueChain::push
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:75
edm::LogAbsolute
Log< level::System, true > LogAbsolute
Definition: MessageLogger.h:134
edm::EventProcessor::forceLooperToEnd_
bool forceLooperToEnd_
Definition: EventProcessor.h:356
edm::EventProcessor::epCountComplete
Definition: EventProcessor.h:81
edm::PrincipalCache::insertForInput
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
Definition: PrincipalCache.cc:98
edm::InputSource::IsStop
Definition: InputSource.h:78
edm::PrincipalCache::adjustEventsToNewProductRegistry
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:127
edm::EventProcessor::fb_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
Definition: EventProcessor.h:340
edm::EDLooperBase::kContinue
Definition: EDLooperBase.h:80
edm::EventProcessor::epOther
Definition: EventProcessor.h:77
edm::EDLooperBase::endOfJob
virtual void endOfJob()
Definition: EDLooperBase.cc:101
edm::LogError
Log< level::Error, false > LogError
Definition: MessageLogger.h:123
ServiceToken
edm::EventProcessor::streamLumiActive_
std::atomic< unsigned int > streamLumiActive_
Definition: EventProcessor.h:335
edm::EventProcessor::mergeableRunProductProcesses_
MergeableRunProductProcesses mergeableRunProductProcesses_
Definition: EventProcessor.h:330
edm::FinalWaitingTask
Definition: WaitingTask.h:76
cmsLHEtoEOSManager.l
l
Definition: cmsLHEtoEOSManager.py:204
edm::PrincipalCache::inputProcessBlockPrincipal
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
Definition: PrincipalCache.h:55
edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
StatusCode asyncStopStatusCodeFromProcessingEvents_
Definition: EventProcessor.h:363
edm::LimitedTaskQueue::Resumer
Definition: LimitedTaskQueue.h:57
edm::EventProcessor::looper_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
Definition: EventProcessor.h:341
edm::ProcessBlockPrincipal::fillProcessBlockPrincipal
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
Definition: ProcessBlockPrincipal.cc:16
edm::EventProcessor::sourceResourcesAcquirer_
SharedResourcesAcquirer sourceResourcesAcquirer_
Definition: EventProcessor.h:347
edm::EventProcessor::setDeferredException
bool setDeferredException(std::exception_ptr)
Definition: EventProcessor.cc:1983
edm::InputSource::ItemType
ItemType
Definition: InputSource.h:78
edm::EventProcessor::continueLumiAsync
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1416
edm::EventProcessor::doErrorStuff
void doErrorStuff()
Definition: EventProcessor.cc:940
ModuleChanger
edm::PrincipalCache::ProcessBlockType::Input
edm::ParentageRegistry::insertMapped
bool insertMapped(value_type const &v)
Definition: ParentageRegistry.cc:24
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::RunPrincipal::mergeableRunProductMetadata
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:81
edm::nonConsumedUnscheduledModules
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
Definition: PathsAndConsumesOfModules.cc:165
edm::EventProcessor::taskGroup_
tbb::task_group taskGroup_
Definition: EventProcessor.h:314
edm::EventProcessor::principalCache_
PrincipalCache principalCache_
Definition: EventProcessor.h:349
ProcessingController
edm::PathsAndConsumesOfModules::removeModules
void removeModules(std::vector< ModuleDescription const * > const &modules)
Definition: PathsAndConsumesOfModules.cc:53
edm::EventProcessor::branchIDListHelper
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: EventProcessor.h:292
edm::PrincipalCache::getAvailableLumiPrincipalPtr
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
Definition: PrincipalCache.cc:50
edm::EventProcessor::deferredExceptionPtrIsSet_
std::atomic< bool > deferredExceptionPtrIsSet_
Definition: EventProcessor.h:344
edm::PrincipalCache::hasRunPrincipal
bool hasRunPrincipal() const
Definition: PrincipalCache.h:66
edm::EventProcessor::epSuccess
Definition: EventProcessor.h:75
ev
bool ev
Definition: Hydjet2Hadronizer.cc:95
edm::EventProcessor::processConfiguration_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: EventProcessor.h:327
Exception
Definition: hltDiff.cc:245
edm::EventProcessor::run
StatusCode run()
Definition: EventProcessor.h:376
edm::MergeableRunProductMetadata::preWriteRun
void preWriteRun()
Definition: MergeableRunProductMetadata.cc:125
edm::EventProcessor::handleEndLumiExceptions
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
Definition: EventProcessor.cc:1433
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::EventProcessor::processEventWithLooper
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1934
edm::PreallocationConfiguration::numberOfStreams
unsigned int numberOfStreams() const
Definition: PreallocationConfiguration.h:35
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::FileBlock::ParallelProcesses
Definition: FileBlock.h:28
edm::PrincipalCache::processBlockPrincipal
ProcessBlockPrincipal & processBlockPrincipal() const
Definition: PrincipalCache.h:54
edm::EventProcessor::fileBlockValid
bool fileBlockValid()
Definition: EventProcessor.h:202
edm::EventProcessor::endRun
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
Definition: EventProcessor.cc:1149
edm::EventProcessor::setExceptionMessageLumis
void setExceptionMessageLumis()
Definition: EventProcessor.cc:1981
edm::EventProcessor::looper
std::shared_ptr< EDLooperBase const > looper() const
Definition: EventProcessor.h:302
cms::Exception
Definition: Exception.h:70
edm::EventProcessor::nextLuminosityBlockID
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
Definition: EventProcessor.cc:760
edm::InputSource::IsEvent
Definition: InputSource.h:78
edm::EventProcessor::subProcesses_
std::vector< SubProcess > subProcesses_
Definition: EventProcessor.h:337
edm::EventProcessor::nextTransitionType
InputSource::ItemType nextTransitionType()
Definition: EventProcessor.cc:730
edm::EventProcessor::prepareForNextLoop
void prepareForNextLoop()
Definition: EventProcessor.cc:922
edm::PrincipalCache::eventPrincipal
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: PrincipalCache.h:70
edm::EventProcessor::init
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
Definition: EventProcessor.cc:344
c
auto & c
Definition: CAHitNtupletGeneratorKernelsImpl.h:46
edm::PrincipalCache::insert
void insert(std::unique_ptr< ProcessBlockPrincipal >)
Definition: PrincipalCache.cc:96
edm::EventProcessor::processEventAsync
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1877
event
Definition: event.py:1
edm::ProcessingController::ReverseState
ReverseState
Definition: ProcessingController.h:38
MatrixUtil.merge
def merge(dictlist, TELL=False)
Definition: MatrixUtil.py:201
edm::EventProcessor::shouldWeStop
bool shouldWeStop() const
Definition: EventProcessor.cc:1962
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::errors::Configuration
Definition: EDMException.h:36
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
edm::EventProcessor::preg
std::shared_ptr< ProductRegistry const > preg() const
Definition: EventProcessor.h:290
edm::EventProcessor::schedule_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: EventProcessor.h:331
edm::EventProcessor::writeLumiAsync
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
Definition: EventProcessor.cc:1718
edm::EventProcessor::handleNextEventForStreamAsync
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1806
edm::WaitingTaskHolder::group
tbb::task_group * group() const noexcept
Definition: WaitingTaskHolder.h:77
edm::EventProcessor::branchIDListHelper_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: EventProcessor.h:318
trackingPlots.common
common
Definition: trackingPlots.py:206
edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition
void adjustIndexesAfterProductRegistryAddition()
Definition: PrincipalCache.cc:137
edm::LuminosityBlockID::maxLuminosityBlockNumber
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Definition: LuminosityBlockID.h:84
edm::EventProcessor::checkForAsyncStopRequest
bool checkForAsyncStopRequest(StatusCode &)
Definition: EventProcessor.cc:719
common
Definition: common.py:1
edm::EventProcessor::forceESCacheClearOnNewRun_
bool forceESCacheClearOnNewRun_
Definition: EventProcessor.h:358
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
edm::EventProcessor::beginJob
void beginJob()
Definition: EventProcessor.cc:560
edm::PrincipalCache::setNumberOfConcurrentPrincipals
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
Definition: PrincipalCache.cc:17
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316
edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
Definition: MergeableRunProductProcesses.cc:18