CMS 3D CMS Logo

TestProcessor.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Subsystem/Package
4 // Class : TestProcessor
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: root
10 // Created: Mon, 30 Apr 2018 18:51:08 GMT
11 //
12 
13 // system include files
14 
15 // user include files
18 
29 
32 
35 
39 
41 
43 
44 #include "tbb/task_scheduler_init.h"
45 
46 #define xstr(s) str(s)
47 #define str(s) #s
48 
49 
50 namespace edm {
51 namespace test {
52 
53 //
54 // constants, enums and typedefs
55 //
56 
57 //
58 // static data member definitions
59 //
60  namespace {
61 
// Performs the process-wide setup steps; invoked through oneTimeInitialization()
// below, which caches the result in a function-local static.
// Always returns true.
62  bool oneTimeInitializationImpl() {
63 
65 
// Function-local static, so the scheduler object is created on the first call
// and is not destroyed when this function returns. It is constructed with the
// argument 1 (presumably the maximum thread count — confirm against the TBB docs).
66  static std::unique_ptr<tbb::task_scheduler_init> tsiPtr = std::make_unique<tbb::task_scheduler_init>(1);
67 
68  // register the empty parentage vector, once and for all
// NOTE(review): the statement belonging to the comment above is not present
// in this listing (the listing numbering skips from 68 to 70).
70 
71  // register the empty parameter set, once and for all.
72  ParameterSet().registerIt();
73  return true;
74  }
75 
// Runs oneTimeInitializationImpl() on the first call only: its result is kept
// in a function-local static, so later calls just return the cached value.
// Returns that cached value (the Impl function only ever returns true).
76  bool oneTimeInitialization() {
77  static const bool s_init{ oneTimeInitializationImpl()};
78  return s_init;
79  }
80  }
81 
82 //
83 // constructors and destructor
84 //
86  espController_(std::make_unique<eventsetup::EventSetupsController>()),
87  historyAppender_(std::make_unique<HistoryAppender>()),
88  moduleRegistry_(std::make_shared<ModuleRegistry>())
89 {
90  //Setup various singletons
91  (void) oneTimeInitialization();
92 
93  ProcessDescImpl desc(iConfig.pythonConfiguration());
94 
95  auto psetPtr = desc.parameterSet();
96 
97  validateTopLevelParameterSets(psetPtr.get());
98 
99  labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
100 
101  auto procDesc = desc.processDesc();
102  // Now do general initialization
104 
105  //initialize the services
106  auto& serviceSets = procDesc->getServicesPSets();
107  ServiceToken token = items.initServices(serviceSets, *psetPtr, ServiceToken(), serviceregistry::kOverlapIsError, true);
108  serviceToken_ = items.addCPRandTNS(*psetPtr, token);
109 
110  //make the services available
112 
113  // initialize miscellaneous items
114  std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
115 
116  // initialize the event setup provider
117  esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
118 
119  if(not iConfig.esProduceEntries().empty()) {
120  esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
121  esp_->add( std::dynamic_pointer_cast<eventsetup::DataProxyProvider>(esHelper_));
122  esp_->add( std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
123  }
124 
125  auto nThreads = 1U;
126  auto nStreams = 1U;
127  auto nConcurrentLumis = 1U;
128  auto nConcurrentRuns = 1U;
129  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
130 
131  preg_ = items.preg();
133 
134 
135  edm::ParameterSet emptyPSet;
136  emptyPSet.registerIt();
137  auto psetid = emptyPSet.id();
138 
139  ProcessHistory oldHistory;
140  for(auto const&p: iConfig.extraProcesses() ) {
141  oldHistory.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
143  }
144 
145  //setup the products we will be adding to the event
146  for(auto const& produce: iConfig.produceEntries()) {
147  auto processName = produce.processName_;
148  if(processName.empty()) {
149  processName =processConfiguration_->processName();
150  }
151  edm::TypeWithDict twd( produce.type_.typeInfo());
153  produce.moduleLabel_,
154  processName,
155  twd.userClassName(),
156  twd.friendlyClassName(),
157  produce.instanceLabel_,
158  "",
159  psetid,
160  twd,
161  true //force this to come from 'source'
162  );
163  product.init();
164  dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
165  preg_->addProduct(product);
166  }
167 
168 
169  schedule_ = items.initSchedule(*psetPtr,false,preallocations_,&processContext_);
170  // set the data members
172  actReg_ = items.actReg_;
177 
179 
180 
181 
182  preg_->setFrozen();
183  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
184  // Reusable event principal
185  auto ep = std::make_shared<EventPrincipal>(preg_, branchIDListHelper_,
188  }
189 
190  for(unsigned int index =0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
191  auto lp = std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_,
192  historyAppender_.get(), index);
194  }
195 }
196 
199 }
200 //
201 // member functions
202 //
203 
204 
// Stores a wrapped data product in the slot `index` of dataProducts_.
//
// index    position in dataProducts_; the constructor appends one entry per
//          element of iConfig.produceEntries(), each with an empty wrapper.
// iWrapper the product to store; ownership is moved into the slot.
//
// Throws cms::Exception("LogicError") when index is not a valid position in
// dataProducts_.
205 void
206 TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
// An out-of-range index means no matching slot was created in the constructor.
207  if (index >= dataProducts_.size()) {
208  throw cms::Exception("LogicError")
209  << "Products must be declared to the TestProcessor::Config object\n"
210  "with a call to the function \'produces\' BEFORE passing the\n"
211  "TestProcessor::Config object to the TestProcessor constructor";
212  }
// Replaces whatever wrapper the slot previously held.
213  dataProducts_[index].second = std::move(iWrapper);
214 }
215 
218  setupProcessing();
219  event();
220 
221  bool result = schedule_->totalEventsPassed()> 0;
222  schedule_->clearCounters();
223  if(esHelper_) {
224  //We want each test to have its own ES data products
225  esHelper_->resetAllProxies();
226  }
228 }
229 
230 
231 void
233  if(not beginJobCalled_) {
234  beginJob();
235  }
236  if(not beginRunCalled_) {
237  beginRun();
238  }
239  if(not beginLumiCalled_) {
241  }
242 }
243 
244 void
246  if(beginLumiCalled_) {
248  beginLumiCalled_ = false;
249  }
250  if(beginRunCalled_) {
251  endRun();
252  beginRunCalled_ = false;
253  }
254  if(beginJobCalled_) {
255  endJob();
256  }
257 }
258 
259 void
262 
267  actReg_->preallocateSignal_(bounds);
268  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
269  PathsAndConsumesOfModules pathsAndConsumesOfModules;
270 
271  //The code assumes only modules make data in the current process
272  // Since the test is also allowed to do so, it can lead to problems.
273  //pathsAndConsumesOfModules.initialize(schedule_.get(), preg_);
274 
275  //NOTE: this may throw
276  //checkForModuleDependencyCorrectness(pathsAndConsumesOfModules, false);
277  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext_);
278 
279  schedule_->beginJob(*preg_);
280  actReg_->postBeginJobSignal_();
281 
282  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
283  schedule_->beginStream(i);
284  }
285  beginJobCalled_ = true;
286 }
287 
288 void
290  ProcessHistoryID phid;
291  auto aux = std::make_shared<RunAuxiliary>(runNumber_,Timestamp(),Timestamp());
292  auto rp = std::make_shared<RunPrincipal>(aux, preg_, *processConfiguration_, historyAppender_.get(), 0);
293 
295  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, runNumber_);
296 
297  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
298  runPrincipal.beginTime());
299  espController_->eventSetupForInstance(ts);
300 
301  EventSetup const& es = esp_->eventSetup();
302 
303  std::vector<edm::SubProcess> emptyList;
304  {
306  auto globalWaitTask = make_empty_waiting_task();
307  globalWaitTask->increment_ref_count();
308  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
309  *schedule_,
310  runPrincipal,
311  ts,
312  es,
314  emptyList);
315  globalWaitTask->wait_for_all();
316  if(globalWaitTask->exceptionPtr() != nullptr) {
317  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
318  }
319  }
320  {
321  //To wait, the ref count has to be 1+#streams
322  auto streamLoopWaitTask = make_empty_waiting_task();
323  streamLoopWaitTask->increment_ref_count();
324 
326 
327  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
328  *schedule_,
330  runPrincipal,
331  ts,
332  es,
334  emptyList);
335 
336  streamLoopWaitTask->wait_for_all();
337  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
338  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
339  }
340  }
341  beginRunCalled_ = true;
342 }
343 
344 void
348  assert(lumiPrincipal_);
349  lumiPrincipal_->setAux(aux);
350 
351  lumiPrincipal_->setRunPrincipal(principalCache_.runPrincipalPtr());
352 
354  lumiPrincipal_->beginTime());
355  espController_->eventSetupForInstance(ts);
356 
357  EventSetup const& es = esp_->eventSetup();
358 
359  std::vector<edm::SubProcess> emptyList;
360  {
362  auto globalWaitTask = make_empty_waiting_task();
363  globalWaitTask->increment_ref_count();
364  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
365  *schedule_,
367  ts,
368  es,
370  emptyList);
371  globalWaitTask->wait_for_all();
372  if(globalWaitTask->exceptionPtr() != nullptr) {
373  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
374  }
375  }
376  {
377  //To wait, the ref count has to be 1+#streams
378  auto streamLoopWaitTask = make_empty_waiting_task();
379  streamLoopWaitTask->increment_ref_count();
380 
382 
383  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
384  *schedule_,
387  ts,
388  es,
390  emptyList);
391 
392  streamLoopWaitTask->wait_for_all();
393  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
394  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
395  }
396  }
397  beginLumiCalled_ = true;
398 }
399 
400 void
402  auto pep = &(principalCache_.eventPrincipal(0));
403 
404  //this resets the EventPrincipal (if it had been used before)
405  pep->clearEventPrincipal();
407  false),
409  nullptr
410  );
411  assert(lumiPrincipal_.get() != nullptr);
412  pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
413 
414  for(auto& p: dataProducts_) {
415  if(p.second) {
416  pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
417  } else {
418  //The data product was not set so we need to
419  // tell the ProductResolver not to wait
420  auto r = pep->getProductResolver(p.first.branchID());
421  r->putProduct(std::unique_ptr<WrapperBase>());
422  }
423  }
424 
426 
427  auto waitTask = make_empty_waiting_task();
428  waitTask->increment_ref_count();
429 
430  schedule_->processOneEventAsync(edm::WaitingTaskHolder(waitTask.get()),
431  0,*pep, esp_->eventSetup(), serviceToken_);
432 
433  waitTask->wait_for_all();
434  if(waitTask->exceptionPtr() != nullptr) {
435  std::rethrow_exception(* (waitTask->exceptionPtr()) );
436  }
437  ++eventNumber_;
438 }
439 
440 void
442  if(beginLumiCalled_) {
443  beginLumiCalled_ = false;
444  auto lumiPrincipal = lumiPrincipal_;
445  lumiPrincipal_.reset();
446 
448  lumiPrincipal->endTime());
449  espController_->eventSetupForInstance(ts);
450 
451  EventSetup const& es = esp_->eventSetup();
452 
453  std::vector<edm::SubProcess> emptyList;
454 
455  //To wait, the ref count has to be 1+#streams
456  {
457  auto streamLoopWaitTask = make_empty_waiting_task();
458  streamLoopWaitTask->increment_ref_count();
459 
461 
462  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
463  *schedule_,
465  *lumiPrincipal,
466  ts,
467  es,
469  emptyList,
470  false);
471 
472  streamLoopWaitTask->wait_for_all();
473  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
474  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
475  }
476  }
477  {
478  auto globalWaitTask = make_empty_waiting_task();
479  globalWaitTask->increment_ref_count();
480 
482  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
483  *schedule_,
484  *lumiPrincipal,
485  ts,
486  es,
488  emptyList,
489  false);
490  globalWaitTask->wait_for_all();
491  if(globalWaitTask->exceptionPtr() != nullptr) {
492  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
493  }
494  }
495  }
496 }
497 
498 void
500  if(beginRunCalled_) {
501  beginRunCalled_ = false;
502  ProcessHistoryID phid;
503 
504  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, runNumber_);
505 
507  runPrincipal.endTime());
508  espController_->eventSetupForInstance(ts);
509 
510  EventSetup const& es = esp_->eventSetup();
511 
512  std::vector<edm::SubProcess> emptyList;
513 
514  //To wait, the ref count has to be 1+#streams
515  {
516  auto streamLoopWaitTask = make_empty_waiting_task();
517  streamLoopWaitTask->increment_ref_count();
518 
520 
521  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
522  *schedule_,
524  runPrincipal,
525  ts,
526  es,
528  emptyList,
529  false);
530 
531  streamLoopWaitTask->wait_for_all();
532  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
533  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
534  }
535  }
536  {
537  auto globalWaitTask = make_empty_waiting_task();
538  globalWaitTask->increment_ref_count();
539 
541  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
542  *schedule_,
543  runPrincipal,
544  ts,
545  es,
547  emptyList,
548  false);
549  globalWaitTask->wait_for_all();
550  if(globalWaitTask->exceptionPtr() != nullptr) {
551  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
552  }
553  }
554 
556  }
557 }
558 
559 void
561  // Collects exceptions, so we don't throw before all operations are performed.
562  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
563 
564  //make the services available
566 
567  //NOTE: this really should go elsewhere in the future
568  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
569  c.call([this,i](){this->schedule_->endStream(i);});
570  }
571  auto actReg = actReg_.get();
572  c.call([actReg](){actReg->preEndJobSignal_();});
573  schedule_->endJob(c);
574  c.call([actReg](){actReg->postEndJobSignal_();});
575  if(c.hasThrown()) {
576  c.rethrow();
577  }
578 }
579 
580 void
582  if(beginRunCalled_) {
584  endRun();
585  }
586  runNumber_=iRun;
587 
588 }
589 void
592  lumiNumber_ = iLumi;
593 }
594 
595 void
597  eventNumber_ = iEv;
598 }
599 
600 //
601 // const member functions
602 //
603 
604 //
605 // static member functions
606 //
607 
608 }
609 }
610 
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:68
void insert(std::shared_ptr< RunPrincipal > rp)
void put(std::pair< edm::EDPutTokenT< T >, std::unique_ptr< T >> &&iPut)
std::shared_ptr< ActivityRegistry > actReg_
#define xstr(s)
static PluginManager & configure(const Config &)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::unique_ptr< edm::ParameterSet > parameterSet() const
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:73
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:62
ParameterSetID id() const
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
bool registerProcessHistory(ProcessHistory const &processHistory)
LuminosityBlockNumber_t lumiNumber_
unsigned long long EventNumber_t
#define noexcept
EventNumber_t eventNumber_
RunNumber_t run() const
Definition: RunPrincipal.h:65
ProcessContext processContext_
unsigned int LuminosityBlockNumber_t
void emplace_back(Args &&...args)
PreallocationConfiguration preallocations_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void setEventNumber(edm::EventNumber_t)
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PluginManager::Config config()
Definition: standard.cc:21
std::shared_ptr< EventSetupTestHelper > esHelper_
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:77
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
std::string labelOfTestModule_
std::unique_ptr< eventsetup::EventSetupsController > espController_
edm::test::Event testImpl()
std::shared_ptr< eventsetup::EventSetupProvider > esp_
std::vector< ProduceEntry > const & produceEntries() const
Timestamp const & endTime() const
Definition: RunPrincipal.h:81
std::vector< ESProduceEntry > const & esProduceEntries() const
std::unique_ptr< Schedule > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
TestProcessor(Config const &iConfig)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:65
void setLuminosityBlockNumber(edm::LuminosityBlockNumber_t)
PrincipalCache principalCache_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
std::shared_ptr< ProductRegistry > preg_
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::unique_ptr< ExceptionToActionTable const > act_table_
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
void setRunNumber(edm::RunNumber_t)
HLT enums.
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:58
Definition: Config.py:1
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
void call(std::function< void(void)>)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:60
std::shared_ptr< LuminosityBlockPrincipal > lumiPrincipal_
static ParentageRegistry * instance()
ProcessHistoryRegistry processHistoryRegistry_
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
std::vector< std::pair< edm::BranchDescription, std::unique_ptr< WrapperBase > > > dataProducts_
std::string const & pythonConfiguration() const
Definition: TestProcessor.h:88
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
std::vector< std::string > const & extraProcesses() const
Definition: TestProcessor.h:98
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)