CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

enum  StatusCode {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (edm::WaitingTaskHolder iHolder)
 
void deleteLumiFromCache (LuminosityBlockProcessingStatus &)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
bool endOfLoop ()
 
bool endPathsEnabled () const
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
void endUnfinishedLumi ()
 
void endUnfinishedRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
void globalEndLumiAsync (edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
InputSource::ItemType lastTransitionType () const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processLumis (std::shared_ptr< void > const &iRunResource)
 
int readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun ()
 
void readFile ()
 
void readLuminosityBlock (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis (std::string &message)
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamEndLumiAsync (edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
 
void writeRunAsync (WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedData > ExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ =true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
edm::SerialTaskQueue iovQueue_
 
InputSource::ItemType lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueue > streamQueues_
 
std::vector< SubProcess > subProcesses_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 63 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 344 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 345 of file EventProcessor.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 219 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

223  :
224  actReg_(),
225  preg_(),
227  serviceToken_(),
228  input_(),
229  espController_(new eventsetup::EventSetupsController),
230  esp_(),
231  act_table_(),
233  schedule_(),
234  subProcesses_(),
235  historyAppender_(new HistoryAppender),
236  fb_(),
237  looper_(),
239  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
240  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
241  principalCache_(),
242  beginJobCalled_(false),
243  shouldWeStop_(false),
244  fileModeNoMerge_(false),
248  forceLooperToEnd_(false),
249  looperBeginJobRun_(false),
252  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
253  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
254  processDesc->addServices(defaultServices, forcedServices);
255  init(processDesc, iToken, iLegacy);
256  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 258 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

260  :
261  actReg_(),
262  preg_(),
264  serviceToken_(),
265  input_(),
266  espController_(new eventsetup::EventSetupsController),
267  esp_(),
268  act_table_(),
270  schedule_(),
271  subProcesses_(),
272  historyAppender_(new HistoryAppender),
273  fb_(),
274  looper_(),
276  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
277  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
278  principalCache_(),
279  beginJobCalled_(false),
280  shouldWeStop_(false),
281  fileModeNoMerge_(false),
285  forceLooperToEnd_(false),
286  looperBeginJobRun_(false),
290  {
291  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
292  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
293  processDesc->addServices(defaultServices, forcedServices);
295  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc >  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 297 of file EventProcessor.cc.

References init().

299  :
300  actReg_(),
301  preg_(),
303  serviceToken_(),
304  input_(),
305  espController_(new eventsetup::EventSetupsController),
306  esp_(),
307  act_table_(),
309  schedule_(),
310  subProcesses_(),
311  historyAppender_(new HistoryAppender),
312  fb_(),
313  looper_(),
315  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
316  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
317  principalCache_(),
318  beginJobCalled_(false),
319  shouldWeStop_(false),
320  fileModeNoMerge_(false),
324  forceLooperToEnd_(false),
325  looperBeginJobRun_(false),
329  {
330  init(processDesc, token, legacy);
331  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 334 of file EventProcessor.cc.

References looper::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

334  :
335  actReg_(),
336  preg_(),
338  serviceToken_(),
339  input_(),
340  espController_(new eventsetup::EventSetupsController),
341  esp_(),
342  act_table_(),
344  schedule_(),
345  subProcesses_(),
346  historyAppender_(new HistoryAppender),
347  fb_(),
348  looper_(),
350  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
351  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
352  principalCache_(),
353  beginJobCalled_(false),
354  shouldWeStop_(false),
355  fileModeNoMerge_(false),
359  forceLooperToEnd_(false),
360  looperBeginJobRun_(false),
364  {
365  if(isPython) {
366  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
367  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
369  }
370  else {
371  auto processDesc = std::make_shared<ProcessDesc>(config);
373  }
374  }
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
config
Definition: looper.py:289
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 550 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, and schedule_.

550  {
551  // Make the services available while everything is being deleted.
552  ServiceToken token = getToken();
553  ServiceRegistry::Operate op(token);
554 
555  // manually destroy all these thing that may need the services around
556  // propagate_const<T> has no reset() function
557  espController_ = nullptr;
558  esp_ = nullptr;
559  schedule_ = nullptr;
560  input_ = nullptr;
561  looper_ = nullptr;
562  actReg_ = nullptr;
563 
566  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:45
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:13
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If it is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 569 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), edm::for_all(), mps_fire::i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processConfiguration_, processContext_, schedule_, serviceToken_, subProcesses_, and edm::convertException::wrap().

Referenced by runToCompletion().

569  {
570  if(beginJobCalled_) return;
571  beginJobCalled_=true;
572  bk::beginJob();
573 
574  // StateSentry toerror(this); // should we add this ?
575  //make the services available
577 
578  service::SystemBounds bounds(preallocations_.numberOfStreams(),
582  actReg_->preallocateSignal_(bounds);
583  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
585 
586  //NOTE: this may throw
588  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
589 
590  //NOTE: This implementation assumes 'Job' means one call
591  // the EventProcessor::run
592  // If it really means once per 'application' then this code will
593  // have to be changed.
594  // Also have to deal with case where have 'run' then new Module
595  // added and do 'run'
596  // again. In that case the newly added Module needs its 'beginJob'
597  // to be called.
598 
599  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
600  // For now we delay calling beginOfJob until first beginOfRun
601  //if(looper_) {
602  // looper_->beginOfJob(es);
603  //}
604  try {
605  convertException::wrap([&]() {
606  input_->doBeginJob();
607  });
608  }
609  catch(cms::Exception& ex) {
610  ex.addContext("Calling beginJob for the source");
611  throw;
612  }
613  schedule_->beginJob(*preg_);
614  // toerror.succeeded(); // should we add this?
615  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
616  actReg_->postBeginJobSignal_();
617 
618  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
619  schedule_->beginStream(i);
620  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
621  }
622  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:227
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::beginLumiAsync ( edm::IOVSyncValue const &  iSyncValue,
std::shared_ptr< void > const &  iRunResource,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1129 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::beginTime(), edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), h, handleNextEventForStreamAsync(), mps_fire::i, input_, iovQueue_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), lumiQueue_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::SerialTaskQueue::pause(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), readLuminosityBlock(), edm::LuminosityBlockPrincipal::run(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by handleNextEventForStreamAsync(), and processLumis().

1130  {
1131  if(iHolder.taskHasFailed()) { return; }
1132 
1133  auto status= std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource) ;
1134 
1135  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1136  if(iHolder.taskHasFailed()) { return; }
1137 
1138  status->setResumer(std::move(iResumer));
1139 
1140  sourceResourcesAcquirer_.serialQueueChain().push([this,iHolder,status]() mutable {
1141  //make the services available
1143 
1144  try {
1146 
1147  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1148  {
1149  SendSourceTerminationSignalIfException sentry(actReg_.get());
1150 
1151  input_->doBeginLumi(lumiPrincipal, &processContext_);
1152  sentry.completedSuccessfully();
1153  }
1154 
1156  if(rng.isAvailable()) {
1157  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1158  rng->preBeginLumi(lb);
1159  }
1160 
1161  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1162 
1163  //Task to start the stream beginLumis
1164  auto beginStreamsTask= make_waiting_task(tbb::task::allocate_root()
1165  ,[this, holder = iHolder, status, ts] (std::exception_ptr const* iPtr) mutable {
1166  if (iPtr) {
1167  holder.doneWaiting(*iPtr);
1168  } else {
1169 
1170  status->globalBeginDidSucceed();
1171  EventSetup const& es = esp_->eventSetup();
1172  if(looper_) {
1173  try {
1174  //make the services available
1176  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1177  }catch(...) {
1178  holder.doneWaiting(std::current_exception());
1179  return;
1180  }
1181  }
1182  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1183 
1184  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1185  streamQueues_[i].push([this,i,status,holder,ts,&es] () {
1186  streamQueues_[i].pause();
1187 
1188  auto eventTask = edm::make_waiting_task(tbb::task::allocate_root(),
1189  [this,i,h = holder](std::exception_ptr const* iPtr) mutable
1190  {
1191  if(iPtr) {
1192  h.doneWaiting(*iPtr);
1193  } else {
1195  }
1196  });
1197  auto& event = principalCache_.eventPrincipal(i);
1200  auto lp = status->lumiPrincipal();
1201  event.setLuminosityBlockPrincipal(lp.get());
1202  beginStreamTransitionAsync<Traits>(WaitingTaskHolder{eventTask},
1203  *schedule_,i,*lp,ts,es,
1205  });
1206  }
1207  }
1208  });
1209 
1210  //task to start the global begin lumi
1211  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1212  EventSetup const& es = esp_->eventSetup();
1213  {
1214  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1215  beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1216  *schedule_,
1217  *(status->lumiPrincipal()),
1218  ts,
1219  es,
1220  serviceToken_,
1221  subProcesses_);
1222  }
1223  } catch(...) {
1224  iHolder.doneWaiting(std::current_exception());
1225  }
1226  });
1227  };
1228 
1229  //Safe to do check now since can not have multiple beginLumis at same time in this part of the code
1230  // because we do not attempt to read from the source again until we try to get the first event in a lumi
1231  if(espController_->isWithinValidityInterval(iSync)) {
1232  iovQueue_.pause();
1233  lumiQueue_->pushAndPause(std::move(lumiWork));
1234  } else {
1235  //If EventSetup fails, need beginStreamsHolder in order to pass back exception
1236  iovQueue_.push([this,iHolder,lumiWork,iSync]() mutable {
1237  try {
1238  SendSourceTerminationSignalIfException sentry(actReg_.get());
1239  espController_->eventSetupForInstance(iSync);
1240  sentry.completedSuccessfully();
1241  } catch(...) {
1242  iHolder.doneWaiting(std::current_exception());
1243  return;
1244  }
1245  iovQueue_.pause();
1246  lumiQueue_->pushAndPause(std::move(lumiWork));
1247  });
1248  }
1249  }
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
ProcessContext processContext_
SharedResourcesAcquirer sourceResourcesAcquirer_
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void push(T &&iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
void doneWaiting(std::exception_ptr iExcept)
std::vector< edm::SerialTaskQueue > streamQueues_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
SerialTaskQueueChain & serialQueueChain() const
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
bool pause()
Pauses processing of additional tasks from the queue.
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded,
bool &  eventSetupForInstanceSucceeded 
)

Definition at line 943 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, and subProcesses_.

944  {
945  globalBeginSucceeded = false;
946  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
947  {
948  SendSourceTerminationSignalIfException sentry(actReg_.get());
949 
950  input_->doBeginRun(runPrincipal, &processContext_);
951  sentry.completedSuccessfully();
952  }
953 
954  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
955  runPrincipal.beginTime());
957  espController_->forceCacheClear();
958  }
959  {
960  SendSourceTerminationSignalIfException sentry(actReg_.get());
961  espController_->eventSetupForInstance(ts);
962  eventSetupForInstanceSucceeded = true;
963  sentry.completedSuccessfully();
964  }
965  EventSetup const& es = esp_->eventSetup();
966  if(looper_ && looperBeginJobRun_== false) {
967  looper_->copyInfo(ScheduleInfo(schedule_.get()));
968  looper_->beginOfJob(es);
969  looperBeginJobRun_ = true;
970  looper_->doStartingNewLoop();
971  }
972  {
973  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
974  auto globalWaitTask = make_empty_waiting_task();
975  globalWaitTask->increment_ref_count();
976  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
977  *schedule_,
978  runPrincipal,
979  ts,
980  es,
982  subProcesses_);
983  globalWaitTask->wait_for_all();
984  if(globalWaitTask->exceptionPtr() != nullptr) {
985  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
986  }
987  }
988  globalBeginSucceeded = true;
989  FDEBUG(1) << "\tbeginRun " << run << "\n";
990  if(looper_) {
991  looper_->doBeginRun(runPrincipal, es, &processContext_);
992  }
993  {
994  //To wait, the ref count has to be 1+#streams
995  auto streamLoopWaitTask = make_empty_waiting_task();
996  streamLoopWaitTask->increment_ref_count();
997 
998  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
999 
1000  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1001  *schedule_,
1003  runPrincipal,
1004  ts,
1005  es,
1006  serviceToken_,
1007  subProcesses_);
1008 
1009  streamLoopWaitTask->wait_for_all();
1010  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1011  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1012  }
1013  }
1014  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1015  if(looper_) {
1016  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1017  }
1018  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inline private

Definition at line 281 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inline private

Definition at line 282 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 705 of file EventProcessor.cc.

References epSignal, and edm::shutdown_flag.

Referenced by nextTransitionType().

705  {
706  bool returnValue = false;
707 
708  // Look for a shutdown signal
709  if(shutdown_flag.load(std::memory_order_acquire)) {
710  returnValue = true;
712  }
713  return returnValue;
714  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 696 of file EventProcessor.cc.

References schedule_.

696  {
697  schedule_->clearCounters();
698  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 844 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, and input_.

844  {
845  if (fb_.get() != nullptr) {
846  SendSourceTerminationSignalIfException sentry(actReg_.get());
847  input_->closeFile(fb_.get(), cleaningUpAfterException);
848  sentry.completedSuccessfully();
849  }
850  FDEBUG(1) << "\tcloseInputFile\n";
851  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )

Definition at line 861 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

861  {
862  if (fb_.get() != nullptr) {
863  schedule_->closeOutputFiles();
864  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
865  }
866  FDEBUG(1) << "\tcloseOutputFiles\n";
867  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1252 of file EventProcessor.cc.

References h, handleNextEventForStreamAsync(), edm::make_functor_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, mps_update::status, and streamLumiStatus_.

Referenced by processLumis().

1252  {
1253  {
1254  //all streams are sharing the same status at the moment
1255  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1256  status->needToContinueLumi();
1257  status->startProcessingEvents();
1258  }
1259 
1260  unsigned int streamIndex = 0;
1261  for(; streamIndex< preallocations_.numberOfStreams()-1; ++streamIndex) {
1262  tbb::task::enqueue( *edm::make_functor_task(tbb::task::allocate_root(),
1263  [this,streamIndex,h = iHolder](){
1264  handleNextEventForStreamAsync(std::move(h), streamIndex);
1265  }) );
1266 
1267  }
1268  tbb::task::spawn( *edm::make_functor_task(tbb::task::allocate_root(),[this,streamIndex,h=std::move(iHolder)](){
1270  }) );
1271  }
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
PreallocationConfiguration preallocations_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
def move(src, dest)
Definition: eostools.py:511
void edm::EventProcessor::deleteLumiFromCache ( LuminosityBlockProcessingStatus &  iStatus)

Definition at line 1523 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

1523  {
1524  for(auto& s: subProcesses_) { s.deleteLumiFromCache(*iStatus.lumiPrincipal());}
1525  iStatus.lumiPrincipal()->clearPrincipal();
1526  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1527  }
std::vector< SubProcess > subProcesses_
void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1498 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

Referenced by endUnfinishedRun().

1498  {
1499  principalCache_.deleteRun(phid, run);
1500  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1501  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1502  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )

Definition at line 933 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

933  {
934  FDEBUG(1) << "\tdoErrorStuff\n";
935  LogError("StateMachine")
936  << "The EventProcessor state machine encountered an unexpected event\n"
937  << "and went to the error state\n"
938  << "Will attempt to terminate processing normally\n"
939  << "(IF using the looper the next loop will be attempted)\n"
940  << "This likely indicates a bug in an input module or corrupted input or both\n";
941  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 681 of file EventProcessor.cc.

References schedule_.

681  {
682  schedule_->enableEndPaths(active);
683  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 625 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

625  {
626  // Collects exceptions, so we don't throw before all operations are performed.
627  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
628 
629  //make the services available
631 
632  //NOTE: this really should go elsewhere in the future
633  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
634  c.call([this,i](){this->schedule_->endStream(i);});
635  for(auto& subProcess : subProcesses_) {
636  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
637  }
638  }
639  auto actReg = actReg_.get();
640  c.call([actReg](){actReg->preEndJobSignal_();});
641  schedule_->endJob(c);
642  for(auto& subProcess : subProcesses_) {
643  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
644  }
645  c.call(std::bind(&InputSource::doEndJob, input_.get()));
646  if(looper_) {
647  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
648  }
649  c.call([actReg](){actReg->postEndJobSignal_();});
650  if(c.hasThrown()) {
651  c.rethrow();
652  }
653  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:219
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
def operate(timelog, memlog, json_f, num)
bool edm::EventProcessor::endOfLoop ( )

Definition at line 896 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

896  {
897  if(looper_) {
898  ModuleChanger changer(schedule_.get(),preg_.get());
899  looper_->setModuleChanger(&changer);
900  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
901  looper_->setModuleChanger(nullptr);
902  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
903  else return false;
904  }
905  FDEBUG(1) << "\tendOfLoop\n";
906  return true;
907  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 686 of file EventProcessor.cc.

References schedule_.

686  {
687  return schedule_->endPathsEnabled();
688  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 1044 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, input_, looper_, edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, edm::RunPrincipal::setEndTime(), and subProcesses_.

Referenced by endUnfinishedRun().

1044  {
1045  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1046  runPrincipal.setEndTime(input_->timestamp());
1047 
1049  runPrincipal.endTime());
1050  {
1051  SendSourceTerminationSignalIfException sentry(actReg_.get());
1052  espController_->eventSetupForInstance(ts);
1053  sentry.completedSuccessfully();
1054  }
1055  EventSetup const& es = esp_->eventSetup();
1056  if(globalBeginSucceeded){
1057  //To wait, the ref count has to be 1+#streams
1058  auto streamLoopWaitTask = make_empty_waiting_task();
1059  streamLoopWaitTask->increment_ref_count();
1060 
1061  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1062 
1063  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
1064  *schedule_,
1066  runPrincipal,
1067  ts,
1068  es,
1069  serviceToken_,
1070  subProcesses_,
1071  cleaningUpAfterException);
1072 
1073  streamLoopWaitTask->wait_for_all();
1074  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1075  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1076  }
1077  }
1078  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1079  if(looper_) {
1080  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1081  }
1082  {
1083  auto globalWaitTask = make_empty_waiting_task();
1084  globalWaitTask->increment_ref_count();
1085 
1086  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1087  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1088  *schedule_,
1089  runPrincipal,
1090  ts,
1091  es,
1092  serviceToken_,
1093  subProcesses_,
1094  cleaningUpAfterException);
1095  globalWaitTask->wait_for_all();
1096  if(globalWaitTask->exceptionPtr() != nullptr) {
1097  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1098  }
1099  }
1100  FDEBUG(1) << "\tendRun " << run << "\n";
1101  if(looper_) {
1102  looper_->doEndRun(runPrincipal, es, &processContext_);
1103  }
1104  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:85
ServiceToken serviceToken_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void edm::EventProcessor::endUnfinishedLumi ( )

Definition at line 1394 of file EventProcessor.cc.

References mps_fire::i, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, and streamLumiStatus_.

1394  {
1395  if(streamLumiActive_.load() > 0) {
1396  auto globalWaitTask = make_empty_waiting_task();
1397  globalWaitTask->increment_ref_count();
1398  {
1399  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1400  for(unsigned int i=0; i< preallocations_.numberOfStreams(); ++i) {
1401  if(streamLumiStatus_[i]) {
1402  streamEndLumiAsync(globalTaskHolder, i, streamLumiStatus_[i]);
1403  }
1404  }
1405  }
1406  globalWaitTask->wait_for_all();
1407  if(globalWaitTask->exceptionPtr() != nullptr) {
1408  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1409  }
1410  }
1411  }
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
PreallocationConfiguration preallocations_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamLumiActive_
void edm::EventProcessor::endUnfinishedRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException,
bool  eventSetupForInstanceSucceeded 
)

Definition at line 1020 of file EventProcessor.cc.

References deleteRunFromCache(), endRun(), edm::make_empty_waiting_task(), edm::RunPrincipal::mergeableRunProductMetadata(), edm::MergeableRunProductMetadata::postWriteRun(), edm::MergeableRunProductMetadata::preWriteRun(), principalCache_, run(), edm::PrincipalCache::runPrincipal(), lumiQTWidget::t, and writeRunAsync().

1022  {
1023  if (eventSetupForInstanceSucceeded) {
1024  //If we skip empty runs, this would be called conditionally
1025  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1026 
1027  if(globalBeginSucceeded) {
1029  t->increment_ref_count();
1030  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1031  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1032  mergeableRunProductMetadata->preWriteRun();
1033  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run, mergeableRunProductMetadata);
1034  t->wait_for_all();
1035  mergeableRunProductMetadata->postWriteRun();
1036  if(t->exceptionPtr()) {
1037  std::rethrow_exception(*t->exceptionPtr());
1038  }
1039  }
1040  }
1041  deleteRunFromCache(phid, run);
1042  }
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:100
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 661 of file EventProcessor.cc.

References schedule_.

661  {
662  return schedule_->getAllModuleDescriptions();
663  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 656 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

656  {
657  return serviceToken_;
658  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 691 of file EventProcessor.cc.

References schedule_.

691  {
692  schedule_->getTriggerReport(rep);
693  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
rep
Definition: cuy.py:1190
void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus >  iLumiStatus 
)

Definition at line 1273 of file EventProcessor.cc.

References deleteLumiFromCache(), edm::WaitingTaskHolder::doneWaiting(), esp_, iovQueue_, mps_monitormerge::items, looper_, edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), cmsPerfStripChart::operate(), processContext_, edm::SerialTaskQueue::resume(), schedule_, serviceToken_, mps_update::status, subProcesses_, lumiQTWidget::t, tmp, and writeLumiAsync().

Referenced by streamEndLumiAsync().

1273  {
1274  //Need to be sure iTask is always destroyed after iLumiStatus since iLumiStatus can cause endRun to start.
1275  auto t = edm::make_waiting_task(tbb::task::allocate_root(), [ items = std::make_pair(iLumiStatus,std::move(iTask)), this] (std::exception_ptr const* iPtr) mutable {
1276  std::exception_ptr ptr;
1277  //use an easier to remember variable name
1278  auto status = std::move(items.first);
1279  if(iPtr) {
1280  ptr = *iPtr;
1281  WaitingTaskHolder tmp(items.second);
1282  //set the exception early to prevent a beginLumi from running
1283  // we use a copy to keep t from resetting on doneWaiting call.
1284  tmp.doneWaiting(ptr);
1285  } else {
1286  try {
1288  if(looper_) {
1289  auto& lp = *(status->lumiPrincipal());
1290  EventSetup const& es = esp_->eventSetup();
1291  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1292  }
1293  }catch(...) {
1294  if(not ptr) {
1295  ptr = std::current_exception();
1296  }
1297  }
1298  }
1300  try {
1302  //release our hold on the IOV
1303  iovQueue_.resume();
1304  status->resumeGlobalLumiQueue();
1305  } catch(...) {
1306  if( not ptr) {
1307  ptr = std::current_exception();
1308  }
1309  }
1310  try {
1311  status.reset();
1312  } catch(...) {
1313  if( not ptr) {
1314  ptr = std::current_exception();
1315  }
1316  }
1317  //have to wait until reset is called since that could call endRun
1318  items.second.doneWaiting(ptr);
1319  });
1320 
1321  auto writeT = edm::make_waiting_task(tbb::task::allocate_root(), [this,status =iLumiStatus, task = WaitingTaskHolder(t)] (std::exception_ptr const* iExcept) mutable {
1322  if(iExcept) {
1323  task.doneWaiting(*iExcept);
1324  } else {
1325  //Only call writeLumi if beginLumi succeeded
1326  if(status->didGlobalBeginSucceed()) {
1328  }
1329  }
1330  });
1331  auto& lp = *(iLumiStatus->lumiPrincipal());
1332 
1333  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()),
1334  lp.beginTime());
1335 
1336 
1337  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1338  EventSetup const& es = esp_->eventSetup();
1339 
1340  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(writeT),
1341  *schedule_,
1342  lp,
1343  ts,
1344  es,
1345  serviceToken_,
1346  subProcesses_,
1347  iLumiStatus->cleaningUpAfterException());
1348  }
ProcessContext processContext_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void writeLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
edm::SerialTaskQueue iovQueue_
def move(src, dest)
Definition: eostools.py:511
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 1588 of file EventProcessor.cc.

References beginLumiAsync(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::InputSource::IsLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), and streamLumiStatus_.

Referenced by beginLumiAsync(), and continueLumiAsync().

1590  {
1591  sourceResourcesAcquirer_.serialQueueChain().push([this,iTask,iStreamIndex]() mutable {
1593  auto& status = streamLumiStatus_[iStreamIndex];
1594  try {
1595  if(readNextEventForStream(iStreamIndex, *status) ) {
1596  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex](std::exception_ptr const* iPtr) mutable {
1597  if(iPtr) {
1598  bool expected = false;
1599  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1600  deferredExceptionPtr_ = *iPtr;
1601  iTask.doneWaiting(*iPtr);
1602  }
1603  //the stream will stop now
1604  return;
1605  }
1606  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1607  });
1608 
1609  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
1610  } else {
1611  //the stream will stop now
1612  if(status->isLumiEnding()) {
1613  if(lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1614  status->startNextLumi();
1615  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1616  }
1617  streamEndLumiAsync(std::move(iTask),iStreamIndex, status);
1618  } else {
1619  iTask.doneWaiting(std::exception_ptr{});
1620  }
1621  }
1622  } catch(...) {
1623  bool expected = false;
1624  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1625  auto e =std::current_exception();
1627  iTask.doneWaiting(e);
1628  }
1629  }
1630  });
1631  }
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
SharedResourcesAcquirer sourceResourcesAcquirer_
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastTransitionType() const
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::exception_ptr deferredExceptionPtr_
def move(src, dest)
Definition: eostools.py:511
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 377 of file EventProcessor.cc.

References act_table_, actReg_, branchIDListHelper(), branchIDListHelper_, trackingPlots::common, edm::errors::Configuration, esp_, espController_, Exception, FDEBUG, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, eostools::move(), jets_cff::nThreads, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

379  {
380 
381  //std::cerr << processDesc->dump() << std::endl;
382 
383  // register the empty parentage vector, once and for all
385 
386  // register the empty parameter set, once and for all.
387  ParameterSet().registerIt();
388 
389  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
390 
391  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
392  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
393  bool const hasSubProcesses = !subProcessVParameterSet.empty();
394 
395  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
396  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
397  // set in here if the parameters were not explicitly set.
398  validateTopLevelParameterSets(parameterSet.get());
399 
400  // Now set some parameters specific to the main process.
401  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
402  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
403  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
404  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
405  << fileMode << ".\n"
406  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
407  } else {
408  fileModeNoMerge_ = (fileMode == "NOMERGE");
409  }
410  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
411 
412  //threading
413  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
414 
415  // Even if numberOfThreads was set to zero in the Python configuration, the code
416  // in cmsRun.cpp should have reset it to something else.
417  assert(nThreads != 0);
418 
419  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
420  if (nStreams == 0) {
421  nStreams = nThreads;
422  }
423  if (nThreads > 1 or nStreams > 1) {
424  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
425  }
426  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
427  if (nConcurrentRuns != 1) {
428  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
429  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
430  }
431  unsigned int nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
432  if (nConcurrentLumis == 0) {
433  nConcurrentLumis = nConcurrentRuns;
434  }
435 
436  //Check that relationships between threading parameters make sense
437  /*
438  if(nThreads<nStreams) {
439  //bad
440  }
441  if(nConcurrentRuns>nStreams) {
442  //bad
443  }
444  if(nConcurrentRuns>nConcurrentLumis) {
445  //bad
446  }
447  */
448  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
449 
450  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
451 
452  // Now do general initialization
453  ScheduleItems items;
454 
455  //initialize the services
456  auto& serviceSets = processDesc->getServicesPSets();
457  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
458  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
459 
460  //make the services available
462 
463  if(nStreams>1) {
465  handler->willBeUsingThreads();
466  }
467 
468  // initialize miscellaneous items
469  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
470 
471  // initialize the event setup provider
472  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get());
473 
474  // initialize the looper, if any
475  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
476  if(looper_) {
477  looper_->setActionTable(items.act_table_.get());
478  looper_->attachTo(*items.actReg_);
479 
480  //For now loopers make us run only 1 transition at a time
481  nStreams=1;
482  nConcurrentLumis=1;
483  nConcurrentRuns=1;
484  }
485 
486  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
487 
488  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
489  streamQueues_.resize(nStreams);
490  streamLumiStatus_.resize(nStreams);
491 
492  // initialize the input source
493  input_ = makeInput(*parameterSet,
494  *common,
495  items.preg(),
496  items.branchIDListHelper(),
497  items.thinnedAssociationsHelper(),
498  items.actReg_,
499  items.processConfiguration(),
501 
502  // initialize the Schedule
503  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
504 
505  // set the data members
506  act_table_ = std::move(items.act_table_);
507  actReg_ = items.actReg_;
508  preg_ = items.preg();
510  branchIDListHelper_ = items.branchIDListHelper();
511  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
512  processConfiguration_ = items.processConfiguration();
514  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
515 
516  FDEBUG(2) << parameterSet << std::endl;
517 
519  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
520  // Reusable event principal
521  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
524  }
525 
526  for(unsigned int index =0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
527  auto lp = std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_,
528  historyAppender_.get(), index);
530  }
531 
532  // fill the subprocesses, if there are any
533  subProcesses_.reserve(subProcessVParameterSet.size());
534  for(auto& subProcessPSet : subProcessVParameterSet) {
535  subProcesses_.emplace_back(subProcessPSet,
536  *parameterSet,
537  preg(),
540  SubProcessParentageHelper(),
542  *actReg_,
543  token,
546  &processContext_);
547  }
548  }
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
std::vector< edm::SerialTaskQueue > streamQueues_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:686
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 285 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by endJob().

285 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 286 of file EventProcessor.h.

References edm::get_underlying_safe().

286 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 749 of file EventProcessor.cc.

References input_.

Referenced by readNextEventForStream().

749  {
750  return input_->luminosityBlock();
751  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 744 of file EventProcessor.cc.

References input_.

744  {
745  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
746  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 717 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), deferredExceptionPtrIsSet_, epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by readNextEventForStream().

717  {
718  if (deferredExceptionPtrIsSet_.load()) {
720  return InputSource::IsStop;
721  }
722 
723  SendSourceTerminationSignalIfException sentry(actReg_.get());
724  InputSource::ItemType itemType;
725  //For now, do nothing with InputSource::IsSynchronize
726  do {
727  itemType = input_->nextItemType();
728  } while( itemType == InputSource::IsSynchronize);
729 
730  lastSourceTransition_ = itemType;
731  sentry.completedSuccessfully();
732 
734 
735  if(checkForAsyncStopRequest(returnCode)) {
736  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
738  }
739 
740  return lastSourceTransition_;
741  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastSourceTransition_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::openOutputFiles ( )

Definition at line 853 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

853  {
854  if (fb_.get() != nullptr) {
855  schedule_->openOutputFiles(*fb_);
856  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
857  }
858  FDEBUG(1) << "\topenOutputFiles\n";
859  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 279 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), and readRun().

279 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 280 of file EventProcessor.h.

References edm::get_underlying_safe().

280 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 915 of file EventProcessor.cc.

References esp_, FDEBUG, edm::propagate_const< T >::get(), and looper_.

Referenced by runToCompletion().

915  {
916  looper_->prepareForNextLoop(esp_.get());
917  FDEBUG(1) << "\tprepareForNextLoop\n";
918  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 133 of file EventProcessor.h.

References EcalCondTools::getToken(), and cuy::rep.

133 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1647 of file EventProcessor.cc.

References edm::make_functor_task(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

1648  {
1649  tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
1650  processEventAsyncImpl(iHolder, iStreamIndex);
1651  }) );
1652  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1654 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, edm::Service< T >::isAvailable(), looper_, edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, and subProcesses_.

Referenced by processEventAsync().

1655  {
1656  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1657 
1660  if(rng.isAvailable()) {
1661  Event ev(*pep, ModuleDescription(), nullptr);
1662  rng->postEventRead(ev);
1663  }
1664 
1665  WaitingTaskHolder finalizeEventTask( make_waiting_task(
1666  tbb::task::allocate_root(),
1667  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
1668  {
1669 
1670  //NOTE: If we have a looper we only have one Stream
1671  if(looper_) {
1673  processEventWithLooper(*pep);
1674  }
1675 
1676  FDEBUG(1) << "\tprocessEvent\n";
1677  pep->clearEventPrincipal();
1678  if(iPtr) {
1679  iHolder.doneWaiting(*iPtr);
1680  } else {
1681  iHolder.doneWaiting(std::exception_ptr());
1682  }
1683  }
1684  )
1685  );
1686  WaitingTaskHolder afterProcessTask;
1687  if(subProcesses_.empty()) {
1688  afterProcessTask = std::move(finalizeEventTask);
1689  } else {
1690  //Need to run SubProcesses after schedule has finished
1691  // with the event
1692  afterProcessTask = WaitingTaskHolder(
1693  make_waiting_task(tbb::task::allocate_root(),
1694  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
1695  {
1696  if(not iPtr) {
1697  //when run with 1 thread, we want the order to be what
1698  // it was before. This requires reversing the order since
1699  // tasks are run last one in first one out
1700  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
1701  subProcess.doEventAsync(finalizeEventTask,*pep);
1702  }
1703  } else {
1704  finalizeEventTask.doneWaiting(*iPtr);
1705  }
1706  })
1707  );
1708  }
1709 
1710  schedule_->processOneEventAsync(std::move(afterProcessTask),
1711  iStreamIndex,*pep, esp_->eventSetup(), serviceToken_);
1712 
1713  }
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void processEventWithLooper(EventPrincipal &)
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal)
private

Definition at line 1715 of file EventProcessor.cc.

References esp_, input_, edm::InputSource::IsStop, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), lastSourceTransition_, looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

1715  {
1716  bool randomAccess = input_->randomAccess();
1717  ProcessingController::ForwardState forwardState = input_->forwardState();
1718  ProcessingController::ReverseState reverseState = input_->reverseState();
1719  ProcessingController pc(forwardState, reverseState, randomAccess);
1720 
1722  do {
1723 
1724  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1725  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1726 
1727  bool succeeded = true;
1728  if(randomAccess) {
1729  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1730  input_->skipEvents(-2);
1731  }
1732  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1733  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1734  }
1735  }
1736  pc.setLastOperationSucceeded(succeeded);
1737  } while(!pc.lastOperationSucceeded());
1738  if(status != EDLooperBase::kContinue) {
1739  shouldWeStop_ = true;
1741  }
1742  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
InputSource::ItemType lastSourceTransition_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
InputSource::ItemType edm::EventProcessor::processLumis ( std::shared_ptr< void > const &  iRunResource)

Definition at line 1107 of file EventProcessor.cc.

References beginLumiAsync(), continueLumiAsync(), input_, lastTransitionType(), edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, and streamLumiActive_.

1107  {
1108  auto waitTask = make_empty_waiting_task();
1109  waitTask->increment_ref_count();
1110 
1111  if(streamLumiActive_> 0) {
1113  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1114  } else {
1115  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1116  input_->luminosityBlockAuxiliary()->beginTime()),
1117  iRunResource,
1118  WaitingTaskHolder{waitTask.get()});
1119  }
1120  waitTask->wait_for_all();
1121 
1122  if(waitTask->exceptionPtr() != nullptr) {
1123  std::rethrow_exception(* (waitTask->exceptionPtr()) );
1124  }
1125  return lastTransitionType();
1126  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
PreallocationConfiguration preallocations_
InputSource::ItemType lastTransitionType() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::atomic< unsigned int > streamLumiActive_
int edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1465 of file EventProcessor.cc.

References actReg_, input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by readNextEventForStream().

1465  {
1466  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1467  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1468  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) == input_->processHistoryRegistry().reducedProcessHistoryID(input_->luminosityBlockAuxiliary()->processHistoryID()));
1469  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1470  assert(lumiOK);
1471  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1472  {
1473  SendSourceTerminationSignalIfException sentry(actReg_.get());
1474  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1475  sentry.completedSuccessfully();
1476  }
1477  return input_->luminosityBlock();
1478  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< ActivityRegistry > actReg_
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1433 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1433  {
1434  principalCache_.merge(input_->runAuxiliary(), preg());
1435  auto runPrincipal =principalCache_.runPrincipalPtr();
1436  {
1437  SendSourceTerminationSignalIfException sentry(actReg_.get());
1438  input_->readAndMergeRun(*runPrincipal);
1439  sentry.completedSuccessfully();
1440  }
1441  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1442  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1443  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1633 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, and streamLumiStatus_.

Referenced by readNextEventForStream().

1633  {
1634  //TODO this will have to become per stream
1635  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1636  StreamContext streamContext(event.streamID(), &processContext_);
1637 
1638  SendSourceTerminationSignalIfException sentry(actReg_.get());
1639  input_->readEvent(event, streamContext);
1640 
1641  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1642  sentry.completedSuccessfully();
1643 
1644  FDEBUG(1) << "\treadEvent\n";
1645  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )

Definition at line 825 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, edm::PrincipalCache::preReadFile(), principalCache_, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

825  {
826  FDEBUG(1) << " \treadFile\n";
827  size_t size = preg_->size();
828  SendSourceTerminationSignalIfException sentry(actReg_.get());
829 
831 
832  fb_ = input_->readFile();
833  if(size < preg_->size()) {
835  }
839  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
840  }
841  sentry.completedSuccessfully();
842  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:20
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::readLuminosityBlock ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1445 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::getAvailableLumiPrincipalPtr(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), eostools::move(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

Referenced by beginLumiAsync().

1445  {
1448  << "EventProcessor::readLuminosityBlock\n"
1449  << "Illegal attempt to insert lumi into cache\n"
1450  << "Run is invalid\n"
1451  << "Contact a Framework Developer\n";
1452  }
1454  assert(lbp);
1455  lbp->setAux(*input_->luminosityBlockAuxiliary());
1456  {
1457  SendSourceTerminationSignalIfException sentry(actReg_.get());
1458  input_->readLuminosityBlock(*lbp, *historyAppender_);
1459  sentry.completedSuccessfully();
1460  }
1461  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1462  iStatus.lumiPrincipal() = std::move(lbp);
1463  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iLumiStatus 
)
private

Definition at line 1529 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::continuingLumi(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::LuminosityBlockProcessingStatus::endLumi(), edm::LuminosityBlockProcessingStatus::haveContinuedLumi(), input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextLuminosityBlockID(), nextTransitionType(), cmsPerfStripChart::operate(), or, readAndMergeLumi(), readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setNextSyncValue(), shouldWeStop(), sourceMutex_, edm::LuminosityBlockProcessingStatus::stopProcessingEvents(), and edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped().

Referenced by handleNextEventForStreamAsync().

1530  {
1531  if(shouldWeStop()) {
1532  return false;
1533  }
1534 
1535  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1536  return false;
1537  }
1538 
1539  if(iStatus.wasEventProcessingStopped()) {
1540  return false;
1541  }
1542 
1544  try {
1545  //need to use lock in addition to the serial task queue because
1546  // of delayed provenance reading and reading data in response to
1547  // edm::Refs etc
1548  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1549 
1550  auto itemType = iStatus.continuingLumi()? InputSource::IsLumi : nextTransitionType();
1551  if(InputSource::IsLumi == itemType) {
1552  iStatus.haveContinuedLumi();
1553  while(itemType == InputSource::IsLumi and
1554  iStatus.lumiPrincipal()->run() == input_->run() and
1555  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1556  readAndMergeLumi(iStatus);
1557  itemType = nextTransitionType();
1558  }
1559  if(InputSource::IsLumi == itemType) {
1560  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1561  input_->luminosityBlockAuxiliary()->beginTime()));
1562  }
1563  }
1564  if(InputSource::IsEvent != itemType) {
1565  iStatus.stopProcessingEvents();
1566 
1567  //IsFile may continue processing the lumi and
1568  // looper_ can cause the input source to declare a new IsRun which is actually
1569  // just a continuation of the previous run
1570  if(InputSource::IsStop == itemType or
1571  InputSource::IsLumi == itemType or
1572  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1573  iStatus.endLumi();
1574  }
1575  return false;
1576  }
1577  readEvent(iStreamIndex);
1578  } catch (...) {
1579  bool expected =false;
1580  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1581  deferredExceptionPtr_ = std::current_exception();
1582  }
1583  return false;
1584  }
1585  return true;
1586  }
void readEvent(unsigned int iStreamIndex)
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< std::recursive_mutex > sourceMutex_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
std::exception_ptr deferredExceptionPtr_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
bool shouldWeStop() const
def operate(timelog, memlog, json_f, num)
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1413 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, mergeableRunProductProcesses_, preg(), principalCache_, and processConfiguration_.

1413  {
1416  << "EventProcessor::readRun\n"
1417  << "Illegal attempt to insert run into cache\n"
1418  << "Contact a Framework Developer\n";
1419  }
1420  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(),
1422  0, true, &mergeableRunProductProcesses_);
1423  {
1424  SendSourceTerminationSignalIfException sentry(actReg_.get());
1425  input_->readRun(*rp, *historyAppender_);
1426  sentry.completedSuccessfully();
1427  }
1428  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1429  principalCache_.insert(rp);
1430  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1431  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
MergeableRunProductProcesses mergeableRunProductProcesses_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 878 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

878  {
879  if (fb_.get() != nullptr) {
880  schedule_->respondToCloseInputFile(*fb_);
881  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
882  }
883  FDEBUG(1) << "\trespondToCloseInputFile\n";
884  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 869 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

869  {
870  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
871  if (fb_.get() != nullptr) {
872  schedule_->respondToOpenInputFile(*fb_);
873  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
874  }
875  FDEBUG(1) << "\trespondToOpenInputFile\n";
876  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )

Definition at line 909 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

909  {
910  input_->repeat();
911  input_->rewind();
912  FDEBUG(1) << "\trewind\n";
913  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:20
EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 355 of file EventProcessor.h.

Referenced by Types.EventID::cppID(), Types.LuminosityBlockID::cppID(), and endUnfinishedRun().

355  {
356  return runToCompletion();
357  }
StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 754 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, edm::InputSource::IsStop, cmsPerfStripChart::operate(), prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), and edm::convertException::wrap().

Referenced by PythonEventProcessor::run().

754  {
755 
758  {
759  beginJob(); //make sure this was called
760 
761  // make the services available
763 
765  try {
766  FilesProcessor fp(fileModeNoMerge_);
767 
768  convertException::wrap([&]() {
769  bool firstTime = true;
770  do {
771  if(not firstTime) {
773  rewindInput();
774  } else {
775  firstTime = false;
776  }
777  startingNewLoop();
778 
779  auto trans = fp.processFiles(*this);
780 
781  fp.normalEnd();
782 
783  if(deferredExceptionPtrIsSet_.load()) {
784  std::rethrow_exception(deferredExceptionPtr_);
785  }
786  if(trans != InputSource::IsStop) {
787  //problem with the source
788  doErrorStuff();
789 
790  throw cms::Exception("BadTransition")
791  << "Unexpected transition change "
792  << trans;
793 
794  }
795  } while(not endOfLoop());
796  }); // convertException::wrap
797 
798  } // Try block
799  catch (cms::Exception & e) {
800  if (!exceptionMessageLumis_.empty()) {
802  if (e.alreadyPrinted()) {
803  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
804  }
805  }
806  if (!exceptionMessageRuns_.empty()) {
808  if (e.alreadyPrinted()) {
809  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
810  }
811  }
812  if (!exceptionMessageFiles_.empty()) {
814  if (e.alreadyPrinted()) {
815  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
816  }
817  }
818  throw;
819  }
820  }
821 
822  return returnCode;
823  }
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
def operate(timelog, memlog, json_f, num)
bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 1770 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

1770  {
1771  bool expected =false;
1772  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1773  deferredExceptionPtr_ = iException;
1774  return true;
1775  }
1776  return false;
1777  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 1758 of file EventProcessor.cc.

References exceptionMessageFiles_.

1758  {
1759  exceptionMessageFiles_ = message;
1760  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)

Definition at line 1766 of file EventProcessor.cc.

References exceptionMessageLumis_.

1766  {
1767  exceptionMessageLumis_ = message;
1768  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 1762 of file EventProcessor.cc.

References exceptionMessageRuns_.

1762  {
1763  exceptionMessageRuns_ = message;
1764  }
std::string exceptionMessageRuns_
bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 920 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

920  {
921  FDEBUG(1) << "\tshouldWeCloseOutput\n";
922  if(!subProcesses_.empty()) {
923  for(auto const& subProcess : subProcesses_) {
924  if(subProcess.shouldWeCloseOutput()) {
925  return true;
926  }
927  }
928  return false;
929  }
930  return schedule_->shouldWeCloseOutput();
931  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 1744 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

1744  {
1745  FDEBUG(1) << "\tshouldWeStop\n";
1746  if(shouldWeStop_) return true;
1747  if(!subProcesses_.empty()) {
1748  for(auto const& subProcess : subProcesses_) {
1749  if(subProcess.terminate()) {
1750  return true;
1751  }
1752  }
1753  return false;
1754  }
1755  return schedule_->terminate();
1756  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::startingNewLoop ( )

Definition at line 886 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

886  {
887  shouldWeStop_ = false;
888  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
889  // until after we've called beginOfJob
890  if(looper_ && looperBeginJobRun_) {
891  looper_->doStartingNewLoop();
892  }
893  FDEBUG(1) << "\tstartingNewLoop\n";
894  }
#define FDEBUG(lev)
Definition: DebugMacros.h:20
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex,
std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus 
)

Definition at line 1350 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), esp_, globalEndLumiAsync(), edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and lumiQTWidget::t.

Referenced by endUnfinishedLumi(), and handleNextEventForStreamAsync().

1352  {
1353 
1354  auto t =edm::make_waiting_task(tbb::task::allocate_root(), [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1355  std::exception_ptr ptr;
1356  if(iPtr) {
1357  ptr = *iPtr;
1358  }
1359  auto status =streamLumiStatus_[iStreamIndex];
1360  //reset status before releasing queue else get race condition
1361  streamLumiStatus_[iStreamIndex].reset();
1363  streamQueues_[iStreamIndex].resume();
1364 
1365  //are we the last one?
1366  if( status->streamFinishedLumi()) {
1368  }
1369  iTask.doneWaiting(ptr);
1370  });
1371 
1372  edm::WaitingTaskHolder lumiDoneTask{t};
1373 
1374  iLumiStatus->setEndTime();
1375 
1376  if(iLumiStatus->didGlobalBeginSucceed()) {
1377  auto & lumiPrincipal = *iLumiStatus->lumiPrincipal();
1378  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1379  lumiPrincipal.endTime());
1380  EventSetup const& es = esp_->eventSetup();
1381 
1382  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1383 
1384  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1385  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1386  *schedule_,iStreamIndex,
1387  lumiPrincipal,ts,es,
1388  serviceToken_,
1389  subProcesses_,cleaningUpAfterException);
1390  }
1391  }
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void doneWaiting(std::exception_ptr iExcept)
std::vector< edm::SerialTaskQueue > streamQueues_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inline private

Definition at line 283 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inline private

Definition at line 284 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 666 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

666  {
667  return schedule_->totalEvents();
668  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 676 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

676  {
677  return schedule_->totalEventsFailed();
678  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 671 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

671  {
672  return schedule_->totalEventsPassed();
673  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
std::shared_ptr< LuminosityBlockProcessingStatus > iStatus 
)

Definition at line 1504 of file EventProcessor.cc.

References actReg_, edm::WaitingTaskHolder::doneWaiting(), edm::make_waiting_task(), processContext_, alignCSCRings::s, schedule_, serviceToken_, and subProcesses_.

Referenced by globalEndLumiAsync().

1504  {
1505  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,task, iStatus](std::exception_ptr const* iExcept) mutable {
1506  if(iExcept) {
1507  task.doneWaiting(*iExcept);
1508  } else {
1510  for(auto&s : subProcesses_) {
1511  s.writeLumiAsync(task,*(iStatus->lumiPrincipal()));
1512  }
1513  }
1514  });
1516 
1517  std::shared_ptr<LuminosityBlockPrincipal> const& lumiPrincipal = iStatus->lumiPrincipal();
1518  lumiPrincipal->runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal->luminosityBlock());
1519 
1520  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, *lumiPrincipal, &processContext_, actReg_.get());
1521  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
ProcessHistoryID const &  phid,
RunNumber_t  run,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 1480 of file EventProcessor.cc.

References actReg_, edm::WaitingTaskHolder::doneWaiting(), edm::make_waiting_task(), principalCache_, processContext_, edm::PrincipalCache::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, and subProcesses_.

Referenced by endUnfinishedRun().

1481  {
1482  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,phid,run,task,mergeableRunProductMetadata]
1483  (std::exception_ptr const* iExcept) mutable {
1484  if(iExcept) {
1485  task.doneWaiting(*iExcept);
1486  } else {
1488  for(auto&s : subProcesses_) {
1489  s.writeRunAsync(task,phid,run,mergeableRunProductMetadata);
1490  }
1491  }
1492  });
1494  schedule_->writeRunAsync(WaitingTaskHolder(subsT), principalCache_.runPrincipal(phid, run),
1495  &processContext_, actReg_.get(), mergeableRunProductMetadata);
1496  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 304 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 340 of file EventProcessor.h.

Referenced by runToCompletion().

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 341 of file EventProcessor.h.

Referenced by runToCompletion().

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 328 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 296 of file EventProcessor.h.

Referenced by init(), and respondToOpenInputFile().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private
std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private
edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 301 of file EventProcessor.h.

Referenced by beginLumiAsync(), beginRun(), endRun(), init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 346 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 331 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 333 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 332 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private
bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 330 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

bool edm::EventProcessor::firstEventInBlock_ =true
private

Definition at line 342 of file EventProcessor.h.

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 336 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 334 of file EventProcessor.h.

Referenced by endOfLoop().

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 316 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private
edm::SerialTaskQueue edm::EventProcessor::iovQueue_
private

Definition at line 303 of file EventProcessor.h.

Referenced by beginLumiAsync(), and globalEndLumiAsync().

InputSource::ItemType edm::EventProcessor::lastSourceTransition_
private

Definition at line 300 of file EventProcessor.h.

Referenced by nextTransitionType(), and processEventWithLooper().

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 335 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 311 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 308 of file EventProcessor.h.

Referenced by init(), and readRun().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 307 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private
edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 295 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), and readFile().

PrincipalCache edm::EventProcessor::principalCache_
private
bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 348 of file EventProcessor.h.

Referenced by beginJob(), and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 305 of file EventProcessor.h.

Referenced by beginJob(), init(), and readRun().

ProcessContext edm::EventProcessor::processContext_
private
edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private
bool edm::EventProcessor::shouldWeStop_
private

Definition at line 329 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 326 of file EventProcessor.h.

Referenced by readNextEventForStream().

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 325 of file EventProcessor.h.

Referenced by beginLumiAsync(), and handleNextEventForStreamAsync().

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private
std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private
std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

Definition at line 310 of file EventProcessor.h.

Referenced by beginLumiAsync(), init(), and streamEndLumiAsync().

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 297 of file EventProcessor.h.

Referenced by init().