CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::Schedule Class Reference

#include <Schedule.h>

Public Types

typedef std::vector< edm::propagate_const< std::shared_ptr< OutputModuleCommunicator > > > AllOutputModuleCommunicators
 
typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< std::string > vstring
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginJob (ProductRegistry const &, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &, PathsAndConsumesOfModulesBase const &, ProcessContext const &)
 
void beginStream (unsigned int streamID)
 
bool changeModule (std::string const &iLabel, ParameterSet const &iPSet, const ProductRegistry &iRegistry, eventsetup::ESRecordsToProductResolverIndices const &)
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
void closeOutputFiles ()
 
void convertCurrentProcessAlias (std::string const &processName)
 Convert "@currentProcess" in InputTag process names to the actual current process name. More...
 
void deleteModule (std::string const &iLabel, ActivityRegistry *areg)
 Deletes module with label iLabel. More...
 
void endJob (ExceptionCollector &collector)
 
void endPaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all end paths in the process More...
 
void endStream (unsigned int streamID, ExceptionCollector &collector, std::mutex &collectorMutex) noexcept
 
void fillModuleAndConsumesInfo (std::vector< ModuleDescription const *> &allModuleDescriptions, std::vector< std::pair< unsigned int, unsigned int >> &moduleIDToIndex, std::array< std::vector< std::vector< ModuleDescription const *>>, NumBranchTypes > &modulesWhoseProductsAreConsumedBy, std::vector< std::vector< ModuleProcessName >> &modulesInPreviousProcessesWhoseProductsAreConsumedBy, ProductRegistry const &preg) const
 
void finishSetup (ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &preg, BranchIDListHelper &branchIDListHelper, ProcessBlockHelperBase &processBlockHelper, ThinnedAssociationsHelper &thinnedAssociationsHelper, SubProcessParentageHelper const *subProcessParentageHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool hasSubprocesses, PreallocationConfiguration const &prealloc, ProcessContext const *processContext)
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void getTriggerTimingReport (TriggerTimingReport &rep) const
 
void initializeEarlyDelete (std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
void openOutputFiles (FileBlock &fb)
 
void processOneEventAsync (WaitingTaskHolder iTask, unsigned int iStreamID, EventTransitionInfo &, ServiceToken const &token)
 
template<typename T >
void processOneGlobalAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamID, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenInputFile (FileBlock const &fb)
 
 Schedule (ParameterSet &proc_pset, service::TriggerNamesService const &tns, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &config, ProcessContext const *processContext, ModuleTypeResolverMaker const *resolverMaker)
 
void sendFwkSummaryToMessageLogger () const
 
bool shouldWeCloseOutput () const
 
bool terminate () const
 Return whether each output module has reached its maximum count. More...
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void triggerPaths (std::vector< std::string > &oLabelsToFill) const
 
void writeLumiAsync (WaitingTaskHolder iTask, LuminosityBlockPrincipal const &lbp, ProcessContext const *, ActivityRegistry *)
 
void writeProcessBlockAsync (WaitingTaskHolder iTask, ProcessBlockPrincipal const &, ProcessContext const *, ActivityRegistry *)
 
void writeRunAsync (WaitingTaskHolder iTask, RunPrincipal const &rp, ProcessContext const *, ActivityRegistry *, MergeableRunProductMetadata const *)
 

Private Member Functions

void limitOutput (ParameterSet const &proc_pset, BranchIDLists const &branchIDLists, SubProcessParentageHelper const *subProcessParentageHelper)
 
std::shared_ptr< ModuleRegistry const > moduleRegistry () const
 
std::shared_ptr< ModuleRegistry > & moduleRegistry ()
 
std::shared_ptr< TriggerResultInserter const > resultsInserter () const
 
std::shared_ptr< TriggerResultInserter > & resultsInserter ()
 

Private Attributes

AllOutputModuleCommunicators all_output_communicators_
 
std::vector< std::string > const * endPathNames_
 
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter > > > endPathStatusInserters_
 
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
 
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
 
std::vector< std::string > const * pathNames_
 
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter > > > pathStatusInserters_
 
PreallocationConfiguration preallocConfig_
 
edm::propagate_const< std::shared_ptr< TriggerResultInserter > > resultsInserter_
 
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
 
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
 
bool wantSummary_
 

Detailed Description

Definition at line 123 of file Schedule.h.

Member Typedef Documentation

◆ AllOutputModuleCommunicators

Definition at line 127 of file Schedule.h.

◆ AllWorkers

typedef std::vector<Worker*> edm::Schedule::AllWorkers

Definition at line 126 of file Schedule.h.

◆ vstring

typedef std::vector<std::string> edm::Schedule::vstring

Definition at line 125 of file Schedule.h.

◆ Workers

typedef std::vector<Worker*> edm::Schedule::Workers

Definition at line 129 of file Schedule.h.

Constructor & Destructor Documentation

◆ Schedule()

edm::Schedule::Schedule ( ParameterSet proc_pset,
service::TriggerNamesService const &  tns,
ProductRegistry pregistry,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
PreallocationConfiguration const &  config,
ProcessContext const *  processContext,
ModuleTypeResolverMaker const *  resolverMaker 
)

Definition at line 486 of file Schedule.cc.

References actions, and edm::service::TriggerNamesService::getTrigPaths().

495  : //Only create a resultsInserter if there is a trigger path
496  resultsInserter_{tns.getTrigPaths().empty()
497  ? std::shared_ptr<TriggerResultInserter>{}
498  : makeInserter(proc_pset, prealloc, preg, actions, areg, processConfiguration)},
499  moduleRegistry_(std::make_shared<ModuleRegistry>(resolverMaker)),
501  preallocConfig_(prealloc),
502  pathNames_(&tns.getTrigPaths()),
503  endPathNames_(&tns.getEndPaths()),
504  wantSummary_(tns.wantSummary()) {
505  makePathStatusInserters(pathStatusInserters_,
506  *pathNames_,
507  prealloc,
508  preg,
509  areg,
510  processConfiguration,
511  std::string("PathStatusInserter"));
512 
513  if (endPathNames_->size() > 1) {
514  //NOTE: FinalPaths are a type of EndPath
515  makePathStatusInserters(endPathStatusInserters_,
516  *endPathNames_,
517  prealloc,
518  preg,
519  areg,
520  processConfiguration,
521  std::string("EndPathStatusInserter"));
522  }
523 
524  assert(0 < prealloc.numberOfStreams());
525  streamSchedules_.reserve(prealloc.numberOfStreams());
526  for (unsigned int i = 0; i < prealloc.numberOfStreams(); ++i) {
527  streamSchedules_.emplace_back(make_shared_noexcept_false<StreamSchedule>(resultsInserter(),
530  moduleRegistry(),
531  proc_pset,
532  tns,
533  prealloc,
534  preg,
535  actions,
536  areg,
537  processConfiguration,
538  StreamID{i},
539  processContext));
540  }
541 
542  //TriggerResults are injected automatically by StreamSchedules and are
543  // unknown to the ModuleRegistry
544  const std::string kTriggerResults("TriggerResults");
545  std::vector<std::string> modulesToUse;
546  modulesToUse.reserve(streamSchedules_[0]->allWorkersLumisAndEvents().size());
547  for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
548  if (worker->description()->moduleLabel() != kTriggerResults) {
549  modulesToUse.push_back(worker->description()->moduleLabel());
550  }
551  }
552  //The unscheduled modules are at the end of the list, but we want them at the front
553  unsigned int const nUnscheduledModules = streamSchedules_[0]->numberOfUnscheduledModules();
554  if (nUnscheduledModules > 0) {
555  std::vector<std::string> temp;
556  temp.reserve(modulesToUse.size());
557  auto itBeginUnscheduled = modulesToUse.begin() + modulesToUse.size() - nUnscheduledModules;
558  std::copy(itBeginUnscheduled, modulesToUse.end(), std::back_inserter(temp));
559  std::copy(modulesToUse.begin(), itBeginUnscheduled, std::back_inserter(temp));
560  temp.swap(modulesToUse);
561  }
562 
563  // propagate_const<T> has no reset() function
564  globalSchedule_ = std::make_unique<GlobalSchedule>(resultsInserter(),
567  moduleRegistry(),
568  modulesToUse,
569  proc_pset,
570  preg,
571  prealloc,
572  actions,
573  areg,
574  processConfiguration,
575  processContext);
576  }
size
Write out results.
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< std::string > const * pathNames_
Definition: Schedule.h:330
assert(be >=bs)
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:323
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter > > > pathStatusInserters_
Definition: Schedule.h:318
edm::propagate_const< std::shared_ptr< TriggerResultInserter > > resultsInserter_
Definition: Schedule.h:317
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:320
std::shared_ptr< ModuleRegistry const > moduleRegistry() const
Definition: Schedule.h:314
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:326
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter > > > endPathStatusInserters_
Definition: Schedule.h:319
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:325
bool wantSummary_
Definition: Schedule.h:332
std::vector< std::string > const * endPathNames_
Definition: Schedule.h:331
std::shared_ptr< TriggerResultInserter const > resultsInserter() const
Definition: Schedule.h:310

Member Function Documentation

◆ allWorkers()

Schedule::AllWorkers const & edm::Schedule::allWorkers ( ) const

returns the collection of pointers to workers

Definition at line 1292 of file Schedule.cc.

References globalSchedule_.

Referenced by changeModule(), closeOutputFiles(), convertCurrentProcessAlias(), edm::PathsAndConsumesOfModules::doConsumesInfo(), fillModuleAndConsumesInfo(), finishSetup(), getAllModuleDescriptions(), respondToCloseInputFile(), and respondToOpenInputFile().

1292 { return globalSchedule_->allWorkers(); }
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:323

◆ availablePaths()

void edm::Schedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 1300 of file Schedule.cc.

References streamSchedules_.

Referenced by edm::ScheduleInfo::availablePaths().

1300  {
1301  streamSchedules_[0]->availablePaths(oLabelsToFill);
1302  }
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321

◆ beginJob()

void edm::Schedule::beginJob ( ProductRegistry const &  iRegistry,
eventsetup::ESRecordsToProductResolverIndices const &  iESIndices,
ProcessBlockHelperBase const &  processBlockHelperBase,
PathsAndConsumesOfModulesBase const &  pathsAndConsumesOfModules,
ProcessContext const &  processContext 
)

Definition at line 1188 of file Schedule.cc.

References globalSchedule_.

1192  {
1193  globalSchedule_->beginJob(iRegistry, iESIndices, processBlockHelperBase, pathsAndConsumesOfModules, processContext);
1194  }
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:323

◆ beginStream()

void edm::Schedule::beginStream ( unsigned int  streamID)

Definition at line 1196 of file Schedule.cc.

References cms::cuda::assert(), and streamSchedules_.

1196  {
1197  assert(streamID < streamSchedules_.size());
1198  streamSchedules_[streamID]->beginStream();
1199  }
assert(be >=bs)
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321

◆ changeModule()

bool edm::Schedule::changeModule ( std::string const &  iLabel,
ParameterSet const &  iPSet,
const ProductRegistry iRegistry,
eventsetup::ESRecordsToProductResolverIndices const &  iIndices 
)

clone the type of module with label iLabel but configure with iPSet. Returns true if successful.

Definition at line 1214 of file Schedule.cc.

References allWorkers(), newFWLiteAna::found, globalSchedule_, edm::InEvent, edm::InLumi, edm::InProcess, edm::InRun, moduleRegistry_, preallocConfig_, SimL1EmulatorRepack_CalouGT_cff::processName, edm::ProductRegistry::productLookup(), alignCSCRings::s, and streamSchedules_.

1217  {
1218  Worker* found = nullptr;
1219  for (auto const& worker : allWorkers()) {
1220  if (worker->description()->moduleLabel() == iLabel) {
1221  found = worker;
1222  break;
1223  }
1224  }
1225  if (nullptr == found) {
1226  return false;
1227  }
1228 
1229  auto newMod = moduleRegistry_->replaceModule(iLabel, iPSet, preallocConfig_);
1230 
1231  globalSchedule_->replaceModule(newMod, iLabel);
1232 
1233  for (auto& s : streamSchedules_) {
1234  s->replaceModule(newMod, iLabel);
1235  }
1236 
1237  {
1238  //Need to updateLookup in order to make getByToken work
1239  auto const processBlockLookup = iRegistry.productLookup(InProcess);
1240  auto const runLookup = iRegistry.productLookup(InRun);
1241  auto const lumiLookup = iRegistry.productLookup(InLumi);
1242  auto const eventLookup = iRegistry.productLookup(InEvent);
1243  found->updateLookup(InProcess, *runLookup);
1244  found->updateLookup(InRun, *runLookup);
1245  found->updateLookup(InLumi, *lumiLookup);
1246  found->updateLookup(InEvent, *eventLookup);
1247  found->updateLookup(iIndices);
1248 
1249  auto const& processName = newMod->moduleDescription().processName();
1250  auto const& processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
1251  auto const& runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
1252  auto const& lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
1253  auto const& eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
1254  found->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
1255  found->resolvePutIndicies(InRun, runModuleToIndicies);
1256  found->resolvePutIndicies(InLumi, lumiModuleToIndicies);
1257  found->resolvePutIndicies(InEvent, eventModuleToIndicies);
1258  }
1259 
1260  return true;
1261  }
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:323
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:320
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:326
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1292

◆ clearCounters()

void edm::Schedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 1416 of file Schedule.cc.

References alignCSCRings::s, and streamSchedules_.

1416  {
1417  for (auto& s : streamSchedules_) {
1418  s->clearCounters();
1419  }
1420  }
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321

◆ closeOutputFiles()

void edm::Schedule::closeOutputFiles ( )

Definition at line 1064 of file Schedule.cc.

References all_output_communicators_, allWorkers(), edm::OutputModuleCommunicator::closeFile(), and edm::for_all().

1064  {
1065  using std::placeholders::_1;
1067  for (auto& worker : allWorkers()) {
1068  worker->respondToCloseOutputFile();
1069  }
1070  }
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1292
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:325

◆ convertCurrentProcessAlias()

void edm::Schedule::convertCurrentProcessAlias ( std::string const &  processName)

Convert "@currentProcess" in InputTag process names to the actual current process name.

Definition at line 1294 of file Schedule.cc.

References allWorkers(), and SimL1EmulatorRepack_CalouGT_cff::processName.

1294  {
1295  for (auto const& worker : allWorkers()) {
1296  worker->convertCurrentProcessAlias(processName);
1297  }
1298  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1292

◆ deleteModule()

void edm::Schedule::deleteModule ( std::string const &  iLabel,
ActivityRegistry areg 
)

Deletes module with label iLabel.

Definition at line 1263 of file Schedule.cc.

References globalSchedule_, moduleRegistry_, edm::ActivityRegistry::postModuleDestructionSignal_, edm::ActivityRegistry::preModuleDestructionSignal_, cms::cuda::stream, and streamSchedules_.

1263  {
1264  globalSchedule_->deleteModule(iLabel);
1265  for (auto& stream : streamSchedules_) {
1266  stream->deleteModule(iLabel);
1267  }
1268  moduleRegistry_->deleteModule(iLabel, areg->preModuleDestructionSignal_, areg->postModuleDestructionSignal_);
1269  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:323
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:320
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321

◆ endJob()

void edm::Schedule::endJob ( ExceptionCollector collector)

Definition at line 783 of file Schedule.cc.

References edm::ExceptionCollector::addException(), globalSchedule_, edm::ExceptionCollector::hasThrown(), sendFwkSummaryToMessageLogger(), wantSummary_, and edm::convertException::wrap().

783  {
784  globalSchedule_->endJob(collector);
785  if (collector.hasThrown()) {
786  return;
787  }
788 
789  if (wantSummary_) {
790  try {
792  } catch (cms::Exception const& ex) {
793  collector.addException(ex);
794  }
795  }
796  }
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:323
bool wantSummary_
Definition: Schedule.h:332
void sendFwkSummaryToMessageLogger() const
Definition: Schedule.cc:798
auto wrap(F iFunc) -> decltype(iFunc())

◆ endPaths()

void edm::Schedule::endPaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all end paths in the process

Definition at line 1306 of file Schedule.cc.

References endPathNames_.

1306 { oLabelsToFill = *endPathNames_; }
std::vector< std::string > const * endPathNames_
Definition: Schedule.h:331

◆ endStream()

void edm::Schedule::endStream ( unsigned int  streamID,
ExceptionCollector collector,
std::mutex collectorMutex 
)
noexcept

Definition at line 1201 of file Schedule.cc.

References cms::cuda::assert(), and streamSchedules_.

1201  {
1202  assert(streamID < streamSchedules_.size());
1203  streamSchedules_[streamID]->endStream(collector, collectorMutex);
1204  }
assert(be >=bs)
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321

◆ fillModuleAndConsumesInfo()

void edm::Schedule::fillModuleAndConsumesInfo ( std::vector< ModuleDescription const *> &  allModuleDescriptions,
std::vector< std::pair< unsigned int, unsigned int >> &  moduleIDToIndex,
std::array< std::vector< std::vector< ModuleDescription const *>>, NumBranchTypes > &  modulesWhoseProductsAreConsumedBy,
std::vector< std::vector< ModuleProcessName >> &  modulesInPreviousProcessesWhoseProductsAreConsumedBy,
ProductRegistry const &  preg 
) const

Definition at line 1324 of file Schedule.cc.

References cms::Exception::addContext(), allWorkers(), mps_fire::i, LogMessageMonitor_cff::modules, edm::NumBranchTypes, AlCaHLTBitMon_ParallelJobs::p, edm::sort_all(), and mitigatedMETSequence_cff::U.

1329  {
1330  allModuleDescriptions.clear();
1331  moduleIDToIndex.clear();
1332  for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1333  modulesWhoseProductsAreConsumedBy[iBranchType].clear();
1334  }
1335  modulesInPreviousProcessesWhoseProductsAreConsumedBy.clear();
1336 
1337  allModuleDescriptions.reserve(allWorkers().size());
1338  moduleIDToIndex.reserve(allWorkers().size());
1339  for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1340  modulesWhoseProductsAreConsumedBy[iBranchType].resize(allWorkers().size());
1341  }
1342  modulesInPreviousProcessesWhoseProductsAreConsumedBy.resize(allWorkers().size());
1343 
1344  std::map<std::string, ModuleDescription const*> labelToDesc;
1345  unsigned int i = 0;
1346  for (auto const& worker : allWorkers()) {
1347  ModuleDescription const* p = worker->description();
1348  allModuleDescriptions.push_back(p);
1349  moduleIDToIndex.push_back(std::pair<unsigned int, unsigned int>(p->id(), i));
1350  labelToDesc[p->moduleLabel()] = p;
1351  ++i;
1352  }
1353  sort_all(moduleIDToIndex);
1354 
1355  i = 0;
1356  for (auto const& worker : allWorkers()) {
1357  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes> modules;
1358  for (auto iBranchType = 0U; iBranchType < NumBranchTypes; ++iBranchType) {
1359  modules[iBranchType] = &modulesWhoseProductsAreConsumedBy[iBranchType].at(i);
1360  }
1361 
1362  std::vector<ModuleProcessName>& modulesInPreviousProcesses =
1363  modulesInPreviousProcessesWhoseProductsAreConsumedBy.at(i);
1364  try {
1365  worker->modulesWhoseProductsAreConsumed(modules, modulesInPreviousProcesses, preg, labelToDesc);
1366  } catch (cms::Exception& ex) {
1367  ex.addContext("Calling Worker::modulesWhoseProductsAreConsumed() for module " +
1368  worker->description()->moduleLabel());
1369  throw;
1370  }
1371  ++i;
1372  }
1373  }
size
Write out results.
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:92
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1292
void addContext(std::string const &context)
Definition: Exception.cc:169

◆ finishSetup()

void edm::Schedule::finishSetup ( ParameterSet proc_pset,
service::TriggerNamesService const &  tns,
ProductRegistry preg,
BranchIDListHelper branchIDListHelper,
ProcessBlockHelperBase processBlockHelper,
ThinnedAssociationsHelper thinnedAssociationsHelper,
SubProcessParentageHelper const *  subProcessParentageHelper,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration processConfiguration,
bool  hasSubprocesses,
PreallocationConfiguration const &  prealloc,
ProcessContext const *  processContext 
)

Definition at line 578 of file Schedule.cc.

References autoCond::aliases, all_output_communicators_, allWorkers(), edm::ProductRegistry::anyProductProduced(), cms::cuda::assert(), edm::BranchIDListHelper::branchIDLists(), DummyCfis::c, edm::maker::ModuleHolder::createOutputModuleCommunicator(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::id(), limitOutput(), moduleRegistry_, edm::PreallocationConfiguration::numberOfStreams(), edm::SystemTimeKeeper::pauseModuleEvent(), edm::detail::processEDAliases(), edm::PRODUCT_TYPE, edm::ProductRegistry::productListUpdator(), edm::ParameterSet::registerIt(), edm::SystemTimeKeeper::removeModuleIfExists(), edm::SystemTimeKeeper::restartModuleEvent(), edm::ProductRegistry::setFrozen(), edm::setIsMergeable(), edm::ProductRegistry::setUnscheduledProducts(), edm::SystemTimeKeeper::startEvent(), edm::SystemTimeKeeper::startModuleEvent(), edm::SystemTimeKeeper::startPath(), edm::SystemTimeKeeper::startProcessingLoop(), edm::SystemTimeKeeper::stopEvent(), edm::SystemTimeKeeper::stopModuleEvent(), edm::SystemTimeKeeper::stopPath(), edm::SystemTimeKeeper::stopProcessingLoop(), streamSchedules_, AlCaHLTBitMon_QueryRunRegistry::string, summaryTimeKeeper_, HcalDetIdTransform::transform(), edm::ProcessBlockHelperBase::updateForNewProcess(), edm::BranchIDListHelper::updateFromRegistry(), wantSummary_, and EcalCalibMonitorClient_cfi::workers.

589  {
590  //TriggerResults is not in the top level ParameterSet so the call to
591  // reduceParameterSet would fail to find it. Just remove it up front.
592  const std::string kTriggerResults("TriggerResults");
593 
594  std::set<std::string> usedModuleLabels;
595  for (auto const& worker : allWorkers()) {
596  if (worker->description()->moduleLabel() != kTriggerResults) {
597  usedModuleLabels.insert(worker->description()->moduleLabel());
598  }
599  }
600  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
601  std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
602  reduceParameterSet(proc_pset, tns.getEndPaths(), modulesInConfig, usedModuleLabels, outputModulePathPositions);
603  {
604  std::vector<std::string> aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
605  detail::processEDAliases(aliases, {}, proc_pset, processConfiguration->processName(), preg);
606  }
607 
608  // At this point all BranchDescriptions are created. Mark now the
609  // ones of unscheduled workers to be on-demand.
610  {
611  auto const& unsched = streamSchedules_[0]->unscheduledWorkersLumisAndEvents();
612  if (not unsched.empty()) {
613  std::set<std::string> unscheduledModules;
614  std::transform(unsched.begin(),
615  unsched.end(),
616  std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
617  [](auto worker) { return worker->description()->moduleLabel(); });
618  preg.setUnscheduledProducts(unscheduledModules);
619  }
620  }
621 
622  processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
623  proc_pset.registerIt();
624  processConfiguration->setParameterSetID(proc_pset.id());
625  processConfiguration->setProcessConfigurationID();
626 
627  // This is used for a little sanity-check to make sure no code
628  // modifications alter the number of workers at a later date.
629  size_t all_workers_count = allWorkers().size();
630 
631  moduleRegistry_->forAllModuleHolders([this](maker::ModuleHolder* iHolder) {
632  auto comm = iHolder->createOutputModuleCommunicator();
633  if (comm) {
634  all_output_communicators_.emplace_back(std::shared_ptr<OutputModuleCommunicator>{comm.release()});
635  }
636  });
637  // Now that the output workers are filled in, set any output limits or information.
638  limitOutput(proc_pset, branchIDListHelper.branchIDLists(), subProcessParentageHelper);
639 
640  // Sanity check: make sure nobody has added a worker after we've
641  // already relied on the WorkerManager being full.
642  assert(all_workers_count == allWorkers().size());
643 
644  branchIDListHelper.updateFromRegistry(preg);
645 
646  for (auto const& worker : streamSchedules_[0]->allWorkersLumisAndEvents()) {
647  worker->registerThinnedAssociations(preg, thinnedAssociationsHelper);
648  }
649 
650  processBlockHelper.updateForNewProcess(preg, processConfiguration->processName());
651 
652  // The output modules consume products in kept branches.
653  // So we must set this up before freezing.
654  for (auto& c : all_output_communicators_) {
655  c->selectProducts(preg, thinnedAssociationsHelper, processBlockHelper);
656  }
657 
658  for (auto& product : preg.productListUpdator()) {
659  setIsMergeable(product.second);
660  }
661 
662  {
663  // We now get a collection of types that may be consumed.
664  std::set<TypeID> productTypesConsumed;
665  std::set<TypeID> elementTypesConsumed;
666  // Loop over all modules
667  for (auto const& worker : allWorkers()) {
668  for (auto const& consumesInfo : worker->consumesInfo()) {
669  if (consumesInfo.kindOfType() == PRODUCT_TYPE) {
670  productTypesConsumed.emplace(consumesInfo.type());
671  } else {
672  elementTypesConsumed.emplace(consumesInfo.type());
673  }
674  }
675  }
676  // The SubProcess class is not a module, yet it may consume.
677  if (hasSubprocesses) {
678  productTypesConsumed.emplace(typeid(TriggerResults));
679  }
680  // The RandomNumberGeneratorService is not a module, yet it consumes.
681  { RngEDConsumer rngConsumer = RngEDConsumer(productTypesConsumed); }
682  preg.setFrozen(productTypesConsumed, elementTypesConsumed, processConfiguration->processName());
683  }
684 
685  for (auto& c : all_output_communicators_) {
686  c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
687  }
688 
689  if (wantSummary_) {
690  std::vector<const ModuleDescription*> modDesc;
691  const auto& workers = allWorkers();
692  modDesc.reserve(workers.size());
693 
694  std::transform(workers.begin(),
695  workers.end(),
696  std::back_inserter(modDesc),
697  [](const Worker* iWorker) -> const ModuleDescription* { return iWorker->description(); });
698 
699  // propagate_const<T> has no reset() function
700  summaryTimeKeeper_ = std::make_unique<SystemTimeKeeper>(prealloc.numberOfStreams(), modDesc, tns, processContext);
701  auto timeKeeperPtr = summaryTimeKeeper_.get();
702 
703  areg->watchPreModuleDestruction(timeKeeperPtr, &SystemTimeKeeper::removeModuleIfExists);
704 
705  areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
706  areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
707  areg->watchPreModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
708  areg->watchPostModuleEventAcquire(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
709  areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
710  areg->watchPostModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::restartModuleEvent);
711 
712  areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
713  areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);
714 
715  areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
716  areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
717 
718  areg->watchPostBeginJob(timeKeeperPtr, &SystemTimeKeeper::startProcessingLoop);
719  areg->watchPreEndJob(timeKeeperPtr, &SystemTimeKeeper::stopProcessingLoop);
720  //areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
721  //timeKeeperPtr->startModuleEvent(iContext,iMod);
722  //});
723  }
724 
725  } // Schedule::Schedule
size
Write out results.
dictionary aliases
Definition: autoCond.py:112
void stopEvent(StreamContext const &)
void restartModuleEvent(StreamContext const &, ModuleCallingContext const &)
void startModuleEvent(StreamContext const &, ModuleCallingContext const &)
void removeModuleIfExists(ModuleDescription const &module)
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
Definition: Schedule.h:328
assert(be >=bs)
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
void limitOutput(ParameterSet const &proc_pset, BranchIDLists const &branchIDLists, SubProcessParentageHelper const *subProcessParentageHelper)
Definition: Schedule.cc:727
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:320
void stopPath(StreamContext const &, PathContext const &, HLTPathStatus const &)
void stopModuleEvent(StreamContext const &, ModuleCallingContext const &)
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1292
void startPath(StreamContext const &, PathContext const &)
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:325
void pauseModuleEvent(StreamContext const &, ModuleCallingContext const &)
bool wantSummary_
Definition: Schedule.h:332
void setIsMergeable(BranchDescription &)
unsigned transform(const HcalDetId &id, unsigned transformCode)

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::Schedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this Schedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 1281 of file Schedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, and mps_fire::result.

Referenced by edm::ScheduleInfo::availableModuleLabels(), and edm::ScheduleInfo::parametersForModule().

1281  {
1282  std::vector<ModuleDescription const*> result;
1283  result.reserve(allWorkers().size());
1284 
1285  for (auto const& worker : allWorkers()) {
1286  ModuleDescription const* p = worker->description();
1287  result.push_back(p);
1288  }
1289  return result;
1290  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1292

◆ getTriggerReport()

void edm::Schedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1375 of file Schedule.cc.

References cuy::rep, alignCSCRings::s, edm::sort_all(), and streamSchedules_.

Referenced by sendFwkSummaryToMessageLogger().

1375  {
1376  rep.eventSummary.totalEvents = 0;
1377  rep.eventSummary.totalEventsPassed = 0;
1378  rep.eventSummary.totalEventsFailed = 0;
1379  for (auto& s : streamSchedules_) {
1380  s->getTriggerReport(rep);
1381  }
1382  sort_all(rep.workerSummaries);
1383  }
rep
Definition: cuy.py:1189
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:92

◆ getTriggerTimingReport()

void edm::Schedule::getTriggerTimingReport ( TriggerTimingReport rep) const

Return the trigger timing report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1385 of file Schedule.cc.

References cuy::rep, and summaryTimeKeeper_.

Referenced by sendFwkSummaryToMessageLogger().

1385  {
1386  rep.eventSummary.totalEvents = 0;
1387  rep.eventSummary.cpuTime = 0.;
1388  rep.eventSummary.realTime = 0.;
1389  summaryTimeKeeper_->fillTriggerTimingReport(rep);
1390  }
edm::propagate_const< std::unique_ptr< SystemTimeKeeper > > summaryTimeKeeper_
Definition: Schedule.h:328
rep
Definition: cuy.py:1189

◆ initializeEarlyDelete()

void edm::Schedule::initializeEarlyDelete ( std::vector< std::string > const &  branchesToDeleteEarly,
std::multimap< std::string, std::string > const &  referencesToBranches,
std::vector< std::string > const &  modulesToSkip,
edm::ProductRegistry const &  preg 
)

Definition at line 1271 of file Schedule.cc.

References moduleRegistry(), cms::cuda::stream, and streamSchedules_.

1274  {
1275  for (auto& stream : streamSchedules_) {
1276  stream->initializeEarlyDelete(
1277  *moduleRegistry(), branchesToDeleteEarly, referencesToBranches, modulesToSkip, preg);
1278  }
1279  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::shared_ptr< ModuleRegistry const > moduleRegistry() const
Definition: Schedule.h:314
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321

◆ limitOutput()

void edm::Schedule::limitOutput ( ParameterSet const &  proc_pset,
BranchIDLists const &  branchIDLists,
SubProcessParentageHelper const *  subProcessParentageHelper 
)
private

Definition at line 727 of file Schedule.cc.

References all_output_communicators_, DummyCfis::c, edm::errors::Configuration, submitPVResolutionJobs::desc, edm::ParameterSet::empty(), Exception, edm::ParameterSet::getParameterNamesForType(), edm::ParameterSet::getParameterSetNames(), edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), HerwigMaxPtPartonFilter_cfi::moduleLabel, convertSQLitetoXML_cfg::output, edm::search_all(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by finishSetup().

729  {
730  std::string const output("output");
731 
732  ParameterSet const& maxEventsPSet = proc_pset.getUntrackedParameterSet("maxEvents");
733  int maxEventSpecs = 0;
734  int maxEventsOut = -1;
735  ParameterSet const* vMaxEventsOut = nullptr;
736  std::vector<std::string> intNamesE = maxEventsPSet.getParameterNamesForType<int>(false);
737  if (search_all(intNamesE, output)) {
738  maxEventsOut = maxEventsPSet.getUntrackedParameter<int>(output);
739  ++maxEventSpecs;
740  }
741  std::vector<std::string> psetNamesE;
742  maxEventsPSet.getParameterSetNames(psetNamesE, false);
743  if (search_all(psetNamesE, output)) {
744  vMaxEventsOut = &maxEventsPSet.getUntrackedParameterSet(output);
745  ++maxEventSpecs;
746  }
747 
748  if (maxEventSpecs > 1) {
750  << "\nAt most, one form of 'output' may appear in the 'maxEvents' parameter set";
751  }
752 
753  for (auto& c : all_output_communicators_) {
754  OutputModuleDescription desc(branchIDLists, maxEventsOut, subProcessParentageHelper);
755  if (vMaxEventsOut != nullptr && !vMaxEventsOut->empty()) {
756  std::string const& moduleLabel = c->description().moduleLabel();
757  try {
758  desc.maxEvents_ = vMaxEventsOut->getUntrackedParameter<int>(moduleLabel);
759  } catch (Exception const&) {
761  << "\nNo entry in 'maxEvents' for output module label '" << moduleLabel << "'.\n";
762  }
763  }
764  c->configure(desc);
765  }
766  }
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:325
Definition: output.py:1

◆ moduleDescriptionsInEndPath()

void edm::Schedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

adds the ModuleDescriptions into the vector for the modules scheduled in path iEndPathLabel hint is a performance optimization if you might know the position of the module in the path

Definition at line 1318 of file Schedule.cc.

References streamSchedules_.

1320  {
1321  streamSchedules_[0]->moduleDescriptionsInEndPath(iEndPathLabel, descriptions, hint);
1322  }
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321

◆ moduleDescriptionsInPath()

void edm::Schedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

adds the ModuleDescriptions into the vector for the modules scheduled in path iPathLabel hint is a performance optimization if you might know the position of the module in the path

Definition at line 1312 of file Schedule.cc.

References streamSchedules_.

1314  {
1315  streamSchedules_[0]->moduleDescriptionsInPath(iPathLabel, descriptions, hint);
1316  }
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321

◆ moduleRegistry() [1/2]

std::shared_ptr<ModuleRegistry const> edm::Schedule::moduleRegistry ( ) const
inlineprivate

Definition at line 314 of file Schedule.h.

References edm::get_underlying_safe(), and moduleRegistry_.

Referenced by initializeEarlyDelete().

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:320

◆ moduleRegistry() [2/2]

std::shared_ptr<ModuleRegistry>& edm::Schedule::moduleRegistry ( )
inlineprivate

Definition at line 315 of file Schedule.h.

References edm::get_underlying_safe(), and moduleRegistry_.

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ModuleRegistry > > moduleRegistry_
Definition: Schedule.h:320

◆ modulesInPath()

void edm::Schedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 1308 of file Schedule.cc.

References streamSchedules_.

Referenced by edm::ScheduleInfo::modulesInPath().

1308  {
1309  streamSchedules_[0]->modulesInPath(iPathLabel, oLabelsToFill);
1310  }
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321

◆ openOutputFiles()

void edm::Schedule::openOutputFiles ( FileBlock fb)

Definition at line 1072 of file Schedule.cc.

References all_output_communicators_, edm::for_all(), and edm::OutputModuleCommunicator::openFile().

1072  {
1073  using std::placeholders::_1;
1074  for_all(all_output_communicators_, std::bind(&OutputModuleCommunicator::openFile, _1, std::cref(fb)));
1075  }
virtual void openFile(FileBlock const &fb)=0
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:325

◆ processOneEventAsync()

void edm::Schedule::processOneEventAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamID,
EventTransitionInfo info,
ServiceToken const &  token 
)

Definition at line 1206 of file Schedule.cc.

References cms::cuda::assert(), info(), eostools::move(), pathStatusInserters_, streamSchedules_, and unpackBuffers-CaloStage2::token.

1209  {
1210  assert(iStreamID < streamSchedules_.size());
1211  streamSchedules_[iStreamID]->processOneEventAsync(std::move(iTask), info, token, pathStatusInserters_);
1212  }
static const TGPicture * info(bool iBackgroundIsBlack)
assert(be >=bs)
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter > > > pathStatusInserters_
Definition: Schedule.h:318
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321
def move(src, dest)
Definition: eostools.py:511

◆ processOneGlobalAsync()

template<typename T >
void edm::Schedule::processOneGlobalAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 347 of file Schedule.h.

References globalSchedule_, and unpackBuffers-CaloStage2::token.

Referenced by edm::beginGlobalTransitionAsync(), and edm::endGlobalTransitionAsync().

350  {
351  globalSchedule_->processOneGlobalAsync<T>(iTaskHolder, transitionInfo, token, cleaningUpAfterException);
352  }
edm::propagate_const< std::unique_ptr< GlobalSchedule > > globalSchedule_
Definition: Schedule.h:323
long double T

◆ processOneStreamAsync()

template<typename T >
void edm::Schedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamID,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 336 of file Schedule.h.

References cms::cuda::assert(), eostools::move(), streamSchedules_, and unpackBuffers-CaloStage2::token.

Referenced by edm::beginStreamTransitionAsync(), and edm::endStreamTransitionAsync().

340  {
341  assert(iStreamID < streamSchedules_.size());
342  streamSchedules_[iStreamID]->processOneStreamAsync<T>(
343  std::move(iTaskHolder), transitionInfo, token, cleaningUpAfterException);
344  }
assert(be >=bs)
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321
long double T
def move(src, dest)
Definition: eostools.py:511

◆ respondToCloseInputFile()

void edm::Schedule::respondToCloseInputFile ( FileBlock const &  fb)

Definition at line 1183 of file Schedule.cc.

References allWorkers(), edm::for_all(), and edm::Worker::respondToCloseInputFile().

1183  {
1184  using std::placeholders::_1;
1185  for_all(allWorkers(), std::bind(&Worker::respondToCloseInputFile, _1, std::cref(fb)));
1186  }
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1292
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:186

◆ respondToOpenInputFile()

void edm::Schedule::respondToOpenInputFile ( FileBlock const &  fb)

Definition at line 1178 of file Schedule.cc.

References allWorkers(), edm::for_all(), and edm::Worker::respondToOpenInputFile().

1178  {
1179  using std::placeholders::_1;
1180  for_all(allWorkers(), std::bind(&Worker::respondToOpenInputFile, _1, std::cref(fb)));
1181  }
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: Schedule.cc:1292
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:185

◆ resultsInserter() [1/2]

std::shared_ptr<TriggerResultInserter const> edm::Schedule::resultsInserter ( ) const
inlineprivate

Definition at line 310 of file Schedule.h.

References edm::get_underlying_safe(), and resultsInserter_.

310  {
312  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< TriggerResultInserter > > resultsInserter_
Definition: Schedule.h:317

◆ resultsInserter() [2/2]

std::shared_ptr<TriggerResultInserter>& edm::Schedule::resultsInserter ( )
inlineprivate

Definition at line 313 of file Schedule.h.

References edm::get_underlying_safe(), and resultsInserter_.

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< TriggerResultInserter > > resultsInserter_
Definition: Schedule.h:317

◆ sendFwkSummaryToMessageLogger()

void edm::Schedule::sendFwkSummaryToMessageLogger ( ) const

Definition at line 798 of file Schedule.cc.

References ALPAKA_ACCELERATOR_NAMESPACE::brokenline::constexpr(), visDQMUpload::context, submitPVResolutionJobs::count, edm::EventTimingSummary::cpuTime, edm::TriggerTimingReport::endPathSummaries, edm::TriggerReport::endPathSummaries, edm::TriggerTimingReport::eventSummary, edm::TriggerReport::eventSummary, alignBH_cfg::fixed, getTriggerReport(), getTriggerTimingReport(), WZElectronSkims53X_cff::max, mod(), edm::PreallocationConfiguration::numberOfThreads(), AlCaHLTBitMon_ParallelJobs::p, preallocConfig_, edm::EventTimingSummary::realTime, streamSchedules_, edm::EventTimingSummary::sumStreamRealTime, edm::EventTimingSummary::totalEvents, edm::EventSummary::totalEvents, totalEvents(), edm::EventSummary::totalEventsFailed, edm::EventSummary::totalEventsPassed, edm::TriggerTimingReport::trigPathSummaries, edm::TriggerReport::trigPathSummaries, edm::TriggerTimingReport::workerSummaries, and edm::TriggerReport::workerSummaries.

Referenced by endJob().

798  {
799  //Function to loop over items in a container and periodically
800  // flush to the message logger.
801  auto logForEach = [](auto const& iContainer, auto iMessage) {
802  auto logger = LogFwkVerbatim("FwkSummary");
803  int count = 0;
804  for (auto const& element : iContainer) {
805  iMessage(logger, element);
806  if (++count == 10) {
807  logger = LogFwkVerbatim("FwkSummary");
808  count = 0;
809  } else {
810  logger << "\n";
811  }
812  }
813  };
814 
815  {
816  TriggerReport tr;
817  getTriggerReport(tr);
818 
819  // The trigger report (pass/fail etc.):
820 
821  LogFwkVerbatim("FwkSummary") << "";
822  if (streamSchedules_[0]->context().processContext()->isSubProcess()) {
823  LogFwkVerbatim("FwkSummary") << "TrigReport Process: "
824  << streamSchedules_[0]->context().processContext()->processName();
825  }
826  LogFwkVerbatim("FwkSummary") << "TrigReport "
827  << "---------- Event Summary ------------";
828  if (!tr.trigPathSummaries.empty()) {
829  LogFwkVerbatim("FwkSummary") << "TrigReport"
830  << " Events total = " << tr.eventSummary.totalEvents
831  << " passed = " << tr.eventSummary.totalEventsPassed
832  << " failed = " << tr.eventSummary.totalEventsFailed << "";
833  } else {
834  LogFwkVerbatim("FwkSummary") << "TrigReport"
835  << " Events total = " << tr.eventSummary.totalEvents
836  << " passed = " << tr.eventSummary.totalEvents << " failed = 0";
837  }
838 
839  LogFwkVerbatim("FwkSummary") << "";
840  LogFwkVerbatim("FwkSummary") << "TrigReport "
841  << "---------- Path Summary ------------";
842  LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
843  << " " << std::right << std::setw(10) << "Executed"
844  << " " << std::right << std::setw(10) << "Passed"
845  << " " << std::right << std::setw(10) << "Failed"
846  << " " << std::right << std::setw(10) << "Error"
847  << " "
848  << "Name"
849  << "";
850  logForEach(tr.trigPathSummaries, [](auto& logger, auto const& p) {
851  logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << p.bitPosition << " "
852  << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
853  << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
854  << p.timesExcept << " " << p.name;
855  });
856 
857  /*
858  std::vector<int>::const_iterator epi = empty_trig_paths_.begin();
859  std::vector<int>::const_iterator epe = empty_trig_paths_.end();
860  std::vector<std::string>::const_iterator epn = empty_trig_path_names_.begin();
861  for (; epi != epe; ++epi, ++epn) {
862 
863  LogFwkVerbatim("FwkSummary") << "TrigReport "
864  << std::right << std::setw(5) << 1
865  << std::right << std::setw(5) << *epi << " "
866  << std::right << std::setw(10) << totalEvents() << " "
867  << std::right << std::setw(10) << totalEvents() << " "
868  << std::right << std::setw(10) << 0 << " "
869  << std::right << std::setw(10) << 0 << " "
870  << *epn << "";
871  }
872  */
873 
874  LogFwkVerbatim("FwkSummary") << "\n"
875  << "TrigReport "
876  << "-------End-Path Summary ------------\n"
877  << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
878  << " " << std::right << std::setw(10) << "Executed"
879  << " " << std::right << std::setw(10) << "Passed"
880  << " " << std::right << std::setw(10) << "Failed"
881  << " " << std::right << std::setw(10) << "Error"
882  << " "
883  << "Name";
884  logForEach(tr.endPathSummaries, [](auto& logger, auto const& p) {
885  logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << p.bitPosition << " "
886  << std::right << std::setw(10) << p.timesRun << " " << std::right << std::setw(10) << p.timesPassed
887  << " " << std::right << std::setw(10) << p.timesFailed << " " << std::right << std::setw(10)
888  << p.timesExcept << " " << p.name;
889  });
890 
891  for (auto const& p : tr.trigPathSummaries) {
892  LogFwkVerbatim("FwkSummary") << "\n"
893  << "TrigReport "
894  << "---------- Modules in Path: " << p.name << " ------------\n"
895  << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
896  << " " << std::right << std::setw(10) << "Visited"
897  << " " << std::right << std::setw(10) << "Passed"
898  << " " << std::right << std::setw(10) << "Failed"
899  << " " << std::right << std::setw(10) << "Error"
900  << " "
901  << "Name";
902 
903  logForEach(p.moduleInPathSummaries, [](auto& logger, auto const& mod) {
904  logger << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) << mod.bitPosition
905  << " " << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
906  << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
907  << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
908  });
909  }
910 
911  for (auto const& p : tr.endPathSummaries) {
912  LogFwkVerbatim("FwkSummary") << "\n"
913  << "TrigReport "
914  << "------ Modules in End-Path: " << p.name << " ------------\n"
915  << "TrigReport " << std::right << std::setw(10) << "Trig Bit#"
916  << " " << std::right << std::setw(10) << "Visited"
917  << " " << std::right << std::setw(10) << "Passed"
918  << " " << std::right << std::setw(10) << "Failed"
919  << " " << std::right << std::setw(10) << "Error"
920  << " "
921  << "Name";
922 
923  unsigned int bitpos = 0;
924  logForEach(p.moduleInPathSummaries, [&bitpos](auto& logger, auto const& mod) {
925  logger << "TrigReport " << std::right << std::setw(5) << 0 << std::right << std::setw(5) << bitpos << " "
926  << std::right << std::setw(10) << mod.timesVisited << " " << std::right << std::setw(10)
927  << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right
928  << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel;
929  ++bitpos;
930  });
931  }
932 
933  LogFwkVerbatim("FwkSummary") << "\n"
934  << "TrigReport "
935  << "---------- Module Summary ------------\n"
936  << "TrigReport " << std::right << std::setw(10) << "Visited"
937  << " " << std::right << std::setw(10) << "Executed"
938  << " " << std::right << std::setw(10) << "Passed"
939  << " " << std::right << std::setw(10) << "Failed"
940  << " " << std::right << std::setw(10) << "Error"
941  << " "
942  << "Name";
943 
944  logForEach(tr.workerSummaries, [](auto& logger, auto const& worker) {
945  logger << "TrigReport " << std::right << std::setw(10) << worker.timesVisited << " " << std::right
946  << std::setw(10) << worker.timesRun << " " << std::right << std::setw(10) << worker.timesPassed << " "
947  << std::right << std::setw(10) << worker.timesFailed << " " << std::right << std::setw(10)
948  << worker.timesExcept << " " << worker.moduleLabel;
949  });
950  LogFwkVerbatim("FwkSummary") << "";
951  }
952  // The timing report (CPU and Real Time):
953  TriggerTimingReport tr;
955 
956  const int totalEvents = std::max(1, tr.eventSummary.totalEvents);
957 
958  LogFwkVerbatim("FwkSummary") << "TimeReport "
959  << "---------- Event Summary ---[sec]----";
960  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
961  << " event loop CPU/event = " << tr.eventSummary.cpuTime / totalEvents;
962  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
963  << " event loop Real/event = " << tr.eventSummary.realTime / totalEvents;
964  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
965  << " sum Streams Real/event = " << tr.eventSummary.sumStreamRealTime / totalEvents;
966  LogFwkVerbatim("FwkSummary") << "TimeReport" << std::setprecision(6) << std::fixed
967  << " efficiency CPU/Real/thread = "
968  << tr.eventSummary.cpuTime / tr.eventSummary.realTime /
970 
971  constexpr int kColumn1Size = 10;
972  constexpr int kColumn2Size = 12;
973  constexpr int kColumn3Size = 12;
974  LogFwkVerbatim("FwkSummary") << "";
975  LogFwkVerbatim("FwkSummary") << "TimeReport "
976  << "---------- Path Summary ---[Real sec]----";
977  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
978  << " " << std::right << std::setw(kColumn2Size) << "per exec"
979  << " Name";
980  logForEach(tr.trigPathSummaries, [&](auto& logger, auto const& p) {
981  const int timesRun = std::max(1, p.timesRun);
982  logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
983  << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
984  << " " << p.name;
985  });
986  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
987  << " " << std::right << std::setw(kColumn2Size) << "per exec"
988  << " Name";
989 
990  LogFwkVerbatim("FwkSummary") << "\n"
991  << "TimeReport "
992  << "-------End-Path Summary ---[Real sec]----\n"
993  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
994  << " " << std::right << std::setw(kColumn2Size) << "per exec"
995  << " Name";
996  logForEach(tr.endPathSummaries, [&](auto& logger, auto const& p) {
997  const int timesRun = std::max(1, p.timesRun);
998 
999  logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1000  << p.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size) << p.realTime / timesRun
1001  << " " << p.name;
1002  });
1003  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1004  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1005  << " Name";
1006 
1007  for (auto const& p : tr.trigPathSummaries) {
1008  LogFwkVerbatim("FwkSummary") << "\n"
1009  << "TimeReport "
1010  << "---------- Modules in Path: " << p.name << " ---[Real sec]----\n"
1011  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1012  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1013  << " Name";
1014  logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
1015  logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1016  << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1017  << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
1018  });
1019  }
1020  if (not tr.trigPathSummaries.empty()) {
1021  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1022  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1023  << " Name";
1024  }
1025  for (auto const& p : tr.endPathSummaries) {
1026  LogFwkVerbatim("FwkSummary") << "\n"
1027  << "TimeReport "
1028  << "------ Modules in End-Path: " << p.name << " ---[Real sec]----\n"
1029  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1030  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1031  << " Name";
1032  logForEach(p.moduleInPathSummaries, [&](auto& logger, auto const& mod) {
1033  logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1034  << mod.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1035  << mod.realTime / std::max(1, mod.timesVisited) << " " << mod.moduleLabel;
1036  });
1037  }
1038  if (not tr.endPathSummaries.empty()) {
1039  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1040  << " " << std::right << std::setw(kColumn2Size) << "per visit"
1041  << " Name";
1042  }
1043  LogFwkVerbatim("FwkSummary") << "\n"
1044  << "TimeReport "
1045  << "---------- Module Summary ---[Real sec]----\n"
1046  << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1047  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1048  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1049  << " Name";
1050  logForEach(tr.workerSummaries, [&](auto& logger, auto const& worker) {
1051  logger << "TimeReport " << std::setprecision(6) << std::fixed << std::right << std::setw(kColumn1Size)
1052  << worker.realTime / totalEvents << " " << std::right << std::setw(kColumn2Size)
1053  << worker.realTime / std::max(1, worker.timesRun) << " " << std::right << std::setw(kColumn3Size)
1054  << worker.realTime / std::max(1, worker.timesVisited) << " " << worker.moduleLabel;
1055  });
1056  LogFwkVerbatim("FwkSummary") << "TimeReport " << std::right << std::setw(kColumn1Size) << "per event"
1057  << " " << std::right << std::setw(kColumn2Size) << "per exec"
1058  << " " << std::right << std::setw(kColumn3Size) << "per visit"
1059  << " Name";
1060 
1061  LogFwkVerbatim("FwkSummary") << "\nT---Report end!\n\n";
1062  }
void getTriggerTimingReport(TriggerTimingReport &rep) const
Definition: Schedule.cc:1385
int totalEvents() const
Definition: Schedule.cc:1392
void getTriggerReport(TriggerReport &rep) const
Definition: Schedule.cc:1375
Definition: logger.py:1
PreallocationConfiguration preallocConfig_
Definition: Schedule.h:326
Log< level::FwkInfo, true > LogFwkVerbatim
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4

◆ shouldWeCloseOutput()

bool edm::Schedule::shouldWeCloseOutput ( ) const

Definition at line 1169 of file Schedule.cc.

References all_output_communicators_, and edm::OutputModuleCommunicator::shouldWeCloseFile().

1169  {
1170  using std::placeholders::_1;
1171  // Return true iff at least one output module returns true.
1172  return (std::find_if(all_output_communicators_.begin(),
1176  }
virtual bool shouldWeCloseFile() const =0
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:325

◆ terminate()

bool edm::Schedule::terminate ( void  ) const

Return whether each output module has reached its maximum count.

Definition at line 768 of file Schedule.cc.

References all_output_communicators_, and DummyCfis::c.

768  {
769  if (all_output_communicators_.empty()) {
770  return false;
771  }
772  for (auto& c : all_output_communicators_) {
773  if (!c->limitReached()) {
774  // Found an output module that has not reached output event count.
775  return false;
776  }
777  }
778  LogInfo("SuccessfulTermination") << "The job is terminating successfully because each output module\n"
779  << "has reached its configured limit.\n";
780  return true;
781  }
Log< level::Info, false > LogInfo
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:325

◆ totalEvents()

int edm::Schedule::totalEvents ( ) const

Return the number of events this Schedule has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 1392 of file Schedule.cc.

References alignCSCRings::s, and streamSchedules_.

Referenced by sendFwkSummaryToMessageLogger().

1392  {
1393  int returnValue = 0;
1394  for (auto& s : streamSchedules_) {
1395  returnValue += s->totalEvents();
1396  }
1397  return returnValue;
1398  }
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321

◆ totalEventsFailed()

int edm::Schedule::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 1408 of file Schedule.cc.

References alignCSCRings::s, and streamSchedules_.

1408  {
1409  int returnValue = 0;
1410  for (auto& s : streamSchedules_) {
1411  returnValue += s->totalEventsFailed();
1412  }
1413  return returnValue;
1414  }
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321

◆ totalEventsPassed()

int edm::Schedule::totalEventsPassed ( ) const

Return the number of events which have been passed by one or more trigger paths.

Definition at line 1400 of file Schedule.cc.

References alignCSCRings::s, and streamSchedules_.

1400  {
1401  int returnValue = 0;
1402  for (auto& s : streamSchedules_) {
1403  returnValue += s->totalEventsPassed();
1404  }
1405  return returnValue;
1406  }
std::vector< edm::propagate_const< std::shared_ptr< StreamSchedule > > > streamSchedules_
Definition: Schedule.h:321

◆ triggerPaths()

void edm::Schedule::triggerPaths ( std::vector< std::string > &  oLabelsToFill) const

Adds to oLabelsToFill the labels for all trigger paths in the process. This is different from availablePaths because it includes the empty paths to match the entries in TriggerResults exactly.

Definition at line 1304 of file Schedule.cc.

References pathNames_.

1304 { oLabelsToFill = *pathNames_; }
std::vector< std::string > const * pathNames_
Definition: Schedule.h:330

◆ writeLumiAsync()

void edm::Schedule::writeLumiAsync ( WaitingTaskHolder  iTask,
LuminosityBlockPrincipal const &  lbp,
ProcessContext const *  processContext,
ActivityRegistry activityRegistry 
)

Definition at line 1140 of file Schedule.cc.

References all_output_communicators_, edm::LuminosityBlockPrincipal::beginTime(), DummyCfis::c, CMS_SA_ALLOW, dqmdumpme::first, edm::LuminosityBlockPrincipal::id(), edm::LuminosityBlockPrincipal::index(), edm::RunPrincipal::index(), edm::ServiceRegistry::instance(), edm::GlobalContext::kWriteLuminosityBlock, findAndChange::op, edm::ActivityRegistry::postGlobalWriteLumiSignal_, edm::ActivityRegistry::preGlobalWriteLumiSignal_, edm::ServiceRegistry::presentToken(), edm::waiting_task::chain::runLast(), edm::LuminosityBlockPrincipal::runPrincipal(), TrackValidation_cff::task, edm::waiting_task::chain::then(), and unpackBuffers-CaloStage2::token.

1143  {
1145  GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
1146  lbp.id(),
1147  lbp.runPrincipal().index(),
1148  lbp.index(),
1149  lbp.beginTime(),
1150  processContext);
1151 
1152  using namespace edm::waiting_task;
1153  chain::first([&](auto nextTask) {
1155  CMS_SA_ALLOW try { activityRegistry->preGlobalWriteLumiSignal_(globalContext); } catch (...) {
1156  }
1157  for (auto& c : all_output_communicators_) {
1158  c->writeLumiAsync(nextTask, lbp, processContext, activityRegistry);
1159  }
1160  }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1161  //services can depend on other services
1163 
1164  activityRegistry->postGlobalWriteLumiSignal_(globalContext);
1165  })) |
1167  }
#define CMS_SA_ALLOW
constexpr auto then(O &&iO)
Definition: chain_first.h:277
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
static ServiceRegistry & instance()
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:325
ServiceToken presentToken() const

◆ writeProcessBlockAsync()

void edm::Schedule::writeProcessBlockAsync ( WaitingTaskHolder  iTask,
ProcessBlockPrincipal const &  pbp,
ProcessContext const *  processContext,
ActivityRegistry activityRegistry 
)

Definition at line 1110 of file Schedule.cc.

References all_output_communicators_, DummyCfis::c, CMS_SA_ALLOW, dqmdumpme::first, edm::ServiceRegistry::instance(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::Timestamp::invalidTimestamp(), edm::GlobalContext::kWriteProcessBlock, eostools::move(), findAndChange::op, edm::ActivityRegistry::postWriteProcessBlockSignal_, edm::ServiceRegistry::presentToken(), edm::ActivityRegistry::preWriteProcessBlockSignal_, edm::waiting_task::chain::runLast(), TrackValidation_cff::task, edm::waiting_task::chain::then(), and unpackBuffers-CaloStage2::token.

1113  {
1115  GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
1116  LuminosityBlockID(),
1120  processContext);
1121 
1122  using namespace edm::waiting_task;
1123  chain::first([&](auto nextTask) {
1124  // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1126  CMS_SA_ALLOW try { activityRegistry->preWriteProcessBlockSignal_(globalContext); } catch (...) {
1127  }
1128  for (auto& c : all_output_communicators_) {
1129  c->writeProcessBlockAsync(nextTask, pbp, processContext, activityRegistry);
1130  }
1131  }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1132  //services can depend on other services
1134 
1135  activityRegistry->postWriteProcessBlockSignal_(globalContext);
1136  })) |
1138  }
#define CMS_SA_ALLOW
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
constexpr auto then(O &&iO)
Definition: chain_first.h:277
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
static ServiceRegistry & instance()
static LuminosityBlockIndex invalidLuminosityBlockIndex()
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:325
ServiceToken presentToken() const
def move(src, dest)
Definition: eostools.py:511

◆ writeRunAsync()

void edm::Schedule::writeRunAsync ( WaitingTaskHolder  iTask,
RunPrincipal const &  rp,
ProcessContext const *  processContext,
ActivityRegistry activityRegistry,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 1077 of file Schedule.cc.

References all_output_communicators_, DummyCfis::c, CMS_SA_ALLOW, edm::RunPrincipal::endTime(), dqmdumpme::first, edm::RunPrincipal::index(), edm::ServiceRegistry::instance(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::GlobalContext::kWriteRun, findAndChange::op, edm::ActivityRegistry::postGlobalWriteRunSignal_, edm::ActivityRegistry::preGlobalWriteRunSignal_, edm::ServiceRegistry::presentToken(), edm::RunPrincipal::run(), edm::waiting_task::chain::runLast(), TrackValidation_cff::task, edm::waiting_task::chain::then(), and unpackBuffers-CaloStage2::token.

1081  {
1083  GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
1084  LuminosityBlockID(rp.run(), 0),
1085  rp.index(),
1087  rp.endTime(),
1088  processContext);
1089 
1090  using namespace edm::waiting_task;
1091  chain::first([&](auto nextTask) {
1092  //services can depend on other services
1094 
1095  // Propagating the exception would be nontrivial, and signal actions are not supposed to throw exceptions
1096  CMS_SA_ALLOW try { activityRegistry->preGlobalWriteRunSignal_(globalContext); } catch (...) {
1097  }
1098  for (auto& c : all_output_communicators_) {
1099  c->writeRunAsync(nextTask, rp, processContext, activityRegistry, mergeableRunProductMetadata);
1100  }
1101  }) | chain::then(doCleanup([activityRegistry, globalContext, token]() {
1102  //services can depend on other services
1104 
1105  activityRegistry->postGlobalWriteRunSignal_(globalContext);
1106  })) |
1108  }
#define CMS_SA_ALLOW
constexpr auto then(O &&iO)
Definition: chain_first.h:277
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
static ServiceRegistry & instance()
static LuminosityBlockIndex invalidLuminosityBlockIndex()
AllOutputModuleCommunicators all_output_communicators_
Definition: Schedule.h:325
ServiceToken presentToken() const

Member Data Documentation

◆ all_output_communicators_

AllOutputModuleCommunicators edm::Schedule::all_output_communicators_
private

◆ endPathNames_

std::vector<std::string> const* edm::Schedule::endPathNames_
private

Definition at line 331 of file Schedule.h.

Referenced by endPaths().

◆ endPathStatusInserters_

std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter> > > edm::Schedule::endPathStatusInserters_
private

Definition at line 319 of file Schedule.h.

◆ globalSchedule_

edm::propagate_const<std::unique_ptr<GlobalSchedule> > edm::Schedule::globalSchedule_
private

◆ moduleRegistry_

edm::propagate_const<std::shared_ptr<ModuleRegistry> > edm::Schedule::moduleRegistry_
private

Definition at line 320 of file Schedule.h.

Referenced by changeModule(), deleteModule(), finishSetup(), and moduleRegistry().

◆ pathNames_

std::vector<std::string> const* edm::Schedule::pathNames_
private

Definition at line 330 of file Schedule.h.

Referenced by triggerPaths().

◆ pathStatusInserters_

std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter> > > edm::Schedule::pathStatusInserters_
private

Definition at line 318 of file Schedule.h.

Referenced by processOneEventAsync().

◆ preallocConfig_

PreallocationConfiguration edm::Schedule::preallocConfig_
private

Definition at line 326 of file Schedule.h.

Referenced by changeModule(), and sendFwkSummaryToMessageLogger().

◆ resultsInserter_

edm::propagate_const<std::shared_ptr<TriggerResultInserter> > edm::Schedule::resultsInserter_
private

Definition at line 317 of file Schedule.h.

Referenced by resultsInserter().

◆ streamSchedules_

std::vector<edm::propagate_const<std::shared_ptr<StreamSchedule> > > edm::Schedule::streamSchedules_
private

◆ summaryTimeKeeper_

edm::propagate_const<std::unique_ptr<SystemTimeKeeper> > edm::Schedule::summaryTimeKeeper_
private

Definition at line 328 of file Schedule.h.

Referenced by finishSetup(), and getTriggerTimingReport().

◆ wantSummary_

bool edm::Schedule::wantSummary_
private

Definition at line 332 of file Schedule.h.

Referenced by endJob(), and finishSetup().