CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

enum  StatusCode {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 

Public Member Functions

void beginJob ()
 
void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool &globalBeginSucceeded)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
bool endOfLoop ()
 
bool endPathsEnabled () const
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair< edm::ProcessHistoryID, edm::RunNumber_tnextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
int readAndMergeLumi ()
 
std::pair< ProcessHistoryID, RunNumber_treadAndMergeRun ()
 
InputSource::ItemType readAndProcessEvents ()
 
void readFile ()
 
int readLuminosityBlock ()
 
std::pair< ProcessHistoryID, RunNumber_treadRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis (std::string &message)
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
void writeRun (ProcessHistoryID const &phid, RunNumber_t run)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedDataExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ =true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::atomic< InputSource::ItemTypenextItemTypeFromProcessingEvents_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::vector< SubProcesssubProcesses_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 56 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 318 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 319 of file EventProcessor.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 215 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

219  :
220  actReg_(),
221  preg_(),
223  serviceToken_(),
224  input_(),
225  espController_(new eventsetup::EventSetupsController),
226  esp_(),
227  act_table_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
248  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
249  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
250  processDesc->addServices(defaultServices, forcedServices);
251  init(processDesc, iToken, iLegacy);
252  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 254 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

256  :
257  actReg_(),
258  preg_(),
260  serviceToken_(),
261  input_(),
262  espController_(new eventsetup::EventSetupsController),
263  esp_(),
264  act_table_(),
266  schedule_(),
267  subProcesses_(),
268  historyAppender_(new HistoryAppender),
269  fb_(),
270  looper_(),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
275  beginJobCalled_(false),
276  shouldWeStop_(false),
277  fileModeNoMerge_(false),
281  forceLooperToEnd_(false),
282  looperBeginJobRun_(false),
287  {
288  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
289  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
290  processDesc->addServices(defaultServices, forcedServices);
292  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::atomic< InputSource::ItemType > nextItemTypeFromProcessingEvents_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 294 of file EventProcessor.cc.

References init().

296  :
297  actReg_(),
298  preg_(),
300  serviceToken_(),
301  input_(),
302  espController_(new eventsetup::EventSetupsController),
303  esp_(),
304  act_table_(),
306  schedule_(),
307  subProcesses_(),
308  historyAppender_(new HistoryAppender),
309  fb_(),
310  looper_(),
312  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
313  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
314  principalCache_(),
315  beginJobCalled_(false),
316  shouldWeStop_(false),
317  fileModeNoMerge_(false),
321  forceLooperToEnd_(false),
322  looperBeginJobRun_(false),
327  {
328  init(processDesc, token, legacy);
329  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::atomic< InputSource::ItemType > nextItemTypeFromProcessingEvents_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 332 of file EventProcessor.cc.

References looper::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

332  :
333  actReg_(),
334  preg_(),
336  serviceToken_(),
337  input_(),
338  espController_(new eventsetup::EventSetupsController),
339  esp_(),
340  act_table_(),
342  schedule_(),
343  subProcesses_(),
344  historyAppender_(new HistoryAppender),
345  fb_(),
346  looper_(),
348  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
349  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
350  principalCache_(),
351  beginJobCalled_(false),
352  shouldWeStop_(false),
353  fileModeNoMerge_(false),
357  forceLooperToEnd_(false),
358  looperBeginJobRun_(false),
363  {
364  if(isPython) {
365  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
366  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
368  }
369  else {
370  auto processDesc = std::make_shared<ProcessDesc>(config);
372  }
373  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
config
Definition: looper.py:287
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::atomic< InputSource::ItemType > nextItemTypeFromProcessingEvents_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 547 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, and schedule_.

547  {
548  // Make the services available while everything is being deleted.
549  ServiceToken token = getToken();
550  ServiceRegistry::Operate op(token);
551 
552  // manually destroy all these thing that may need the services around
553  // propagate_const<T> has no reset() function
554  espController_ = nullptr;
555  esp_ = nullptr;
556  schedule_ = nullptr;
557  input_ = nullptr;
558  looper_ = nullptr;
559  actReg_ = nullptr;
560 
563  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:45
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:13
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 566 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), edm::for_all(), mps_fire::i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processConfiguration_, processContext_, schedule_, serviceToken_, subProcesses_, and edm::convertException::wrap().

Referenced by runToCompletion().

566  {
567  if(beginJobCalled_) return;
568  beginJobCalled_=true;
569  bk::beginJob();
570 
571  // StateSentry toerror(this); // should we add this ?
572  //make the services available
574 
575  service::SystemBounds bounds(preallocations_.numberOfStreams(),
579  actReg_->preallocateSignal_(bounds);
580  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
582 
583  //NOTE: this may throw
585  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
586 
587  //NOTE: This implementation assumes 'Job' means one call
588  // the EventProcessor::run
589  // If it really means once per 'application' then this code will
590  // have to be changed.
591  // Also have to deal with case where have 'run' then new Module
592  // added and do 'run'
593  // again. In that case the newly added Module needs its 'beginJob'
594  // to be called.
595 
596  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
597  // For now we delay calling beginOfJob until first beginOfRun
598  //if(looper_) {
599  // looper_->beginOfJob(es);
600  //}
601  try {
602  convertException::wrap([&]() {
603  input_->doBeginJob();
604  });
605  }
606  catch(cms::Exception& ex) {
607  ex.addContext("Calling beginJob for the source");
608  throw;
609  }
610  schedule_->beginJob(*preg_);
611  // toerror.succeeded(); // should we add this?
612  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
613  actReg_->postBeginJobSignal_();
614 
615  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
616  schedule_->beginStream(i);
617  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
618  }
619  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:227
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool &  globalBeginSucceeded 
)

Definition at line 1079 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::beginTime(), esp_, espController_, FDEBUG, input_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::LuminosityBlockPrincipal::run(), schedule_, and subProcesses_.

1079  {
1080  globalBeginSucceeded = false;
1081  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1082  {
1083  SendSourceTerminationSignalIfException sentry(actReg_.get());
1084 
1085  input_->doBeginLumi(lumiPrincipal, &processContext_);
1086  sentry.completedSuccessfully();
1087  }
1088 
1090  if(rng.isAvailable()) {
1091  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1092  rng->preBeginLumi(lb);
1093  }
1094 
1095  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1096  // lumi blocks know their start and end times why not also start and end events?
1097  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1098  {
1099  SendSourceTerminationSignalIfException sentry(actReg_.get());
1100  espController_->eventSetupForInstance(ts);
1101  sentry.completedSuccessfully();
1102  }
1103  EventSetup const& es = esp_->eventSetup();
1104  {
1105  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1106  auto globalWaitTask = make_empty_waiting_task();
1107  globalWaitTask->increment_ref_count();
1108  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1109  *schedule_,
1110  lumiPrincipal,
1111  ts,
1112  es,
1113  subProcesses_);
1114  globalWaitTask->wait_for_all();
1115  if(globalWaitTask->exceptionPtr() != nullptr) {
1116  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1117  }
1118  }
1119  globalBeginSucceeded=true;
1120  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1121  if(looper_) {
1122  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1123  }
1124  {
1125  //To wait, the ref count has to b 1+#streams
1126  auto streamLoopWaitTask = make_empty_waiting_task();
1127  streamLoopWaitTask->increment_ref_count();
1128 
1129  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1130 
1131  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1132  *schedule_,
1134  lumiPrincipal,
1135  ts,
1136  es,
1137  subProcesses_);
1138  streamLoopWaitTask->wait_for_all();
1139  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1140  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1141  }
1142  }
1143 
1144  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1145  if(looper_) {
1146  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1147  }
1148  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded 
)

Definition at line 934 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::PrincipalCache::runPrincipal(), schedule_, and subProcesses_.

934  {
935  globalBeginSucceeded = false;
936  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
937  {
938  SendSourceTerminationSignalIfException sentry(actReg_.get());
939 
940  input_->doBeginRun(runPrincipal, &processContext_);
941  sentry.completedSuccessfully();
942  }
943 
944  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
945  runPrincipal.beginTime());
947  espController_->forceCacheClear();
948  }
949  {
950  SendSourceTerminationSignalIfException sentry(actReg_.get());
951  espController_->eventSetupForInstance(ts);
952  sentry.completedSuccessfully();
953  }
954  EventSetup const& es = esp_->eventSetup();
955  if(looper_ && looperBeginJobRun_== false) {
956  looper_->copyInfo(ScheduleInfo(schedule_.get()));
957  looper_->beginOfJob(es);
958  looperBeginJobRun_ = true;
959  looper_->doStartingNewLoop();
960  }
961  {
962  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
963  auto globalWaitTask = make_empty_waiting_task();
964  globalWaitTask->increment_ref_count();
965  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
966  *schedule_,
967  runPrincipal,
968  ts,
969  es,
970  subProcesses_);
971  globalWaitTask->wait_for_all();
972  if(globalWaitTask->exceptionPtr() != nullptr) {
973  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
974  }
975  }
976  globalBeginSucceeded = true;
977  FDEBUG(1) << "\tbeginRun " << run << "\n";
978  if(looper_) {
979  looper_->doBeginRun(runPrincipal, es, &processContext_);
980  }
981  {
982  //To wait, the ref count has to be 1+#streams
983  auto streamLoopWaitTask = make_empty_waiting_task();
984  streamLoopWaitTask->increment_ref_count();
985 
986  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
987 
988  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
989  *schedule_,
991  runPrincipal,
992  ts,
993  es,
994  subProcesses_);
995 
996  streamLoopWaitTask->wait_for_all();
997  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
998  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
999  }
1000  }
1001  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1002  if(looper_) {
1003  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1004  }
1005  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 262 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 263 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 702 of file EventProcessor.cc.

References epSignal, and edm::shutdown_flag.

Referenced by nextTransitionType(), and readNextEventForStream().

702  {
703  bool returnValue = false;
704 
705  // Look for a shutdown signal
706  if(shutdown_flag.load(std::memory_order_acquire)) {
707  returnValue = true;
709  }
710  return returnValue;
711  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 693 of file EventProcessor.cc.

References schedule_.

693  {
694  schedule_->clearCounters();
695  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 835 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, and input_.

835  {
836  if (fb_.get() != nullptr) {
837  SendSourceTerminationSignalIfException sentry(actReg_.get());
838  input_->closeFile(fb_.get(), cleaningUpAfterException);
839  sentry.completedSuccessfully();
840  }
841  FDEBUG(1) << "\tcloseInputFile\n";
842  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )

Definition at line 852 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

852  {
853  if (fb_.get() != nullptr) {
854  schedule_->closeOutputFiles();
855  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
856  }
857  FDEBUG(1) << "\tcloseOutputFiles\n";
858  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)

Definition at line 1305 of file EventProcessor.cc.

References edm::PrincipalCache::deleteLumi(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

1305  {
1307  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1308  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1309  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
PrincipalCache principalCache_
void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1293 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

1293  {
1294  principalCache_.deleteRun(phid, run);
1295  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1296  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1297  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )

Definition at line 924 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

924  {
925  FDEBUG(1) << "\tdoErrorStuff\n";
926  LogError("StateMachine")
927  << "The EventProcessor state machine encountered an unexpected event\n"
928  << "and went to the error state\n"
929  << "Will attempt to terminate processing normally\n"
930  << "(IF using the looper the next loop will be attempted)\n"
931  << "This likely indicates a bug in an input module or corrupted input or both\n";
932  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 678 of file EventProcessor.cc.

References schedule_.

678  {
679  schedule_->enableEndPaths(active);
680  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 622 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

622  {
623  // Collects exceptions, so we don't throw before all operations are performed.
624  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
625 
626  //make the services available
628 
629  //NOTE: this really should go elsewhere in the future
630  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
631  c.call([this,i](){this->schedule_->endStream(i);});
632  for(auto& subProcess : subProcesses_) {
633  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
634  }
635  }
636  auto actReg = actReg_.get();
637  c.call([actReg](){actReg->preEndJobSignal_();});
638  schedule_->endJob(c);
639  for(auto& subProcess : subProcesses_) {
640  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
641  }
642  c.call(std::bind(&InputSource::doEndJob, input_.get()));
643  if(looper_) {
644  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
645  }
646  c.call([actReg](){actReg->postEndJobSignal_();});
647  if(c.hasThrown()) {
648  c.rethrow();
649  }
650  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:219
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 1150 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::endTime(), esp_, espController_, FDEBUG, input_, looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::Principal::resetFailedFromThisProcess(), edm::LuminosityBlockPrincipal::run(), schedule_, edm::Principal::setAtEndTransition(), edm::LuminosityBlockPrincipal::setComplete(), edm::LuminosityBlockPrincipal::setEndTime(), and subProcesses_.

Referenced by Types.EventRange::cppID().

1150  {
1151  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1152  lumiPrincipal.setAtEndTransition(true);
1153  //We need to reset failed items since they might
1154  // be set this time around
1155  lumiPrincipal.resetFailedFromThisProcess();
1156 
1157  {
1158  SendSourceTerminationSignalIfException sentry(actReg_.get());
1159 
1160  lumiPrincipal.setEndTime(input_->timestamp());
1161  lumiPrincipal.setComplete();
1162  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1163  sentry.completedSuccessfully();
1164  }
1165  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1166  // lumi blocks know their start and end times why not also start and end events?
1167  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1168  lumiPrincipal.endTime());
1169  {
1170  SendSourceTerminationSignalIfException sentry(actReg_.get());
1171  espController_->eventSetupForInstance(ts);
1172  sentry.completedSuccessfully();
1173  }
1174  EventSetup const& es = esp_->eventSetup();
1175  if(globalBeginSucceeded){
1176  //To wait, the ref count has to b 1+#streams
1177  auto streamLoopWaitTask = make_empty_waiting_task();
1178  streamLoopWaitTask->increment_ref_count();
1179 
1180  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1181 
1182  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1183  *schedule_,
1185  lumiPrincipal,
1186  ts,
1187  es,
1188  subProcesses_,
1189  cleaningUpAfterException);
1190  streamLoopWaitTask->wait_for_all();
1191  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1192  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1193  }
1194  }
1195  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1196  if(looper_) {
1197  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1198  }
1199  {
1200  auto globalWaitTask = make_empty_waiting_task();
1201  globalWaitTask->increment_ref_count();
1202 
1203  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1204  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1205  *schedule_,
1206  lumiPrincipal,
1207  ts,
1208  es,
1209  subProcesses_,
1210  cleaningUpAfterException);
1211  globalWaitTask->wait_for_all();
1212  if(globalWaitTask->exceptionPtr() != nullptr) {
1213  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1214  }
1215  }
1216  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1217  if(looper_) {
1218  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1219  }
1220  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void setAtEndTransition(bool iAtEnd)
Definition: Principal.cc:325
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::endOfLoop ( )

Definition at line 887 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

887  {
888  if(looper_) {
889  ModuleChanger changer(schedule_.get(),preg_.get());
890  looper_->setModuleChanger(&changer);
891  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
892  looper_->setModuleChanger(nullptr);
893  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
894  else return false;
895  }
896  FDEBUG(1) << "\tendOfLoop\n";
897  return true;
898  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 683 of file EventProcessor.cc.

References schedule_.

683  {
684  return schedule_->endPathsEnabled();
685  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 1007 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, input_, looper_, edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::Principal::resetFailedFromThisProcess(), edm::RunPrincipal::run(), edm::PrincipalCache::runPrincipal(), schedule_, edm::Principal::setAtEndTransition(), edm::RunPrincipal::setComplete(), edm::RunPrincipal::setEndTime(), and subProcesses_.

1007  {
1008  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1009  runPrincipal.setAtEndTransition(true);
1010  //We need to reset failed items since they might
1011  // be set this time around
1012  runPrincipal.resetFailedFromThisProcess();
1013 
1014  {
1015  SendSourceTerminationSignalIfException sentry(actReg_.get());
1016 
1017  runPrincipal.setEndTime(input_->timestamp());
1018  runPrincipal.setComplete();
1019  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1020  sentry.completedSuccessfully();
1021  }
1022 
1024  runPrincipal.endTime());
1025  {
1026  SendSourceTerminationSignalIfException sentry(actReg_.get());
1027  espController_->eventSetupForInstance(ts);
1028  sentry.completedSuccessfully();
1029  }
1030  EventSetup const& es = esp_->eventSetup();
1031  if(globalBeginSucceeded){
1032  //To wait, the ref count has to be 1+#streams
1033  auto streamLoopWaitTask = make_empty_waiting_task();
1034  streamLoopWaitTask->increment_ref_count();
1035 
1036  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1037 
1038  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1039  *schedule_,
1041  runPrincipal,
1042  ts,
1043  es,
1044  subProcesses_,
1045  cleaningUpAfterException);
1046 
1047  streamLoopWaitTask->wait_for_all();
1048  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1049  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1050  }
1051  }
1052  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1053  if(looper_) {
1054  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1055  }
1056  {
1057  auto globalWaitTask = make_empty_waiting_task();
1058  globalWaitTask->increment_ref_count();
1059 
1060  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1061  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1062  *schedule_,
1063  runPrincipal,
1064  ts,
1065  es,
1066  subProcesses_,
1067  cleaningUpAfterException);
1068  globalWaitTask->wait_for_all();
1069  if(globalWaitTask->exceptionPtr() != nullptr) {
1070  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1071  }
1072  }
1073  FDEBUG(1) << "\tendRun " << run << "\n";
1074  if(looper_) {
1075  looper_->doEndRun(runPrincipal, es, &processContext_);
1076  }
1077  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void setAtEndTransition(bool iAtEnd)
Definition: Principal.cc:325
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 658 of file EventProcessor.cc.

References schedule_.

658  {
659  return schedule_->getAllModuleDescriptions();
660  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 653 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

653  {
654  return serviceToken_;
655  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 688 of file EventProcessor.cc.

References schedule_.

688  {
689  schedule_->getTriggerReport(rep);
690  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
rep
Definition: cuy.py:1188
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1362 of file EventProcessor.cc.

References deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::make_waiting_task(), cmsPerfStripChart::operate(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, and sourceResourcesAcquirer_.

Referenced by readAndProcessEvents().

1365  {
1366  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,iTask,iStreamIndex]() mutable {
1368 
1369  try {
1370  if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
1371  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) mutable {
1372  if(iPtr) {
1373  bool expected = false;
1374  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1375  deferredExceptionPtr_ = *iPtr;
1376  iTask.doneWaiting(*iPtr);
1377  }
1378  //the stream will stop now
1379  return;
1380  }
1381  handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
1382  });
1383 
1384  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
1385  } else {
1386  //the stream will stop now
1387  iTask.doneWaiting(std::exception_ptr{});
1388  }
1389  } catch(...) {
1390  bool expected = false;
1391  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1392  auto e =std::current_exception();
1394  iTask.doneWaiting(e);
1395  }
1396  }
1397  });
1398  }
SharedResourcesAcquirer sourceResourcesAcquirer_
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
std::exception_ptr deferredExceptionPtr_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 376 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper(), branchIDListHelper(), branchIDListHelper_, trackingPlots::common, edm::errors::Configuration, esp_, espController_, Exception, FDEBUG, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), historyAppender_, diffTreeTool::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, edm::ScheduleItems::preg(), preg(), preg_, principalCache_, printDependencies_, edm::ScheduleItems::processConfiguration(), processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, edm::ScheduleItems::thinnedAssociationsHelper(), thinnedAssociationsHelper(), and thinnedAssociationsHelper_.

Referenced by EventProcessor().

378  {
379 
380  //std::cerr << processDesc->dump() << std::endl;
381 
382  // register the empty parentage vector , once and for all
384 
385  // register the empty parameter set, once and for all.
386  ParameterSet().registerIt();
387 
388  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
389 
390  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
391  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
392  bool const hasSubProcesses = !subProcessVParameterSet.empty();
393 
394  // Now set some parameters specific to the main process.
395  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
396  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode", "FULLMERGE");
397  if(fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
398  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
399  << fileMode << ".\n"
400  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
401  } else {
402  fileModeNoMerge_ = (fileMode == "NOMERGE");
403  }
404  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
405  //threading
406  unsigned int nThreads=1;
407  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
408  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
409  if(nThreads == 0) {
410  nThreads = 1;
411  }
412  }
413  /* TODO: when we support having each stream run in a different thread use this default
414  unsigned int nStreams =nThreads;
415  */
416  unsigned int nStreams =1;
417  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
418  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
419  if(nStreams==0) {
420  nStreams = nThreads;
421  }
422  }
423  if(nThreads >1) {
424  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
425  }
426 
427  /*
428  bool nRunsSet = false;
429  */
430  unsigned int nConcurrentRuns =1;
431  /*
432  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
433  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
434  }
435  */
436  unsigned int nConcurrentLumis =1;
437  /*
438  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
439  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
440  } else {
441  nConcurrentLumis = nConcurrentRuns;
442  }
443  */
444  //Check that relationships between threading parameters makes sense
445  /*
446  if(nThreads<nStreams) {
447  //bad
448  }
449  if(nConcurrentRuns>nStreams) {
450  //bad
451  }
452  if(nConcurrentRuns>nConcurrentLumis) {
453  //bad
454  }
455  */
456  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
457 
458  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
459 
460  // Now do general initialization
461  ScheduleItems items;
462 
463  //initialize the services
464  auto& serviceSets = processDesc->getServicesPSets();
465  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
466  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
467 
468  //make the services available
470 
471  if(nStreams>1) {
473  handler->willBeUsingThreads();
474  }
475 
476  // intialize miscellaneous items
477  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
478 
479  // intialize the event setup provider
480  esp_ = espController_->makeProvider(*parameterSet);
481 
482  // initialize the looper, if any
483  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
484  if(looper_) {
485  looper_->setActionTable(items.act_table_.get());
486  looper_->attachTo(*items.actReg_);
487 
488  //For now loopers make us run only 1 transition at a time
489  nStreams=1;
490  nConcurrentLumis=1;
491  nConcurrentRuns=1;
492  }
493 
494  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
495 
496  // initialize the input source
497  input_ = makeInput(*parameterSet,
498  *common,
499  items.preg(),
500  items.branchIDListHelper(),
501  items.thinnedAssociationsHelper(),
502  items.actReg_,
503  items.processConfiguration(),
505 
506  // intialize the Schedule
507  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
508 
509  // set the data members
510  act_table_ = std::move(items.act_table_);
511  actReg_ = items.actReg_;
512  preg_ = items.preg();
513  branchIDListHelper_ = items.branchIDListHelper();
514  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
515  processConfiguration_ = items.processConfiguration();
517  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
518 
519  FDEBUG(2) << parameterSet << std::endl;
520 
522  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
523  // Reusable event principal
524  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
527  }
528 
529  // fill the subprocesses, if there are any
530  subProcesses_.reserve(subProcessVParameterSet.size());
531  for(auto& subProcessPSet : subProcessVParameterSet) {
532  subProcesses_.emplace_back(subProcessPSet,
533  *parameterSet,
534  preg(),
537  SubProcessParentageHelper(),
539  *actReg_,
540  token,
543  &processContext_);
544  }
545  }
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:657
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 266 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by endJob().

266 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 267 of file EventProcessor.h.

References edm::get_underlying_safe().

267 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 740 of file EventProcessor.cc.

References input_.

740  {
741  return input_->luminosityBlock();
742  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 735 of file EventProcessor.cc.

References input_.

735  {
736  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
737  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 714 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, and runEdmFileComparison::returnCode.

714  {
715  SendSourceTerminationSignalIfException sentry(actReg_.get());
716  InputSource::ItemType itemType;
717  //For now, do nothing with InputSource::IsSynchronize
718  do {
719  itemType = input_->nextItemType();
720  } while( itemType == InputSource::IsSynchronize);
721 
722  sentry.completedSuccessfully();
723 
725 
726  if(checkForAsyncStopRequest(returnCode)) {
727  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
728  return InputSource::IsStop;
729  }
730 
731  return itemType;
732  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::openOutputFiles ( )

Definition at line 844 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

844  {
845  if (fb_.get() != nullptr) {
846  schedule_->openOutputFiles(*fb_);
847  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
848  }
849  FDEBUG(1) << "\topenOutputFiles\n";
850  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 260 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), readLuminosityBlock(), and readRun().

260 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 261 of file EventProcessor.h.

References edm::get_underlying_safe().

261 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 906 of file EventProcessor.cc.

References esp_, FDEBUG, edm::propagate_const< T >::get(), and looper_.

Referenced by runToCompletion().

906  {
907  looper_->prepareForNextLoop(esp_.get());
908  FDEBUG(1) << "\tprepareForNextLoop\n";
909  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1450 of file EventProcessor.cc.

References edm::make_functor_task(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

1451  {
1452  tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
1453  processEventAsyncImpl(iHolder, iStreamIndex);
1454  }) );
1455  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1457 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, edm::Service< T >::isAvailable(), looper_, edm::PrincipalCache::lumiPrincipalPtr(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, and subProcesses_.

Referenced by processEventAsync().

1458  {
1459  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1460  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1461 
1464  if(rng.isAvailable()) {
1465  Event ev(*pep, ModuleDescription(), nullptr);
1466  rng->postEventRead(ev);
1467  }
1468  assert(pep->luminosityBlockPrincipalPtrValid());
1469  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
1470  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
1471 
1472  WaitingTaskHolder finalizeEventTask( make_waiting_task(
1473  tbb::task::allocate_root(),
1474  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
1475  {
1477 
1478  //NOTE: If we have a looper we only have one Stream
1479  if(looper_) {
1480  processEventWithLooper(*pep);
1481  }
1482 
1483  FDEBUG(1) << "\tprocessEvent\n";
1484  pep->clearEventPrincipal();
1485  if(iPtr) {
1486  iHolder.doneWaiting(*iPtr);
1487  } else {
1488  iHolder.doneWaiting(std::exception_ptr());
1489  }
1490  }
1491  )
1492  );
1493  WaitingTaskHolder afterProcessTask;
1494  if(subProcesses_.empty()) {
1495  afterProcessTask = std::move(finalizeEventTask);
1496  } else {
1497  //Need to run SubProcesses after schedule has finished
1498  // with the event
1499  afterProcessTask = WaitingTaskHolder(
1500  make_waiting_task(tbb::task::allocate_root(),
1501  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
1502  {
1503  if(not iPtr) {
1505 
1506  //when run with 1 thread, we want to the order to be what
1507  // it was before. This requires reversing the order since
1508  // tasks are run last one in first one out
1509  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
1510  subProcess.doEventAsync(finalizeEventTask,*pep);
1511  }
1512  } else {
1513  finalizeEventTask.doneWaiting(*iPtr);
1514  }
1515  })
1516  );
1517  }
1518 
1519  schedule_->processOneEventAsync(std::move(afterProcessTask),
1520  iStreamIndex,*pep, esp_->eventSetup());
1521 
1522  }
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void processEventWithLooper(EventPrincipal &)
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal)
private

Definition at line 1524 of file EventProcessor.cc.

References esp_, input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

1524  {
1525  bool randomAccess = input_->randomAccess();
1526  ProcessingController::ForwardState forwardState = input_->forwardState();
1527  ProcessingController::ReverseState reverseState = input_->reverseState();
1528  ProcessingController pc(forwardState, reverseState, randomAccess);
1529 
1531  do {
1532 
1533  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1534  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1535 
1536  bool succeeded = true;
1537  if(randomAccess) {
1538  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1539  input_->skipEvents(-2);
1540  }
1541  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1542  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1543  }
1544  }
1545  pc.setLastOperationSucceeded(succeeded);
1546  } while(!pc.lastOperationSucceeded());
1547  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
1548  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
int edm::EventProcessor::readAndMergeLumi ( )

Definition at line 1277 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::lumiPrincipalPtr(), edm::PrincipalCache::merge(), preg(), and principalCache_.

1277  {
1278  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1279  {
1280  SendSourceTerminationSignalIfException sentry(actReg_.get());
1281  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1282  sentry.completedSuccessfully();
1283  }
1284  return input_->luminosityBlock();
1285  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1240 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1240  {
1241  principalCache_.merge(input_->runAuxiliary(), preg());
1242  auto runPrincipal =principalCache_.runPrincipalPtr();
1243  {
1244  SendSourceTerminationSignalIfException sentry(actReg_.get());
1245  input_->readAndMergeRun(*runPrincipal);
1246  sentry.completedSuccessfully();
1247  }
1248  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1249  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1250  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
InputSource::ItemType edm::EventProcessor::readAndProcessEvents ( )

Definition at line 1400 of file EventProcessor.cc.

References asyncStopRequestedWhileProcessingEvents_, deferredExceptionPtr_, deferredExceptionPtrIsSet_, firstEventInBlock_, handleNextEventForStreamAsync(), edm::InputSource::IsEvent, edm::make_empty_waiting_task(), edm::make_waiting_task(), nextItemTypeFromProcessingEvents_, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, and lumiQTWidget::t.

1400  {
1403 
1404  std::atomic<bool> finishedProcessingEvents{false};
1405  auto finishedProcessingEventsPtr = &finishedProcessingEvents;
1406 
1407  //The state machine already found the event so
1408  // we have to avoid looking again
1409  firstEventInBlock_ = true;
1410 
1411  //To wait, the ref count has to b 1+#streams
1412  auto eventLoopWaitTask = make_empty_waiting_task();
1413  eventLoopWaitTask->increment_ref_count();
1414 
1415  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1416  unsigned int iStreamIndex = 0;
1417  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1418  tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),
1419  [this,iStreamIndex,finishedProcessingEventsPtr,h=WaitingTaskHolder{eventLoopWaitTask.get()}](std::exception_ptr const*){
1420  handleNextEventForStreamAsync(h,iStreamIndex,finishedProcessingEventsPtr);
1421  }) );
1422  }
1423  //need a temporary Task so that the temporary WaitingTaskHolder assigned to h will go out of scope
1424  // before the call to spawn_and_wait_for_all
1425  auto t = make_waiting_task(tbb::task::allocate_root(),
1426  [this,iStreamIndex,finishedProcessingEventsPtr,h=WaitingTaskHolder{eventLoopWaitTask.get()}](std::exception_ptr const*){
1427  handleNextEventForStreamAsync(h,iStreamIndex,finishedProcessingEventsPtr);
1428  });
1429  eventLoopWaitTask->spawn_and_wait_for_all( *t);
1430 
1431  //One of the processing threads saw an exception
1433  std::rethrow_exception(deferredExceptionPtr_);
1434  }
1435  return nextItemTypeFromProcessingEvents_.load();
1436  }
PreallocationConfiguration preallocations_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::atomic< bool > deferredExceptionPtrIsSet_
std::atomic< InputSource::ItemType > nextItemTypeFromProcessingEvents_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1438 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, and processContext_.

Referenced by readNextEventForStream().

1438  {
1439  //TODO this will have to become per stream
1440  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1441  StreamContext streamContext(event.streamID(), &processContext_);
1442 
1443  SendSourceTerminationSignalIfException sentry(actReg_.get());
1444  input_->readEvent(event, streamContext);
1445  sentry.completedSuccessfully();
1446 
1447  FDEBUG(1) << "\treadEvent\n";
1448  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )

Definition at line 818 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

818  {
819  FDEBUG(1) << " \treadFile\n";
820  size_t size = preg_->size();
821  SendSourceTerminationSignalIfException sentry(actReg_.get());
822 
823  fb_ = input_->readFile();
824  if(size < preg_->size()) {
826  }
830  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
831  }
832  sentry.completedSuccessfully();
833  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )

Definition at line 1252 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasLumiPrincipal(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, processConfiguration_, and edm::PrincipalCache::runPrincipalPtr().

1252  {
1255  << "EventProcessor::readRun\n"
1256  << "Illegal attempt to insert lumi into cache\n"
1257  << "Contact a Framework Developer\n";
1258  }
1261  << "EventProcessor::readRun\n"
1262  << "Illegal attempt to insert lumi into cache\n"
1263  << "Run is invalid\n"
1264  << "Contact a Framework Developer\n";
1265  }
1266  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1267  {
1268  SendSourceTerminationSignalIfException sentry(actReg_.get());
1269  input_->readLuminosityBlock(*lbp, *historyAppender_);
1270  sentry.completedSuccessfully();
1271  }
1272  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1273  principalCache_.insert(lbp);
1274  return input_->luminosityBlock();
1275  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasLumiPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1311 of file EventProcessor.cc.

References actReg_, asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, checkForAsyncStopRequest(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::ExternalSignal, firstEventInBlock_, input_, edm::InputSource::IsEvent, nextItemTypeFromProcessingEvents_, cmsPerfStripChart::operate(), readEvent(), serviceToken_, shouldWeStop(), and sourceMutex_.

Referenced by handleNextEventForStreamAsync().

1312  {
1313  if(shouldWeStop()) {
1314  return false;
1315  }
1316 
1317  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1318  return false;
1319  }
1320 
1321  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1322  return false;
1323  }
1324 
1326  try {
1327  //need to use lock in addition to the serial task queue because
1328  // of delayed provenance reading and reading data in response to
1329  // edm::Refs etc
1330  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1331  if(not firstEventInBlock_) {
1332  //The state machine already called input_->nextItemType
1333  // and found an event. We can't call input_->nextItemType
1334  // again since it would move to the next transition
1335  InputSource::ItemType itemType = input_->nextItemType();
1336  if (InputSource::IsEvent !=itemType) {
1338  finishedProcessingEvents->store(true,std::memory_order_release);
1339  //std::cerr<<"next item type "<<itemType<<"\n";
1340  return false;
1341  }
1343  //std::cerr<<"task told to async stop\n";
1344  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1345  return false;
1346  }
1347  } else {
1348  firstEventInBlock_ = false;
1349  }
1350  readEvent(iStreamIndex);
1351  } catch (...) {
1352  bool expected =false;
1353  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1354  deferredExceptionPtr_ = std::current_exception();
1355 
1356  }
1357  return false;
1358  }
1359  return true;
1360  }
void readEvent(unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::atomic< InputSource::ItemType > nextItemTypeFromProcessingEvents_
std::shared_ptr< std::recursive_mutex > sourceMutex_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
bool shouldWeStop() const
def operate(timelog, memlog, json_f, num)
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1222 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, and processConfiguration_.

1222  {
1225  << "EventProcessor::readRun\n"
1226  << "Illegal attempt to insert run into cache\n"
1227  << "Contact a Framework Developer\n";
1228  }
1229  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1230  {
1231  SendSourceTerminationSignalIfException sentry(actReg_.get());
1232  input_->readRun(*rp, *historyAppender_);
1233  sentry.completedSuccessfully();
1234  }
1235  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1236  principalCache_.insert(rp);
1237  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1238  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 869 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

869  {
870  if (fb_.get() != nullptr) {
871  schedule_->respondToCloseInputFile(*fb_);
872  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
873  }
874  FDEBUG(1) << "\trespondToCloseInputFile\n";
875  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 860 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

860  {
861  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
862  if (fb_.get() != nullptr) {
863  schedule_->respondToOpenInputFile(*fb_);
864  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
865  }
866  FDEBUG(1) << "\trespondToOpenInputFile\n";
867  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )

Definition at line 900 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

900  {
901  input_->repeat();
902  input_->rewind();
903  FDEBUG(1) << "\trewind\n";
904  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 329 of file EventProcessor.h.

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

329  {
330  return runToCompletion();
331  }
StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 745 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, edm::InputSource::IsEvent, edm::InputSource::IsStop, nextItemTypeFromProcessingEvents_, cmsPerfStripChart::operate(), prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), and edm::convertException::wrap().

Referenced by PythonEventProcessor::run().

745  {
746 
749  {
750  beginJob(); //make sure this was called
751 
752  // make the services available
754 
755 
758  try {
759  FilesProcessor fp(fileModeNoMerge_);
760 
761  convertException::wrap([&]() {
762  bool firstTime = true;
763  do {
764  if(not firstTime) {
766  rewindInput();
767  } else {
768  firstTime = false;
769  }
770  startingNewLoop();
771 
772  auto trans = fp.processFiles(*this);
773 
774  fp.normalEnd();
775 
776  if(deferredExceptionPtrIsSet_.load()) {
777  std::rethrow_exception(deferredExceptionPtr_);
778  }
779  if(trans != InputSource::IsStop) {
780  //problem with the source
781  doErrorStuff();
782 
783  throw cms::Exception("BadTransition")
784  << "Unexpected transition change "
785  << trans;
786 
787  }
788  } while(not endOfLoop());
789  }); // convertException::wrap
790 
791  } // Try block
792  catch (cms::Exception & e) {
793  if (!exceptionMessageLumis_.empty()) {
795  if (e.alreadyPrinted()) {
796  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
797  }
798  }
799  if (!exceptionMessageRuns_.empty()) {
801  if (e.alreadyPrinted()) {
802  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
803  }
804  }
805  if (!exceptionMessageFiles_.empty()) {
807  if (e.alreadyPrinted()) {
808  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
809  }
810  }
811  throw;
812  }
813  }
814 
815  return returnCode;
816  }
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::atomic< InputSource::ItemType > nextItemTypeFromProcessingEvents_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
def operate(timelog, memlog, json_f, num)
bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 1576 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

1576  {
1577  bool expected =false;
1578  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1579  deferredExceptionPtr_ = iException;
1580  return true;
1581  }
1582  return false;
1583  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 1564 of file EventProcessor.cc.

References exceptionMessageFiles_, and python.rootplot.argparse::message.

1564  {
1566  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)

Definition at line 1572 of file EventProcessor.cc.

References exceptionMessageLumis_, and python.rootplot.argparse::message.

1572  {
1574  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 1568 of file EventProcessor.cc.

References exceptionMessageRuns_, and python.rootplot.argparse::message.

1568  {
1570  }
std::string exceptionMessageRuns_
bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 911 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

911  {
912  FDEBUG(1) << "\tshouldWeCloseOutput\n";
913  if(!subProcesses_.empty()) {
914  for(auto const& subProcess : subProcesses_) {
915  if(subProcess.shouldWeCloseOutput()) {
916  return true;
917  }
918  }
919  return false;
920  }
921  return schedule_->shouldWeCloseOutput();
922  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 1550 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

1550  {
1551  FDEBUG(1) << "\tshouldWeStop\n";
1552  if(shouldWeStop_) return true;
1553  if(!subProcesses_.empty()) {
1554  for(auto const& subProcess : subProcesses_) {
1555  if(subProcess.terminate()) {
1556  return true;
1557  }
1558  }
1559  return false;
1560  }
1561  return schedule_->terminate();
1562  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::startingNewLoop ( )

Definition at line 877 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

877  {
878  shouldWeStop_ = false;
879  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
880  // until after we've called beginOfJob
881  if(looper_ && looperBeginJobRun_) {
882  looper_->doStartingNewLoop();
883  }
884  FDEBUG(1) << "\tstartingNewLoop\n";
885  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 264 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 265 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 663 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

663  {
664  return schedule_->totalEvents();
665  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 673 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

673  {
674  return schedule_->totalEventsFailed();
675  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 668 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

668  {
669  return schedule_->totalEventsPassed();
670  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)

Definition at line 1299 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), edm::PrincipalCache::lumiPrincipal(), principalCache_, processContext_, schedule_, and subProcesses_.

1299  {
1301  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1302  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1303  }
ProcessContext processContext_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
void edm::EventProcessor::writeRun ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1287 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), principalCache_, processContext_, edm::PrincipalCache::runPrincipal(), schedule_, and subProcesses_.

1287  {
1289  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.writeRun(phid, run); });
1290  FDEBUG(1) << "\twriteRun " << run << "\n";
1291  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 283 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 313 of file EventProcessor.h.

Referenced by readAndProcessEvents(), readNextEventForStream(), and runToCompletion().

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 315 of file EventProcessor.h.

Referenced by readNextEventForStream(), and runToCompletion().

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 301 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 277 of file EventProcessor.h.

Referenced by init(), and respondToOpenInputFile().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private
std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private
edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 281 of file EventProcessor.h.

Referenced by beginLumi(), beginRun(), endLumi(), endRun(), init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 320 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 304 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 306 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 305 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private
bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 303 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

bool edm::EventProcessor::firstEventInBlock_ =true
private

Definition at line 316 of file EventProcessor.h.

Referenced by readAndProcessEvents(), and readNextEventForStream().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 309 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 307 of file EventProcessor.h.

Referenced by endOfLoop().

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 289 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private
edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 308 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

std::atomic<InputSource::ItemType> edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 314 of file EventProcessor.h.

Referenced by readAndProcessEvents(), readNextEventForStream(), and runToCompletion().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 286 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private
edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 276 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), and readFile().

PrincipalCache edm::EventProcessor::principalCache_
private
bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 322 of file EventProcessor.h.

Referenced by beginJob(), and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 284 of file EventProcessor.h.

Referenced by beginJob(), init(), readLuminosityBlock(), and readRun().

ProcessContext edm::EventProcessor::processContext_
private
edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private
bool edm::EventProcessor::shouldWeStop_
private

Definition at line 302 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 299 of file EventProcessor.h.

Referenced by readNextEventForStream().

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 298 of file EventProcessor.h.

Referenced by handleNextEventForStreamAsync().

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 278 of file EventProcessor.h.

Referenced by init().