CMS 3D CMS Logo

OutputModuleBase.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/Framework
4 // Class : OutputModuleBase
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 //
10 
11 // system include files
12 #include <cassert>
13 
14 // user include files
16 
37 
38 
39 namespace edm {
40  namespace limited {
41 
42  // -------------------------------------------------------
44  maxEvents_(-1),
45  remainingEvents_(maxEvents_),
46  keptProducts_(),
47  hasNewlyDroppedBranch_(),
48  process_name_(),
49  productSelectorRules_(pset, "outputCommands", "OutputModule"),
50  productSelector_(),
51  moduleDescription_(),
52  wantAllEvents_(false),
53  selectors_(),
54  selector_config_id_(),
55  droppedBranchIDToKeptBranchID_(),
56  branchIDLists_(new BranchIDLists),
57  origBranchIDLists_(nullptr),
58  thinnedAssociationsHelper_(new ThinnedAssociationsHelper),
59  queue_(pset.getUntrackedParameter<unsigned int>("concurrencyLimit")) {
60 
61  hasNewlyDroppedBranch_.fill(false);
62 
64  process_name_ = tns->getProcessName();
65 
67  pset.getUntrackedParameterSet("SelectEvents", ParameterSet());
68 
69  selectEvents_.registerIt(); // Just in case this PSet is not registered
70 
72 
73  //need to set wantAllEvents_ in constructor
74  // we will make the remaining selectors once we know how many streams
75  selectors_.resize(1);
79  selectors_[0],
81 
82  }
83 
87  }
88 
91  if(productSelector_.initialized()) return;
93 
94  // TODO: See if we can collapse keptProducts_ and productSelector_ into a
95  // single object. See the notes in the header for ProductSelector
96  // for more information.
97 
98  std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
99  std::vector<BranchDescription const*> associationDescriptions;
100  std::set<BranchID> keptProductsInEvent;
101  std::set<std::string> processesWithSelectedMergeableRunProducts;
102 
103  for(auto const& it : preg.productList()) {
104  BranchDescription const& desc = it.second;
105  if(desc.transient()) {
106  // if the class of the branch is marked transient, output nothing
107  } else if(!desc.present() && !desc.produced()) {
108  // else if the branch containing the product has been previously dropped,
109  // output nothing
110  } else if(desc.unwrappedType() == typeid(ThinnedAssociation)) {
111  associationDescriptions.push_back(&desc);
112  } else if(selected(desc)) {
113  keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
115  processesWithSelectedMergeableRunProducts);
116  } else {
117  // otherwise, output nothing,
118  // and mark the fact that there is a newly dropped branch of this type.
119  hasNewlyDroppedBranch_[desc.branchType()] = true;
120  }
121  }
122 
123  setProcessesWithSelectedMergeableRunProducts(processesWithSelectedMergeableRunProducts);
124 
125  thinnedAssociationsHelper.selectAssociationProducts(associationDescriptions,
126  keptProductsInEvent,
128 
129  for(auto association : associationDescriptions) {
130  if(keepAssociation_[association->branchID()]) {
131  keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
132  } else {
133  hasNewlyDroppedBranch_[association->branchType()] = true;
134  }
135  }
136 
137  // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept.
138  ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
139 
140  thinnedAssociationsHelper_->updateFromParentProcess(thinnedAssociationsHelper, keepAssociation_, droppedBranchIDToKeptBranchID_);
141  }
142 
144  if(!droppedBranchIDToKeptBranchID_.empty()) {
145  // Make a private copy of the BranchIDLists.
147  // Check for branches dropped while an EDAlias was kept.
148  for(BranchIDList& branchIDList : *branchIDLists_) {
149  for(BranchID::value_type& branchID : branchIDList) {
150  // Replace BranchID of each dropped branch with that of the kept
151  // alias, so the alias branch will have the product ID of the original branch.
152  std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter = droppedBranchIDToKeptBranchID_.find(branchID);
153  if(iter != droppedBranchIDToKeptBranchID_.end()) {
154  branchID = iter->second;
155  }
156  }
157  }
158  }
159  }
160 
162  std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
163  std::set<BranchID>& keptProductsInEvent) {
164 
166  trueBranchIDToKeptBranchDesc);
167 
168  EDGetToken token;
169  switch (desc.branchType()) {
170  case InEvent:
171  {
172  if(desc.produced()) {
173  keptProductsInEvent.insert(desc.originalBranchID());
174  } else {
175  keptProductsInEvent.insert(desc.branchID());
176  }
178  InputTag{desc.moduleLabel(),
179  desc.productInstanceName(),
180  desc.processName()});
181  break;
182  }
183  case InLumi:
184  {
185  token = consumes<InLumi>(TypeToGet{desc.unwrappedTypeID(),PRODUCT_TYPE},
186  InputTag(desc.moduleLabel(),
187  desc.productInstanceName(),
188  desc.processName()));
189  break;
190  }
191  case InRun:
192  {
193  token = consumes<InRun>(TypeToGet{desc.unwrappedTypeID(),PRODUCT_TYPE},
194  InputTag(desc.moduleLabel(),
195  desc.productInstanceName(),
196  desc.processName()));
197  break;
198  }
199  default:
200  assert(false);
201  break;
202  }
203  // Now put it in the list of selected branches.
204  keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
205  }
206 
208 
// Body of the preallocation step (the signature line, 209, is absent from this
// extracted listing; the member index lists it as
// doPreallocate(PreallocationConfiguration const&)).
// Sizes selectors_ to the number of streams, then calls the preallocStreams()
// and preallocate() hooks.
210  auto nstreams = iPC.numberOfStreams();
// One selector slot per stream.
211  selectors_.resize(nstreams);
212 
// seenFirst makes the loop skip element 0 and take the if-branch for every
// later element `s`. The statement inside that branch is only partly visible
// (lines 216-218 and 220 are missing); presumably it configures `s` the same
// way the constructor configures selectors_[0] -- TODO confirm against the
// real source.
213  bool seenFirst = false;
214  for(auto& s : selectors_) {
215  if(seenFirst) {
219  s,
221  } else {
222  seenFirst = true;
223  }
224  }
225  preallocStreams(nstreams);
// NOTE(review): line 226 is missing from this listing.
227  preallocate(iPC);
228  }
229 
231  this->beginJob();
232  }
233 
235  endJob();
236  }
237 
239  return !wantAllEvents_;
240  }
241 
// productsUsedBySelection (the defining line, 243, is absent from this extracted
// listing; the name is taken from the member index further down the page).
// Returns one ProductResolverIndexAndSkipBit for every token held by the first
// event selector.
242  std::vector<ProductResolverIndexAndSkipBit>
244  std::vector<ProductResolverIndexAndSkipBit> returnValue;
// Only selectors_[0] is consulted; the other selectors are not read here.
245  auto const& s = selectors_[0];
246  auto const n = s.numberOfTokens();
// Exactly n entries are appended below, so reserve them up front.
247  returnValue.reserve(n);
248 
249  for(unsigned int i=0; i< n;++i) {
250  returnValue.emplace_back(uncheckedIndexFrom(s.token(i)));
251  }
252  return returnValue;
253  }
254 
// Body of the pre-prefetch event selection (signature line 255 and line 258 are
// absent from this extracted listing; the member index lists it as
// prePrefetchSelection(StreamID id, EventPrincipal const&, ModuleCallingContext const*)).
// Returns true when the event should be accepted.
// Short-circuit: with wantAllEvents_ set, no selector is consulted.
256  if(wantAllEvents_) return true;
// Pick the selector stored at index id.value().
257  auto& s = selectors_[id.value()];
// NOTE(review): `e` is declared on the missing line 258.
259  e.setConsumer(this);
260  return s.wantEvent(e);
261  }
262 
// doEvent (the line carrying the function name and first parameter, 264, is
// absent from this extracted listing; the name is taken from the member index).
// Passes the event to write(), then counts it against remainingEvents_.
// Always returns true.
263  bool
265  EventSetup const&,
266  ActivityRegistry* act,
267  ModuleCallingContext const* mcc) {
268 
// Inner scope: `sentry` is destroyed at the closing brace, i.e. before the
// counter update below. `e` is declared on the missing line 270.
269  {
271  e.setConsumer(this);
272  EventSignalsSentry sentry(act,mcc);
273  write(e);
274  }
275 
// Decrement remainingEvents_ by one using a compare-exchange retry loop.
// The decrement is attempted only while the observed value is positive, so a
// value that is already <= 0 is never modified (the constructor initializes the
// counter from maxEvents_, which it sets to -1).
276  auto remainingEvents = remainingEvents_.load();
277  bool keepTrying = remainingEvents > 0;
278  while(keepTrying) {
279  auto newValue = remainingEvents - 1;
// compare_exchange_strong returns true when the store succeeded, which ends the loop.
280  keepTrying = !remainingEvents_.compare_exchange_strong(remainingEvents, newValue);
281  if(keepTrying) {
282  // the exchange failed because the value was changed by another thread.
283  // remainingEvents was changed to be the new value of remainingEvents_;
// Retry only if the freshly observed value is still positive.
284  keepTrying = remainingEvents > 0;
285  }
286  }
287  return true;
288  }
289 
290  bool
292  EventSetup const&,
293  ModuleCallingContext const* mcc) {
294  RunForOutput r(rp, moduleDescription_, mcc, false);
295  r.setConsumer(this);
296  doBeginRun_(r);
297  return true;
298  }
299 
300  bool
302  EventSetup const&,
303  ModuleCallingContext const* mcc) {
304  RunForOutput r(rp, moduleDescription_, mcc, true);
305  r.setConsumer(this);
306  doEndRun_(r);
307  return true;
308  }
309 
310  void
312  ModuleCallingContext const* mcc,
313  MergeableRunProductMetadata const* mrpm) {
314  RunForOutput r(rp, moduleDescription_, mcc, true, mrpm);
315  r.setConsumer(this);
316  writeRun(r);
317  }
318 
319  bool
321  EventSetup const&,
322  ModuleCallingContext const* mcc) {
323  LuminosityBlockForOutput lb(lbp, moduleDescription_, mcc, false);
324  lb.setConsumer(this);
326  return true;
327  }
328 
329  bool
331  EventSetup const&,
332  ModuleCallingContext const* mcc) {
333  LuminosityBlockForOutput lb(lbp, moduleDescription_, mcc, true);
334  lb.setConsumer(this);
336  return true;
337  }
338 
340  ModuleCallingContext const* mcc) {
341  LuminosityBlockForOutput lb(lbp, moduleDescription_, mcc, true);
342  lb.setConsumer(this);
344  }
345 
347  openFile(fb);
348  }
349 
353  }
354 
357  }
358 
360  if(isFileOpen()) {
361  reallyCloseFile();
362  }
363  }
364 
366  }
367 
// branchIDLists accessor (the line carrying the function name, 369, is absent
// from this extracted listing; the member index lists it as branchIDLists() const).
// Returns the module's own branchIDLists_ when droppedBranchIDToKeptBranchID_
// has entries, and origBranchIDLists_ otherwise.
// NOTE(review): the constructor sets origBranchIDLists_ to nullptr and no later
// assignment is visible in this listing, so callers may need to handle a null
// result -- confirm against the real source.
368  BranchIDLists const*
370  if(!droppedBranchIDToKeptBranchID_.empty()) {
371  return branchIDLists_.get();
372  }
373  return origBranchIDLists_;
374  }
375 
378  return thinnedAssociationsHelper_.get();
379  }
380 
381  ModuleDescription const&
383  return moduleDescription_;
384  }
385 
386  bool
388  return productSelector_.selected(desc);
389  }
390 
391  void
394  desc.setUnknown();
395  descriptions.addDefault(desc);
396  }
397 
398  void
400  ProductSelectorRules::fillDescription(desc, "outputCommands");
402  desc.addUntracked<unsigned int>("concurrencyLimit",1);
403  }
404 
405  void
407  }
408 
409 
410  static const std::string kBaseType("OutputModule");
411  const std::string&
413  return kBaseType;
414  }
415 
416  void
417  OutputModuleBase::setEventSelectionInfo(std::map<std::string, std::vector<std::pair<std::string, int> > > const& outputModulePathPositions,
418  bool anyProductProduced) {
420  description().moduleLabel(),
421  outputModulePathPositions,
422  anyProductProduced);
423  }
424  }
425 }
virtual void writeLuminosityBlock(LuminosityBlockForOutput const &)=0
bool doBeginLuminosityBlock(LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *)
std::atomic< int > remainingEvents_
bool selected(BranchDescription const &desc) const
edm::propagate_const< std::unique_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
std::map< BranchID, bool > keepAssociation_
virtual void setProcessesWithSelectedMergeableRunProducts(std::set< std::string > const &)
virtual bool isFileOpen() const
BranchType const & branchType() const
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void doPreallocate(PreallocationConfiguration const &)
static void fillDroppedToKept(ProductRegistry const &preg, std::map< BranchID, BranchDescription const * > const &trueBranchIDToKeptBranchDesc, std::map< BranchID::value_type, BranchID::value_type > &droppedBranchIDToKeptBranchID_)
virtual void doRespondToOpenInputFile_(FileBlock const &)
BranchIDLists const * origBranchIDLists_
virtual void write(EventForOutput const &)=0
ParameterSetID id() const
bool doEvent(EventPrincipal const &ep, EventSetup const &c, ActivityRegistry *, ModuleCallingContext const *)
bool doEndRun(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *)
void setEventSelectionInfo(std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
ParameterSet const & getParameterSet(ParameterSetID const &id)
bool selected(BranchDescription const &desc) const
virtual void doBeginLuminosityBlock_(LuminosityBlockForOutput const &)
OutputModuleBase(ParameterSet const &pset)
bool prePrefetchSelection(StreamID id, EventPrincipal const &, ModuleCallingContext const *)
edm::propagate_const< std::unique_ptr< BranchIDLists > > branchIDLists_
std::string const & processName() const
std::vector< ProductResolverIndexAndSkipBit > productsUsedBySelection() const
static void fillDescription(ParameterSetDescription &desc, char const *parameterName, std::vector< std::string > const &defaultStrings=defaultSelectionStrings())
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
ModuleDescription const & description() const
void doWriteLuminosityBlock(LuminosityBlockPrincipal const &lbp, ModuleCallingContext const *)
#define nullptr
void configure(OutputModuleDescription const &desc)
BranchIDLists const * branchIDLists_
EDGetTokenT< ProductType > consumes(edm::InputTag const &tag)
ProductList const & productList() const
virtual void openFile(FileBlock const &) const
void selectAssociationProducts(std::vector< BranchDescription const * > const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
virtual void doEndRun_(RunForOutput const &)
ProductResolverIndexAndSkipBit uncheckedIndexFrom(EDGetToken) const
void addDefault(ParameterSetDescription const &psetDescription)
static const std::string kBaseType("EDAnalyzer")
virtual void doBeginRun_(RunForOutput const &)
std::string const & moduleLabel() const
unsigned int value_type
Definition: BranchID.h:16
std::string const & productInstanceName() const
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
virtual void preallocate(PreallocationConfiguration const &)
virtual void writeRun(RunForOutput const &)=0
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &)
void keepThisBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const * > &trueBranchIDToKeptBranchDesc, std::set< BranchID > &keptProductsInEvent)
TypeID unwrappedTypeID() const
static void fillDescription(ParameterSetDescription &desc)
std::vector< BranchDescription const * > allBranchDescriptions() const
bool doBeginRun(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *)
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector, ConsumesCollector &&iC)
BranchID const & branchID() const
TypeWithDict const & unwrappedType() const
static void fillDescription(ParameterSetDescription &desc)
void doWriteRun(RunPrincipal const &rp, ModuleCallingContext const *, MergeableRunProductMetadata const *)
void doOpenFile(FileBlock const &fb)
element_type const * get() const
void doRespondToOpenInputFile(FileBlock const &fb)
ProductSelectorRules productSelectorRules_
virtual void preallocStreams(unsigned int)
static void fillDescriptions(ConfigurationDescriptions &descriptions)
virtual void doEndLuminosityBlock_(LuminosityBlockForOutput const &)
std::vector< detail::TriggerResultsBasedEventSelector > selectors_
BranchIDLists const * branchIDLists() const
std::array< bool, NumBranchTypes > hasNewlyDroppedBranch_
std::vector< BranchID::value_type > BranchIDList
Definition: BranchIDList.h:18
ModuleDescription moduleDescription_
HLT enums.
bool initialized() const
std::vector< std::string > const & getAllTriggerNames()
virtual void doRespondToCloseInputFile_(FileBlock const &)
void initialize(ProductSelectorRules const &rules, std::vector< BranchDescription const * > const &branchDescriptions)
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
virtual void preallocLumis(unsigned int)
void setConsumer(EDConsumerBase const *iConsumer)
BranchID const & originalBranchID() const
void insertSelectedProcesses(BranchDescription const &desc, std::set< std::string > &processes)
SelectedProductsForBranchType keptProducts_
static void prevalidate(ConfigurationDescriptions &)
bool doEndLuminosityBlock(LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *)
void doCloseFile()
Tell the OutputModule that it must end the current file.
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
ParameterSet const & registerIt()
static const std::string & baseType()
void doRespondToCloseInputFile(FileBlock const &fb)
static void checkForDuplicateKeptBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const * > &trueBranchIDToKeptBranchDesc)