dataset.py

#!/usr/bin/env python

from __future__ import print_function
from __future__ import absolute_import
from builtins import range
import os
import pprint
import re
import pickle
import sys

from .castorBaseDir import castorBaseDir
from . import eostools as castortools
import fnmatch
import six

class IntegrityCheckError(Exception):
    def __init__(self, value):
        self.value = value
    def __str__(self):
        return repr(self.value)

class BaseDataset( object ):

    ### def __init__(self, name, user, pattern='.*root', run_range=None):
    def __init__(self, name, user, pattern='.*root', run_range=None, dbsInstance=None):
        self.name = name
        self.user = user
        self.pattern = pattern
        self.run_range = run_range
        ### MM
        self.dbsInstance = dbsInstance
        ### MM
        self.primaryDatasetEntries = -1
        self.report = None
        self.buildListOfFiles( self.pattern )
        self.extractFileSizes()
        self.buildListOfBadFiles()

    def buildListOfFiles( self, pattern ):
        self.files = []

    def extractFileSizes(self):
        '''Get the file size for each file,
        from the eos ls -l command.'''
        self.filesAndSizes = {}

    def buildListOfBadFiles(self):
        self.good_files = []
        self.bad_files = {}

    def printInfo(self):
        print('sample : ' + self.name)
        print('user : ' + self.user)

    def getPrimaryDatasetEntries(self):
        return self.primaryDatasetEntries

    def printFiles(self, abspath=True, info=True):
        # import pdb; pdb.set_trace()
        if self.files is None:
            self.buildListOfFiles(self.pattern)
        for file in self.files:
            status = 'OK'
            if file in self.bad_files:
                status = self.bad_files[file]
            elif file not in self.good_files:
                status = 'UNKNOWN'
            fileNameToPrint = file
            if not abspath:
                fileNameToPrint = os.path.basename(file)
            if info:
                size = self.filesAndSizes.get(file, 'UNKNOWN').rjust(10)
                print(status.ljust(10), size, '\t', fileNameToPrint)
            else:
                print(fileNameToPrint)
        print('PrimaryDatasetEntries: %d' % self.primaryDatasetEntries)

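    # Illustrative output sketch (not part of the original file; file names
    # and sizes are hypothetical). With info=True, printFiles() emits one
    # status/size/name line per file, then the entry count:
    #
    #   OK            1234567     /store/.../file_1.root
    #   MarkedBad     7654321     /store/.../file_2.root
    #   PrimaryDatasetEntries: 100000
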
    def listOfFiles(self):
        '''Returns all files, even the bad ones.'''
        return self.files

    def listOfGoodFiles(self):
        '''Returns all files flagged as good in the integrity
        check text output; files not listed in that output are
        also considered good.'''
        self.good_files = []
        for file in self.files:
            if file not in self.bad_files:
                self.good_files.append( file )
        return self.good_files

    def listOfGoodFilesWithPrescale(self, prescale):
        """Takes the list of good files and selects a random sample
        from them according to the prescale factor.
        E.g. a prescale of 10 will select 1 in 10 files."""

        good_files = self.listOfGoodFiles()
        if prescale < 2:
            return self.good_files

        # the number of files to select from the dataset
        num_files = int( (len(good_files)/(1.0*prescale)) + 0.5 )
        if num_files < 1:
            num_files = 1
        if num_files > len(good_files):
            num_files = len(good_files)

        # pick unique good files randomly
        import random
        subset = set()
        while len(subset) < num_files:
            # pick a random file from the list
            choice = random.choice(good_files)
            slen = len(subset)
            # add to the set
            subset.add(choice)
            # if this was a unique file, remove it from the pool so we avoid
            # very slow corner cases when the prescale is small
            if len(subset) > slen:
                good_files.remove(choice)
        assert len(subset) == num_files, 'The number of files does not match'

        return [f for f in subset]

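    # Usage sketch (assumed names, not part of the original file): a prescale
    # of 10 keeps roughly one good file in ten, never fewer than one.
    #
    #   ds = CMSDataset('/SomePrimary/SomeEra-v1/AOD')   # hypothetical dataset
    #   subset = ds.listOfGoodFilesWithPrescale(10)
    #   assert 1 <= len(subset) <= len(ds.listOfGoodFiles())
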
class CMSDataset( BaseDataset ):

    def __init__(self, name, run_range = None):
        super(CMSDataset, self).__init__( name, 'CMS', run_range=run_range)

    def buildListOfFilesDBS(self, pattern, begin=-1, end=-1):
        print('buildListOfFilesDBS', begin, end)
        sampleName = self.name.rstrip('/')
        query, qwhat = sampleName, "dataset"
        if "#" in sampleName: qwhat = "block"
        if self.run_range is not None and self.run_range != (-1,-1):
            if self.run_range[0] == self.run_range[1]:
                query += " run=%s" % self.run_range[0]
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query += " run between [%s,%s]" % ( self.run_range[0], self.run_range[1] )
        dbs = 'das_client.py --query="file %s=%s"' % (qwhat, query)
        if begin >= 0:
            dbs += ' --index %d' % begin
        if end >= 0:
            dbs += ' --limit %d' % (end - begin + 1)
        else:
            dbs += ' --limit 0'
        print('dbs\t: %s' % dbs)
        dbsOut = os.popen(dbs)
        files = []
        for line in dbsOut:
            if line.find('/store') == -1:
                continue
            line = line.rstrip()
            # print 'line',line
            files.append(line)
        return files

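    # For illustration (hypothetical dataset name): with self.name='/A/B/C'
    # and run_range=(273158, 273158), the command assembled above reads
    #
    #   das_client.py --query="file dataset=/A/B/C run=273158" --limit 0
    #
    # and every output line containing '/store' is kept as a file name.
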
    def buildListOfFiles(self, pattern='.*root'):
        runs = (-1,-1)
        if self.run_range is not None:
            runs = self.run_range
        num_files = self.findPrimaryDatasetNumFiles(self.name.rstrip('/'),
                                                    runs[0], runs[1])
        limit = 10000
        if num_files > limit:
            num_steps = int(num_files/limit) + 1
            self.files = []
            for i in range(num_steps):
                DBSFiles = self.buildListOfFilesDBS(pattern,
                                                    i*limit,
                                                    ((i+1)*limit) - 1)
                self.files.extend(DBSFiles)
        else:
            self.files = self.buildListOfFilesDBS(pattern)

    @staticmethod
    def findPrimaryDatasetEntries(dataset, runmin, runmax):

        query, qwhat = dataset, "dataset"
        if "#" in dataset: qwhat = "block"
        if runmin > 0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query, runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query, runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs = 'das_client.py --query="summary %s=%s"' % (qwhat, query)
        dbsOut = os.popen(dbs).readlines()

        entries = []
        for line in dbsOut:
            line = line.replace('\n', '')
            if "nevents" in line:
                entries.append(int(line.split(":")[1]))
        if entries:
            return sum(entries)
        return -1

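    # For illustration (output format assumed from the parsing above): the DAS
    # summary is expected to contain lines such as 'nevents: 123456'; all
    # 'nevents' fields are summed, and -1 is returned when none is found.
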
    @staticmethod
    def findPrimaryDatasetNumFiles(dataset, runmin, runmax):

        query, qwhat = dataset, "dataset"
        if "#" in dataset: qwhat = "block"
        if runmin > 0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query, runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query, runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs = 'das_client.py --query="summary %s=%s"' % (qwhat, query)
        dbsOut = os.popen(dbs).readlines()

        entries = []
        for line in dbsOut:
            line = line.replace('\n', '')
            if "nfiles" in line:
                entries.append(int(line.split(":")[1]))
        if entries:
            return sum(entries)
        return -1

    def getPrimaryDatasetEntries(self):
        runmin = -1
        runmax = -1
        if self.run_range is not None:
            runmin = self.run_range[0]
            runmax = self.run_range[1]
        return self.findPrimaryDatasetEntries(self.name, runmin, runmax)

class LocalDataset( BaseDataset ):

    def __init__(self, name, basedir, pattern):
        self.basedir = basedir
        super(LocalDataset, self).__init__( name, 'LOCAL', pattern)

    def buildListOfFiles(self, pattern='.*root'):
        pat = re.compile( pattern )
        sampleName = self.name.rstrip('/')
        self.dir = ''.join( [os.path.abspath(self.basedir),
                             sampleName ] )
        self.files = []
        for file in sorted(os.listdir( self.dir )):
            if pat.match( file ) is not None:
                self.files.append( '/'.join([self.dir, file]) )
                # print file

class EOSDataset( BaseDataset ):
    '''A dataset located in any given eos directory'''

    def __init__(self, name, basedir, pattern):
        self.castorDir = '/'.join([basedir, name])
        if not castortools.isEOSDir(self.castorDir):
            raise ValueError('the given path should be a directory on EOS.')
        super(EOSDataset, self).__init__( name, 'EOS', pattern)

    def buildListOfFiles(self, pattern='.*root'):
        self.files = castortools.matchingFiles( self.castorDir, pattern )


class Dataset( BaseDataset ):

    def __init__(self, name, user, pattern='.*root'):
        self.lfnDir = castorBaseDir(user) + name
        self.castorDir = castortools.lfnToCastor( self.lfnDir )
        self.maskExists = False
        self.report = None
        super(Dataset, self).__init__(name, user, pattern)

    def buildListOfFiles(self, pattern='.*root'):
        '''Fills the list of files, taking all root files matching the pattern in the castor dir.'''
        self.files = castortools.matchingFiles( self.castorDir, pattern )

    def buildListOfBadFiles(self):
        '''Fills the list of bad files from the IntegrityCheck log.

        When the integrity check file is not available,
        all files are considered good.'''
        mask = "IntegrityCheck"

        self.bad_files = {}
        self.good_files = []

        file_mask = castortools.matchingFiles(self.castorDir, r'^%s_.*\.txt$' % mask)
        if file_mask:
            # imported here to avoid a circular dependency
            from .edmIntegrityCheck import PublishToFileSystem
            p = PublishToFileSystem(mask)
            report = p.get(self.castorDir)
            if report is not None and report:
                self.maskExists = True
                self.report = report
                dup = report.get('ValidDuplicates', {})
                for name, status in six.iteritems(report['Files']):
                    # print name, status
                    if not status[0]:
                        self.bad_files[name] = 'MarkedBad'
                    elif name in dup:
                        self.bad_files[name] = 'ValidDup'
                    else:
                        self.good_files.append( name )
        else:
            raise IntegrityCheckError( "ERROR: IntegrityCheck log file IntegrityCheck_XXXXXXXXXX.txt not found" )

    def extractFileSizes(self):
        '''Get the file size for each file, from the eos ls -l command.'''
        # EOS command does not work in tier3
        lsout = castortools.runXRDCommand(self.castorDir, 'dirlist')[0]
        lsout = lsout.split('\n')
        self.filesAndSizes = {}
        for entry in lsout:
            values = entry.split()
            if len(values) != 5:
                continue
            # using full abs path as a key
            file = '/'.join([self.lfnDir, values[4].split("/")[-1]])
            size = values[1]
            self.filesAndSizes[file] = size

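    # For illustration: the field layout below is inferred from the indices
    # used above (values[1] = size, values[4] = path); example values are
    # hypothetical.
    #
    #   -rw 1234567 2020-01-01 12:00:00 /eos/cms/store/.../file_1.root
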
    def printInfo(self):
        print('sample : ' + self.name)
        print('LFN : ' + self.lfnDir)
        print('Castor path : ' + self.castorDir)

    def getPrimaryDatasetEntries(self):
        if self.report is not None and self.report:
            return int(self.report.get('PrimaryDatasetEntries', -1))
        return -1


### MM
class PrivateDataset( BaseDataset ):

    def __init__(self, name, dbsInstance=None):
        super(PrivateDataset, self).__init__(name, 'PRIVATE', dbsInstance=dbsInstance)

    def buildListOfFilesDBS(self, name, dbsInstance):
        entries = self.findPrimaryDatasetNumFiles(name, dbsInstance, -1, -1)
        files = []
        dbs = 'das_client.py --query="file dataset=%s instance=prod/%s" --limit=%s' % (name, dbsInstance, entries)
        dbsOut = os.popen(dbs)
        for line in dbsOut:
            if line.find('/store') == -1:
                continue
            line = line.rstrip()
            # print 'line',line
            files.append(line)
        # return ['root://eoscms//eos/cms%s' % f for f in files]
        return files

    def buildListOfFiles(self, pattern='.*root'):
        self.files = self.buildListOfFilesDBS(self.name, self.dbsInstance)

    @staticmethod
    def findPrimaryDatasetEntries(dataset, dbsInstance, runmin, runmax):

        query, qwhat = dataset, "dataset"
        if "#" in dataset: qwhat = "block"
        if runmin > 0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query, runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query, runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs = 'das_client.py --query="summary %s=%s instance=prod/%s"' % (qwhat, query, dbsInstance)
        dbsOut = os.popen(dbs).readlines()

        entries = []
        for line in dbsOut:
            line = line.replace('\n', '')
            if "nevents" in line:
                entries.append(int(line.split(":")[1]))
        if entries:
            return sum(entries)
        return -1

    @staticmethod
    def findPrimaryDatasetNumFiles(dataset, dbsInstance, runmin, runmax):

        query, qwhat = dataset, "dataset"
        if "#" in dataset: qwhat = "block"
        if runmin > 0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query, runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query, runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs = 'das_client.py --query="summary %s=%s instance=prod/%s"' % (qwhat, query, dbsInstance)
        dbsOut = os.popen(dbs).readlines()

        entries = []
        for line in dbsOut:
            line = line.replace('\n', '')
            if "nfiles" in line:
                entries.append(int(line.split(":")[1]))
        if entries:
            return sum(entries)
        return -1

    def getPrimaryDatasetEntries(self):
        runmin = -1
        runmax = -1
        if self.run_range is not None:
            runmin = self.run_range[0]
            runmax = self.run_range[1]
        return self.findPrimaryDatasetEntries(self.name, self.dbsInstance, runmin, runmax)
### MM

def getDatasetFromCache( cachename ):
    cachedir = '/'.join( [os.environ['HOME'], '.cmgdataset'])
    # pickle files must be opened in binary mode under Python 3
    pckfile = open( cachedir + "/" + cachename, 'rb')
    dataset = pickle.load(pckfile)
    return dataset

def writeDatasetToCache( cachename, dataset ):
    cachedir = '/'.join( [os.environ['HOME'], '.cmgdataset'])
    if not os.path.exists(cachedir):
        os.mkdir(cachedir)
    pckfile = open( cachedir + "/" + cachename, 'wb')
    pickle.dump(dataset, pckfile)

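# Illustrative cache round trip (the cache file name is hypothetical, built
# the way createDataset's cacheFileName() does it below):
#
#   writeDatasetToCache('CMS%_A_B_C%.*root.pck', data)
#   data = getDatasetFromCache('CMS%_A_B_C%.*root.pck')
#
# Both functions use the $HOME/.cmgdataset directory, created on first write.
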
def createDataset( user, dataset, pattern, readcache=False,
                   basedir=None, run_range=None):

    def cacheFileName(data, user, pattern):
        return '{user}%{name}%{pattern}.pck'.format( user=user, name=data.replace('/','_'), pattern=pattern)

    def writeCache(dataset):
        writeDatasetToCache( cacheFileName(dataset.name, dataset.user, dataset.pattern), dataset )

    def readCache(data, user, pattern):
        return getDatasetFromCache( cacheFileName(data, user, pattern) )

    if readcache:
        try:
            data = readCache(dataset, user, pattern)
        except IOError:
            readcache = False
    if not readcache:
        if user == 'CMS':
            data = CMSDataset( dataset, run_range=run_range )
            info = False
        elif user == 'LOCAL':
            data = LocalDataset( dataset, basedir, pattern )
            info = False
        elif user == 'EOS':
            data = EOSDataset( dataset, basedir, pattern )
            info = False
        else:
            data = Dataset( dataset, user, pattern )
        writeCache(data)
    return data

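# Usage sketch (dataset name and run range are hypothetical):
#
#   data = createDataset('CMS', '/SomePrimary/SomeEra-v1/AOD', '.*root',
#                        readcache=True, run_range=(273158, 273730))
#   data.printInfo()
#   good = data.listOfGoodFiles()
#
# With readcache=True a matching pickle from ~/.cmgdataset is reused when
# present; otherwise the dataset is rebuilt and the cache rewritten.
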
### MM
def createMyDataset( user, dataset, pattern, dbsInstance, readcache=False):

    cachedir = '/'.join( [os.environ['HOME'], '.cmgdataset'])

    def cacheFileName(data, user, dbsInstance, pattern):
        cf = data.replace('/','_')
        name = '{dir}/{user}%{dbsInstance}%{name}%{pattern}.pck'.format(
            dir=cachedir,
            user=user,
            dbsInstance=dbsInstance,
            name=cf,
            pattern=pattern)
        return name

    def writeCache(dataset):
        if not os.path.exists(cachedir):
            os.mkdir(cachedir)
        cachename = cacheFileName(dataset.name,
                                  dataset.user,
                                  dataset.dbsInstance,
                                  dataset.pattern)
        pckfile = open( cachename, 'wb')
        pickle.dump(dataset, pckfile)

    def readCache(data, user, dbsInstance, pattern):
        cachename = cacheFileName(data, user, dbsInstance, pattern)
        pckfile = open( cachename, 'rb')
        dataset = pickle.load(pckfile)
        # print 'reading cache'
        return dataset

    if readcache:
        try:
            data = readCache(dataset, user, dbsInstance, pattern)
        except IOError:
            readcache = False
    if not readcache:
        if user == 'PRIVATE':
            data = PrivateDataset( dataset, dbsInstance )
            info = False
        writeCache(data)
    return data
### MM
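
# Usage sketch (names hypothetical; 'phys03' is the usual user DBS instance):
#
#   data = createMyDataset('PRIVATE', '/MyPrimary/my-prod-v1/USER',
#                          '.*root', 'phys03')
#
# Note that for any user other than 'PRIVATE' no dataset is constructed, so
# the unconditional writeCache(data) call would fail with an UnboundLocalError.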