CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (edm::WaitingTaskHolder iHolder)
 
void deleteLumiFromCache (LuminosityBlockProcessingStatus &)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
bool endOfLoop ()
 
bool endPathsEnabled () const
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
void endUnfinishedLumi ()
 
void endUnfinishedRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (EventProcessor const &)=delete
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
void globalEndLumiAsync (edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
void handleEndLumiExceptions (std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
 
InputSource::ItemType lastTransitionType () const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processLumis (std::shared_ptr< void > const &iRunResource)
 
int readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun ()
 
void readFile ()
 
void readLuminosityBlock (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamEndLumiAsync (edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
 
void writeRunAsync (WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedData > ExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
void warnAboutModulesRequiringLuminosityBLockSynchronization () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemType lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueue > streamQueues_
 
std::vector< SubProcess > subProcesses_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 64 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 360 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 361 of file EventProcessor.h.

Member Enumeration Documentation

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 74 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet >  parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 214 of file EventProcessor.cc.

References init(), and eostools::move().

219  : actReg_(),
220  preg_(),
222  serviceToken_(),
223  input_(),
224  espController_(new eventsetup::EventSetupsController),
225  esp_(),
226  act_table_(),
228  schedule_(),
229  subProcesses_(),
230  historyAppender_(new HistoryAppender),
231  fb_(),
232  looper_(),
234  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
235  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
236  principalCache_(),
237  beginJobCalled_(false),
238  shouldWeStop_(false),
239  fileModeNoMerge_(false),
242  exceptionMessageLumis_(false),
243  forceLooperToEnd_(false),
244  looperBeginJobRun_(false),
247  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
248  processDesc->addServices(defaultServices, forcedServices);
249  init(processDesc, iToken, iLegacy);
250  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet >  parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 252 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and eostools::move().

255  : actReg_(),
256  preg_(),
258  serviceToken_(),
259  input_(),
260  espController_(new eventsetup::EventSetupsController),
261  esp_(),
262  act_table_(),
264  schedule_(),
265  subProcesses_(),
266  historyAppender_(new HistoryAppender),
267  fb_(),
268  looper_(),
270  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
271  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
272  principalCache_(),
273  beginJobCalled_(false),
274  shouldWeStop_(false),
275  fileModeNoMerge_(false),
278  exceptionMessageLumis_(false),
279  forceLooperToEnd_(false),
280  looperBeginJobRun_(false),
284  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
285  processDesc->addServices(defaultServices, forcedServices);
287  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc >  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 289 of file EventProcessor.cc.

References init().

292  : actReg_(),
293  preg_(),
295  serviceToken_(),
296  input_(),
297  espController_(new eventsetup::EventSetupsController),
298  esp_(),
299  act_table_(),
301  schedule_(),
302  subProcesses_(),
303  historyAppender_(new HistoryAppender),
304  fb_(),
305  looper_(),
307  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
308  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
309  principalCache_(),
310  beginJobCalled_(false),
311  shouldWeStop_(false),
312  fileModeNoMerge_(false),
315  exceptionMessageLumis_(false),
316  forceLooperToEnd_(false),
317  looperBeginJobRun_(false),
321  init(processDesc, token, legacy);
322  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 503 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and unpackBuffers-CaloStage2::token.

503  {
504  // Make the services available while everything is being deleted.
506  ServiceRegistry::Operate op(token);
507 
508  // manually destroy all these thing that may need the services around
509  // propagate_const<T> has no reset() function
510  espController_ = nullptr;
511  esp_ = nullptr;
512  schedule_ = nullptr;
513  input_ = nullptr;
514  looper_ = nullptr;
515  actReg_ = nullptr;
516 
519  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 521 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), esp_, espController_, edm::for_all(), mps_fire::i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processConfiguration_, processContext_, schedule_, serviceToken_, subProcesses_, warnAboutModulesRequiringLuminosityBLockSynchronization(), and edm::convertException::wrap().

Referenced by runToCompletion().

521  {
522  if (beginJobCalled_)
523  return;
524  beginJobCalled_ = true;
525  bk::beginJob();
526 
527  // StateSentry toerror(this); // should we add this ?
528  //make the services available
530 
531  service::SystemBounds bounds(preallocations_.numberOfStreams(),
535  actReg_->preallocateSignal_(bounds);
536  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
538 
539  //NOTE: this may throw
541  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
542 
545  }
546  //NOTE: This implementation assumes 'Job' means one call
547  // the EventProcessor::run
548  // If it really means once per 'application' then this code will
549  // have to be changed.
550  // Also have to deal with case where have 'run' then new Module
551  // added and do 'run'
552  // again. In that case the newly added Module needs its 'beginJob'
553  // to be called.
554 
555  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
556  // For now we delay calling beginOfJob until first beginOfRun
557  //if(looper_) {
558  // looper_->beginOfJob(es);
559  //}
560  try {
561  convertException::wrap([&]() { input_->doBeginJob(); });
562  } catch (cms::Exception& ex) {
563  ex.addContext("Calling beginJob for the source");
564  throw;
565  }
566  espController_->finishConfiguration();
567  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices());
568  // toerror.succeeded(); // should we add this?
569  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
570  actReg_->postBeginJobSignal_();
571 
572  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
573  schedule_->beginStream(i);
574  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
575  }
576  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:14
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::beginLumiAsync ( edm::IOVSyncValue const &  iSyncValue,
std::shared_ptr< void > const &  iRunResource,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1060 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::beginTime(), edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), h, handleNextEventForStreamAsync(), mps_fire::i, input_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), lumiQueue_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), edm::SerialTaskQueue::pause(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readLuminosityBlock(), edm::LuminosityBlockPrincipal::run(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by handleNextEventForStreamAsync(), and processLumis().

1062  {
1063  if (iHolder.taskHasFailed()) {
1064  return;
1065  }
1066 
1067  // We must be careful with the status object here and in code this function calls. IF we want
1068  // endRun to be called, then we must call resetResources before the things waiting on
1069  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1070  // endRun to be called much later than it should be, because status is holding iRunResource).
1071  // Note that this must be done explicitly. Relying on the destructor does not work well
1072  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1073  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1074  // destroyed inside this function and lumiWork.
1075  auto status =
1076  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1077 
1078  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1079  if (iHolder.taskHasFailed()) {
1080  status->resetResources();
1081  return;
1082  }
1083 
1084  status->setResumer(std::move(iResumer));
1085 
1086  sourceResourcesAcquirer_.serialQueueChain().push([this, iHolder, status = std::move(status)]() mutable {
1087  //make the services available
1089 
1090  try {
1092 
1093  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1094  {
1095  SendSourceTerminationSignalIfException sentry(actReg_.get());
1096 
1097  input_->doBeginLumi(lumiPrincipal, &processContext_);
1098  sentry.completedSuccessfully();
1099  }
1100 
1102  if (rng.isAvailable()) {
1103  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1104  rng->preBeginLumi(lb);
1105  }
1106 
1107  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1108 
1109  //Task to start the stream beginLumis
1110  auto beginStreamsTask = make_waiting_task(
1111  tbb::task::allocate_root(), [this, holder = iHolder, status, ts](std::exception_ptr const* iPtr) mutable {
1112  if (iPtr) {
1113  status->resetResources();
1114  holder.doneWaiting(*iPtr);
1115  } else {
1116  status->globalBeginDidSucceed();
1117  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1118 
1119  if (looper_) {
1120  try {
1121  //make the services available
1122  ServiceRegistry::Operate operateLooper(serviceToken_);
1123  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1124  } catch (...) {
1125  status->resetResources();
1126  holder.doneWaiting(std::current_exception());
1127  return;
1128  }
1129  }
1130  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1131 
1132  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1133  streamQueues_[i].push([this, i, status, holder, ts, &es]() mutable {
1134  streamQueues_[i].pause();
1135 
1136  auto eventTask = edm::make_waiting_task(
1137  tbb::task::allocate_root(),
1138  [this, i, h = holder](std::exception_ptr const* exceptionFromBeginStreamLumi) mutable {
1139  if (exceptionFromBeginStreamLumi) {
1141  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1143  } else {
1145  }
1146  });
1147  auto& event = principalCache_.eventPrincipal(i);
1150  auto lp = status->lumiPrincipal();
1151  event.setLuminosityBlockPrincipal(lp.get());
1152  beginStreamTransitionAsync<Traits>(WaitingTaskHolder{eventTask},
1153  *schedule_,
1154  i,
1155  *lp,
1156  ts,
1157  es,
1158  &status->eventSetupImpls(),
1159  serviceToken_,
1160  subProcesses_);
1161  });
1162  }
1163  }
1164  }); // beginStreamTask
1165 
1166  //task to start the global begin lumi
1167  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1168 
1169  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1170  {
1171  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1172  beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1173  *schedule_,
1174  lumiPrincipal,
1175  ts,
1176  es,
1177  &status->eventSetupImpls(),
1178  serviceToken_,
1179  subProcesses_);
1180  }
1181  } catch (...) {
1182  status->resetResources();
1183  iHolder.doneWaiting(std::current_exception());
1184  }
1185  }); // task in sourceResourcesAcquirer
1186  }; // end lumiWork
1187 
1188  auto queueLumiWorkTask = make_waiting_task(
1189  tbb::task::allocate_root(),
1190  [this, lumiWorkLambda = std::move(lumiWork), iHolder](std::exception_ptr const* iPtr) mutable {
1191  if (iPtr) {
1192  iHolder.doneWaiting(*iPtr);
1193  }
1194  lumiQueue_->pushAndPause(std::move(lumiWorkLambda));
1195  });
1196 
1197  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1198  // We only get here inside this block if there is an EventSetup
1199  // module not able to handle concurrent IOVs (usually an ESSource)
1200  // and the new sync value is outside the current IOV of that module.
1201 
1202  WaitingTaskHolder queueLumiWorkTaskHolder{queueLumiWorkTask};
1203 
1204  queueWhichWaitsForIOVsToFinish_.push([this, queueLumiWorkTaskHolder, iSync, status]() mutable {
1205  try {
1206  SendSourceTerminationSignalIfException sentry(actReg_.get());
1207  // Pass in iSync to let the EventSetup system know which run and lumi
1208  // need to be processed and prepare IOVs for it.
1209  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1210  // lumi is done and no longer needs its EventSetup IOVs.
1211  espController_->eventSetupForInstance(
1212  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1213  sentry.completedSuccessfully();
1214  } catch (...) {
1215  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1216  }
1218  });
1219 
1220  } else {
1222 
1223  // This holder will be used to wait until the EventSetup IOVs are ready
1224  WaitingTaskHolder queueLumiWorkTaskHolder{queueLumiWorkTask};
1225 
1226  try {
1227  SendSourceTerminationSignalIfException sentry(actReg_.get());
1228 
1229  // Pass in iSync to let the EventSetup system know which run and lumi
1230  // need to be processed and prepare IOVs for it.
1231  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1232  // lumi is done and no longer needs its EventSetup IOVs.
1233  espController_->eventSetupForInstance(
1234  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1235  sentry.completedSuccessfully();
1236 
1237  } catch (...) {
1238  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1239  }
1240  }
1241  }
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
ProcessContext processContext_
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
void doneWaiting(std::exception_ptr iExcept)
std::vector< edm::SerialTaskQueue > streamQueues_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
SerialTaskQueueChain & serialQueueChain() const
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void push(const T &iAction)
asynchronously pushes functor iAction into queue
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
tmp
align.sh
Definition: createJobs.py:716
bool pause()
Pauses processing of additional tasks from the queue.
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded,
bool &  eventSetupForInstanceSucceeded 
)

Definition at line 864 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, and subProcesses_.

867  {
868  globalBeginSucceeded = false;
869  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
870  {
871  SendSourceTerminationSignalIfException sentry(actReg_.get());
872 
873  input_->doBeginRun(runPrincipal, &processContext_);
874  sentry.completedSuccessfully();
875  }
876 
877  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
878  if (forceESCacheClearOnNewRun_) {
879  espController_->forceCacheClear();
880  }
881  {
882  SendSourceTerminationSignalIfException sentry(actReg_.get());
883  espController_->eventSetupForInstance(ts);
884  eventSetupForInstanceSucceeded = true;
885  sentry.completedSuccessfully();
886  }
887  auto const& es = esp_->eventSetupImpl();
888  if (looper_ && looperBeginJobRun_ == false) {
889  looper_->copyInfo(ScheduleInfo(schedule_.get()));
890  looper_->beginOfJob(es);
891  looperBeginJobRun_ = true;
892  looper_->doStartingNewLoop();
893  }
894  {
895  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
896  auto globalWaitTask = make_empty_waiting_task();
897  globalWaitTask->increment_ref_count();
898  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
899  *schedule_,
900  runPrincipal,
901  ts,
902  es,
903  nullptr,
905  subProcesses_);
906  globalWaitTask->wait_for_all();
907  if (globalWaitTask->exceptionPtr() != nullptr) {
908  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
909  }
910  }
911  globalBeginSucceeded = true;
912  FDEBUG(1) << "\tbeginRun " << run << "\n";
913  if (looper_) {
914  looper_->doBeginRun(runPrincipal, es, &processContext_);
915  }
916  {
917  //To wait, the ref count has to be 1+#streams
918  auto streamLoopWaitTask = make_empty_waiting_task();
919  streamLoopWaitTask->increment_ref_count();
920 
921  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
922 
923  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
924  *schedule_,
926  runPrincipal,
927  ts,
928  es,
929  nullptr,
931  subProcesses_);
932 
933  streamLoopWaitTask->wait_for_all();
934  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
935  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
936  }
937  }
938  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
939  if (looper_) {
940  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
941  }
942  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 289 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

289  {
291  }
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 292 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode & returnCode)
private

Definition at line 633 of file EventProcessor.cc.

References epSignal, and edm::shutdown_flag.

Referenced by nextTransitionType().

633  {
634  bool returnValue = false;
635 
636  // Look for a shutdown signal
637  if (shutdown_flag.load(std::memory_order_acquire)) {
638  returnValue = true;
640  }
641  return returnValue;
642  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 627 of file EventProcessor.cc.

References schedule_.

627 { schedule_->clearCounters(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 763 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, and input_.

763  {
764  if (fb_.get() != nullptr) {
765  SendSourceTerminationSignalIfException sentry(actReg_.get());
766  input_->closeFile(fb_.get(), cleaningUpAfterException);
767  sentry.completedSuccessfully();
768  }
769  FDEBUG(1) << "\tcloseInputFile\n";
770  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::closeOutputFiles ( )

Definition at line 780 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

780  {
781  if (fb_.get() != nullptr) {
782  schedule_->closeOutputFiles();
783  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
784  }
785  FDEBUG(1) << "\tcloseOutputFiles\n";
786  }
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1243 of file EventProcessor.cc.

References h, handleNextEventForStreamAsync(), edm::make_functor_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, mps_update::status, and streamLumiStatus_.

Referenced by processLumis().

1243  {
1244  {
1245  //all streams are sharing the same status at the moment
1246  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1247  status->needToContinueLumi();
1248  status->startProcessingEvents();
1249  }
1250 
1251  unsigned int streamIndex = 0;
1252  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1253  tbb::task::enqueue(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = iHolder]() {
1254  handleNextEventForStreamAsync(std::move(h), streamIndex);
1255  }));
1256  }
1257  tbb::task::spawn(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = std::move(iHolder)]() {
1258  handleNextEventForStreamAsync(std::move(h), streamIndex);
1259  }));
1260  }
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
PreallocationConfiguration preallocations_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
def move(src, dest)
Definition: eostools.py:511
void edm::EventProcessor::deleteLumiFromCache ( LuminosityBlockProcessingStatus & iStatus)

Definition at line 1553 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

1553  {
1554  for (auto& s : subProcesses_) {
1555  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1556  }
1557  iStatus.lumiPrincipal()->clearPrincipal();
1558  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1559  }
std::vector< SubProcess > subProcesses_
void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1528 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

Referenced by endUnfinishedRun().

1528  {
1529  principalCache_.deleteRun(phid, run);
1530  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1531  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1532  }
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )

Definition at line 855 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

855  {
856  FDEBUG(1) << "\tdoErrorStuff\n";
857  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
858  << "and went to the error state\n"
859  << "Will attempt to terminate processing normally\n"
860  << "(IF using the looper the next loop will be attempted)\n"
861  << "This likely indicates a bug in an input module or corrupted input or both\n";
862  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 621 of file EventProcessor.cc.

References schedule_.

621 { schedule_->enableEndPaths(active); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 578 of file EventProcessor.cc.

References actReg_, HltBtagPostValidation_cff::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

578  {
579  // Collects exceptions, so we don't throw before all operations are performed.
580  ExceptionCollector c(
581  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
582 
583  //make the services available
585 
586  //NOTE: this really should go elsewhere in the future
587  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
588  c.call([this, i]() { this->schedule_->endStream(i); });
589  for (auto& subProcess : subProcesses_) {
590  c.call([&subProcess, i]() { subProcess.doEndStream(i); });
591  }
592  }
593  auto actReg = actReg_.get();
594  c.call([actReg]() { actReg->preEndJobSignal_(); });
595  schedule_->endJob(c);
596  for (auto& subProcess : subProcesses_) {
597  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
598  }
599  c.call(std::bind(&InputSource::doEndJob, input_.get()));
600  if (looper_) {
601  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
602  }
603  c.call([actReg]() { actReg->postEndJobSignal_(); });
604  if (c.hasThrown()) {
605  c.rethrow();
606  }
607  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:207
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
bool edm::EventProcessor::endOfLoop ( )

Definition at line 816 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

816  {
817  if (looper_) {
818  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
819  looper_->setModuleChanger(&changer);
820  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
821  looper_->setModuleChanger(nullptr);
823  return true;
824  else
825  return false;
826  }
827  FDEBUG(1) << "\tendOfLoop\n";
828  return true;
829  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 623 of file EventProcessor.cc.

References schedule_.

623 { return schedule_->endPathsEnabled(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 970 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, input_, looper_, edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, edm::RunPrincipal::setEndTime(), and subProcesses_.

Referenced by endUnfinishedRun().

973  {
974  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
975  runPrincipal.setEndTime(input_->timestamp());
976 
977  IOVSyncValue ts(
979  runPrincipal.endTime());
980  {
981  SendSourceTerminationSignalIfException sentry(actReg_.get());
982  espController_->eventSetupForInstance(ts);
983  sentry.completedSuccessfully();
984  }
985  auto const& es = esp_->eventSetupImpl();
986  if (globalBeginSucceeded) {
987  //To wait, the ref count has to be 1+#streams
988  auto streamLoopWaitTask = make_empty_waiting_task();
989  streamLoopWaitTask->increment_ref_count();
990 
991  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
992 
993  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
994  *schedule_,
996  runPrincipal,
997  ts,
998  es,
999  nullptr,
1000  serviceToken_,
1001  subProcesses_,
1002  cleaningUpAfterException);
1003 
1004  streamLoopWaitTask->wait_for_all();
1005  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
1006  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
1007  }
1008  }
1009  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1010  if (looper_) {
1011  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1012  }
1013  {
1014  auto globalWaitTask = make_empty_waiting_task();
1015  globalWaitTask->increment_ref_count();
1016 
1017  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1018  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1019  *schedule_,
1020  runPrincipal,
1021  ts,
1022  es,
1023  nullptr,
1024  serviceToken_,
1025  subProcesses_,
1026  cleaningUpAfterException);
1027  globalWaitTask->wait_for_all();
1028  if (globalWaitTask->exceptionPtr() != nullptr) {
1029  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1030  }
1031  }
1032  FDEBUG(1) << "\tendRun " << run << "\n";
1033  if (looper_) {
1034  looper_->doEndRun(runPrincipal, es, &processContext_);
1035  }
1036  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:70
ServiceToken serviceToken_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void edm::EventProcessor::endUnfinishedLumi ( )

Definition at line 1414 of file EventProcessor.cc.

References mps_fire::i, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, and streamLumiStatus_.

1414  {
1415  if (streamLumiActive_.load() > 0) {
1416  auto globalWaitTask = make_empty_waiting_task();
1417  globalWaitTask->increment_ref_count();
1418  {
1419  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1420  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1421  if (streamLumiStatus_[i]) {
1422  streamEndLumiAsync(globalTaskHolder, i, streamLumiStatus_[i]);
1423  }
1424  }
1425  }
1426  globalWaitTask->wait_for_all();
1427  if (globalWaitTask->exceptionPtr() != nullptr) {
1428  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1429  }
1430  }
1431  }
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
PreallocationConfiguration preallocations_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamLumiActive_
void edm::EventProcessor::endUnfinishedRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException,
bool  eventSetupForInstanceSucceeded 
)

Definition at line 944 of file EventProcessor.cc.

References deleteRunFromCache(), endRun(), edm::make_empty_waiting_task(), edm::RunPrincipal::mergeableRunProductMetadata(), edm::MergeableRunProductMetadata::postWriteRun(), edm::MergeableRunProductMetadata::preWriteRun(), principalCache_, run(), edm::PrincipalCache::runPrincipal(), OrderedSet::t, and writeRunAsync().

948  {
949  if (eventSetupForInstanceSucceeded) {
950  //If we skip empty runs, this would be called conditionally
951  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
952 
953  if (globalBeginSucceeded) {
955  t->increment_ref_count();
956  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
957  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
958  mergeableRunProductMetadata->preWriteRun();
959  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run, mergeableRunProductMetadata);
960  t->wait_for_all();
961  mergeableRunProductMetadata->postWriteRun();
962  if (t->exceptionPtr()) {
963  std::rethrow_exception(*t->exceptionPtr());
964  }
965  }
966  }
967  deleteRunFromCache(phid, run);
968  }
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:78
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 611 of file EventProcessor.cc.

References schedule_.

611  {
612  return schedule_->getAllModuleDescriptions();
613  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 609 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

609 { return serviceToken_; }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport & rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 625 of file EventProcessor.cc.

References schedule_.

625 { schedule_->getTriggerReport(rep); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
rep
Definition: cuy.py:1190
void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus >  iLumiStatus 
)

Definition at line 1271 of file EventProcessor.cc.

References deleteLumiFromCache(), esp_, handleEndLumiExceptions(), looper_, edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), schedule_, serviceToken_, mps_update::status, subProcesses_, TrackValidation_cff::task, and writeLumiAsync().

Referenced by streamEndLumiAsync().

1272  {
1273  // Get some needed info out of the status object before moving
1274  // it into finalTaskForThisLumi.
1275  auto& lp = *(iLumiStatus->lumiPrincipal());
1276  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1277  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1278  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1279  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1280 
1281  auto finalTaskForThisLumi = edm::make_waiting_task(
1282  tbb::task::allocate_root(),
1283  [status = std::move(iLumiStatus), iTask = std::move(iTask), this](std::exception_ptr const* iPtr) mutable {
1284  std::exception_ptr ptr;
1285  if (iPtr) {
1286  handleEndLumiExceptions(iPtr, iTask);
1287  } else {
1288  try {
1290  if (looper_) {
1291  auto& lumiPrincipal = *(status->lumiPrincipal());
1292  EventSetupImpl const& eventSetupImpl = status->eventSetupImpl(esp_->subProcessIndex());
1293  looper_->doEndLuminosityBlock(lumiPrincipal, eventSetupImpl, &processContext_);
1294  }
1295  } catch (...) {
1296  ptr = std::current_exception();
1297  }
1298  }
1300 
1301  // Try hard to clean up resources so the
1302  // process can terminate in a controlled
1303  // fashion even after exceptions have occurred.
1304 
1305  try {
1307  } catch (...) {
1308  if (not ptr) {
1309  ptr = std::current_exception();
1310  }
1311  }
1312 
1313  try {
1314  status->resumeGlobalLumiQueue();
1316  } catch (...) {
1317  if (not ptr) {
1318  ptr = std::current_exception();
1319  }
1320  }
1321 
1322  try {
1323  // This call to status.resetResources() must occur before iTask is destroyed.
1324  // Otherwise there will be a data race which could result in endRun
1325  // being delayed until it is too late to successfully call it.
1326  status->resetResources();
1327  status.reset();
1328  } catch (...) {
1329  if (not ptr) {
1330  ptr = std::current_exception();
1331  }
1332  }
1333 
1334  if (ptr) {
1335  handleEndLumiExceptions(&ptr, iTask);
1336  }
1337  });
1338 
1339  auto writeT = edm::make_waiting_task(
1340  tbb::task::allocate_root(),
1341  [this, didGlobalBeginSucceed, &lumiPrincipal = lp, task = WaitingTaskHolder(finalTaskForThisLumi)](
1342  std::exception_ptr const* iExcept) mutable {
1343  if (iExcept) {
1344  task.doneWaiting(*iExcept);
1345  } else {
1346  //Only call writeLumi if beginLumi succeeded
1347  if (didGlobalBeginSucceed) {
1348  writeLumiAsync(std::move(task), lumiPrincipal);
1349  }
1350  }
1351  });
1352 
1353  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1354 
1355  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1356 
1357  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(writeT),
1358  *schedule_,
1359  lp,
1360  ts,
1361  es,
1362  eventSetupImpls,
1363  serviceToken_,
1364  subProcesses_,
1365  cleaningUpAfterException);
1366  }
ProcessContext processContext_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511
void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr const *  iPtr,
WaitingTaskHolder &  holder 
)

Definition at line 1262 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), setDeferredException(), setExceptionMessageLumis(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

1262  {
1263  if (setDeferredException(*iPtr)) {
1264  WaitingTaskHolder tmp(holder);
1265  tmp.doneWaiting(*iPtr);
1266  } else {
1268  }
1269  }
tmp
align.sh
Definition: createJobs.py:716
bool setDeferredException(std::exception_ptr)
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 1622 of file EventProcessor.cc.

References beginLumiAsync(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::InputSource::IsLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), and streamLumiStatus_.

Referenced by beginLumiAsync(), and continueLumiAsync().

1622  {
1623  sourceResourcesAcquirer_.serialQueueChain().push([this, iTask, iStreamIndex]() mutable {
1625  auto& status = streamLumiStatus_[iStreamIndex];
1626  try {
1627  if (readNextEventForStream(iStreamIndex, *status)) {
1628  auto recursionTask = make_waiting_task(
1629  tbb::task::allocate_root(), [this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1630  if (iPtr) {
1631  // Try to end the stream properly even if an exception was
1632  // thrown on an event.
1633  bool expected = false;
1634  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1635  // This is the case where the exception in iPtr is the primary
1636  // exception and we want to see its message.
1637  deferredExceptionPtr_ = *iPtr;
1638  WaitingTaskHolder tempHolder(iTask);
1639  tempHolder.doneWaiting(*iPtr);
1640  }
1641  streamEndLumiAsync(std::move(iTask), iStreamIndex, streamLumiStatus_[iStreamIndex]);
1642  //the stream will stop now
1643  return;
1644  }
1645  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1646  });
1647 
1648  processEventAsync(WaitingTaskHolder(recursionTask), iStreamIndex);
1649  } else {
1650  //the stream will stop now
1651  if (status->isLumiEnding()) {
1652  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1653  status->startNextLumi();
1654  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1655  }
1656  streamEndLumiAsync(std::move(iTask), iStreamIndex, status);
1657  } else {
1658  iTask.doneWaiting(std::exception_ptr{});
1659  }
1660  }
1661  } catch (...) {
1662  // It is unlikely we will ever get in here ...
1663  // But if we do try to clean up and propagate the exception
1664  if (streamLumiStatus_[iStreamIndex]) {
1665  streamEndLumiAsync(iTask, iStreamIndex, streamLumiStatus_[iStreamIndex]);
1666  }
1667  bool expected = false;
1668  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1669  auto e = std::current_exception();
1671  iTask.doneWaiting(e);
1672  }
1673  }
1674  });
1675  }
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
SharedResourcesAcquirer sourceResourcesAcquirer_
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastTransitionType() const
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::exception_ptr deferredExceptionPtr_
def move(src, dest)
Definition: eostools.py:511
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 324 of file EventProcessor.cc.

References act_table_, actReg_, branchIDListHelper(), branchIDListHelper_, trackingPlots::common, edm::errors::Configuration, SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, processOptions_cff::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, historyAppender_, input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, eostools::move(), runTheMatrix::nThreads, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

326  {
327  //std::cerr << processDesc->dump() << std::endl;
328 
329  // register the empty parentage vector, once and for all
331 
332  // register the empty parameter set, once and for all.
333  ParameterSet().registerIt();
334 
335  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
336 
337  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
338  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
339  bool const hasSubProcesses = !subProcessVParameterSet.empty();
340 
341  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
342  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
343  // set in here if the parameters were not explicitly set.
344  validateTopLevelParameterSets(parameterSet.get());
345 
346  // Now set some parameters specific to the main process.
347  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
348  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
349  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
350  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
351  << fileMode << ".\n"
352  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
353  } else {
354  fileModeNoMerge_ = (fileMode == "NOMERGE");
355  }
356  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
357 
358  //threading
359  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
360 
361  // Even if numberOfThreads was set to zero in the Python configuration, the code
362  // in cmsRun.cpp should have reset it to something else.
363  assert(nThreads != 0);
364 
365  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
366  if (nStreams == 0) {
367  nStreams = nThreads;
368  }
369  if (nThreads > 1 or nStreams > 1) {
370  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
371  }
372  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
373  if (nConcurrentRuns != 1) {
374  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
375  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
376  }
377  unsigned int nConcurrentLumis =
378  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
379  if (nConcurrentLumis == 0) {
380  nConcurrentLumis = nConcurrentRuns;
381  }
382 
383  //Check that relationships between threading parameters make sense
384  /*
385  if(nThreads<nStreams) {
386  //bad
387  }
388  if(nConcurrentRuns>nStreams) {
389  //bad
390  }
391  if(nConcurrentRuns>nConcurrentLumis) {
392  //bad
393  }
394  */
395  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
396 
397  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
398 
399  // Now do general initialization
400  ScheduleItems items;
401 
402  //initialize the services
403  auto& serviceSets = processDesc->getServicesPSets();
404  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
405  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
406 
407  //make the services available
409 
410  if (nStreams > 1) {
412  handler->willBeUsingThreads();
413  }
414 
415  // initialize miscellaneous items
416  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
417 
418  // initialize the event setup provider
419  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
420  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get(), &eventSetupPset);
421 
422  // initialize the looper, if any
423  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
424  if (looper_) {
425  looper_->setActionTable(items.act_table_.get());
426  looper_->attachTo(*items.actReg_);
427 
428  //For now loopers make us run only 1 transition at a time
429  nStreams = 1;
430  nConcurrentLumis = 1;
431  nConcurrentRuns = 1;
432  }
433  espController_->setMaxConcurrentIOVs(nStreams, nConcurrentLumis);
434 
435  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
436 
437  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
438  streamQueues_.resize(nStreams);
439  streamLumiStatus_.resize(nStreams);
440 
441  // initialize the input source
442  input_ = makeInput(*parameterSet,
443  *common,
444  items.preg(),
445  items.branchIDListHelper(),
446  items.thinnedAssociationsHelper(),
447  items.actReg_,
448  items.processConfiguration(),
450 
451  // initialize the Schedule
452  schedule_ = items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_);
453 
454  // set the data members
455  act_table_ = std::move(items.act_table_);
456  actReg_ = items.actReg_;
457  preg_ = items.preg();
459  branchIDListHelper_ = items.branchIDListHelper();
460  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
461  processConfiguration_ = items.processConfiguration();
463  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
464 
465  FDEBUG(2) << parameterSet << std::endl;
466 
468  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
469  // Reusable event principal
470  auto ep = std::make_shared<EventPrincipal>(preg(),
474  historyAppender_.get(),
475  index);
477  }
478 
479  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
480  auto lp =
481  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
483  }
484 
485  // fill the subprocesses, if there are any
486  subProcesses_.reserve(subProcessVParameterSet.size());
487  for (auto& subProcessPSet : subProcessVParameterSet) {
488  subProcesses_.emplace_back(subProcessPSet,
489  *parameterSet,
490  preg(),
493  SubProcessParentageHelper(),
495  *actReg_,
496  token,
499  &processContext_);
500  }
501  }
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
std::vector< edm::SerialTaskQueue > streamQueues_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:693
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 299 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by endJob().

299 { return get_underlying_safe(looper_); }
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 300 of file EventProcessor.h.

References edm::get_underlying_safe().

300 { return get_underlying_safe(looper_); }
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 674 of file EventProcessor.cc.

References input_.

Referenced by readNextEventForStream().

674 { return input_->luminosityBlock(); }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 670 of file EventProcessor.cc.

References input_.

670  {
671  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
672  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 644 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), deferredExceptionPtrIsSet_, epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by readNextEventForStream().

644  {
645  if (deferredExceptionPtrIsSet_.load()) {
647  return InputSource::IsStop;
648  }
649 
650  SendSourceTerminationSignalIfException sentry(actReg_.get());
651  InputSource::ItemType itemType;
652  //For now, do nothing with InputSource::IsSynchronize
653  do {
654  itemType = input_->nextItemType();
655  } while (itemType == InputSource::IsSynchronize);
656 
657  lastSourceTransition_ = itemType;
658  sentry.completedSuccessfully();
659 
661 
662  if (checkForAsyncStopRequest(returnCode)) {
663  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
665  }
666 
667  return lastSourceTransition_;
668  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastSourceTransition_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::openOutputFiles ( )

Definition at line 772 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

772  {
773  if (fb_.get() != nullptr) {
774  schedule_->openOutputFiles(*fb_);
775  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
776  }
777  FDEBUG(1) << "\topenOutputFiles\n";
778  }
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 287 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), and readRun().

287 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 288 of file EventProcessor.h.

References edm::get_underlying_safe().

288 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 837 of file EventProcessor.cc.

References esp_, FDEBUG, edm::propagate_const< T >::get(), and looper_.

Referenced by runToCompletion().

837  {
838  looper_->prepareForNextLoop(esp_.get());
839  FDEBUG(1) << "\tprepareForNextLoop\n";
840  }
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 136 of file EventProcessor.h.

References EcalCondTools::getToken(), and cuy::rep.

136 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1691 of file EventProcessor.cc.

References edm::make_functor_task(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

1691  {
1692  tbb::task::spawn(
1693  *make_functor_task(tbb::task::allocate_root(), [=]() { processEventAsyncImpl(iHolder, iStreamIndex); }));
1694  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1696 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, edm::Service< T >::isAvailable(), looper_, edm::make_waiting_task(), eostools::move(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, streamLumiStatus_, and subProcesses_.

Referenced by processEventAsync().

1696  {
1697  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1698 
1701  if (rng.isAvailable()) {
1702  Event ev(*pep, ModuleDescription(), nullptr);
1703  rng->postEventRead(ev);
1704  }
1705 
1706  WaitingTaskHolder finalizeEventTask(make_waiting_task(
1707  tbb::task::allocate_root(), [this, pep, iHolder, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1708  //NOTE: If we have a looper we only have one Stream
1709  if (looper_) {
1710  ServiceRegistry::Operate operateLooper(serviceToken_);
1711  processEventWithLooper(*pep, iStreamIndex);
1712  }
1713 
1714  FDEBUG(1) << "\tprocessEvent\n";
1715  pep->clearEventPrincipal();
1716  if (iPtr) {
1717  iHolder.doneWaiting(*iPtr);
1718  } else {
1719  iHolder.doneWaiting(std::exception_ptr());
1720  }
1721  }));
1722  WaitingTaskHolder afterProcessTask;
1723  if (subProcesses_.empty()) {
1724  afterProcessTask = std::move(finalizeEventTask);
1725  } else {
1726  //Need to run SubProcesses after schedule has finished
1727  // with the event
1728  afterProcessTask = WaitingTaskHolder(make_waiting_task(
1729  tbb::task::allocate_root(),
1730  [this, pep, finalizeEventTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1731  if (not iPtr) {
1732  //when run with 1 thread, we want the order to be what
1733  // it was before. This requires reversing the order since
1734  // tasks are run last one in first one out
1735  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1736  subProcess.doEventAsync(finalizeEventTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1737  }
1738  } else {
1739  finalizeEventTask.doneWaiting(*iPtr);
1740  }
1741  }));
1742  }
1743 
1744  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1745  schedule_->processOneEventAsync(std::move(afterProcessTask), iStreamIndex, *pep, es, serviceToken_);
1746  }
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
bool ev
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 1748 of file EventProcessor.cc.

References esp_, input_, edm::InputSource::IsStop, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), lastSourceTransition_, looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

1748  {
1749  bool randomAccess = input_->randomAccess();
1750  ProcessingController::ForwardState forwardState = input_->forwardState();
1751  ProcessingController::ReverseState reverseState = input_->reverseState();
1752  ProcessingController pc(forwardState, reverseState, randomAccess);
1753 
1755  do {
1756  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1757  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1758  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1759 
1760  bool succeeded = true;
1761  if (randomAccess) {
1762  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1763  input_->skipEvents(-2);
1764  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1765  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1766  }
1767  }
1768  pc.setLastOperationSucceeded(succeeded);
1769  } while (!pc.lastOperationSucceeded());
1770  if (status != EDLooperBase::kContinue) {
1771  shouldWeStop_ = true;
1773  }
1774  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
InputSource::ItemType lastSourceTransition_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
InputSource::ItemType edm::EventProcessor::processLumis ( std::shared_ptr< void > const &  iRunResource)

Definition at line 1038 of file EventProcessor.cc.

References beginLumiAsync(), continueLumiAsync(), input_, lastTransitionType(), edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, and streamLumiActive_.

1038  {
1039  auto waitTask = make_empty_waiting_task();
1040  waitTask->increment_ref_count();
1041 
1042  if (streamLumiActive_ > 0) {
1044  // Continue after opening a new input file
1045  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1046  } else {
1047  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1048  input_->luminosityBlockAuxiliary()->beginTime()),
1049  iRunResource,
1050  WaitingTaskHolder{waitTask.get()});
1051  }
1052  waitTask->wait_for_all();
1053 
1054  if (waitTask->exceptionPtr() != nullptr) {
1055  std::rethrow_exception(*(waitTask->exceptionPtr()));
1056  }
1057  return lastTransitionType();
1058  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
PreallocationConfiguration preallocations_
InputSource::ItemType lastTransitionType() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::atomic< unsigned int > streamLumiActive_
int edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1487 of file EventProcessor.cc.

References actReg_, input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by readNextEventForStream().

1487  {
1488  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1489  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1490  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1491  input_->processHistoryRegistry().reducedProcessHistoryID(
1492  input_->luminosityBlockAuxiliary()->processHistoryID()));
1493  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1494  assert(lumiOK);
1495  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1496  {
1497  SendSourceTerminationSignalIfException sentry(actReg_.get());
1498  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1499  sentry.completedSuccessfully();
1500  }
1501  return input_->luminosityBlock();
1502  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< ActivityRegistry > actReg_
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1456 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1456  {
1457  principalCache_.merge(input_->runAuxiliary(), preg());
1458  auto runPrincipal = principalCache_.runPrincipalPtr();
1459  {
1460  SendSourceTerminationSignalIfException sentry(actReg_.get());
1461  input_->readAndMergeRun(*runPrincipal);
1462  sentry.completedSuccessfully();
1463  }
1464  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1465  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1466  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1677 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, and streamLumiStatus_.

Referenced by readNextEventForStream().

1677  {
1678  //TODO this will have to become per stream
1679  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1680  StreamContext streamContext(event.streamID(), &processContext_);
1681 
1682  SendSourceTerminationSignalIfException sentry(actReg_.get());
1683  input_->readEvent(event, streamContext);
1684 
1685  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1686  sentry.completedSuccessfully();
1687 
1688  FDEBUG(1) << "\treadEvent\n";
1689  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
Definition: event.py:1
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )

Definition at line 745 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, edm::PrincipalCache::preReadFile(), principalCache_, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

745  {
746  FDEBUG(1) << " \treadFile\n";
747  size_t size = preg_->size();
748  SendSourceTerminationSignalIfException sentry(actReg_.get());
749 
751 
752  fb_ = input_->readFile();
753  if (size < preg_->size()) {
755  }
758  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
759  }
760  sentry.completedSuccessfully();
761  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PrincipalCache principalCache_
void edm::EventProcessor::readLuminosityBlock ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1468 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::getAvailableLumiPrincipalPtr(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), eostools::move(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

Referenced by beginLumiAsync().

1468  {
1470  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1471  << "Illegal attempt to insert lumi into cache\n"
1472  << "Run is invalid\n"
1473  << "Contact a Framework Developer\n";
1474  }
1476  assert(lbp);
1477  lbp->setAux(*input_->luminosityBlockAuxiliary());
1478  {
1479  SendSourceTerminationSignalIfException sentry(actReg_.get());
1480  input_->readLuminosityBlock(*lbp, *historyAppender_);
1481  sentry.completedSuccessfully();
1482  }
1483  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1484  iStatus.lumiPrincipal() = std::move(lbp);
1485  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus & iLumiStatus 
)
private

Definition at line 1561 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::continuingLumi(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::LuminosityBlockProcessingStatus::endLumi(), edm::LuminosityBlockProcessingStatus::haveContinuedLumi(), input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, lastSourceTransition_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextLuminosityBlockID(), nextTransitionType(), or, readAndMergeLumi(), readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setNextSyncValue(), shouldWeStop(), sourceMutex_, edm::LuminosityBlockProcessingStatus::stopProcessingEvents(), and edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped().

Referenced by handleNextEventForStreamAsync().

1561  {
1562  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1563  iStatus.endLumi();
1564  return false;
1565  }
1566 
1567  if (iStatus.wasEventProcessingStopped()) {
1568  return false;
1569  }
1570 
1571  if (shouldWeStop()) {
1573  iStatus.stopProcessingEvents();
1574  iStatus.endLumi();
1575  return false;
1576  }
1577 
1579  try {
1580  //need to use lock in addition to the serial task queue because
1581  // of delayed provenance reading and reading data in response to
1582  // edm::Refs etc
1583  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1584 
1585  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1586  if (InputSource::IsLumi == itemType) {
1587  iStatus.haveContinuedLumi();
1588  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1589  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1590  readAndMergeLumi(iStatus);
1591  itemType = nextTransitionType();
1592  }
1593  if (InputSource::IsLumi == itemType) {
1594  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1595  input_->luminosityBlockAuxiliary()->beginTime()));
1596  }
1597  }
1598  if (InputSource::IsEvent != itemType) {
1599  iStatus.stopProcessingEvents();
1600 
1601  //IsFile may continue processing the lumi and
1602  // looper_ can cause the input source to declare a new IsRun which is actually
1603  // just a continuation of the previous run
1604  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1605  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1606  iStatus.endLumi();
1607  }
1608  return false;
1609  }
1610  readEvent(iStreamIndex);
1611  } catch (...) {
1612  bool expected = false;
1613  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1614  deferredExceptionPtr_ = std::current_exception();
1615  iStatus.endLumi();
1616  }
1617  return false;
1618  }
1619  return true;
1620  }
void readEvent(unsigned int iStreamIndex)
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemType lastSourceTransition_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
std::exception_ptr deferredExceptionPtr_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
bool shouldWeStop() const
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1433 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, mergeableRunProductProcesses_, preg(), principalCache_, and processConfiguration_.

1433  {
1435  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1436  << "Illegal attempt to insert run into cache\n"
1437  << "Contact a Framework Developer\n";
1438  }
1439  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1440  preg(),
1442  historyAppender_.get(),
1443  0,
1444  true,
1446  {
1447  SendSourceTerminationSignalIfException sentry(actReg_.get());
1448  input_->readRun(*rp, *historyAppender_);
1449  sentry.completedSuccessfully();
1450  }
1451  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1452  principalCache_.insert(rp);
1453  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1454  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
MergeableRunProductProcesses mergeableRunProductProcesses_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 798 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

798  {
799  if (fb_.get() != nullptr) {
800  schedule_->respondToCloseInputFile(*fb_);
801  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
802  }
803  FDEBUG(1) << "\trespondToCloseInputFile\n";
804  }
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 788 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

788  {
790  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
791  if (fb_.get() != nullptr) {
792  schedule_->respondToOpenInputFile(*fb_);
793  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
794  }
795  FDEBUG(1) << "\trespondToOpenInputFile\n";
796  }
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )

Definition at line 831 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

831  {
832  input_->repeat();
833  input_->rewind();
834  FDEBUG(1) << "\trewind\n";
835  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 369 of file EventProcessor.h.

Referenced by Types.EventID::cppID(), Types.LuminosityBlockID::cppID(), and endUnfinishedRun().

369 { return runToCompletion(); }
StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 676 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run().

676  {
679  {
680  beginJob(); //make sure this was called
681 
682  // make the services available
684 
686  try {
687  FilesProcessor fp(fileModeNoMerge_);
688 
689  convertException::wrap([&]() {
690  bool firstTime = true;
691  do {
692  if (not firstTime) {
694  rewindInput();
695  } else {
696  firstTime = false;
697  }
698  startingNewLoop();
699 
700  auto trans = fp.processFiles(*this);
701 
702  fp.normalEnd();
703 
704  if (deferredExceptionPtrIsSet_.load()) {
705  std::rethrow_exception(deferredExceptionPtr_);
706  }
707  if (trans != InputSource::IsStop) {
708  //problem with the source
709  doErrorStuff();
710 
711  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
712  }
713  } while (not endOfLoop());
714  }); // convertException::wrap
715 
716  } // Try block
717  catch (cms::Exception& e) {
719  std::string message(
720  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
721  e.addAdditionalInfo(message);
722  if (e.alreadyPrinted()) {
723  LogAbsolute("Additional Exceptions") << message;
724  }
725  }
726  if (!exceptionMessageRuns_.empty()) {
728  if (e.alreadyPrinted()) {
729  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
730  }
731  }
732  if (!exceptionMessageFiles_.empty()) {
734  if (e.alreadyPrinted()) {
735  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
736  }
737  }
738  throw;
739  }
740  }
741 
742  return returnCode;
743  }
std::atomic< bool > exceptionMessageLumis_
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:177
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 1797 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

Referenced by handleEndLumiExceptions().

1797  {
1798  bool expected = false;
1799  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1800  deferredExceptionPtr_ = iException;
1801  return true;
1802  }
1803  return false;
1804  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 1791 of file EventProcessor.cc.

References exceptionMessageFiles_.

1791 { exceptionMessageFiles_ = message; }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 1795 of file EventProcessor.cc.

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

1795 { exceptionMessageLumis_ = true; }
std::atomic< bool > exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 1793 of file EventProcessor.cc.

References exceptionMessageRuns_.

1793 { exceptionMessageRuns_ = message; }
std::string exceptionMessageRuns_
bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 842 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

842  {
843  FDEBUG(1) << "\tshouldWeCloseOutput\n";
844  if (!subProcesses_.empty()) {
845  for (auto const& subProcess : subProcesses_) {
846  if (subProcess.shouldWeCloseOutput()) {
847  return true;
848  }
849  }
850  return false;
851  }
852  return schedule_->shouldWeCloseOutput();
853  }
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 1776 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

1776  {
1777  FDEBUG(1) << "\tshouldWeStop\n";
1778  if (shouldWeStop_)
1779  return true;
1780  if (!subProcesses_.empty()) {
1781  for (auto const& subProcess : subProcesses_) {
1782  if (subProcess.terminate()) {
1783  return true;
1784  }
1785  }
1786  return false;
1787  }
1788  return schedule_->terminate();
1789  }
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::startingNewLoop ( )

Definition at line 806 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

806  {
807  shouldWeStop_ = false;
808  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
809  // until after we've called beginOfJob
810  if (looper_ && looperBeginJobRun_) {
811  looper_->doStartingNewLoop();
812  }
813  FDEBUG(1) << "\tstartingNewLoop\n";
814  }
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex,
std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus 
)

Definition at line 1368 of file EventProcessor.cc.

References esp_, globalEndLumiAsync(), handleEndLumiExceptions(), edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and OrderedSet::t.

Referenced by beginLumiAsync(), endUnfinishedLumi(), and handleNextEventForStreamAsync().

1370  {
1371  auto t = edm::make_waiting_task(tbb::task::allocate_root(),
1372  [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1373  if (iPtr) {
1374  handleEndLumiExceptions(iPtr, iTask);
1375  }
1376  auto status = streamLumiStatus_[iStreamIndex];
1377  //reset status before releasing queue else get race condition
1378  streamLumiStatus_[iStreamIndex].reset();
1380  streamQueues_[iStreamIndex].resume();
1381 
1382  //are we the last one?
1383  if (status->streamFinishedLumi()) {
1385  }
1386  });
1387 
1388  edm::WaitingTaskHolder lumiDoneTask{t};
1389 
1390  iLumiStatus->setEndTime();
1391 
1392  if (iLumiStatus->didGlobalBeginSucceed()) {
1393  auto& lumiPrincipal = *iLumiStatus->lumiPrincipal();
1394  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1395  lumiPrincipal.endTime());
1396  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1397 
1398  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1399 
1400  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1401  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1402  *schedule_,
1403  iStreamIndex,
1404  lumiPrincipal,
1405  ts,
1406  es,
1407  &iLumiStatus->eventSetupImpls(),
1408  serviceToken_,
1409  subProcesses_,
1410  cleaningUpAfterException);
1411  }
1412  }
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inline private

Definition at line 293 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

293  {
295  }
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inline private

Definition at line 296 of file EventProcessor.h.

References edm::get_underlying_safe().

296  {
298  }
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 615 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

615 { return schedule_->totalEvents(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 619 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

619 { return schedule_->totalEventsFailed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 617 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

617 { return schedule_->totalEventsPassed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization ( ) const
private

Definition at line 1806 of file EventProcessor.cc.

References alignCSCRings::s, and schedule_.

Referenced by beginJob().

1806  {
1807  std::unique_ptr<LogSystem> s;
1808  for (auto worker : schedule_->allWorkers()) {
1809  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1810  if (not s) {
1811  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1812  (*s) << "The following modules require synchronizing on LuminosityBlock boundaries:";
1813  }
1814  (*s) << "\n " << worker->description().moduleName() << " " << worker->description().moduleLabel();
1815  }
1816  }
1817  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal & lumiPrincipal 
)

Definition at line 1534 of file EventProcessor.cc.

References actReg_, edm::WaitingTaskHolder::doneWaiting(), edm::LuminosityBlockPrincipal::luminosityBlock(), edm::make_waiting_task(), edm::RunPrincipal::mergeableRunProductMetadata(), processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

1534  {
1535  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(),
1536  [this, task, &lumiPrincipal](std::exception_ptr const* iExcept) mutable {
1537  if (iExcept) {
1538  task.doneWaiting(*iExcept);
1539  } else {
1541  for (auto& s : subProcesses_) {
1542  s.writeLumiAsync(task, lumiPrincipal);
1543  }
1544  }
1545  });
1547 
1548  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1549 
1550  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, lumiPrincipal, &processContext_, actReg_.get());
1551  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
ProcessHistoryID const &  phid,
RunNumber_t  run,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 1504 of file EventProcessor.cc.

References actReg_, edm::WaitingTaskHolder::doneWaiting(), edm::make_waiting_task(), principalCache_, processContext_, edm::PrincipalCache::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, and subProcesses_.

Referenced by endUnfinishedRun().

1507  {
1508  auto subsT = edm::make_waiting_task(
1509  tbb::task::allocate_root(),
1510  [this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
1511  if (iExcept) {
1512  task.doneWaiting(*iExcept);
1513  } else {
1515  for (auto& s : subProcesses_) {
1516  s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1517  }
1518  }
1519  });
1521  schedule_->writeRunAsync(WaitingTaskHolder(subsT),
1523  &processContext_,
1524  actReg_.get(),
1525  mergeableRunProductMetadata);
1526  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 320 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 356 of file EventProcessor.h.

Referenced by runToCompletion().

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 357 of file EventProcessor.h.

Referenced by runToCompletion().

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 344 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 312 of file EventProcessor.h.

Referenced by init(), and respondToOpenInputFile().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private
std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private
edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 317 of file EventProcessor.h.

Referenced by beginJob(), beginLumiAsync(), beginRun(), endRun(), init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 362 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 347 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 349 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 348 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private
bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 346 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 358 of file EventProcessor.h.

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 352 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 350 of file EventProcessor.h.

Referenced by endOfLoop().

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 332 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private
InputSource::ItemType edm::EventProcessor::lastSourceTransition_
private
edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 351 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 327 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 324 of file EventProcessor.h.

Referenced by init(), and readRun().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 323 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private
edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 311 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), and readFile().

PrincipalCache edm::EventProcessor::principalCache_
private
bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 364 of file EventProcessor.h.

Referenced by beginJob(), and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 321 of file EventProcessor.h.

Referenced by beginJob(), init(), and readRun().

ProcessContext edm::EventProcessor::processContext_
private
edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

Definition at line 319 of file EventProcessor.h.

Referenced by beginLumiAsync(), and globalEndLumiAsync().

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private
bool edm::EventProcessor::shouldWeStop_
private

Definition at line 345 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 342 of file EventProcessor.h.

Referenced by readNextEventForStream().

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 341 of file EventProcessor.h.

Referenced by beginLumiAsync(), and handleNextEventForStreamAsync().

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private
std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private
std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

Definition at line 326 of file EventProcessor.h.

Referenced by beginLumiAsync(), init(), and streamEndLumiAsync().

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 313 of file EventProcessor.h.

Referenced by init().