1 # idea stolen from:
2 #
3 # PhysicsTools/PatAlgos/python/tools/
import bisect
import json
import os
import re
import sys

import das_client
from FWCore.PythonUtilities.LumiList import LumiList
from TkAlExceptions import AllInOneError
class Dataset:
    """Describe a CMS dataset and render it as a CMSSW source configuration.

    A dataset is either looked up in DAS (when its name has the form
    '/a/b/c') or it is "predefined", i.e. stored as a configuration fragment
    '<name>_cff.py' in the Alignment/OfflineValidation package.
    File and run information is requested from DAS on first use and kept on
    the instance afterwards.
    """

    # Suffix of the file holding a predefined dataset. It has to agree with
    # the module name '<name>_cff' that datasetSnippet() loads.
    __cffSuffix = "_cff.py"

    # Maximum number of entries put into a single '.extend( [...] )' call of
    # the generated configuration.
    __chunkSize = 255

    # Source definition used for CRAB jobs: only a dummy input file is set.
    __dummy_source_template = ("%(process)smaxEvents = cms.untracked.PSet( "
                               "input = cms.untracked.int32(%(nEvents)s) )\n"
                               "readFiles = cms.untracked.vstring()\n"
                               "secFiles = cms.untracked.vstring()\n"
                               "%(process)ssource = cms.Source(\"PoolSource\",\n"
                               "%(tab)s secondaryFileNames ="
                               "secFiles,\n"
                               "%(tab)s fileNames = readFiles\n"
                               ")\n"
                               "readFiles.extend(['dummy_File.root'])\n")

    # Full source definition including file list and lumi section selection.
    __source_template= ("%(importCms)s"
                        "import FWCore.PythonUtilities.LumiList as LumiList\n\n"
                        "%(goodLumiSecStr)s"
                        "%(process)smaxEvents = cms.untracked.PSet( "
                        "input = cms.untracked.int32(%(nEvents)s) )\n"
                        "readFiles = cms.untracked.vstring()\n"
                        "secFiles = cms.untracked.vstring()\n"
                        "%(process)ssource = cms.Source(\"PoolSource\",\n"
                        "%(lumiStr)s"
                        "%(tab)s secondaryFileNames ="
                        "secFiles,\n"
                        "%(tab)s fileNames = readFiles\n"
                        ")\n"
                        "%(files)s\n"
                        "%(lumiSecExtend)s\n")

    def __init__( self, datasetName, dasLimit = 0 ):
        """Set up the dataset.

        datasetName -- DAS dataset name ('/a/b/c') or name of a predefined
                       dataset
        dasLimit    -- limit passed to DAS when the file information is
                       requested

        Raises AllInOneError if a predefined dataset cannot be found or has
        not been built yet. For DAS datasets the data type is requested from
        DAS right away.
        """
        self.__name = datasetName
        self.__dasLimit = dasLimit
        self.__fileList = None
        self.__fileInfoList = None
        self.__runList = None
        # check, if dataset name matches CMS dataset naming scheme
        if re.match( r'/.+/.+/.+', self.__name ):
            self.__dataType = self.__getDataType()
            self.__predefined = False
        else:
            self.__checkPredefined()
            self.__dataType = "unknown"
            self.__predefined = True

    def __checkPredefined( self ):
        """Raise AllInOneError unless the predefined dataset is usable."""
        fileName = self.__name + self.__cffSuffix
        searchPath1 = os.path.join( os.environ["CMSSW_BASE"], "python",
                                    "Alignment", "OfflineValidation",
                                    fileName )
        searchPath2 = os.path.join( os.environ["CMSSW_BASE"], "src",
                                    "Alignment", "OfflineValidation",
                                    "python", fileName )
        searchPath3 = os.path.join( os.environ["CMSSW_RELEASE_BASE"],
                                    "python", "Alignment",
                                    "OfflineValidation", fileName )
        if os.path.exists( searchPath1 ):
            return
        if os.path.exists( searchPath2 ):
            # only present in the source tree, not in the 'python' area
            msg = ("The predefined dataset '%s' does exist in '%s', but "
                   "you need to run 'scram b' first."
                   %( self.__name, searchPath2 ))
            raise AllInOneError( msg )
        if os.path.exists( searchPath3 ):
            return
        msg = ("The predefined dataset '%s' does not exist. Please "
               "create it first or check for typos."%( self.__name ))
        raise AllInOneError( msg )

    def __announce( self, message ):
        """Write a progress message without newline and flush it, so that it
        is visible while the following request is still running.
        """
        sys.stdout.write( message + " " )
        sys.stdout.flush()

    def __checkAmbiguousLimits( self, begin, end, firstRun, lastRun ):
        """Raise AllInOneError if a limit is given both as time and as run."""
        beginClash = bool( begin and firstRun )
        endClash = bool( end and lastRun )
        if not ( beginClash or endClash ):
            return
        msg = ( "The Usage of "
                + "'begin' & 'firstRun' " * int( beginClash )
                + "and " * int( beginClash and endClash )
                + "'end' & 'lastRun' " * int( endClash )
                + "is ambigous." )
        raise AllInOneError( msg )

    def __chunks( self, theList, n ):
        """ Yield successive n-sized chunks from theList.
        """
        for i in range( 0, len( theList ), n ):
            yield theList[i:i+n]

    def __extendStatements( self, listName, entries ):
        """Return '<listName>.extend( [...] )' statements for 'entries', one
        statement per chunk of at most __chunkSize quoted entries.
        """
        return "\n".join(
            listName + ".extend( [\n'" + "',\n'".join( chunk ) + "'\n] )"
            for chunk in self.__chunks( entries, self.__chunkSize ) )

    def __createSnippet( self, jsonPath = None, begin = None, end = None,
                         firstRun = None, lastRun = None, repMap = None,
                         crab = False ):
        """Return the configuration snippet defining the source.

        jsonPath         -- optional JSON file with the lumi sections to use
        begin, end       -- optional time limits, converted to run numbers
                            via convertTimeToRun()
        firstRun, lastRun -- optional run limits (converted with int())
        repMap           -- replacement map for the template; it has to
                            provide the keys used by the template that this
                            method does not fill itself ('process', 'tab',
                            'nEvents' and, unless 'crab' is set, 'importCms')
        crab             -- if True, the dummy source template is used and no
                            file list is requested

        Raises AllInOneError for ambiguous or inverted limits and if no run
        of the dataset lies within the requested run range.
        """
        if firstRun:
            firstRun = int( firstRun )
        if lastRun:
            lastRun = int( lastRun )
        self.__checkAmbiguousLimits( begin, end, firstRun, lastRun )
        if begin or end:
            ( firstRun, lastRun ) = self.convertTimeToRun(
                begin = begin, end = end, firstRun = firstRun,
                lastRun = lastRun )
        if ( firstRun and lastRun ) and ( firstRun > lastRun ):
            msg = ( "The lower time/runrange limit ('begin'/'firstRun') "
                    "chosen is greater than the upper time/runrange limit "
                    "('end'/'lastRun').")
            raise AllInOneError( msg )
        goodLumiSecStr = ""
        lumiStr = ""
        lumiSecExtend = ""
        if firstRun or lastRun:
            goodLumiSecStr = ( "lumiSecs = cms.untracked."
                               "VLuminosityBlockRange()\n" )
            lumiStr = " lumisToProcess = lumiSecs,\n"
            if not jsonPath:
                selectedRunList = self.__getRunList()
                if firstRun:
                    selectedRunList = [ run for run in selectedRunList
                                        if run["run_number"] >= firstRun ]
                if lastRun:
                    selectedRunList = [ run for run in selectedRunList
                                        if run["run_number"] <= lastRun ]
                if not selectedRunList:
                    # an empty selection used to end in a bare IndexError
                    msg = ( "No run of the dataset '%s' lies within the "
                            "requested run range."%( self.__name ) )
                    raise AllInOneError( msg )
                lumiList = [ str( run["run_number"] ) + ":1-"
                             + str( run["run_number"] ) + ":max"
                             for run in selectedRunList ]
            else:
                theLumiList = LumiList( filename = jsonPath )
                runsToRemove = [ run for run in theLumiList.getRuns()
                                 if ( firstRun and int( run ) < firstRun )
                                 or ( lastRun and int( run ) > lastRun ) ]
                theLumiList.removeRuns( runsToRemove )
                lumiList = theLumiList.getCMSSWString().split(',')
            # splitting an empty string yields [''], i.e. nothing to extend
            if lumiList and lumiList[0]:
                lumiSecExtend = self.__extendStatements( "lumiSecs",
                                                         lumiList )
        elif jsonPath:
            goodLumiSecStr = ( "goodLumiSecs = LumiList.LumiList(filename"
                               "= '%(json)s').getCMSSWString().split(',')\n"
                               "lumiSecs = cms.untracked"
                               ".VLuminosityBlockRange()\n"
                               )
            lumiStr = " lumisToProcess = lumiSecs,\n"
            lumiSecExtend = "lumiSecs.extend(goodLumiSecs)\n"
        if crab:
            files = ""
        else:
            files = self.__extendStatements( "readFiles", self.fileList() )
        # work on a copy, the caller's map must not be modified
        theMap = dict( repMap or {} )
        theMap["files"] = files
        theMap["json"] = jsonPath
        theMap["lumiStr"] = lumiStr
        theMap["goodLumiSecStr"] = goodLumiSecStr%( theMap )
        theMap["lumiSecExtend"] = lumiSecExtend
        if crab:
            return self.__dummy_source_template%( theMap )
        return self.__source_template%( theMap )

    def __find_lt( self, a, x ):
        """Return the index of the rightmost value of the sorted list 'a'
        that is less than x; raise ValueError if there is none.
        """
        i = bisect.bisect_left( a, x )
        if i:
            return i-1
        raise ValueError

    def __find_ge( self, a, x):
        """Return the index of the leftmost value of the sorted list 'a'
        that is greater than or equal to x; raise ValueError if there is
        none.
        """
        i = bisect.bisect_left( a, x )
        if i != len( a ):
            return i
        raise ValueError

    def __getData( self, dasQuery, dasLimit = 0 ):
        """Send 'dasQuery' to DAS and return the 'data' part of the reply.

        Raises AllInOneError if the status of the reply is not 'ok'.
        """
        # NOTE(review): the first argument (presumably the DAS host) is an
        # empty string here -- confirm against das_client.get_data.
        dasData = das_client.get_data( '',
                                       dasQuery, 0, dasLimit, False )
        jsondict = json.loads( dasData )
        # Check, if the DAS query fails
        if jsondict["status"] != 'ok':
            msg = "Status not 'ok', but: %s"%( jsondict["status"] )
            raise AllInOneError( msg )
        return jsondict["data"]

    def __getDataType( self ):
        """Request the data type of the dataset from DAS and return it."""
        # NOTE(review): the grep clause ends with a comma -- confirm that
        # no field is missing from the query.
        dasQuery_type = ( 'dataset dataset=%s | grep dataset.datatype,'
                          ''%( self.__name ) )
        data = self.__getData( dasQuery_type )
        return data[0]["dataset"][0]["datatype"]

    def __getFileInfoList( self, dasLimit ):
        """Return a list of dictionaries with the keys 'name',
        'creation_time' and 'nevents' for all non-empty files of the
        dataset, sorted by file name.

        The list is requested from DAS once and cached afterwards.
        Raises AllInOneError if DAS reports no files or no event count.
        """
        if self.__fileInfoList is not None:
            return self.__fileInfoList
        # 'file.name' is requested explicitly, because it is read below
        dasQuery_files = ( 'file dataset=%s | grep file.name, file.nevents, '
                           'file.creation_time, '
                           'file.modification_time'%( self.__name ) )
        self.__announce( "Requesting file information for '%s' from DAS..."
                         %( self.__name ) )
        data = self.__getData( dasQuery_files, dasLimit )
        print( "Done." )
        data = [ entry["file"] for entry in data ]
        if len( data ) == 0:
            msg = ("No files are available for the dataset '%s'. This can be "
                   "due to a typo or due to a DAS problem. Please check the "
                   "spelling of the dataset and/or retry."
                   %( self.__name ))
            raise AllInOneError( msg )
        fileInformationList = []
        for fileRecords in data:
            fileName = fileRecords[0]["name"]
            fileCreationTime = fileRecords[0]["creation_time"]
            # the event count is looked up in the first three records
            for record in fileRecords[:3]:
                if "nevents" in record:
                    fileNEvents = record["nevents"]
                    break
            else:
                msg = ("DAS did not report the number of events of the "
                       "file '%s'."%( fileName ))
                raise AllInOneError( msg )
            # select only non-empty files
            if fileNEvents == 0:
                continue
            fileInformationList.append( { "name": fileName,
                                          "creation_time": fileCreationTime,
                                          "nevents": fileNEvents } )
        fileInformationList.sort( key=lambda info: info["name"] )
        self.__fileInfoList = fileInformationList
        return fileInformationList

    def __getRunList( self ):
        """Return the run records of the dataset sorted by creation time.

        The list is requested from DAS once and cached afterwards.
        """
        if self.__runList is not None:
            return self.__runList
        dasQuery_runs = ( 'run dataset=%s | grep run.run_number,'
                          'run.creation_time'%( self.__name ) )
        self.__announce( "Requesting run information for '%s' from DAS..."
                         %( self.__name ) )
        data = self.__getData( dasQuery_runs )
        print( "Done." )
        data = [ entry["run"][0] for entry in data ]
        data.sort( key = lambda run: run["creation_time"] )
        self.__runList = data
        return data

    def convertTimeToRun( self, begin = None, end = None,
                          firstRun = None, lastRun = None,
                          shortTuple = True ):
        """Translate the time limits 'begin'/'end' into run numbers.

        'begin' selects the first run created at or after 'begin', 'end' the
        last run created before 'end'. Limits already given as run numbers
        are passed through unchanged.

        Returns ( firstRun, lastRun ) if 'shortTuple' is set, otherwise
        ( begin, end, firstRun, lastRun ) with converted time limits set to
        None. Raises AllInOneError for ambiguous limits or if a time limit
        lies outside of the dataset.
        """
        self.__checkAmbiguousLimits( begin, end, firstRun, lastRun )

        runs = self.__getRunList()
        runList = [ run["run_number"] for run in runs ]
        runTimeList = [ run["creation_time"] for run in runs ]
        if begin:
            try:
                runIndex = self.__find_ge( runTimeList, begin )
            except ValueError:
                msg = ( "Your 'begin' is after the creation time of the last "
                        "run in the dataset\n'%s'"%( self.__name ) )
                raise AllInOneError( msg )
            firstRun = runList[runIndex]
            begin = None
        if end:
            try:
                runIndex = self.__find_lt( runTimeList, end )
            except ValueError:
                msg = ( "Your 'end' is before the creation time of the first "
                        "run in the dataset\n'%s'"%( self.__name ) )
                raise AllInOneError( msg )
            lastRun = runList[runIndex]
            end = None
        if shortTuple:
            return firstRun, lastRun
        return begin, end, firstRun, lastRun

    def dataType( self ):
        """Return the data type; 'unknown' for predefined datasets."""
        return self.__dataType

    def datasetSnippet( self, jsonPath = None, begin = None, end = None,
                        firstRun = None, lastRun = None, nEvents = None,
                        crab = False ):
        """Return the configuration snippet that sets up 'process.source'
        and 'process.maxEvents' for this dataset.

        For predefined datasets the stored fragment is loaded and only
        'nEvents' is used; the selection arguments are ignored.
        """
        if self.__predefined:
            return ("process.load(\"Alignment.OfflineValidation.%s_cff\")\n"
                    "process.maxEvents = cms.untracked.PSet(\n"
                    " input = cms.untracked.int32(%s)\n"
                    ")"
                    %( self.__name, nEvents ))
        theMap = { "process": "process.",
                   "tab": " " * len( "process." ),
                   "nEvents": str( nEvents ),
                   "importCms": ""
                   }
        return self.__createSnippet( jsonPath = jsonPath,
                                     begin = begin,
                                     end = end,
                                     firstRun = firstRun,
                                     lastRun = lastRun,
                                     repMap = theMap,
                                     crab = crab )

    def dump_cff( self, outName = None, jsonPath = None, begin = None,
                  end = None, firstRun = None, lastRun = None ):
        """Store the dataset as predefined dataset '<outName>_cff.py' in the
        'python' directory of the checked out Alignment/OfflineValidation
        package.

        If the file already exists, the user is asked on the terminal
        whether it shall be overwritten. Raises AllInOneError if the package
        is not checked out.
        """
        if outName is None:
            outName = "Dataset"
        packageName = os.path.join( "Alignment", "OfflineValidation" )
        packagePath = os.path.join( os.environ["CMSSW_BASE"], "src",
                                    packageName )
        if not os.path.exists( packagePath ):
            msg = ("You try to store the predefined dataset'%s'.\n"
                   "For that you need to check out the package '%s' to your "
                   "private relase area in\n"%( outName, packageName )
                   + os.environ["CMSSW_BASE"] )
            raise AllInOneError( msg )
        theMap = { "process": "",
                   "tab": "",
                   "nEvents": str( -1 ),
                   "importCms": "import FWCore.ParameterSet.Config as cms\n" }
        dataset_cff = self.__createSnippet( jsonPath = jsonPath,
                                            begin = begin,
                                            end = end,
                                            firstRun = firstRun,
                                            lastRun = lastRun,
                                            repMap = theMap)
        filePath = os.path.join( packagePath, "python",
                                 outName + self.__cffSuffix )
        if os.path.exists( filePath ):
            try:
                readAnswer = raw_input
            except NameError:
                # Python 3 has no raw_input
                readAnswer = input
            existMsg = ( "The predefined dataset '%s' already exists.\n"
                         %( outName ) )
            askString = "Do you want to overwrite it? [y/n]\n"
            inputQuery = existMsg + askString
            while True:
                userInput = readAnswer( inputQuery ).lower()
                if userInput == "y":
                    break
                if userInput == "n":
                    return
                # repeat only the question after an invalid answer
                inputQuery = askString
        print( "The predefined dataset '%s' will be stored in the file\n"
               %( outName )
               + filePath +
               "\nFor future use you have to do 'scram b'." )
        print( "" )
        with open( filePath, "w" ) as theFile:
            theFile.write( dataset_cff )

    def fileList( self ):
        """Return the names of all non-empty files of the dataset."""
        if self.__fileList is not None:
            return self.__fileList
        fileList = [ fileInfo["name"]
                     for fileInfo in self.fileInfoList() ]
        self.__fileList = fileList
        return fileList

    def fileInfoList( self ):
        """Return the file information using the DAS limit of the dataset."""
        return self.__getFileInfoList( self.__dasLimit )

    def name( self ):
        """Return the name the dataset has been created with."""
        return self.__name

    def predefined( self ):
        """Return True if the dataset is a predefined one."""
        return self.__predefined

    def runList( self ):
        """Return the run records of the dataset sorted by creation time."""
        return self.__getRunList()
if __name__ == '__main__':
    # Manual smoke test: print the snippet for one dataset and store the
    # same selection as a predefined dataset.
    print( "Start testing..." )
    datasetName = '/MinimumBias/Run2012D-TkAlMinBias-v1/ALCARECO'
    jsonFile = ( '/afs/'
                 'Collisions12/8TeV/Prompt/'
                 'Cert_190456-207898_8TeV_PromptReco_Collisions12_JSON.txt' )
    # selection shared by both calls below
    selection = { "jsonPath": jsonFile,
                  "firstRun": "207983",
                  "end": "2012-11-28 00:00:00" }
    dataset = Dataset( datasetName )
    print( dataset.datasetSnippet( nEvents = 100, **selection ) )
    dataset.dump_cff( outName = "Dataset_Test_TkAlMinBias_Run2012D",
                      **selection )
