CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (edm::WaitingTaskHolder iHolder)
 
void deleteLumiFromCache (LuminosityBlockProcessingStatus &)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
bool endOfLoop ()
 
bool endPathsEnabled () const
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
void endUnfinishedLumi ()
 
void endUnfinishedRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (EventProcessor const &)=delete
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
void globalEndLumiAsync (edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
void handleEndLumiExceptions (std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
 
InputSource::ItemType lastTransitionType () const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processLumis (std::shared_ptr< void > const &iRunResource)
 
int readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun ()
 
void readFile ()
 
void readLuminosityBlock (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamEndLumiAsync (edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
 
void writeRunAsync (WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedData > ExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
void warnAboutModulesRequiringLuminosityBLockSynchronization () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
edm::SerialTaskQueue iovQueue_
 
InputSource::ItemType lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueue > streamQueues_
 
std::vector< SubProcess > subProcesses_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 64 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 360 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 361 of file EventProcessor.h.

Member Enumeration Documentation

enum edm::EventProcessor::StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 74 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet >  parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 213 of file EventProcessor.cc.

References init(), and eostools::move().

218  : actReg_(),
219  preg_(),
221  serviceToken_(),
222  input_(),
223  espController_(new eventsetup::EventSetupsController),
224  esp_(),
225  act_table_(),
227  schedule_(),
228  subProcesses_(),
229  historyAppender_(new HistoryAppender),
230  fb_(),
231  looper_(),
233  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
234  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
235  principalCache_(),
236  beginJobCalled_(false),
237  shouldWeStop_(false),
238  fileModeNoMerge_(false),
241  exceptionMessageLumis_(false),
242  forceLooperToEnd_(false),
243  looperBeginJobRun_(false),
246  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
247  processDesc->addServices(defaultServices, forcedServices);
248  init(processDesc, iToken, iLegacy);
249  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet >  parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 251 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and eostools::move().

254  : actReg_(),
255  preg_(),
257  serviceToken_(),
258  input_(),
259  espController_(new eventsetup::EventSetupsController),
260  esp_(),
261  act_table_(),
263  schedule_(),
264  subProcesses_(),
265  historyAppender_(new HistoryAppender),
266  fb_(),
267  looper_(),
269  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
270  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
271  principalCache_(),
272  beginJobCalled_(false),
273  shouldWeStop_(false),
274  fileModeNoMerge_(false),
277  exceptionMessageLumis_(false),
278  forceLooperToEnd_(false),
279  looperBeginJobRun_(false),
283  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
284  processDesc->addServices(defaultServices, forcedServices);
286  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc >  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 288 of file EventProcessor.cc.

References init().

291  : actReg_(),
292  preg_(),
294  serviceToken_(),
295  input_(),
296  espController_(new eventsetup::EventSetupsController),
297  esp_(),
298  act_table_(),
300  schedule_(),
301  subProcesses_(),
302  historyAppender_(new HistoryAppender),
303  fb_(),
304  looper_(),
306  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
307  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
308  principalCache_(),
309  beginJobCalled_(false),
310  shouldWeStop_(false),
311  fileModeNoMerge_(false),
314  exceptionMessageLumis_(false),
315  forceLooperToEnd_(false),
316  looperBeginJobRun_(false),
320  init(processDesc, token, legacy);
321  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 500 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, and schedule_.

500  {
501  // Make the services available while everything is being deleted.
502  ServiceToken token = getToken();
503  ServiceRegistry::Operate op(token);
504 
505  // manually destroy all these thing that may need the services around
506  // propagate_const<T> has no reset() function
507  espController_ = nullptr;
508  esp_ = nullptr;
509  schedule_ = nullptr;
510  input_ = nullptr;
511  looper_ = nullptr;
512  actReg_ = nullptr;
513 
516  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 518 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), esp_, espController_, edm::for_all(), mps_fire::i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processConfiguration_, processContext_, schedule_, serviceToken_, subProcesses_, warnAboutModulesRequiringLuminosityBLockSynchronization(), and edm::convertException::wrap().

Referenced by runToCompletion().

518  {
519  if (beginJobCalled_)
520  return;
521  beginJobCalled_ = true;
522  bk::beginJob();
523 
524  // StateSentry toerror(this); // should we add this ?
525  //make the services available
527 
528  service::SystemBounds bounds(preallocations_.numberOfStreams(),
532  actReg_->preallocateSignal_(bounds);
533  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
535 
536  //NOTE: this may throw
538  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
539 
542  }
543  //NOTE: This implementation assumes 'Job' means one call
544  // the EventProcessor::run
545  // If it really means once per 'application' then this code will
546  // have to be changed.
547  // Also have to deal with case where have 'run' then new Module
548  // added and do 'run'
549  // again. In that case the newly added Module needs its 'beginJob'
550  // to be called.
551 
552  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
553  // For now we delay calling beginOfJob until first beginOfRun
554  //if(looper_) {
555  // looper_->beginOfJob(es);
556  //}
557  try {
558  convertException::wrap([&]() { input_->doBeginJob(); });
559  } catch (cms::Exception& ex) {
560  ex.addContext("Calling beginJob for the source");
561  throw;
562  }
563  espController_->finishConfiguration();
564  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices());
565  // toerror.succeeded(); // should we add this?
566  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
567  actReg_->postBeginJobSignal_();
568 
569  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
570  schedule_->beginStream(i);
571  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
572  }
573  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:14
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::beginLumiAsync ( edm::IOVSyncValue const &  iSyncValue,
std::shared_ptr< void > const &  iRunResource,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1047 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::beginTime(), edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), h, handleNextEventForStreamAsync(), mps_fire::i, input_, iovQueue_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), lumiQueue_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::SerialTaskQueue::pause(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), readLuminosityBlock(), edm::LuminosityBlockPrincipal::run(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), and tmp.

Referenced by handleNextEventForStreamAsync(), and processLumis().

1049  {
1050  if (iHolder.taskHasFailed()) {
1051  return;
1052  }
1053 
1054  // We must be careful with the status object here and in code this function calls. IF we want
1055  // endRun to be called, then the status object must be destroyed before the things waiting on
1056  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1057  // endRun to be called much later than it should be, because it is holding iRunResource).
1058  auto status =
1059  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1060 
1061  auto lumiWork = [this, iHolder, status = std::move(status)](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1062  if (iHolder.taskHasFailed()) {
1063  status.reset();
1064  return;
1065  }
1066 
1067  status->setResumer(std::move(iResumer));
1068 
1069  sourceResourcesAcquirer_.serialQueueChain().push([this, iHolder, status = std::move(status)]() mutable {
1070  //make the services available
1072 
1073  try {
1074  readLuminosityBlock(*status);
1075 
1076  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1077  {
1078  SendSourceTerminationSignalIfException sentry(actReg_.get());
1079 
1080  input_->doBeginLumi(lumiPrincipal, &processContext_);
1081  sentry.completedSuccessfully();
1082  }
1083 
1085  if (rng.isAvailable()) {
1086  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1087  rng->preBeginLumi(lb);
1088  }
1089 
1090  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1091 
1092  //Task to start the stream beginLumis
1093  auto beginStreamsTask = make_waiting_task(
1094  tbb::task::allocate_root(),
1095  [this, holder = iHolder, status = std::move(status), ts](std::exception_ptr const* iPtr) mutable {
1096  if (iPtr) {
1097  status.reset();
1098  holder.doneWaiting(*iPtr);
1099  } else {
1100  status->globalBeginDidSucceed();
1101  auto const& es = esp_->eventSetup();
1102  if (looper_) {
1103  try {
1104  //make the services available
1106  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1107  } catch (...) {
1108  status.reset();
1109  holder.doneWaiting(std::current_exception());
1110  return;
1111  }
1112  }
1113  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1114 
1115  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1116  streamQueues_[i].push([this, i, status, holder, ts, &es]() mutable {
1117  streamQueues_[i].pause();
1118 
1119  auto eventTask = edm::make_waiting_task(
1120  tbb::task::allocate_root(), [this, i, h = holder](std::exception_ptr const* iPtr) mutable {
1121  if (iPtr) {
1123  tmp.doneWaiting(*iPtr);
1125  } else {
1127  }
1128  });
1129  auto& event = principalCache_.eventPrincipal(i);
1132  auto lp = status->lumiPrincipal();
1133  event.setLuminosityBlockPrincipal(lp.get());
1134  beginStreamTransitionAsync<Traits>(
1135  WaitingTaskHolder{eventTask}, *schedule_, i, *lp, ts, es, serviceToken_, subProcesses_);
1136  status.reset();
1137  });
1138  }
1139  status.reset();
1140  }
1141  }); // beginStreamTask
1142 
1143  //task to start the global begin lumi
1144  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1145  auto const& es = esp_->eventSetup();
1146  {
1147  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1148  beginGlobalTransitionAsync<Traits>(
1149  beginStreamsHolder, *schedule_, lumiPrincipal, ts, es, serviceToken_, subProcesses_);
1150  }
1151  } catch (...) {
1152  status.reset();
1153  iHolder.doneWaiting(std::current_exception());
1154  }
1155  }); // task in sourceResourcesAcquirer
1156  }; // end lumiWork
1157 
1158  //Safe to do check now since can not have multiple beginLumis at same time in this part of the code
1159  // because we do not attempt to read from the source again until we try to get the first event in a lumi
1160  if (espController_->isWithinValidityInterval(iSync)) {
1161  iovQueue_.pause();
1162  lumiQueue_->pushAndPause(std::move(lumiWork));
1163  } else {
1164  //If EventSetup fails, need beginStreamsHolder in order to pass back exception
1165  iovQueue_.push([this, iHolder, lumiWork, iSync]() mutable {
1166  try {
1167  SendSourceTerminationSignalIfException sentry(actReg_.get());
1168  espController_->eventSetupForInstance(iSync);
1169  sentry.completedSuccessfully();
1170  } catch (...) {
1171  iHolder.doneWaiting(std::current_exception());
1172  return;
1173  }
1174  iovQueue_.pause();
1175  lumiQueue_->pushAndPause(std::move(lumiWork));
1176  });
1177  }
1178  }
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
ProcessContext processContext_
SharedResourcesAcquirer sourceResourcesAcquirer_
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
void doneWaiting(std::exception_ptr iExcept)
std::vector< edm::SerialTaskQueue > streamQueues_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
SerialTaskQueueChain & serialQueueChain() const
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
bool pause()
Pauses processing of additional tasks from the queue.
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded,
bool &  eventSetupForInstanceSucceeded 
)

Definition at line 861 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, and subProcesses_.

864  {
865  globalBeginSucceeded = false;
866  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
867  {
868  SendSourceTerminationSignalIfException sentry(actReg_.get());
869 
870  input_->doBeginRun(runPrincipal, &processContext_);
871  sentry.completedSuccessfully();
872  }
873 
874  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
875  if (forceESCacheClearOnNewRun_) {
876  espController_->forceCacheClear();
877  }
878  {
879  SendSourceTerminationSignalIfException sentry(actReg_.get());
880  espController_->eventSetupForInstance(ts);
881  eventSetupForInstanceSucceeded = true;
882  sentry.completedSuccessfully();
883  }
884  auto const& es = esp_->eventSetup();
885  if (looper_ && looperBeginJobRun_ == false) {
886  looper_->copyInfo(ScheduleInfo(schedule_.get()));
887  looper_->beginOfJob(es);
888  looperBeginJobRun_ = true;
889  looper_->doStartingNewLoop();
890  }
891  {
892  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
893  auto globalWaitTask = make_empty_waiting_task();
894  globalWaitTask->increment_ref_count();
895  beginGlobalTransitionAsync<Traits>(
896  WaitingTaskHolder(globalWaitTask.get()), *schedule_, runPrincipal, ts, es, serviceToken_, subProcesses_);
897  globalWaitTask->wait_for_all();
898  if (globalWaitTask->exceptionPtr() != nullptr) {
899  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
900  }
901  }
902  globalBeginSucceeded = true;
903  FDEBUG(1) << "\tbeginRun " << run << "\n";
904  if (looper_) {
905  looper_->doBeginRun(runPrincipal, es, &processContext_);
906  }
907  {
908  //To wait, the ref count has to be 1+#streams
909  auto streamLoopWaitTask = make_empty_waiting_task();
910  streamLoopWaitTask->increment_ref_count();
911 
912  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
913 
914  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
915  *schedule_,
916  preallocations_.numberOfStreams(),
917  runPrincipal,
918  ts,
919  es,
920  serviceToken_,
921  subProcesses_);
922 
923  streamLoopWaitTask->wait_for_all();
924  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
925  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
926  }
927  }
928  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
929  if (looper_) {
930  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
931  }
932  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 289 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

289  {
291  }
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 292 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 630 of file EventProcessor.cc.

References epSignal, and edm::shutdown_flag.

Referenced by nextTransitionType().

630  {
631  bool returnValue = false;
632 
633  // Look for a shutdown signal
634  if (shutdown_flag.load(std::memory_order_acquire)) {
635  returnValue = true;
636  returnCode = epSignal;
637  }
638  return returnValue;
639  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 624 of file EventProcessor.cc.

References schedule_.

624 { schedule_->clearCounters(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 760 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, and input_.

760  {
761  if (fb_.get() != nullptr) {
762  SendSourceTerminationSignalIfException sentry(actReg_.get());
763  input_->closeFile(fb_.get(), cleaningUpAfterException);
764  sentry.completedSuccessfully();
765  }
766  FDEBUG(1) << "\tcloseInputFile\n";
767  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::closeOutputFiles ( )

Definition at line 777 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

777  {
778  if (fb_.get() != nullptr) {
779  schedule_->closeOutputFiles();
780  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
781  }
782  FDEBUG(1) << "\tcloseOutputFiles\n";
783  }
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1180 of file EventProcessor.cc.

References h, handleNextEventForStreamAsync(), edm::make_functor_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, mps_update::status, and streamLumiStatus_.

Referenced by processLumis().

1180  {
1181  {
1182  //all streams are sharing the same status at the moment
1183  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1184  status->needToContinueLumi();
1185  status->startProcessingEvents();
1186  }
1187 
1188  unsigned int streamIndex = 0;
1189  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1190  tbb::task::enqueue(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = iHolder]() {
1191  handleNextEventForStreamAsync(std::move(h), streamIndex);
1192  }));
1193  }
1194  tbb::task::spawn(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = std::move(iHolder)]() {
1195  handleNextEventForStreamAsync(std::move(h), streamIndex);
1196  }));
1197  }
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
PreallocationConfiguration preallocations_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
def move(src, dest)
Definition: eostools.py:511
void edm::EventProcessor::deleteLumiFromCache ( LuminosityBlockProcessingStatus &  iStatus)

Definition at line 1491 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

1491  {
1492  for (auto& s : subProcesses_) {
1493  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1494  }
1495  iStatus.lumiPrincipal()->clearPrincipal();
1496  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1497  }
std::vector< SubProcess > subProcesses_
void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1466 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

Referenced by endUnfinishedRun().

1466  {
1467  principalCache_.deleteRun(phid, run);
1468  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1469  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1470  }
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )

Definition at line 852 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

852  {
853  FDEBUG(1) << "\tdoErrorStuff\n";
854  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
855  << "and went to the error state\n"
856  << "Will attempt to terminate processing normally\n"
857  << "(IF using the looper the next loop will be attempted)\n"
858  << "This likely indicates a bug in an input module or corrupted input or both\n";
859  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 618 of file EventProcessor.cc.

References schedule_.

618 { schedule_->enableEndPaths(active); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 575 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

575  {
576  // Collects exceptions, so we don't throw before all operations are performed.
577  ExceptionCollector c(
578  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
579 
580  //make the services available
582 
583  //NOTE: this really should go elsewhere in the future
584  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
585  c.call([this, i]() { this->schedule_->endStream(i); });
586  for (auto& subProcess : subProcesses_) {
587  c.call([&subProcess, i]() { subProcess.doEndStream(i); });
588  }
589  }
590  auto actReg = actReg_.get();
591  c.call([actReg]() { actReg->preEndJobSignal_(); });
592  schedule_->endJob(c);
593  for (auto& subProcess : subProcesses_) {
594  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
595  }
596  c.call(std::bind(&InputSource::doEndJob, input_.get()));
597  if (looper_) {
598  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
599  }
600  c.call([actReg]() { actReg->postEndJobSignal_(); });
601  if (c.hasThrown()) {
602  c.rethrow();
603  }
604  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:207
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
def operate(timelog, memlog, json_f, num)
bool edm::EventProcessor::endOfLoop ( )

Definition at line 813 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

813  {
814  if (looper_) {
815  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
816  looper_->setModuleChanger(&changer);
817  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
818  looper_->setModuleChanger(nullptr);
819  if (status != EDLooperBase::kContinue || forceLooperToEnd_)
820  return true;
821  else
822  return false;
823  }
824  FDEBUG(1) << "\tendOfLoop\n";
825  return true;
826  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 620 of file EventProcessor.cc.

References schedule_.

620 { return schedule_->endPathsEnabled(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 960 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, input_, looper_, edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, edm::RunPrincipal::setEndTime(), and subProcesses_.

Referenced by endUnfinishedRun().

963  {
964  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
965  runPrincipal.setEndTime(input_->timestamp());
966 
967  IOVSyncValue ts(
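968  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
969  runPrincipal.endTime());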
970  {
971  SendSourceTerminationSignalIfException sentry(actReg_.get());
972  espController_->eventSetupForInstance(ts);
973  sentry.completedSuccessfully();
974  }
975  auto const& es = esp_->eventSetup();
976  if (globalBeginSucceeded) {
977  //To wait, the ref count has to be 1+#streams
978  auto streamLoopWaitTask = make_empty_waiting_task();
979  streamLoopWaitTask->increment_ref_count();
980 
981  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
982 
983  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
984  *schedule_,
985  preallocations_.numberOfStreams(),
986  runPrincipal,
987  ts,
988  es,
989  serviceToken_,
990  subProcesses_,
991  cleaningUpAfterException);
992 
993  streamLoopWaitTask->wait_for_all();
994  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
995  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
996  }
997  }
998  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
999  if (looper_) {
1000  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1001  }
1002  {
1003  auto globalWaitTask = make_empty_waiting_task();
1004  globalWaitTask->increment_ref_count();
1005 
1006  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1007  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1008  *schedule_,
1009  runPrincipal,
1010  ts,
1011  es,
1012  serviceToken_,
1013  subProcesses_,
1014  cleaningUpAfterException);
1015  globalWaitTask->wait_for_all();
1016  if (globalWaitTask->exceptionPtr() != nullptr) {
1017  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1018  }
1019  }
1020  FDEBUG(1) << "\tendRun " << run << "\n";
1021  if (looper_) {
1022  looper_->doEndRun(runPrincipal, es, &processContext_);
1023  }
1024  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:70
ServiceToken serviceToken_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void edm::EventProcessor::endUnfinishedLumi ( )

Definition at line 1352 of file EventProcessor.cc.

References mps_fire::i, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, and streamLumiStatus_.

1352  {
1353  if (streamLumiActive_.load() > 0) {
1354  auto globalWaitTask = make_empty_waiting_task();
1355  globalWaitTask->increment_ref_count();
1356  {
1357  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1358  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1359  if (streamLumiStatus_[i]) {
1360  streamEndLumiAsync(globalTaskHolder, i, streamLumiStatus_[i]);
1361  }
1362  }
1363  }
1364  globalWaitTask->wait_for_all();
1365  if (globalWaitTask->exceptionPtr() != nullptr) {
1366  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1367  }
1368  }
1369  }
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
PreallocationConfiguration preallocations_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamLumiActive_
void edm::EventProcessor::endUnfinishedRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException,
bool  eventSetupForInstanceSucceeded 
)

Definition at line 934 of file EventProcessor.cc.

References deleteRunFromCache(), endRun(), edm::make_empty_waiting_task(), edm::RunPrincipal::mergeableRunProductMetadata(), edm::MergeableRunProductMetadata::postWriteRun(), edm::MergeableRunProductMetadata::preWriteRun(), principalCache_, run(), edm::PrincipalCache::runPrincipal(), lumiQTWidget::t, and writeRunAsync().

938  {
939  if (eventSetupForInstanceSucceeded) {
940  //If we skip empty runs, this would be called conditionally
941  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
942 
943  if (globalBeginSucceeded) {
944  auto t = edm::make_empty_waiting_task();
945  t->increment_ref_count();
946  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
947  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
948  mergeableRunProductMetadata->preWriteRun();
949  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run, mergeableRunProductMetadata);
950  t->wait_for_all();
951  mergeableRunProductMetadata->postWriteRun();
952  if (t->exceptionPtr()) {
953  std::rethrow_exception(*t->exceptionPtr());
954  }
955  }
956  }
957  deleteRunFromCache(phid, run);
958  }
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:78
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 608 of file EventProcessor.cc.

References schedule_.

608  {
609  return schedule_->getAllModuleDescriptions();
610  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 606 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

606 { return serviceToken_; }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 622 of file EventProcessor.cc.

References schedule_.

622 { schedule_->getTriggerReport(rep); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
rep
Definition: cuy.py:1190
void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus >  iLumiStatus 
)

Definition at line 1208 of file EventProcessor.cc.

References deleteLumiFromCache(), esp_, handleEndLumiExceptions(), iovQueue_, looper_, edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), cmsPerfStripChart::operate(), processContext_, edm::SerialTaskQueue::resume(), schedule_, serviceToken_, mps_update::status, subProcesses_, TrackValidation_cff::task, and writeLumiAsync().

Referenced by streamEndLumiAsync().

1209  {
1210  // Get some needed info out of the status object before moving
1211  // it into finalTaskForThisLumi.
1212  auto& lp = *(iLumiStatus->lumiPrincipal());
1213  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1214  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1215 
1216  auto finalTaskForThisLumi = edm::make_waiting_task(
1217  tbb::task::allocate_root(),
1218  [status = std::move(iLumiStatus), iTask = std::move(iTask), this](std::exception_ptr const* iPtr) mutable {
1219  std::exception_ptr ptr;
1220  if (iPtr) {
1221  handleEndLumiExceptions(iPtr, iTask);
1222  } else {
1223  try {
1225  if (looper_) {
1226  auto& lp = *(status->lumiPrincipal());
1227  auto const& es = esp_->eventSetup();
1228  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1229  }
1230  } catch (...) {
1231  ptr = std::current_exception();
1232  }
1233  }
1235 
1236  // Try hard to clean up resources so the
1237  // process can terminate in a controlled
1238  // fashion even after exceptions have occurred.
1239 
1240  try {
1242  } catch (...) {
1243  if (not ptr) {
1244  ptr = std::current_exception();
1245  }
1246  }
1247 
1248  try {
1249  //release our hold on the IOV
1250  iovQueue_.resume();
1251  } catch (...) {
1252  if (not ptr) {
1253  ptr = std::current_exception();
1254  }
1255  }
1256 
1257  try {
1258  status->resumeGlobalLumiQueue();
1259  } catch (...) {
1260  if (not ptr) {
1261  ptr = std::current_exception();
1262  }
1263  }
1264 
1265  try {
1266  // This call to status.reset() must occur before iTask is destroyed.
1267  // Otherwise there will be a data race which could result in endRun
1268  // being delayed until it is too late to successfully call it.
1269  status.reset();
1270  } catch (...) {
1271  if (not ptr) {
1272  ptr = std::current_exception();
1273  }
1274  }
1275 
1276  if (ptr) {
1277  handleEndLumiExceptions(&ptr, iTask);
1278  }
1279  });
1280 
1281  auto writeT = edm::make_waiting_task(
1282  tbb::task::allocate_root(),
1283  [this, didGlobalBeginSucceed, &lumiPrincipal = lp, task = WaitingTaskHolder(finalTaskForThisLumi)](
1284  std::exception_ptr const* iExcept) mutable {
1285  if (iExcept) {
1286  task.doneWaiting(*iExcept);
1287  } else {
1288  //Only call writeLumi if beginLumi succeeded
1289  if (didGlobalBeginSucceed) {
1290  writeLumiAsync(std::move(task), lumiPrincipal);
1291  }
1292  }
1293  });
1294 
1295  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1296 
1297  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1298  auto const& es = esp_->eventSetup();
1299 
1300  endGlobalTransitionAsync<Traits>(
1301  WaitingTaskHolder(writeT), *schedule_, lp, ts, es, serviceToken_, subProcesses_, cleaningUpAfterException);
1302  }
ProcessContext processContext_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
edm::SerialTaskQueue iovQueue_
def move(src, dest)
Definition: eostools.py:511
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr const *  iPtr,
WaitingTaskHolder &  holder 
)

Definition at line 1199 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), setDeferredException(), setExceptionMessageLumis(), and tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

1199  {
1200  if (setDeferredException(*iPtr)) {
1201  WaitingTaskHolder tmp(holder);
1202  tmp.doneWaiting(*iPtr);
1203  } else {
1205  }
1206  }
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
bool setDeferredException(std::exception_ptr)
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 1560 of file EventProcessor.cc.

References beginLumiAsync(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::InputSource::IsLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), and streamLumiStatus_.

Referenced by beginLumiAsync(), and continueLumiAsync().

1560  {
1561  sourceResourcesAcquirer_.serialQueueChain().push([this, iTask, iStreamIndex]() mutable {
1563  auto& status = streamLumiStatus_[iStreamIndex];
1564  try {
1565  if (readNextEventForStream(iStreamIndex, *status)) {
1566  auto recursionTask = make_waiting_task(
1567  tbb::task::allocate_root(), [this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1568  if (iPtr) {
1569  // Try to end the stream properly even if an exception was
1570  // thrown on an event.
1571  bool expected = false;
1572  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1573  // This is the case where the exception in iPtr is the primary
1574  // exception and we want to see its message.
1575  deferredExceptionPtr_ = *iPtr;
1576  WaitingTaskHolder tempHolder(iTask);
1577  tempHolder.doneWaiting(*iPtr);
1578  }
1579  streamEndLumiAsync(std::move(iTask), iStreamIndex, streamLumiStatus_[iStreamIndex]);
1580  //the stream will stop now
1581  return;
1582  }
1583  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1584  });
1585 
1586  processEventAsync(WaitingTaskHolder(recursionTask), iStreamIndex);
1587  } else {
1588  //the stream will stop now
1589  if (status->isLumiEnding()) {
1590  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1591  status->startNextLumi();
1592  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1593  }
1594  streamEndLumiAsync(std::move(iTask), iStreamIndex, status);
1595  } else {
1596  iTask.doneWaiting(std::exception_ptr{});
1597  }
1598  }
1599  } catch (...) {
1600  // It is unlikely we will ever get in here ...
1601  // But if we do try to clean up and propagate the exception
1602  if (streamLumiStatus_[iStreamIndex]) {
1603  streamEndLumiAsync(iTask, iStreamIndex, streamLumiStatus_[iStreamIndex]);
1604  }
1605  bool expected = false;
1606  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1607  auto e = std::current_exception();
1609  iTask.doneWaiting(e);
1610  }
1611  }
1612  });
1613  }
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
SharedResourcesAcquirer sourceResourcesAcquirer_
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastTransitionType() const
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::exception_ptr deferredExceptionPtr_
def move(src, dest)
Definition: eostools.py:511
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 323 of file EventProcessor.cc.

References act_table_, actReg_, branchIDListHelper(), branchIDListHelper_, trackingPlots::common, edm::errors::Configuration, esp_, espController_, Exception, FDEBUG, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, eostools::move(), jets_cff::nThreads, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

325  {
326  //std::cerr << processDesc->dump() << std::endl;
327 
328  // register the empty parentage vector, once and for all
330 
331  // register the empty parameter set, once and for all.
332  ParameterSet().registerIt();
333 
334  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
335 
336  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
337  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
338  bool const hasSubProcesses = !subProcessVParameterSet.empty();
339 
340  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
341  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
342  // set in here if the parameters were not explicitly set.
343  validateTopLevelParameterSets(parameterSet.get());
344 
345  // Now set some parameters specific to the main process.
346  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
347  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
348  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
349  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
350  << fileMode << ".\n"
351  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
352  } else {
353  fileModeNoMerge_ = (fileMode == "NOMERGE");
354  }
355  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
356 
357  //threading
358  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
359 
360  // Even if numberOfThreads was set to zero in the Python configuration, the code
361  // in cmsRun.cpp should have reset it to something else.
362  assert(nThreads != 0);
363 
364  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
365  if (nStreams == 0) {
366  nStreams = nThreads;
367  }
368  if (nThreads > 1 or nStreams > 1) {
369  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
370  }
371  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
372  if (nConcurrentRuns != 1) {
373  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
374  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
375  }
376  unsigned int nConcurrentLumis =
377  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
378  if (nConcurrentLumis == 0) {
379  nConcurrentLumis = nConcurrentRuns;
380  }
381 
382  //Check that relationships between threading parameters make sense
383  /*
384  if(nThreads<nStreams) {
385  //bad
386  }
387  if(nConcurrentRuns>nStreams) {
388  //bad
389  }
390  if(nConcurrentRuns>nConcurrentLumis) {
391  //bad
392  }
393  */
394  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
395 
396  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
397 
398  // Now do general initialization
399  ScheduleItems items;
400 
401  //initialize the services
402  auto& serviceSets = processDesc->getServicesPSets();
403  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
404  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
405 
406  //make the services available
408 
409  if (nStreams > 1) {
411  handler->willBeUsingThreads();
412  }
413 
414  // initialize miscellaneous items
415  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
416 
417  // initialize the event setup provider
418  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get());
419 
420  // initialize the looper, if any
421  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
422  if (looper_) {
423  looper_->setActionTable(items.act_table_.get());
424  looper_->attachTo(*items.actReg_);
425 
426  //For now loopers make us run only 1 transition at a time
427  nStreams = 1;
428  nConcurrentLumis = 1;
429  nConcurrentRuns = 1;
430  }
431 
432  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
433 
434  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
435  streamQueues_.resize(nStreams);
436  streamLumiStatus_.resize(nStreams);
437 
438  // initialize the input source
439  input_ = makeInput(*parameterSet,
440  *common,
441  items.preg(),
442  items.branchIDListHelper(),
443  items.thinnedAssociationsHelper(),
444  items.actReg_,
445  items.processConfiguration(),
447 
448  // initialize the Schedule
449  schedule_ = items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_);
450 
451  // set the data members
452  act_table_ = std::move(items.act_table_);
453  actReg_ = items.actReg_;
454  preg_ = items.preg();
456  branchIDListHelper_ = items.branchIDListHelper();
457  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
458  processConfiguration_ = items.processConfiguration();
460  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
461 
462  FDEBUG(2) << parameterSet << std::endl;
463 
465  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
466  // Reusable event principal
467  auto ep = std::make_shared<EventPrincipal>(preg(),
471  historyAppender_.get(),
472  index);
474  }
475 
476  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
477  auto lp =
478  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
480  }
481 
482  // fill the subprocesses, if there are any
483  subProcesses_.reserve(subProcessVParameterSet.size());
484  for (auto& subProcessPSet : subProcessVParameterSet) {
485  subProcesses_.emplace_back(subProcessPSet,
486  *parameterSet,
487  preg(),
490  SubProcessParentageHelper(),
492  *actReg_,
493  token,
496  &processContext_);
497  }
498  }
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
std::vector< edm::SerialTaskQueue > streamQueues_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:648
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 299 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by endJob().

299 { return get_underlying_safe(looper_); }
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 300 of file EventProcessor.h.

References edm::get_underlying_safe().

300 { return get_underlying_safe(looper_); }
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 671 of file EventProcessor.cc.

References input_.

Referenced by readNextEventForStream().

671 { return input_->luminosityBlock(); }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 667 of file EventProcessor.cc.

References input_.

667  {
668  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
669  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 641 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), deferredExceptionPtrIsSet_, epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by readNextEventForStream().

641  {
642  if (deferredExceptionPtrIsSet_.load()) {
644  return InputSource::IsStop;
645  }
646 
647  SendSourceTerminationSignalIfException sentry(actReg_.get());
648  InputSource::ItemType itemType;
649  //For now, do nothing with InputSource::IsSynchronize
650  do {
651  itemType = input_->nextItemType();
652  } while (itemType == InputSource::IsSynchronize);
653 
654  lastSourceTransition_ = itemType;
655  sentry.completedSuccessfully();
656 
658 
659  if (checkForAsyncStopRequest(returnCode)) {
660  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
662  }
663 
664  return lastSourceTransition_;
665  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastSourceTransition_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::openOutputFiles ( )

Definition at line 769 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

769  {
770  if (fb_.get() != nullptr) {
771  schedule_->openOutputFiles(*fb_);
772  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
773  }
774  FDEBUG(1) << "\topenOutputFiles\n";
775  }
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 287 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), and readRun().

287 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 288 of file EventProcessor.h.

References edm::get_underlying_safe().

288 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 834 of file EventProcessor.cc.

References esp_, FDEBUG, edm::propagate_const< T >::get(), and looper_.

Referenced by runToCompletion().

834  {
835  looper_->prepareForNextLoop(esp_.get());
836  FDEBUG(1) << "\tprepareForNextLoop\n";
837  }
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 136 of file EventProcessor.h.

References EcalCondTools::getToken(), and cuy::rep.

136 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1629 of file EventProcessor.cc.

References edm::make_functor_task(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

1629  {
1630  tbb::task::spawn(
1631  *make_functor_task(tbb::task::allocate_root(), [=]() { processEventAsyncImpl(iHolder, iStreamIndex); }));
1632  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1634 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, edm::Service< T >::isAvailable(), looper_, edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, and subProcesses_.

Referenced by processEventAsync().

1634  {
1635  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1636 
1639  if (rng.isAvailable()) {
1640  Event ev(*pep, ModuleDescription(), nullptr);
1641  rng->postEventRead(ev);
1642  }
1643 
1644  WaitingTaskHolder finalizeEventTask(
1645  make_waiting_task(tbb::task::allocate_root(), [this, pep, iHolder](std::exception_ptr const* iPtr) mutable {
1646  //NOTE: If we have a looper we only have one Stream
1647  if (looper_) {
1649  processEventWithLooper(*pep);
1650  }
1651 
1652  FDEBUG(1) << "\tprocessEvent\n";
1653  pep->clearEventPrincipal();
1654  if (iPtr) {
1655  iHolder.doneWaiting(*iPtr);
1656  } else {
1657  iHolder.doneWaiting(std::exception_ptr());
1658  }
1659  }));
1660  WaitingTaskHolder afterProcessTask;
1661  if (subProcesses_.empty()) {
1662  afterProcessTask = std::move(finalizeEventTask);
1663  } else {
1664  //Need to run SubProcesses after schedule has finished
1665  // with the event
1666  afterProcessTask = WaitingTaskHolder(make_waiting_task(
1667  tbb::task::allocate_root(), [this, pep, finalizeEventTask](std::exception_ptr const* iPtr) mutable {
1668  if (not iPtr) {
1669  //when run with 1 thread, we want the order to be what
1670  // it was before. This requires reversing the order since
1671  // tasks are run last one in first one out
1672  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1673  subProcess.doEventAsync(finalizeEventTask, *pep);
1674  }
1675  } else {
1676  finalizeEventTask.doneWaiting(*iPtr);
1677  }
1678  }));
1679  }
1680 
1681  schedule_->processOneEventAsync(std::move(afterProcessTask), iStreamIndex, *pep, esp_->eventSetup(), serviceToken_);
1682  }
bool ev
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void processEventWithLooper(EventPrincipal &)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal)
private

Definition at line 1684 of file EventProcessor.cc.

References esp_, input_, edm::InputSource::IsStop, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), lastSourceTransition_, looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

1684  {
1685  bool randomAccess = input_->randomAccess();
1686  ProcessingController::ForwardState forwardState = input_->forwardState();
1687  ProcessingController::ReverseState reverseState = input_->reverseState();
1688  ProcessingController pc(forwardState, reverseState, randomAccess);
1689 
1691  do {
1692  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1693  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1694 
1695  bool succeeded = true;
1696  if (randomAccess) {
1697  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1698  input_->skipEvents(-2);
1699  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1700  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1701  }
1702  }
1703  pc.setLastOperationSucceeded(succeeded);
1704  } while (!pc.lastOperationSucceeded());
1705  if (status != EDLooperBase::kContinue) {
1706  shouldWeStop_ = true;
1708  }
1709  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
InputSource::ItemType lastSourceTransition_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
InputSource::ItemType edm::EventProcessor::processLumis ( std::shared_ptr< void > const &  iRunResource)

Definition at line 1026 of file EventProcessor.cc.

References beginLumiAsync(), continueLumiAsync(), input_, lastTransitionType(), edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, and streamLumiActive_.

1026  {
1027  auto waitTask = make_empty_waiting_task();
1028  waitTask->increment_ref_count();
1029 
1030  if (streamLumiActive_ > 0) {
1032  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1033  } else {
1034  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1035  input_->luminosityBlockAuxiliary()->beginTime()),
1036  iRunResource,
1037  WaitingTaskHolder{waitTask.get()});
1038  }
1039  waitTask->wait_for_all();
1040 
1041  if (waitTask->exceptionPtr() != nullptr) {
1042  std::rethrow_exception(*(waitTask->exceptionPtr()));
1043  }
1044  return lastTransitionType();
1045  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
PreallocationConfiguration preallocations_
InputSource::ItemType lastTransitionType() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::atomic< unsigned int > streamLumiActive_
int edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1425 of file EventProcessor.cc.

References actReg_, input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by readNextEventForStream().

1425  {
1426  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1427  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1428  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1429  input_->processHistoryRegistry().reducedProcessHistoryID(
1430  input_->luminosityBlockAuxiliary()->processHistoryID()));
1431  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1432  assert(lumiOK);
1433  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1434  {
1435  SendSourceTerminationSignalIfException sentry(actReg_.get());
1436  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1437  sentry.completedSuccessfully();
1438  }
1439  return input_->luminosityBlock();
1440  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< ActivityRegistry > actReg_
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1394 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1394  {
1395  principalCache_.merge(input_->runAuxiliary(), preg());
1396  auto runPrincipal = principalCache_.runPrincipalPtr();
1397  {
1398  SendSourceTerminationSignalIfException sentry(actReg_.get());
1399  input_->readAndMergeRun(*runPrincipal);
1400  sentry.completedSuccessfully();
1401  }
1402  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1403  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1404  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1615 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, and streamLumiStatus_.

Referenced by readNextEventForStream().

1615  {
1616  //TODO this will have to become per stream
1617  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1618  StreamContext streamContext(event.streamID(), &processContext_);
1619 
1620  SendSourceTerminationSignalIfException sentry(actReg_.get());
1621  input_->readEvent(event, streamContext);
1622 
1623  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1624  sentry.completedSuccessfully();
1625 
1626  FDEBUG(1) << "\treadEvent\n";
1627  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
Definition: event.py:1
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )

Definition at line 742 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, edm::PrincipalCache::preReadFile(), principalCache_, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

742  {
743  FDEBUG(1) << " \treadFile\n";
744  size_t size = preg_->size();
745  SendSourceTerminationSignalIfException sentry(actReg_.get());
746 
748 
749  fb_ = input_->readFile();
750  if (size < preg_->size()) {
752  }
755  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
756  }
757  sentry.completedSuccessfully();
758  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PrincipalCache principalCache_
void edm::EventProcessor::readLuminosityBlock ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1406 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::getAvailableLumiPrincipalPtr(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), eostools::move(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

Referenced by beginLumiAsync().

1406  {
1408  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1409  << "Illegal attempt to insert lumi into cache\n"
1410  << "Run is invalid\n"
1411  << "Contact a Framework Developer\n";
1412  }
1414  assert(lbp);
1415  lbp->setAux(*input_->luminosityBlockAuxiliary());
1416  {
1417  SendSourceTerminationSignalIfException sentry(actReg_.get());
1418  input_->readLuminosityBlock(*lbp, *historyAppender_);
1419  sentry.completedSuccessfully();
1420  }
1421  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1422  iStatus.lumiPrincipal() = std::move(lbp);
1423  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus & iLumiStatus 
)
private

Definition at line 1499 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::continuingLumi(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::LuminosityBlockProcessingStatus::endLumi(), edm::LuminosityBlockProcessingStatus::haveContinuedLumi(), input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, lastSourceTransition_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextLuminosityBlockID(), nextTransitionType(), cmsPerfStripChart::operate(), or, readAndMergeLumi(), readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setNextSyncValue(), shouldWeStop(), sourceMutex_, edm::LuminosityBlockProcessingStatus::stopProcessingEvents(), and edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped().

Referenced by handleNextEventForStreamAsync().

1499  {
1500  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1501  iStatus.endLumi();
1502  return false;
1503  }
1504 
1505  if (iStatus.wasEventProcessingStopped()) {
1506  return false;
1507  }
1508 
1509  if (shouldWeStop()) {
1511  iStatus.stopProcessingEvents();
1512  iStatus.endLumi();
1513  return false;
1514  }
1515 
1517  try {
1518  //need to use lock in addition to the serial task queue because
1519  // of delayed provenance reading and reading data in response to
1520  // edm::Refs etc
1521  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1522 
1523  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1524  if (InputSource::IsLumi == itemType) {
1525  iStatus.haveContinuedLumi();
1526  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1527  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1528  readAndMergeLumi(iStatus);
1529  itemType = nextTransitionType();
1530  }
1531  if (InputSource::IsLumi == itemType) {
1532  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1533  input_->luminosityBlockAuxiliary()->beginTime()));
1534  }
1535  }
1536  if (InputSource::IsEvent != itemType) {
1537  iStatus.stopProcessingEvents();
1538 
1539  //IsFile may continue processing the lumi and
1540  // looper_ can cause the input source to declare a new IsRun which is actually
1541  // just a continuation of the previous run
1542  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1543  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1544  iStatus.endLumi();
1545  }
1546  return false;
1547  }
1548  readEvent(iStreamIndex);
1549  } catch (...) {
1550  bool expected = false;
1551  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1552  deferredExceptionPtr_ = std::current_exception();
1553  iStatus.endLumi();
1554  }
1555  return false;
1556  }
1557  return true;
1558  }
void readEvent(unsigned int iStreamIndex)
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemType lastSourceTransition_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
std::exception_ptr deferredExceptionPtr_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
bool shouldWeStop() const
def operate(timelog, memlog, json_f, num)
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1371 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, mergeableRunProductProcesses_, preg(), principalCache_, and processConfiguration_.

1371  {
1373  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1374  << "Illegal attempt to insert run into cache\n"
1375  << "Contact a Framework Developer\n";
1376  }
1377  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1378  preg(),
1380  historyAppender_.get(),
1381  0,
1382  true,
1384  {
1385  SendSourceTerminationSignalIfException sentry(actReg_.get());
1386  input_->readRun(*rp, *historyAppender_);
1387  sentry.completedSuccessfully();
1388  }
1389  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1390  principalCache_.insert(rp);
1391  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1392  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
MergeableRunProductProcesses mergeableRunProductProcesses_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 795 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

795  {
796  if (fb_.get() != nullptr) {
797  schedule_->respondToCloseInputFile(*fb_);
798  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
799  }
800  FDEBUG(1) << "\trespondToCloseInputFile\n";
801  }
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 785 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

785  {
787  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
788  if (fb_.get() != nullptr) {
789  schedule_->respondToOpenInputFile(*fb_);
790  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
791  }
792  FDEBUG(1) << "\trespondToOpenInputFile\n";
793  }
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )

Definition at line 828 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

828  {
829  input_->repeat();
830  input_->rewind();
831  FDEBUG(1) << "\trewind\n";
832  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 369 of file EventProcessor.h.

Referenced by Types.EventID::cppID(), Types.LuminosityBlockID::cppID(), and endUnfinishedRun().

369 { return runToCompletion(); }
StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 673 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, edm::InputSource::IsStop, cmsPerfStripChart::operate(), prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run().

673  {
676  {
677  beginJob(); //make sure this was called
678 
679  // make the services available
681 
683  try {
684  FilesProcessor fp(fileModeNoMerge_);
685 
686  convertException::wrap([&]() {
687  bool firstTime = true;
688  do {
689  if (not firstTime) {
691  rewindInput();
692  } else {
693  firstTime = false;
694  }
695  startingNewLoop();
696 
697  auto trans = fp.processFiles(*this);
698 
699  fp.normalEnd();
700 
701  if (deferredExceptionPtrIsSet_.load()) {
702  std::rethrow_exception(deferredExceptionPtr_);
703  }
704  if (trans != InputSource::IsStop) {
705  //problem with the source
706  doErrorStuff();
707 
708  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
709  }
710  } while (not endOfLoop());
711  }); // convertException::wrap
712 
713  } // Try block
714  catch (cms::Exception& e) {
716  std::string message(
717  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
718  e.addAdditionalInfo(message);
719  if (e.alreadyPrinted()) {
720  LogAbsolute("Additional Exceptions") << message;
721  }
722  }
723  if (!exceptionMessageRuns_.empty()) {
725  if (e.alreadyPrinted()) {
726  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
727  }
728  }
729  if (!exceptionMessageFiles_.empty()) {
731  if (e.alreadyPrinted()) {
732  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
733  }
734  }
735  throw;
736  }
737  }
738 
739  return returnCode;
740  }
std::atomic< bool > exceptionMessageLumis_
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:177
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
def operate(timelog, memlog, json_f, num)
bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 1732 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

Referenced by handleEndLumiExceptions().

1732  {
1733  bool expected = false;
1734  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1735  deferredExceptionPtr_ = iException;
1736  return true;
1737  }
1738  return false;
1739  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 1726 of file EventProcessor.cc.

References exceptionMessageFiles_.

1726 { exceptionMessageFiles_ = message; }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 1730 of file EventProcessor.cc.

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

1730 { exceptionMessageLumis_ = true; }
std::atomic< bool > exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 1728 of file EventProcessor.cc.

References exceptionMessageRuns_.

1728 { exceptionMessageRuns_ = message; }
std::string exceptionMessageRuns_
bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 839 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

839  {
840  FDEBUG(1) << "\tshouldWeCloseOutput\n";
841  if (!subProcesses_.empty()) {
842  for (auto const& subProcess : subProcesses_) {
843  if (subProcess.shouldWeCloseOutput()) {
844  return true;
845  }
846  }
847  return false;
848  }
849  return schedule_->shouldWeCloseOutput();
850  }
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 1711 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

1711  {
1712  FDEBUG(1) << "\tshouldWeStop\n";
1713  if (shouldWeStop_)
1714  return true;
1715  if (!subProcesses_.empty()) {
1716  for (auto const& subProcess : subProcesses_) {
1717  if (subProcess.terminate()) {
1718  return true;
1719  }
1720  }
1721  return false;
1722  }
1723  return schedule_->terminate();
1724  }
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::startingNewLoop ( )

Definition at line 803 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

803  {
804  shouldWeStop_ = false;
805  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
806  // until after we've called beginOfJob
807  if (looper_ && looperBeginJobRun_) {
808  looper_->doStartingNewLoop();
809  }
810  FDEBUG(1) << "\tstartingNewLoop\n";
811  }
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1304 of file EventProcessor.cc.

References esp_, globalEndLumiAsync(), handleEndLumiExceptions(), edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and lumiQTWidget::t.

Referenced by beginLumiAsync(), endUnfinishedLumi(), and handleNextEventForStreamAsync().

1306  {
1307  auto t = edm::make_waiting_task(tbb::task::allocate_root(),
1308  [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1309  if (iPtr) {
1310  handleEndLumiExceptions(iPtr, iTask);
1311  }
1312  auto status = streamLumiStatus_[iStreamIndex];
1313  //reset status before releasing queue else get race condition
1314  streamLumiStatus_[iStreamIndex].reset();
1316  streamQueues_[iStreamIndex].resume();
1317 
1318  //are we the last one?
1319  if (status->streamFinishedLumi()) {
1321  } else {
1322  status.reset();
1323  }
1324  });
1325 
1326  edm::WaitingTaskHolder lumiDoneTask{t};
1327 
1328  iLumiStatus->setEndTime();
1329 
1330  if (iLumiStatus->didGlobalBeginSucceed()) {
1331  auto& lumiPrincipal = *iLumiStatus->lumiPrincipal();
1332  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1333  lumiPrincipal.endTime());
1334  auto const& es = esp_->eventSetup();
1335 
1336  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1337 
1338  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1339  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1340  *schedule_,
1341  iStreamIndex,
1342  lumiPrincipal,
1343  ts,
1344  es,
1345  serviceToken_,
1346  subProcesses_,
1347  cleaningUpAfterException);
1348  }
1349  iLumiStatus.reset();
1350  }
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 293 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

293  {
295  }
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 296 of file EventProcessor.h.

References edm::get_underlying_safe().

296  {
298  }
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 612 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

612 { return schedule_->totalEvents(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 616 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

616 { return schedule_->totalEventsFailed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 614 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

614 { return schedule_->totalEventsPassed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization ( ) const
private

Definition at line 1741 of file EventProcessor.cc.

References alignCSCRings::s, and schedule_.

Referenced by beginJob().

1741  {
1742  std::unique_ptr<LogSystem> s;
1743  for (auto worker : schedule_->allWorkers()) {
1744  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1745  if (not s) {
1746  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1747  (*s) << "The following modules require synchronizing on LuminosityBlock boundaries:";
1748  }
1749  (*s) << "\n " << worker->description().moduleName() << " " << worker->description().moduleLabel();
1750  }
1751  }
1752  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal & lumiPrincipal 
)

Definition at line 1472 of file EventProcessor.cc.

References actReg_, edm::WaitingTaskHolder::doneWaiting(), edm::LuminosityBlockPrincipal::luminosityBlock(), edm::make_waiting_task(), edm::RunPrincipal::mergeableRunProductMetadata(), processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

1472  {
1473  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(),
1474  [this, task, &lumiPrincipal](std::exception_ptr const* iExcept) mutable {
1475  if (iExcept) {
1476  task.doneWaiting(*iExcept);
1477  } else {
1479  for (auto& s : subProcesses_) {
1480  s.writeLumiAsync(task, lumiPrincipal);
1481  }
1482  }
1483  });
1485 
1486  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1487 
1488  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, lumiPrincipal, &processContext_, actReg_.get());
1489  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
ProcessHistoryID const &  phid,
RunNumber_t  run,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 1442 of file EventProcessor.cc.

References actReg_, edm::WaitingTaskHolder::doneWaiting(), edm::make_waiting_task(), principalCache_, processContext_, edm::PrincipalCache::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, and subProcesses_.

Referenced by endUnfinishedRun().

1445  {
1446  auto subsT = edm::make_waiting_task(
1447  tbb::task::allocate_root(),
1448  [this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
1449  if (iExcept) {
1450  task.doneWaiting(*iExcept);
1451  } else {
1453  for (auto& s : subProcesses_) {
1454  s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1455  }
1456  }
1457  });
1459  schedule_->writeRunAsync(WaitingTaskHolder(subsT),
1461  &processContext_,
1462  actReg_.get(),
1463  mergeableRunProductMetadata);
1464  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 320 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 356 of file EventProcessor.h.

Referenced by runToCompletion().

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 357 of file EventProcessor.h.

Referenced by runToCompletion().

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 344 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 312 of file EventProcessor.h.

Referenced by init(), and respondToOpenInputFile().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private
std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private
edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 317 of file EventProcessor.h.

Referenced by beginJob(), beginLumiAsync(), beginRun(), endRun(), init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 362 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 347 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 349 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 348 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private
bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 346 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 358 of file EventProcessor.h.

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 352 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 350 of file EventProcessor.h.

Referenced by endOfLoop().

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 332 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private
edm::SerialTaskQueue edm::EventProcessor::iovQueue_
private

Definition at line 319 of file EventProcessor.h.

Referenced by beginLumiAsync(), and globalEndLumiAsync().

InputSource::ItemType edm::EventProcessor::lastSourceTransition_
private
edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 351 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 327 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 324 of file EventProcessor.h.

Referenced by init(), and readRun().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 323 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private
edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 311 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), and readFile().

PrincipalCache edm::EventProcessor::principalCache_
private
bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 364 of file EventProcessor.h.

Referenced by beginJob(), and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 321 of file EventProcessor.h.

Referenced by beginJob(), init(), and readRun().

ProcessContext edm::EventProcessor::processContext_
private
edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private
bool edm::EventProcessor::shouldWeStop_
private

Definition at line 345 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 342 of file EventProcessor.h.

Referenced by readNextEventForStream().

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 341 of file EventProcessor.h.

Referenced by beginLumiAsync(), and handleNextEventForStreamAsync().

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private
std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private
std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

Definition at line 326 of file EventProcessor.h.

Referenced by beginLumiAsync(), init(), and streamEndLumiAsync().

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 313 of file EventProcessor.h.

Referenced by init().