CMS 3D CMS Logo

TestProcessor.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Subsystem/Package
4 // Class : TestProcessor
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Mon, 30 Apr 2018 18:51:08 GMT
11 //
12 
13 // system include files
14 
15 // user include files
18 
36 
39 
42 
46 
48 
51 
53 
54 #define xstr(s) str(s)
55 #define str(s) #s
56 
57 namespace edm {
58  namespace test {
59 
60  namespace {
61 
62  bool oneTimeInitializationImpl() {
64 
65  static std::unique_ptr<edm::ThreadsController> tsiPtr = std::make_unique<edm::ThreadsController>(1);
66 
67  // register the empty parentage vector , once and for all
69 
70  // register the empty parameter set, once and for all.
71  ParameterSet().registerIt();
72  return true;
73  }
74 
75  bool oneTimeInitialization() {
76  static const bool s_init{oneTimeInitializationImpl()};
77  return s_init;
78  }
79  } // namespace
80 
81  //
82  // constructors and destructor
83  //
85  : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
86  arena_(1),
87  espController_(std::make_unique<eventsetup::EventSetupsController>()),
88  historyAppender_(std::make_unique<HistoryAppender>()),
89  moduleRegistry_(std::make_shared<ModuleRegistry>()) {
90  //Setup various singletons
91  (void)oneTimeInitialization();
92 
94 
95  auto psetPtr = desc.parameterSet();
96 
97  validateTopLevelParameterSets(psetPtr.get());
98 
100 
101  labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
102 
103  auto procDesc = desc.processDesc();
104  // Now do general initialization
106 
107  //initialize the services
108  auto& serviceSets = procDesc->getServicesPSets();
109  ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
110  serviceToken_ = items.addCPRandTNS(*psetPtr, token);
111 
112  //make the services available
114 
115  // intialize miscellaneous items
116  std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
117 
118  // intialize the event setup provider
119  esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
120 
121  auto nThreads = 1U;
122  auto nStreams = 1U;
123  auto nConcurrentLumis = 1U;
124  auto nConcurrentRuns = 1U;
125  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
126 
127  if (not iConfig.esProduceEntries().empty()) {
128  esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
129  esp_->add(std::dynamic_pointer_cast<eventsetup::DataProxyProvider>(esHelper_));
130  esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
131  }
132 
133  preg_ = items.preg();
134  processConfiguration_ = items.processConfiguration();
135 
136  edm::ParameterSet emptyPSet;
137  emptyPSet.registerIt();
138  auto psetid = emptyPSet.id();
139 
140  ProcessHistory oldHistory;
141  for (auto const& p : iConfig.extraProcesses()) {
142  oldHistory.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
144  }
145 
146  //setup the products we will be adding to the event
147  for (auto const& produce : iConfig.produceEntries()) {
148  auto processName = produce.processName_;
149  if (processName.empty()) {
150  processName = processConfiguration_->processName();
151  }
152  edm::TypeWithDict twd(produce.type_.typeInfo());
154  produce.moduleLabel_,
155  processName,
156  twd.userClassName(),
157  twd.friendlyClassName(),
158  produce.instanceLabel_,
159  "",
160  psetid,
161  twd,
162  true //force this to come from 'source'
163  );
164  product.init();
165  dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
166  preg_->addProduct(product);
167  }
168 
169  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
170 
171  schedule_ = items.initSchedule(*psetPtr, false, preallocations_, &processContext_, *processBlockHelper_);
172  // set the data members
173  act_table_ = std::move(items.act_table_);
174  actReg_ = items.actReg_;
175  branchIDListHelper_ = items.branchIDListHelper();
176  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
178 
180 
181  preg_->setFrozen();
182  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
183  // Reusable event principal
184  auto ep = std::make_shared<EventPrincipal>(preg_,
188  historyAppender_.get(),
189  index);
191  }
192  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
193  auto rp = std::make_unique<RunPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
195  }
196  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
197  auto lp =
198  std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
200  }
201  {
202  auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_);
204  }
205  }
206 
208  //
209  // member functions
210  //
211 
212  void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
213  if (index >= dataProducts_.size()) {
214  throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
215  "with a call to the function \'produces\' BEFORE passing the\n"
216  "TestProcessor::Config object to the TestProcessor constructor";
217  }
218  dataProducts_[index].second = std::move(iWrapper);
219  }
220 
222  bool result = arena_.execute([this]() {
223  setupProcessing();
224  event();
225 
226  return schedule_->totalEventsPassed() > 0;
227  });
228  schedule_->clearCounters();
229  if (esHelper_) {
230  //We want each test to have its own ES data products
231  esHelper_->resetAllProxies();
232  }
233  return edm::test::Event(
235  }
236 
238  arena_.execute([this, iNum]() {
239  if (not beginJobCalled_) {
240  beginJob();
241  }
242  if (not beginProcessBlockCalled_) {
244  }
245  if (not beginRunCalled_) {
246  beginRun();
247  }
248  if (beginLumiCalled_) {
250  assert(lumiNumber_ != iNum);
251  }
252  lumiNumber_ = iNum;
254  });
255 
256  if (esHelper_) {
257  //We want each test to have its own ES data products
258  esHelper_->resetAllProxies();
259  }
260 
262  }
263 
265  //using a return value from arena_.execute lead to double delete of shared_ptr
266  // based on valgrind output when exception occurred. Use lambda capture instead.
267  std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
268  arena_.execute([this, &lumi]() {
269  if (not beginJobCalled_) {
270  beginJob();
271  }
272  if (not beginProcessBlockCalled_) {
274  }
275  if (not beginRunCalled_) {
276  beginRun();
277  }
278  if (not beginLumiCalled_) {
280  }
282  });
283  if (esHelper_) {
284  //We want each test to have its own ES data products
285  esHelper_->resetAllProxies();
286  }
287 
289  }
290 
292  arena_.execute([this, iNum]() {
293  if (not beginJobCalled_) {
294  beginJob();
295  }
296  if (not beginProcessBlockCalled_) {
298  }
299  if (beginRunCalled_) {
300  assert(runNumber_ != iNum);
301  endRun();
302  }
303  runNumber_ = iNum;
304  beginRun();
305  });
306  if (esHelper_) {
307  //We want each test to have its own ES data products
308  esHelper_->resetAllProxies();
309  }
310 
312  }
314  //using a return value from arena_.execute lead to double delete of shared_ptr
315  // based on valgrind output when exception occurred. Use lambda capture instead.
316  std::shared_ptr<edm::RunPrincipal> rp;
317  arena_.execute([this, &rp]() {
318  if (not beginJobCalled_) {
319  beginJob();
320  }
321  if (not beginProcessBlockCalled_) {
323  }
324  if (not beginRunCalled_) {
325  beginRun();
326  }
327  rp = endRun();
328  });
329  if (esHelper_) {
330  //We want each test to have its own ES data products
331  esHelper_->resetAllProxies();
332  }
333 
334  return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
335  }
336 
338  arena_.execute([this]() {
339  if (not beginJobCalled_) {
340  beginJob();
341  }
343  });
346  }
348  auto pbp = arena_.execute([this]() {
349  if (not beginJobCalled_) {
350  beginJob();
351  }
352  if (not beginProcessBlockCalled_) {
354  }
355  return endProcessBlock();
356  });
358  }
359 
361  if (not beginJobCalled_) {
362  beginJob();
363  }
364  if (not beginProcessBlockCalled_) {
366  }
367  if (not beginRunCalled_) {
368  beginRun();
369  }
370  if (not beginLumiCalled_) {
372  }
373  }
374 
376  arena_.execute([this]() {
377  if (beginLumiCalled_) {
379  beginLumiCalled_ = false;
380  }
381  if (beginRunCalled_) {
382  endRun();
383  beginRunCalled_ = false;
384  }
386  endProcessBlock();
387  beginProcessBlockCalled_ = false;
388  }
389  if (beginJobCalled_) {
390  endJob();
391  }
394  task.waitNoThrow();
395  });
396  }
397 
400 
405  actReg_->preallocateSignal_(bounds);
406  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
407  PathsAndConsumesOfModules pathsAndConsumesOfModules;
408 
409  //The code assumes only modules make data in the current process
410  // Since the test os also allowed to do so, it can lead to problems.
411  //pathsAndConsumesOfModules.initialize(schedule_.get(), preg_);
412 
413  //NOTE: this may throw
414  //checkForModuleDependencyCorrectness(pathsAndConsumesOfModules, false);
415  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext_);
416 
417  espController_->finishConfiguration();
418 
419  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
420  actReg_->postBeginJobSignal_();
421 
422  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
423  schedule_->beginStream(i);
424  }
425  beginJobCalled_ = true;
426  }
427 
430  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
431 
432  std::vector<edm::SubProcess> emptyList;
433  {
434  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
436  FinalWaitingTask globalWaitTask{taskGroup_};
437  beginGlobalTransitionAsync<Traits>(
438  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
439  globalWaitTask.wait();
440  }
442  }
443 
446  runPrincipal_->clearPrincipal();
449 
450  IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime());
452 
453  auto const& es = esp_->eventSetupImpl();
454 
455  RunTransitionInfo transitionInfo(*runPrincipal_, es, nullptr);
456 
457  std::vector<edm::SubProcess> emptyList;
458  {
460  FinalWaitingTask globalWaitTask{taskGroup_};
461  beginGlobalTransitionAsync<Traits>(
462  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
463  globalWaitTask.wait();
464  }
465  {
466  //To wait, the ref count has to be 1+#streams
467  FinalWaitingTask streamLoopWaitTask{taskGroup_};
468 
470  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
471  *schedule_,
473  transitionInfo,
475  emptyList);
476  streamLoopWaitTask.wait();
477  }
478  beginRunCalled_ = true;
479  }
480 
484  lumiPrincipal_->clearPrincipal();
486  lumiPrincipal_->setAux(aux);
487 
488  lumiPrincipal_->setRunPrincipal(runPrincipal_);
489 
492 
493  auto const& es = esp_->eventSetupImpl();
494 
495  LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
496 
497  std::vector<edm::SubProcess> emptyList;
498  {
500  FinalWaitingTask globalWaitTask{taskGroup_};
501  beginGlobalTransitionAsync<Traits>(
502  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
503  globalWaitTask.wait();
504  }
505  {
506  //To wait, the ref count has to be 1+#streams
507  FinalWaitingTask streamLoopWaitTask{taskGroup_};
508 
510 
511  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
512  *schedule_,
514  transitionInfo,
516  emptyList);
517 
518  streamLoopWaitTask.wait();
519  }
520  beginLumiCalled_ = true;
521  }
522 
524  auto pep = &(principalCache_.eventPrincipal(0));
525 
526  //this resets the EventPrincipal (if it had been used before)
527  pep->clearEventPrincipal();
528  pep->fillEventPrincipal(
530  nullptr,
531  nullptr);
532  assert(lumiPrincipal_.get() != nullptr);
533  pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
534 
535  for (auto& p : dataProducts_) {
536  if (p.second) {
537  pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
538  } else {
539  //The data product was not set so we need to
540  // tell the ProductResolver not to wait
541  auto r = pep->getProductResolver(p.first.branchID());
542  dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
543  }
544  }
545 
547 
548  FinalWaitingTask waitTask{taskGroup_};
549 
550  EventTransitionInfo info(*pep, esp_->eventSetupImpl());
551  schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
552 
553  waitTask.wait();
554  ++eventNumber_;
555  }
556 
557  std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
558  auto lumiPrincipal = lumiPrincipal_;
559  if (beginLumiCalled_) {
560  beginLumiCalled_ = false;
561  lumiPrincipal_.reset();
562 
563  IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
565 
566  auto const& es = esp_->eventSetupImpl();
567 
568  LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
569 
570  std::vector<edm::SubProcess> emptyList;
571 
572  //To wait, the ref count has to be 1+#streams
573  {
574  FinalWaitingTask streamLoopWaitTask{taskGroup_};
575 
577 
578  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
579  *schedule_,
581  transitionInfo,
583  emptyList,
584  false);
585 
586  streamLoopWaitTask.wait();
587  }
588  {
589  FinalWaitingTask globalWaitTask{taskGroup_};
590 
592  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
593  *schedule_,
594  transitionInfo,
596  emptyList,
597  false);
598  globalWaitTask.wait();
599  }
600  }
601  lumiPrincipal->setRunPrincipal(std::shared_ptr<RunPrincipal>());
602  return lumiPrincipal;
603  }
604 
605  std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
606  auto runPrincipal = runPrincipal_;
607  runPrincipal_.reset();
608  if (beginRunCalled_) {
609  beginRunCalled_ = false;
610 
611  IOVSyncValue ts(
613  runPrincipal->endTime());
615 
616  auto const& es = esp_->eventSetupImpl();
617 
618  RunTransitionInfo transitionInfo(*runPrincipal, es);
619 
620  std::vector<edm::SubProcess> emptyList;
621 
622  //To wait, the ref count has to be 1+#streams
623  {
624  FinalWaitingTask streamLoopWaitTask{taskGroup_};
625 
627 
628  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
629  *schedule_,
631  transitionInfo,
633  emptyList,
634  false);
635 
636  streamLoopWaitTask.wait();
637  }
638  {
639  FinalWaitingTask globalWaitTask{taskGroup_};
640 
642  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
643  *schedule_,
644  transitionInfo,
646  emptyList,
647  false);
648  globalWaitTask.wait();
649  }
650  }
651  return runPrincipal;
652  }
653 
657  beginProcessBlockCalled_ = false;
658 
659  std::vector<edm::SubProcess> emptyList;
660  {
661  FinalWaitingTask globalWaitTask{taskGroup_};
662 
663  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
665  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
666  *schedule_,
667  transitionInfo,
669  emptyList,
670  false);
671  globalWaitTask.wait();
672  }
673  }
674  return &processBlockPrincipal;
675  }
676 
678  if (!beginJobCalled_) {
679  return;
680  }
681  beginJobCalled_ = false;
682 
683  // Collects exceptions, so we don't throw before all operations are performed.
685  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
686 
687  //make the services available
689 
690  //NOTE: this really should go elsewhere in the future
691  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
692  c.call([this, i]() { this->schedule_->endStream(i); });
693  }
694  auto actReg = actReg_.get();
695  c.call([actReg]() { actReg->preEndJobSignal_(); });
696  schedule_->endJob(c);
697  c.call([actReg]() { actReg->postEndJobSignal_(); });
698  if (c.hasThrown()) {
699  c.rethrow();
700  }
701  }
702 
704  if (beginRunCalled_) {
706  endRun();
707  }
708  runNumber_ = iRun;
709  }
712  lumiNumber_ = iLumi;
713  }
714 
716 
717  } // namespace test
718 } // namespace edm
void put(std::pair< edm::EDPutTokenT< T >, std::unique_ptr< T >> &&iPut)
std::shared_ptr< LuminosityBlockPrincipal > endLuminosityBlock()
std::shared_ptr< ActivityRegistry > actReg_
static const TGPicture * info(bool iBackgroundIsBlack)
edm::test::ProcessBlock testBeginProcessBlockImpl()
#define xstr(s)
static PluginManager & configure(const Config &)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::vector< std::pair< edm::BranchDescription, std::unique_ptr< WrapperBase > > > dataProducts_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
bool registerProcessHistory(ProcessHistory const &processHistory)
LuminosityBlockNumber_t lumiNumber_
unsigned long long EventNumber_t
edm::test::LuminosityBlock testEndLuminosityBlockImpl()
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
EventNumber_t eventNumber_
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
std::string const & pythonConfiguration() const
Definition: TestProcessor.h:94
std::shared_ptr< RunPrincipal > runPrincipal_
assert(be >=bs)
ProcessContext processContext_
edm::test::Run testBeginRunImpl(edm::RunNumber_t iNum, std::pair< edm::test::ESPutTokenT< T >, std::unique_ptr< T >> &&iPut, U &&... iArgs)
unsigned int LuminosityBlockNumber_t
oneapi::tbb::task_group taskGroup_
PreallocationConfiguration preallocations_
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
~TestProcessor() noexcept(false)
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ProcessBlockPrincipal & processBlockPrincipal() const
std::vector< std::string > const & extraProcesses() const
void setEventNumber(edm::EventNumber_t)
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
ParameterSetID id() const
ParameterSet const & registerIt()
PluginManager::Config config()
Definition: standard.cc:21
std::shared_ptr< EventSetupTestHelper > esHelper_
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
std::string labelOfTestModule_
std::unique_ptr< eventsetup::EventSetupsController > espController_
edm::test::Event testImpl()
std::shared_ptr< eventsetup::EventSetupProvider > esp_
std::unique_ptr< Schedule > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
edm::test::LuminosityBlock testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum, std::pair< edm::test::ESPutTokenT< T >, std::unique_ptr< T >> &&iPut, U &&... iArgs)
edm::test::ProcessBlock testEndProcessBlockImpl()
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< ProduceEntry > const & produceEntries() const
std::vector< ESProduceEntry > const & esProduceEntries() const
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
void setLuminosityBlockNumber(edm::LuminosityBlockNumber_t)
PrincipalCache principalCache_
ProcessBlockPrincipal const * endProcessBlock()
std::shared_ptr< ProductRegistry > preg_
std::unique_ptr< ExceptionToActionTable const > act_table_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
oneapi::tbb::task_arena arena_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
edm::test::Run testEndRunImpl()
void setRunNumber(edm::RunNumber_t)
HLT enums.
std::unique_ptr< HistoryAppender > historyAppender_
void emplace_back(Args &&... args)
Definition: Config.py:1
TestProcessor(Config const &iConfig, ServiceToken iToken=ServiceToken())
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
std::shared_ptr< LuminosityBlockPrincipal > lumiPrincipal_
static ParentageRegistry * instance()
std::shared_ptr< ProcessBlockHelper > processBlockHelper_
ProcessHistoryRegistry processHistoryRegistry_
std::shared_ptr< RunPrincipal > endRun()
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511