CMS 3D CMS Logo

TestProcessor.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Subsystem/Package
4 // Class : TestProcessor
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: root
10 // Created: Mon, 30 Apr 2018 18:51:08 GMT
11 //
12 
13 // system include files
14 
15 // user include files
18 
30 
33 
36 
40 
42 
44 
45 #include "tbb/task_scheduler_init.h"
46 
47 #define xstr(s) str(s)
48 #define str(s) #s
49 
50 namespace edm {
51  namespace test {
52 
53  //
54  // constants, enums and typedefs
55  //
56 
57  //
58  // static data member definitions
59  //
60  namespace {
61 
62  bool oneTimeInitializationImpl() {
64 
65  static std::unique_ptr<tbb::task_scheduler_init> tsiPtr = std::make_unique<tbb::task_scheduler_init>(1);
66 
67  // register the empty parentage vector , once and for all
69 
70  // register the empty parameter set, once and for all.
71  ParameterSet().registerIt();
72  return true;
73  }
74 
75  bool oneTimeInitialization() {
76  static const bool s_init{oneTimeInitializationImpl()};
77  return s_init;
78  }
79  } // namespace
80 
81  //
82  // constructors and destructor
83  //
85  : espController_(std::make_unique<eventsetup::EventSetupsController>()),
86  historyAppender_(std::make_unique<HistoryAppender>()),
87  moduleRegistry_(std::make_shared<ModuleRegistry>()) {
88  //Setup various singletons
89  (void)oneTimeInitialization();
90 
91  ProcessDescImpl desc(iConfig.pythonConfiguration());
92 
93  auto psetPtr = desc.parameterSet();
94 
95  validateTopLevelParameterSets(psetPtr.get());
96 
97  labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
98 
99  auto procDesc = desc.processDesc();
100  // Now do general initialization
102 
103  //initialize the services
104  auto& serviceSets = procDesc->getServicesPSets();
105  ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
106  serviceToken_ = items.addCPRandTNS(*psetPtr, token);
107 
108  //make the services available
110 
111  // intialize miscellaneous items
112  std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
113 
114  // intialize the event setup provider
115  esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
116 
117  if (not iConfig.esProduceEntries().empty()) {
118  esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
119  esp_->add(std::dynamic_pointer_cast<eventsetup::DataProxyProvider>(esHelper_));
120  esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
121  }
122 
123  auto nThreads = 1U;
124  auto nStreams = 1U;
125  auto nConcurrentLumis = 1U;
126  auto nConcurrentRuns = 1U;
127  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
128 
129  preg_ = items.preg();
131 
132  edm::ParameterSet emptyPSet;
133  emptyPSet.registerIt();
134  auto psetid = emptyPSet.id();
135 
136  ProcessHistory oldHistory;
137  for (auto const& p : iConfig.extraProcesses()) {
138  oldHistory.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
140  }
141 
142  //setup the products we will be adding to the event
143  for (auto const& produce : iConfig.produceEntries()) {
144  auto processName = produce.processName_;
145  if (processName.empty()) {
146  processName = processConfiguration_->processName();
147  }
148  edm::TypeWithDict twd(produce.type_.typeInfo());
150  produce.moduleLabel_,
151  processName,
152  twd.userClassName(),
153  twd.friendlyClassName(),
154  produce.instanceLabel_,
155  "",
156  psetid,
157  twd,
158  true //force this to come from 'source'
159  );
160  product.init();
161  dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
162  preg_->addProduct(product);
163  }
164 
165  schedule_ = items.initSchedule(*psetPtr, false, preallocations_, &processContext_);
166  // set the data members
168  actReg_ = items.actReg_;
173 
175 
176  preg_->setFrozen();
177  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
178  // Reusable event principal
179  auto ep = std::make_shared<EventPrincipal>(preg_,
183  historyAppender_.get(),
184  index);
186  }
187 
188  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
189  auto lp =
190  std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
192  }
193  }
194 
196  //
197  // member functions
198  //
199 
200  void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
201  if (index >= dataProducts_.size()) {
202  throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
203  "with a call to the function \'produces\' BEFORE passing the\n"
204  "TestProcessor::Config object to the TestProcessor constructor";
205  }
206  dataProducts_[index].second = std::move(iWrapper);
207  }
208 
210  setupProcessing();
211  event();
212 
213  bool result = schedule_->totalEventsPassed() > 0;
214  schedule_->clearCounters();
215  if (esHelper_) {
216  //We want each test to have its own ES data products
217  esHelper_->resetAllProxies();
218  }
219  return edm::test::Event(
221  }
222 
224  if (not beginJobCalled_) {
225  beginJob();
226  }
227  if (not beginRunCalled_) {
228  beginRun();
229  }
230  if (beginLumiCalled_) {
232  assert(lumiNumber_ != iNum);
233  }
234  lumiNumber_ = iNum;
236 
237  if (esHelper_) {
238  //We want each test to have its own ES data products
239  esHelper_->resetAllProxies();
240  }
241 
243  }
244 
246  if (not beginJobCalled_) {
247  beginJob();
248  }
249  if (not beginRunCalled_) {
250  beginRun();
251  }
252  if (not beginLumiCalled_) {
254  }
255  auto lumi = endLuminosityBlock();
256 
257  if (esHelper_) {
258  //We want each test to have its own ES data products
259  esHelper_->resetAllProxies();
260  }
261 
263  }
264 
266  if (not beginJobCalled_) {
267  beginJob();
268  }
269 
270  if (beginRunCalled_) {
271  assert(runNumber_ != iNum);
272  endRun();
273  }
274  runNumber_ = iNum;
275  beginRun();
276 
277  if (esHelper_) {
278  //We want each test to have its own ES data products
279  esHelper_->resetAllProxies();
280  }
281 
282  return edm::test::Run(
284  }
286  if (not beginJobCalled_) {
287  beginJob();
288  }
289  if (not beginRunCalled_) {
290  beginRun();
291  }
292  auto rp = endRun();
293 
294  if (esHelper_) {
295  //We want each test to have its own ES data products
296  esHelper_->resetAllProxies();
297  }
298 
299  return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
300  }
301 
303  if (not beginJobCalled_) {
304  beginJob();
305  }
306  if (not beginRunCalled_) {
307  beginRun();
308  }
309  if (not beginLumiCalled_) {
311  }
312  }
313 
315  if (beginLumiCalled_) {
317  beginLumiCalled_ = false;
318  }
319  if (beginRunCalled_) {
320  endRun();
321  beginRunCalled_ = false;
322  }
323  if (beginJobCalled_) {
324  endJob();
325  }
326  }
327 
330 
335  actReg_->preallocateSignal_(bounds);
336  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
337  PathsAndConsumesOfModules pathsAndConsumesOfModules;
338 
339  //The code assumes only modules make data in the current process
340  // Since the test os also allowed to do so, it can lead to problems.
341  //pathsAndConsumesOfModules.initialize(schedule_.get(), preg_);
342 
343  //NOTE: this may throw
344  //checkForModuleDependencyCorrectness(pathsAndConsumesOfModules, false);
345  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext_);
346 
347  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices());
348  actReg_->postBeginJobSignal_();
349 
350  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
351  schedule_->beginStream(i);
352  }
353  beginJobCalled_ = true;
354  }
355 
357  ProcessHistoryID phid;
358  auto aux = std::make_shared<RunAuxiliary>(runNumber_, Timestamp(), Timestamp());
359  auto rp = std::make_shared<RunPrincipal>(aux, preg_, *processConfiguration_, historyAppender_.get(), 0);
360 
362  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, runNumber_);
363 
364  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
365  espController_->eventSetupForInstance(ts);
366 
367  auto const& es = esp_->eventSetup();
368 
369  std::vector<edm::SubProcess> emptyList;
370  {
372  auto globalWaitTask = make_empty_waiting_task();
373  globalWaitTask->increment_ref_count();
374  beginGlobalTransitionAsync<Traits>(
375  WaitingTaskHolder(globalWaitTask.get()), *schedule_, runPrincipal, ts, es, serviceToken_, emptyList);
376  globalWaitTask->wait_for_all();
377  if (globalWaitTask->exceptionPtr() != nullptr) {
378  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
379  }
380  }
381  {
382  //To wait, the ref count has to be 1+#streams
383  auto streamLoopWaitTask = make_empty_waiting_task();
384  streamLoopWaitTask->increment_ref_count();
385 
387 
388  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
389  *schedule_,
391  runPrincipal,
392  ts,
393  es,
395  emptyList);
396 
397  streamLoopWaitTask->wait_for_all();
398  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
399  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
400  }
401  }
402  beginRunCalled_ = true;
403  }
404 
408  lumiPrincipal_->clearPrincipal();
409  assert(lumiPrincipal_);
410  lumiPrincipal_->setAux(aux);
411 
412  lumiPrincipal_->setRunPrincipal(principalCache_.runPrincipalPtr());
413 
415  espController_->eventSetupForInstance(ts);
416 
417  auto const& es = esp_->eventSetup();
418 
419  std::vector<edm::SubProcess> emptyList;
420  {
422  auto globalWaitTask = make_empty_waiting_task();
423  globalWaitTask->increment_ref_count();
424  beginGlobalTransitionAsync<Traits>(
425  WaitingTaskHolder(globalWaitTask.get()), *schedule_, *lumiPrincipal_, ts, es, serviceToken_, emptyList);
426  globalWaitTask->wait_for_all();
427  if (globalWaitTask->exceptionPtr() != nullptr) {
428  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
429  }
430  }
431  {
432  //To wait, the ref count has to be 1+#streams
433  auto streamLoopWaitTask = make_empty_waiting_task();
434  streamLoopWaitTask->increment_ref_count();
435 
437 
438  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
439  *schedule_,
442  ts,
443  es,
445  emptyList);
446 
447  streamLoopWaitTask->wait_for_all();
448  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
449  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
450  }
451  }
452  beginLumiCalled_ = true;
453  }
454 
456  auto pep = &(principalCache_.eventPrincipal(0));
457 
458  //this resets the EventPrincipal (if it had been used before)
459  pep->clearEventPrincipal();
460  pep->fillEventPrincipal(
463  nullptr);
464  assert(lumiPrincipal_.get() != nullptr);
465  pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
466 
467  for (auto& p : dataProducts_) {
468  if (p.second) {
469  pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
470  } else {
471  //The data product was not set so we need to
472  // tell the ProductResolver not to wait
473  auto r = pep->getProductResolver(p.first.branchID());
474  r->putProduct(std::unique_ptr<WrapperBase>());
475  }
476  }
477 
479 
480  auto waitTask = make_empty_waiting_task();
481  waitTask->increment_ref_count();
482 
483  schedule_->processOneEventAsync(
484  edm::WaitingTaskHolder(waitTask.get()), 0, *pep, esp_->eventSetup(), serviceToken_);
485 
486  waitTask->wait_for_all();
487  if (waitTask->exceptionPtr() != nullptr) {
488  std::rethrow_exception(*(waitTask->exceptionPtr()));
489  }
490  ++eventNumber_;
491  }
492 
493  std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
494  auto lumiPrincipal = lumiPrincipal_;
495  if (beginLumiCalled_) {
496  beginLumiCalled_ = false;
497  lumiPrincipal_.reset();
498 
499  IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
500  espController_->eventSetupForInstance(ts);
501 
502  auto const& es = esp_->eventSetup();
503 
504  std::vector<edm::SubProcess> emptyList;
505 
506  //To wait, the ref count has to be 1+#streams
507  {
508  auto streamLoopWaitTask = make_empty_waiting_task();
509  streamLoopWaitTask->increment_ref_count();
510 
512 
513  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
514  *schedule_,
516  *lumiPrincipal,
517  ts,
518  es,
520  emptyList,
521  false);
522 
523  streamLoopWaitTask->wait_for_all();
524  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
525  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
526  }
527  }
528  {
529  auto globalWaitTask = make_empty_waiting_task();
530  globalWaitTask->increment_ref_count();
531 
533  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
534  *schedule_,
535  *lumiPrincipal,
536  ts,
537  es,
539  emptyList,
540  false);
541  globalWaitTask->wait_for_all();
542  if (globalWaitTask->exceptionPtr() != nullptr) {
543  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
544  }
545  }
546  }
547  return lumiPrincipal;
548  }
549 
550  std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
551  std::shared_ptr<RunPrincipal> rp;
552  if (beginRunCalled_) {
553  beginRunCalled_ = false;
554  ProcessHistoryID phid;
556  RunPrincipal& runPrincipal = *rp;
557 
558  IOVSyncValue ts(
560  runPrincipal.endTime());
561  espController_->eventSetupForInstance(ts);
562 
563  auto const& es = esp_->eventSetup();
564 
565  std::vector<edm::SubProcess> emptyList;
566 
567  //To wait, the ref count has to be 1+#streams
568  {
569  auto streamLoopWaitTask = make_empty_waiting_task();
570  streamLoopWaitTask->increment_ref_count();
571 
573 
574  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
575  *schedule_,
577  runPrincipal,
578  ts,
579  es,
581  emptyList,
582  false);
583 
584  streamLoopWaitTask->wait_for_all();
585  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
586  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
587  }
588  }
589  {
590  auto globalWaitTask = make_empty_waiting_task();
591  globalWaitTask->increment_ref_count();
592 
594  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
595  *schedule_,
596  runPrincipal,
597  ts,
598  es,
600  emptyList,
601  false);
602  globalWaitTask->wait_for_all();
603  if (globalWaitTask->exceptionPtr() != nullptr) {
604  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
605  }
606  }
607 
609  }
610  return rp;
611  }
612 
614  // Collects exceptions, so we don't throw before all operations are performed.
616  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
617 
618  //make the services available
620 
621  //NOTE: this really should go elsewhere in the future
622  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
623  c.call([this, i]() { this->schedule_->endStream(i); });
624  }
625  auto actReg = actReg_.get();
626  c.call([actReg]() { actReg->preEndJobSignal_(); });
627  schedule_->endJob(c);
628  c.call([actReg]() { actReg->postEndJobSignal_(); });
629  if (c.hasThrown()) {
630  c.rethrow();
631  }
632  }
633 
635  if (beginRunCalled_) {
637  endRun();
638  }
639  runNumber_ = iRun;
640  }
643  lumiNumber_ = iLumi;
644  }
645 
647 
648  //
649  // const member functions
650  //
651 
652  //
653  // static member functions
654  //
655 
656  } // namespace test
657 } // namespace edm
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:73
void insert(std::shared_ptr< RunPrincipal > rp)
void put(std::pair< edm::EDPutTokenT< T >, std::unique_ptr< T >> &&iPut)
std::shared_ptr< LuminosityBlockPrincipal > endLuminosityBlock()
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< edm::ParameterSet > parameterSet() const
#define xstr(s)
static PluginManager & configure(const Config &)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:78
edm::test::LuminosityBlock testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum, std::pair< edm::test::ESPutTokenT< T >, std::unique_ptr< T >> &&iPut, U &&...iArgs)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:59
std::vector< std::pair< edm::BranchDescription, std::unique_ptr< WrapperBase > > > dataProducts_
ParameterSetID id() const
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
bool registerProcessHistory(ProcessHistory const &processHistory)
LuminosityBlockNumber_t lumiNumber_
unsigned long long EventNumber_t
edm::test::LuminosityBlock testEndLuminosityBlockImpl()
EventNumber_t eventNumber_
RunNumber_t run() const
Definition: RunPrincipal.h:60
ProcessContext processContext_
unsigned int LuminosityBlockNumber_t
void emplace_back(Args &&...args)
PreallocationConfiguration preallocations_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void setEventNumber(edm::EventNumber_t)
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PluginManager::Config config()
Definition: standard.cc:21
std::shared_ptr< EventSetupTestHelper > esHelper_
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:66
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
std::string labelOfTestModule_
std::unique_ptr< eventsetup::EventSetupsController > espController_
edm::test::Event testImpl()
std::shared_ptr< eventsetup::EventSetupProvider > esp_
std::vector< ProduceEntry > const & produceEntries() const
Timestamp const & endTime() const
Definition: RunPrincipal.h:68
std::vector< ESProduceEntry > const & esProduceEntries() const
std::unique_ptr< Schedule > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
#define noexcept
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:68
void setLuminosityBlockNumber(edm::LuminosityBlockNumber_t)
PrincipalCache principalCache_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
std::shared_ptr< ProductRegistry > preg_
edm::test::Run testBeginRunImpl(edm::RunNumber_t iNum, std::pair< edm::test::ESPutTokenT< T >, std::unique_ptr< T >> &&iPut, U &&...iArgs)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::unique_ptr< ExceptionToActionTable const > act_table_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
edm::test::Run testEndRunImpl()
void setRunNumber(edm::RunNumber_t)
HLT enums.
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:53
Definition: Config.py:1
TestProcessor(Config const &iConfig, ServiceToken iToken=ServiceToken())
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
void call(std::function< void(void)>)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:55
std::shared_ptr< LuminosityBlockPrincipal > lumiPrincipal_
static ParentageRegistry * instance()
ProcessHistoryRegistry processHistoryRegistry_
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
std::shared_ptr< RunPrincipal > endRun()
std::string const & pythonConfiguration() const
Definition: TestProcessor.h:89
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
std::vector< std::string > const & extraProcesses() const
Definition: TestProcessor.h:99
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)