CMS 3D CMS Logo

TestProcessor.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Subsystem/Package
4 // Class : TestProcessor
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: root
10 // Created: Mon, 30 Apr 2018 18:51:08 GMT
11 //
12 
13 // system include files
14 
15 // user include files
18 
30 
33 
36 
40 
42 
44 
45 #include "tbb/task_scheduler_init.h"
46 
47 #define xstr(s) str(s)
48 #define str(s) #s
49 
50 namespace edm {
51  namespace test {
52 
53  //
54  // constants, enums and typedefs
55  //
56 
57  //
58  // static data member definitions
59  //
60  namespace {
61 
62  bool oneTimeInitializationImpl() {
64 
65  static std::unique_ptr<tbb::task_scheduler_init> tsiPtr = std::make_unique<tbb::task_scheduler_init>(1);
66 
67  // register the empty parentage vector , once and for all
69 
70  // register the empty parameter set, once and for all.
71  ParameterSet().registerIt();
72  return true;
73  }
74 
75  bool oneTimeInitialization() {
76  static const bool s_init{oneTimeInitializationImpl()};
77  return s_init;
78  }
79  } // namespace
80 
81  //
82  // constructors and destructor
83  //
85  : espController_(std::make_unique<eventsetup::EventSetupsController>()),
86  historyAppender_(std::make_unique<HistoryAppender>()),
87  moduleRegistry_(std::make_shared<ModuleRegistry>()) {
88  //Setup various singletons
89  (void)oneTimeInitialization();
90 
91  ProcessDescImpl desc(iConfig.pythonConfiguration());
92 
93  auto psetPtr = desc.parameterSet();
94 
95  validateTopLevelParameterSets(psetPtr.get());
96 
97  labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
98 
99  auto procDesc = desc.processDesc();
100  // Now do general initialization
102 
103  //initialize the services
104  auto& serviceSets = procDesc->getServicesPSets();
106  items.initServices(serviceSets, *psetPtr, ServiceToken(), serviceregistry::kOverlapIsError, true);
107  serviceToken_ = items.addCPRandTNS(*psetPtr, token);
108 
109  //make the services available
111 
112  // intialize miscellaneous items
113  std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
114 
115  // intialize the event setup provider
116  esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
117 
118  if (not iConfig.esProduceEntries().empty()) {
119  esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
120  esp_->add(std::dynamic_pointer_cast<eventsetup::DataProxyProvider>(esHelper_));
121  esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
122  }
123 
124  auto nThreads = 1U;
125  auto nStreams = 1U;
126  auto nConcurrentLumis = 1U;
127  auto nConcurrentRuns = 1U;
128  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
129 
130  preg_ = items.preg();
132 
133  edm::ParameterSet emptyPSet;
134  emptyPSet.registerIt();
135  auto psetid = emptyPSet.id();
136 
137  ProcessHistory oldHistory;
138  for (auto const& p : iConfig.extraProcesses()) {
139  oldHistory.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
141  }
142 
143  //setup the products we will be adding to the event
144  for (auto const& produce : iConfig.produceEntries()) {
145  auto processName = produce.processName_;
146  if (processName.empty()) {
147  processName = processConfiguration_->processName();
148  }
149  edm::TypeWithDict twd(produce.type_.typeInfo());
151  produce.moduleLabel_,
152  processName,
153  twd.userClassName(),
154  twd.friendlyClassName(),
155  produce.instanceLabel_,
156  "",
157  psetid,
158  twd,
159  true //force this to come from 'source'
160  );
161  product.init();
162  dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
163  preg_->addProduct(product);
164  }
165 
166  schedule_ = items.initSchedule(*psetPtr, false, preallocations_, &processContext_);
167  // set the data members
169  actReg_ = items.actReg_;
174 
176 
177  preg_->setFrozen();
178  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
179  // Reusable event principal
180  auto ep = std::make_shared<EventPrincipal>(preg_,
184  historyAppender_.get(),
185  index);
187  }
188 
189  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
190  auto lp =
191  std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
193  }
194  }
195 
197  //
198  // member functions
199  //
200 
201  void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
202  if (index >= dataProducts_.size()) {
203  throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
204  "with a call to the function \'produces\' BEFORE passing the\n"
205  "TestProcessor::Config object to the TestProcessor constructor";
206  }
207  dataProducts_[index].second = std::move(iWrapper);
208  }
209 
211  setupProcessing();
212  event();
213 
214  bool result = schedule_->totalEventsPassed() > 0;
215  schedule_->clearCounters();
216  if (esHelper_) {
217  //We want each test to have its own ES data products
218  esHelper_->resetAllProxies();
219  }
220  return edm::test::Event(
222  }
223 
225  if (not beginJobCalled_) {
226  beginJob();
227  }
228  if (not beginRunCalled_) {
229  beginRun();
230  }
231  if (not beginLumiCalled_) {
233  }
234  }
235 
237  if (beginLumiCalled_) {
239  beginLumiCalled_ = false;
240  }
241  if (beginRunCalled_) {
242  endRun();
243  beginRunCalled_ = false;
244  }
245  if (beginJobCalled_) {
246  endJob();
247  }
248  }
249 
252 
257  actReg_->preallocateSignal_(bounds);
258  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
259  PathsAndConsumesOfModules pathsAndConsumesOfModules;
260 
261  //The code assumes only modules make data in the current process
262  // Since the test os also allowed to do so, it can lead to problems.
263  //pathsAndConsumesOfModules.initialize(schedule_.get(), preg_);
264 
265  //NOTE: this may throw
266  //checkForModuleDependencyCorrectness(pathsAndConsumesOfModules, false);
267  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext_);
268 
269  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices());
270  actReg_->postBeginJobSignal_();
271 
272  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
273  schedule_->beginStream(i);
274  }
275  beginJobCalled_ = true;
276  }
277 
279  ProcessHistoryID phid;
280  auto aux = std::make_shared<RunAuxiliary>(runNumber_, Timestamp(), Timestamp());
281  auto rp = std::make_shared<RunPrincipal>(aux, preg_, *processConfiguration_, historyAppender_.get(), 0);
282 
284  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, runNumber_);
285 
286  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
287  espController_->eventSetupForInstance(ts);
288 
289  auto const& es = esp_->eventSetupImpl();
290 
291  std::vector<edm::SubProcess> emptyList;
292  {
294  auto globalWaitTask = make_empty_waiting_task();
295  globalWaitTask->increment_ref_count();
296  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
297  *schedule_,
298  runPrincipal,
299  ts,
300  es,
301  nullptr,
303  emptyList);
304  globalWaitTask->wait_for_all();
305  if (globalWaitTask->exceptionPtr() != nullptr) {
306  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
307  }
308  }
309  {
310  //To wait, the ref count has to be 1+#streams
311  auto streamLoopWaitTask = make_empty_waiting_task();
312  streamLoopWaitTask->increment_ref_count();
313 
315 
316  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
317  *schedule_,
319  runPrincipal,
320  ts,
321  es,
322  nullptr,
324  emptyList);
325 
326  streamLoopWaitTask->wait_for_all();
327  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
328  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
329  }
330  }
331  beginRunCalled_ = true;
332  }
333 
337  assert(lumiPrincipal_);
338  lumiPrincipal_->setAux(aux);
339 
340  lumiPrincipal_->setRunPrincipal(principalCache_.runPrincipalPtr());
341 
343  espController_->eventSetupForInstance(ts);
344 
345  auto const& es = esp_->eventSetupImpl();
346 
347  std::vector<edm::SubProcess> emptyList;
348  {
350  auto globalWaitTask = make_empty_waiting_task();
351  globalWaitTask->increment_ref_count();
352  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
353  *schedule_,
355  ts,
356  es,
357  nullptr,
359  emptyList);
360  globalWaitTask->wait_for_all();
361  if (globalWaitTask->exceptionPtr() != nullptr) {
362  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
363  }
364  }
365  {
366  //To wait, the ref count has to be 1+#streams
367  auto streamLoopWaitTask = make_empty_waiting_task();
368  streamLoopWaitTask->increment_ref_count();
369 
371 
372  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
373  *schedule_,
376  ts,
377  es,
378  nullptr,
380  emptyList);
381 
382  streamLoopWaitTask->wait_for_all();
383  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
384  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
385  }
386  }
387  beginLumiCalled_ = true;
388  }
389 
391  auto pep = &(principalCache_.eventPrincipal(0));
392 
393  //this resets the EventPrincipal (if it had been used before)
394  pep->clearEventPrincipal();
395  pep->fillEventPrincipal(
398  nullptr);
399  assert(lumiPrincipal_.get() != nullptr);
400  pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
401 
402  for (auto& p : dataProducts_) {
403  if (p.second) {
404  pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
405  } else {
406  //The data product was not set so we need to
407  // tell the ProductResolver not to wait
408  auto r = pep->getProductResolver(p.first.branchID());
409  r->putProduct(std::unique_ptr<WrapperBase>());
410  }
411  }
412 
414 
415  auto waitTask = make_empty_waiting_task();
416  waitTask->increment_ref_count();
417 
418  schedule_->processOneEventAsync(
419  edm::WaitingTaskHolder(waitTask.get()), 0, *pep, esp_->eventSetupImpl(), serviceToken_);
420 
421  waitTask->wait_for_all();
422  if (waitTask->exceptionPtr() != nullptr) {
423  std::rethrow_exception(*(waitTask->exceptionPtr()));
424  }
425  ++eventNumber_;
426  }
427 
429  if (beginLumiCalled_) {
430  beginLumiCalled_ = false;
431  auto lumiPrincipal = lumiPrincipal_;
432  lumiPrincipal_.reset();
433 
434  IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
435  espController_->eventSetupForInstance(ts);
436 
437  auto const& es = esp_->eventSetupImpl();
438 
439  std::vector<edm::SubProcess> emptyList;
440 
441  //To wait, the ref count has to be 1+#streams
442  {
443  auto streamLoopWaitTask = make_empty_waiting_task();
444  streamLoopWaitTask->increment_ref_count();
445 
447 
448  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
449  *schedule_,
451  *lumiPrincipal,
452  ts,
453  es,
454  nullptr,
456  emptyList,
457  false);
458 
459  streamLoopWaitTask->wait_for_all();
460  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
461  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
462  }
463  }
464  {
465  auto globalWaitTask = make_empty_waiting_task();
466  globalWaitTask->increment_ref_count();
467 
469  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
470  *schedule_,
471  *lumiPrincipal,
472  ts,
473  es,
474  nullptr,
476  emptyList,
477  false);
478  globalWaitTask->wait_for_all();
479  if (globalWaitTask->exceptionPtr() != nullptr) {
480  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
481  }
482  }
483  }
484  }
485 
487  if (beginRunCalled_) {
488  beginRunCalled_ = false;
489  ProcessHistoryID phid;
490 
491  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, runNumber_);
492 
493  IOVSyncValue ts(
495  runPrincipal.endTime());
496  espController_->eventSetupForInstance(ts);
497 
498  auto const& es = esp_->eventSetupImpl();
499 
500  std::vector<edm::SubProcess> emptyList;
501 
502  //To wait, the ref count has to be 1+#streams
503  {
504  auto streamLoopWaitTask = make_empty_waiting_task();
505  streamLoopWaitTask->increment_ref_count();
506 
508 
509  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
510  *schedule_,
512  runPrincipal,
513  ts,
514  es,
515  nullptr,
517  emptyList,
518  false);
519 
520  streamLoopWaitTask->wait_for_all();
521  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
522  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
523  }
524  }
525  {
526  auto globalWaitTask = make_empty_waiting_task();
527  globalWaitTask->increment_ref_count();
528 
530  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
531  *schedule_,
532  runPrincipal,
533  ts,
534  es,
535  nullptr,
537  emptyList,
538  false);
539  globalWaitTask->wait_for_all();
540  if (globalWaitTask->exceptionPtr() != nullptr) {
541  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
542  }
543  }
544 
546  }
547  }
548 
550  // Collects exceptions, so we don't throw before all operations are performed.
552  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
553 
554  //make the services available
556 
557  //NOTE: this really should go elsewhere in the future
558  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
559  c.call([this, i]() { this->schedule_->endStream(i); });
560  }
561  auto actReg = actReg_.get();
562  c.call([actReg]() { actReg->preEndJobSignal_(); });
563  schedule_->endJob(c);
564  c.call([actReg]() { actReg->postEndJobSignal_(); });
565  if (c.hasThrown()) {
566  c.rethrow();
567  }
568  }
569 
571  if (beginRunCalled_) {
573  endRun();
574  }
575  runNumber_ = iRun;
576  }
579  lumiNumber_ = iLumi;
580  }
581 
583 
584  //
585  // const member functions
586  //
587 
588  //
589  // static member functions
590  //
591 
592  } // namespace test
593 } // namespace edm
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:73
void insert(std::shared_ptr< RunPrincipal > rp)
void put(std::pair< edm::EDPutTokenT< T >, std::unique_ptr< T >> &&iPut)
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< edm::ParameterSet > parameterSet() const
#define xstr(s)
static PluginManager & configure(const Config &)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:78
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:59
std::vector< std::pair< edm::BranchDescription, std::unique_ptr< WrapperBase > > > dataProducts_
ParameterSetID id() const
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
bool registerProcessHistory(ProcessHistory const &processHistory)
LuminosityBlockNumber_t lumiNumber_
unsigned long long EventNumber_t
EventNumber_t eventNumber_
RunNumber_t run() const
Definition: RunPrincipal.h:60
ProcessContext processContext_
unsigned int LuminosityBlockNumber_t
void emplace_back(Args &&...args)
PreallocationConfiguration preallocations_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void setEventNumber(edm::EventNumber_t)
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PluginManager::Config config()
Definition: standard.cc:21
std::shared_ptr< EventSetupTestHelper > esHelper_
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:66
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
std::string labelOfTestModule_
std::unique_ptr< eventsetup::EventSetupsController > espController_
edm::test::Event testImpl()
std::shared_ptr< eventsetup::EventSetupProvider > esp_
std::vector< ProduceEntry > const & produceEntries() const
Timestamp const & endTime() const
Definition: RunPrincipal.h:68
std::vector< ESProduceEntry > const & esProduceEntries() const
std::unique_ptr< Schedule > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
TestProcessor(Config const &iConfig)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
#define noexcept
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:68
void setLuminosityBlockNumber(edm::LuminosityBlockNumber_t)
PrincipalCache principalCache_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
std::shared_ptr< ProductRegistry > preg_
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::unique_ptr< ExceptionToActionTable const > act_table_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
void setRunNumber(edm::RunNumber_t)
HLT enums.
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:53
Definition: Config.py:1
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
void call(std::function< void(void)>)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:55
std::shared_ptr< LuminosityBlockPrincipal > lumiPrincipal_
static ParentageRegistry * instance()
ProcessHistoryRegistry processHistoryRegistry_
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
std::string const & pythonConfiguration() const
Definition: TestProcessor.h:87
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
std::vector< std::string > const & extraProcesses() const
Definition: TestProcessor.h:97
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const