CMS 3D CMS Logo

MergeableRunProductMetadata.cc
Go to the documentation of this file.
1 
3 
8 
9 #include <algorithm>
10 #include <memory>
11 
12 namespace edm {
13 
// Constructor parameter list and initializers.
// NOTE(review): the opening line of the signature is not present in this listing.
// Stores the address of the supplied MergeableRunProductProcesses and creates
// metadataForProcesses_ with mergeableRunProductProcesses.size() elements.
15  MergeableRunProductProcesses const& mergeableRunProductProcesses)
16  : mergeableRunProductProcesses_(&mergeableRunProductProcesses),
17  metadataForProcesses_(mergeableRunProductProcesses.size()) {}
18 
20 
22 
// readRun: for every entry in metadataForProcesses_, combines the luminosity
// block numbers of the run entry being read with the ones accumulated so far
// and records how the product from this entry is to be combined (MERGE, IGNORE
// or REPLACE) and whether the result is still flagged valid.
// NOTE(review): the opening line of the signature is not present in this listing.
24  long long inputRunEntry,
25  StoredMergeableRunProductMetadata const& inputStoredMergeableRunProductMetadata,
26  IndexIntoFileItrHolder const& inputIndexIntoFileItr) {
27  unsigned int processIndex{0};
28  for (auto& metadataForProcess : metadataForProcesses_) {
29  bool valid = true;
30  std::vector<LuminosityBlockNumber_t>::const_iterator lumisInRunBeingReadBegin;
31  std::vector<LuminosityBlockNumber_t>::const_iterator lumisInRunBeingReadEnd;
32 
34 
    // NOTE(review): processName is declared on a line missing from this listing;
    // presumably it is the name of the process at processIndex - confirm.
    // valid and the two iterators are handed to getLumiContent, which returns a
    // bool selecting between the explicit merge below and the else branch.
35  if (inputStoredMergeableRunProductMetadata.getLumiContent(
36  inputRunEntry, processName, valid, lumisInRunBeingReadBegin, lumisInRunBeingReadEnd)) {
37  // This is a reference to the container accumulating the luminosity
38  // block numbers for the run entries read associated with the current
39  // run being processed that correspond to the luminosity block content
40  // for the mergeable run products created in the process.
41  std::vector<LuminosityBlockNumber_t>& lumis = metadataForProcess.lumis();
42 
43  // In the following, iter1 refers to the lumis associated with run entries already read
44  // and iter2 refers to the lumis associated with the current run entry being read.
45 
46  bool elementsIn2NotIn1 = false;
47  bool elementsIn1NotIn2 = false;
48  bool sharedElements = false;
49 
    // Build the union of both ranges in temp while recording which side
    // contributed elements. The comparisons below only give a duplicate-free
    // union if both ranges are sorted in ascending order - TODO confirm that
    // both inputs are guaranteed to be sorted.
50  std::vector<LuminosityBlockNumber_t> temp;
51  temp.reserve(lumis.size() + (lumisInRunBeingReadEnd - lumisInRunBeingReadBegin));
52  std::vector<LuminosityBlockNumber_t>::const_iterator end1 = lumis.end();
53  std::vector<LuminosityBlockNumber_t>::const_iterator end2 = lumisInRunBeingReadEnd;
54  for (std::vector<LuminosityBlockNumber_t>::const_iterator iter1 = lumis.begin(),
55  iter2 = lumisInRunBeingReadBegin;
56  iter1 != end1 || iter2 != end2;) {
57  if (iter1 == end1) {
58  temp.push_back(*iter2);
59  ++iter2;
60  elementsIn2NotIn1 = true;
61  continue;
62  } else if (iter2 == end2) {
63  temp.push_back(*iter1);
64  ++iter1;
65  elementsIn1NotIn2 = true;
66  continue;
67  } else if (*iter1 < *iter2) {
68  temp.push_back(*iter1);
69  ++iter1;
70  elementsIn1NotIn2 = true;
71  } else if (*iter1 > *iter2) {
72  temp.push_back(*iter2);
73  ++iter2;
74  elementsIn2NotIn1 = true;
75  } else {
76  // they must be equal
77  sharedElements = true;
78  temp.push_back(*iter1);
79  ++iter1;
80  ++iter2;
81  }
82  }
83  lumis.swap(temp);
    // Decision table, evaluated in this order:
    //  - nothing shared and both sides contributed lumis: MERGE
    //  - the entry being read added no new lumis: IGNORE (valid is not consulted)
    //  - the entry added lumis and nothing already read is missing from it: REPLACE
    //  - otherwise (partial overlap): MERGE, always flagged invalid
    // For MERGE and REPLACE an input with valid == false also clears the valid flag.
84  if (!sharedElements && elementsIn2NotIn1 && elementsIn1NotIn2) {
85  metadataForProcess.setMergeDecision(MERGE);
86  if (!valid) {
87  metadataForProcess.setValid(false);
88  }
89  } else if (!elementsIn2NotIn1) {
90  metadataForProcess.setMergeDecision(IGNORE);
91  } else if (!elementsIn1NotIn2) {
92  metadataForProcess.setMergeDecision(REPLACE);
93  if (!valid) {
94  metadataForProcess.setValid(false);
95  }
96  } else {
97  // In this case there is no way to get the correct answer.
98  // The result will always be invalid.
99  metadataForProcess.setMergeDecision(MERGE);
100  metadataForProcess.setValid(false);
101  }
102 
103  } else {
    // getLumiContent returned false: always MERGE, and flag this process so
    // that its lumis are taken from lumisFromIndexIntoFile_ later.
    // NOTE(review): listing lines 109 and 111 are missing here; the closing
    // braces below suggest the getLumisInRun call is wrapped in a condition
    // that is not visible - confirm against the real source.
104  metadataForProcess.setMergeDecision(MERGE);
105  if (!valid) {
106  metadataForProcess.setValid(false);
107  }
108  metadataForProcess.setUseIndexIntoFile(true);
110  inputIndexIntoFileItr.getLumisInRun(lumisFromIndexIntoFile_);
112  }
113  }
114  ++processIndex;
115  } // end of loop over processes
116  } // end of readRun function
117 
// Appends lumi to lumisProcessed_, unless metadataForProcesses_ is empty, in
// which case nothing is recorded.
// NOTE(review): the function header is not present in this listing; presumably
// this is writeLumi(LuminosityBlockNumber_t lumi) - confirm.
119  if (metadataForProcesses_.empty()) {
120  return;
121  }
122  lumisProcessed_.push_back(lumi);
123  }
124 
// For every entry in metadataForProcesses_, sets the allLumisProcessed flag to
// whether each of its lumis() also appears in lumisProcessed_. Does nothing
// when metadataForProcesses_ is empty.
// NOTE(review): the function header and listing line 130 are not present in
// this listing.
126  if (metadataForProcesses_.empty()) {
127  return;
128  }
129 
131 
132  // Sort the lumiProcessed vector and ignore the duplicate
133  // entries
134 
135  // Not sure if this copy is necessary. I'm copying because
136  // I am not sure the standard algorithms work on TBB containers.
137  // I couldn't find anything saying they did when I searched ...
138  std::vector<LuminosityBlockNumber_t> lumisProcessed;
139  lumisProcessed.reserve(lumisProcessed_.size());
140  for (auto const& lumi : lumisProcessed_) {
141  lumisProcessed.push_back(lumi);
142  }
143 
    // After std::unique only [begin, uniqueEnd) holds the de-duplicated values;
    // the vector is not shrunk, so uniqueEnd is used as the end of the range.
144  std::sort(lumisProcessed.begin(), lumisProcessed.end());
145  auto uniqueEnd = std::unique(lumisProcessed.begin(), lumisProcessed.end());
146 
147  for (auto& metadataForProcess : metadataForProcesses_) {
148  // Did we process all the lumis in this process that were processed
149  // in the process that created the mergeable run products.
    // std::includes needs both ranges sorted; this assumes
    // metadataForProcess.lumis() is already sorted - TODO confirm.
150  metadataForProcess.setAllLumisProcessed(std::includes(
151  lumisProcessed.begin(), uniqueEnd, metadataForProcess.lumis().begin(), metadataForProcess.lumis().end()));
152  }
153  }
154 
// Clears lumisProcessed_ and calls reset() on every entry of
// metadataForProcesses_.
// NOTE(review): the function header is not present in this listing.
156  lumisProcessed_.clear();
157  for (auto& metadataForProcess : metadataForProcesses_) {
158  metadataForProcess.reset();
159  }
160  }
161 
// Appends one run entry to storedMetadata. The entry is the index range
// [beginProcess, endProcess) into storedMetadata.singleRunEntryAndProcesses();
// endProcess grows by one each time addProcess reports that it stored an element.
// Returns without adding anything when metadataForProcesses_ is empty or when
// storedMetadata lists no processes with mergeable run products.
// NOTE(review): the function header is not present in this listing; presumably
// this is addEntryToStoredMetadata(StoredMergeableRunProductMetadata&) - confirm.
163  if (metadataForProcesses_.empty()) {
164  return;
165  }
166 
167  std::vector<std::string> const& storedProcesses = storedMetadata.processesWithMergeableRunProducts();
168  if (storedProcesses.empty()) {
169  return;
170  }
171 
172  unsigned long long beginProcess = storedMetadata.singleRunEntryAndProcesses().size();
173  unsigned long long endProcess = beginProcess;
174 
    // NOTE(review): the initializer of this reference (listing line 176) is
    // missing from this listing.
175  std::vector<std::string> const& processesWithMergeableRunProducts =
177 
178  for (unsigned int storedProcessIndex = 0; storedProcessIndex < storedProcesses.size(); ++storedProcessIndex) {
179  // Look for a matching process. It is intentional that no process
180  // is added when there is no match. storedProcesses only contains
181  // processes which created mergeable run products selected by the
182  // output module to be written out. processesWithMergeableRunProducts_
183  // only has processes which created mergeable run products that were
184  // read from the input data files. Note storedProcesses may be
185  // missing processes because the output module dropped products.
186  // The other vector may be missing processes because of SubProcesses.
187  for (unsigned int transientProcessIndex = 0; transientProcessIndex < processesWithMergeableRunProducts.size();
188  ++transientProcessIndex) {
189  // This string comparison could be optimized away by storing an index mapping in
190  // OutputModuleBase calculated once early in a job. (? Given how rare
191  // mergeable run products are this optimization may not be worth doing)
192  if (processesWithMergeableRunProducts[transientProcessIndex] == storedProcesses[storedProcessIndex]) {
    // transientProcessIndex is also used to index metadataForProcesses_,
    // so the two containers are expected to be parallel.
193  if (addProcess(storedMetadata,
194  metadataForProcesses_.at(transientProcessIndex),
195  storedProcessIndex,
196  beginProcess,
197  endProcess)) {
198  ++endProcess;
199  }
    // Only the first matching name is used for each stored process.
200  break;
201  }
202  }
203  }
204  storedMetadata.singleRunEntries().emplace_back(beginProcess, endProcess);
205  }
206 
// addProcess: stores the metadata of one process for the current run entry in
// storedMetadata, if there is anything that needs storing.
// NOTE(review): the opening line of the signature is not present in this listing.
//
// metadataForProcess  metadata accumulated for the process
// storedProcessIndex  index of the process in the stored process list
// beginProcess, endProcess  range of elements of singleRunEntryAndProcesses()
//                     already stored for this run entry
// Returns false, storing nothing, when the metadata is valid and all lumis were
// processed. Otherwise appends one element to
// storedMetadata.singleRunEntryAndProcesses() and returns true.
208  MetadataForProcess const& metadataForProcess,
209  unsigned int storedProcessIndex,
210  unsigned long long beginProcess,
211  unsigned long long endProcess) const {
212  if (metadataForProcess.valid() && metadataForProcess.allLumisProcessed()) {
213  return false;
214  }
215 
    // Something has to be stored explicitly, so this flag is cleared.
216  storedMetadata.allValidAndUseIndexIntoFile() = false;
217 
    // These stay 0/0 when all lumis were processed and only the valid flag
    // needs to be recorded.
218  unsigned long long iBeginLumi = 0;
219  unsigned long long iEndLumi = 0;
220 
221  // See if we need to store the set of lumi numbers corresponding
222  // to this process and run entry. If they were all processed then
223  // we can just get the lumi numbers out of IndexIntoFile and do
224  // not need to store them here
225  if (!metadataForProcess.allLumisProcessed()) {
226  // If we need to store the numbers, then we can check to
227  // make sure this does not duplicate the lumi numbers we
228  // stored for another process. If we did then we can just
229  // reference the same indices and avoid copying a duplicate
230  // sequence of lumi numbers. It is sufficient to check the
231  // size only. As you go back in the time sequence of processes
232  // the only thing that can happen is more lumi numbers appear
233  // at steps where a run was only partially processed.
234  bool found = false;
235  for (unsigned long long kProcess = beginProcess; kProcess < endProcess; ++kProcess) {
236  StoredMergeableRunProductMetadata::SingleRunEntryAndProcess const& storedSingleRunEntryAndProcess =
237  storedMetadata.singleRunEntryAndProcesses().at(kProcess);
238 
239  if (metadataForProcess.lumis().size() ==
240  (storedSingleRunEntryAndProcess.endLumi() - storedSingleRunEntryAndProcess.beginLumi())) {
241  iBeginLumi = storedSingleRunEntryAndProcess.beginLumi();
242  iEndLumi = storedSingleRunEntryAndProcess.endLumi();
243  found = true;
244  break;
245  }
246  }
247  if (!found) {
    // No reusable range: append this process's lumis to the stored lumi
    // vector and remember where they start and end.
248  std::vector<LuminosityBlockNumber_t>& storedLumis = storedMetadata.lumis();
249  std::vector<LuminosityBlockNumber_t> const& metdataLumis = metadataForProcess.lumis();
250  iBeginLumi = storedLumis.size();
251  storedLumis.insert(storedLumis.end(), metdataLumis.begin(), metdataLumis.end());
252  iEndLumi = storedLumis.size();
253  }
254  }
255  storedMetadata.singleRunEntryAndProcesses().emplace_back(
256  iBeginLumi, iEndLumi, storedProcessIndex, metadataForProcess.valid(), metadataForProcess.allLumisProcessed());
257  return true;
258  }
259 
// getMergeDecision: returns the merge decision recorded for the process named
// processThatCreatedProduct.
// NOTE(review): the opening line of the signature is not present in this listing.
// Throws an Exception with errors::LogicError when metadataForOneProcess finds
// no entry for that process name.
261  std::string const& processThatCreatedProduct) const {
262  MetadataForProcess const* metadataForProcess = metadataForOneProcess(processThatCreatedProduct);
263  if (metadataForProcess) {
264  return metadataForProcess->mergeDecision();
265  }
266  throw Exception(errors::LogicError) << "MergeableRunProductMetadata::getMergeDecision could not find process.\n"
267  << "It should not be possible for this error to occur.\n"
268  << "Contact a Framework developer\n";
    // Never reached: the statement above always throws.
269  return MERGE;
270  }
271 
// Returns true when metadata exists for the process named
// processThatCreatedProduct and that metadata is flagged as not valid.
// Returns false when it is valid, and also when no metadata is found for the
// process (unlike getMergeDecision's error message, this case is not an error here).
272  bool MergeableRunProductMetadata::knownImproperlyMerged(std::string const& processThatCreatedProduct) const {
273  MetadataForProcess const* metadataForProcess = metadataForOneProcess(processThatCreatedProduct);
274  if (metadataForProcess) {
275  return !metadataForProcess->valid();
276  }
277  return false;
278  }
279 
// Returns the members to their initial state: no lumis, not using
// IndexIntoFile, valid, and not all lumis processed.
// NOTE(review): the function header and listing line 282 are not present in
// this listing, so another member may also be reset here - confirm.
281  lumis_.clear();
283  useIndexIntoFile_ = false;
284  valid_ = true;
285  allLumisProcessed_ = false;
286  }
287 
// metadataForOneProcess: linear search for the metadata of the process named
// processName.
// NOTE(review): the opening line of the signature is not present in this listing.
// Returns a pointer to the matching element of metadataForProcesses_, or
// nullptr when no process name matches.
289  std::string const& processName) const {
    // processIndex tracks the position of metadataForProcess so that the
    // process name at the same index can be looked up with getProcessName.
290  unsigned int processIndex = 0;
291  for (auto const& metadataForProcess : metadataForProcesses_) {
292  // This string comparison could be optimized away by storing an index in
293  // BranchDescription as a transient calculated once early in a job.
294  if (getProcessName(processIndex) == processName) {
295  return &metadataForProcess;
296  }
297  ++processIndex;
298  }
299  return nullptr;
300  }
301 
// For every process flagged with useIndexIntoFile(), clears that flag and
// replaces the process's lumis with the union of its current lumis and
// lumisFromIndexIntoFile_. lumisFromIndexIntoFile_ is cleared afterwards.
// NOTE(review): the function header and listing line 339 are not present in
// this listing.
303  for (auto& metadataForProcess : metadataForProcesses_) {
304  if (metadataForProcess.useIndexIntoFile()) {
305  metadataForProcess.setUseIndexIntoFile(false);
306 
    // Same two-range union as in readRun, without tracking which side
    // contributed. It only yields a duplicate-free union if both ranges
    // are sorted in ascending order - TODO confirm.
307  std::vector<LuminosityBlockNumber_t> temp;
308  temp.reserve(metadataForProcess.lumis().size() + lumisFromIndexIntoFile_.size());
309  std::vector<LuminosityBlockNumber_t>::const_iterator end1 = metadataForProcess.lumis().end();
310  std::vector<LuminosityBlockNumber_t>::const_iterator end2 = lumisFromIndexIntoFile_.end();
311  for (std::vector<LuminosityBlockNumber_t>::const_iterator iter1 = metadataForProcess.lumis().begin(),
312  iter2 = lumisFromIndexIntoFile_.begin();
313  iter1 != end1 || iter2 != end2;) {
314  if (iter1 == end1) {
315  temp.push_back(*iter2);
316  ++iter2;
317  continue;
318  } else if (iter2 == end2) {
319  temp.push_back(*iter1);
320  ++iter1;
321  continue;
322  } else if (*iter1 < *iter2) {
323  temp.push_back(*iter1);
324  ++iter1;
325  } else if (*iter1 > *iter2) {
326  temp.push_back(*iter2);
327  ++iter2;
328  } else {
329  // they must be equal
330  temp.push_back(*iter1);
331  ++iter1;
332  ++iter2;
333  }
334  }
335  metadataForProcess.lumis().swap(temp);
336  }
337  }
338  lumisFromIndexIntoFile_.clear();
340  }
341 } // namespace edm
size
Write out results.
bool addProcess(StoredMergeableRunProductMetadata &storedMetadata, MetadataForProcess const &metadataForProcess, unsigned int storedProcessIndex, unsigned long long beginProcess, unsigned long long endProcess) const
unsigned int LuminosityBlockNumber_t
std::vector< std::string > const & processesWithMergeableRunProducts() const
std::vector< SingleRunEntryAndProcess > & singleRunEntryAndProcesses()
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
MergeableRunProductProcesses const * mergeableRunProductProcesses_
std::vector< MetadataForProcess > metadataForProcesses_
def unique(seq, keepstr=True)
Definition: tier0.py:24
MergeableRunProductMetadata(MergeableRunProductProcesses const &)
bool knownImproperlyMerged(std::string const &processThatCreatedProduct) const override
std::vector< LuminosityBlockNumber_t > & lumis()
void writeLumi(LuminosityBlockNumber_t lumi)
std::vector< LuminosityBlockNumber_t > lumisFromIndexIntoFile_
oneapi::tbb::concurrent_vector< LuminosityBlockNumber_t > const & lumisProcessed() const
void getLumisInRun(std::vector< LuminosityBlockNumber_t > &lumis) const
bool getLumiContent(unsigned long long runEntry, std::string const &process, bool &valid, std::vector< LuminosityBlockNumber_t >::const_iterator &lumisBegin, std::vector< LuminosityBlockNumber_t >::const_iterator &lumisEnd) const
void addEntryToStoredMetadata(StoredMergeableRunProductMetadata &) const
HLT enums.
std::vector< xml::Document > includes
std::string const & getProcessName(unsigned int index) const
oneapi::tbb::concurrent_vector< LuminosityBlockNumber_t > lumisProcessed_
void readRun(long long inputRunEntry, StoredMergeableRunProductMetadata const &inputStoredMergeableRunProductMetadata, IndexIntoFileItrHolder const &inputIndexIntoFileItr)
MetadataForProcess const * metadataForOneProcess(std::string const &processName) const
std::vector< std::string > const & processesWithMergeableRunProducts() const