
dataset.py
from __future__ import print_function
from __future__ import absolute_import

import abc
import csv
import json
import os
import re

import Utilities.General.cmssw_das_client as das_client

from .utilities import cache
class DatasetError(Exception): pass

defaultdasinstance = "prod/global"

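# A run selection: an inclusive [firstrun, lastrun] window, optionally
# restricted to an explicit list of runs.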
class RunRange(object):
    def __init__(self, firstrun, lastrun, runs):
        self.firstrun = firstrun
        self.lastrun = lastrun
        self.runs = runs

    def __contains__(self, run):
        if self.runs and run not in self.runs: return False
        return self.firstrun <= run <= self.lastrun

def dasquery(dasQuery, dasLimit=0):
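    """
    Run a DAS query through cmssw_das_client and return the "data" field of
    the response.  Raises DatasetError if DAS reports a failure; very long
    error output is stored in a das_query_output_<i>.txt file instead of
    being dumped into the exception message.
    """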
    dasData = das_client.get_data(dasQuery, dasLimit)
    if isinstance(dasData, str):
        jsondict = json.loads(dasData)
    else:
        jsondict = dasData
    # check if the DAS query failed
    try:
        error = findinjson(jsondict, "data", "error")
    except KeyError:
        error = None
    if error or findinjson(jsondict, "status") != 'ok' or "data" not in jsondict:
        try:
            jsonstr = findinjson(jsondict, "reason")
        except KeyError:
            jsonstr = str(jsondict)
        if len(jsonstr) > 10000:
            jsonfile = "das_query_output_%i.txt"
            i = 0
            while os.path.lexists(jsonfile % i):
                i += 1
            jsonfile = jsonfile % i
            with open(jsonfile, "w") as theFile:
                theFile.write(jsonstr)
            msg = "The DAS query returned an error. The output is very long, and has been stored in:\n" + jsonfile
        else:
            msg = "The DAS query returned an error. Here is the output\n" + jsonstr
        msg += "\nIt's possible that this was a server error. If so, it may work if you try again later."
        raise DatasetError(msg)
    return findinjson(jsondict, "data")

def getrunnumbersfromfile(filename, trydas=True, allowunknown=False, dasinstance=defaultdasinstance):
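    """
    Extract the run number(s) from a /store file path.  MC and relval files
    are assigned run 1; for data, the run number is rebuilt from the three
    3-digit directories in the path.  If that fails and trydas is set, ask
    DAS; if that also fails and allowunknown is set, return [-1].
    """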
    parts = filename.split("/")
    error = None
    if parts[0] != "" or parts[1] != "store":
        error = "does not start with /store"
    elif parts[2] in ["mc", "relval"]:
        return [1]
    elif not parts[-1].endswith(".root"):
        error = "does not end with something.root"
    elif len(parts) != 12:
        error = "should have exactly 11 slashes, counting the first one"
    else:
        runnumberparts = parts[-5:-2]
        if not all(len(part) == 3 for part in runnumberparts):
            error = "the 3 directories {} do not have length 3 each".format("/".join(runnumberparts))
        try:
            return [int("".join(runnumberparts))]
        except ValueError:
            error = "the 3 directories {} do not form an integer".format("/".join(runnumberparts))

    if error and trydas:
        try:
            query = "run file={} instance={}".format(filename, dasinstance)
            dasoutput = dasquery(query)
            result = findinjson(dasoutput, "run")
            return sum((findinjson(run, "run_number") for run in result), [])
        except Exception as e:
            error = str(e)

    if error and allowunknown:
        return [-1]

    if error:
        error = "could not figure out which run number this file is from.\nMaybe try with allowunknown=True?\n {}\n{}".format(filename, error)
        raise DatasetError(error)

def findinjson(jsondict, *strings):
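    """
    Descend through jsondict following the given sequence of keys.  Each
    level can be a dict or a list of dicts; raises KeyError if the chain
    of keys cannot be followed to the end.
    """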
    if len(strings) == 0:
        return jsondict
    if isinstance(jsondict, dict):
        if strings[0] in jsondict:
            try:
                return findinjson(jsondict[strings[0]], *strings[1:])
            except KeyError:
                pass
    else:
        for a in jsondict:
            if strings[0] in a:
                try:
                    return findinjson(a[strings[0]], *strings[1:])
                except (TypeError, KeyError):  # TypeError because a could be a string that contains strings[0]
                    pass
    # if it's not found
    raise KeyError("Can't find " + strings[0])

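# A single file in a dataset: its name, its number of events, and the runs
# it contains.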
class DataFile(object):
    def __init__(self, filename, nevents, runs=None, trydas=True, allowunknown=False, dasinstance=defaultdasinstance):
        self.filename = filename
        self.nevents = int(nevents)
        if runs is None:
            runs = getrunnumbersfromfile(filename, trydas=trydas, allowunknown=allowunknown, dasinstance=dasinstance)
        if isinstance(runs, str):
            runs = runs.split()
        self.runs = [int(_) for _ in runs]

    def getdict(self):
        return {"filename": self.filename, "nevents": str(self.nevents), "runs": " ".join(str(_) for _ in self.runs)}

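# Abstract base class for anything that can deliver a list of DataFiles and
# a header comment for the generated file lists.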
class DatasetBase(object):
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def getfiles(self, usecache):
        pass

    @abc.abstractproperty
    def headercomment(self):
        pass

    def writefilelist_validation(self, firstrun, lastrun, runs, maxevents, outputfile=None, usecache=True):
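        """
        Write an Alignment/OfflineValidation _cff.py file listing the files
        whose runs all lie within the run range, keeping approximately
        maxevents events in total (all events if maxevents < 0).
        """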
        runrange = RunRange(firstrun=firstrun, lastrun=lastrun, runs=runs)

        if outputfile is None:
            outputfile = os.path.join(os.environ["CMSSW_BASE"], "src", "Alignment", "OfflineValidation", "python", self.filenamebase + "_cff.py")

        if maxevents < 0: maxevents = float("inf")
        totalevents = sum(datafile.nevents for datafile in self.getfiles(usecache) if all(run in runrange for run in datafile.runs))
        if totalevents == 0:
            raise ValueError("No events within the run range!")
        accepted = rejected = 0.  # float so fractions are easier

        fractiontoaccept = 1. * maxevents / totalevents

        with open(outputfile, "w") as f:
            f.write("#" + self.headercomment + "\n")
            f.write(validationheader)
            for datafile in self.getfiles(usecache):
                if all(run in runrange for run in datafile.runs):
                    if accepted == 0 or accepted / (accepted + rejected) <= fractiontoaccept:
                        f.write('"' + datafile.filename + '",\n')
                        accepted += datafile.nevents
                    else:
                        rejected += datafile.nevents
                elif any(run in runrange for run in datafile.runs):
                    raise DatasetError("file {} has multiple runs {}, which straddle firstrun or lastrun".format(datafile.filename, datafile.runs))
            f.write("#total events in these files: {}".format(accepted))
            f.write(validationfooter)

    def writefilelist_hippy(self, firstrun, lastrun, runs, eventsperjob, maxevents, outputfile, usecache=True):
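        """
        Write a HipPy-style file list: filenames separated by commas, one
        job per line, with each job holding at least eventsperjob events,
        up to approximately maxevents in total.
        """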
        runrange = RunRange(firstrun=firstrun, lastrun=lastrun, runs=runs)
        if maxevents < 0: maxevents = float("inf")
        totalevents = sum(datafile.nevents for datafile in self.getfiles(usecache) if all(run in runrange for run in datafile.runs))
        if totalevents == 0:
            raise ValueError("No events within the run range!")
        accepted = rejected = inthisjob = 0.  # float so fractions are easier

        fractiontoaccept = 1. * maxevents / totalevents
        writecomma = False

        with open(outputfile, "w") as f:
            for datafile in self.getfiles(usecache):
                if all(run in runrange for run in datafile.runs):
                    if accepted == 0 or accepted / (accepted + rejected) <= fractiontoaccept:
                        if writecomma: f.write(",")
                        f.write("'" + datafile.filename + "'")
                        accepted += datafile.nevents
                        inthisjob += datafile.nevents
                        if inthisjob >= eventsperjob:
                            f.write("\n")
                            inthisjob = 0
                            writecomma = False
                        else:
                            writecomma = True
                    else:
                        rejected += datafile.nevents
                elif any(run in runrange for run in datafile.runs):
                    raise DatasetError("file {} has multiple runs {}, which straddle firstrun or lastrun".format(datafile.filename, datafile.runs))
            f.write("\n")

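# A dataset identified by its DAS name, with its file list cached in a csv
# file under Alignment/CommonAlignment/data.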
class Dataset(DatasetBase):
    def __init__(self, datasetname, dasinstance=defaultdasinstance):
        self.datasetname = datasetname
        if re.match(r'/.+/.+/.+', datasetname):
            self.official = True
            self.filenamebase = "Dataset" + self.datasetname.replace("/", "_")
        else:
            self.official = False
            self.filenamebase = datasetname

        self.dasinstance = dasinstance

    @cache
    def getfiles(self, usecache):
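        """
        Return the DataFiles of this dataset.  The result is read from the
        csv cache if it exists (setting usecache=False removes the cache
        first); otherwise it is obtained from DAS and a new csv cache is
        written.
        """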
        filename = os.path.join(os.environ["CMSSW_BASE"], "src", "Alignment", "CommonAlignment", "data", self.filenamebase + ".csv")
        if not usecache:
            try:
                os.remove(filename)
            except OSError as e:
                if os.path.exists(filename):
                    raise

        result = []
        try:
            with open(filename) as f:
                for row in csv.DictReader(f):
                    result.append(DataFile(**row))
                return result
        except IOError:
            pass

        query = "file dataset={} instance={} detail=true | grep file.name, file.nevents".format(self.datasetname, self.dasinstance)
        dasoutput = dasquery(query)
        if not dasoutput:
            raise DatasetError("No files are available for the dataset '{}'. This can be "
                               "due to a typo or due to a DAS problem. Please check the "
                               "spelling of the dataset and/or try again.".format(self.datasetname))
        result = [DataFile(findinjson(_, "file", "name"), findinjson(_, "file", "nevents")) for _ in dasoutput if int(findinjson(_, "file", "nevents"))]
        try:
            with open(filename, "w") as f:
                writer = csv.DictWriter(f, ("filename", "nevents", "runs"))
                writer.writeheader()
                for datafile in result:
                    writer.writerow(datafile.getdict())
        except Exception as e:
            print("Couldn't write the dataset csv file:\n\n{}".format(e))
        return result

    @property
    def headercomment(self):
        return self.datasetname

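# Several datasets combined into a single file list.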
class MultipleDatasets(DatasetBase):
    def __init__(self, *datasets, **kwargs):
        dasinstance = defaultdasinstance
        for kw, kwarg in kwargs.items():
            if kw == "dasinstance":
                dasinstance = kwarg
            else:
                raise TypeError("Unknown kwarg {}={}".format(kw, kwarg))
        self.datasets = [Dataset(dataset, dasinstance=dasinstance) for dataset in datasets]

    @cache
    def getfiles(self, usecache):
        return sum([d.getfiles(usecache=usecache) for d in self.datasets], [])

    @property
    def headercomment(self):
        return ", ".join(d.headercomment for d in self.datasets)

validationheader = """
import FWCore.ParameterSet.Config as cms

maxEvents = cms.untracked.PSet( input = cms.untracked.int32(-1) )
readFiles = cms.untracked.vstring()
secFiles = cms.untracked.vstring()
source = cms.Source ("PoolSource",fileNames = readFiles, secondaryFileNames = secFiles)
readFiles.extend( [
"""

validationfooter = """
] )
"""