CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
edmIntegrityCheck.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from __future__ import print_function
4 from PhysicsTools.HeppyCore.utils.edmIntegrityCheck import PublishToFileSystem, IntegrityCheck
5 import das
6 
7 import copy, os
8 
if __name__ == '__main__':

    import sys
    from optparse import OptionParser, OptionGroup

    usage = """usage: %prog [options] /Sample/Name/On/Castor

e.g.: %prog -u wreece -p -w 'PFAOD_*.root' /MultiJet/Run2011A-05Aug2011-v1/AOD/V2
    """
    # Use a distinct local name so the imported ``das`` module is not rebound.
    das_options = das.DASOptionParser(usage=usage)
    group = OptionGroup(das_options.parser,'edmIntegrityCheck Options','Options related to checking files on CASTOR')

    group.add_option("-d", "--device", dest="device", default='cmst3',help="The storage device to write to, e.g. 'cmst3'")
    group.add_option("-n", "--name", dest="name", default=None,help='The name of the dataset in DAS. Will be guessed if not specified')
    group.add_option("-p", "--printout", dest="printout", default=False, action='store_true',help='Print a report to stdout')
    # NOTE(review): dest is spelled "resursive"; it is kept as-is because code
    # outside this file may read the option under that attribute name.
    group.add_option("-r", "--recursive", dest="resursive", default=False, action='store_true',help='Walk the mass storage device recursively')
    group.add_option("-u", "--user", dest="user", default=os.environ['USER'],help='The username to use when looking at mass storage devices')
    group.add_option("-w", "--wildcard", dest="wildcard", default=None,help='A UNIX style wildcard to specify which files to check')
    group.add_option("--update", dest="update", default=False, action='store_true',help='Only update the status of corrupted files')
    group.add_option("-t","--timeout", dest="timeout", default=-1, type=int, help='Set a timeout on the edmFileUtil calls')
    group.add_option("--min-run", dest="min_run", default=-1, type=int, help='When querying DBS, require runs >= than this run')
    group.add_option("--max-run", dest="max_run", default=-1, type=int, help='When querying DBS, require runs <= than this run')
    group.add_option("--max_threads", dest="max_threads", default=None,help='The maximum number of threads to use')
    das_options.parser.add_option_group(group)
    (opts, datasets) = das_options.get_opt()

    if len(datasets)==0:
        # print_help() writes the help text itself and returns None, so it
        # must not be wrapped in print() (that would emit a stray "None").
        das_options.parser.print_help()
        print()
        print('need to provide a dataset in argument')
        # Nothing to check: stop here with a failure status instead of
        # continuing with an empty dataset list.
        sys.exit(1)
38 
39  def work(d,op):
40  tokens = d.split('%')
41  if len(tokens) == 2:
42  op.user = tokens[0]
43  d = tokens[1]
44 
45  check = IntegrityCheck(d,op)
46  pub = PublishToFileSystem(check)
47 
48  previous = None
49  if op.update:
50  previous = pub.get(check.directory)
51 
52  check.test(previous = previous, timeout = op.timeout)
53  if op.printout:
54  check.report()
55  report = check.structured()
56  pub.publish(report)
57 
58  return d
59 
60  def callback(result):
61  print('Checking thread done: ',str(result))
62 
63  #submit the main work in a multi-threaded way
64 
65  if len(datasets) == 1:
66  d = datasets[0]
67  work(d, copy.deepcopy(opts))
68  else:
69  import multiprocessing
70  if opts.max_threads is not None and opts.max_threads:
71  opts.max_threads = int(opts.max_threads)
72  pool = multiprocessing.Pool(processes=opts.max_threads)
73 
74  for d in datasets:
75  pool.apply_async(work, args=(d,copy.deepcopy(opts)),callback=callback)
76  pool.close()
77  pool.join()
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
#define str(s)