CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

enum  StatusCode {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 

Public Member Functions

void beginJob ()
 
void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
 
bool endOfLoop ()
 
bool endPathsEnabled () const
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool cleaningUpAfterException)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair< edm::ProcessHistoryID, edm::RunNumber_tnextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
int readAndMergeLumi ()
 
std::pair< ProcessHistoryID, RunNumber_treadAndMergeRun ()
 
InputSource::ItemType readAndProcessEvents ()
 
void readFile ()
 
int readLuminosityBlock ()
 
std::pair< ProcessHistoryID, RunNumber_treadRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis (std::string &message)
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
void writeRun (ProcessHistoryID const &phid, RunNumber_t run)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedDataExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ =true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
InputSource::ItemType nextItemTypeFromProcessingEvents_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::vector< SubProcesssubProcesses_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 57 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 319 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 320 of file EventProcessor.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 215 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

219  :
220  actReg_(),
221  preg_(),
223  serviceToken_(),
224  input_(),
225  espController_(new eventsetup::EventSetupsController),
226  esp_(),
227  act_table_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
248  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
249  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
250  processDesc->addServices(defaultServices, forcedServices);
251  init(processDesc, iToken, iLegacy);
252  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 254 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

256  :
257  actReg_(),
258  preg_(),
260  serviceToken_(),
261  input_(),
262  espController_(new eventsetup::EventSetupsController),
263  esp_(),
264  act_table_(),
266  schedule_(),
267  subProcesses_(),
268  historyAppender_(new HistoryAppender),
269  fb_(),
270  looper_(),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
275  beginJobCalled_(false),
276  shouldWeStop_(false),
277  fileModeNoMerge_(false),
281  forceLooperToEnd_(false),
282  looperBeginJobRun_(false),
287  {
288  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
289  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
290  processDesc->addServices(defaultServices, forcedServices);
292  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 294 of file EventProcessor.cc.

References init().

296  :
297  actReg_(),
298  preg_(),
300  serviceToken_(),
301  input_(),
302  espController_(new eventsetup::EventSetupsController),
303  esp_(),
304  act_table_(),
306  schedule_(),
307  subProcesses_(),
308  historyAppender_(new HistoryAppender),
309  fb_(),
310  looper_(),
312  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
313  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
314  principalCache_(),
315  beginJobCalled_(false),
316  shouldWeStop_(false),
317  fileModeNoMerge_(false),
321  forceLooperToEnd_(false),
322  looperBeginJobRun_(false),
327  {
328  init(processDesc, token, legacy);
329  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 332 of file EventProcessor.cc.

References looper::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

332  :
333  actReg_(),
334  preg_(),
336  serviceToken_(),
337  input_(),
338  espController_(new eventsetup::EventSetupsController),
339  esp_(),
340  act_table_(),
342  schedule_(),
343  subProcesses_(),
344  historyAppender_(new HistoryAppender),
345  fb_(),
346  looper_(),
348  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
349  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
350  principalCache_(),
351  beginJobCalled_(false),
352  shouldWeStop_(false),
353  fileModeNoMerge_(false),
357  forceLooperToEnd_(false),
358  looperBeginJobRun_(false),
363  {
364  if(isPython) {
365  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
366  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
368  }
369  else {
370  auto processDesc = std::make_shared<ProcessDesc>(config);
372  }
373  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
config
Definition: looper.py:287
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 547 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, and schedule_.

547  {
548  // Make the services available while everything is being deleted.
549  ServiceToken token = getToken();
550  ServiceRegistry::Operate op(token);
551 
552  // manually destroy all these thing that may need the services around
553  // propagate_const<T> has no reset() function
554  espController_ = nullptr;
555  esp_ = nullptr;
556  schedule_ = nullptr;
557  input_ = nullptr;
558  looper_ = nullptr;
559  actReg_ = nullptr;
560 
563  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:44
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 566 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), edm::for_all(), mps_fire::i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processConfiguration_, processContext_, schedule_, serviceToken_, subProcesses_, and edm::convertException::wrap().

Referenced by runToCompletion().

566  {
567  if(beginJobCalled_) return;
568  beginJobCalled_=true;
569  bk::beginJob();
570 
571  // StateSentry toerror(this); // should we add this ?
572  //make the services available
574 
575  service::SystemBounds bounds(preallocations_.numberOfStreams(),
579  actReg_->preallocateSignal_(bounds);
580  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
582 
583  //NOTE: this may throw
585  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
586 
587  //NOTE: This implementation assumes 'Job' means one call
588  // the EventProcessor::run
589  // If it really means once per 'application' then this code will
590  // have to be changed.
591  // Also have to deal with case where have 'run' then new Module
592  // added and do 'run'
593  // again. In that case the newly added Module needs its 'beginJob'
594  // to be called.
595 
596  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
597  // For now we delay calling beginOfJob until first beginOfRun
598  //if(looper_) {
599  // looper_->beginOfJob(es);
600  //}
601  try {
602  convertException::wrap([&]() {
603  input_->doBeginJob();
604  });
605  }
606  catch(cms::Exception& ex) {
607  ex.addContext("Calling beginJob for the source");
608  throw;
609  }
610  schedule_->beginJob(*preg_);
611  // toerror.succeeded(); // should we add this?
612  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
613  actReg_->postBeginJobSignal_();
614 
615  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
616  schedule_->beginStream(i);
617  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
618  }
619  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:227
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)

Definition at line 1077 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::beginTime(), esp_, espController_, FDEBUG, input_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::LuminosityBlockPrincipal::run(), schedule_, and subProcesses_.

1077  {
1078  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1079  {
1080  SendSourceTerminationSignalIfException sentry(actReg_.get());
1081 
1082  input_->doBeginLumi(lumiPrincipal, &processContext_);
1083  sentry.completedSuccessfully();
1084  }
1085 
1087  if(rng.isAvailable()) {
1088  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1089  rng->preBeginLumi(lb);
1090  }
1091 
1092  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1093  // lumi blocks know their start and end times why not also start and end events?
1094  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1095  {
1096  SendSourceTerminationSignalIfException sentry(actReg_.get());
1097  espController_->eventSetupForInstance(ts);
1098  sentry.completedSuccessfully();
1099  }
1100  EventSetup const& es = esp_->eventSetup();
1101  {
1102  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1103  auto globalWaitTask = make_empty_waiting_task();
1104  globalWaitTask->increment_ref_count();
1105  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1106  *schedule_,
1107  lumiPrincipal,
1108  ts,
1109  es,
1110  subProcesses_);
1111  globalWaitTask->wait_for_all();
1112  if(globalWaitTask->exceptionPtr() != nullptr) {
1113  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1114  }
1115  }
1116  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1117  if(looper_) {
1118  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1119  }
1120  {
1121  //To wait, the ref count has to b 1+#streams
1122  auto streamLoopWaitTask = make_empty_waiting_task();
1123  streamLoopWaitTask->increment_ref_count();
1124 
1125  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1126 
1127  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1128  *schedule_,
1130  lumiPrincipal,
1131  ts,
1132  es,
1133  subProcesses_);
1134  streamLoopWaitTask->wait_for_all();
1135  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1136  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1137  }
1138  }
1139 
1140  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1141  if(looper_) {
1142  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1143  }
1144  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 934 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::PrincipalCache::runPrincipal(), schedule_, and subProcesses_.

934  {
935  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
936  {
937  SendSourceTerminationSignalIfException sentry(actReg_.get());
938 
939  input_->doBeginRun(runPrincipal, &processContext_);
940  sentry.completedSuccessfully();
941  }
942 
943  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
944  runPrincipal.beginTime());
946  espController_->forceCacheClear();
947  }
948  {
949  SendSourceTerminationSignalIfException sentry(actReg_.get());
950  espController_->eventSetupForInstance(ts);
951  sentry.completedSuccessfully();
952  }
953  EventSetup const& es = esp_->eventSetup();
954  if(looper_ && looperBeginJobRun_== false) {
955  looper_->copyInfo(ScheduleInfo(schedule_.get()));
956  looper_->beginOfJob(es);
957  looperBeginJobRun_ = true;
958  looper_->doStartingNewLoop();
959  }
960  {
961  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
962  auto globalWaitTask = make_empty_waiting_task();
963  globalWaitTask->increment_ref_count();
964  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
965  *schedule_,
966  runPrincipal,
967  ts,
968  es,
969  subProcesses_);
970  globalWaitTask->wait_for_all();
971  if(globalWaitTask->exceptionPtr() != nullptr) {
972  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
973  }
974  }
975  FDEBUG(1) << "\tbeginRun " << run << "\n";
976  if(looper_) {
977  looper_->doBeginRun(runPrincipal, es, &processContext_);
978  }
979  {
980  //To wait, the ref count has to be 1+#streams
981  auto streamLoopWaitTask = make_empty_waiting_task();
982  streamLoopWaitTask->increment_ref_count();
983 
984  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
985 
986  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
987  *schedule_,
989  runPrincipal,
990  ts,
991  es,
992  subProcesses_);
993 
994  streamLoopWaitTask->wait_for_all();
995  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
996  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
997  }
998  }
999  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1000  if(looper_) {
1001  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1002  }
1003  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 263 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 264 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 702 of file EventProcessor.cc.

References epSignal, and edm::shutdown_flag.

Referenced by nextTransitionType(), and readNextEventForStream().

702  {
703  bool returnValue = false;
704 
705  // Look for a shutdown signal
706  if(shutdown_flag.load(std::memory_order_acquire)) {
707  returnValue = true;
709  }
710  return returnValue;
711  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 693 of file EventProcessor.cc.

References schedule_.

693  {
694  schedule_->clearCounters();
695  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 835 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, and input_.

835  {
836  if (fb_.get() != nullptr) {
837  SendSourceTerminationSignalIfException sentry(actReg_.get());
838  input_->closeFile(fb_.get(), cleaningUpAfterException);
839  sentry.completedSuccessfully();
840  }
841  FDEBUG(1) << "\tcloseInputFile\n";
842  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )

Definition at line 852 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

852  {
853  if (fb_.get() != nullptr) {
854  schedule_->closeOutputFiles();
855  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
856  }
857  FDEBUG(1) << "\tcloseOutputFiles\n";
858  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)

Definition at line 1301 of file EventProcessor.cc.

References edm::PrincipalCache::deleteLumi(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

1301  {
1303  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1304  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1305  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
PrincipalCache principalCache_
void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1289 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

1289  {
1290  principalCache_.deleteRun(phid, run);
1291  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1292  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1293  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )

Definition at line 924 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

924  {
925  FDEBUG(1) << "\tdoErrorStuff\n";
926  LogError("StateMachine")
927  << "The EventProcessor state machine encountered an unexpected event\n"
928  << "and went to the error state\n"
929  << "Will attempt to terminate processing normally\n"
930  << "(IF using the looper the next loop will be attempted)\n"
931  << "This likely indicates a bug in an input module or corrupted input or both\n";
932  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 678 of file EventProcessor.cc.

References schedule_.

678  {
679  schedule_->enableEndPaths(active);
680  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 622 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

622  {
623  // Collects exceptions, so we don't throw before all operations are performed.
624  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
625 
626  //make the services available
628 
629  //NOTE: this really should go elsewhere in the future
630  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
631  c.call([this,i](){this->schedule_->endStream(i);});
632  for(auto& subProcess : subProcesses_) {
633  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
634  }
635  }
636  auto actReg = actReg_.get();
637  c.call([actReg](){actReg->preEndJobSignal_();});
638  schedule_->endJob(c);
639  for(auto& subProcess : subProcesses_) {
640  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
641  }
642  c.call(std::bind(&InputSource::doEndJob, input_.get()));
643  if(looper_) {
644  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
645  }
646  c.call([actReg](){actReg->postEndJobSignal_();});
647  if(c.hasThrown()) {
648  c.rethrow();
649  }
650  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:219
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)

Definition at line 1146 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::endTime(), esp_, espController_, FDEBUG, input_, looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::Principal::resetFailedFromThisProcess(), edm::LuminosityBlockPrincipal::run(), schedule_, edm::Principal::setAtEndTransition(), edm::LuminosityBlockPrincipal::setComplete(), edm::LuminosityBlockPrincipal::setEndTime(), and subProcesses_.

Referenced by Types.EventRange::cppID().

1146  {
1147  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1148  lumiPrincipal.setAtEndTransition(true);
1149  //We need to reset failed items since they might
1150  // be set this time around
1151  lumiPrincipal.resetFailedFromThisProcess();
1152 
1153  {
1154  SendSourceTerminationSignalIfException sentry(actReg_.get());
1155 
1156  lumiPrincipal.setEndTime(input_->timestamp());
1157  lumiPrincipal.setComplete();
1158  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1159  sentry.completedSuccessfully();
1160  }
1161  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1162  // lumi blocks know their start and end times why not also start and end events?
1163  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1164  lumiPrincipal.endTime());
1165  {
1166  SendSourceTerminationSignalIfException sentry(actReg_.get());
1167  espController_->eventSetupForInstance(ts);
1168  sentry.completedSuccessfully();
1169  }
1170  EventSetup const& es = esp_->eventSetup();
1171  {
1172  //To wait, the ref count has to b 1+#streams
1173  auto streamLoopWaitTask = make_empty_waiting_task();
1174  streamLoopWaitTask->increment_ref_count();
1175 
1176  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1177 
1178  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1179  *schedule_,
1181  lumiPrincipal,
1182  ts,
1183  es,
1184  subProcesses_,
1185  cleaningUpAfterException);
1186  streamLoopWaitTask->wait_for_all();
1187  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1188  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1189  }
1190  }
1191  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1192  if(looper_) {
1193  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1194  }
1195  {
1196  auto globalWaitTask = make_empty_waiting_task();
1197  globalWaitTask->increment_ref_count();
1198 
1199  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1200  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1201  *schedule_,
1202  lumiPrincipal,
1203  ts,
1204  es,
1205  subProcesses_,
1206  cleaningUpAfterException);
1207  globalWaitTask->wait_for_all();
1208  if(globalWaitTask->exceptionPtr() != nullptr) {
1209  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1210  }
1211  }
1212  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1213  if(looper_) {
1214  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1215  }
1216  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void setAtEndTransition(bool iAtEnd)
Definition: Principal.cc:325
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::endOfLoop ( )

Definition at line 887 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

887  {
888  if(looper_) {
889  ModuleChanger changer(schedule_.get(),preg_.get());
890  looper_->setModuleChanger(&changer);
891  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
892  looper_->setModuleChanger(nullptr);
893  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
894  else return false;
895  }
896  FDEBUG(1) << "\tendOfLoop\n";
897  return true;
898  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 683 of file EventProcessor.cc.

References schedule_.

683  {
684  return schedule_->endPathsEnabled();
685  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  cleaningUpAfterException 
)

Definition at line 1005 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, input_, looper_, edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::Principal::resetFailedFromThisProcess(), edm::RunPrincipal::run(), edm::PrincipalCache::runPrincipal(), schedule_, edm::Principal::setAtEndTransition(), edm::RunPrincipal::setComplete(), edm::RunPrincipal::setEndTime(), and subProcesses_.

1005  {
1006  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1007  runPrincipal.setAtEndTransition(true);
1008  //We need to reset failed items since they might
1009  // be set this time around
1010  runPrincipal.resetFailedFromThisProcess();
1011 
1012  {
1013  SendSourceTerminationSignalIfException sentry(actReg_.get());
1014 
1015  runPrincipal.setEndTime(input_->timestamp());
1016  runPrincipal.setComplete();
1017  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1018  sentry.completedSuccessfully();
1019  }
1020 
1022  runPrincipal.endTime());
1023  {
1024  SendSourceTerminationSignalIfException sentry(actReg_.get());
1025  espController_->eventSetupForInstance(ts);
1026  sentry.completedSuccessfully();
1027  }
1028  EventSetup const& es = esp_->eventSetup();
1029  {
1030  //To wait, the ref count has to be 1+#streams
1031  auto streamLoopWaitTask = make_empty_waiting_task();
1032  streamLoopWaitTask->increment_ref_count();
1033 
1034  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1035 
1036  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1037  *schedule_,
1039  runPrincipal,
1040  ts,
1041  es,
1042  subProcesses_,
1043  cleaningUpAfterException);
1044 
1045  streamLoopWaitTask->wait_for_all();
1046  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1047  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1048  }
1049  }
1050  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1051  if(looper_) {
1052  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1053  }
1054  {
1055  auto globalWaitTask = make_empty_waiting_task();
1056  globalWaitTask->increment_ref_count();
1057 
1058  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1059  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1060  *schedule_,
1061  runPrincipal,
1062  ts,
1063  es,
1064  subProcesses_,
1065  cleaningUpAfterException);
1066  globalWaitTask->wait_for_all();
1067  if(globalWaitTask->exceptionPtr() != nullptr) {
1068  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1069  }
1070  }
1071  FDEBUG(1) << "\tendRun " << run << "\n";
1072  if(looper_) {
1073  looper_->doEndRun(runPrincipal, es, &processContext_);
1074  }
1075  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void setAtEndTransition(bool iAtEnd)
Definition: Principal.cc:325
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 658 of file EventProcessor.cc.

References schedule_.

658  {
659  return schedule_->getAllModuleDescriptions();
660  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 653 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

653  {
654  return serviceToken_;
655  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 688 of file EventProcessor.cc.

References schedule_.

688  {
689  schedule_->getTriggerReport(rep);
690  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
rep
Definition: cuy.py:1188
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTask iTask,
unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1358 of file EventProcessor.cc.

References deferredExceptionPtr_, deferredExceptionPtrIsSet_, pyrootRender::destroy(), edm::WaitingTaskHolder::doneWaiting(), h, edm::make_waiting_task(), cmsPerfStripChart::operate(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, and sourceResourcesAcquirer_.

Referenced by readAndProcessEvents().

1361  {
1362  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
1363  if(iPtr) {
1364  bool expected = false;
1365  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1366  deferredExceptionPtr_ = *iPtr;
1367  {
1368  WaitingTaskHolder h(iTask);
1369  h.doneWaiting(*iPtr);
1370  }
1371  }
1372  //the stream will stop now
1373  iTask->decrement_ref_count();
1374  return;
1375  }
1376 
1377  handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
1378  });
1379 
1380  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
1382 
1383  try {
1384  if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
1385  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
1386  } else {
1387  //the stream will stop now
1388  tbb::task::destroy(*recursionTask);
1389  iTask->decrement_ref_count();
1390  }
1391  } catch(...) {
1392  WaitingTaskHolder h(recursionTask);
1393  h.doneWaiting(std::current_exception());
1394  }
1395  });
1396  }
SharedResourcesAcquirer sourceResourcesAcquirer_
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
def destroy(e)
Definition: pyrootRender.py:13
void push(const T &iAction)
asynchronously pushes functor iAction into queue
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::exception_ptr deferredExceptionPtr_
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 376 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper(), branchIDListHelper(), branchIDListHelper_, trackingPlots::common, edm::errors::Configuration, esp_, espController_, Exception, FDEBUG, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), historyAppender_, diffTreeTool::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), eostools::move(), jets_cff::nThreads, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, edm::ScheduleItems::preg(), preg(), preg_, principalCache_, printDependencies_, edm::ScheduleItems::processConfiguration(), processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, edm::ScheduleItems::thinnedAssociationsHelper(), thinnedAssociationsHelper(), and thinnedAssociationsHelper_.

Referenced by EventProcessor().

378  {
379 
380  //std::cerr << processDesc->dump() << std::endl;
381 
382  // register the empty parentage vector , once and for all
384 
385  // register the empty parameter set, once and for all.
386  ParameterSet().registerIt();
387 
388  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
389 
390  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
391  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
392  bool const hasSubProcesses = !subProcessVParameterSet.empty();
393 
394  // Now set some parameters specific to the main process.
395  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
396  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode", "FULLMERGE");
397  if(fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
398  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
399  << fileMode << ".\n"
400  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
401  } else {
402  fileModeNoMerge_ = (fileMode == "NOMERGE");
403  }
404  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
405  //threading
406  unsigned int nThreads=1;
407  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
408  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
409  if(nThreads == 0) {
410  nThreads = 1;
411  }
412  }
413  /* TODO: when we support having each stream run in a different thread use this default
414  unsigned int nStreams =nThreads;
415  */
416  unsigned int nStreams =1;
417  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
418  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
419  if(nStreams==0) {
420  nStreams = nThreads;
421  }
422  }
423  if(nThreads >1) {
424  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
425  }
426 
427  /*
428  bool nRunsSet = false;
429  */
430  unsigned int nConcurrentRuns =1;
431  /*
432  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
433  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
434  }
435  */
436  unsigned int nConcurrentLumis =1;
437  /*
438  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
439  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
440  } else {
441  nConcurrentLumis = nConcurrentRuns;
442  }
443  */
444  //Check that relationships between threading parameters makes sense
445  /*
446  if(nThreads<nStreams) {
447  //bad
448  }
449  if(nConcurrentRuns>nStreams) {
450  //bad
451  }
452  if(nConcurrentRuns>nConcurrentLumis) {
453  //bad
454  }
455  */
456  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
457 
458  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
459 
460  // Now do general initialization
461  ScheduleItems items;
462 
463  //initialize the services
464  auto& serviceSets = processDesc->getServicesPSets();
465  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
466  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
467 
468  //make the services available
470 
471  if(nStreams>1) {
473  handler->willBeUsingThreads();
474  }
475 
476  // intialize miscellaneous items
477  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
478 
479  // intialize the event setup provider
480  esp_ = espController_->makeProvider(*parameterSet);
481 
482  // initialize the looper, if any
483  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
484  if(looper_) {
485  looper_->setActionTable(items.act_table_.get());
486  looper_->attachTo(*items.actReg_);
487 
488  //For now loopers make us run only 1 transition at a time
489  nStreams=1;
490  nConcurrentLumis=1;
491  nConcurrentRuns=1;
492  }
493 
494  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
495 
496  // initialize the input source
497  input_ = makeInput(*parameterSet,
498  *common,
499  items.preg(),
500  items.branchIDListHelper(),
501  items.thinnedAssociationsHelper(),
502  items.actReg_,
503  items.processConfiguration(),
505 
506  // intialize the Schedule
507  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
508 
509  // set the data members
510  act_table_ = std::move(items.act_table_);
511  actReg_ = items.actReg_;
512  preg_ = items.preg();
513  branchIDListHelper_ = items.branchIDListHelper();
514  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
515  processConfiguration_ = items.processConfiguration();
517  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
518 
519  FDEBUG(2) << parameterSet << std::endl;
520 
522  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
523  // Reusable event principal
524  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
527  }
528 
529  // fill the subprocesses, if there are any
530  subProcesses_.reserve(subProcessVParameterSet.size());
531  for(auto& subProcessPSet : subProcessVParameterSet) {
532  subProcesses_.emplace_back(subProcessPSet,
533  *parameterSet,
534  preg(),
537  SubProcessParentageHelper(),
539  *actReg_,
540  token,
543  &processContext_);
544  }
545  }
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:657
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 267 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by endJob().

267 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 268 of file EventProcessor.h.

References edm::get_underlying_safe().

268 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 740 of file EventProcessor.cc.

References input_.

740  {
741  return input_->luminosityBlock();
742  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 735 of file EventProcessor.cc.

References input_.

735  {
736  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
737  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 714 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, and runEdmFileComparison::returnCode.

714  {
715  SendSourceTerminationSignalIfException sentry(actReg_.get());
716  InputSource::ItemType itemType;
717  //For now, do nothing with InputSource::IsSynchronize
718  do {
719  itemType = input_->nextItemType();
720  } while( itemType == InputSource::IsSynchronize);
721 
722  sentry.completedSuccessfully();
723 
725 
726  if(checkForAsyncStopRequest(returnCode)) {
727  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
728  return InputSource::IsStop;
729  }
730 
731  return itemType;
732  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::openOutputFiles ( )

Definition at line 844 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

844  {
845  if (fb_.get() != nullptr) {
846  schedule_->openOutputFiles(*fb_);
847  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
848  }
849  FDEBUG(1) << "\topenOutputFiles\n";
850  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 261 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), readLuminosityBlock(), and readRun().

261 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 262 of file EventProcessor.h.

References edm::get_underlying_safe().

262 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 906 of file EventProcessor.cc.

References esp_, FDEBUG, edm::propagate_const< T >::get(), and looper_.

Referenced by runToCompletion().

906  {
907  looper_->prepareForNextLoop(esp_.get());
908  FDEBUG(1) << "\tprepareForNextLoop\n";
909  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1446 of file EventProcessor.cc.

References edm::make_functor_task(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

1447  {
1448  tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
1449  processEventAsyncImpl(iHolder, iStreamIndex);
1450  }) );
1451  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1453 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, edm::Service< T >::isAvailable(), looper_, edm::PrincipalCache::lumiPrincipalPtr(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, and subProcesses_.

Referenced by processEventAsync().

1454  {
1455  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1456  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1457 
1460  if(rng.isAvailable()) {
1461  Event ev(*pep, ModuleDescription(), nullptr);
1462  rng->postEventRead(ev);
1463  }
1464  assert(pep->luminosityBlockPrincipalPtrValid());
1465  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
1466  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
1467 
1468  WaitingTaskHolder finalizeEventTask( make_waiting_task(
1469  tbb::task::allocate_root(),
1470  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
1471  {
1473 
1474  //NOTE: If we have a looper we only have one Stream
1475  if(looper_) {
1476  processEventWithLooper(*pep);
1477  }
1478 
1479  FDEBUG(1) << "\tprocessEvent\n";
1480  pep->clearEventPrincipal();
1481  if(iPtr) {
1482  iHolder.doneWaiting(*iPtr);
1483  } else {
1484  iHolder.doneWaiting(std::exception_ptr());
1485  }
1486  }
1487  )
1488  );
1489  WaitingTaskHolder afterProcessTask;
1490  if(subProcesses_.empty()) {
1491  afterProcessTask = std::move(finalizeEventTask);
1492  } else {
1493  //Need to run SubProcesses after schedule has finished
1494  // with the event
1495  afterProcessTask = WaitingTaskHolder(
1496  make_waiting_task(tbb::task::allocate_root(),
1497  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
1498  {
1499  if(not iPtr) {
1501 
1502  //when run with 1 thread, we want to the order to be what
1503  // it was before. This requires reversing the order since
1504  // tasks are run last one in first one out
1505  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
1506  subProcess.doEventAsync(finalizeEventTask,*pep);
1507  }
1508  } else {
1509  finalizeEventTask.doneWaiting(*iPtr);
1510  }
1511  })
1512  );
1513  }
1514 
1515  schedule_->processOneEventAsync(std::move(afterProcessTask),
1516  iStreamIndex,*pep, esp_->eventSetup());
1517 
1518  }
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void processEventWithLooper(EventPrincipal &)
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal)
private

Definition at line 1520 of file EventProcessor.cc.

References esp_, input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

1520  {
1521  bool randomAccess = input_->randomAccess();
1522  ProcessingController::ForwardState forwardState = input_->forwardState();
1523  ProcessingController::ReverseState reverseState = input_->reverseState();
1524  ProcessingController pc(forwardState, reverseState, randomAccess);
1525 
1527  do {
1528 
1529  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1530  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1531 
1532  bool succeeded = true;
1533  if(randomAccess) {
1534  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1535  input_->skipEvents(-2);
1536  }
1537  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1538  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1539  }
1540  }
1541  pc.setLastOperationSucceeded(succeeded);
1542  } while(!pc.lastOperationSucceeded());
1543  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
1544  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
int edm::EventProcessor::readAndMergeLumi ( )

Definition at line 1273 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::lumiPrincipalPtr(), edm::PrincipalCache::merge(), preg(), and principalCache_.

1273  {
1274  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1275  {
1276  SendSourceTerminationSignalIfException sentry(actReg_.get());
1277  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1278  sentry.completedSuccessfully();
1279  }
1280  return input_->luminosityBlock();
1281  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1236 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1236  {
1237  principalCache_.merge(input_->runAuxiliary(), preg());
1238  auto runPrincipal =principalCache_.runPrincipalPtr();
1239  {
1240  SendSourceTerminationSignalIfException sentry(actReg_.get());
1241  input_->readAndMergeRun(*runPrincipal);
1242  sentry.completedSuccessfully();
1243  }
1244  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1245  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1246  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
InputSource::ItemType edm::EventProcessor::readAndProcessEvents ( )

Definition at line 1398 of file EventProcessor.cc.

References asyncStopRequestedWhileProcessingEvents_, deferredExceptionPtr_, deferredExceptionPtrIsSet_, firstEventInBlock_, handleNextEventForStreamAsync(), edm::InputSource::IsEvent, edm::make_empty_waiting_task(), edm::make_waiting_task(), nextItemTypeFromProcessingEvents_, edm::PreallocationConfiguration::numberOfStreams(), and preallocations_.

1398  {
1401 
1402  std::atomic<bool> finishedProcessingEvents{false};
1403  auto finishedProcessingEventsPtr = &finishedProcessingEvents;
1404 
1405  //The state machine already found the event so
1406  // we have to avoid looking again
1407  firstEventInBlock_ = true;
1408 
1409  //To wait, the ref count has to b 1+#streams
1410  auto eventLoopWaitTask = make_empty_waiting_task();
1411  auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
1412  eventLoopWaitTask->increment_ref_count();
1413 
1414  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1415  unsigned int iStreamIndex = 0;
1416  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1417  eventLoopWaitTask->increment_ref_count();
1418  tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
1419  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
1420  }) );
1421  }
1422  eventLoopWaitTask->increment_ref_count();
1423  eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
1424  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
1425  }));
1426 
1427  //One of the processing threads saw an exception
1429  std::rethrow_exception(deferredExceptionPtr_);
1430  }
1432  }
PreallocationConfiguration preallocations_
std::atomic< bool > deferredExceptionPtrIsSet_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
InputSource::ItemType nextItemTypeFromProcessingEvents_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1434 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, and processContext_.

Referenced by readNextEventForStream().

1434  {
1435  //TODO this will have to become per stream
1436  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1437  StreamContext streamContext(event.streamID(), &processContext_);
1438 
1439  SendSourceTerminationSignalIfException sentry(actReg_.get());
1440  input_->readEvent(event, streamContext);
1441  sentry.completedSuccessfully();
1442 
1443  FDEBUG(1) << "\treadEvent\n";
1444  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )

Definition at line 818 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

818  {
819  FDEBUG(1) << " \treadFile\n";
820  size_t size = preg_->size();
821  SendSourceTerminationSignalIfException sentry(actReg_.get());
822 
823  fb_ = input_->readFile();
824  if(size < preg_->size()) {
826  }
830  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
831  }
832  sentry.completedSuccessfully();
833  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )

Definition at line 1248 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasLumiPrincipal(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, processConfiguration_, and edm::PrincipalCache::runPrincipalPtr().

1248  {
1251  << "EventProcessor::readRun\n"
1252  << "Illegal attempt to insert lumi into cache\n"
1253  << "Contact a Framework Developer\n";
1254  }
1257  << "EventProcessor::readRun\n"
1258  << "Illegal attempt to insert lumi into cache\n"
1259  << "Run is invalid\n"
1260  << "Contact a Framework Developer\n";
1261  }
1262  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1263  {
1264  SendSourceTerminationSignalIfException sentry(actReg_.get());
1265  input_->readLuminosityBlock(*lbp, *historyAppender_);
1266  sentry.completedSuccessfully();
1267  }
1268  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1269  principalCache_.insert(lbp);
1270  return input_->luminosityBlock();
1271  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasLumiPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1307 of file EventProcessor.cc.

References actReg_, asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, checkForAsyncStopRequest(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::ExternalSignal, firstEventInBlock_, input_, edm::InputSource::IsEvent, nextItemTypeFromProcessingEvents_, cmsPerfStripChart::operate(), readEvent(), serviceToken_, shouldWeStop(), and sourceMutex_.

Referenced by handleNextEventForStreamAsync().

1308  {
1309  if(shouldWeStop()) {
1310  return false;
1311  }
1312 
1313  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1314  return false;
1315  }
1316 
1317  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1318  return false;
1319  }
1320 
1322  try {
1323  //need to use lock in addition to the serial task queue because
1324  // of delayed provenance reading and reading data in response to
1325  // edm::Refs etc
1326  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1327  if(not firstEventInBlock_) {
1328  //The state machine already called input_->nextItemType
1329  // and found an event. We can't call input_->nextItemType
1330  // again since it would move to the next transition
1331  InputSource::ItemType itemType = input_->nextItemType();
1332  if (InputSource::IsEvent !=itemType) {
1334  finishedProcessingEvents->store(true,std::memory_order_release);
1335  //std::cerr<<"next item type "<<itemType<<"\n";
1336  return false;
1337  }
1339  //std::cerr<<"task told to async stop\n";
1340  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1341  return false;
1342  }
1343  } else {
1344  firstEventInBlock_ = false;
1345  }
1346  readEvent(iStreamIndex);
1347  } catch (...) {
1348  bool expected =false;
1349  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1350  deferredExceptionPtr_ = std::current_exception();
1351 
1352  }
1353  return false;
1354  }
1355  return true;
1356  }
void readEvent(unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< std::recursive_mutex > sourceMutex_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
bool shouldWeStop() const
def operate(timelog, memlog, json_f, num)
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1218 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, and processConfiguration_.

1218  {
1221  << "EventProcessor::readRun\n"
1222  << "Illegal attempt to insert run into cache\n"
1223  << "Contact a Framework Developer\n";
1224  }
1225  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1226  {
1227  SendSourceTerminationSignalIfException sentry(actReg_.get());
1228  input_->readRun(*rp, *historyAppender_);
1229  sentry.completedSuccessfully();
1230  }
1231  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1232  principalCache_.insert(rp);
1233  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1234  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 869 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

869  {
870  if (fb_.get() != nullptr) {
871  schedule_->respondToCloseInputFile(*fb_);
872  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
873  }
874  FDEBUG(1) << "\trespondToCloseInputFile\n";
875  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 860 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

860  {
861  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
862  if (fb_.get() != nullptr) {
863  schedule_->respondToOpenInputFile(*fb_);
864  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
865  }
866  FDEBUG(1) << "\trespondToOpenInputFile\n";
867  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )

Definition at line 900 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

900  {
901  input_->repeat();
902  input_->rewind();
903  FDEBUG(1) << "\trewind\n";
904  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 330 of file EventProcessor.h.

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

330  {
331  return runToCompletion();
332  }
StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 745 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, edm::InputSource::IsEvent, edm::InputSource::IsStop, nextItemTypeFromProcessingEvents_, cmsPerfStripChart::operate(), prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), and edm::convertException::wrap().

Referenced by PythonEventProcessor::run().

745  {
746 
749  {
750  beginJob(); //make sure this was called
751 
752  // make the services available
754 
755 
758  try {
759  FilesProcessor fp(fileModeNoMerge_);
760 
761  convertException::wrap([&]() {
762  bool firstTime = true;
763  do {
764  if(not firstTime) {
766  rewindInput();
767  } else {
768  firstTime = false;
769  }
770  startingNewLoop();
771 
772  auto trans = fp.processFiles(*this);
773 
774  fp.normalEnd();
775 
776  if(deferredExceptionPtrIsSet_.load()) {
777  std::rethrow_exception(deferredExceptionPtr_);
778  }
779  if(trans != InputSource::IsStop) {
780  //problem with the source
781  doErrorStuff();
782 
783  throw cms::Exception("BadTransition")
784  << "Unexpected transition change "
785  << trans;
786 
787  }
788  } while(not endOfLoop());
789  }); // convertException::wrap
790 
791  } // Try block
792  catch (cms::Exception & e) {
793  if (!exceptionMessageLumis_.empty()) {
795  if (e.alreadyPrinted()) {
796  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
797  }
798  }
799  if (!exceptionMessageRuns_.empty()) {
801  if (e.alreadyPrinted()) {
802  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
803  }
804  }
805  if (!exceptionMessageFiles_.empty()) {
807  if (e.alreadyPrinted()) {
808  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
809  }
810  }
811  throw;
812  }
813  }
814 
815  return returnCode;
816  }
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
def operate(timelog, memlog, json_f, num)
bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 1572 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

1572  {
1573  bool expected =false;
1574  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1575  deferredExceptionPtr_ = iException;
1576  return true;
1577  }
1578  return false;
1579  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 1560 of file EventProcessor.cc.

References exceptionMessageFiles_, and python.rootplot.argparse::message.

1560  {
1562  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)

Definition at line 1568 of file EventProcessor.cc.

References exceptionMessageLumis_, and python.rootplot.argparse::message.

1568  {
1570  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 1564 of file EventProcessor.cc.

References exceptionMessageRuns_, and python.rootplot.argparse::message.

1564  {
1566  }
std::string exceptionMessageRuns_
bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 911 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

911  {
912  FDEBUG(1) << "\tshouldWeCloseOutput\n";
913  if(!subProcesses_.empty()) {
914  for(auto const& subProcess : subProcesses_) {
915  if(subProcess.shouldWeCloseOutput()) {
916  return true;
917  }
918  }
919  return false;
920  }
921  return schedule_->shouldWeCloseOutput();
922  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 1546 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

1546  {
1547  FDEBUG(1) << "\tshouldWeStop\n";
1548  if(shouldWeStop_) return true;
1549  if(!subProcesses_.empty()) {
1550  for(auto const& subProcess : subProcesses_) {
1551  if(subProcess.terminate()) {
1552  return true;
1553  }
1554  }
1555  return false;
1556  }
1557  return schedule_->terminate();
1558  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::startingNewLoop ( )

Definition at line 877 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

877  {
878  shouldWeStop_ = false;
879  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
880  // until after we've called beginOfJob
881  if(looper_ && looperBeginJobRun_) {
882  looper_->doStartingNewLoop();
883  }
884  FDEBUG(1) << "\tstartingNewLoop\n";
885  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 265 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 266 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 663 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

663  {
664  return schedule_->totalEvents();
665  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 673 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

673  {
674  return schedule_->totalEventsFailed();
675  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 668 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

668  {
669  return schedule_->totalEventsPassed();
670  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)

Definition at line 1295 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), edm::PrincipalCache::lumiPrincipal(), principalCache_, processContext_, schedule_, and subProcesses_.

1295  {
1297  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1298  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1299  }
ProcessContext processContext_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
void edm::EventProcessor::writeRun ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1283 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), principalCache_, processContext_, edm::PrincipalCache::runPrincipal(), schedule_, and subProcesses_.

1283  {
1285  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.writeRun(phid, run); });
1286  FDEBUG(1) << "\twriteRun " << run << "\n";
1287  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 284 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 314 of file EventProcessor.h.

Referenced by readAndProcessEvents(), readNextEventForStream(), and runToCompletion().

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 316 of file EventProcessor.h.

Referenced by readNextEventForStream(), and runToCompletion().

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 302 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 278 of file EventProcessor.h.

Referenced by init(), and respondToOpenInputFile().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private
std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private
edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 282 of file EventProcessor.h.

Referenced by beginLumi(), beginRun(), endLumi(), endRun(), init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 321 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 305 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 307 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 306 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private
bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 304 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

bool edm::EventProcessor::firstEventInBlock_ =true
private

Definition at line 317 of file EventProcessor.h.

Referenced by readAndProcessEvents(), and readNextEventForStream().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 310 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 308 of file EventProcessor.h.

Referenced by endOfLoop().

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 290 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private
edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 309 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

InputSource::ItemType edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 315 of file EventProcessor.h.

Referenced by readAndProcessEvents(), readNextEventForStream(), and runToCompletion().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 287 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private
edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 277 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), and readFile().

PrincipalCache edm::EventProcessor::principalCache_
private
bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 323 of file EventProcessor.h.

Referenced by beginJob(), and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 285 of file EventProcessor.h.

Referenced by beginJob(), init(), readLuminosityBlock(), and readRun().

ProcessContext edm::EventProcessor::processContext_
private
edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private
bool edm::EventProcessor::shouldWeStop_
private

Definition at line 303 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 300 of file EventProcessor.h.

Referenced by readNextEventForStream().

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 299 of file EventProcessor.h.

Referenced by handleNextEventForStreamAsync().

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 279 of file EventProcessor.h.

Referenced by init().