CMS 3D CMS Logo

TestProcessor.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Subsystem/Package
4 // Class : TestProcessor
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Mon, 30 Apr 2018 18:51:08 GMT
11 //
12 
13 // system include files
14 
15 // user include files
18 
37 
40 
43 
47 
49 
52 
54 
55 #define xstr(s) str(s)
56 #define str(s) #s
57 
58 namespace edm {
59  namespace test {
60 
61  namespace {
62 
63  bool oneTimeInitializationImpl() {
65 
66  static std::unique_ptr<edm::ThreadsController> tsiPtr = std::make_unique<edm::ThreadsController>(1);
67 
68  // register the empty parentage vector , once and for all
70 
71  // register the empty parameter set, once and for all.
72  ParameterSet().registerIt();
73  return true;
74  }
75 
76  bool oneTimeInitialization() {
77  static const bool s_init{oneTimeInitializationImpl()};
78  return s_init;
79  }
80  } // namespace
81 
82  //
83  // constructors and destructor
84  //
86  : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
87  arena_(1),
88  historyAppender_(std::make_unique<HistoryAppender>()),
89  moduleRegistry_(std::make_shared<ModuleRegistry>()) {
90  //Setup various singletons
91  (void)oneTimeInitialization();
92 
94 
95  auto psetPtr = desc.parameterSet();
97  espController_ = std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get());
98 
99  validateTopLevelParameterSets(psetPtr.get());
100 
101  ensureAvailableAccelerators(*psetPtr);
102 
103  labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
104 
105  auto procDesc = desc.processDesc();
106  // Now do general initialization
108 
109  //initialize the services
110  auto& serviceSets = procDesc->getServicesPSets();
111  ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
112  serviceToken_ = items.addCPRandTNS(*psetPtr, token);
113 
114  //make the services available
116 
117  // intialize miscellaneous items
118  std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
119 
120  // intialize the event setup provider
121  esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
122 
123  auto nThreads = 1U;
124  auto nStreams = 1U;
125  auto nConcurrentLumis = 1U;
126  auto nConcurrentRuns = 1U;
127  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
128 
129  if (not iConfig.esProduceEntries().empty()) {
130  esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
131  esp_->add(std::dynamic_pointer_cast<eventsetup::DataProxyProvider>(esHelper_));
132  esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
133  }
134 
135  preg_ = items.preg();
136  processConfiguration_ = items.processConfiguration();
137 
138  edm::ParameterSet emptyPSet;
139  emptyPSet.registerIt();
140  auto psetid = emptyPSet.id();
141 
142  ProcessHistory oldHistory;
143  for (auto const& p : iConfig.extraProcesses()) {
144  oldHistory.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
146  }
147 
148  //setup the products we will be adding to the event
149  for (auto const& produce : iConfig.produceEntries()) {
150  auto processName = produce.processName_;
151  if (processName.empty()) {
152  processName = processConfiguration_->processName();
153  }
154  edm::TypeWithDict twd(produce.type_.typeInfo());
156  produce.moduleLabel_,
157  processName,
158  twd.userClassName(),
159  twd.friendlyClassName(),
160  produce.instanceLabel_,
161  "",
162  psetid,
163  twd,
164  true //force this to come from 'source'
165  );
166  product.init();
167  dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
168  preg_->addProduct(product);
169  }
170 
171  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
172 
173  schedule_ = items.initSchedule(
175  // set the data members
176  act_table_ = std::move(items.act_table_);
177  actReg_ = items.actReg_;
178  branchIDListHelper_ = items.branchIDListHelper();
179  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
181 
183 
184  preg_->setFrozen();
185  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
186  // Reusable event principal
187  auto ep = std::make_shared<EventPrincipal>(preg_,
191  historyAppender_.get(),
192  index);
194  }
195  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
196  auto rp = std::make_unique<RunPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
198  }
199  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
200  auto lp =
201  std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
203  }
204  {
205  auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_);
207  }
208  }
209 
211  //
212  // member functions
213  //
214 
215  void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
216  if (index >= dataProducts_.size()) {
217  throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
218  "with a call to the function \'produces\' BEFORE passing the\n"
219  "TestProcessor::Config object to the TestProcessor constructor";
220  }
221  dataProducts_[index].second = std::move(iWrapper);
222  }
223 
225  bool result = arena_.execute([this]() {
226  setupProcessing();
227  event();
228 
229  return schedule_->totalEventsPassed() > 0;
230  });
231  schedule_->clearCounters();
232  if (esHelper_) {
233  //We want each test to have its own ES data products
234  esHelper_->resetAllProxies();
235  }
236  return edm::test::Event(
238  }
239 
241  arena_.execute([this, iNum]() {
242  if (not beginJobCalled_) {
243  beginJob();
244  }
245  if (not beginProcessBlockCalled_) {
247  }
248  if (not beginRunCalled_) {
249  beginRun();
250  }
251  if (beginLumiCalled_) {
253  assert(lumiNumber_ != iNum);
254  }
255  lumiNumber_ = iNum;
257  });
258 
259  if (esHelper_) {
260  //We want each test to have its own ES data products
261  esHelper_->resetAllProxies();
262  }
263 
265  }
266 
268  //using a return value from arena_.execute lead to double delete of shared_ptr
269  // based on valgrind output when exception occurred. Use lambda capture instead.
270  std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
271  arena_.execute([this, &lumi]() {
272  if (not beginJobCalled_) {
273  beginJob();
274  }
275  if (not beginProcessBlockCalled_) {
277  }
278  if (not beginRunCalled_) {
279  beginRun();
280  }
281  if (not beginLumiCalled_) {
283  }
285  });
286  if (esHelper_) {
287  //We want each test to have its own ES data products
288  esHelper_->resetAllProxies();
289  }
290 
292  }
293 
295  arena_.execute([this, iNum]() {
296  if (not beginJobCalled_) {
297  beginJob();
298  }
299  if (not beginProcessBlockCalled_) {
301  }
302  if (beginRunCalled_) {
303  assert(runNumber_ != iNum);
304  endRun();
305  }
306  runNumber_ = iNum;
307  beginRun();
308  });
309  if (esHelper_) {
310  //We want each test to have its own ES data products
311  esHelper_->resetAllProxies();
312  }
313 
315  }
317  //using a return value from arena_.execute lead to double delete of shared_ptr
318  // based on valgrind output when exception occurred. Use lambda capture instead.
319  std::shared_ptr<edm::RunPrincipal> rp;
320  arena_.execute([this, &rp]() {
321  if (not beginJobCalled_) {
322  beginJob();
323  }
324  if (not beginProcessBlockCalled_) {
326  }
327  if (not beginRunCalled_) {
328  beginRun();
329  }
330  rp = endRun();
331  });
332  if (esHelper_) {
333  //We want each test to have its own ES data products
334  esHelper_->resetAllProxies();
335  }
336 
337  return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
338  }
339 
341  arena_.execute([this]() {
342  if (not beginJobCalled_) {
343  beginJob();
344  }
346  });
349  }
351  auto pbp = arena_.execute([this]() {
352  if (not beginJobCalled_) {
353  beginJob();
354  }
355  if (not beginProcessBlockCalled_) {
357  }
358  return endProcessBlock();
359  });
361  }
362 
364  if (not beginJobCalled_) {
365  beginJob();
366  }
367  if (not beginProcessBlockCalled_) {
369  }
370  if (not beginRunCalled_) {
371  beginRun();
372  }
373  if (not beginLumiCalled_) {
375  }
376  }
377 
379  arena_.execute([this]() {
380  if (beginLumiCalled_) {
382  beginLumiCalled_ = false;
383  }
384  if (beginRunCalled_) {
385  endRun();
386  beginRunCalled_ = false;
387  }
389  endProcessBlock();
390  beginProcessBlockCalled_ = false;
391  }
392  if (beginJobCalled_) {
393  endJob();
394  }
397  task.waitNoThrow();
398  });
399  }
400 
403 
408  actReg_->preallocateSignal_(bounds);
409  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
410  PathsAndConsumesOfModules pathsAndConsumesOfModules;
411 
412  //The code assumes only modules make data in the current process
413  // Since the test os also allowed to do so, it can lead to problems.
414  //pathsAndConsumesOfModules.initialize(schedule_.get(), preg_);
415 
416  //NOTE: this may throw
417  //checkForModuleDependencyCorrectness(pathsAndConsumesOfModules, false);
418  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext_);
419 
420  espController_->finishConfiguration();
421 
422  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
423  actReg_->postBeginJobSignal_();
424 
425  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
426  schedule_->beginStream(i);
427  }
428  beginJobCalled_ = true;
429  }
430 
433  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
434 
435  std::vector<edm::SubProcess> emptyList;
436  {
437  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
439  FinalWaitingTask globalWaitTask{taskGroup_};
440  beginGlobalTransitionAsync<Traits>(
441  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
442  globalWaitTask.wait();
443  }
445  }
446 
449  runPrincipal_->clearPrincipal();
452 
453  IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime());
455 
456  auto const& es = esp_->eventSetupImpl();
457 
458  RunTransitionInfo transitionInfo(*runPrincipal_, es, nullptr);
459 
460  std::vector<edm::SubProcess> emptyList;
461  {
463  FinalWaitingTask globalWaitTask{taskGroup_};
464  beginGlobalTransitionAsync<Traits>(
465  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
466  globalWaitTask.wait();
467  }
468  {
469  //To wait, the ref count has to be 1+#streams
470  FinalWaitingTask streamLoopWaitTask{taskGroup_};
471 
473  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
474  *schedule_,
476  transitionInfo,
478  emptyList);
479  streamLoopWaitTask.wait();
480  }
481  beginRunCalled_ = true;
482  }
483 
487  lumiPrincipal_->clearPrincipal();
489  lumiPrincipal_->setAux(aux);
490 
491  lumiPrincipal_->setRunPrincipal(runPrincipal_);
492 
495 
496  auto const& es = esp_->eventSetupImpl();
497 
498  LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
499 
500  std::vector<edm::SubProcess> emptyList;
501  {
503  FinalWaitingTask globalWaitTask{taskGroup_};
504  beginGlobalTransitionAsync<Traits>(
505  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
506  globalWaitTask.wait();
507  }
508  {
509  //To wait, the ref count has to be 1+#streams
510  FinalWaitingTask streamLoopWaitTask{taskGroup_};
511 
513 
514  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
515  *schedule_,
517  transitionInfo,
519  emptyList);
520 
521  streamLoopWaitTask.wait();
522  }
523  beginLumiCalled_ = true;
524  }
525 
527  auto pep = &(principalCache_.eventPrincipal(0));
528 
529  //this resets the EventPrincipal (if it had been used before)
530  pep->clearEventPrincipal();
531  pep->fillEventPrincipal(
533  nullptr,
534  nullptr);
535  assert(lumiPrincipal_.get() != nullptr);
536  pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
537 
538  for (auto& p : dataProducts_) {
539  if (p.second) {
540  pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
541  } else {
542  //The data product was not set so we need to
543  // tell the ProductResolver not to wait
544  auto r = pep->getProductResolver(p.first.branchID());
545  dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
546  }
547  }
548 
550 
551  FinalWaitingTask waitTask{taskGroup_};
552 
553  EventTransitionInfo info(*pep, esp_->eventSetupImpl());
554  schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
555 
556  waitTask.wait();
557  ++eventNumber_;
558  }
559 
560  std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
561  auto lumiPrincipal = lumiPrincipal_;
562  if (beginLumiCalled_) {
563  beginLumiCalled_ = false;
564  lumiPrincipal_.reset();
565 
566  IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
568 
569  auto const& es = esp_->eventSetupImpl();
570 
571  LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
572 
573  std::vector<edm::SubProcess> emptyList;
574 
575  //To wait, the ref count has to be 1+#streams
576  {
577  FinalWaitingTask streamLoopWaitTask{taskGroup_};
578 
580 
581  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
582  *schedule_,
584  transitionInfo,
586  emptyList,
587  false);
588 
589  streamLoopWaitTask.wait();
590  }
591  {
592  FinalWaitingTask globalWaitTask{taskGroup_};
593 
595  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
596  *schedule_,
597  transitionInfo,
599  emptyList,
600  false);
601  globalWaitTask.wait();
602  }
603  }
604  lumiPrincipal->setRunPrincipal(std::shared_ptr<RunPrincipal>());
605  return lumiPrincipal;
606  }
607 
608  std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
609  auto runPrincipal = runPrincipal_;
610  runPrincipal_.reset();
611  if (beginRunCalled_) {
612  beginRunCalled_ = false;
613 
614  IOVSyncValue ts(
616  runPrincipal->endTime());
618 
619  auto const& es = esp_->eventSetupImpl();
620 
621  RunTransitionInfo transitionInfo(*runPrincipal, es);
622 
623  std::vector<edm::SubProcess> emptyList;
624 
625  //To wait, the ref count has to be 1+#streams
626  {
627  FinalWaitingTask streamLoopWaitTask{taskGroup_};
628 
630 
631  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
632  *schedule_,
634  transitionInfo,
636  emptyList,
637  false);
638 
639  streamLoopWaitTask.wait();
640  }
641  {
642  FinalWaitingTask globalWaitTask{taskGroup_};
643 
645  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
646  *schedule_,
647  transitionInfo,
649  emptyList,
650  false);
651  globalWaitTask.wait();
652  }
653  }
654  return runPrincipal;
655  }
656 
660  beginProcessBlockCalled_ = false;
661 
662  std::vector<edm::SubProcess> emptyList;
663  {
664  FinalWaitingTask globalWaitTask{taskGroup_};
665 
666  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
668  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
669  *schedule_,
670  transitionInfo,
672  emptyList,
673  false);
674  globalWaitTask.wait();
675  }
676  }
677  return &processBlockPrincipal;
678  }
679 
681  if (!beginJobCalled_) {
682  return;
683  }
684  beginJobCalled_ = false;
685 
686  // Collects exceptions, so we don't throw before all operations are performed.
688  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
689 
690  //make the services available
692 
693  //NOTE: this really should go elsewhere in the future
694  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
695  c.call([this, i]() { this->schedule_->endStream(i); });
696  }
697  auto actReg = actReg_.get();
698  c.call([actReg]() { actReg->preEndJobSignal_(); });
699  schedule_->endJob(c);
700  c.call([actReg]() { actReg->postEndJobSignal_(); });
701  if (c.hasThrown()) {
702  c.rethrow();
703  }
704  }
705 
707  if (beginRunCalled_) {
709  endRun();
710  }
711  runNumber_ = iRun;
712  }
715  lumiNumber_ = iLumi;
716  }
717 
719 
720  } // namespace test
721 } // namespace edm
void put(std::pair< edm::EDPutTokenT< T >, std::unique_ptr< T >> &&iPut)
std::shared_ptr< LuminosityBlockPrincipal > endLuminosityBlock()
std::shared_ptr< ActivityRegistry > actReg_
static const TGPicture * info(bool iBackgroundIsBlack)
edm::test::ProcessBlock testBeginProcessBlockImpl()
#define xstr(s)
static PluginManager & configure(const Config &)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::vector< std::pair< edm::BranchDescription, std::unique_ptr< WrapperBase > > > dataProducts_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
bool registerProcessHistory(ProcessHistory const &processHistory)
LuminosityBlockNumber_t lumiNumber_
unsigned long long EventNumber_t
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
edm::test::LuminosityBlock testEndLuminosityBlockImpl()
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
EventNumber_t eventNumber_
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
std::string const & pythonConfiguration() const
Definition: TestProcessor.h:95
std::shared_ptr< RunPrincipal > runPrincipal_
assert(be >=bs)
ProcessContext processContext_
edm::test::Run testBeginRunImpl(edm::RunNumber_t iNum, std::pair< edm::test::ESPutTokenT< T >, std::unique_ptr< T >> &&iPut, U &&... iArgs)
unsigned int LuminosityBlockNumber_t
oneapi::tbb::task_group taskGroup_
PreallocationConfiguration preallocations_
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
~TestProcessor() noexcept(false)
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ProcessBlockPrincipal & processBlockPrincipal() const
std::unique_ptr< ModuleTypeResolverMaker const > moduleTypeResolverMaker_
std::vector< std::string > const & extraProcesses() const
void setEventNumber(edm::EventNumber_t)
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
ParameterSetID id() const
ParameterSet const & registerIt()
PluginManager::Config config()
Definition: standard.cc:21
std::shared_ptr< EventSetupTestHelper > esHelper_
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
std::string labelOfTestModule_
std::unique_ptr< eventsetup::EventSetupsController > espController_
edm::test::Event testImpl()
std::shared_ptr< eventsetup::EventSetupProvider > esp_
std::unique_ptr< Schedule > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
edm::test::LuminosityBlock testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum, std::pair< edm::test::ESPutTokenT< T >, std::unique_ptr< T >> &&iPut, U &&... iArgs)
edm::test::ProcessBlock testEndProcessBlockImpl()
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< ProduceEntry > const & produceEntries() const
std::vector< ESProduceEntry > const & esProduceEntries() const
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
void setLuminosityBlockNumber(edm::LuminosityBlockNumber_t)
PrincipalCache principalCache_
ProcessBlockPrincipal const * endProcessBlock()
std::shared_ptr< ProductRegistry > preg_
std::unique_ptr< ExceptionToActionTable const > act_table_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
oneapi::tbb::task_arena arena_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
edm::test::Run testEndRunImpl()
void setRunNumber(edm::RunNumber_t)
HLT enums.
std::unique_ptr< HistoryAppender > historyAppender_
void emplace_back(Args &&... args)
Definition: Config.py:1
TestProcessor(Config const &iConfig, ServiceToken iToken=ServiceToken())
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
std::shared_ptr< LuminosityBlockPrincipal > lumiPrincipal_
static ParentageRegistry * instance()
std::shared_ptr< ProcessBlockHelper > processBlockHelper_
ProcessHistoryRegistry processHistoryRegistry_
std::shared_ptr< RunPrincipal > endRun()
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511