CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

enum  StatusCode {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (edm::WaitingTaskHolder iHolder)
 
void deleteLumiFromCache (LuminosityBlockProcessingStatus &)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
bool endOfLoop ()
 
bool endPathsEnabled () const
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
void endUnfinishedLumi ()
 
void endUnfinishedRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
void globalEndLumiAsync (edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
InputSource::ItemType lastTransitionType () const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair< edm::ProcessHistoryID, edm::RunNumber_tnextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processLumis (std::shared_ptr< void > const &iRunResource)
 
int readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_treadAndMergeRun ()
 
void readFile ()
 
void readLuminosityBlock (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_treadRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis (std::string &message)
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamEndLumiAsync (edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
 
void writeRunAsync (WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedDataExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ =true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
edm::SerialTaskQueue iovQueue_
 
InputSource::ItemType lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueuelumiQueue_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueuestreamQueues_
 
std::vector< SubProcesssubProcesses_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 61 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 338 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 339 of file EventProcessor.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 218 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

222  :
223  actReg_(),
224  preg_(),
226  serviceToken_(),
227  input_(),
228  espController_(new eventsetup::EventSetupsController),
229  esp_(),
230  act_table_(),
232  schedule_(),
233  subProcesses_(),
234  historyAppender_(new HistoryAppender),
235  fb_(),
236  looper_(),
238  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
239  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
240  principalCache_(),
241  beginJobCalled_(false),
242  shouldWeStop_(false),
243  fileModeNoMerge_(false),
247  forceLooperToEnd_(false),
248  looperBeginJobRun_(false),
251  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
252  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
253  processDesc->addServices(defaultServices, forcedServices);
254  init(processDesc, iToken, iLegacy);
255  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 257 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

259  :
260  actReg_(),
261  preg_(),
263  serviceToken_(),
264  input_(),
265  espController_(new eventsetup::EventSetupsController),
266  esp_(),
267  act_table_(),
269  schedule_(),
270  subProcesses_(),
271  historyAppender_(new HistoryAppender),
272  fb_(),
273  looper_(),
275  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
276  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
277  principalCache_(),
278  beginJobCalled_(false),
279  shouldWeStop_(false),
280  fileModeNoMerge_(false),
284  forceLooperToEnd_(false),
285  looperBeginJobRun_(false),
289  {
290  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
291  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
292  processDesc->addServices(defaultServices, forcedServices);
294  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 296 of file EventProcessor.cc.

References init().

298  :
299  actReg_(),
300  preg_(),
302  serviceToken_(),
303  input_(),
304  espController_(new eventsetup::EventSetupsController),
305  esp_(),
306  act_table_(),
308  schedule_(),
309  subProcesses_(),
310  historyAppender_(new HistoryAppender),
311  fb_(),
312  looper_(),
314  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
315  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
316  principalCache_(),
317  beginJobCalled_(false),
318  shouldWeStop_(false),
319  fileModeNoMerge_(false),
323  forceLooperToEnd_(false),
324  looperBeginJobRun_(false),
328  {
329  init(processDesc, token, legacy);
330  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 333 of file EventProcessor.cc.

References looper::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

333  :
334  actReg_(),
335  preg_(),
337  serviceToken_(),
338  input_(),
339  espController_(new eventsetup::EventSetupsController),
340  esp_(),
341  act_table_(),
343  schedule_(),
344  subProcesses_(),
345  historyAppender_(new HistoryAppender),
346  fb_(),
347  looper_(),
349  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
350  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
351  principalCache_(),
352  beginJobCalled_(false),
353  shouldWeStop_(false),
354  fileModeNoMerge_(false),
358  forceLooperToEnd_(false),
359  looperBeginJobRun_(false),
363  {
364  if(isPython) {
365  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
366  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
368  }
369  else {
370  auto processDesc = std::make_shared<ProcessDesc>(config);
372  }
373  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
config
Definition: looper.py:288
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 548 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, and schedule_.

548  {
549  // Make the services available while everything is being deleted.
550  ServiceToken token = getToken();
551  ServiceRegistry::Operate op(token);
552 
553  // manually destroy all these thing that may need the services around
554  // propagate_const<T> has no reset() function
555  espController_ = nullptr;
556  esp_ = nullptr;
557  schedule_ = nullptr;
558  input_ = nullptr;
559  looper_ = nullptr;
560  actReg_ = nullptr;
561 
564  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:45
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:13
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 567 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), edm::for_all(), mps_fire::i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processConfiguration_, processContext_, schedule_, serviceToken_, subProcesses_, and edm::convertException::wrap().

Referenced by runToCompletion().

567  {
568  if(beginJobCalled_) return;
569  beginJobCalled_=true;
570  bk::beginJob();
571 
572  // StateSentry toerror(this); // should we add this ?
573  //make the services available
575 
576  service::SystemBounds bounds(preallocations_.numberOfStreams(),
580  actReg_->preallocateSignal_(bounds);
581  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
583 
584  //NOTE: this may throw
586  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
587 
588  //NOTE: This implementation assumes 'Job' means one call
589  // the EventProcessor::run
590  // If it really means once per 'application' then this code will
591  // have to be changed.
592  // Also have to deal with case where have 'run' then new Module
593  // added and do 'run'
594  // again. In that case the newly added Module needs its 'beginJob'
595  // to be called.
596 
597  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
598  // For now we delay calling beginOfJob until first beginOfRun
599  //if(looper_) {
600  // looper_->beginOfJob(es);
601  //}
602  try {
603  convertException::wrap([&]() {
604  input_->doBeginJob();
605  });
606  }
607  catch(cms::Exception& ex) {
608  ex.addContext("Calling beginJob for the source");
609  throw;
610  }
611  schedule_->beginJob(*preg_);
612  // toerror.succeeded(); // should we add this?
613  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
614  actReg_->postBeginJobSignal_();
615 
616  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
617  schedule_->beginStream(i);
618  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
619  }
620  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:227
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::beginLumiAsync ( edm::IOVSyncValue const &  iSyncValue,
std::shared_ptr< void > const &  iRunResource,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1115 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::beginTime(), edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), h, handleNextEventForStreamAsync(), mps_fire::i, input_, iovQueue_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), lumiQueue_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::SerialTaskQueue::pause(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), readLuminosityBlock(), edm::LuminosityBlockPrincipal::run(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by handleNextEventForStreamAsync(), and processLumis().

1116  {
1117  if(iHolder.taskHasFailed()) { return; }
1118 
1119  auto status= std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource) ;
1120 
1121  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1122  if(iHolder.taskHasFailed()) { return; }
1123 
1124  status->setResumer(std::move(iResumer));
1125 
1126  sourceResourcesAcquirer_.serialQueueChain().push([this,iHolder,status]() mutable {
1127  //make the services available
1129 
1130  try {
1132 
1133  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1134  {
1135  SendSourceTerminationSignalIfException sentry(actReg_.get());
1136 
1137  input_->doBeginLumi(lumiPrincipal, &processContext_);
1138  sentry.completedSuccessfully();
1139  }
1140 
1142  if(rng.isAvailable()) {
1143  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1144  rng->preBeginLumi(lb);
1145  }
1146 
1147  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1148 
1149  //Task to start the stream beginLumis
1150  auto beginStreamsTask= make_waiting_task(tbb::task::allocate_root()
1151  ,[this, holder = iHolder, status, ts] (std::exception_ptr const* iPtr) mutable {
1152  if (iPtr) {
1153  holder.doneWaiting(*iPtr);
1154  } else {
1155 
1156  status->globalBeginDidSucceed();
1157  EventSetup const& es = esp_->eventSetup();
1158  if(looper_) {
1159  try {
1160  //make the services available
1162  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1163  }catch(...) {
1164  holder.doneWaiting(std::current_exception());
1165  return;
1166  }
1167  }
1168  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1169 
1170  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1171  streamQueues_[i].push([this,i,status,holder,ts,&es] () {
1172  streamQueues_[i].pause();
1173 
1174  auto eventTask = edm::make_waiting_task(tbb::task::allocate_root(),
1175  [this,i,h = holder](std::exception_ptr const* iPtr) mutable
1176  {
1177  if(iPtr) {
1178  h.doneWaiting(*iPtr);
1179  } else {
1181  }
1182  });
1183  auto& event = principalCache_.eventPrincipal(i);
1186  auto lp = status->lumiPrincipal();
1187  event.setLuminosityBlockPrincipal(lp.get());
1188  beginStreamTransitionAsync<Traits>(WaitingTaskHolder{eventTask},
1189  *schedule_,i,*lp,ts,es,
1191  });
1192  }
1193  }
1194  });
1195 
1196  //task to start the global begin lumi
1197  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1198  EventSetup const& es = esp_->eventSetup();
1199  {
1200  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1201  beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1202  *schedule_,
1203  *(status->lumiPrincipal()),
1204  ts,
1205  es,
1206  serviceToken_,
1207  subProcesses_);
1208  }
1209  } catch(...) {
1210  iHolder.doneWaiting(std::current_exception());
1211  }
1212  });
1213  };
1214 
1215  //Safe to do check now since can not have multiple beginLumis at same time in this part of the code
1216  // because we do not attempt to read from the source again until we try to get the first event in a lumi
1217  if(espController_->isWithinValidityInterval(iSync)) {
1218  iovQueue_.pause();
1219  lumiQueue_->pushAndPause(std::move(lumiWork));
1220  } else {
1221  //If EventSetup fails, need beginStreamsHolder in order to pass back exception
1222  iovQueue_.push([this,iHolder,lumiWork,iSync]() mutable {
1223  try {
1224  SendSourceTerminationSignalIfException sentry(actReg_.get());
1225  espController_->eventSetupForInstance(iSync);
1226  sentry.completedSuccessfully();
1227  } catch(...) {
1228  iHolder.doneWaiting(std::current_exception());
1229  return;
1230  }
1231  iovQueue_.pause();
1232  lumiQueue_->pushAndPause(std::move(lumiWork));
1233  });
1234  }
1235  }
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
ProcessContext processContext_
SharedResourcesAcquirer sourceResourcesAcquirer_
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void push(T &&iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
void doneWaiting(std::exception_ptr iExcept)
std::vector< edm::SerialTaskQueue > streamQueues_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
SerialTaskQueueChain & serialQueueChain() const
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
bool pause()
Pauses processing of additional tasks from the queue.
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded 
)

Definition at line 939 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, and subProcesses_.

939  {
940  globalBeginSucceeded = false;
941  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
942  {
943  SendSourceTerminationSignalIfException sentry(actReg_.get());
944 
945  input_->doBeginRun(runPrincipal, &processContext_);
946  sentry.completedSuccessfully();
947  }
948 
949  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
950  runPrincipal.beginTime());
952  espController_->forceCacheClear();
953  }
954  {
955  SendSourceTerminationSignalIfException sentry(actReg_.get());
956  espController_->eventSetupForInstance(ts);
957  sentry.completedSuccessfully();
958  }
959  EventSetup const& es = esp_->eventSetup();
960  if(looper_ && looperBeginJobRun_== false) {
961  looper_->copyInfo(ScheduleInfo(schedule_.get()));
962  looper_->beginOfJob(es);
963  looperBeginJobRun_ = true;
964  looper_->doStartingNewLoop();
965  }
966  {
967  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
968  auto globalWaitTask = make_empty_waiting_task();
969  globalWaitTask->increment_ref_count();
970  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
971  *schedule_,
972  runPrincipal,
973  ts,
974  es,
976  subProcesses_);
977  globalWaitTask->wait_for_all();
978  if(globalWaitTask->exceptionPtr() != nullptr) {
979  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
980  }
981  }
982  globalBeginSucceeded = true;
983  FDEBUG(1) << "\tbeginRun " << run << "\n";
984  if(looper_) {
985  looper_->doBeginRun(runPrincipal, es, &processContext_);
986  }
987  {
988  //To wait, the ref count has to be 1+#streams
989  auto streamLoopWaitTask = make_empty_waiting_task();
990  streamLoopWaitTask->increment_ref_count();
991 
992  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
993 
994  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
995  *schedule_,
997  runPrincipal,
998  ts,
999  es,
1000  serviceToken_,
1001  subProcesses_);
1002 
1003  streamLoopWaitTask->wait_for_all();
1004  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1005  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1006  }
1007  }
1008  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1009  if(looper_) {
1010  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1011  }
1012  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 276 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 277 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 703 of file EventProcessor.cc.

References epSignal, and edm::shutdown_flag.

Referenced by nextTransitionType().

703  {
704  bool returnValue = false;
705 
706  // Look for a shutdown signal
707  if(shutdown_flag.load(std::memory_order_acquire)) {
708  returnValue = true;
710  }
711  return returnValue;
712  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 694 of file EventProcessor.cc.

References schedule_.

694  {
695  schedule_->clearCounters();
696  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 840 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, and input_.

840  {
841  if (fb_.get() != nullptr) {
842  SendSourceTerminationSignalIfException sentry(actReg_.get());
843  input_->closeFile(fb_.get(), cleaningUpAfterException);
844  sentry.completedSuccessfully();
845  }
846  FDEBUG(1) << "\tcloseInputFile\n";
847  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )

Definition at line 857 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

857  {
858  if (fb_.get() != nullptr) {
859  schedule_->closeOutputFiles();
860  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
861  }
862  FDEBUG(1) << "\tcloseOutputFiles\n";
863  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1238 of file EventProcessor.cc.

References h, handleNextEventForStreamAsync(), edm::make_functor_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, mps_update::status, and streamLumiStatus_.

Referenced by processLumis().

1238  {
1239  {
1240  //all streams are sharing the same status at the moment
1241  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1242  status->needToContinueLumi();
1243  status->startProcessingEvents();
1244  }
1245 
1246  unsigned int streamIndex = 0;
1247  for(; streamIndex< preallocations_.numberOfStreams()-1; ++streamIndex) {
1248  tbb::task::enqueue( *edm::make_functor_task(tbb::task::allocate_root(),
1249  [this,streamIndex,h = iHolder](){
1250  handleNextEventForStreamAsync(std::move(h), streamIndex);
1251  }) );
1252 
1253  }
1254  tbb::task::spawn( *edm::make_functor_task(tbb::task::allocate_root(),[this,streamIndex,h=std::move(iHolder)](){
1256  }) );
1257  }
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
PreallocationConfiguration preallocations_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
def move(src, dest)
Definition: eostools.py:510
void edm::EventProcessor::deleteLumiFromCache ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1501 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

1501  {
1502  for(auto& s: subProcesses_) { s.deleteLumiFromCache(*iStatus.lumiPrincipal());}
1503  iStatus.lumiPrincipal()->clearPrincipal();
1504  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1505  }
std::vector< SubProcess > subProcesses_
void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1479 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

Referenced by endUnfinishedRun().

1479  {
1480  principalCache_.deleteRun(phid, run);
1481  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1482  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1483  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )

Definition at line 929 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

929  {
930  FDEBUG(1) << "\tdoErrorStuff\n";
931  LogError("StateMachine")
932  << "The EventProcessor state machine encountered an unexpected event\n"
933  << "and went to the error state\n"
934  << "Will attempt to terminate processing normally\n"
935  << "(IF using the looper the next loop will be attempted)\n"
936  << "This likely indicates a bug in an input module or corrupted input or both\n";
937  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 679 of file EventProcessor.cc.

References schedule_.

679  {
680  schedule_->enableEndPaths(active);
681  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 623 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

623  {
624  // Collects exceptions, so we don't throw before all operations are performed.
625  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
626 
627  //make the services available
629 
630  //NOTE: this really should go elsewhere in the future
631  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
632  c.call([this,i](){this->schedule_->endStream(i);});
633  for(auto& subProcess : subProcesses_) {
634  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
635  }
636  }
637  auto actReg = actReg_.get();
638  c.call([actReg](){actReg->preEndJobSignal_();});
639  schedule_->endJob(c);
640  for(auto& subProcess : subProcesses_) {
641  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
642  }
643  c.call(std::bind(&InputSource::doEndJob, input_.get()));
644  if(looper_) {
645  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
646  }
647  c.call([actReg](){actReg->postEndJobSignal_();});
648  if(c.hasThrown()) {
649  c.rethrow();
650  }
651  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:219
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
def operate(timelog, memlog, json_f, num)
bool edm::EventProcessor::endOfLoop ( )

Definition at line 892 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

892  {
893  if(looper_) {
894  ModuleChanger changer(schedule_.get(),preg_.get());
895  looper_->setModuleChanger(&changer);
896  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
897  looper_->setModuleChanger(nullptr);
898  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
899  else return false;
900  }
901  FDEBUG(1) << "\tendOfLoop\n";
902  return true;
903  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 684 of file EventProcessor.cc.

References schedule_.

684  {
685  return schedule_->endPathsEnabled();
686  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 1030 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, input_, looper_, edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, edm::RunPrincipal::setEndTime(), and subProcesses_.

Referenced by endUnfinishedRun().

1030  {
1031  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1032  runPrincipal.setEndTime(input_->timestamp());
1033 
1035  runPrincipal.endTime());
1036  {
1037  SendSourceTerminationSignalIfException sentry(actReg_.get());
1038  espController_->eventSetupForInstance(ts);
1039  sentry.completedSuccessfully();
1040  }
1041  EventSetup const& es = esp_->eventSetup();
1042  if(globalBeginSucceeded){
1043  //To wait, the ref count has to be 1+#streams
1044  auto streamLoopWaitTask = make_empty_waiting_task();
1045  streamLoopWaitTask->increment_ref_count();
1046 
1047  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1048 
1049  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
1050  *schedule_,
1052  runPrincipal,
1053  ts,
1054  es,
1055  serviceToken_,
1056  subProcesses_,
1057  cleaningUpAfterException);
1058 
1059  streamLoopWaitTask->wait_for_all();
1060  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1061  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1062  }
1063  }
1064  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1065  if(looper_) {
1066  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1067  }
1068  {
1069  auto globalWaitTask = make_empty_waiting_task();
1070  globalWaitTask->increment_ref_count();
1071 
1072  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1073  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1074  *schedule_,
1075  runPrincipal,
1076  ts,
1077  es,
1078  serviceToken_,
1079  subProcesses_,
1080  cleaningUpAfterException);
1081  globalWaitTask->wait_for_all();
1082  if(globalWaitTask->exceptionPtr() != nullptr) {
1083  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1084  }
1085  }
1086  FDEBUG(1) << "\tendRun " << run << "\n";
1087  if(looper_) {
1088  looper_->doEndRun(runPrincipal, es, &processContext_);
1089  }
1090  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
ServiceToken serviceToken_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void edm::EventProcessor::endUnfinishedLumi ( )

Definition at line 1380 of file EventProcessor.cc.

References mps_fire::i, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, and streamLumiStatus_.

1380  {
1381  if(streamLumiActive_.load() > 0) {
1382  auto globalWaitTask = make_empty_waiting_task();
1383  globalWaitTask->increment_ref_count();
1384  {
1385  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1386  for(unsigned int i=0; i< preallocations_.numberOfStreams(); ++i) {
1387  if(streamLumiStatus_[i]) {
1388  streamEndLumiAsync(globalTaskHolder, i, streamLumiStatus_[i]);
1389  }
1390  }
1391  }
1392  globalWaitTask->wait_for_all();
1393  if(globalWaitTask->exceptionPtr() != nullptr) {
1394  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1395  }
1396  }
1397  }
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
PreallocationConfiguration preallocations_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamLumiActive_
void edm::EventProcessor::endUnfinishedRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 1014 of file EventProcessor.cc.

References deleteRunFromCache(), endRun(), edm::make_empty_waiting_task(), run(), lumiQTWidget::t, and writeRunAsync().

1014  {
1015  //If we skip empty runs, this would be called conditionally
1016  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1017 
1018  if(globalBeginSucceeded) {
1020  t->increment_ref_count();
1021  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run);
1022  t->wait_for_all();
1023  if(t->exceptionPtr()) {
1024  std::rethrow_exception(*t->exceptionPtr());
1025  }
1026  }
1027  deleteRunFromCache(phid, run);
1028  }
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run)
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 659 of file EventProcessor.cc.

References schedule_.

659  {
660  return schedule_->getAllModuleDescriptions();
661  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 654 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

654  {
655  return serviceToken_;
656  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 689 of file EventProcessor.cc.

References schedule_.

689  {
690  schedule_->getTriggerReport(rep);
691  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
rep
Definition: cuy.py:1189
void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1259 of file EventProcessor.cc.

References deleteLumiFromCache(), edm::WaitingTaskHolder::doneWaiting(), esp_, iovQueue_, mps_monitormerge::items, looper_, edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), cmsPerfStripChart::operate(), processContext_, edm::SerialTaskQueue::resume(), schedule_, serviceToken_, mps_update::status, subProcesses_, lumiQTWidget::t, tmp, and writeLumiAsync().

Referenced by streamEndLumiAsync().

1259  {
1260  //Need to be sure iTask is always destroyed after iLumiStatus since iLumiStatus can cause endRun to start.
1261  auto t = edm::make_waiting_task(tbb::task::allocate_root(), [ items = std::make_pair(iLumiStatus,std::move(iTask)), this] (std::exception_ptr const* iPtr) mutable {
1262  std::exception_ptr ptr;
1263  //use an easier to remember variable name
1264  auto status = std::move(items.first);
1265  if(iPtr) {
1266  ptr = *iPtr;
1267  WaitingTaskHolder tmp(items.second);
1268  //set the exception early to prevent a beginLumi from running
1269  // we use a copy to keep t from resetting on doneWaiting call.
1270  tmp.doneWaiting(ptr);
1271  } else {
1272  try {
1274  if(looper_) {
1275  auto& lp = *(status->lumiPrincipal());
1276  EventSetup const& es = esp_->eventSetup();
1277  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1278  }
1279  }catch(...) {
1280  if(not ptr) {
1281  ptr = std::current_exception();
1282  }
1283  }
1284  }
1286  try {
1288  //release our hold on the IOV
1289  iovQueue_.resume();
1290  status->resumeGlobalLumiQueue();
1291  } catch(...) {
1292  if( not ptr) {
1293  ptr = std::current_exception();
1294  }
1295  }
1296  try {
1297  status.reset();
1298  } catch(...) {
1299  if( not ptr) {
1300  ptr = std::current_exception();
1301  }
1302  }
1303  //have to wait until reset is called since that could call endRun
1304  items.second.doneWaiting(ptr);
1305  });
1306 
1307  auto writeT = edm::make_waiting_task(tbb::task::allocate_root(), [this,status =iLumiStatus, task = WaitingTaskHolder(t)] (std::exception_ptr const* iExcept) mutable {
1308  if(iExcept) {
1309  task.doneWaiting(*iExcept);
1310  } else {
1311  //Only call writeLumi if beginLumi succeeded
1312  if(status->didGlobalBeginSucceed()) {
1314  }
1315  }
1316  });
1317  auto& lp = *(iLumiStatus->lumiPrincipal());
1318 
1319  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()),
1320  lp.beginTime());
1321 
1322 
1323  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1324  EventSetup const& es = esp_->eventSetup();
1325 
1326  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(writeT),
1327  *schedule_,
1328  lp,
1329  ts,
1330  es,
1331  serviceToken_,
1332  subProcesses_,
1333  iLumiStatus->cleaningUpAfterException());
1334  }
ProcessContext processContext_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void writeLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
edm::SerialTaskQueue iovQueue_
def move(src, dest)
Definition: eostools.py:510
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 1566 of file EventProcessor.cc.

References beginLumiAsync(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::InputSource::IsLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), and streamLumiStatus_.

Referenced by beginLumiAsync(), and continueLumiAsync().

1568  {
1569  sourceResourcesAcquirer_.serialQueueChain().push([this,iTask,iStreamIndex]() mutable {
1571  auto& status = streamLumiStatus_[iStreamIndex];
1572  try {
1573  if(readNextEventForStream(iStreamIndex, *status) ) {
1574  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex](std::exception_ptr const* iPtr) mutable {
1575  if(iPtr) {
1576  bool expected = false;
1577  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1578  deferredExceptionPtr_ = *iPtr;
1579  iTask.doneWaiting(*iPtr);
1580  }
1581  //the stream will stop now
1582  return;
1583  }
1584  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1585  });
1586 
1587  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
1588  } else {
1589  //the stream will stop now
1590  if(status->isLumiEnding()) {
1591  if(lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1592  status->startNextLumi();
1593  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1594  }
1595  streamEndLumiAsync(std::move(iTask),iStreamIndex, status);
1596  } else {
1597  iTask.doneWaiting(std::exception_ptr{});
1598  }
1599  }
1600  } catch(...) {
1601  bool expected = false;
1602  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1603  auto e =std::current_exception();
1605  iTask.doneWaiting(e);
1606  }
1607  }
1608  });
1609  }
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
SharedResourcesAcquirer sourceResourcesAcquirer_
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastTransitionType() const
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::exception_ptr deferredExceptionPtr_
def move(src, dest)
Definition: eostools.py:510
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 376 of file EventProcessor.cc.

References act_table_, actReg_, branchIDListHelper(), branchIDListHelper_, trackingPlots::common, edm::errors::Configuration, esp_, espController_, Exception, FDEBUG, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), eostools::move(), jets_cff::nThreads, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

378  {
379 
380  //std::cerr << processDesc->dump() << std::endl;
381 
382  // register the empty parentage vector , once and for all
384 
385  // register the empty parameter set, once and for all.
386  ParameterSet().registerIt();
387 
388  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
389 
390  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
391  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
392  bool const hasSubProcesses = !subProcessVParameterSet.empty();
393 
394  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
395  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
396  // set in here if the parameters were not explicitly set.
397  validateTopLevelParameterSets(parameterSet.get());
398 
399  // Now set some parameters specific to the main process.
400  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
401  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
402  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
403  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
404  << fileMode << ".\n"
405  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
406  } else {
407  fileModeNoMerge_ = (fileMode == "NOMERGE");
408  }
409  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
410 
411  //threading
412  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
413 
414  // Even if numberOfThreads was set to zero in the Python configuration, the code
415  // in cmsRun.cpp should have reset it to something else.
416  assert(nThreads != 0);
417 
418  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
419  if (nStreams == 0) {
420  nStreams = nThreads;
421  }
422  if(nThreads > 1) {
423  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
424  }
425  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
426  if (nConcurrentRuns != 1) {
427  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
428  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
429  }
430  unsigned int nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
431  if (nConcurrentLumis == 0) {
432  nConcurrentLumis = nConcurrentRuns;
433  }
434 
435  //Check that relationships between threading parameters makes sense
436  /*
437  if(nThreads<nStreams) {
438  //bad
439  }
440  if(nConcurrentRuns>nStreams) {
441  //bad
442  }
443  if(nConcurrentRuns>nConcurrentLumis) {
444  //bad
445  }
446  */
447  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
448 
449  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
450 
451  // Now do general initialization
452  ScheduleItems items;
453 
454  //initialize the services
455  auto& serviceSets = processDesc->getServicesPSets();
456  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
457  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
458 
459  //make the services available
461 
462  if(nStreams>1) {
464  handler->willBeUsingThreads();
465  }
466 
467  // intialize miscellaneous items
468  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
469 
470  // intialize the event setup provider
471  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get());
472 
473  // initialize the looper, if any
474  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
475  if(looper_) {
476  looper_->setActionTable(items.act_table_.get());
477  looper_->attachTo(*items.actReg_);
478 
479  //For now loopers make us run only 1 transition at a time
480  nStreams=1;
481  nConcurrentLumis=1;
482  nConcurrentRuns=1;
483  }
484 
485  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
486 
487  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
488  streamQueues_.resize(nStreams);
489  streamLumiStatus_.resize(nStreams);
490 
491  // initialize the input source
492  input_ = makeInput(*parameterSet,
493  *common,
494  items.preg(),
495  items.branchIDListHelper(),
496  items.thinnedAssociationsHelper(),
497  items.actReg_,
498  items.processConfiguration(),
500 
501  // intialize the Schedule
502  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
503 
504  // set the data members
505  act_table_ = std::move(items.act_table_);
506  actReg_ = items.actReg_;
507  preg_ = items.preg();
508  branchIDListHelper_ = items.branchIDListHelper();
509  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
510  processConfiguration_ = items.processConfiguration();
512  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
513 
514  FDEBUG(2) << parameterSet << std::endl;
515 
517  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
518  // Reusable event principal
519  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
522  }
523 
524  for(unsigned int index =0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
525  auto lp = std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_,
526  historyAppender_.get(), index);
528  }
529 
530  // fill the subprocesses, if there are any
531  subProcesses_.reserve(subProcessVParameterSet.size());
532  for(auto& subProcessPSet : subProcessVParameterSet) {
533  subProcesses_.emplace_back(subProcessPSet,
534  *parameterSet,
535  preg(),
538  SubProcessParentageHelper(),
540  *actReg_,
541  token,
544  &processContext_);
545  }
546  }
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
std::vector< edm::SerialTaskQueue > streamQueues_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:684
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 280 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by endJob().

280 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 281 of file EventProcessor.h.

References edm::get_underlying_safe().

281 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 747 of file EventProcessor.cc.

References input_.

Referenced by readNextEventForStream().

747  {
748  return input_->luminosityBlock();
749  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 742 of file EventProcessor.cc.

References input_.

742  {
743  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
744  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 715 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), deferredExceptionPtrIsSet_, epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by readNextEventForStream().

715  {
716  if (deferredExceptionPtrIsSet_.load()) {
718  return InputSource::IsStop;
719  }
720 
721  SendSourceTerminationSignalIfException sentry(actReg_.get());
722  InputSource::ItemType itemType;
723  //For now, do nothing with InputSource::IsSynchronize
724  do {
725  itemType = input_->nextItemType();
726  } while( itemType == InputSource::IsSynchronize);
727 
728  lastSourceTransition_ = itemType;
729  sentry.completedSuccessfully();
730 
732 
733  if(checkForAsyncStopRequest(returnCode)) {
734  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
736  }
737 
738  return lastSourceTransition_;
739  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastSourceTransition_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::openOutputFiles ( )

Definition at line 849 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

849  {
850  if (fb_.get() != nullptr) {
851  schedule_->openOutputFiles(*fb_);
852  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
853  }
854  FDEBUG(1) << "\topenOutputFiles\n";
855  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 274 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), and readRun().

274 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 275 of file EventProcessor.h.

References edm::get_underlying_safe().

275 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 911 of file EventProcessor.cc.

References esp_, FDEBUG, edm::propagate_const< T >::get(), and looper_.

Referenced by runToCompletion().

911  {
912  looper_->prepareForNextLoop(esp_.get());
913  FDEBUG(1) << "\tprepareForNextLoop\n";
914  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 131 of file EventProcessor.h.

References EcalCondTools::getToken(), and cuy::rep.

131 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1625 of file EventProcessor.cc.

References edm::make_functor_task(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

1626  {
1627  tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
1628  processEventAsyncImpl(iHolder, iStreamIndex);
1629  }) );
1630  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1632 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, edm::Service< T >::isAvailable(), looper_, edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, and subProcesses_.

Referenced by processEventAsync().

1633  {
1634  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1635 
1638  if(rng.isAvailable()) {
1639  Event ev(*pep, ModuleDescription(), nullptr);
1640  rng->postEventRead(ev);
1641  }
1642 
1643  WaitingTaskHolder finalizeEventTask( make_waiting_task(
1644  tbb::task::allocate_root(),
1645  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
1646  {
1647 
1648  //NOTE: If we have a looper we only have one Stream
1649  if(looper_) {
1651  processEventWithLooper(*pep);
1652  }
1653 
1654  FDEBUG(1) << "\tprocessEvent\n";
1655  pep->clearEventPrincipal();
1656  if(iPtr) {
1657  iHolder.doneWaiting(*iPtr);
1658  } else {
1659  iHolder.doneWaiting(std::exception_ptr());
1660  }
1661  }
1662  )
1663  );
1664  WaitingTaskHolder afterProcessTask;
1665  if(subProcesses_.empty()) {
1666  afterProcessTask = std::move(finalizeEventTask);
1667  } else {
1668  //Need to run SubProcesses after schedule has finished
1669  // with the event
1670  afterProcessTask = WaitingTaskHolder(
1671  make_waiting_task(tbb::task::allocate_root(),
1672  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
1673  {
1674  if(not iPtr) {
1675  //when run with 1 thread, we want to the order to be what
1676  // it was before. This requires reversing the order since
1677  // tasks are run last one in first one out
1678  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
1679  subProcess.doEventAsync(finalizeEventTask,*pep);
1680  }
1681  } else {
1682  finalizeEventTask.doneWaiting(*iPtr);
1683  }
1684  })
1685  );
1686  }
1687 
1688  schedule_->processOneEventAsync(std::move(afterProcessTask),
1689  iStreamIndex,*pep, esp_->eventSetup(), serviceToken_);
1690 
1691  }
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void processEventWithLooper(EventPrincipal &)
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal)
private

Definition at line 1693 of file EventProcessor.cc.

References esp_, input_, edm::InputSource::IsStop, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), lastSourceTransition_, looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

1693  {
1694  bool randomAccess = input_->randomAccess();
1695  ProcessingController::ForwardState forwardState = input_->forwardState();
1696  ProcessingController::ReverseState reverseState = input_->reverseState();
1697  ProcessingController pc(forwardState, reverseState, randomAccess);
1698 
1700  do {
1701 
1702  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1703  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1704 
1705  bool succeeded = true;
1706  if(randomAccess) {
1707  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1708  input_->skipEvents(-2);
1709  }
1710  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1711  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1712  }
1713  }
1714  pc.setLastOperationSucceeded(succeeded);
1715  } while(!pc.lastOperationSucceeded());
1716  if(status != EDLooperBase::kContinue) {
1717  shouldWeStop_ = true;
1719  }
1720  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
InputSource::ItemType lastSourceTransition_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
InputSource::ItemType edm::EventProcessor::processLumis ( std::shared_ptr< void > const &  iRunResource)

Definition at line 1093 of file EventProcessor.cc.

References beginLumiAsync(), continueLumiAsync(), input_, lastTransitionType(), edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, and streamLumiActive_.

1093  {
1094  auto waitTask = make_empty_waiting_task();
1095  waitTask->increment_ref_count();
1096 
1097  if(streamLumiActive_> 0) {
1099  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1100  } else {
1101  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1102  input_->luminosityBlockAuxiliary()->beginTime()),
1103  iRunResource,
1104  WaitingTaskHolder{waitTask.get()});
1105  }
1106  waitTask->wait_for_all();
1107 
1108  if(waitTask->exceptionPtr() != nullptr) {
1109  std::rethrow_exception(* (waitTask->exceptionPtr()) );
1110  }
1111  return lastTransitionType();
1112  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
PreallocationConfiguration preallocations_
InputSource::ItemType lastTransitionType() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::atomic< unsigned int > streamLumiActive_
int edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1449 of file EventProcessor.cc.

References actReg_, input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by readNextEventForStream().

1449  {
1450  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1451  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1452  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) == input_->processHistoryRegistry().reducedProcessHistoryID(input_->luminosityBlockAuxiliary()->processHistoryID()));
1453  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1454  assert(lumiOK);
1455  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1456  {
1457  SendSourceTerminationSignalIfException sentry(actReg_.get());
1458  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1459  sentry.completedSuccessfully();
1460  }
1461  return input_->luminosityBlock();
1462  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< ActivityRegistry > actReg_
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1417 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1417  {
1418  principalCache_.merge(input_->runAuxiliary(), preg());
1419  auto runPrincipal =principalCache_.runPrincipalPtr();
1420  {
1421  SendSourceTerminationSignalIfException sentry(actReg_.get());
1422  input_->readAndMergeRun(*runPrincipal);
1423  sentry.completedSuccessfully();
1424  }
1425  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1426  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1427  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1611 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, and streamLumiStatus_.

Referenced by readNextEventForStream().

1611  {
1612  //TODO this will have to become per stream
1613  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1614  StreamContext streamContext(event.streamID(), &processContext_);
1615 
1616  SendSourceTerminationSignalIfException sentry(actReg_.get());
1617  input_->readEvent(event, streamContext);
1618 
1619  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1620  sentry.completedSuccessfully();
1621 
1622  FDEBUG(1) << "\treadEvent\n";
1623  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )

Definition at line 823 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

823  {
824  FDEBUG(1) << " \treadFile\n";
825  size_t size = preg_->size();
826  SendSourceTerminationSignalIfException sentry(actReg_.get());
827 
828  fb_ = input_->readFile();
829  if(size < preg_->size()) {
831  }
835  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
836  }
837  sentry.completedSuccessfully();
838  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::readLuminosityBlock ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1429 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::getAvailableLumiPrincipalPtr(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), eostools::move(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

Referenced by beginLumiAsync().

1429  {
1432  << "EventProcessor::readLuminosityBlock\n"
1433  << "Illegal attempt to insert lumi into cache\n"
1434  << "Run is invalid\n"
1435  << "Contact a Framework Developer\n";
1436  }
1438  assert(lbp);
1439  lbp->setAux(*input_->luminosityBlockAuxiliary());
1440  {
1441  SendSourceTerminationSignalIfException sentry(actReg_.get());
1442  input_->readLuminosityBlock(*lbp, *historyAppender_);
1443  sentry.completedSuccessfully();
1444  }
1445  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1446  iStatus.lumiPrincipal() = std::move(lbp);
1447  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iLumiStatus 
)
private

Definition at line 1507 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::continuingLumi(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::LuminosityBlockProcessingStatus::endLumi(), edm::LuminosityBlockProcessingStatus::haveContinuedLumi(), input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextLuminosityBlockID(), nextTransitionType(), cmsPerfStripChart::operate(), or, readAndMergeLumi(), readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setNextSyncValue(), shouldWeStop(), sourceMutex_, edm::LuminosityBlockProcessingStatus::stopProcessingEvents(), and edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped().

Referenced by handleNextEventForStreamAsync().

1508  {
1509  if(shouldWeStop()) {
1510  return false;
1511  }
1512 
1513  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1514  return false;
1515  }
1516 
1517  if(iStatus.wasEventProcessingStopped()) {
1518  return false;
1519  }
1520 
1522  try {
1523  //need to use lock in addition to the serial task queue because
1524  // of delayed provenance reading and reading data in response to
1525  // edm::Refs etc
1526  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1527 
1528  auto itemType = iStatus.continuingLumi()? InputSource::IsLumi : nextTransitionType();
1529  if(InputSource::IsLumi == itemType) {
1530  iStatus.haveContinuedLumi();
1531  while(itemType == InputSource::IsLumi and
1532  iStatus.lumiPrincipal()->run() == input_->run() and
1533  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1534  readAndMergeLumi(iStatus);
1535  itemType = nextTransitionType();
1536  }
1537  if(InputSource::IsLumi == itemType) {
1538  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1539  input_->luminosityBlockAuxiliary()->beginTime()));
1540  }
1541  }
1542  if(InputSource::IsEvent != itemType) {
1543  iStatus.stopProcessingEvents();
1544 
1545  //IsFile may continue processing the lumi and
1546  // looper_ can cause the input source to declare a new IsRun which is actually
1547  // just a continuation of the previous run
1548  if(InputSource::IsStop == itemType or
1549  InputSource::IsLumi == itemType or
1550  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1551  iStatus.endLumi();
1552  }
1553  return false;
1554  }
1555  readEvent(iStreamIndex);
1556  } catch (...) {
1557  bool expected =false;
1558  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1559  deferredExceptionPtr_ = std::current_exception();
1560  }
1561  return false;
1562  }
1563  return true;
1564  }
void readEvent(unsigned int iStreamIndex)
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< std::recursive_mutex > sourceMutex_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
std::exception_ptr deferredExceptionPtr_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
bool shouldWeStop() const
def operate(timelog, memlog, json_f, num)
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1399 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, and processConfiguration_.

1399  {
1402  << "EventProcessor::readRun\n"
1403  << "Illegal attempt to insert run into cache\n"
1404  << "Contact a Framework Developer\n";
1405  }
1406  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1407  {
1408  SendSourceTerminationSignalIfException sentry(actReg_.get());
1409  input_->readRun(*rp, *historyAppender_);
1410  sentry.completedSuccessfully();
1411  }
1412  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1413  principalCache_.insert(rp);
1414  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1415  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 874 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

874  {
875  if (fb_.get() != nullptr) {
876  schedule_->respondToCloseInputFile(*fb_);
877  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
878  }
879  FDEBUG(1) << "\trespondToCloseInputFile\n";
880  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 865 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

865  {
866  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
867  if (fb_.get() != nullptr) {
868  schedule_->respondToOpenInputFile(*fb_);
869  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
870  }
871  FDEBUG(1) << "\trespondToOpenInputFile\n";
872  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )

Definition at line 905 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

905  {
906  input_->repeat();
907  input_->rewind();
908  FDEBUG(1) << "\trewind\n";
909  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 349 of file EventProcessor.h.

Referenced by Types.EventID::cppID(), Types.LuminosityBlockID::cppID(), and endUnfinishedRun().

349  {
350  return runToCompletion();
351  }
StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 752 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, edm::InputSource::IsStop, cmsPerfStripChart::operate(), prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), and edm::convertException::wrap().

Referenced by PythonEventProcessor::run().

752  {
753 
756  {
757  beginJob(); //make sure this was called
758 
759  // make the services available
761 
763  try {
764  FilesProcessor fp(fileModeNoMerge_);
765 
766  convertException::wrap([&]() {
767  bool firstTime = true;
768  do {
769  if(not firstTime) {
771  rewindInput();
772  } else {
773  firstTime = false;
774  }
775  startingNewLoop();
776 
777  auto trans = fp.processFiles(*this);
778 
779  fp.normalEnd();
780 
781  if(deferredExceptionPtrIsSet_.load()) {
782  std::rethrow_exception(deferredExceptionPtr_);
783  }
784  if(trans != InputSource::IsStop) {
785  //problem with the source
786  doErrorStuff();
787 
788  throw cms::Exception("BadTransition")
789  << "Unexpected transition change "
790  << trans;
791 
792  }
793  } while(not endOfLoop());
794  }); // convertException::wrap
795 
796  } // Try block
797  catch (cms::Exception & e) {
798  if (!exceptionMessageLumis_.empty()) {
800  if (e.alreadyPrinted()) {
801  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
802  }
803  }
804  if (!exceptionMessageRuns_.empty()) {
806  if (e.alreadyPrinted()) {
807  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
808  }
809  }
810  if (!exceptionMessageFiles_.empty()) {
812  if (e.alreadyPrinted()) {
813  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
814  }
815  }
816  throw;
817  }
818  }
819 
820  return returnCode;
821  }
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
def operate(timelog, memlog, json_f, num)
bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 1748 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

1748  {
1749  bool expected =false;
1750  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1751  deferredExceptionPtr_ = iException;
1752  return true;
1753  }
1754  return false;
1755  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 1736 of file EventProcessor.cc.

References exceptionMessageFiles_.

1736  {
1737  exceptionMessageFiles_ = message;
1738  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)

Definition at line 1744 of file EventProcessor.cc.

References exceptionMessageLumis_.

1744  {
1745  exceptionMessageLumis_ = message;
1746  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 1740 of file EventProcessor.cc.

References exceptionMessageRuns_.

1740  {
1741  exceptionMessageRuns_ = message;
1742  }
std::string exceptionMessageRuns_
bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 916 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

916  {
917  FDEBUG(1) << "\tshouldWeCloseOutput\n";
918  if(!subProcesses_.empty()) {
919  for(auto const& subProcess : subProcesses_) {
920  if(subProcess.shouldWeCloseOutput()) {
921  return true;
922  }
923  }
924  return false;
925  }
926  return schedule_->shouldWeCloseOutput();
927  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 1722 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

1722  {
1723  FDEBUG(1) << "\tshouldWeStop\n";
1724  if(shouldWeStop_) return true;
1725  if(!subProcesses_.empty()) {
1726  for(auto const& subProcess : subProcesses_) {
1727  if(subProcess.terminate()) {
1728  return true;
1729  }
1730  }
1731  return false;
1732  }
1733  return schedule_->terminate();
1734  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::startingNewLoop ( )

Definition at line 882 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

882  {
883  shouldWeStop_ = false;
884  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
885  // until after we've called beginOfJob
886  if(looper_ && looperBeginJobRun_) {
887  looper_->doStartingNewLoop();
888  }
889  FDEBUG(1) << "\tstartingNewLoop\n";
890  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1336 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), esp_, globalEndLumiAsync(), edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and lumiQTWidget::t.

Referenced by endUnfinishedLumi(), and handleNextEventForStreamAsync().

1338  {
1339 
1340  auto t =edm::make_waiting_task(tbb::task::allocate_root(), [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1341  std::exception_ptr ptr;
1342  if(iPtr) {
1343  ptr = *iPtr;
1344  }
1345  auto status =streamLumiStatus_[iStreamIndex];
1346  //reset status before releasing queue else get race condtion
1347  streamLumiStatus_[iStreamIndex].reset();
1349  streamQueues_[iStreamIndex].resume();
1350 
1351  //are we the last one?
1352  if( status->streamFinishedLumi()) {
1354  }
1355  iTask.doneWaiting(ptr);
1356  });
1357 
1358  edm::WaitingTaskHolder lumiDoneTask{t};
1359 
1360  iLumiStatus->setEndTime();
1361 
1362  if(iLumiStatus->didGlobalBeginSucceed()) {
1363  auto & lumiPrincipal = *iLumiStatus->lumiPrincipal();
1364  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1365  lumiPrincipal.endTime());
1366  EventSetup const& es = esp_->eventSetup();
1367 
1368  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1369 
1370  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1371  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1372  *schedule_,iStreamIndex,
1373  lumiPrincipal,ts,es,
1374  serviceToken_,
1375  subProcesses_,cleaningUpAfterException);
1376  }
1377  }
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void doneWaiting(std::exception_ptr iExcept)
std::vector< edm::SerialTaskQueue > streamQueues_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:510
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 278 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 279 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 664 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

664  {
665  return schedule_->totalEvents();
666  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 674 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

674  {
675  return schedule_->totalEventsFailed();
676  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 669 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

669  {
670  return schedule_->totalEventsPassed();
671  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
std::shared_ptr< LuminosityBlockProcessingStatus iStatus 
)

Definition at line 1485 of file EventProcessor.cc.

References actReg_, edm::WaitingTaskHolder::doneWaiting(), edm::make_waiting_task(), processContext_, alignCSCRings::s, schedule_, serviceToken_, and subProcesses_.

Referenced by globalEndLumiAsync().

1485  {
1486  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,task, iStatus](std::exception_ptr const* iExcept) mutable {
1487  if(iExcept) {
1488  task.doneWaiting(*iExcept);
1489  } else {
1491  for(auto&s : subProcesses_) {
1492  s.writeLumiAsync(task,*(iStatus->lumiPrincipal()));
1493  }
1494  }
1495  });
1497 
1498  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, *(iStatus->lumiPrincipal()), &processContext_, actReg_.get());
1499  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1464 of file EventProcessor.cc.

References actReg_, edm::WaitingTaskHolder::doneWaiting(), edm::make_waiting_task(), principalCache_, processContext_, edm::PrincipalCache::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, and subProcesses_.

Referenced by endUnfinishedRun().

1464  {
1465  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,phid,run,task](std::exception_ptr const* iExcept) mutable {
1466  if(iExcept) {
1467  task.doneWaiting(*iExcept);
1468  } else {
1470  for(auto&s : subProcesses_) {
1471  s.writeRunAsync(task,phid,run);
1472  }
1473  }
1474  });
1476  schedule_->writeRunAsync(WaitingTaskHolder(subsT), principalCache_.runPrincipal(phid, run), &processContext_, actReg_.get());
1477  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 299 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 334 of file EventProcessor.h.

Referenced by runToCompletion().

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 335 of file EventProcessor.h.

Referenced by runToCompletion().

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 322 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 291 of file EventProcessor.h.

Referenced by init(), and respondToOpenInputFile().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private
std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private
edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 296 of file EventProcessor.h.

Referenced by beginLumiAsync(), beginRun(), endRun(), init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 340 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 325 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 327 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 326 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private
bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 324 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

bool edm::EventProcessor::firstEventInBlock_ =true
private

Definition at line 336 of file EventProcessor.h.

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 328 of file EventProcessor.h.

Referenced by endOfLoop().

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 310 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private
edm::SerialTaskQueue edm::EventProcessor::iovQueue_
private

Definition at line 298 of file EventProcessor.h.

Referenced by beginLumiAsync(), and globalEndLumiAsync().

InputSource::ItemType edm::EventProcessor::lastSourceTransition_
private

Definition at line 295 of file EventProcessor.h.

Referenced by nextTransitionType(), and processEventWithLooper().

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 329 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 305 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 302 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private
edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 290 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), and readFile().

PrincipalCache edm::EventProcessor::principalCache_
private
bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 342 of file EventProcessor.h.

Referenced by beginJob(), and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 300 of file EventProcessor.h.

Referenced by beginJob(), init(), and readRun().

ProcessContext edm::EventProcessor::processContext_
private
edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private
bool edm::EventProcessor::shouldWeStop_
private

Definition at line 323 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 320 of file EventProcessor.h.

Referenced by readNextEventForStream().

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 319 of file EventProcessor.h.

Referenced by beginLumiAsync(), and handleNextEventForStreamAsync().

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private
std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private
std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

Definition at line 304 of file EventProcessor.h.

Referenced by beginLumiAsync(), init(), and streamEndLumiAsync().

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 292 of file EventProcessor.h.

Referenced by init().