CMS 3D CMS Logo

OutputProcessBlockHelper.cc
Go to the documentation of this file.
2 
5 
6 #include <algorithm>
7 #include <cassert>
8 #include <utility>
9 
10 namespace edm {
11 
13  std::set<std::string> const& processesWithKeptProcessBlockProducts,
14  ProcessBlockHelperBase const& processBlockHelper) {
16 
17  // Copy the list of processes with ProcessBlock products from the EventProcessor or SubProcess,
18  // except remove any processes where the output module calling this has dropped all ProcessBlock
19  // products. We want to maintain the same order and only remove elements. Fill a vector that can
20  // translate from one process list to the other.
22  unsigned int iProcess = 0;
24  if (processesWithKeptProcessBlockProducts.find(process) != processesWithKeptProcessBlockProducts.end()) {
26  translateFromStoredIndex_.emplace_back(iProcess);
27  }
28  ++iProcess;
29  }
30 
31  for (auto const& addedProcess : processBlockHelper.addedProcesses()) {
32  // count new processes producing ProcessBlock products that are
33  // kept by the OutputModule. There can be at most 1 except
34  // in the case of SubProcesses.
37  addedProcess) != processesWithProcessBlockProducts_.end()) {
39  }
40  }
41 
42  // Determine if any ProcessBlock product from the input file is kept by the output module.
43  // Do this by looking for a process name on both the list of processes with ProcessBlock
44  // products kept by the output module and process names from the input with ProcessBlock
45  // products.
47  std::find_first_of(processesWithProcessBlockProducts_.begin(),
52  }
53 
55  // The stored cache indices are the ones we want to fill.
56  // This will get written to the output file.
57  // Note for output the vector of vectors is flattened into a single vector
58  std::vector<unsigned int> storedCacheIndices;
59 
60  // Number of processes in StoredProcessBlockHelper.
61  unsigned int nStoredProcesses = storedProcessBlockHelper.processesWithProcessBlockProducts().size();
62 
64  // This is really simple if we are not keeping any ProcessBlock products
65  // from the input file. Deal with that special case first.
66  // Except for the special case of a SubProcess, nStoredProcesses will be 1.
67  assert(nAddedProcesses_ == nStoredProcesses);
68  storedCacheIndices.reserve(nStoredProcesses);
69  for (unsigned int i = 0; i < nStoredProcesses; ++i) {
70  storedCacheIndices.push_back(i);
71  }
72  storedProcessBlockHelper.setProcessBlockCacheIndices(std::move(storedCacheIndices));
73  return;
74  }
75 
76  // Cache indices of the main ProcessBlockHelper we use as input. This
77  // ProcessBlockHelper is owned by the EventProcessor.
78  std::vector<std::vector<unsigned int>> const& cacheIndices = processBlockHelper_->processBlockCacheIndices();
79 
80  // We need to convert the cache indices in the ProcessBlockHelper to have different values when
81  // put in the StoredProcessBlockHelper. The values are not the same because the ProcessBlocks are
82  // not read in the same order in this process as compared to the next process which will read
83  // the output file that is being written (the read order is the same as the order the cache
84  // objects are placed in the cache vectors). In this process, they are ordered first by input file,
85  // second by process and last by TTree entry. In the next process, this output file will be read
86  // as a single input file. The ProcessBlocks are read in process order (this will be a subset
87  // of the process list in ProcessBlockHelper, maybe smaller or maybe the same), and finally in
88  // order of entry in the TTree. This conversion is done by subtracting and adding some
89  // offsets and a lot of the following code involves calculating these offsets to do the conversion.
90 
91  // We will need the info in these to calculate the offsets
92  std::vector<unsigned int> const& cacheIndexVectorsPerFile = processBlockHelper_->cacheIndexVectorsPerFile();
93  std::vector<unsigned int> const& cacheEntriesPerFile = processBlockHelper_->cacheEntriesPerFile();
94  std::vector<std::vector<unsigned int>> const& nEntries = processBlockHelper_->nEntries();
95 
96  assert(!cacheIndices.empty());
97  // Count processes in the input file with saved ProcessBlock products in the output
98  unsigned int nInputProcesses = 0;
99  for (unsigned int iStoredProcess = 0; iStoredProcess < nStoredProcesses; ++iStoredProcess) {
100  // The existing cache indices in processBlockHelper include only indices
101  // corresponding to the processes in the input files. If there are more, then
102  // they correspond to current process (and there only will be more if some
103  // of the newly produced ProcessBlock products are going to be kept).
104  // There will be at most 1 added process except in the case of SubProcesses.
105  if (translateFromStoredIndex_[iStoredProcess] < cacheIndices[0].size()) {
106  ++nInputProcesses;
107  }
108  }
109 
110  // The following are the 4 offsets. The first two are defined relative to the
111  // cache sequence in this process. The second two offsets are defined relative
112  // to the cache sequence when the output file we are writing is read.
113 
114  // 1. Total number of cache entries in all input files prior to the current input file
115  unsigned int fileOffset = 0;
116 
117  // 2. For each process, the total number of cache entries in processes in the current
118  // input file and before the process
119  std::vector<unsigned int> processOffset(nInputProcesses, 0);
120 
121  // 3. For each input process with ProcessBlock products stored by this
122  // output module, the total number of cache entries in earlier input processes
123  // that have ProcessBlock products stored by this output module.
124  // Summed over all input files and including only processes in StoredProcessBlockHelper.
125  // Plus an extra element at the end that includes all entries in all such processes.
126  assert(!nEntries.empty());
127  std::vector<unsigned int> storedProcessOffset(nInputProcesses + 1, 0);
128 
129  // 4. For each process with ProcessBlock products stored by this output module,
130  // the total number of cache entries in that process in all input files before
131  // the current input file.
132  std::vector<unsigned int> storedFileInProcessOffset(nInputProcesses, 0);
133 
134  setStoredProcessOffset(nInputProcesses, nEntries, storedProcessOffset);
135 
136  storedCacheIndices.reserve(cacheIndices.size() * nStoredProcesses);
137 
138  unsigned int iFile = 0;
139  unsigned int innerVectorsCurrentFile = 0;
140 
141  // In ProcessBlockHelper, there is a vector which contains vectors
142  // of cache indices. Iterate over the inner vectors.
143  for (auto const& innerVectorOfCacheIndices : cacheIndices) {
144  // The inner vectors are associated with input files. Several contiguous
145  // inner vectors can be associated with the same input file. Check to
146  // see if we have crossed an input file boundary and set the file
147  // index, iFile, at the next file associated with at least
148  // one inner vector if necessary.
149  while (innerVectorsCurrentFile == cacheIndexVectorsPerFile[iFile]) {
150  // Sum cache entries for all files before the current file in
151  // ProcessBlockHelper
152  fileOffset += cacheEntriesPerFile[iFile];
153  ++iFile;
154  innerVectorsCurrentFile = 0;
155  }
156  if (innerVectorsCurrentFile == 0) {
157  // Call these when the input file changes
158  setProcessOffset(iFile, nInputProcesses, nEntries, processOffset);
159  setStoredFileInProcessOffset(iFile, nInputProcesses, nEntries, storedFileInProcessOffset);
160  }
161  ++innerVectorsCurrentFile;
162 
163  assert(nInputProcesses + nAddedProcesses_ == nStoredProcesses);
164 
165  // Really fill the cache indices that will be stored in the output file in this loop
166  for (unsigned int iStoredProcess = 0; iStoredProcess < nStoredProcesses; ++iStoredProcess) {
167  unsigned int storedCacheIndex = ProcessBlockHelperBase::invalidCacheIndex();
168  if (iStoredProcess < nInputProcesses) {
169  unsigned int cacheIndex = innerVectorOfCacheIndices[translateFromStoredIndex_[iStoredProcess]];
170  if (cacheIndex != ProcessBlockHelperBase::invalidCacheIndex()) {
171  // The offsets count in the cache sequence to the first entry in
172  // the current input file and process
173  unsigned int inputOffset = fileOffset + processOffset[iStoredProcess];
174  unsigned int storedOffset = storedProcessOffset[iStoredProcess] + storedFileInProcessOffset[iStoredProcess];
175  storedCacheIndex = cacheIndex - inputOffset + storedOffset;
176  }
177  } else {
178  // This corresponds to the current process if it has newly produced
179  // ProcessBlock products (plus possibly SubProcesses).
180  storedCacheIndex = storedProcessOffset[nInputProcesses] + iStoredProcess - nInputProcesses;
181  }
182  storedCacheIndices.push_back(storedCacheIndex);
183  }
184  }
185  storedProcessBlockHelper.setProcessBlockCacheIndices(std::move(storedCacheIndices));
186  }
187 
188  void OutputProcessBlockHelper::setStoredProcessOffset(unsigned int nInputProcesses,
189  std::vector<std::vector<unsigned int>> const& nEntries,
190  std::vector<unsigned int>& storedProcessOffset) const {
191  unsigned int iStored = 0;
192  for (auto& offset : storedProcessOffset) {
193  offset = 0;
194  // loop over earlier processes
195  for (unsigned int jStored = 0; jStored < iStored; ++jStored) {
196  unsigned int indexInEventProcessor = translateFromStoredIndex_[jStored];
197  // loop over input files
198  for (auto const& entries : nEntries) {
199  assert(indexInEventProcessor < entries.size());
200  offset += entries[indexInEventProcessor];
201  }
202  }
203  ++iStored;
204  }
205  }
206 
208  unsigned int nInputProcesses,
209  std::vector<std::vector<unsigned int>> const& nEntries,
210  std::vector<unsigned int>& processOffset) const {
211  unsigned int iStored = 0;
212  for (auto& offset : processOffset) {
213  offset = 0;
214  unsigned int iProcess = translateFromStoredIndex_[iStored];
215  for (unsigned int jProcess = 0; jProcess < iProcess; ++jProcess) {
216  offset += nEntries[iFile][jProcess];
217  }
218  ++iStored;
219  }
220  }
221 
223  unsigned int iFile,
224  unsigned int nInputProcesses,
225  std::vector<std::vector<unsigned int>> const& nEntries,
226  std::vector<unsigned int>& storedFileInProcessOffset) const {
227  unsigned int iStored = 0;
228  for (auto& offset : storedFileInProcessOffset) {
229  offset = 0;
230  unsigned int indexInEventProcessor = translateFromStoredIndex_[iStored];
231  // loop over input files before current input file
232  for (unsigned int jFile = 0; jFile < iFile; ++jFile) {
233  assert(indexInEventProcessor < nEntries[jFile].size());
234  offset += nEntries[jFile][indexInEventProcessor];
235  }
236  ++iStored;
237  }
238  }
239 
240 } // namespace edm
virtual unsigned int nProcessesInFirstFile() const =0
std::vector< std::string > const & processesWithProcessBlockProducts() const
static constexpr unsigned int invalidCacheIndex()
ProcessBlockHelperBase const * processBlockHelper_
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
virtual std::vector< std::string > const & topProcessesWithProcessBlockProducts() const =0
std::vector< std::string > const & addedProcesses() const
void updateAfterProductSelection(std::set< std::string > const &processesWithKeptProcessBlockProducts, ProcessBlockHelperBase const &)
virtual std::vector< unsigned int > const & cacheIndexVectorsPerFile() const =0
std::vector< std::string > const & processesWithProcessBlockProducts() const
void setStoredFileInProcessOffset(unsigned int iFile, unsigned int nInputProcesses, std::vector< std::vector< unsigned int >> const &nEntries, std::vector< unsigned int > &storedFileInProcessOffset) const
void fillCacheIndices(StoredProcessBlockHelper &) const
std::vector< std::string > processesWithProcessBlockProducts_
std::vector< unsigned int > translateFromStoredIndex_
void setProcessOffset(unsigned int iFile, unsigned int nInputProcesses, std::vector< std::vector< unsigned int >> const &nEntries, std::vector< unsigned int > &processOffset) const
virtual std::vector< unsigned int > const & cacheEntriesPerFile() const =0
virtual std::vector< std::vector< unsigned int > > const & nEntries() const =0
void setProcessBlockCacheIndices(std::vector< unsigned int > val)
HLT enums.
void setStoredProcessOffset(unsigned int nInputProcesses, std::vector< std::vector< unsigned int >> const &nEntries, std::vector< unsigned int > &storedProcessOffset) const
virtual std::vector< std::vector< unsigned int > > const & processBlockCacheIndices() const =0
def move(src, dest)
Definition: eostools.py:511
ProcessBlockHelperBase const * processBlockHelper() const