CMS 3D CMS Logo

MergeableRunProductMetadata.cc
Go to the documentation of this file.
1 
3 
8 
9 #include <algorithm>
10 #include <memory>
11 
12 namespace edm {
13 
15  MergeableRunProductMetadata(MergeableRunProductProcesses const& mergeableRunProductProcesses) :
16  mergeableRunProductProcesses_(&mergeableRunProductProcesses),
17  metadataForProcesses_(mergeableRunProductProcesses.size()) {
18  }
19 
21  }
22 
25  }
26 
28  long long inputRunEntry,
29  StoredMergeableRunProductMetadata const& inputStoredMergeableRunProductMetadata,
30  IndexIntoFileItrHolder const& inputIndexIntoFileItr) {
31 
32  unsigned int processIndex {0};
33  for (auto & metadataForProcess : metadataForProcesses_) {
34 
35  bool valid = true;
36  std::vector<LuminosityBlockNumber_t>::const_iterator lumisInRunBeingReadBegin;
37  std::vector<LuminosityBlockNumber_t>::const_iterator lumisInRunBeingReadEnd;
38 
40 
41  if (inputStoredMergeableRunProductMetadata.getLumiContent(inputRunEntry,
42  processName,
43  valid,
44  lumisInRunBeingReadBegin,
45  lumisInRunBeingReadEnd)) {
46 
47  // This is a reference to the container accumulating the luminosity
48  // block numbers for the run entries read associated with the current
49  // run being processed that correspond to the luminosity block content
50  // for the mergeable run products created in the process.
51  std::vector<LuminosityBlockNumber_t>& lumis = metadataForProcess.lumis();
52 
53  // In the following, iter1 refers to the lumis associated with run entries already read
54  // and iter2 refers to the lumis associated with the current run entry being read.
55 
56  bool elementsIn2NotIn1 = false;
57  bool elementsIn1NotIn2 = false;
58  bool sharedElements = false;
59 
60  std::vector<LuminosityBlockNumber_t> temp;
61  temp.reserve(lumis.size() + (lumisInRunBeingReadEnd - lumisInRunBeingReadBegin));
62  std::vector<LuminosityBlockNumber_t>::const_iterator end1 = lumis.end();
63  std::vector<LuminosityBlockNumber_t>::const_iterator end2 = lumisInRunBeingReadEnd;
64  for (std::vector<LuminosityBlockNumber_t>::const_iterator iter1 = lumis.begin(),
65  iter2 = lumisInRunBeingReadBegin;
66  iter1 != end1 || iter2 != end2;) {
67  if (iter1 == end1) {
68  temp.push_back(*iter2);
69  ++iter2;
70  elementsIn2NotIn1 = true;
71  continue;
72  } else if (iter2 == end2) {
73  temp.push_back(*iter1);
74  ++iter1;
75  elementsIn1NotIn2 = true;
76  continue;
77  } else if (*iter1 < *iter2) {
78  temp.push_back(*iter1);
79  ++iter1;
80  elementsIn1NotIn2 = true;
81  } else if (*iter1 > *iter2) {
82  temp.push_back(*iter2);
83  ++iter2;
84  elementsIn2NotIn1 = true;
85  } else {
86  // they must be equal
87  sharedElements = true;
88  temp.push_back(*iter1);
89  ++iter1;
90  ++iter2;
91  }
92  }
93  lumis.swap(temp);
94  if (!sharedElements && elementsIn2NotIn1 && elementsIn1NotIn2) {
95  metadataForProcess.setMergeDecision(MERGE);
96  if (!valid) {
97  metadataForProcess.setValid(false);
98  }
99  } else if (!elementsIn2NotIn1) {
100  metadataForProcess.setMergeDecision(IGNORE);
101  } else if (!elementsIn1NotIn2) {
102  metadataForProcess.setMergeDecision(REPLACE);
103  if (!valid) {
104  metadataForProcess.setValid(false);
105  }
106  } else {
107  // In this case there is no way to get the correct answer.
108  // The result will always be invalid.
109  metadataForProcess.setMergeDecision(MERGE);
110  metadataForProcess.setValid(false);
111  }
112 
113  } else {
114 
115  metadataForProcess.setMergeDecision(MERGE);
116  if (!valid) {
117  metadataForProcess.setValid(false);
118  }
119  metadataForProcess.setUseIndexIntoFile(true);
121  inputIndexIntoFileItr.getLumisInRun(lumisFromIndexIntoFile_);
123  }
124  }
125  ++processIndex;
126  } // end of loop over processes
127  } // end of readRun function
128 
130 
131  if (metadataForProcesses_.empty()) {
132  return;
133  }
134  lumisProcessed_.push_back(lumi);
135  }
136 
138 
139  if (metadataForProcesses_.empty()) {
140  return;
141  }
142 
144 
145  // Sort the lumiProcessed vector and ignore the duplicate
146  // entries
147 
148  // Not sure if this copy is necessary. I'm copying because
149  // I am not sure the standard algorithms work on TBB containers.
150  // I couldn't find anything saying they did when I searched ...
151  std::vector<LuminosityBlockNumber_t> lumisProcessed;
152  lumisProcessed.reserve(lumisProcessed_.size());
153  for (auto const& lumi : lumisProcessed_) {
154  lumisProcessed.push_back(lumi);
155  }
156 
157  std::sort(lumisProcessed.begin(), lumisProcessed.end());
158  auto uniqueEnd = std::unique(lumisProcessed.begin(), lumisProcessed.end());
159 
160  for (auto & metadataForProcess : metadataForProcesses_) {
161 
162  // Did we process all the lumis in this process that were processed
163  // in the process that created the mergeable run products.
164  metadataForProcess.setAllLumisProcessed(
165  std::includes(lumisProcessed.begin(), uniqueEnd,
166  metadataForProcess.lumis().begin(),
167  metadataForProcess.lumis().end()));
168  }
169  }
170 
172 
173  lumisProcessed_.clear();
174  for (auto & metadataForProcess : metadataForProcesses_) {
175  metadataForProcess.reset();
176  }
177  }
178 
181 
182  if (metadataForProcesses_.empty()) {
183  return;
184  }
185 
186  std::vector<std::string> const& storedProcesses = storedMetadata.processesWithMergeableRunProducts();
187  if (storedProcesses.empty()) {
188  return;
189  }
190 
191  unsigned long long beginProcess = storedMetadata.singleRunEntryAndProcesses().size();
192  unsigned long long endProcess = beginProcess;
193 
194 
195  std::vector<std::string> const& processesWithMergeableRunProducts =
197 
198  for (unsigned int storedProcessIndex = 0;
199  storedProcessIndex < storedProcesses.size();
200  ++storedProcessIndex) {
201 
202  // Look for a matching process. It is intentional that no process
203  // is added when there is no match. storedProcesses only contains
204  // processes which created mergeable run products selected by the
205  // output module to be written out. processesWithMergeableRunProducts_
206  // only has processes which created mergeable run products that were
207  // read from the input data files. Note storedProcesses may be
208  // missing processes because the output module dropped products.
209  // The other vector may be missing processes because of SubProcesses.
210  for (unsigned int transientProcessIndex = 0;
211  transientProcessIndex < processesWithMergeableRunProducts.size();
212  ++transientProcessIndex) {
213 
214  // This string comparison could be optimized away by storing an index mapping in
215  // OutputModuleBase calculated once early in a job. (? Given how rare
216  // mergeable run products are this optimization may not be worth doing)
217  if (processesWithMergeableRunProducts[transientProcessIndex] ==
218  storedProcesses[storedProcessIndex]) {
219 
220  if (addProcess(storedMetadata,
221  metadataForProcesses_.at(transientProcessIndex),
222  storedProcessIndex,
223  beginProcess,
224  endProcess)) {
225  ++endProcess;
226  }
227  break;
228  }
229  }
230  }
231  storedMetadata.singleRunEntries().emplace_back(beginProcess, endProcess);
232  }
233 
236  MetadataForProcess const& metadataForProcess,
237  unsigned int storedProcessIndex,
238  unsigned long long beginProcess,
239  unsigned long long endProcess) const {
240 
241  if (metadataForProcess.valid() &&
242  metadataForProcess.allLumisProcessed()) {
243  return false;
244  }
245 
246  storedMetadata.allValidAndUseIndexIntoFile() = false;
247 
248  unsigned long long iBeginLumi = 0;
249  unsigned long long iEndLumi = 0;
250 
251  // See if we need to store the set of lumi numbers corresponding
252  // to this process and run entry. If they were all processed then
253  // we can just get the lumi numbers out of IndexIntoFile and do
254  // not need to store them here
255  if (!metadataForProcess.allLumisProcessed()) {
256  // If we need to store the numbers, then we can check to
257  // make sure this does not duplicate the lumi numbers we
258  // stored for another process. If we did then we can just
259  // just reference same indices and avoid copying a duplicate
260  // sequence of lumi numbers. It is sufficient to check the
261  // size only. As you go back in the time sequence of processes
262  // the only thing that can happen is more lumi numbers appear
263  // at steps where a run was only partially processed.
264  bool found = false;
265  for (unsigned long long kProcess = beginProcess;
266  kProcess < endProcess;
267  ++kProcess) {
268  StoredMergeableRunProductMetadata::SingleRunEntryAndProcess const& storedSingleRunEntryAndProcess =
269  storedMetadata.singleRunEntryAndProcesses().at(kProcess);
270 
271  if (metadataForProcess.lumis().size() ==
272  (storedSingleRunEntryAndProcess.endLumi() - storedSingleRunEntryAndProcess.beginLumi())) {
273 
274  iBeginLumi = storedSingleRunEntryAndProcess.beginLumi();
275  iEndLumi = storedSingleRunEntryAndProcess.endLumi();
276  found = true;
277  break;
278  }
279  }
280  if (!found) {
281  std::vector<LuminosityBlockNumber_t>& storedLumis = storedMetadata.lumis();
282  std::vector<LuminosityBlockNumber_t> const& metdataLumis = metadataForProcess.lumis();
283  iBeginLumi = storedLumis.size();
284  storedLumis.insert( storedLumis.end(), metdataLumis.begin(), metdataLumis.end() );
285  iEndLumi = storedLumis.size();
286  }
287  }
288  storedMetadata.singleRunEntryAndProcesses().emplace_back(iBeginLumi,
289  iEndLumi,
290  storedProcessIndex,
291  metadataForProcess.valid(),
292  metadataForProcess.allLumisProcessed());
293  return true;
294  }
295 
297  MergeableRunProductMetadata::getMergeDecision(std::string const& processThatCreatedProduct) const {
298 
299  MetadataForProcess const* metadataForProcess = metadataForOneProcess(processThatCreatedProduct);
300  if (metadataForProcess) {
301  return metadataForProcess->mergeDecision();
302  }
304  << "MergeableRunProductMetadata::getMergeDecision could not find process.\n"
305  << "It should not be possible for this error to occur.\n"
306  << "Contact a Framework developer\n";
307  return MERGE;
308  }
309 
310  bool
311  MergeableRunProductMetadata::knownImproperlyMerged(std::string const& processThatCreatedProduct) const {
312 
313  MetadataForProcess const* metadataForProcess = metadataForOneProcess(processThatCreatedProduct);
314  if (metadataForProcess) {
315  return !metadataForProcess->valid();
316  }
317  return false;
318  }
319 
321  lumis_.clear();
322  mergeDecision_ = MERGE;
323  useIndexIntoFile_ = false;
324  valid_ = true;
325  allLumisProcessed_ = false;
326  }
327 
330  unsigned int processIndex = 0;
331  for (auto const& metadataForProcess : metadataForProcesses_) {
332  // This string comparison could be optimized away by storing an index in
333  // BranchDescription as a transient calculated once early in a job.
334  if (getProcessName(processIndex) == processName) {
335  return &metadataForProcess;
336  }
337  ++processIndex;
338  }
339  return nullptr;
340  }
341 
343 
344  for (auto & metadataForProcess : metadataForProcesses_) {
345  if (metadataForProcess.useIndexIntoFile()) {
346  metadataForProcess.setUseIndexIntoFile(false);
347 
348  std::vector<LuminosityBlockNumber_t> temp;
349  temp.reserve(metadataForProcess.lumis().size() + lumisFromIndexIntoFile_.size());
350  std::vector<LuminosityBlockNumber_t>::const_iterator end1 = metadataForProcess.lumis().end();
351  std::vector<LuminosityBlockNumber_t>::const_iterator end2 = lumisFromIndexIntoFile_.end();
352  for (std::vector<LuminosityBlockNumber_t>::const_iterator iter1 = metadataForProcess.lumis().begin(),
353  iter2 = lumisFromIndexIntoFile_.begin();
354  iter1 != end1 || iter2 != end2;) {
355  if (iter1 == end1) {
356  temp.push_back(*iter2);
357  ++iter2;
358  continue;
359  } else if (iter2 == end2) {
360  temp.push_back(*iter1);
361  ++iter1;
362  continue;
363  } else if (*iter1 < *iter2) {
364  temp.push_back(*iter1);
365  ++iter1;
366  } else if (*iter1 > *iter2) {
367  temp.push_back(*iter2);
368  ++iter2;
369  } else {
370  // they must be equal
371  temp.push_back(*iter1);
372  ++iter1;
373  ++iter2;
374  }
375  }
376  metadataForProcess.lumis().swap(temp);
377  }
378  }
379  lumisFromIndexIntoFile_.clear();
381  }
382 }
size
Write out results.
bool knownImproperlyMerged(std::string const &processThatCreatedProduct) const override
std::vector< std::string > const & processesWithMergeableRunProducts() const
void addEntryToStoredMetadata(StoredMergeableRunProductMetadata &) const
bool addProcess(StoredMergeableRunProductMetadata &storedMetadata, MetadataForProcess const &metadataForProcess, unsigned int storedProcessIndex, unsigned long long beginProcess, unsigned long long endProcess) const
static const uint16_t valid_
Definition: Constants.h:17
unsigned int LuminosityBlockNumber_t
std::vector< SingleRunEntryAndProcess > & singleRunEntryAndProcesses()
MergeableRunProductProcesses const * mergeableRunProductProcesses_
tbb::concurrent_vector< LuminosityBlockNumber_t > const & lumisProcessed() const
std::vector< MetadataForProcess > metadataForProcesses_
def unique(seq, keepstr=True)
Definition: tier0.py:25
MergeableRunProductMetadata(MergeableRunProductProcesses const &)
MergeDecision getMergeDecision(std::string const &processThatCreatedProduct) const
std::vector< LuminosityBlockNumber_t > & lumis()
bool getLumiContent(unsigned long long runEntry, std::string const &process, bool &valid, std::vector< LuminosityBlockNumber_t >::const_iterator &lumisBegin, std::vector< LuminosityBlockNumber_t >::const_iterator &lumisEnd) const
void writeLumi(LuminosityBlockNumber_t lumi)
std::vector< LuminosityBlockNumber_t > lumisFromIndexIntoFile_
std::string const & getProcessName(unsigned int index) const
tbb::concurrent_vector< LuminosityBlockNumber_t > lumisProcessed_
std::vector< std::string > const & processesWithMergeableRunProducts() const
MetadataForProcess const * metadataForOneProcess(std::string const &processName) const
HLT enums.
void getLumisInRun(std::vector< LuminosityBlockNumber_t > &lumis) const
std::vector< xml::Document > includes
void readRun(long long inputRunEntry, StoredMergeableRunProductMetadata const &inputStoredMergeableRunProductMetadata, IndexIntoFileItrHolder const &inputIndexIntoFileItr)