CMS 3D CMS Logo

TestProcessor.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Subsystem/Package
4 // Class : TestProcessor
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Mon, 30 Apr 2018 18:51:08 GMT
11 //
12 
13 // system include files
14 
15 // user include files
18 
36 
39 
42 
46 
48 
51 
53 
// Two-level stringification helpers.
//   str(s)  turns its argument, exactly as written, into a string literal ('#' operator).
//   xstr(s) forwards to str(s) so that a macro argument is expanded first and the
//           expansion (not the macro's name) is what gets stringified.
// Used in this file as xstr(PROJECT_VERSION) when filling the ProcessHistory entries.
54 #define xstr(s) str(s)
55 #define str(s) #s
56 
57 namespace edm {
58  namespace test {
59 
60  namespace {
61 
// Performs the setup work that oneTimeInitialization() runs exactly once.
// Always returns true; the return value exists only so the call can be used
// to initialize a static bool in oneTimeInitialization().
// NOTE(review): this listing omits some statements of the body (listing lines
// 63 and 68 are not shown), so the comments here cover only what is visible.
62  bool oneTimeInitializationImpl() {
64 
  // Function-local static: constructed on the first call and kept until program exit.
  // Built with argument 1 -- presumably the allowed thread count; confirm against
  // edm::ThreadsController.
65  static std::unique_ptr<edm::ThreadsController> tsiPtr = std::make_unique<edm::ThreadsController>(1);
66 
67  // register the empty parentage vector , once and for all
  // NOTE(review): the statement this comment refers to is not visible in this listing.
69 
70  // register the empty parameter set, once and for all.
71  ParameterSet().registerIt();
72  return true;
73  }
74 
75  bool oneTimeInitialization() {
76  static const bool s_init{oneTimeInitializationImpl()};
77  return s_init;
78  }
79  } // namespace
80 
81  //
82  // constructors and destructor
83  //
85  : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
86  arena_(1),
87  espController_(std::make_unique<eventsetup::EventSetupsController>()),
88  historyAppender_(std::make_unique<HistoryAppender>()),
89  moduleRegistry_(std::make_shared<ModuleRegistry>()) {
90  //Setup various singletons
91  (void)oneTimeInitialization();
92 
94 
95  auto psetPtr = desc.parameterSet();
96 
97  validateTopLevelParameterSets(psetPtr.get());
98 
100 
101  labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
102 
103  auto procDesc = desc.processDesc();
104  // Now do general initialization
106 
107  //initialize the services
108  auto& serviceSets = procDesc->getServicesPSets();
109  ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
110  serviceToken_ = items.addCPRandTNS(*psetPtr, token);
111 
112  //make the services available
114 
115  // initialize miscellaneous items
116  std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
117 
118  // initialize the event setup provider
119  esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
120 
121  auto nThreads = 1U;
122  auto nStreams = 1U;
123  auto nConcurrentLumis = 1U;
124  auto nConcurrentRuns = 1U;
125  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
126 
127  if (not iConfig.esProduceEntries().empty()) {
128  esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
129  esp_->add(std::dynamic_pointer_cast<eventsetup::DataProxyProvider>(esHelper_));
130  esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
131  }
132 
133  preg_ = items.preg();
134  processConfiguration_ = items.processConfiguration();
135 
136  edm::ParameterSet emptyPSet;
137  emptyPSet.registerIt();
138  auto psetid = emptyPSet.id();
139 
140  ProcessHistory oldHistory;
141  for (auto const& p : iConfig.extraProcesses()) {
142  oldHistory.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
144  }
145 
146  //setup the products we will be adding to the event
147  for (auto const& produce : iConfig.produceEntries()) {
148  auto processName = produce.processName_;
149  if (processName.empty()) {
150  processName = processConfiguration_->processName();
151  }
152  edm::TypeWithDict twd(produce.type_.typeInfo());
154  produce.moduleLabel_,
155  processName,
156  twd.userClassName(),
157  twd.friendlyClassName(),
158  produce.instanceLabel_,
159  "",
160  psetid,
161  twd,
162  true //force this to come from 'source'
163  );
164  product.init();
165  dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
166  preg_->addProduct(product);
167  }
168 
169  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
170 
171  schedule_ = items.initSchedule(*psetPtr, false, preallocations_, &processContext_, *processBlockHelper_);
172  // set the data members
173  act_table_ = std::move(items.act_table_);
174  actReg_ = items.actReg_;
175  branchIDListHelper_ = items.branchIDListHelper();
176  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
179 
181 
182  preg_->setFrozen();
183  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
184  // Reusable event principal
185  auto ep = std::make_shared<EventPrincipal>(preg_,
189  historyAppender_.get(),
190  index);
192  }
193 
194  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
195  auto lp =
196  std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
198  }
199  {
200  auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_);
202  }
203  }
204 
206  //
207  // member functions
208  //
209 
210  void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
211  if (index >= dataProducts_.size()) {
212  throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
213  "with a call to the function \'produces\' BEFORE passing the\n"
214  "TestProcessor::Config object to the TestProcessor constructor";
215  }
216  dataProducts_[index].second = std::move(iWrapper);
217  }
218 
220  bool result = arena_.execute([this]() {
221  setupProcessing();
222  event();
223 
224  return schedule_->totalEventsPassed() > 0;
225  });
226  schedule_->clearCounters();
227  if (esHelper_) {
228  //We want each test to have its own ES data products
229  esHelper_->resetAllProxies();
230  }
231  return edm::test::Event(
233  }
234 
236  arena_.execute([this, iNum]() {
237  if (not beginJobCalled_) {
238  beginJob();
239  }
240  if (not beginProcessBlockCalled_) {
242  }
243  if (not beginRunCalled_) {
244  beginRun();
245  }
246  if (beginLumiCalled_) {
248  assert(lumiNumber_ != iNum);
249  }
250  lumiNumber_ = iNum;
252  });
253 
254  if (esHelper_) {
255  //We want each test to have its own ES data products
256  esHelper_->resetAllProxies();
257  }
258 
260  }
261 
263  //using a return value from arena_.execute lead to double delete of shared_ptr
264  // based on valgrind output when exception occurred. Use lambda capture instead.
265  std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
266  arena_.execute([this, &lumi]() {
267  if (not beginJobCalled_) {
268  beginJob();
269  }
270  if (not beginProcessBlockCalled_) {
272  }
273  if (not beginRunCalled_) {
274  beginRun();
275  }
276  if (not beginLumiCalled_) {
278  }
280  });
281  if (esHelper_) {
282  //We want each test to have its own ES data products
283  esHelper_->resetAllProxies();
284  }
285 
287  }
288 
290  arena_.execute([this, iNum]() {
291  if (not beginJobCalled_) {
292  beginJob();
293  }
294  if (not beginProcessBlockCalled_) {
296  }
297  if (beginRunCalled_) {
298  assert(runNumber_ != iNum);
299  endRun();
300  }
301  runNumber_ = iNum;
302  beginRun();
303  });
304  if (esHelper_) {
305  //We want each test to have its own ES data products
306  esHelper_->resetAllProxies();
307  }
308 
309  return edm::test::Run(
311  }
313  //using a return value from arena_.execute lead to double delete of shared_ptr
314  // based on valgrind output when exception occurred. Use lambda capture instead.
315  std::shared_ptr<edm::RunPrincipal> rp;
316  arena_.execute([this, &rp]() {
317  if (not beginJobCalled_) {
318  beginJob();
319  }
320  if (not beginProcessBlockCalled_) {
322  }
323  if (not beginRunCalled_) {
324  beginRun();
325  }
326  rp = endRun();
327  });
328  if (esHelper_) {
329  //We want each test to have its own ES data products
330  esHelper_->resetAllProxies();
331  }
332 
333  return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
334  }
335 
337  arena_.execute([this]() {
338  if (not beginJobCalled_) {
339  beginJob();
340  }
342  });
345  }
347  auto pbp = arena_.execute([this]() {
348  if (not beginJobCalled_) {
349  beginJob();
350  }
351  if (not beginProcessBlockCalled_) {
353  }
354  return endProcessBlock();
355  });
357  }
358 
360  if (not beginJobCalled_) {
361  beginJob();
362  }
363  if (not beginProcessBlockCalled_) {
365  }
366  if (not beginRunCalled_) {
367  beginRun();
368  }
369  if (not beginLumiCalled_) {
371  }
372  }
373 
375  arena_.execute([this]() {
376  if (beginLumiCalled_) {
378  beginLumiCalled_ = false;
379  }
380  if (beginRunCalled_) {
381  endRun();
382  beginRunCalled_ = false;
383  }
385  endProcessBlock();
386  beginProcessBlockCalled_ = false;
387  }
388  if (beginJobCalled_) {
389  endJob();
390  }
393  task.waitNoThrow();
394  });
395  }
396 
399 
404  actReg_->preallocateSignal_(bounds);
405  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
406  PathsAndConsumesOfModules pathsAndConsumesOfModules;
407 
408  //The code assumes only modules make data in the current process
409  // Since the test is also allowed to do so, it can lead to problems.
410  //pathsAndConsumesOfModules.initialize(schedule_.get(), preg_);
411 
412  //NOTE: this may throw
413  //checkForModuleDependencyCorrectness(pathsAndConsumesOfModules, false);
414  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext_);
415 
416  espController_->finishConfiguration();
417 
418  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
419  actReg_->postBeginJobSignal_();
420 
421  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
422  schedule_->beginStream(i);
423  }
424  beginJobCalled_ = true;
425  }
426 
429  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
430 
431  std::vector<edm::SubProcess> emptyList;
432  {
433  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
435  FinalWaitingTask globalWaitTask{taskGroup_};
436  beginGlobalTransitionAsync<Traits>(
437  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
438  globalWaitTask.wait();
439  }
441  }
442 
444  ProcessHistoryID phid;
445  auto aux = std::make_shared<RunAuxiliary>(runNumber_, Timestamp(), Timestamp());
446  auto rp = std::make_shared<RunPrincipal>(aux, preg_, *processConfiguration_, historyAppender_.get(), 0);
447 
449  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, runNumber_);
450 
451  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
453 
454  auto const& es = esp_->eventSetupImpl();
455 
456  RunTransitionInfo transitionInfo(runPrincipal, es);
457 
458  std::vector<edm::SubProcess> emptyList;
459  {
461  FinalWaitingTask globalWaitTask{taskGroup_};
462  beginGlobalTransitionAsync<Traits>(
463  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
464  globalWaitTask.wait();
465  }
466  {
467  //To wait, the ref count has to be 1+#streams
468  FinalWaitingTask streamLoopWaitTask{taskGroup_};
469 
471  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
472  *schedule_,
474  transitionInfo,
476  emptyList);
477  streamLoopWaitTask.wait();
478  }
479  beginRunCalled_ = true;
480  }
481 
485  lumiPrincipal_->clearPrincipal();
487  lumiPrincipal_->setAux(aux);
488 
489  lumiPrincipal_->setRunPrincipal(principalCache_.runPrincipalPtr());
490 
493 
494  auto const& es = esp_->eventSetupImpl();
495 
496  LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
497 
498  std::vector<edm::SubProcess> emptyList;
499  {
501  FinalWaitingTask globalWaitTask{taskGroup_};
502  beginGlobalTransitionAsync<Traits>(
503  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
504  globalWaitTask.wait();
505  }
506  {
507  //To wait, the ref count has to be 1+#streams
508  FinalWaitingTask streamLoopWaitTask{taskGroup_};
509 
511 
512  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
513  *schedule_,
515  transitionInfo,
517  emptyList);
518 
519  streamLoopWaitTask.wait();
520  }
521  beginLumiCalled_ = true;
522  }
523 
525  auto pep = &(principalCache_.eventPrincipal(0));
526 
527  //this resets the EventPrincipal (if it had been used before)
528  pep->clearEventPrincipal();
529  pep->fillEventPrincipal(
531  nullptr,
532  nullptr);
533  assert(lumiPrincipal_.get() != nullptr);
534  pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
535 
536  for (auto& p : dataProducts_) {
537  if (p.second) {
538  pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
539  } else {
540  //The data product was not set so we need to
541  // tell the ProductResolver not to wait
542  auto r = pep->getProductResolver(p.first.branchID());
543  dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
544  }
545  }
546 
548 
549  FinalWaitingTask waitTask{taskGroup_};
550 
551  EventTransitionInfo info(*pep, esp_->eventSetupImpl());
552  schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
553 
554  waitTask.wait();
555  ++eventNumber_;
556  }
557 
// Ends the currently open luminosity block, if there is one.
// Returns a copy of the shared_ptr held in lumiPrincipal_ at entry. When
// beginLumiCalled_ is false no transition is run and that copy is returned as is.
// NOTE(review): this listing omits several lines of the body (e.g. listing lines
// 565, 577, 581, 583, 592, 596); the comments describe only the visible statements.
558  std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
  // Take a local copy first: the member is reset below and the local copy is what
  // the transitions and the return statement use.
559  auto lumiPrincipal = lumiPrincipal_;
560  if (beginLumiCalled_) {
  // Mark the lumi as closed and release the member's reference.
561  beginLumiCalled_ = false;
562  lumiPrincipal_.reset();
563 
  // Sync value built from the current run/lumi/event numbers and the lumi's end time.
  // NOTE(review): the statement that consumes 'ts' is not visible in this listing.
564  IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
566 
567  auto const& es = esp_->eventSetupImpl();
568 
569  LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
570 
  // Passed as the (empty) SubProcess list to both transition calls below.
571  std::vector<edm::SubProcess> emptyList;
572 
  // Step 1: stream-level end transition, then wait on its task.
573  //To wait, the ref count has to be 1+#streams
574  {
575  FinalWaitingTask streamLoopWaitTask{taskGroup_};
576 
578 
579  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
580  *schedule_,
582  transitionInfo,
584  emptyList,
585  false);
586 
587  streamLoopWaitTask.wait();
588  }
  // Step 2: global end transition, run only after the stream transition has been waited for.
589  {
590  FinalWaitingTask globalWaitTask{taskGroup_};
591 
593  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
594  *schedule_,
595  transitionInfo,
597  emptyList,
598  false);
599  globalWaitTask.wait();
600  }
601  }
602  return lumiPrincipal;
603  }
604 
// Ends the currently open run, if there is one.
// Returns the run's principal, or an empty shared_ptr when beginRunCalled_ is false
// (in that case no transition is run).
// NOTE(review): this listing omits several lines of the body (e.g. listing lines
// 610, 614, 616, 653); the comments describe only the visible statements.
605  std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
606  std::shared_ptr<RunPrincipal> rp;
607  if (beginRunCalled_) {
  // Mark the run as closed before running the end transitions.
608  beginRunCalled_ = false;
609  ProcessHistoryID phid;
  // NOTE(review): 'rp' is dereferenced here, so it must be assigned by the line that
  // this listing omits just above -- presumably fetched from principalCache_ using
  // phid and runNumber_; confirm against the real source.
611  RunPrincipal& runPrincipal = *rp;
612 
613  IOVSyncValue ts(
615  runPrincipal.endTime());
617 
618  auto const& es = esp_->eventSetupImpl();
619 
620  RunTransitionInfo transitionInfo(runPrincipal, es);
621 
  // Passed as the (empty) SubProcess list to both transition calls below.
622  std::vector<edm::SubProcess> emptyList;
623 
  // Step 1: stream-level end transition, then wait on its task.
624  //To wait, the ref count has to be 1+#streams
625  {
626  FinalWaitingTask streamLoopWaitTask{taskGroup_};
627 
629 
630  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
631  *schedule_,
633  transitionInfo,
635  emptyList,
636  false);
637 
638  streamLoopWaitTask.wait();
639  }
  // Step 2: global end transition, run only after the stream transition has been waited for.
640  {
641  FinalWaitingTask globalWaitTask{taskGroup_};
642 
644  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
645  *schedule_,
646  transitionInfo,
648  emptyList,
649  false);
650  globalWaitTask.wait();
651  }
652 
654  }
655  return rp;
656  }
657 
661  beginProcessBlockCalled_ = false;
662 
663  std::vector<edm::SubProcess> emptyList;
664  {
665  FinalWaitingTask globalWaitTask{taskGroup_};
666 
667  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
669  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
670  *schedule_,
671  transitionInfo,
673  emptyList,
674  false);
675  globalWaitTask.wait();
676  }
677  }
678  return &processBlockPrincipal;
679  }
680 
682  if (!beginJobCalled_) {
683  return;
684  }
685  beginJobCalled_ = false;
686 
687  // Collects exceptions, so we don't throw before all operations are performed.
689  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
690 
691  //make the services available
693 
694  //NOTE: this really should go elsewhere in the future
695  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
696  c.call([this, i]() { this->schedule_->endStream(i); });
697  }
698  auto actReg = actReg_.get();
699  c.call([actReg]() { actReg->preEndJobSignal_(); });
700  schedule_->endJob(c);
701  c.call([actReg]() { actReg->postEndJobSignal_(); });
702  if (c.hasThrown()) {
703  c.rethrow();
704  }
705  }
706 
708  if (beginRunCalled_) {
710  endRun();
711  }
712  runNumber_ = iRun;
713  }
716  lumiNumber_ = iLumi;
717  }
718 
720 
721  } // namespace test
722 } // namespace edm
void put(std::pair< edm::EDPutTokenT< T >, std::unique_ptr< T >> &&iPut)
std::shared_ptr< LuminosityBlockPrincipal > endLuminosityBlock()
std::shared_ptr< ActivityRegistry > actReg_
static const TGPicture * info(bool iBackgroundIsBlack)
edm::test::ProcessBlock testBeginProcessBlockImpl()
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
#define xstr(s)
static PluginManager & configure(const Config &)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::vector< std::pair< edm::BranchDescription, std::unique_ptr< WrapperBase > > > dataProducts_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
bool registerProcessHistory(ProcessHistory const &processHistory)
RunNumber_t run() const
Definition: RunPrincipal.h:61
LuminosityBlockNumber_t lumiNumber_
unsigned long long EventNumber_t
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
edm::test::LuminosityBlock testEndLuminosityBlockImpl()
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
EventNumber_t eventNumber_
std::string const & pythonConfiguration() const
Definition: TestProcessor.h:94
assert(be >=bs)
ProcessContext processContext_
edm::test::Run testBeginRunImpl(edm::RunNumber_t iNum, std::pair< edm::test::ESPutTokenT< T >, std::unique_ptr< T >> &&iPut, U &&... iArgs)
unsigned int LuminosityBlockNumber_t
oneapi::tbb::task_group taskGroup_
PreallocationConfiguration preallocations_
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
~TestProcessor() noexcept(false)
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ProcessBlockPrincipal & processBlockPrincipal() const
std::vector< std::string > const & extraProcesses() const
void setEventNumber(edm::EventNumber_t)
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
ParameterSetID id() const
Timestamp const & beginTime() const
Definition: RunPrincipal.h:67
ParameterSet const & registerIt()
PluginManager::Config config()
Definition: standard.cc:21
std::shared_ptr< EventSetupTestHelper > esHelper_
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
std::string labelOfTestModule_
std::unique_ptr< eventsetup::EventSetupsController > espController_
edm::test::Event testImpl()
std::shared_ptr< eventsetup::EventSetupProvider > esp_
std::unique_ptr< Schedule > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
edm::test::LuminosityBlock testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum, std::pair< edm::test::ESPutTokenT< T >, std::unique_ptr< T >> &&iPut, U &&... iArgs)
edm::test::ProcessBlock testEndProcessBlockImpl()
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< ProduceEntry > const & produceEntries() const
std::vector< ESProduceEntry > const & esProduceEntries() const
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
void setLuminosityBlockNumber(edm::LuminosityBlockNumber_t)
PrincipalCache principalCache_
ProcessBlockPrincipal const * endProcessBlock()
std::shared_ptr< ProductRegistry > preg_
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::unique_ptr< ExceptionToActionTable const > act_table_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
oneapi::tbb::task_arena arena_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
edm::test::Run testEndRunImpl()
void setRunNumber(edm::RunNumber_t)
HLT enums.
std::unique_ptr< HistoryAppender > historyAppender_
void emplace_back(Args &&... args)
Definition: Config.py:1
TestProcessor(Config const &iConfig, ServiceToken iToken=ServiceToken())
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
std::shared_ptr< LuminosityBlockPrincipal > lumiPrincipal_
static ParentageRegistry * instance()
std::shared_ptr< ProcessBlockHelper > processBlockHelper_
ProcessHistoryRegistry processHistoryRegistry_
std::shared_ptr< RunPrincipal > endRun()
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511